mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 18:43:18 +08:00
Fix remote txn heal logic
* A few tweaks to the remote txn resolution logic * Add logic to delete a specific record in remote_txn table by GID * Allow heal logic to move on to other cleanup if one specific GID fails * Do not rely on ongoing txns while cleaning up entries from remote_txn table Includes test case changes to try out various failure scenarios in the healing function. Fixes #3219
This commit is contained in:
parent
ea342f1396
commit
4cecdb50f9
@ -1369,7 +1369,7 @@ data_node_delete(PG_FUNCTION_ARGS)
|
|||||||
OP_DELETE);
|
OP_DELETE);
|
||||||
|
|
||||||
/* clean up persistent transaction records */
|
/* clean up persistent transaction records */
|
||||||
remote_txn_persistent_record_delete_for_data_node(server->serverid);
|
remote_txn_persistent_record_delete_for_data_node(server->serverid, NULL);
|
||||||
|
|
||||||
stmt = (DropStmt){
|
stmt = (DropStmt){
|
||||||
.type = T_DropStmt,
|
.type = T_DropStmt,
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
#include "libpq-fe.h"
|
#include "libpq-fe.h"
|
||||||
#include <postgres.h>
|
#include <postgres.h>
|
||||||
#include <access/xact.h>
|
#include <access/xact.h>
|
||||||
|
#include <storage/procarray.h>
|
||||||
#include <utils/builtins.h>
|
#include <utils/builtins.h>
|
||||||
#include <utils/snapmgr.h>
|
#include <utils/snapmgr.h>
|
||||||
#include <libpq-fe.h>
|
#include <libpq-fe.h>
|
||||||
@ -152,13 +153,17 @@ remote_txn_begin(RemoteTxn *entry, int curlevel)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check if the access node transaction which is driving the 2PC on the datanodes is
|
||||||
|
* still in progress.
|
||||||
|
*/
|
||||||
bool
|
bool
|
||||||
remote_txn_is_still_in_progress(TransactionId access_node_xid)
|
remote_txn_is_still_in_progress_on_access_node(TransactionId access_node_xid)
|
||||||
{
|
{
|
||||||
if (TransactionIdIsCurrentTransactionId(access_node_xid))
|
if (TransactionIdIsCurrentTransactionId(access_node_xid))
|
||||||
elog(ERROR, "checking if a commit is still in progress on same txn");
|
elog(ERROR, "checking if a commit is still in progress on same txn");
|
||||||
|
|
||||||
return XidInMVCCSnapshot(access_node_xid, GetTransactionSnapshot());
|
return TransactionIdIsInProgress(access_node_xid);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t
|
size_t
|
||||||
@ -731,27 +736,43 @@ persistent_record_tuple_delete(TupleInfo *ti, void *data)
|
|||||||
return SCAN_CONTINUE;
|
return SCAN_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If gid is NULL, then delete all entries belonging to the provided datanode. */
|
||||||
int
|
int
|
||||||
remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid)
|
remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid, const char *gid)
|
||||||
{
|
{
|
||||||
Catalog *catalog = ts_catalog_get();
|
Catalog *catalog = ts_catalog_get();
|
||||||
ScanKeyData scankey[1];
|
ScanKeyData scankey[1];
|
||||||
ScannerCtx scanctx;
|
ScannerCtx scanctx;
|
||||||
|
int scanidx;
|
||||||
ForeignServer *server = GetForeignServer(foreign_server_oid);
|
ForeignServer *server = GetForeignServer(foreign_server_oid);
|
||||||
|
|
||||||
ScanKeyInit(&scankey[0],
|
if (gid == NULL)
|
||||||
Anum_remote_txn_data_node_name_idx_data_node_name,
|
{
|
||||||
BTEqualStrategyNumber,
|
ScanKeyInit(&scankey[0],
|
||||||
F_NAMEEQ,
|
Anum_remote_txn_data_node_name_idx_data_node_name,
|
||||||
CStringGetDatum(server->servername));
|
BTEqualStrategyNumber,
|
||||||
|
F_NAMEEQ,
|
||||||
|
CStringGetDatum(server->servername));
|
||||||
|
scanidx = REMOTE_TXN_DATA_NODE_NAME_IDX;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ScanKeyInit(&scankey[0],
|
||||||
|
Anum_remote_txn_pkey_idx_remote_transaction_id,
|
||||||
|
BTEqualStrategyNumber,
|
||||||
|
F_TEXTEQ,
|
||||||
|
CStringGetTextDatum(gid));
|
||||||
|
scanidx = REMOTE_TXN_PKEY_IDX;
|
||||||
|
}
|
||||||
|
|
||||||
scanctx = (ScannerCtx){
|
scanctx = (ScannerCtx){
|
||||||
.table = catalog->tables[REMOTE_TXN].id,
|
.table = catalog->tables[REMOTE_TXN].id,
|
||||||
.index = catalog_get_index(catalog, REMOTE_TXN, REMOTE_TXN_DATA_NODE_NAME_IDX),
|
.index = catalog_get_index(catalog, REMOTE_TXN, scanidx),
|
||||||
.nkeys = 1,
|
.nkeys = 1,
|
||||||
.scankey = scankey,
|
.scankey = scankey,
|
||||||
.tuple_found = persistent_record_tuple_delete,
|
.tuple_found = persistent_record_tuple_delete,
|
||||||
.lockmode = RowExclusiveLock,
|
.lockmode = RowExclusiveLock,
|
||||||
|
.snapshot = GetTransactionSnapshot(),
|
||||||
.scandirection = ForwardScanDirection,
|
.scandirection = ForwardScanDirection,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ extern void remote_txn_set_will_prep_statement(RemoteTxn *entry,
|
|||||||
RemoteTxnPrepStmtOption prep_stmt_option);
|
RemoteTxnPrepStmtOption prep_stmt_option);
|
||||||
extern TSConnection *remote_txn_get_connection(RemoteTxn *txn);
|
extern TSConnection *remote_txn_get_connection(RemoteTxn *txn);
|
||||||
extern TSConnectionId remote_txn_get_connection_id(RemoteTxn *txn);
|
extern TSConnectionId remote_txn_get_connection_id(RemoteTxn *txn);
|
||||||
extern bool remote_txn_is_still_in_progress(TransactionId access_node_xid);
|
extern bool remote_txn_is_still_in_progress_on_access_node(TransactionId access_node_xid);
|
||||||
extern size_t remote_txn_size(void);
|
extern size_t remote_txn_size(void);
|
||||||
extern bool remote_txn_is_at_sub_txn_level(RemoteTxn *entry, int curlevel);
|
extern bool remote_txn_is_at_sub_txn_level(RemoteTxn *entry, int curlevel);
|
||||||
extern bool remote_txn_is_ongoing(RemoteTxn *entry);
|
extern bool remote_txn_is_ongoing(RemoteTxn *entry);
|
||||||
@ -50,7 +50,8 @@ extern void remote_txn_report_prepare_transaction_result(RemoteTxn *txn, bool su
|
|||||||
/* Persitent record */
|
/* Persitent record */
|
||||||
extern RemoteTxnId *remote_txn_persistent_record_write(TSConnectionId id);
|
extern RemoteTxnId *remote_txn_persistent_record_write(TSConnectionId id);
|
||||||
extern bool remote_txn_persistent_record_exists(const RemoteTxnId *gid);
|
extern bool remote_txn_persistent_record_exists(const RemoteTxnId *gid);
|
||||||
extern int remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid);
|
extern int remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid,
|
||||||
|
const char *gid);
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
/* Debugging functions used in testing */
|
/* Debugging functions used in testing */
|
||||||
|
@ -17,14 +17,15 @@
|
|||||||
|
|
||||||
#define GID_SEP "-"
|
#define GID_SEP "-"
|
||||||
|
|
||||||
#define GID_PREFIX "ts"
|
/* The separator is part of the GID prefix */
|
||||||
|
#define GID_PREFIX "ts-"
|
||||||
/* This is the maximum size of the literal accepted by PREPARE TRANSACTION, etc. */
|
/* This is the maximum size of the literal accepted by PREPARE TRANSACTION, etc. */
|
||||||
#define GID_MAX_SIZE 200
|
#define GID_MAX_SIZE 200
|
||||||
|
|
||||||
#define REMOTE_TXN_ID_VERSION ((uint8) 1)
|
#define REMOTE_TXN_ID_VERSION ((uint8) 1)
|
||||||
|
|
||||||
/* current_pattern: ts-version-xid-server_id-user_id */
|
/* current_pattern: ts-version-xid-server_id-user_id */
|
||||||
#define FMT_PATTERN GID_PREFIX GID_SEP "%hhu" GID_SEP "%u" GID_SEP "%u" GID_SEP "%u"
|
#define FMT_PATTERN GID_PREFIX "%hhu" GID_SEP "%u" GID_SEP "%u" GID_SEP "%u"
|
||||||
|
|
||||||
static char *
|
static char *
|
||||||
remote_txn_id_get_sql(const char *command, RemoteTxnId *id)
|
remote_txn_id_get_sql(const char *command, RemoteTxnId *id)
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
*/
|
*/
|
||||||
#include <postgres.h>
|
#include <postgres.h>
|
||||||
#include <utils/fmgrprotos.h>
|
#include <utils/fmgrprotos.h>
|
||||||
|
#include <utils/guc.h>
|
||||||
#include <utils/snapmgr.h>
|
#include <utils/snapmgr.h>
|
||||||
#include <utils/fmgroids.h>
|
#include <utils/fmgroids.h>
|
||||||
#include <access/xact.h>
|
#include <access/xact.h>
|
||||||
@ -18,13 +19,24 @@
|
|||||||
RemoteTxnResolution
|
RemoteTxnResolution
|
||||||
remote_txn_resolution(Oid foreign_server, const RemoteTxnId *transaction_id)
|
remote_txn_resolution(Oid foreign_server, const RemoteTxnId *transaction_id)
|
||||||
{
|
{
|
||||||
if (remote_txn_is_still_in_progress(transaction_id->xid))
|
if (remote_txn_is_still_in_progress_on_access_node(transaction_id->xid))
|
||||||
/* transaction still ongoing; don't know it's state */
|
/* transaction still ongoing; don't know its state */
|
||||||
return REMOTE_TXN_RESOLUTION_UNKNOWN;
|
return REMOTE_TXN_RESOLUTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If an entry exists in the "remote_txn" table and is visible then it means
|
||||||
|
* that the transaction committed on the AN
|
||||||
|
*/
|
||||||
if (remote_txn_persistent_record_exists(transaction_id))
|
if (remote_txn_persistent_record_exists(transaction_id))
|
||||||
return REMOTE_TXN_RESOLUTION_COMMT;
|
return REMOTE_TXN_RESOLUTION_COMMIT;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the txn is not in progress and is not committed as per the "remote_txn"
|
||||||
|
* table then it's presumed to be aborted.
|
||||||
|
*
|
||||||
|
* We could ask PG machinery to confirm the abort but as long as we are sticking
|
||||||
|
* to one uniform behavior consistently it should be ok for now.
|
||||||
|
*/
|
||||||
return REMOTE_TXN_RESOLUTION_ABORT;
|
return REMOTE_TXN_RESOLUTION_ABORT;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,8 +70,11 @@ remote_txn_heal_data_node(PG_FUNCTION_ARGS)
|
|||||||
*/
|
*/
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
int row;
|
int row;
|
||||||
List *unknown_txn_gid = NIL;
|
List *in_progress_txn_gids = NIL, *healed_txn_gids = NIL;
|
||||||
int non_ts_txns = 0;
|
int non_ts_txns = 0, ntuples;
|
||||||
|
#ifdef TS_DEBUG
|
||||||
|
int n_gid_errors = 0; /* how many errors to induce? */
|
||||||
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This function cannot be called inside a transaction block since effects
|
* This function cannot be called inside a transaction block since effects
|
||||||
@ -70,9 +85,10 @@ remote_txn_heal_data_node(PG_FUNCTION_ARGS)
|
|||||||
res = remote_connection_query_ok(conn, GET_PREPARED_XACT_SQL);
|
res = remote_connection_query_ok(conn, GET_PREPARED_XACT_SQL);
|
||||||
|
|
||||||
Assert(1 == PQnfields(res));
|
Assert(1 == PQnfields(res));
|
||||||
for (row = 0; row < PQntuples(res); row++)
|
ntuples = PQntuples(res);
|
||||||
|
for (row = 0; row < ntuples; row++)
|
||||||
{
|
{
|
||||||
const char *id_string = PQgetvalue(res, row, 0);
|
char *id_string = PQgetvalue(res, row, 0);
|
||||||
RemoteTxnId *tpc_gid;
|
RemoteTxnId *tpc_gid;
|
||||||
RemoteTxnResolution resolution;
|
RemoteTxnResolution resolution;
|
||||||
|
|
||||||
@ -85,18 +101,101 @@ remote_txn_heal_data_node(PG_FUNCTION_ARGS)
|
|||||||
tpc_gid = remote_txn_id_in(id_string);
|
tpc_gid = remote_txn_id_in(id_string);
|
||||||
resolution = remote_txn_resolution(foreign_server_oid, tpc_gid);
|
resolution = remote_txn_resolution(foreign_server_oid, tpc_gid);
|
||||||
|
|
||||||
|
#ifdef TS_DEBUG
|
||||||
|
/*
|
||||||
|
* Induce an error in the GID so that the remote side errors out when it tries
|
||||||
|
* to heal it.
|
||||||
|
*
|
||||||
|
* We inject the error by checking the value of the below session variable. Not
|
||||||
|
* a full GUC, just a tool to allow us to randomly inject error for testing
|
||||||
|
* purposes. Depending on the value we will inject an error in the GID and also
|
||||||
|
* additionally change the resolution as per the accepted value:
|
||||||
|
*
|
||||||
|
* "commit" : change GID + set resolution as COMMITTED
|
||||||
|
* "abort" : change GID + set resolution as ABORTED
|
||||||
|
* "inprogress" : set resolution as IN_PROGRESS
|
||||||
|
*
|
||||||
|
* Any other setting will not have any effect
|
||||||
|
*
|
||||||
|
* We currently induce error in one GID processing. If needed this can be
|
||||||
|
* changed in the future via another session variable to set to a specific
|
||||||
|
* number of errors to induce. Note that this variable is incremented only
|
||||||
|
* for valid values of "timescaledb.debug_inject_gid_error.
|
||||||
|
*
|
||||||
|
* Current logic also means that the first GID being processed will always
|
||||||
|
* induce a change in resolution behavior. But that's ok, we could randomize
|
||||||
|
* it later to any arbitrary integer value less than ntuples in the future.
|
||||||
|
*/
|
||||||
|
if (n_gid_errors < 1)
|
||||||
|
{
|
||||||
|
const char *inject_gid_error =
|
||||||
|
GetConfigOption("timescaledb.debug_inject_gid_error", true, false);
|
||||||
|
|
||||||
|
/* increment the user_id field to cause mismatch in GID */
|
||||||
|
if (inject_gid_error)
|
||||||
|
{
|
||||||
|
if (strcmp(inject_gid_error, "abort") == 0)
|
||||||
|
{
|
||||||
|
tpc_gid->id.user_id++;
|
||||||
|
resolution = REMOTE_TXN_RESOLUTION_ABORT;
|
||||||
|
n_gid_errors++;
|
||||||
|
}
|
||||||
|
else if (strcmp(inject_gid_error, "commit") == 0)
|
||||||
|
{
|
||||||
|
tpc_gid->id.user_id++;
|
||||||
|
resolution = REMOTE_TXN_RESOLUTION_COMMIT;
|
||||||
|
n_gid_errors++;
|
||||||
|
}
|
||||||
|
else if (strcmp(inject_gid_error, "inprogress") == 0)
|
||||||
|
{
|
||||||
|
resolution = REMOTE_TXN_RESOLUTION_IN_PROGRESS;
|
||||||
|
n_gid_errors++;
|
||||||
|
}
|
||||||
|
/* any other value is simply ignored, n_gid_errors is also not incremented */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
/*
|
||||||
|
* We don't expect these commands to fail, but if they do, continue and move on to
|
||||||
|
* healing up the next GID in the list. The ones that failed will get retried if
|
||||||
|
* they are still around on the datanodes the next time over.
|
||||||
|
*/
|
||||||
switch (resolution)
|
switch (resolution)
|
||||||
{
|
{
|
||||||
case REMOTE_TXN_RESOLUTION_COMMT:
|
case REMOTE_TXN_RESOLUTION_COMMIT:
|
||||||
remote_connection_cmd_ok(conn, remote_txn_id_commit_prepared_sql(tpc_gid));
|
if (PQresultStatus(
|
||||||
resolved++;
|
remote_connection_exec(conn, remote_txn_id_commit_prepared_sql(tpc_gid))) ==
|
||||||
|
PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
healed_txn_gids = lappend(healed_txn_gids, id_string);
|
||||||
|
resolved++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
ereport(WARNING,
|
||||||
|
(errmsg("could not commit prepared transaction on data node \"%s\"",
|
||||||
|
remote_connection_node_name(conn)),
|
||||||
|
errhint("To retry, manually run \"COMMIT PREPARED %s\" on the data "
|
||||||
|
"node or run the healing function again.",
|
||||||
|
id_string)));
|
||||||
break;
|
break;
|
||||||
case REMOTE_TXN_RESOLUTION_ABORT:
|
case REMOTE_TXN_RESOLUTION_ABORT:
|
||||||
remote_connection_cmd_ok(conn, remote_txn_id_rollback_prepared_sql(tpc_gid));
|
if (PQresultStatus(remote_connection_exec(conn,
|
||||||
resolved++;
|
remote_txn_id_rollback_prepared_sql(
|
||||||
|
tpc_gid))) == PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
healed_txn_gids = lappend(healed_txn_gids, id_string);
|
||||||
|
resolved++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
ereport(WARNING,
|
||||||
|
(errmsg("could not roll back prepared transaction on data node \"%s\"",
|
||||||
|
remote_connection_node_name(conn)),
|
||||||
|
errhint("To retry, manually run \"ROLLBACK PREPARED %s\" on the data "
|
||||||
|
"node or run the healing function again.",
|
||||||
|
id_string)));
|
||||||
break;
|
break;
|
||||||
case REMOTE_TXN_RESOLUTION_UNKNOWN:
|
case REMOTE_TXN_RESOLUTION_IN_PROGRESS:
|
||||||
unknown_txn_gid = lappend(unknown_txn_gid, tpc_gid);
|
in_progress_txn_gids = lappend(in_progress_txn_gids, id_string);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -107,10 +206,24 @@ remote_txn_heal_data_node(PG_FUNCTION_ARGS)
|
|||||||
remote_result_close(res);
|
remote_result_close(res);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Perform cleanup of all records if there are no unknown txns.
|
* Perform cleanup of all records if there are no in progress txns and if the number of
|
||||||
|
* resolved entities is same as the number of rows obtained from the datanode.
|
||||||
|
*
|
||||||
|
* In a heavily loaded system there's a possibility of ongoing transactions always being
|
||||||
|
* present in which case we will never get a chance to clean up entries in "remote_txn"
|
||||||
|
* table. So, we track healed gids in a list and delete those specific rows to keep the
|
||||||
|
* "remote_txn" table from growing up indefinitely.
|
||||||
*/
|
*/
|
||||||
if (list_length(unknown_txn_gid) == 0)
|
if (list_length(in_progress_txn_gids) == 0 && resolved == ntuples)
|
||||||
remote_txn_persistent_record_delete_for_data_node(foreign_server_oid);
|
remote_txn_persistent_record_delete_for_data_node(foreign_server_oid, NULL);
|
||||||
|
else if (resolved)
|
||||||
|
{
|
||||||
|
ListCell *lc;
|
||||||
|
Assert(healed_txn_gids != NIL);
|
||||||
|
|
||||||
|
foreach (lc, healed_txn_gids)
|
||||||
|
remote_txn_persistent_record_delete_for_data_node(foreign_server_oid, lfirst(lc));
|
||||||
|
}
|
||||||
|
|
||||||
remote_connection_close(conn);
|
remote_connection_close(conn);
|
||||||
PG_RETURN_INT32(resolved);
|
PG_RETURN_INT32(resolved);
|
||||||
|
@ -39,7 +39,8 @@
|
|||||||
* on the access node:
|
* on the access node:
|
||||||
*
|
*
|
||||||
* Case 1 - The transaction is ongoing:
|
* Case 1 - The transaction is ongoing:
|
||||||
* In this case the state of the remote transaction is unknown (REMOTE_TXN_RESOLVE_UNKNOWN)
|
* In this case the state of the remote transaction is in progress
|
||||||
|
*(REMOTE_TXN_RESOLVE_IN_PROGRESS)
|
||||||
*
|
*
|
||||||
* Case 2 - The transaction is committed:
|
* Case 2 - The transaction is committed:
|
||||||
* The remote transaction MUST BE be committed (REMOTE_TXN_RESOLVE_COMMT)
|
* The remote transaction MUST BE be committed (REMOTE_TXN_RESOLVE_COMMT)
|
||||||
@ -74,9 +75,9 @@
|
|||||||
|
|
||||||
typedef enum RemoteTxnResolution
|
typedef enum RemoteTxnResolution
|
||||||
{
|
{
|
||||||
REMOTE_TXN_RESOLUTION_UNKNOWN = 0,
|
REMOTE_TXN_RESOLUTION_IN_PROGRESS = 0,
|
||||||
REMOTE_TXN_RESOLUTION_ABORT,
|
REMOTE_TXN_RESOLUTION_ABORT,
|
||||||
REMOTE_TXN_RESOLUTION_COMMT
|
REMOTE_TXN_RESOLUTION_COMMIT
|
||||||
} RemoteTxnResolution;
|
} RemoteTxnResolution;
|
||||||
|
|
||||||
extern RemoteTxnResolution remote_txn_resolution(Oid foreign_server,
|
extern RemoteTxnResolution remote_txn_resolution(Oid foreign_server,
|
||||||
|
@ -6,6 +6,10 @@ CREATE OR REPLACE FUNCTION create_records()
|
|||||||
RETURNS VOID
|
RETURNS VOID
|
||||||
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records'
|
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records'
|
||||||
LANGUAGE C;
|
LANGUAGE C;
|
||||||
|
CREATE OR REPLACE FUNCTION create_prepared_record()
|
||||||
|
RETURNS VOID
|
||||||
|
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_prepared_record'
|
||||||
|
LANGUAGE C;
|
||||||
CREATE OR REPLACE FUNCTION create_records_with_concurrent_heal()
|
CREATE OR REPLACE FUNCTION create_records_with_concurrent_heal()
|
||||||
RETURNS VOID
|
RETURNS VOID
|
||||||
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records_with_concurrent_heal'
|
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records_with_concurrent_heal'
|
||||||
@ -137,6 +141,187 @@ SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
|||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_records();
|
||||||
|
create_records
|
||||||
|
----------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create an additional prepared entry. This will allow us to test heal behavior when one
|
||||||
|
-- attempt errors out and when the other should succeed. The debug_inject_gid_error logic
|
||||||
|
-- only induces one error for now. This can be modified later as desired via another
|
||||||
|
-- session variable.
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
create_prepared_record
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--inject errors in the GID and test "commit" resolution for it
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'commit';
|
||||||
|
--heal should error out and the prepared transaction should still be visible
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
WARNING: could not commit prepared transaction on data node "loopback2"
|
||||||
|
remote_txn_heal_data_node
|
||||||
|
---------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
describes
|
||||||
|
--------------------------------
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
committed with concurrent heal
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Again process 2 records where one errors out and other succeeds
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
create_prepared_record
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--inject errors in the GID and test "abort" resolution for it
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'abort';
|
||||||
|
--heal should error out and the prepared transaction should still be visible
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
WARNING: could not roll back prepared transaction on data node "loopback2"
|
||||||
|
remote_txn_heal_data_node
|
||||||
|
---------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
describes
|
||||||
|
--------------------------------
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
committed with concurrent heal
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Again process 2 records where one errors out and other succeeds
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
create_prepared_record
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--test "inprogress" resolution for the prepared 2PC
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'inprogress';
|
||||||
|
--heal will not error out but the prepared transaction should still be visible
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
remote_txn_heal_data_node
|
||||||
|
---------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
describes
|
||||||
|
--------------------------------
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
committed with concurrent heal
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
(7 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Again process 2 records where one errors out and other succeeds
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
create_prepared_record
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--set to any random value so that it does not have any effect and allows healing
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'ignored';
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback'));
|
||||||
|
remote_txn_heal_data_node
|
||||||
|
---------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
remote_txn_heal_data_node
|
||||||
|
---------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback3'));
|
||||||
|
remote_txn_heal_data_node
|
||||||
|
---------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
describes
|
||||||
|
--------------------------------
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
committed with concurrent heal
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
(9 rows)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
--test that it is safe to have non-ts prepared-txns with heal
|
--test that it is safe to have non-ts prepared-txns with heal
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO public.table_modified_by_txns VALUES ('non-ts-txn');
|
INSERT INTO public.table_modified_by_txns VALUES ('non-ts-txn');
|
||||||
@ -169,8 +354,14 @@ SELECT * FROM table_modified_by_txns;
|
|||||||
committed
|
committed
|
||||||
prepared not comitted
|
prepared not comitted
|
||||||
committed with concurrent heal
|
committed with concurrent heal
|
||||||
|
committed
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
|
prepared not comitted
|
||||||
non-ts-txn
|
non-ts-txn
|
||||||
(4 rows)
|
(10 rows)
|
||||||
|
|
||||||
SELECT count(*) FROM pg_prepared_xacts;
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
count
|
count
|
||||||
|
@ -9,6 +9,11 @@ RETURNS VOID
|
|||||||
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records'
|
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records'
|
||||||
LANGUAGE C;
|
LANGUAGE C;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION create_prepared_record()
|
||||||
|
RETURNS VOID
|
||||||
|
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_prepared_record'
|
||||||
|
LANGUAGE C;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION create_records_with_concurrent_heal()
|
CREATE OR REPLACE FUNCTION create_records_with_concurrent_heal()
|
||||||
RETURNS VOID
|
RETURNS VOID
|
||||||
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records_with_concurrent_heal'
|
AS :TSL_MODULE_PATHNAME, 'ts_test_remote_txn_resolve_create_records_with_concurrent_heal'
|
||||||
@ -61,6 +66,51 @@ SELECT * FROM table_modified_by_txns;
|
|||||||
SELECT count(*) FROM pg_prepared_xacts;
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
|
||||||
|
SELECT create_records();
|
||||||
|
-- create an additional prepared entry. This will allow us to test heal behavior when one
|
||||||
|
-- attempt errors out and when the other should succeed. The debug_inject_gid_error logic
|
||||||
|
-- only induces one error for now. This can be modified later as desired via another
|
||||||
|
-- session variable.
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
--inject errors in the GID and test "commit" resolution for it
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'commit';
|
||||||
|
--heal should error out and the prepared transaction should still be visible
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
|
||||||
|
-- Again process 2 records where one errors out and other succeeds
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
--inject errors in the GID and test "abort" resolution for it
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'abort';
|
||||||
|
--heal should error out and the prepared transaction should still be visible
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
|
||||||
|
-- Again process 2 records where one errors out and other succeeds
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
--test "inprogress" resolution for the prepared 2PC
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'inprogress';
|
||||||
|
--heal will not error out but the prepared transaction should still be visible
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
|
||||||
|
-- Again process 2 records where one errors out and other succeeds
|
||||||
|
SELECT create_prepared_record();
|
||||||
|
--set to any random value so that it does not have any effect and allows healing
|
||||||
|
SET timescaledb.debug_inject_gid_error TO 'ignored';
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback'));
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback2'));
|
||||||
|
SELECT _timescaledb_internal.remote_txn_heal_data_node((SELECT OID FROM pg_foreign_server WHERE srvname = 'loopback3'));
|
||||||
|
SELECT * FROM table_modified_by_txns;
|
||||||
|
SELECT count(*) FROM pg_prepared_xacts;
|
||||||
|
SELECT count(*) FROM _timescaledb_catalog.remote_txn;
|
||||||
|
|
||||||
--test that it is safe to have non-ts prepared-txns with heal
|
--test that it is safe to have non-ts prepared-txns with heal
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO public.table_modified_by_txns VALUES ('non-ts-txn');
|
INSERT INTO public.table_modified_by_txns VALUES ('non-ts-txn');
|
||||||
|
@ -23,10 +23,17 @@ test_basic_persistent_record(TSConnectionId cid)
|
|||||||
RemoteTxnId *id = remote_txn_id_create(GetTopTransactionId(), cid);
|
RemoteTxnId *id = remote_txn_id_create(GetTopTransactionId(), cid);
|
||||||
|
|
||||||
TestAssertTrue(!remote_txn_persistent_record_exists(id));
|
TestAssertTrue(!remote_txn_persistent_record_exists(id));
|
||||||
|
|
||||||
remote_txn_persistent_record_write(cid);
|
remote_txn_persistent_record_write(cid);
|
||||||
TestAssertTrue(remote_txn_persistent_record_exists(id));
|
TestAssertTrue(remote_txn_persistent_record_exists(id));
|
||||||
|
/* delete by just specifying the data node */
|
||||||
|
remote_txn_persistent_record_delete_for_data_node(cid.server_id, NULL);
|
||||||
|
TestAssertTrue(!remote_txn_persistent_record_exists(id));
|
||||||
|
|
||||||
remote_txn_persistent_record_delete_for_data_node(cid.server_id);
|
remote_txn_persistent_record_write(cid);
|
||||||
|
TestAssertTrue(remote_txn_persistent_record_exists(id));
|
||||||
|
/* delete by specifying the exact GID */
|
||||||
|
remote_txn_persistent_record_delete_for_data_node(cid.server_id, remote_txn_id_out(id));
|
||||||
TestAssertTrue(!remote_txn_persistent_record_exists(id));
|
TestAssertTrue(!remote_txn_persistent_record_exists(id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#include "test_utils.h"
|
#include "test_utils.h"
|
||||||
|
|
||||||
TS_FUNCTION_INFO_V1(ts_test_remote_txn_resolve_create_records);
|
TS_FUNCTION_INFO_V1(ts_test_remote_txn_resolve_create_records);
|
||||||
|
TS_FUNCTION_INFO_V1(ts_test_remote_txn_resolve_create_prepared_record);
|
||||||
TS_FUNCTION_INFO_V1(ts_test_remote_txn_resolve_create_records_with_concurrent_heal);
|
TS_FUNCTION_INFO_V1(ts_test_remote_txn_resolve_create_records_with_concurrent_heal);
|
||||||
|
|
||||||
static RemoteTxn *
|
static RemoteTxn *
|
||||||
@ -68,6 +69,18 @@ ts_test_remote_txn_resolve_create_records(PG_FUNCTION_ARGS)
|
|||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* create an additional prepared gid in a separate transaction */
|
||||||
|
Datum
|
||||||
|
ts_test_remote_txn_resolve_create_prepared_record(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
TSConnectionId id;
|
||||||
|
|
||||||
|
id.server_id = GetForeignServerByName("loopback", false)->serverid;
|
||||||
|
create_prepared_txn(&id);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
send_heal()
|
send_heal()
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user