Implement cleanup for chunk copy/move
A chunk copy/move operation is carried out in stages and can fail in any of them. We track the last completed stage in the "chunk_copy_operation" catalog table. In case of failure, a "chunk_copy_cleanup" function can be invoked to bring the chunk back to its original state on the source data node; all transient objects such as the replication slot, publication, subscription, empty chunk, and metadata updates are cleaned up.

Includes test case changes for failures induced at each stage.

To avoid confusion between chunk copy activity and chunk copy operation, this patch also consistently uses "operation" everywhere instead of "activity".
parent f071f89ade
commit 2ffa1bf436
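For reference, a minimal usage sketch of the new flow, assembled from the procedures and the TAP test in this commit (the chunk name, node names, and operation id are the ones used in the included test and are illustrative only):

    -- Start a chunk move; it runs through the staged operation
    CALL timescaledb_experimental.move_chunk(
        chunk => '_timescaledb_internal._dist_hyper_1_1_chunk',
        source_node => 'dn1',
        destination_node => 'dn2');

    -- If a stage fails, the error detail carries the operation id
    -- (e.g. 'ts_copy_1_1'). Pass it to the cleanup procedure to bring
    -- the chunk back to its original state on the source data node:
    CALL timescaledb_experimental.cleanup_copy_chunk_operation(
        operation_id => 'ts_copy_1_1');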
@@ -37,3 +37,12 @@ CREATE OR REPLACE PROCEDURE timescaledb_experimental.copy_chunk(
     source_node NAME = NULL,
     destination_node NAME = NULL)
 AS '@MODULE_PATHNAME@', 'ts_copy_chunk_proc' LANGUAGE C;
+
+-- A copy_chunk or move_chunk procedure call involves multiple nodes and,
+-- depending on the data size, can take a long time. Failures are possible
+-- while this long-running operation is ongoing. We need to be able to recover
+-- and clean up such failed chunk copy/move operations, and that is done via
+-- this procedure
+CREATE OR REPLACE PROCEDURE timescaledb_experimental.cleanup_copy_chunk_operation(
+    operation_id NAME)
+AS '@MODULE_PATHNAME@', 'ts_copy_chunk_cleanup_proc' LANGUAGE C;
@@ -385,11 +385,10 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.remote_txn', ''
 -- carry over chunk copy/move operations from earlier (if it makes sense at all)
 --
 
-CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq MINVALUE 1;
+CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1;
 
-CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
-  id integer PRIMARY KEY DEFAULT nextval('_timescaledb_catalog.chunk_copy_activity_id_seq'),
-  operation_id name NOT NULL UNIQUE, -- the publisher/subscriber identifier used
+CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation (
+  operation_id name PRIMARY KEY, -- the publisher/subscriber identifier used
   backend_pid integer NOT NULL, -- the pid of the backend running this activity
   completed_stage name NOT NULL, -- the completed stage/step
   time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity
@@ -399,8 +398,6 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
   delete_on_source_node bool NOT NULL -- is a move or copy activity
 );
 
-ALTER SEQUENCE _timescaledb_catalog.chunk_copy_activity_id_seq OWNED BY _timescaledb_catalog.chunk_copy_activity.id;
-
 -- Set table permissions
 -- We need to grant SELECT to PUBLIC for all tables even those not
 -- marked as being dumped because pg_dump will try to access all
@@ -5,18 +5,10 @@ DROP FUNCTION IF EXISTS _timescaledb_internal.allow_new_chunks;
 DROP FUNCTION IF EXISTS _timescaledb_internal.refresh_continuous_aggregate;
 DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk;
 
--- Use the experimental schema for ths new procedure
-CREATE OR REPLACE PROCEDURE timescaledb_experimental.move_chunk(
-    chunk REGCLASS,
-    source_node NAME = NULL,
-    destination_node NAME = NULL)
-AS '@MODULE_PATHNAME@', 'ts_move_chunk_proc' LANGUAGE C;
-
-CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq MINVALUE 1;
+CREATE SEQUENCE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq MINVALUE 1;
 
-CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
-  id integer PRIMARY KEY DEFAULT nextval('_timescaledb_catalog.chunk_copy_activity_id_seq'),
-  operation_id name NOT NULL UNIQUE, -- the publisher/subscriber identifier used
+CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_operation (
+  operation_id name PRIMARY KEY, -- the publisher/subscriber identifier used
   backend_pid integer NOT NULL, -- the pid of the backend running this activity
   completed_stage name NOT NULL, -- the completed stage/step
   time_start timestamptz NOT NULL DEFAULT NOW(), -- start time of the activity
@@ -26,7 +18,5 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.chunk_copy_activity (
   delete_on_source_node bool NOT NULL -- is a move or copy activity
 );
 
-ALTER SEQUENCE _timescaledb_catalog.chunk_copy_activity_id_seq OWNED BY _timescaledb_catalog.chunk_copy_activity.id;
-
-GRANT SELECT ON _timescaledb_catalog.chunk_copy_activity_id_seq TO PUBLIC;
-GRANT SELECT ON _timescaledb_catalog.chunk_copy_activity TO PUBLIC;
+GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation_id_seq TO PUBLIC;
+GRANT SELECT ON _timescaledb_catalog.chunk_copy_operation TO PUBLIC;
@@ -8,8 +8,9 @@ DROP FUNCTION IF EXISTS _timescaledb_internal.create_chunk;
 DROP PROCEDURE IF EXISTS _timescaledb_internal.wait_subscription_sync;
 DROP PROCEDURE IF EXISTS timescaledb_experimental.move_chunk;
 DROP PROCEDURE IF EXISTS timescaledb_experimental.copy_chunk;
-DROP TABLE IF EXISTS _timescaledb_catalog.chunk_copy_activity;
-DROP SEQUENCE IF EXISTS _timescaledb_catalog.chunk_copy_activity_id_seq;
+DROP PROCEDURE IF EXISTS timescaledb_experimental.cleanup_copy_chunk_operation;
+DROP TABLE IF EXISTS _timescaledb_catalog.chunk_copy_operation;
+DROP SEQUENCE IF EXISTS _timescaledb_catalog.chunk_copy_operation_id_seq;
 DROP VIEW IF EXISTS timescaledb_experimental.chunk_replication_status;
 DROP SCHEMA IF EXISTS timescaledb_experimental CASCADE;
 
@@ -103,9 +103,9 @@ static const TableInfoDef catalog_table_names[_MAX_CATALOG_TABLES + 1] = {
 		.schema_name = CATALOG_SCHEMA_NAME,
 		.table_name = REMOTE_TXN_TABLE_NAME,
 	},
-	[CHUNK_COPY_ACTIVITY] = {
+	[CHUNK_COPY_OPERATION] = {
 		.schema_name = CATALOG_SCHEMA_NAME,
-		.table_name = CHUNK_COPY_ACTIVITY_TABLE_NAME,
+		.table_name = CHUNK_COPY_OPERATION_TABLE_NAME,
 	},
 	[_MAX_CATALOG_TABLES] = {
 		.schema_name = "invalid schema",
@@ -250,10 +250,10 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
 			[REMOTE_TXN_DATA_NODE_NAME_IDX] = "remote_txn_data_node_name_idx"
 		}
 	},
-	[CHUNK_COPY_ACTIVITY] = {
-		.length = _MAX_CHUNK_COPY_ACTIVITY_INDEX,
+	[CHUNK_COPY_OPERATION] = {
+		.length = _MAX_CHUNK_COPY_OPERATION_INDEX,
 		.names = (char *[]) {
-			[CHUNK_COPY_ACTIVITY_PKEY_IDX] = "chunk_copy_activity_pkey",
+			[CHUNK_COPY_OPERATION_PKEY_IDX] = "chunk_copy_operation_pkey",
 		},
 	}
 };
@@ -276,7 +276,7 @@ static const char *catalog_table_serial_id_names[_MAX_CATALOG_TABLES] = {
 	[HYPERTABLE_COMPRESSION] = NULL,
 	[COMPRESSION_CHUNK_SIZE] = NULL,
 	[REMOTE_TXN] = NULL,
-	[CHUNK_COPY_ACTIVITY] = CATALOG_SCHEMA_NAME ".chunk_copy_activity_id_seq",
+	[CHUNK_COPY_OPERATION] = CATALOG_SCHEMA_NAME ".chunk_copy_operation_id_seq",
 };
 
 typedef struct InternalFunctionDef
@@ -53,7 +53,7 @@ typedef enum CatalogTable
 	HYPERTABLE_COMPRESSION,
 	COMPRESSION_CHUNK_SIZE,
 	REMOTE_TXN,
-	CHUNK_COPY_ACTIVITY,
+	CHUNK_COPY_OPERATION,
 	_MAX_CATALOG_TABLES,
 } CatalogTable;
 
@@ -1189,27 +1189,25 @@ enum Anum_remote_data_node_name_idx
 *
 ********************************************/
 
-#define CHUNK_COPY_ACTIVITY_TABLE_NAME "chunk_copy_activity"
+#define CHUNK_COPY_OPERATION_TABLE_NAME "chunk_copy_operation"
 
-enum Anum_chunk_copy_activity
+enum Anum_chunk_copy_operation
 {
-	Anum_chunk_copy_activity_id = 1,
-	Anum_chunk_copy_activity_operation_id,
-	Anum_chunk_copy_activity_backend_pid,
-	Anum_chunk_copy_activity_completed_stage,
-	Anum_chunk_copy_activity_time_start,
-	Anum_chunk_copy_activity_chunk_id,
-	Anum_chunk_copy_activity_source_node_name,
-	Anum_chunk_copy_activity_dest_node_name,
-	Anum_chunk_copy_activity_delete_on_src_node,
-	_Anum_chunk_copy_activity_max,
+	Anum_chunk_copy_operation_operation_id = 1,
+	Anum_chunk_copy_operation_backend_pid,
+	Anum_chunk_copy_operation_completed_stage,
+	Anum_chunk_copy_operation_time_start,
+	Anum_chunk_copy_operation_chunk_id,
+	Anum_chunk_copy_operation_source_node_name,
+	Anum_chunk_copy_operation_dest_node_name,
+	Anum_chunk_copy_operation_delete_on_src_node,
+	_Anum_chunk_copy_operation_max,
 };
 
-#define Natts_chunk_copy_activity (_Anum_chunk_copy_activity_max - 1)
+#define Natts_chunk_copy_operation (_Anum_chunk_copy_operation_max - 1)
 
-typedef struct FormData_chunk_copy_activity
+typedef struct FormData_chunk_copy_operation
 {
-	int32 id;
 	NameData operation_id;
 	int32 backend_pid;
 	NameData completed_stage;
@@ -1218,20 +1216,19 @@ typedef struct FormData_chunk_copy_activity
 	NameData source_node_name;
 	NameData dest_node_name;
 	bool delete_on_src_node;
-} FormData_chunk_copy_activity;
+} FormData_chunk_copy_operation;
 
 enum
 {
-	CHUNK_COPY_ACTIVITY_PKEY_IDX = 0,
-	_MAX_CHUNK_COPY_ACTIVITY_INDEX,
+	CHUNK_COPY_OPERATION_PKEY_IDX = 0,
+	_MAX_CHUNK_COPY_OPERATION_INDEX,
 };
 
-enum Anum_chunk_copy_activity_pkey_idx
+enum Anum_chunk_copy_operation_pkey_idx
 {
-	Anum_chunk_copy_activity_pkey_idx_id = 1,
-	_Anum_chunk_copy_activity_pkey_idx_max,
+	Anum_chunk_copy_operation_idx_operation_id = 1,
+	_Anum_chunk_copy_operation_pkey_idx_max,
 };
-#define Natts_chunk_copy_activity_pkey_idx (_Anum_chunk_copy_activity_pkey_idx_max - 1)
 
 typedef enum CacheType
 {
@@ -44,6 +44,7 @@ CROSSMODULE_WRAPPER(reorder_chunk);
 CROSSMODULE_WRAPPER(move_chunk);
 CROSSMODULE_WRAPPER(move_chunk_proc);
 CROSSMODULE_WRAPPER(copy_chunk_proc);
+CROSSMODULE_WRAPPER(copy_chunk_cleanup_proc);
 
 /* partialize/finalize aggregate */
 CROSSMODULE_WRAPPER(partialize_agg);
@@ -336,6 +337,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
 	.move_chunk = error_no_default_fn_pg_community,
 	.move_chunk_proc = error_no_default_fn_pg_community,
 	.copy_chunk_proc = error_no_default_fn_pg_community,
+	.copy_chunk_cleanup_proc = error_no_default_fn_pg_community,
 	.reorder_chunk = error_no_default_fn_pg_community,
 
 	.partialize_agg = error_no_default_fn_pg_community,
@@ -81,6 +81,7 @@ typedef struct CrossModuleFunctions
 	PGFunction move_chunk;
 	PGFunction move_chunk_proc;
 	PGFunction copy_chunk_proc;
+	PGFunction copy_chunk_cleanup_proc;
 	void (*ddl_command_start)(ProcessUtilityArgs *args);
 	void (*ddl_command_end)(EventTriggerData *command);
 	void (*sql_drop)(List *dropped_objects);
@@ -525,8 +525,11 @@ ts_create_struct_from_tuple(HeapTuple tuple, MemoryContext mctx, size_t alloc_si
 {
 	void *struct_ptr = MemoryContextAllocZero(mctx, alloc_size);
 
-	/* Make sure the function is not used when the tuple contains NULLs */
-	Assert(copy_size == tuple->t_len - tuple->t_data->t_hoff);
+	/*
+	 * Make sure the function is not used when the tuple contains NULLs.
+	 * Also compare the aligned sizes in the assert.
+	 */
+	Assert(copy_size == MAXALIGN(tuple->t_len - tuple->t_data->t_hoff));
 	memcpy(struct_ptr, GETSTRUCT(tuple), copy_size);
 
 	return struct_ptr;
@@ -82,8 +82,8 @@ typedef struct Dimension Dimension;
 
 extern TSDLLEXPORT Oid ts_get_integer_now_func(const Dimension *open_dim);
 
-extern void *ts_create_struct_from_slot(TupleTableSlot *slot, MemoryContext mctx, size_t alloc_size,
-										size_t copy_size);
+extern TSDLLEXPORT void *ts_create_struct_from_slot(TupleTableSlot *slot, MemoryContext mctx,
+													size_t alloc_size, size_t copy_size);
 
 extern TSDLLEXPORT AppendRelInfo *ts_get_appendrelinfo(PlannerInfo *root, Index rti,
 													   bool missing_ok);
@@ -196,7 +196,7 @@ SELECT * FROM _timescaledb_catalog.hypertable;
 ----------------------+--------------------------------------------------+-------+------------
  _timescaledb_catalog | chunk                 | table | super_user
  _timescaledb_catalog | chunk_constraint      | table | super_user
- _timescaledb_catalog | chunk_copy_activity   | table | super_user
+ _timescaledb_catalog | chunk_copy_operation  | table | super_user
  _timescaledb_catalog | chunk_data_node       | table | super_user
  _timescaledb_catalog | chunk_index           | table | super_user
  _timescaledb_catalog | compression_algorithm | table | super_user
@@ -556,8 +556,8 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND
  timescaledb_information.hypertables
  _timescaledb_internal.compressed_chunk_stats
  _timescaledb_internal.hypertable_chunk_local_size
- _timescaledb_catalog.chunk_copy_activity
- _timescaledb_catalog.chunk_copy_activity_id_seq
+ _timescaledb_catalog.chunk_copy_operation
+ _timescaledb_catalog.chunk_copy_operation_id_seq
  _timescaledb_catalog.compression_algorithm
  _timescaledb_internal.bgw_policy_chunk_stats
  _timescaledb_internal.bgw_job_stat
@@ -1709,7 +1709,7 @@ chunk_api_call_create_empty_chunk_table(const Hypertable *ht, const Chunk *chunk
 	ts_dist_cmd_params_invoke_on_data_nodes(create_cmd,
 											stmt_params_create_from_values(params, 4),
 											list_make1((void *) node_name),
-											false));
+											true));
 }
 
 void
@@ -41,6 +41,7 @@
 #include "chunk_api.h"
 #include "chunk_copy.h"
 #include "data_node.h"
+#include "debug_point.h"
 #include "remote/dist_commands.h"
 #include "dist_util.h"
 
@@ -65,14 +66,14 @@ struct ChunkCopyStage
 {
 	const char *name;
 	chunk_copy_stage_func function;
-	/* todo: abort function */
+	chunk_copy_stage_func function_cleanup;
 };
 
 /* To track a chunk move or copy activity */
 struct ChunkCopy
 {
 	/* catalog data */
-	FormData_chunk_copy_activity fd;
+	FormData_chunk_copy_operation fd;
 	/* current stage being executed */
 	const ChunkCopyStage *stage;
 	/* chunk to copy */
@@ -85,38 +86,37 @@
 };
 
 static HeapTuple
-chunk_copy_activity_make_tuple(const FormData_chunk_copy_activity *fd, TupleDesc desc)
+chunk_copy_operation_make_tuple(const FormData_chunk_copy_operation *fd, TupleDesc desc)
 {
-	Datum values[Natts_chunk_copy_activity];
-	bool nulls[Natts_chunk_copy_activity] = { false };
+	Datum values[Natts_chunk_copy_operation];
+	bool nulls[Natts_chunk_copy_operation] = { false };
 	memset(values, 0, sizeof(values));
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_id)] = Int32GetDatum(fd->id);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_operation_id)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_operation_id)] =
 		NameGetDatum(&fd->operation_id);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_backend_pid)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_backend_pid)] =
 		Int32GetDatum(fd->backend_pid);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_completed_stage)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] =
 		NameGetDatum(&fd->completed_stage);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_time_start)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_time_start)] =
 		TimestampTzGetDatum(fd->time_start);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_chunk_id)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_chunk_id)] =
 		Int32GetDatum(fd->chunk_id);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_source_node_name)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_source_node_name)] =
 		NameGetDatum(&fd->source_node_name);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_dest_node_name)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_dest_node_name)] =
 		NameGetDatum(&fd->dest_node_name);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_delete_on_src_node)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_delete_on_src_node)] =
 		BoolGetDatum(fd->delete_on_src_node);
 	return heap_form_tuple(desc, values, nulls);
 }
 
 static void
-chunk_copy_activity_insert_rel(Relation rel, const FormData_chunk_copy_activity *fd)
+chunk_copy_operation_insert_rel(Relation rel, const FormData_chunk_copy_operation *fd)
 {
 	CatalogSecurityContext sec_ctx;
 	HeapTuple new_tuple;
 
-	new_tuple = chunk_copy_activity_make_tuple(fd, RelationGetDescr(rel));
+	new_tuple = chunk_copy_operation_make_tuple(fd, RelationGetDescr(rel));
 
 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
 	ts_catalog_insert(rel, new_tuple);
@@ -125,24 +125,24 @@ chunk_copy_activity_insert_rel(Relation rel, const FormData_chunk_copy_activity
 }
 
 static void
-chunk_copy_activity_insert(const FormData_chunk_copy_activity *fd)
+chunk_copy_operation_insert(const FormData_chunk_copy_operation *fd)
 {
 	Catalog *catalog;
 	Relation rel;
 
 	catalog = ts_catalog_get();
-	rel = table_open(catalog_get_table_id(catalog, CHUNK_COPY_ACTIVITY), RowExclusiveLock);
+	rel = table_open(catalog_get_table_id(catalog, CHUNK_COPY_OPERATION), RowExclusiveLock);
 
-	chunk_copy_activity_insert_rel(rel, fd);
+	chunk_copy_operation_insert_rel(rel, fd);
 	table_close(rel, RowExclusiveLock);
 }
 
 static ScanTupleResult
-chunk_copy_activity_tuple_update(TupleInfo *ti, void *data)
+chunk_copy_operation_tuple_update(TupleInfo *ti, void *data)
 {
 	ChunkCopy *cc = data;
-	Datum values[Natts_chunk_copy_activity];
-	bool nulls[Natts_chunk_copy_activity];
+	Datum values[Natts_chunk_copy_operation];
+	bool nulls[Natts_chunk_copy_operation];
 	CatalogSecurityContext sec_ctx;
 	bool should_free;
 	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
@@ -152,7 +152,7 @@ chunk_copy_activity_tuple_update(TupleInfo *ti, void *data)
 
 	/* We only update the "completed_stage" field */
 	Assert(NULL != cc->stage);
-	values[AttrNumberGetAttrOffset(Anum_chunk_copy_activity_completed_stage)] =
+	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] =
 		DirectFunctionCall1(namein, CStringGetDatum((cc->stage->name)));
 
 	new_tuple = heap_form_tuple(ts_scanner_get_tupledesc(ti), values, nulls);
@@ -168,14 +168,14 @@
 }
 
 static int
-chunk_copy_activity_scan_update_by_id(int32 id, tuple_found_func tuple_found, void *data,
-									  LOCKMODE lockmode)
+chunk_copy_operation_scan_update_by_id(const char *operation_id, tuple_found_func tuple_found,
+									   void *data, LOCKMODE lockmode)
 {
 	Catalog *catalog = ts_catalog_get();
 	ScanKeyData scankey[1];
 	ScannerCtx scanctx = {
-		.table = catalog_get_table_id(catalog, CHUNK_COPY_ACTIVITY),
-		.index = catalog_get_index(catalog, CHUNK_COPY_ACTIVITY, CHUNK_COPY_ACTIVITY_PKEY_IDX),
+		.table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION),
+		.index = catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX),
 		.nkeys = 1,
 		.limit = 1,
 		.scankey = scankey,
@@ -186,16 +186,16 @@ chunk_copy_activity_scan_update_by_id(int32 id, tuple_found_func tuple_found, vo
 	};
 
 	ScanKeyInit(&scankey[0],
-				Anum_chunk_copy_activity_pkey_idx_id,
+				Anum_chunk_copy_operation_idx_operation_id,
 				BTEqualStrategyNumber,
-				F_INT4EQ,
-				Int32GetDatum(id));
+				F_NAMEEQ,
+				DirectFunctionCall1(namein, CStringGetDatum(operation_id)));
 
 	return ts_scanner_scan(&scanctx);
 }
 
 static void
-chunk_copy_activity_update(ChunkCopy *cc)
+chunk_copy_operation_update(ChunkCopy *cc)
 {
 	NameData application_name;
 
@@ -207,14 +207,14 @@ chunk_copy_activity_update(ChunkCopy *cc)
 
 	pgstat_report_appname(application_name.data);
 
-	chunk_copy_activity_scan_update_by_id(cc->fd.id,
-										  chunk_copy_activity_tuple_update,
-										  cc,
-										  RowExclusiveLock);
+	chunk_copy_operation_scan_update_by_id(NameStr(cc->fd.operation_id),
+										   chunk_copy_operation_tuple_update,
+										   cc,
+										   RowExclusiveLock);
 }
 
 static ScanTupleResult
-chunk_copy_activity_tuple_delete(TupleInfo *ti, void *data)
+chunk_copy_operation_tuple_delete(TupleInfo *ti, void *data)
 {
 	CatalogSecurityContext sec_ctx;
 
@@ -226,34 +226,34 @@ chunk_copy_activity_tuple_delete(TupleInfo *ti, void *data)
 }
 
 static int
-chunk_copy_activity_delete_by_id(int32 id)
+chunk_copy_operation_delete_by_id(const char *operation_id)
 {
 	Catalog *catalog = ts_catalog_get();
 	ScanKeyData scankey[1];
 	ScannerCtx scanctx = {
-		.table = catalog_get_table_id(catalog, CHUNK_COPY_ACTIVITY),
-		.index = catalog_get_index(catalog, CHUNK_COPY_ACTIVITY, CHUNK_COPY_ACTIVITY_PKEY_IDX),
+		.table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION),
+		.index = catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX),
 		.nkeys = 1,
 		.limit = 1,
 		.scankey = scankey,
 		.data = NULL,
-		.tuple_found = chunk_copy_activity_tuple_delete,
+		.tuple_found = chunk_copy_operation_tuple_delete,
 		.lockmode = RowExclusiveLock,
 		.scandirection = ForwardScanDirection,
 	};
 
 	ScanKeyInit(&scankey[0],
-				Anum_chunk_copy_activity_pkey_idx_id,
+				Anum_chunk_copy_operation_idx_operation_id,
 				BTEqualStrategyNumber,
-				F_INT4EQ,
-				Int32GetDatum(id));
+				F_NAMEEQ,
+				DirectFunctionCall1(namein, CStringGetDatum(operation_id)));
 
 	return ts_scanner_scan(&scanctx);
 }
 
 static void
-chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char *dst_node,
-				bool delete_on_src_node)
+chunk_copy_setup(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char *dst_node,
+				 bool delete_on_src_node)
 {
 	Hypertable *ht;
 	Cache *hcache;
@@ -334,18 +334,10 @@ chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char
 								dst_node)));
 
 	/*
-	 * Populate the FormData_chunk_copy_activity structure for use by various stages
+	 * Populate the FormData_chunk_copy_operation structure for use by various stages
 	 *
-	 * Get the operation id for this chunk move/copy activity. The naming
-	 * convention is "ts_copy_seq-id_chunk-id and it can
-	 * get truncated due to NAMEDATALEN restrictions
+	 * The operation_id will be populated in the chunk_copy_stage_init function.
 	 */
-	cc->fd.id = ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_COPY_ACTIVITY);
-	snprintf(cc->fd.operation_id.data,
-			 sizeof(cc->fd.operation_id.data),
-			 "ts_copy_%d_%d",
-			 cc->fd.id,
-			 cc->chunk->fd.id);
 	cc->fd.backend_pid = MyProcPid;
 	namestrcpy(&cc->fd.completed_stage, CCS_INIT);
 	cc->fd.time_start = GetCurrentTimestamp();
@@ -354,9 +346,6 @@ chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char
 	namestrcpy(&cc->fd.dest_node_name, dst_node);
 	cc->fd.delete_on_src_node = delete_on_src_node;
 
-	/* Persist the entry in the catalog */
-	chunk_copy_activity_insert(&cc->fd);
-
 	ts_cache_release(hcache);
 	MemoryContextSwitchTo(old);
 
@@ -366,16 +355,40 @@ chunk_copy_init(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char
 }
 
 static void
-chunk_copy_cleanup(ChunkCopy *cc)
+chunk_copy_finish(ChunkCopy *cc)
 {
 	/* Done using this long lived memory context */
 	MemoryContextDelete(cc->mcxt);
 
 	/* Start a transaction for the final outer transaction */
 	StartTransactionCommand();
+}
 
-	/* All steps complete, delete this ccd entry from the catalog now */
-	chunk_copy_activity_delete_by_id(cc->fd.id);
+static void
+chunk_copy_stage_init(ChunkCopy *cc)
+{
+	int32 id;
+
+	/*
+	 * Get the operation id for this chunk move/copy activity. The naming
+	 * convention is "ts_copy_seq-id_chunk-id".
+	 */
+	id = ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_COPY_OPERATION);
+	snprintf(cc->fd.operation_id.data,
+			 sizeof(cc->fd.operation_id.data),
+			 "ts_copy_%d_%d",
+			 id,
+			 cc->chunk->fd.id);
+
+	/* Persist the Formdata entry in the catalog */
+	chunk_copy_operation_insert(&cc->fd);
+}
+
+static void
+chunk_copy_stage_init_cleanup(ChunkCopy *cc)
+{
+	/* Failure in initial stages, delete this entry from the catalog */
+	chunk_copy_operation_delete_by_id(NameStr(cc->fd.operation_id));
 }
 
 static void
@@ -394,6 +407,18 @@ chunk_copy_stage_create_empty_chunk(ChunkCopy *cc)
 	ts_cache_release(hcache);
 }
 
+static void
+chunk_copy_stage_create_empty_chunk_cleanup(ChunkCopy *cc)
+{
+	/*
+	 * Drop the chunk table on the dst_node. We use the API instead of just
+	 * "DROP TABLE" because some metadata cleanup might also be needed
+	 */
+	chunk_api_call_chunk_drop_replica(cc->chunk,
+									  NameStr(cc->fd.dest_node_name),
+									  cc->dst_server->serverid);
+}
+
 static void
 chunk_copy_stage_create_publication(ChunkCopy *cc)
 {
@@ -405,7 +430,7 @@
 				   quote_qualified_identifier(NameStr(cc->chunk->fd.schema_name),
 											  NameStr(cc->chunk->fd.table_name)));
 
-	/* Create the publication in autocommit mode */
+	/* Create the publication */
 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
 }
 
@@ -424,6 +449,71 @@ chunk_copy_stage_create_replication_slot(ChunkCopy *cc)
 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
 }
 
+static void
+chunk_copy_stage_create_replication_slot_cleanup(ChunkCopy *cc)
+{
+	char *cmd;
+	DistCmdResult *dist_res;
+	PGresult *res;
+
+	/* Check if the slot exists on the source data node */
+	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = '%s'",
+				   NameStr(cc->fd.operation_id));
+	dist_res =
+		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
+	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
+
+	/* Drop replication slot on the source data node only if it exists */
+	if (PQntuples(res) != 0)
+	{
+		cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
+		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
+	}
+
+	ts_dist_cmd_close_response(dist_res);
+}
+
+static void
+chunk_copy_stage_create_publication_cleanup(ChunkCopy *cc)
+{
+	char *cmd;
+	DistCmdResult *dist_res;
+	PGresult *res;
+
+	/*
+	 * Check if the replication slot exists and clean it up if so. This might
+	 * happen if there's a failure in the create_replication_slot stage but
+	 * PG might end up creating the slot even though we issued a ROLLBACK
+	 */
+	chunk_copy_stage_create_replication_slot_cleanup(cc);
+
+	/* Check if the publication exists on the source data node */
+	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = '%s'",
+				   NameStr(cc->fd.operation_id));
+	dist_res =
+		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
+	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
+
+	/* Drop publication on the source node only if it exists */
+	if (PQntuples(res) != 0)
+	{
+		cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
+
+		/* Drop the publication */
+		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
+	}
+
+	ts_dist_cmd_close_response(dist_res);
+}
+
 static void
 chunk_copy_stage_create_subscription(ChunkCopy *cc)
 {
@@ -441,6 +531,43 @@ chunk_copy_stage_create_subscription(ChunkCopy *cc)
 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
 }
 
+static void
+chunk_copy_stage_create_subscription_cleanup(ChunkCopy *cc)
+{
+	char *cmd;
+	DistCmdResult *dist_res;
+	PGresult *res;
+
+	/* Check if the subscription exists on the destination data node */
+	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
+				   NameStr(cc->fd.operation_id));
+	dist_res =
+		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
+	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
+
+	/* Cleanup only if the subscription exists */
+	if (PQntuples(res) != 0)
+	{
+		List *nodes = list_make1(NameStr(cc->fd.dest_node_name));
+
+		/* Disassociate the subscription from the replication slot first */
+		cmd =
+			psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
+		ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);
+
+		/* Drop the subscription now */
+		pfree(cmd);
+		cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
+		ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);
+	}
+
+	ts_dist_cmd_close_response(dist_res);
+}
+
 static void
 chunk_copy_stage_sync_start(ChunkCopy *cc)
 {
@@ -451,6 +578,35 @@ chunk_copy_stage_sync_start(ChunkCopy *cc)
 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
 }
 
+static void
+chunk_copy_stage_sync_start_cleanup(ChunkCopy *cc)
+{
+	char *cmd;
+	DistCmdResult *dist_res;
+	PGresult *res;
+
+	/* Check if the subscription exists on the destination data node */
+	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
+				   NameStr(cc->fd.operation_id));
+	dist_res =
+		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
+	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
+
+	/* Alter subscription only if it exists */
+	if (PQntuples(res) != 0)
+	{
+		/* Stop data transfer on the destination node */
+		cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
+		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
+	}
+
+	ts_dist_cmd_close_response(dist_res);
+}
+
 static void
 chunk_copy_stage_sync(ChunkCopy *cc)
 {
@@ -499,7 +655,10 @@ chunk_copy_stage_drop_subscription(ChunkCopy *cc)
 static void
 chunk_copy_stage_drop_publication(ChunkCopy *cc)
 {
-	const char *cmd;
+	char *cmd;
+
+	cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
+	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
 
 	cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
@@ -553,31 +712,63 @@ chunk_copy_stage_delete_chunk(ChunkCopy *cc)
 }
 
 static const ChunkCopyStage chunk_copy_stages[] = {
-	/* Create empty chunk table on the dst node */
-	{ CCS_CREATE_EMPTY_CHUNK, chunk_copy_stage_create_empty_chunk },
+	/* Initial Marker */
+	{ CCS_INIT, chunk_copy_stage_init, chunk_copy_stage_init_cleanup },
+
+	/*
+	 * Create empty chunk table on the dst node.
+	 * The corresponding cleanup function should just delete this empty chunk.
+	 */
+	{ CCS_CREATE_EMPTY_CHUNK,
+	  chunk_copy_stage_create_empty_chunk,
+	  chunk_copy_stage_create_empty_chunk_cleanup },
 
-	/* Setup logical replication between nodes */
-	{ CCS_CREATE_PUBLICATION, chunk_copy_stage_create_publication },
-	{ CCS_CREATE_REPLICATION_SLOT, chunk_copy_stage_create_replication_slot },
-	{ CCS_CREATE_SUBSCRIPTION, chunk_copy_stage_create_subscription },
+	/*
+	 * Setup logical replication between nodes.
+	 * The corresponding cleanup functions should drop the subscription and
+	 * remove the replication slot followed by dropping of the publication on
+	 * the source data node.
+	 */
+	{ CCS_CREATE_PUBLICATION,
+	  chunk_copy_stage_create_publication,
+	  chunk_copy_stage_create_publication_cleanup },
+	{ CCS_CREATE_REPLICATION_SLOT,
+	  chunk_copy_stage_create_replication_slot,
+	  chunk_copy_stage_create_replication_slot_cleanup },
+	{ CCS_CREATE_SUBSCRIPTION,
+	  chunk_copy_stage_create_subscription,
+	  chunk_copy_stage_create_subscription_cleanup },
 
-	/* Begin data transfer and wait for completion */
-	{ CCS_SYNC_START, chunk_copy_stage_sync_start },
-	{ CCS_SYNC, chunk_copy_stage_sync },
+	/*
+	 * Begin data transfer and wait for completion.
+	 * The corresponding cleanup function should just disable the subscription so
+	 * that earlier steps above can drop the subscription/publication cleanly.
+	 */
+	{ CCS_SYNC_START, chunk_copy_stage_sync_start, chunk_copy_stage_sync_start_cleanup },
+	{ CCS_SYNC, chunk_copy_stage_sync, NULL },
 
-	/* Cleanup */
-	{ CCS_DROP_PUBLICATION, chunk_copy_stage_drop_publication },
-	{ CCS_DROP_SUBSCRIPTION, chunk_copy_stage_drop_subscription },
+	/*
+	 * Cleanup. Nothing else required via the cleanup functions.
+	 */
+	{ CCS_DROP_SUBSCRIPTION, chunk_copy_stage_drop_subscription, NULL },
+	{ CCS_DROP_PUBLICATION, chunk_copy_stage_drop_publication, NULL },
 
-	/* Attach chunk to the hypertable on the dst_node */
-	{ CCS_ATTACH_CHUNK, chunk_copy_stage_attach_chunk },
+	/*
+	 * Attach chunk to the hypertable on the dst_node.
+	 * The operation has succeeded from the destination data node perspective.
+	 * No cleanup required here.
+	 */
+	{ CCS_ATTACH_CHUNK, chunk_copy_stage_attach_chunk, NULL },
 
-	/* Maybe delete chunk from the src_node (move operation) */
-	{ CCS_DELETE_CHUNK, chunk_copy_stage_delete_chunk },
+	/*
+	 * Maybe delete chunk from the src_node (move operation).
+	 * Again, everything is ok, so no cleanup required; we probably shouldn't be
+	 * seeing this entry in the catalog table because the operation has succeeded.
+	 */
+	{ CCS_DELETE_CHUNK, chunk_copy_stage_delete_chunk, NULL },
 
-	/* Done */
-	{ NULL, NULL }
+	/* Done Marker */
+	{ NULL, NULL, NULL }
 };
 
 static void
@@ -598,7 +789,9 @@ chunk_copy_execute(ChunkCopy *cc)
 		cc->stage->function(cc);
 
 		/* Mark current stage as completed and update the catalog */
-		chunk_copy_activity_update(cc);
+		chunk_copy_operation_update(cc);
+
+		DEBUG_ERROR_INJECTION(stage->name);
 
 		CommitTransactionCommand();
 	}
@@ -610,8 +803,8 @@ chunk_copy(Oid chunk_relid, const char *src_node, const char *dst_node, bool del
 	ChunkCopy cc;
 	const MemoryContext oldcontext = CurrentMemoryContext;
 
-	/* Populate copy structure and insert initial catalog entry */
-	chunk_copy_init(&cc, chunk_relid, src_node, dst_node, delete_on_src_node);
+	/* Populate copy structure */
+	chunk_copy_setup(&cc, chunk_relid, src_node, dst_node, delete_on_src_node);
 
 	/* Execute chunk copy in separate stages */
 	PG_TRY();
@@ -624,13 +817,185 @@
 		ErrorData *edata;
 		MemoryContextSwitchTo(oldcontext);
 		edata = CopyErrorData();
-		edata->hint =
-			psprintf("chunk copy operation id: %d (%s).", cc.fd.id, NameStr(cc.fd.operation_id));
+		edata->detail = psprintf("Chunk copy operation id: %s.", NameStr(cc.fd.operation_id));
 		FlushErrorState();
 		ReThrowError(edata);
 	}
 	PG_END_TRY();
 
-	/* Cleanup and delete the catalog entry */
-	chunk_copy_cleanup(&cc);
+	/* Finish up and delete the catalog entry */
+	chunk_copy_finish(&cc);
 }
+
+static ScanTupleResult
+chunk_copy_operation_tuple_found(TupleInfo *ti, void *const data)
+{
+	ChunkCopy **cc = data;
+
+	*cc = STRUCT_FROM_SLOT(ti->slot, ti->mctx, ChunkCopy, FormData_chunk_copy_operation);
+	return SCAN_CONTINUE;
+}
+
+static ChunkCopy *
+chunk_copy_operation_get(const char *operation_id)
+{
+	ScanKeyData scankeys[1];
+	ChunkCopy *cc = NULL;
+	int indexid;
+	MemoryContext old, mcxt;
+
+	/* Objects need to be in long lived context */
+	mcxt =
+		AllocSetContextCreate(PortalContext, "chunk copy cleanup activity", ALLOCSET_DEFAULT_SIZES);
+	old = MemoryContextSwitchTo(mcxt);
+
+	if (operation_id != NULL)
+	{
+		ScanKeyInit(&scankeys[0],
+					Anum_chunk_copy_operation_idx_operation_id,
+					BTEqualStrategyNumber,
+					F_NAMEEQ,
+					DirectFunctionCall1(namein, CStringGetDatum(operation_id)));
+		indexid = CHUNK_COPY_OPERATION_PKEY_IDX;
+	}
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid chunk copy operation identifier")));
+
+	ts_catalog_scan_one(CHUNK_COPY_OPERATION,
+						indexid,
+						scankeys,
+						1,
+						chunk_copy_operation_tuple_found,
+						AccessShareLock,
+						CHUNK_COPY_OPERATION_TABLE_NAME,
+						&cc);
+
+	/*
+	 * If a valid entry is returned then fill up the rest of the fields in the
+	 * ChunkCopy structure
+	 */
+	if (cc)
+	{
+		cc->mcxt = mcxt;
+		cc->chunk = ts_chunk_get_by_id(cc->fd.chunk_id, true);
+		cc->stage = NULL;
+
+		/* No other sanity checks need to be performed since they were done earlier */
+
+		/* Setup the src_node */
+		cc->src_server =
+			data_node_get_foreign_server(NameStr(cc->fd.source_node_name), ACL_USAGE, true, false);
+		Assert(NULL != cc->src_server);
+
+		/* Setup the dst_node */
+		cc->dst_server =
+			data_node_get_foreign_server(NameStr(cc->fd.dest_node_name), ACL_USAGE, true, false);
+		Assert(NULL != cc->dst_server);
+	}
+
+	MemoryContextSwitchTo(old);
+
+	if (cc == NULL)
+		/* No entry found, long lived context not required */
+		MemoryContextDelete(mcxt);
+
+	return cc;
+}
+
+static void
+chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx)
+{
+	bool first = true;
+
+	/* Cleanup each copy stage in a separate transaction */
+	do
+	{
+		StartTransactionCommand();
+
+		cc->stage = &chunk_copy_stages[stage_idx];
+		if (cc->stage->function_cleanup)
+			cc->stage->function_cleanup(cc);
+
+		/* Mark stage as cleaned up and update the catalog */
+		if (!first && stage_idx != 0)
+			chunk_copy_operation_update(cc);
+		else
+			first = false;
+
+		CommitTransactionCommand();
+	} while (--stage_idx >= 0);
+}
+
+void
+chunk_copy_cleanup(const char *operation_id)
+{
+	ChunkCopy *cc;
+	const MemoryContext oldcontext = CurrentMemoryContext;
+	const ChunkCopyStage *stage;
+	bool found = false;
+	int stage_idx;
+
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("must be superuser to cleanup a chunk copy operation"))));
+
+	if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("function must be run on the access node only")));
+
+	cc = chunk_copy_operation_get(operation_id);
+
+	if (cc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid chunk copy operation identifier. Entry not found")));
+
+	/* Identify the last completed stage for this activity. */
+	stage_idx = 0;
+	for (stage = &chunk_copy_stages[stage_idx]; stage->name != NULL;
+		 stage = &chunk_copy_stages[++stage_idx])
+	{
+		if (namestrcmp(&cc->fd.completed_stage, stage->name) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+
+	/* should always find an entry, add ereport to quell compiler warning */
+	Assert(found == true);
+	if (!found)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("stage '%s' not found for copy chunk cleanup",
+						NameStr(cc->fd.completed_stage))));
+
+	/* Commit to get out of starting transaction */
+	PopActiveSnapshot();
+	CommitTransactionCommand();
+
+	/* Run the corresponding cleanup steps to roll back the activity. */
+	PG_TRY();
+	{
+		chunk_copy_cleanup_internal(cc, stage_idx);
+	}
+	PG_CATCH();
+	{
+		/* Include chunk copy id to the error message */
+		ErrorData *edata;
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		edata->detail = psprintf("While cleaning up chunk copy operation id: %s.",
+								 NameStr(cc->fd.operation_id));
+		FlushErrorState();
+		ReThrowError(edata);
+	}
+	PG_END_TRY();
+
+	/* Finish up and delete the catalog entry */
+	chunk_copy_finish(cc);
+}
@@ -8,5 +8,6 @@
 
 extern void chunk_copy(Oid chunk_relid, const char *src_node, const char *dst_node,
 					   bool delete_on_src_node);
+extern void chunk_copy_cleanup(const char *operation_id);
 
 #endif /* TIMESCALEDB_TSL_CHUNK_COPY_H */
@@ -121,6 +121,7 @@ CrossModuleFunctions tsl_cm_functions = {
 	.move_chunk = tsl_move_chunk,
 	.move_chunk_proc = tsl_move_chunk_proc,
 	.copy_chunk_proc = tsl_copy_chunk_proc,
+	.copy_chunk_cleanup_proc = tsl_copy_chunk_cleanup_proc,
 	.partialize_agg = tsl_partialize_agg,
 	.finalize_agg_sfunc = tsl_finalize_agg_sfunc,
 	.finalize_agg_ffunc = tsl_finalize_agg_ffunc,
@@ -202,7 +202,7 @@ tsl_move_chunk(PG_FUNCTION_ARGS)
 * We use a procedure because multiple steps need to be performed via multiple
 * transactions across the access node and the two datanodes that are involved.
 * The progress of the various stages/steps are tracked in the
- * CHUNK_COPY_ACTIVITY catalog table
+ * CHUNK_COPY_OPERATION catalog table
 */
 static void
 tsl_copy_or_move_chunk_proc(FunctionCallInfo fcinfo, bool delete_on_src_node)
@@ -244,6 +244,27 @@ tsl_copy_chunk_proc(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+Datum
+tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS)
+{
+	const char *operation_id = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
+
+	TS_PREVENT_FUNC_IF_READ_ONLY();
+
+	PreventInTransactionBlock(true, get_func_name(FC_FN_OID(fcinfo)));
+
+	/* valid input has to be provided */
+	if (operation_id == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid chunk copy operation id")));
+
+	/* perform the cleanup/repair depending on the stage */
+	chunk_copy_cleanup(operation_id);
+
+	PG_RETURN_VOID();
+}
+
 void
 reorder_chunk(Oid chunk_id, Oid index_id, bool verbose, Oid wait_id, Oid destination_tablespace,
 			  Oid index_tablespace)
@@ -13,6 +13,7 @@ extern Datum tsl_reorder_chunk(PG_FUNCTION_ARGS);
 extern Datum tsl_move_chunk(PG_FUNCTION_ARGS);
 extern Datum tsl_move_chunk_proc(PG_FUNCTION_ARGS);
 extern Datum tsl_copy_chunk_proc(PG_FUNCTION_ARGS);
+extern Datum tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS);
 extern void reorder_chunk(Oid chunk_id, Oid index_id, bool verbose, Oid wait_id,
 						  Oid destination_tablespace, Oid index_tablespace);
 
@@ -8,12 +8,12 @@ use warnings;
 use AccessNode;
 use DataNode;
 use TestLib;
-use Test::More tests => 21;
+use Test::More tests => 272;
 
 #Initialize all the multi-node instances
 my $an = AccessNode->create('an');
 my $dn1 = DataNode->create('dn1', allows_streaming => 'logical');
-my $dn2 = DataNode->create('dn2');
+my $dn2 = DataNode->create('dn2', allows_streaming => 'logical');
 
 $an->add_data_node($dn1);
 $an->add_data_node($dn2);
@@ -28,33 +28,59 @@ $an->safe_psql(
 	]);
 
 #Check that chunks are shown appropriately on all nodes of the multi-node setup
-my $query = q[SELECT * from show_chunks('test');];
+my $query        = q[SELECT * from show_chunks('test');];
+my $operation_id = "ts_copy_1_1";
 
-#Query Access node
-$an->psql_is(
-	'postgres', $query, q[_timescaledb_internal._dist_hyper_1_1_chunk
-_timescaledb_internal._dist_hyper_1_2_chunk
-_timescaledb_internal._dist_hyper_1_3_chunk
-_timescaledb_internal._dist_hyper_1_4_chunk], 'AN shows correct set of chunks'
-);
+#Check chunk states before the move
+check_pre_move_chunk_states();
+
+#Setup the error injection function on the AN
+my $extversion = $an->safe_psql('postgres',
+	"SELECT extversion from pg_catalog.pg_extension WHERE extname = 'timescaledb'"
+);
 
-#Query datanode1
-$dn1->psql_is(
-	'postgres',
-	$query,
-	"_timescaledb_internal._dist_hyper_1_1_chunk\n_timescaledb_internal._dist_hyper_1_3_chunk\n_timescaledb_internal._dist_hyper_1_4_chunk",
-	'DN1 shows correct set of chunks');
+$an->safe_psql(
+	'postgres',
+	qq[
+CREATE OR REPLACE FUNCTION error_injection_on(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
+AS 'timescaledb-$extversion', 'ts_debug_point_enable';
+]);
 
-#Check contents on the chunk on DN1
-$dn1->psql_is('postgres',
-	"SELECT sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk",
-	qq[406], "DN1 has correct contents in the chunk");
+#Induce errors in various stages in the chunk move activity and ensure that the
+#cleanup function restores things to the previous sane state
 
-#Query datanode2
-$dn2->psql_is(
-	'postgres', $query,
-	"_timescaledb_internal._dist_hyper_1_2_chunk",
-	'DN2 shows correct set of chunks');
+my @stages =
+  qw(init create_empty_chunk create_publication create_replication_slot create_subscription sync_start sync drop_publication drop_subscription attach_chunk delete_chunk);
+
+my ($stdout, $stderr, $ret);
+my $curr_index = 1;
+my $arrSize = @stages;
+
+while ($curr_index < $arrSize)
+{
+	#Enable the error at each stage
+	#Call the move_chunk procedure which should error out now
+	($ret, $stdout, $stderr) = $an->psql('postgres',
+		"SELECT error_injection_on('$stages[$curr_index]'); CALL timescaledb_experimental.move_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'dn1', destination_node => 'dn2');"
+	);
+	is($ret, 3,
+		"move_chunk fails as expected in stage '$stages[$curr_index]'");
+	like(
+		$stderr,
+		qr/ERROR: error injected at debug point '$stages[$curr_index]'/,
+		'failure in expected stage');
+
+	#The earlier debug error point gets released automatically since it's a session lock
+	#Call the cleanup procedure to make things right
+	$operation_id = "ts_copy_" . $curr_index . "_1";
+	$an->safe_psql('postgres',
+		"CALL timescaledb_experimental.cleanup_copy_chunk_operation(operation_id=>'$operation_id');"
+	);
+
+	#Check chunk state is as before the move
+	check_pre_move_chunk_states();
+
+	$curr_index++;
+}
 
 #Move chunk _timescaledb_internal._dist_hyper_1_1_chunk to DN2 from AN
 $an->safe_psql('postgres',
@@ -82,7 +108,96 @@ $dn2->psql_is(
 	"_timescaledb_internal._dist_hyper_1_2_chunk\n_timescaledb_internal._dist_hyper_1_1_chunk",
 	'DN2 shows correct set of chunks');
 
+#Copy chunk _timescaledb_internal._dist_hyper_1_1_chunk to DN1 from DN2
+$an->safe_psql('postgres',
+	"CALL timescaledb_experimental.copy_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'dn2', destination_node => 'dn1')"
+);
+
+#Query datanode1 after the above copy
+$dn1->psql_is(
+	'postgres',
+	$query,
+	"_timescaledb_internal._dist_hyper_1_3_chunk\n_timescaledb_internal._dist_hyper_1_4_chunk\n_timescaledb_internal._dist_hyper_1_1_chunk",
+	'DN1 shows correct set of chunks after the copy');
+
+#Check contents on the chunk on DN1, after the copy
+$dn1->psql_is(
+	'postgres',
+	"SELECT sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk",
+	qq[406],
+	"DN1 has correct contents after the copy in the chunk");
+
+#Check contents on the chunk on DN2, after the copy
+$dn2->psql_is(
+	'postgres',
+	"SELECT sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk",
+	qq[406],
+	"DN2 has correct contents after the copy in the chunk");
+
+#Query datanode2
+$dn2->psql_is(
+	'postgres',
+	$query,
+	"_timescaledb_internal._dist_hyper_1_2_chunk\n_timescaledb_internal._dist_hyper_1_1_chunk",
+	'DN2 shows correct set of chunks after the copy');
+
 done_testing();
+
+#Check the following
+#1) chunk is still on "dn1",
+#2) there's no entry on "dn2",
+#3) there are no left over replication slots and publications on "dn1",
+#4) there is no subscription on "dn2"
+sub check_pre_move_chunk_states
+{
+	#Query Access node
+	$an->psql_is(
+		'postgres', $query, q[_timescaledb_internal._dist_hyper_1_1_chunk
+_timescaledb_internal._dist_hyper_1_2_chunk
+_timescaledb_internal._dist_hyper_1_3_chunk
+_timescaledb_internal._dist_hyper_1_4_chunk], 'AN shows correct set of chunks'
+	);
+
+	#Query datanode1
+	$dn1->psql_is(
+		'postgres',
+		$query,
+		"_timescaledb_internal._dist_hyper_1_1_chunk\n_timescaledb_internal._dist_hyper_1_3_chunk\n_timescaledb_internal._dist_hyper_1_4_chunk",
+		'DN1 shows correct set of chunks');
+
+	#Check contents on the chunk on DN1
+	$dn1->psql_is(
+		'postgres',
+		"SELECT sum(device) FROM _timescaledb_internal._dist_hyper_1_1_chunk",
+		qq[406],
+		"DN1 has correct contents in the chunk");
+
+	#Query datanode2
+	$dn2->psql_is(
+		'postgres', $query,
+		"_timescaledb_internal._dist_hyper_1_2_chunk",
+		'DN2 shows correct set of chunks');
+
+	#Check that there is no replication slot on datanode1
+	$dn1->psql_is(
+		'postgres',
+		"SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = '$operation_id'",
+		"",
+		'DN1 doesn\'t have left over replication slots');
+
+	#Check that there is no publication on datanode1
+	$dn1->psql_is(
+		'postgres',
+		"SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = '$operation_id'",
+		"",
+		'DN1 doesn\'t have left over publication');
+
+	#Check that there is no subscription on datanode2
+	$dn2->psql_is(
+		'postgres',
+		"SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '$operation_id'",
+		"",
+		'DN2 doesn\'t have left over subscription');
+}
 
 1;
@@ -1,4 +1,9 @@
-set(PROVE_TEST_FILES 001_simple_multinode.pl 002_chunk_copy_move.pl)
+set(PROVE_TEST_FILES 001_simple_multinode.pl)
+set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl)
+
+if(CMAKE_BUILD_TYPE MATCHES Debug)
+  list(APPEND PROVE_TEST_FILES ${PROVE_DEBUG_TEST_FILES})
+endif(CMAKE_BUILD_TYPE MATCHES Debug)
 
 foreach(P_FILE ${PROVE_TEST_FILES})
   configure_file(${P_FILE} ${CMAKE_CURRENT_BINARY_DIR}/${P_FILE} COPYONLY)