diff --git a/sql/ddl_experimental.sql b/sql/ddl_experimental.sql index c3e66bfa5..e79280dde 100644 --- a/sql/ddl_experimental.sql +++ b/sql/ddl_experimental.sql @@ -37,3 +37,12 @@ CREATE OR REPLACE PROCEDURE timescaledb_experimental.copy_chunk( source_node NAME = NULL, destination_node NAME = NULL) AS '@MODULE_PATHNAME@', 'ts_copy_chunk_proc' LANGUAGE C; + +-- A copy_chunk or move_chunk procedure call involves multiple nodes and +-- depending on the data size can take a long time. Failures are possible +-- when this long running activity is ongoing. We need to be able to recover +-- and cleanup such failed chunk copy/move activities and it's done via this +-- procedure +CREATE OR REPLACE PROCEDURE timescaledb_experimental.cleanup_copy_chunk_operation( + operation_id NAME) +AS '@MODULE_PATHNAME@', 'ts_copy_chunk_cleanup_proc' LANGUAGE C; diff --git a/sql/pre_install/tables.sql b/sql/pre_install/tables.sql index 81868276e..bdfd68cd4 100644 --- a/sql/pre_install/tables.sql +++ b/sql/pre_install/tables.sql @@ -385,11 +385,10 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.remote_txn', '' -- carry over chunk copy/move operations from earlier (if it makes sense at all) -- -CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq MINVALUE 1; +CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1; -CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity ( - id integer PRIMARY KEY DEFAULT nextval('_timescaledb_catalog.chunk_copy_activity_id_seq'), - operation_id name NOT NULL UNIQUE, -- the publisher/subscriber identifier used +CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation ( + operation_id name PRIMARY KEY, -- the publisher/subscriber identifier used backend_pid integer NOT NULL, -- the pid of the backend running this activity completed_stage name NOT NULL, -- the completed stage/step time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity @@ 
-399,8 +398,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity ( delete_on_source_node bool NOT NULL -- is a move or copy activity ); -ALTER SEQUENCE _timescaledb_catalog.chunk_copy_activity_id_seq OWNED BY _timescaledb_catalog.chunk_copy_activity.id; - -- Set table permissions -- We need to grant SELECT to PUBLIC for all tables even those not -- marked as being dumped because pg_dump will try to access all diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index 4cb4daca7..98596cdf3 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -5,18 +5,10 @@ DROP FUNCTION IF EXISTS _timescaledb_internal.allow_new_chunks; DROP FUNCTION IF EXISTS _timescaledb_internal.refresh_continuous_aggregate; DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk; --- Use the experimental schema for ths new procedure -CREATE OR REPLACE PROCEDURE timescaledb_experimental.move_chunk( - chunk REGCLASS, - source_node NAME = NULL, - destination_node NAME = NULL) -AS '@MODULE_PATHNAME@', 'ts_move_chunk_proc' LANGUAGE C; +CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1; -CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq MINVALUE 1; - -CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity ( - id integer PRIMARY KEY DEFAULT nextval('_timescaledb_catalog.chunk_copy_activity_id_seq'), - operation_id name NOT NULL UNIQUE, -- the publisher/subscriber identifier used +CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation ( + operation_id name PRIMARY KEY, -- the publisher/subscriber identifier used backend_pid integer NOT NULL, -- the pid of the backend running this activity completed_stage name NOT NULL, -- the completed stage/step time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity @@ -26,7 +18,5 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity ( delete_on_source_node bool NOT NULL -- is a 
move or copy activity ); -ALTER SEQUENCE _timescaledb_catalog.chunk_copy_activity_id_seq OWNED BY _timescaledb_catalog.chunk_copy_activity.id; - -GRANT SELECT ON _timescaledb_catalog.chunk_copy_activity_id_seq TO PUBLIC; -GRANT SELECT ON _timescaledb_catalog.chunk_copy_activity TO PUBLIC; +GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation_id_seq TO PUBLIC; +GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation TO PUBLIC; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index adfc363e3..6188f6a01 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -8,8 +8,9 @@ DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk; DROP PROCEDURE IF EXISTS _timescaledb_internal.wait_subscription_sync; DROP PROCEDURE IF EXISTS timescaledb_experimental.move_chunk; DROP PROCEDURE IF EXISTS timescaledb_experimental.copy_chunk; -DROP TABLE IF EXISTS _timescaledb_catalog.chunk_copy_activity; -DROP SEQUENCE IF EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq; +DROP PROCEDURE IF EXISTS timescaledb_experimental.cleanup_copy_chunk_operation; +DROP TABLE IF EXISTS _timescaledb_catalog.chunk_copy_operation; +DROP SEQUENCE IF EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq; DROP VIEW IF EXISTS timescaledb_experimental.chunk_replication_status; DROP SCHEMA IF EXISTS timescaledb_experimental CASCADE; diff --git a/src/catalog.c b/src/catalog.c index 14cffe7b1..037b300c1 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -103,9 +103,9 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = { .schema_name = CATALOG_SCHEMA_NAME, .table_name = REMOTE_TXN_TABLE_NAME, }, - [CHUNK_COPY_ACTIVITY] = { + [CHUNK_COPY_OPERATION] = { .schema_name = CATALOG_SCHEMA_NAME, - .table_name = CHUNK_COPY_ACTIVITY_TABLE_NAME, + .table_name = CHUNK_COPY_OPERATION_TABLE_NAME, }, [_MAX_CATALOG_TABLES] = { .schema_name = "invalid schema", @@ -250,10 +250,10 @@ static const TableIndexDef 
catalog_table_index_definitions[_MAX_CATALOG_TABLES] [REMOTE_TXN_DATA_NODE_NAME_IDX] = "remote_txn_data_node_name_idx" } }, - [CHUNK_COPY_ACTIVITY] = { - .length = _MAX_CHUNK_COPY_ACTIVITY_INDEX, + [CHUNK_COPY_OPERATION] = { + .length = _MAX_CHUNK_COPY_OPERATION_INDEX, .names = (char *[]) { - [CHUNK_COPY_ACTIVITY_PKEY_IDX] = "chunk_copy_activity_pkey", + [CHUNK_COPY_OPERATION_PKEY_IDX] = "chunk_copy_operation_pkey", }, } }; @@ -276,7 +276,7 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = { [HYPERTABLE_COMPRESSION] = NULL, [COMPRESSION_CHUNK_SIZE] = NULL, [REMOTE_TXN] = NULL, - [CHUNK_COPY_ACTIVITY] = CATALOG_SCHEMA_NAME ".chunk_copy_activity_id_seq", + [CHUNK_COPY_OPERATION] = CATALOG_SCHEMA_NAME ".chunk_copy_operation_id_seq", }; typedef struct InternalFunctionDef diff --git a/src/catalog.h b/src/catalog.h index d5c32d273..1f38bbec7 100644 --- a/src/catalog.h +++ b/src/catalog.h @@ -53,7 +53,7 @@ typedef enum CatalogTable HYPERTABLE_COMPRESSION, COMPRESSION_CHUNK_SIZE, REMOTE_TXN, - CHUNK_COPY_ACTIVITY, + CHUNK_COPY_OPERATION, _MAX_CATALOG_TABLES, } CatalogTable; @@ -1189,27 +1189,25 @@ enum Anum_remote_data_node_name_idx * ********************************************/ -#define CHUNK_COPY_ACTIVITY_TABLE_NAME "chunk_copy_activity" +#define CHUNK_COPY_OPERATION_TABLE_NAME "chunk_copy_operation" -enum Anum_chunk_copy_activity +enum Anum_chunk_copy_operation { - Anum_chunk_copy_activity_id = 1, - Anum_chunk_copy_activity_operation_id, - Anum_chunk_copy_activity_backend_pid, - Anum_chunk_copy_activity_completed_stage, - Anum_chunk_copy_activity_time_start, - Anum_chunk_copy_activity_chunk_id, - Anum_chunk_copy_activity_source_node_name, - Anum_chunk_copy_activity_dest_node_name, - Anum_chunk_copy_activity_delete_on_src_node, - _Anum_chunk_copy_activity_max, + Anum_chunk_copy_operation_operation_id = 1, + Anum_chunk_copy_operation_backend_pid, + Anum_chunk_copy_operation_completed_stage, + Anum_chunk_copy_operation_time_start, + 
Anum_chunk_copy_operation_chunk_id, + Anum_chunk_copy_operation_source_node_name, + Anum_chunk_copy_operation_dest_node_name, + Anum_chunk_copy_operation_delete_on_src_node, + _Anum_chunk_copy_operation_max, }; -#define Natts_chunk_copy_activity (_Anum_chunk_copy_activity_max - 1) +#define Natts_chunk_copy_operation (_Anum_chunk_copy_operation_max - 1) -typedef struct FormData_chunk_copy_activity +typedef struct FormData_chunk_copy_operation { - int32 id; NameData operation_id; int32 backend_pid; NameData completed_stage; @@ -1218,20 +1216,19 @@ typedef struct FormData_chunk_copy_activity NameData source_node_name; NameData dest_node_name; bool delete_on_src_node; -} FormData_chunk_copy_activity; +} FormData_chunk_copy_operation; enum { - CHUNK_COPY_ACTIVITY_PKEY_IDX = 0, - _MAX_CHUNK_COPY_ACTIVITY_INDEX, + CHUNK_COPY_OPERATION_PKEY_IDX = 0, + _MAX_CHUNK_COPY_OPERATION_INDEX, }; -enum Anum_chunk_copy_activity_pkey_idx +enum Anum_chunk_copy_operation_pkey_idx { - Anum_chunk_copy_activity_pkey_idx_id = 1, - _Anum_chunk_copy_activity_pkey_idx_max, + Anum_chunk_copy_operation_idx_operation_id = 1, + _Anum_chunk_copy_operation_pkey_idx_max, }; -#define Natts_chunk_copy_activity_pkey_idx (_Anum_chunk_copy_activity_pkey_idx_max - 1) typedef enum CacheType { diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 158426e3f..f44a915e6 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -44,6 +44,7 @@ CROSSMODULE_WRAPPER(reorder_chunk); CROSSMODULE_WRAPPER(move_chunk); CROSSMODULE_WRAPPER(move_chunk_proc); CROSSMODULE_WRAPPER(copy_chunk_proc); +CROSSMODULE_WRAPPER(copy_chunk_cleanup_proc); /* partialize/finalize aggregate */ CROSSMODULE_WRAPPER(partialize_agg); @@ -336,6 +337,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .move_chunk = error_no_default_fn_pg_community, .move_chunk_proc = error_no_default_fn_pg_community, .copy_chunk_proc = error_no_default_fn_pg_community, + .copy_chunk_cleanup_proc = 
error_no_default_fn_pg_community, .reorder_chunk = error_no_default_fn_pg_community, .partialize_agg = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 4fbde53e6..d41fdf22a 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -81,6 +81,7 @@ typedef struct CrossModuleFunctions PGFunction move_chunk; PGFunction move_chunk_proc; PGFunction copy_chunk_proc; + PGFunction copy_chunk_cleanup_proc; void (*ddl_command_start)(ProcessUtilityArgs *args); void (*ddl_command_end)(EventTriggerData *command); void (*sql_drop)(List *dropped_objects); diff --git a/src/utils.c b/src/utils.c index 2c62968c0..4b377eb55 100644 --- a/src/utils.c +++ b/src/utils.c @@ -525,8 +525,11 @@ ts_create_struct_from_tuple(HeapTuple tuple, MemoryContext mctx, size_t alloc_si { void *struct_ptr = MemoryContextAllocZero(mctx, alloc_size); - /* Make sure the function is not used when the tuple contains NULLs */ - Assert(copy_size == tuple->t_len - tuple->t_data->t_hoff); + /* + * Make sure the function is not used when the tuple contains NULLs. + * Also compare the aligned sizes in the assert. 
+ */ + Assert(copy_size == MAXALIGN(tuple->t_len - tuple->t_data->t_hoff)); memcpy(struct_ptr, GETSTRUCT(tuple), copy_size); return struct_ptr; diff --git a/src/utils.h b/src/utils.h index 7eda9bd30..ff873eea8 100644 --- a/src/utils.h +++ b/src/utils.h @@ -82,8 +82,8 @@ typedef struct Dimension Dimension; extern TSDLLEXPORT Oid ts_get_integer_now_func(const Dimension *open_dim); -extern void *ts_create_struct_from_slot(TupleTableSlot *slot, MemoryContext mctx, size_t alloc_size, - size_t copy_size); +extern TSDLLEXPORT void *ts_create_struct_from_slot(TupleTableSlot *slot, MemoryContext mctx, + size_t alloc_size, size_t copy_size); extern TSDLLEXPORT AppendRelInfo *ts_get_appendrelinfo(PlannerInfo *root, Index rti, bool missing_ok); diff --git a/test/expected/drop_rename_hypertable.out b/test/expected/drop_rename_hypertable.out index 27a49bc61..2a0df71b7 100644 --- a/test/expected/drop_rename_hypertable.out +++ b/test/expected/drop_rename_hypertable.out @@ -196,7 +196,7 @@ SELECT * FROM _timescaledb_catalog.hypertable; ----------------------+--------------------------------------------------+-------+------------ _timescaledb_catalog | chunk | table | super_user _timescaledb_catalog | chunk_constraint | table | super_user - _timescaledb_catalog | chunk_copy_activity | table | super_user + _timescaledb_catalog | chunk_copy_operation | table | super_user _timescaledb_catalog | chunk_data_node | table | super_user _timescaledb_catalog | chunk_index | table | super_user _timescaledb_catalog | compression_algorithm | table | super_user diff --git a/test/expected/pg_dump.out b/test/expected/pg_dump.out index ef2efe87a..68e6cc018 100644 --- a/test/expected/pg_dump.out +++ b/test/expected/pg_dump.out @@ -556,8 +556,8 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND timescaledb_information.hypertables _timescaledb_internal.compressed_chunk_stats _timescaledb_internal.hypertable_chunk_local_size - _timescaledb_catalog.chunk_copy_activity - 
_timescaledb_catalog.chunk_copy_activity_id_seq + _timescaledb_catalog.chunk_copy_operation + _timescaledb_catalog.chunk_copy_operation_id_seq _timescaledb_catalog.compression_algorithm _timescaledb_internal.bgw_policy_chunk_stats _timescaledb_internal.bgw_job_stat diff --git a/tsl/src/chunk_api.c b/tsl/src/chunk_api.c index fe0976428..c9372498d 100644 --- a/tsl/src/chunk_api.c +++ b/tsl/src/chunk_api.c @@ -1709,7 +1709,7 @@ chunk_api_call_create_empty_chunk_table(const Hypertable *ht, const Chunk *chunk ts_dist_cmd_params_invoke_on_data_nodes(create_cmd, stmt_params_create_from_values(params, 4), list_make1((void *) node_name), - false)); + true)); } void diff --git a/tsl/src/chunk_copy.c b/tsl/src/chunk_copy.c index 9e30cc8fe..7592d1359 100644 --- a/tsl/src/chunk_copy.c +++ b/tsl/src/chunk_copy.c @@ -41,6 +41,7 @@ #include "chunk_api.h" #include "chunk_copy.h" #include "data_node.h" +#include "debug_point.h" #include "remote/dist_commands.h" #include "dist_util.h" @@ -65,14 +66,14 @@ struct ChunkCopyStage { const char *name; chunk_copy_stage_func function; - /* todo: abort function */ + chunk_copy_stage_func function_cleanup; }; /* To track a chunk move or copy activity */ struct ChunkCopy { /* catalog data */ - FormData_chunk_copy_activity fd; + FormData_chunk_copy_operation fd; /* current stage being executed */ const ChunkCopyStage *stage; /* chunk to copy */ @@ -85,38 +86,37 @@ struct ChunkCopy }; static HeapTuple -chunk_copy_activity_make_tuple(const FormData_chunk_copy_activity *fd, TupleDesc desc) +chunk_copy_operation_make_tuple(const FormData_chunk_copy_operation *fd, TupleDesc desc) { - Datum values[Natts_chunk_copy_activity]; - bool nulls[Natts_chunk_copy_activity] = { false }; + Datum values[Natts_chunk_copy_operation]; + bool nulls[Natts_chunk_copy_operation] = { false }; memset(values, 0, sizeof(values)); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_id)] = Int32GetDatum(fd->id); - 
values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_operation_id)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_operation_id)] = NameGetDatum(&fd->operation_id); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_backend_pid)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_backend_pid)] = Int32GetDatum(fd->backend_pid); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_completed_stage)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] = NameGetDatum(&fd->completed_stage); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_time_start)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_time_start)] = TimestampTzGetDatum(fd->time_start); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_chunk_id)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_chunk_id)] = Int32GetDatum(fd->chunk_id); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_source_node_name)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_source_node_name)] = NameGetDatum(&fd->source_node_name); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_dest_node_name)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_dest_node_name)] = NameGetDatum(&fd->dest_node_name); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_delete_on_src_node)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_delete_on_src_node)] = BoolGetDatum(fd->delete_on_src_node); return heap_form_tuple(desc, values, nulls); } static void -chunk_copy_activity_insert_rel(Relation rel, const FormData_chunk_copy_activity *fd) +chunk_copy_operation_insert_rel(Relation rel, const FormData_chunk_copy_operation *fd) { CatalogSecurityContext sec_ctx; HeapTuple new_tuple; - new_tuple = chunk_copy_activity_make_tuple(fd, RelationGetDescr(rel)); + new_tuple = chunk_copy_operation_make_tuple(fd, RelationGetDescr(rel)); 
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx); ts_catalog_insert(rel, new_tuple); @@ -125,24 +125,24 @@ chunk_copy_activity_insert_rel(Relation rel, const FormData_chunk_copy_activity } static void -chunk_copy_activity_insert(const FormData_chunk_copy_activity *fd) +chunk_copy_operation_insert(const FormData_chunk_copy_operation *fd) { Catalog *catalog; Relation rel; catalog = ts_catalog_get(); - rel = table_open(catalog_get_table_id(catalog, CHUNK_COPY_ACTIVITY), RowExclusiveLock); + rel = table_open(catalog_get_table_id(catalog, CHUNK_COPY_OPERATION), RowExclusiveLock); - chunk_copy_activity_insert_rel(rel, fd); + chunk_copy_operation_insert_rel(rel, fd); table_close(rel, RowExclusiveLock); } static ScanTupleResult -chunk_copy_activity_tuple_update(TupleInfo *ti, void *data) +chunk_copy_operation_tuple_update(TupleInfo *ti, void *data) { ChunkCopy *cc = data; - Datum values[Natts_chunk_copy_activity]; - bool nulls[Natts_chunk_copy_activity]; + Datum values[Natts_chunk_copy_operation]; + bool nulls[Natts_chunk_copy_operation]; CatalogSecurityContext sec_ctx; bool should_free; HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); @@ -152,7 +152,7 @@ chunk_copy_activity_tuple_update(TupleInfo *ti, void *data) /* We only update the "completed_stage" field */ Assert(NULL != cc->stage); - values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_completed_stage)] = + values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] = DirectFunctionCall1(namein, CStringGetDatum((cc->stage->name))); new_tuple = heap_form_tuple(ts_scanner_get_tupledesc(ti), values, nulls); @@ -168,14 +168,14 @@ chunk_copy_activity_tuple_update(TupleInfo *ti, void *data) } static int -chunk_copy_activity_scan_update_by_id(int32 id, tuple_found_func tuple_found, void *data, - LOCKMODE lockmode) +chunk_copy_operation_scan_update_by_id(const char *operation_id, tuple_found_func tuple_found, + void *data, LOCKMODE lockmode) { Catalog 
*catalog = ts_catalog_get(); ScanKeyData scankey[1]; ScannerCtx scanctx = { - .table = catalog_get_table_id(catalog, CHUNK_COPY_ACTIVITY), - .index = catalog_get_index(catalog, CHUNK_COPY_ACTIVITY, CHUNK_COPY_ACTIVITY_PKEY_IDX), + .table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION), + .index = catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX), .nkeys = 1, .limit = 1, .scankey = scankey, @@ -186,16 +186,16 @@ chunk_copy_activity_scan_update_by_id(int32 id, tuple_found_func tuple_found, vo }; ScanKeyInit(&scankey[0], - Anum_chunk_copy_activity_pkey_idx_id, + Anum_chunk_copy_operation_idx_operation_id, BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(id)); + F_NAMEEQ, + DirectFunctionCall1(namein, CStringGetDatum(operation_id))); return ts_scanner_scan(&scanctx); } static void -chunk_copy_activity_update(ChunkCopy *cc) +chunk_copy_operation_update(ChunkCopy *cc) { NameData application_name; @@ -207,14 +207,14 @@ chunk_copy_activity_update(ChunkCopy *cc) pgstat_report_appname(application_name.data); - chunk_copy_activity_scan_update_by_id(cc->fd.id, - chunk_copy_activity_tuple_update, - cc, - RowExclusiveLock); + chunk_copy_operation_scan_update_by_id(NameStr(cc->fd.operation_id), + chunk_copy_operation_tuple_update, + cc, + RowExclusiveLock); } static ScanTupleResult -chunk_copy_activity_tuple_delete(TupleInfo *ti, void *data) +chunk_copy_operation_tuple_delete(TupleInfo *ti, void *data) { CatalogSecurityContext sec_ctx; @@ -226,34 +226,34 @@ chunk_copy_activity_tuple_delete(TupleInfo *ti, void *data) } static int -chunk_copy_activity_delete_by_id(int32 id) +chunk_copy_operation_delete_by_id(const char *operation_id) { Catalog *catalog = ts_catalog_get(); ScanKeyData scankey[1]; ScannerCtx scanctx = { - .table = catalog_get_table_id(catalog, CHUNK_COPY_ACTIVITY), - .index = catalog_get_index(catalog, CHUNK_COPY_ACTIVITY, CHUNK_COPY_ACTIVITY_PKEY_IDX), + .table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION), + .index = 
catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX), .nkeys = 1, .limit = 1, .scankey = scankey, .data = NULL, - .tuple_found = chunk_copy_activity_tuple_delete, + .tuple_found = chunk_copy_operation_tuple_delete, .lockmode = RowExclusiveLock, .scandirection = ForwardScanDirection, }; ScanKeyInit(&scankey[0], - Anum_chunk_copy_activity_pkey_idx_id, + Anum_chunk_copy_operation_idx_operation_id, BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(id)); + F_NAMEEQ, + DirectFunctionCall1(namein, CStringGetDatum(operation_id))); return ts_scanner_scan(&scanctx); } static void -chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char *dst_node, - bool delete_on_src_node) +chunk_copy_setup(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char *dst_node, + bool delete_on_src_node) { Hypertable *ht; Cache *hcache; @@ -334,18 +334,10 @@ chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char dst_node))); /* - * Populate the FormData_chunk_copy_activity structure for use by various stages + * Populate the FormData_chunk_copy_operation structure for use by various stages * - * Get the operation id for this chunk move/copy activity. The naming - * convention is "ts_copy_seq-id_chunk-id and it can - * get truncated due to NAMEDATALEN restrictions + * The operation_id will be populated in the chunk_copy_stage_init function. 
*/ - cc->fd.id = ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_COPY_ACTIVITY); - snprintf(cc->fd.operation_id.data, - sizeof(cc->fd.operation_id.data), - "ts_copy_%d_%d", - cc->fd.id, - cc->chunk->fd.id); cc->fd.backend_pid = MyProcPid; namestrcpy(&cc->fd.completed_stage, CCS_INIT); cc->fd.time_start = GetCurrentTimestamp(); @@ -354,9 +346,6 @@ chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char namestrcpy(&cc->fd.dest_node_name, dst_node); cc->fd.delete_on_src_node = delete_on_src_node; - /* Persist the entry in the catalog */ - chunk_copy_activity_insert(&cc->fd); - ts_cache_release(hcache); MemoryContextSwitchTo(old); @@ -366,16 +355,40 @@ chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char } static void -chunk_copy_cleanup(ChunkCopy *cc) +chunk_copy_finish(ChunkCopy *cc) { /* Done using this long lived memory context */ MemoryContextDelete(cc->mcxt); /* Start a transaction for the final outer transaction */ StartTransactionCommand(); +} - /* All steps complete, delete this ccd entry from the catalog now */ - chunk_copy_activity_delete_by_id(cc->fd.id); +static void +chunk_copy_stage_init(ChunkCopy *cc) +{ + int32 id; + + /* + * Get the operation id for this chunk move/copy activity. The naming + * convention is "ts_copy_seq-id_chunk-id". 
+ */ + id = ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_COPY_OPERATION); + snprintf(cc->fd.operation_id.data, + sizeof(cc->fd.operation_id.data), + "ts_copy_%d_%d", + id, + cc->chunk->fd.id); + + /* Persist the Formdata entry in the catalog */ + chunk_copy_operation_insert(&cc->fd); +} + +static void +chunk_copy_stage_init_cleanup(ChunkCopy *cc) +{ + /* Failure in initial stages, delete this entry from the catalog */ + chunk_copy_operation_delete_by_id(NameStr(cc->fd.operation_id)); } static void @@ -394,6 +407,18 @@ chunk_copy_stage_create_empty_chunk(ChunkCopy *cc) ts_cache_release(hcache); } +static void +chunk_copy_stage_create_empty_chunk_cleanup(ChunkCopy *cc) +{ + /* + * Drop the chunk table on the dst_node. We use the API instead of just + * "DROP TABLE" because some metadata cleanup might also be needed + */ + chunk_api_call_chunk_drop_replica(cc->chunk, + NameStr(cc->fd.dest_node_name), + cc->dst_server->serverid); +} + static void chunk_copy_stage_create_publication(ChunkCopy *cc) { @@ -405,7 +430,7 @@ chunk_copy_stage_create_publication(ChunkCopy *cc) quote_qualified_identifier(NameStr(cc->chunk->fd.schema_name), NameStr(cc->chunk->fd.table_name))); - /* Create the publication in autocommit mode */ + /* Create the publication */ ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); } @@ -424,6 +449,71 @@ chunk_copy_stage_create_replication_slot(ChunkCopy *cc) ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); } +static void +chunk_copy_stage_create_replication_slot_cleanup(ChunkCopy *cc) +{ + char *cmd; + DistCmdResult *dist_res; + PGresult *res; + + /* Check if the slot exists on the source data node */ + cmd = psprintf("SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = '%s'", + NameStr(cc->fd.operation_id)); + dist_res = + ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); + res = 
ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + + /* Drop replication slot on the source data node only if it exists */ + if (PQntuples(res) != 0) + { + cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id)); + ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); + } + + ts_dist_cmd_close_response(dist_res); +} + +static void +chunk_copy_stage_create_publication_cleanup(ChunkCopy *cc) +{ + char *cmd; + DistCmdResult *dist_res; + PGresult *res; + + /* + * Check if the replication slot exists and clean it up if so. This might + * happen if there's a failure in the create_replication_slot stage but + * PG might end up creating the slot even though we issued a ROLLBACK + */ + chunk_copy_stage_create_replication_slot_cleanup(cc); + + /* Check if the publication exists on the source data node */ + cmd = psprintf("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = '%s'", + NameStr(cc->fd.operation_id)); + dist_res = + ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); + res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + + /* Drop publication on the source node only if it exists */ + if (PQntuples(res) != 0) + { + cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id)); + + /* Drop the publication */ + ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); + } + + ts_dist_cmd_close_response(dist_res); +} + static void chunk_copy_stage_create_subscription(ChunkCopy *cc) { @@ -441,6 +531,43 @@ chunk_copy_stage_create_subscription(ChunkCopy *cc) 
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true); } +static void +chunk_copy_stage_create_subscription_cleanup(ChunkCopy *cc) +{ + char *cmd; + DistCmdResult *dist_res; + PGresult *res; + + /* Check if the subscription exists on the destination data node */ + cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'", + NameStr(cc->fd.operation_id)); + dist_res = + ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true); + res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + + /* Cleanup only if the subscription exists */ + if (PQntuples(res) != 0) + { + List *nodes = list_make1(NameStr(cc->fd.dest_node_name)); + + /* Disassociate the subscription from the replication slot first */ + cmd = + psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id)); + ts_dist_cmd_run_on_data_nodes(cmd, nodes, true); + + /* Drop the subscription now */ + pfree(cmd); + cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id)); + ts_dist_cmd_run_on_data_nodes(cmd, nodes, true); + } + + ts_dist_cmd_close_response(dist_res); +} + static void chunk_copy_stage_sync_start(ChunkCopy *cc) { @@ -451,6 +578,35 @@ chunk_copy_stage_sync_start(ChunkCopy *cc) ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true); } +static void +chunk_copy_stage_sync_start_cleanup(ChunkCopy *cc) +{ + char *cmd; + DistCmdResult *dist_res; + PGresult *res; + + /* Check if the subscription exists on the destination data node */ + cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'", + NameStr(cc->fd.operation_id)); + dist_res = + ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true); + res = 
ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res)))); + + /* Alter subscription only if it exists */ + if (PQntuples(res) != 0) + { + /* Stop data transfer on the destination node */ + cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id)); + ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true); + } + + ts_dist_cmd_close_response(dist_res); +} + static void chunk_copy_stage_sync(ChunkCopy *cc) { @@ -499,7 +655,10 @@ chunk_copy_stage_drop_subscription(ChunkCopy *cc) static void chunk_copy_stage_drop_publication(ChunkCopy *cc) { - const char *cmd; + char *cmd; + + cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id)); + ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id)); ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true); @@ -553,31 +712,63 @@ chunk_copy_stage_delete_chunk(ChunkCopy *cc) } static const ChunkCopyStage chunk_copy_stages[] = { + /* Initial Marker */ + { CCS_INIT, chunk_copy_stage_init, chunk_copy_stage_init_cleanup }, - /* Create empty chunk table on the dst node */ - { CCS_CREATE_EMPTY_CHUNK, chunk_copy_stage_create_empty_chunk }, + /* + * Create empty chunk table on the dst node. + * The corresponding cleanup function should just delete this empty chunk. 
+ */ + { CCS_CREATE_EMPTY_CHUNK, + chunk_copy_stage_create_empty_chunk, + chunk_copy_stage_create_empty_chunk_cleanup }, - /* Setup logical replication between nodes */ - { CCS_CREATE_PUBLICATION, chunk_copy_stage_create_publication }, - { CCS_CREATE_REPLICATION_SLOT, chunk_copy_stage_create_replication_slot }, - { CCS_CREATE_SUBSCRIPTION, chunk_copy_stage_create_subscription }, + /* + * Setup logical replication between nodes. + * The corresponding cleanup functions should drop the subscription and + * remove the replication slot followed by dropping of the publication on + * the source data node. + */ + { CCS_CREATE_PUBLICATION, + chunk_copy_stage_create_publication, + chunk_copy_stage_create_publication_cleanup }, + { CCS_CREATE_REPLICATION_SLOT, + chunk_copy_stage_create_replication_slot, + chunk_copy_stage_create_replication_slot_cleanup }, + { CCS_CREATE_SUBSCRIPTION, + chunk_copy_stage_create_subscription, + chunk_copy_stage_create_subscription_cleanup }, - /* Begin data transfer and wait for completion */ - { CCS_SYNC_START, chunk_copy_stage_sync_start }, - { CCS_SYNC, chunk_copy_stage_sync }, + /* + * Begin data transfer and wait for completion. + * The corresponding cleanup function should just disable the subscription so + * that earlier steps above can drop the subscription/publication cleanly. + */ + { CCS_SYNC_START, chunk_copy_stage_sync_start, chunk_copy_stage_sync_start_cleanup }, + { CCS_SYNC, chunk_copy_stage_sync, NULL }, - /* Cleanup */ - { CCS_DROP_PUBLICATION, chunk_copy_stage_drop_publication }, - { CCS_DROP_SUBSCRIPTION, chunk_copy_stage_drop_subscription }, + /* + * Cleanup. Nothing else required via the cleanup functions. + */ + { CCS_DROP_SUBSCRIPTION, chunk_copy_stage_drop_subscription, NULL }, + { CCS_DROP_PUBLICATION, chunk_copy_stage_drop_publication, NULL }, - /* Attach chunk to the hypertable on the dst_node */ - { CCS_ATTACH_CHUNK, chunk_copy_stage_attach_chunk }, + /* + * Attach chunk to the hypertable on the dst_node. 
+ * The operation has succeeded from the destination data node perspective. + * No cleanup required here. + */ + { CCS_ATTACH_CHUNK, chunk_copy_stage_attach_chunk, NULL }, - /* Maybe delete chunk from the src_node (move operation) */ - { CCS_DELETE_CHUNK, chunk_copy_stage_delete_chunk }, + /* + * Maybe delete chunk from the src_node (move operation). + * Again, everything ok, so no cleanup required, we probably shouldn't be + * seeing this entry in the catalog table because the operation has succeeded. + */ + { CCS_DELETE_CHUNK, chunk_copy_stage_delete_chunk, NULL }, - /* Done */ - { NULL, NULL } + /* Done Marker */ + { NULL, NULL, NULL } }; static void @@ -598,7 +789,9 @@ chunk_copy_execute(ChunkCopy *cc) cc->stage->function(cc); /* Mark current stage as completed and update the catalog */ - chunk_copy_activity_update(cc); + chunk_copy_operation_update(cc); + + DEBUG_ERROR_INJECTION(stage->name); CommitTransactionCommand(); } @@ -610,8 +803,8 @@ chunk_copy(Oid chunk_relid, const char *src_node, const char *dst_node, bool del ChunkCopy cc; const MemoryContext oldcontext = CurrentMemoryContext; - /* Populate copy structure and insert initial catalog entry */ - chunk_copy_init(&cc, chunk_relid, src_node, dst_node, delete_on_src_node); + /* Populate copy structure */ + chunk_copy_setup(&cc, chunk_relid, src_node, dst_node, delete_on_src_node); /* Execute chunk copy in separate stages */ PG_TRY(); @@ -624,13 +817,185 @@ chunk_copy(Oid chunk_relid, const char *src_node, const char *dst_node, bool del ErrorData *edata; MemoryContextSwitchTo(oldcontext); edata = CopyErrorData(); - edata->hint = - psprintf("chunk copy operation id: %d (%s).", cc.fd.id, NameStr(cc.fd.operation_id)); + edata->detail = psprintf("Chunk copy operation id: %s.", NameStr(cc.fd.operation_id)); FlushErrorState(); ReThrowError(edata); } PG_END_TRY(); - /* Cleanup and delete the catalog entry */ - chunk_copy_cleanup(&cc); + /* Finish up and delete the catalog entry */ + chunk_copy_finish(&cc); +} + 
+static ScanTupleResult +chunk_copy_operation_tuple_found(TupleInfo *ti, void *const data) +{ + ChunkCopy **cc = data; + + *cc = STRUCT_FROM_SLOT(ti->slot, ti->mctx, ChunkCopy, FormData_chunk_copy_operation); + return SCAN_CONTINUE; +} + +static ChunkCopy * +chunk_copy_operation_get(const char *operation_id) +{ + ScanKeyData scankeys[1]; + ChunkCopy *cc = NULL; + int indexid; + MemoryContext old, mcxt; + + /* Objects need to be in long lived context */ + mcxt = + AllocSetContextCreate(PortalContext, "chunk copy cleanup activity", ALLOCSET_DEFAULT_SIZES); + old = MemoryContextSwitchTo(mcxt); + + if (operation_id != NULL) + { + ScanKeyInit(&scankeys[0], + Anum_chunk_copy_operation_idx_operation_id, + BTEqualStrategyNumber, + F_NAMEEQ, + DirectFunctionCall1(namein, CStringGetDatum(operation_id))); + indexid = CHUNK_COPY_OPERATION_PKEY_IDX; + } + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid chunk copy operation identifier"))); + + ts_catalog_scan_one(CHUNK_COPY_OPERATION, + indexid, + scankeys, + 1, + chunk_copy_operation_tuple_found, + AccessShareLock, + CHUNK_COPY_OPERATION_TABLE_NAME, + &cc); + + /* + * If a valid entry is returned then fill up the rest of the fields in the + * ChunkCopy structure + */ + if (cc) + { + cc->mcxt = mcxt; + cc->chunk = ts_chunk_get_by_id(cc->fd.chunk_id, true); + cc->stage = NULL; + + /* No other sanity checks need to be performed since they were done earlier */ + + /* Setup the src_node */ + cc->src_server = + data_node_get_foreign_server(NameStr(cc->fd.source_node_name), ACL_USAGE, true, false); + Assert(NULL != cc->src_server); + + /* Setup the dst_node */ + cc->dst_server = + data_node_get_foreign_server(NameStr(cc->fd.dest_node_name), ACL_USAGE, true, false); + Assert(NULL != cc->dst_server); + } + + MemoryContextSwitchTo(old); + + if (cc == NULL) + /* No entry found, long lived context not required */ + MemoryContextDelete(mcxt); + + return cc; +} + +static void 
+chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx) +{ + bool first = true; + + /* Cleanup each copy stage in a separate transaction */ + do + { + StartTransactionCommand(); + + cc->stage = &chunk_copy_stages[stage_idx]; + if (cc->stage->function_cleanup) + cc->stage->function_cleanup(cc); + + /* Mark stage as cleaned up and update the catalog */ + if (!first && stage_idx != 0) + chunk_copy_operation_update(cc); + else + first = false; + + CommitTransactionCommand(); + } while (--stage_idx >= 0); +} + +void +chunk_copy_cleanup(const char *operation_id) +{ + ChunkCopy *cc; + const MemoryContext oldcontext = CurrentMemoryContext; + const ChunkCopyStage *stage; + bool found = false; + int stage_idx; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to cleanup a chunk copy operation")))); + + if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function must be run on the access node only"))); + + cc = chunk_copy_operation_get(operation_id); + + if (cc == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid chunk copy operation identifier. Entry not found"))); + + /* Identify the last completed stage for this activity. */ + stage_idx = 0; + for (stage = &chunk_copy_stages[stage_idx]; stage->name != NULL; + stage = &chunk_copy_stages[++stage_idx]) + { + if (namestrcmp(&cc->fd.completed_stage, stage->name) == 0) + { + found = true; + break; + } + } + + /* should always find an entry, add ereport to quell compiler warning */ + Assert(found == true); + if (!found) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("stage '%s' not found for copy chunk cleanup", + NameStr(cc->fd.completed_stage)))); + + /* Commit to get out of starting transaction */ + PopActiveSnapshot(); + CommitTransactionCommand(); + + /* Run the corresponding cleanup steps to roll back the activity. 
*/ + PG_TRY(); + { + chunk_copy_cleanup_internal(cc, stage_idx); + } + PG_CATCH(); + { + /* Include chunk copy id to the error message */ + ErrorData *edata; + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + edata->detail = psprintf("While cleaning up chunk copy operation id: %s.", + NameStr(cc->fd.operation_id)); + FlushErrorState(); + ReThrowError(edata); + } + PG_END_TRY(); + + /* Finish up and delete the catalog entry */ + chunk_copy_finish(cc); } diff --git a/tsl/src/chunk_copy.h b/tsl/src/chunk_copy.h index f4b12b996..2ff3acd75 100644 --- a/tsl/src/chunk_copy.h +++ b/tsl/src/chunk_copy.h @@ -8,5 +8,6 @@ extern void chunk_copy(Oid chunk_relid, const char *src_node, const char *dst_node, bool delete_on_src_node); +extern void chunk_copy_cleanup(const char *operation_id); #endif /* TIMESCALEDB_TSL_CHUNK_COPY_H */ diff --git a/tsl/src/init.c b/tsl/src/init.c index 42fa1c616..841002c42 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -121,6 +121,7 @@ CrossModuleFunctions tsl_cm_functions = { .move_chunk = tsl_move_chunk, .move_chunk_proc = tsl_move_chunk_proc, .copy_chunk_proc = tsl_copy_chunk_proc, + .copy_chunk_cleanup_proc = tsl_copy_chunk_cleanup_proc, .partialize_agg = tsl_partialize_agg, .finalize_agg_sfunc = tsl_finalize_agg_sfunc, .finalize_agg_ffunc = tsl_finalize_agg_ffunc, diff --git a/tsl/src/reorder.c b/tsl/src/reorder.c index 520d1bc37..bb8c3f2f2 100644 --- a/tsl/src/reorder.c +++ b/tsl/src/reorder.c @@ -202,7 +202,7 @@ tsl_move_chunk(PG_FUNCTION_ARGS) * We use a procedure because multiple steps need to be performed via multiple * transactions across the access node and the two datanodes that are involved. 
* The progress of the various stages/steps are tracked in the - * CHUNK_COPY_ACTIVITY catalog table + * CHUNK_COPY_OPERATION catalog table */ static void tsl_copy_or_move_chunk_proc(FunctionCallInfo fcinfo, bool delete_on_src_node) @@ -244,6 +244,27 @@ tsl_copy_chunk_proc(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +Datum +tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS) +{ + const char *operation_id = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0)); + + TS_PREVENT_FUNC_IF_READ_ONLY(); + + PreventInTransactionBlock(true, get_func_name(FC_FN_OID(fcinfo))); + + /* valid input has to be provided */ + if (operation_id == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid chunk copy operation id"))); + + /* perform the cleanup/repair depending on the stage */ + chunk_copy_cleanup(operation_id); + + PG_RETURN_VOID(); +} + void reorder_chunk(Oid chunk_id, Oid index_id, bool verbose, Oid wait_id, Oid destination_tablespace, Oid index_tablespace) diff --git a/tsl/src/reorder.h b/tsl/src/reorder.h index 8bcb7b372..09c4a3bf3 100644 --- a/tsl/src/reorder.h +++ b/tsl/src/reorder.h @@ -13,6 +13,7 @@ extern Datum tsl_reorder_chunk(PG_FUNCTION_ARGS); extern Datum tsl_move_chunk(PG_FUNCTION_ARGS); extern Datum tsl_move_chunk_proc(PG_FUNCTION_ARGS); extern Datum tsl_copy_chunk_proc(PG_FUNCTION_ARGS); +extern Datum tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS); extern void reorder_chunk(Oid chunk_id, Oid index_id, bool verbose, Oid wait_id, Oid destination_tablespace, Oid index_tablespace); diff --git a/tsl/test/t/002_chunk_copy_move.pl b/tsl/test/t/002_chunk_copy_move.pl index b0237a6e4..3a85edd1b 100644 --- a/tsl/test/t/002_chunk_copy_move.pl +++ b/tsl/test/t/002_chunk_copy_move.pl @@ -8,12 +8,12 @@ use warnings; use AccessNode; use DataNode; use TestLib; -use Test::More tests => 21; +use Test::More tests => 272; #Initialize all the multi-node instances my $an = AccessNode->create('an'); my $dn1 = DataNode->create('dn1', allows_streaming => 
'logical'); -my $dn2 = DataNode->create('dn2'); +my $dn2 = DataNode->create('dn2', allows_streaming => 'logical'); $an->add_data_node($dn1); $an->add_data_node($dn2); @@ -28,33 +28,59 @@ $an->safe_psql( ]); #Check that chunks are shown appropriately on all nodes of the multi-node setup -my $query = q[SELECT * from show_chunks('test');]; +my $query = q[SELECT * from show_chunks('test');]; +my $operation_id = "ts_copy_1_1"; -#Query Access node -$an->psql_is( - 'postgres', $query, q[_timescaledb_internal._dist_hyper_1_1_chunk -_timescaledb_internal._dist_hyper_1_2_chunk -_timescaledb_internal._dist_hyper_1_3_chunk -_timescaledb_internal._dist_hyper_1_4_chunk], 'AN shows correct set of chunks' +#Check chunk states before the move +check_pre_move_chunk_states(); + +#Setup the error injection function on the AN +my $extversion = $an->safe_psql('postgres', + "SELECT extversion from pg_catalog.pg_extension WHERE extname = 'timescaledb'" ); - -#Query datanode1 -$dn1->psql_is( +$an->safe_psql( 'postgres', - $query, - "_timescaledb_internal._dist_hyper_1_1_chunk\n_timescaledb_internal._dist_hyper_1_3_chunk\n_timescaledb_internal._dist_hyper_1_4_chunk", - 'DN1 shows correct set of chunks'); + qq[ + CREATE OR REPLACE FUNCTION error_injection_on(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT + AS 'timescaledb-$extversion', 'ts_debug_point_enable'; + ]); -#Check contents on the chunk on DN1 -$dn1->psql_is('postgres', - "SELECT sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk", - qq[406], "DN1 has correct contents in the chunk"); +#Induce errors in various stages in the chunk move activity and ensure that the +#cleanup function restores things to the previous sane state -#Query datanode2 -$dn2->psql_is( - 'postgres', $query, - "_timescaledb_internal._dist_hyper_1_2_chunk", - 'DN2 shows correct set of chunks'); +my @stages = + qw(init create_empty_chunk create_publication create_replication_slot create_subscription sync_start sync drop_publication drop_subscription 
attach_chunk delete_chunk);
+
+my ($stdout, $stderr, $ret);
+my $curr_index = 1;
+my $arrSize = @stages;
+
+while ($curr_index < $arrSize)
+{
+	#Enable the error at each stage
+	#Call the move_chunk procedure which should error out now
+	($ret, $stdout, $stderr) = $an->psql('postgres',
+		"SELECT error_injection_on('$stages[$curr_index]'); CALL timescaledb_experimental.move_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'dn1', destination_node => 'dn2');"
+	);
+	is($ret, 3,
+		"move_chunk fails as expected in stage '$stages[$curr_index]'");
+	like(
+		$stderr,
+		qr/ERROR:  error injected at debug point '$stages[$curr_index]'/,
+		'failure in expected stage');
+
+	#The earlier debug error point gets released automatically since it's a session lock
+	#Call the cleanup procedure to make things right
+	$operation_id = "ts_copy_" . $curr_index . "_1";
+	$an->safe_psql('postgres',
+		"CALL timescaledb_experimental.cleanup_copy_chunk_operation(operation_id=>'$operation_id');"
+	);
+
+	#Check chunk state is as before the move
+	check_pre_move_chunk_states();
+
+	$curr_index++;
+}
 
 #Move chunk _timescaledb_internal._dist_hyper_1_1_chunk to DN2 from AN
 $an->safe_psql('postgres',
@@ -82,7 +108,96 @@ $dn2->psql_is(
 	"_timescaledb_internal._dist_hyper_1_2_chunk\n_timescaledb_internal._dist_hyper_1_1_chunk",
 	'DN2 shows correct set of chunks');
 
+#Copy chunk _timescaledb_internal._dist_hyper_1_1_chunk to DN1 from DN2
+$an->safe_psql('postgres',
+	"CALL timescaledb_experimental.copy_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'dn2', destination_node => 'dn1')"
+);
+
+#Query datanode1 after the above copy
+$dn1->psql_is(
+	'postgres',
+	$query,
+	"_timescaledb_internal._dist_hyper_1_3_chunk\n_timescaledb_internal._dist_hyper_1_4_chunk\n_timescaledb_internal._dist_hyper_1_1_chunk",
+	'DN1 shows correct set of chunks after the copy');
+
+#Check contents on the chunk on DN1, after the copy
+$dn1->psql_is(
+	'postgres',
+	"SELECT 
sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk", + qq[406], + "DN1 has correct contents after the copy in the chunk"); + +#Check contents on the chunk on DN2, after the copy +$dn2->psql_is( + 'postgres', + "SELECT sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk", + qq[406], + "DN2 has correct contents after the copy in the chunk"); + +#Query datanode2 +$dn2->psql_is( + 'postgres', + $query, + "_timescaledb_internal._dist_hyper_1_2_chunk\n_timescaledb_internal._dist_hyper_1_1_chunk", + 'DN2 shows correct set of chunks after the copy'); done_testing(); +#Check the following +#1) chunk is still on "dn1", +#2) there's no entry on "dn2", +#3) there are no left over replication slots and publications on "dn1", +#4) there is no subscription on "dn2" +sub check_pre_move_chunk_states +{ + #Query Access node + $an->psql_is( + 'postgres', $query, q[_timescaledb_internal._dist_hyper_1_1_chunk +_timescaledb_internal._dist_hyper_1_2_chunk +_timescaledb_internal._dist_hyper_1_3_chunk +_timescaledb_internal._dist_hyper_1_4_chunk], 'AN shows correct set of chunks' + ); + + #Query datanode1 + $dn1->psql_is( + 'postgres', + $query, + "_timescaledb_internal._dist_hyper_1_1_chunk\n_timescaledb_internal._dist_hyper_1_3_chunk\n_timescaledb_internal._dist_hyper_1_4_chunk", + 'DN1 shows correct set of chunks'); + + #Check contents on the chunk on DN1 + $dn1->psql_is( + 'postgres', + "SELECT sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk", + qq[406], + "DN1 has correct contents in the chunk"); + + #Query datanode2 + $dn2->psql_is( + 'postgres', $query, + "_timescaledb_internal._dist_hyper_1_2_chunk", + 'DN2 shows correct set of chunks'); + + #Check that there is no replication slot on datanode1 + $dn1->psql_is( + 'postgres', + "SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = '$operation_id'", + "", + 'DN1 doesn\'t have left over replication slots'); + + #Check that there is no publication on datanode1 + $dn1->psql_is( + 'postgres', + 
"SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = '$operation_id'", + "", + 'DN1 doesn\'t have left over publication'); + + #Check that there is no subscription on datanode2 + $dn2->psql_is( + 'postgres', + "SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '$operation_id'", + "", + 'DN2 doesn\'t have left over subscription'); +} + 1; diff --git a/tsl/test/t/CMakeLists.txt b/tsl/test/t/CMakeLists.txt index 672da54e6..d763a2bb9 100644 --- a/tsl/test/t/CMakeLists.txt +++ b/tsl/test/t/CMakeLists.txt @@ -1,4 +1,9 @@ -set(PROVE_TEST_FILES 001_simple_multinode.pl 002_chunk_copy_move.pl) +set(PROVE_TEST_FILES 001_simple_multinode.pl) +set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl) + +if(CMAKE_BUILD_TYPE MATCHES Debug) + list(APPEND PROVE_TEST_FILES ${PROVE_DEBUG_TEST_FILES}) +endif(CMAKE_BUILD_TYPE MATCHES Debug) foreach(P_FILE ${PROVE_TEST_FILES}) configure_file(${P_FILE} ${CMAKE_CURRENT_BINARY_DIR}/${P_FILE} COPYONLY)