From 6e5d68718467866c5637e72b65f845b0335ac36e Mon Sep 17 00:00:00 2001 From: gayyappan Date: Tue, 5 Sep 2023 15:05:12 -0400 Subject: [PATCH] Add drop_chunks hook for OSM Introduce version number for OsmCallbacks struct Add a callback for cascading drop chunks to OSM and integrate with drop_chunks Add backward compatibility for OsmCallbacks --- .unreleased/PR_6067 | 1 + src/chunk.c | 45 +++++++++++-- src/hypertable.c | 7 +-- src/osm_callbacks.c | 64 ++++++++++++++++++- src/osm_callbacks.h | 20 +++++- tsl/test/expected/chunk_utils_internal.out | 73 ++++++++++++++++------ tsl/test/sql/chunk_utils_internal.sql | 35 ++++++----- tsl/test/src/test_chunk_stats.c | 30 +++++++-- 8 files changed, 224 insertions(+), 51 deletions(-) create mode 100644 .unreleased/PR_6067 diff --git a/.unreleased/PR_6067 b/.unreleased/PR_6067 new file mode 100644 index 000000000..18229a13e --- /dev/null +++ b/.unreleased/PR_6067 @@ -0,0 +1 @@ +Implements: #6067 Adds drop_chunks hook for OSM diff --git a/src/chunk.c b/src/chunk.c index c568b6789..0c4fa0646 100644 --- a/src/chunk.c +++ b/src/chunk.c @@ -1159,9 +1159,9 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube, const char *prefix) { #if PG14_GE - OsmCallbacks *callbacks = ts_get_osm_callbacks(); + chunk_insert_check_hook_type osm_chunk_insert_hook = ts_get_osm_chunk_insert_hook(); - if (callbacks) + if (osm_chunk_insert_hook) { /* OSM only uses first dimension . 
doesn't work with multinode tables yet*/ Dimension *dim = &ht->space->dimensions[0]; @@ -1171,8 +1171,7 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube, int64 range_end = ts_internal_to_time_int64(cube->slices[0]->fd.range_end, dim->fd.column_type); - int chunk_exists = - callbacks->chunk_insert_check_hook(ht->main_table_relid, range_start, range_end); + int chunk_exists = osm_chunk_insert_hook(ht->main_table_relid, range_start, range_end); if (chunk_exists) { @@ -3961,6 +3960,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3 DEBUG_WAITPOINT("drop_chunks_chunks_found"); + int32 osm_chunk_id = ts_chunk_get_osm_chunk_id(ht->fd.id); if (has_continuous_aggs) { /* Exclusively lock all chunks, and invalidate the continuous @@ -3990,6 +3990,11 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3 * therefore be able to refresh accordingly.*/ for (uint64 i = 0; i < num_chunks; i++) { + if (osm_chunk_id == chunks[i].fd.id) + { + /* we do not rebuild continuous aggs if tiered data is dropped */ + continue; + } int64 start = ts_chunk_primary_dimension_start(&chunks[i]); int64 end = ts_chunk_primary_dimension_end(&chunks[i]); @@ -4009,8 +4014,11 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3 /* frozen chunks are skipped. Not dropped. 
*/ if (!ts_chunk_validate_chunk_status_for_operation(&chunks[i], CHUNK_DROP, - false /*throw_error */)) + false /*throw_error */) || + osm_chunk_id == chunks[i].fd.id) + { continue; + } /* store chunk name for output */ schema_name = quote_identifier(chunks[i].fd.schema_name.data); @@ -4031,6 +4039,33 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3 data_nodes = list_append_unique_oid(data_nodes, cdn->foreign_server_oid); } } + // if we have tiered chunks cascade drop to tiering layer as well +#if PG14_GE + + if (osm_chunk_id != INVALID_CHUNK_ID) + { + hypertable_drop_chunks_hook_type osm_drop_chunks_hook = + ts_get_osm_hypertable_drop_chunks_hook(); + if (osm_drop_chunks_hook) + { + ListCell *lc; + Dimension *dim = &ht->space->dimensions[0]; + /* convert to PG timestamp from timescaledb internal format */ + int64 range_start = ts_internal_to_time_int64(newer_than, dim->fd.column_type); + int64 range_end = ts_internal_to_time_int64(older_than, dim->fd.column_type); + Chunk *osm_chunk = ts_chunk_get_by_id(osm_chunk_id, true); + List *osm_dropped_names = osm_drop_chunks_hook(osm_chunk->table_id, + NameStr(ht->fd.schema_name), + NameStr(ht->fd.table_name), + range_start, + range_end); + foreach (lc, osm_dropped_names) + { + dropped_chunk_names = lappend(dropped_chunk_names, lfirst(lc)); + } + } + } +#endif /* When dropping chunks for a given CAgg then force set the watermark */ if (is_materialization_hypertable) diff --git a/src/hypertable.c b/src/hypertable.c index b03894260..332b4cc79 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -685,16 +685,15 @@ hypertable_tuple_delete(TupleInfo *ti, void *data) } #if PG14_GE - OsmCallbacks *callbacks = ts_get_osm_callbacks(); - + hypertable_drop_hook_type osm_htdrop_hook = ts_get_osm_hypertable_drop_hook(); /* Invoke the OSM callback if set */ - if (callbacks) + if (osm_htdrop_hook) { Name schema_name = DatumGetName(slot_getattr(ti->slot, Anum_hypertable_schema_name, &isnull)); Name 
table_name = DatumGetName(slot_getattr(ti->slot, Anum_hypertable_table_name, &isnull)); - callbacks->hypertable_drop_hook(NameStr(*schema_name), NameStr(*table_name)); + osm_htdrop_hook(NameStr(*schema_name), NameStr(*table_name)); } #endif diff --git a/src/osm_callbacks.c b/src/osm_callbacks.c index e9af4aaf9..c072ad237 100644 --- a/src/osm_callbacks.c +++ b/src/osm_callbacks.c @@ -7,12 +7,70 @@ #include -#define OSM_CALLBACKS_VAR_NAME "osm_callbacks" +#define OSM_CALLBACKS "osm_callbacks" +#define OSM_CALLBACKS_VAR_NAME "osm_callbacks_versioned" -OsmCallbacks * +static OsmCallbacks_Versioned * ts_get_osm_callbacks(void) { - OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable(OSM_CALLBACKS_VAR_NAME); + OsmCallbacks_Versioned **ptr = + (OsmCallbacks_Versioned **) find_rendezvous_variable(OSM_CALLBACKS_VAR_NAME); return *ptr; } + +/* This interface and version of the struct will be removed once we have a new version of OSM on all + * instances + */ +static OsmCallbacks * +ts_get_osm_callbacks_old(void) +{ + OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable(OSM_CALLBACKS); + + return *ptr; +} + +chunk_insert_check_hook_type +ts_get_osm_chunk_insert_hook() +{ + OsmCallbacks_Versioned *ptr = ts_get_osm_callbacks(); + if (ptr) + { + if (ptr->version_num == 1) + return ptr->chunk_insert_check_hook; + } + else + { + OsmCallbacks *ptr_old = ts_get_osm_callbacks_old(); + if (ptr_old) + return ptr_old->chunk_insert_check_hook; + } + return NULL; +} + +hypertable_drop_hook_type +ts_get_osm_hypertable_drop_hook() +{ + OsmCallbacks_Versioned *ptr = ts_get_osm_callbacks(); + if (ptr) + { + if (ptr->version_num == 1) + return ptr->hypertable_drop_hook; + } + else + { + OsmCallbacks *ptr_old = ts_get_osm_callbacks_old(); + if (ptr_old) + return ptr_old->hypertable_drop_hook; + } + return NULL; +} + +hypertable_drop_chunks_hook_type +ts_get_osm_hypertable_drop_chunks_hook() +{ + OsmCallbacks_Versioned *ptr = ts_get_osm_callbacks(); + if (ptr && 
ptr->version_num == 1) + return ptr->hypertable_drop_chunks_hook; + return NULL; +} diff --git a/src/osm_callbacks.h b/src/osm_callbacks.h index dada88629..dbf1ea07d 100644 --- a/src/osm_callbacks.h +++ b/src/osm_callbacks.h @@ -9,8 +9,13 @@ #include #include +/* range_start and range_end are in PG internal timestamp format. */ typedef int (*chunk_insert_check_hook_type)(Oid ht_oid, int64 range_start, int64 range_end); typedef void (*hypertable_drop_hook_type)(const char *schema_name, const char *table_name); +typedef List *(*hypertable_drop_chunks_hook_type)(Oid osm_chunk_oid, + const char *hypertable_schema_name, + const char *hypertable_name, int64 range_start, + int64 range_end); /* * Object Storage Manager callbacks. @@ -18,12 +23,25 @@ typedef void (*hypertable_drop_hook_type)(const char *schema_name, const char *t * chunk_insert_check_hook - checks whether the specified range is managed by OSM * hypertable_drop_hook - used for OSM catalog cleanups */ +/* This struct is retained for backward compatibility. 
We'll remove this in one + * of the upcoming releases + */ typedef struct { chunk_insert_check_hook_type chunk_insert_check_hook; hypertable_drop_hook_type hypertable_drop_hook; } OsmCallbacks; -extern OsmCallbacks *ts_get_osm_callbacks(void); +typedef struct +{ + int64 version_num; + chunk_insert_check_hook_type chunk_insert_check_hook; + hypertable_drop_hook_type hypertable_drop_hook; + hypertable_drop_chunks_hook_type hypertable_drop_chunks_hook; +} OsmCallbacks_Versioned; + +extern chunk_insert_check_hook_type ts_get_osm_chunk_insert_hook(void); +extern hypertable_drop_hook_type ts_get_osm_hypertable_drop_hook(void); +extern hypertable_drop_chunks_hook_type ts_get_osm_hypertable_drop_chunks_hook(void); #endif /* TIMESCALEDB_OSM_CALLBACKS_H */ diff --git a/tsl/test/expected/chunk_utils_internal.out b/tsl/test/expected/chunk_utils_internal.out index 8b41aedb8..232a77c2d 100644 --- a/tsl/test/expected/chunk_utils_internal.out +++ b/tsl/test/expected/chunk_utils_internal.out @@ -905,6 +905,59 @@ where conrelid = 'child_hyper_constr'::regclass ORDER BY 1; hyper_constr_temp_check (1 row) +--TEST retention policy is applied on OSM chunk by calling registered callback +CREATE OR REPLACE FUNCTION dummy_now_smallint() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 500::bigint' ; +SELECT set_integer_now_func('hyper_constr', 'dummy_now_smallint'); + set_integer_now_func +---------------------- + +(1 row) + +SELECT add_retention_policy('hyper_constr', 100::int) AS deljob_id \gset +--add hooks for osm callbacks that are triggered when drop_chunks is invoked--- +SELECT ts_setup_osm_hook(); + ts_setup_osm_hook +------------------- + +(1 row) + +BEGIN; +SELECT drop_chunks('hyper_constr', 10::int); +NOTICE: hypertable_drop_chunks_hook + drop_chunks +------------------------------ + _timescaledb_internal.dummy0 + _timescaledb_internal.dummy1 +(2 rows) + +SELECT id, table_name FROM _timescaledb_catalog.chunk +where hypertable_id = (Select id from _timescaledb_catalog.hypertable 
where table_name = 'hyper_constr'); + id | table_name +----+-------------------- + 13 | _hyper_7_13_chunk + 14 | child_hyper_constr +(2 rows) + +ROLLBACK; +CALL run_job(:deljob_id); +NOTICE: hypertable_drop_chunks_hook +CALL run_job(:deljob_id); +NOTICE: hypertable_drop_chunks_hook +SELECT chunk_name, range_start, range_end +FROM chunk_view +WHERE hypertable_name = 'hyper_constr' +ORDER BY chunk_name; + chunk_name | range_start | range_end +--------------------+-----------------------------------+------------------------------------ + child_hyper_constr | Wed Dec 31 16:00:00.0001 1969 PST | Wed Dec 31 16:00:00.00011 1969 PST +(1 row) + +SELECT ts_undo_osm_hook(); + ts_undo_osm_hook +------------------ + +(1 row) + ----- TESTS for copy into frozen chunk ------------ \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER CREATE TABLE test1.copy_test ( @@ -1008,26 +1061,6 @@ SELECT indexname, tablename FROM pg_indexes WHERE indexname = 'hyper_constr_mid_ (1 row) DROP INDEX hyper_constr_mid_idx; ---TEST policy is applied on OSM chunk --- XXX this is to be updated once the hook for dropping chunks is added -CREATE OR REPLACE FUNCTION dummy_now_smallint() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 500::bigint' ; -SELECT set_integer_now_func('hyper_constr', 'dummy_now_smallint'); - set_integer_now_func ----------------------- - -(1 row) - -SELECT add_retention_policy('hyper_constr', 100::int) AS deljob_id \gset -CALL run_job(:deljob_id); -CALL run_job(:deljob_id); -SELECT chunk_name, range_start, range_end -FROM chunk_view -WHERE hypertable_name = 'hyper_constr' -ORDER BY chunk_name; - chunk_name | range_start | range_end -------------+-------------+----------- -(0 rows) - -- test range of dimension slice for osm chunk for different datatypes CREATE TABLE osm_int2(time int2 NOT NULL); CREATE TABLE osm_int4(time int4 NOT NULL); diff --git a/tsl/test/sql/chunk_utils_internal.sql b/tsl/test/sql/chunk_utils_internal.sql index df32680f7..59f3c02ce 100644 --- 
a/tsl/test/sql/chunk_utils_internal.sql +++ b/tsl/test/sql/chunk_utils_internal.sql @@ -508,6 +508,27 @@ SELECT * FROM hyper_constr order by time; SELECT conname FROM pg_constraint where conrelid = 'child_hyper_constr'::regclass ORDER BY 1; +--TEST retention policy is applied on OSM chunk by calling registered callback +CREATE OR REPLACE FUNCTION dummy_now_smallint() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 500::bigint' ; + +SELECT set_integer_now_func('hyper_constr', 'dummy_now_smallint'); +SELECT add_retention_policy('hyper_constr', 100::int) AS deljob_id \gset + +--add hooks for osm callbacks that are triggered when drop_chunks is invoked--- +SELECT ts_setup_osm_hook(); +BEGIN; +SELECT drop_chunks('hyper_constr', 10::int); +SELECT id, table_name FROM _timescaledb_catalog.chunk +where hypertable_id = (Select id from _timescaledb_catalog.hypertable where table_name = 'hyper_constr'); +ROLLBACK; +CALL run_job(:deljob_id); +CALL run_job(:deljob_id); +SELECT chunk_name, range_start, range_end +FROM chunk_view +WHERE hypertable_name = 'hyper_constr' +ORDER BY chunk_name; +SELECT ts_undo_osm_hook(); + ----- TESTS for copy into frozen chunk ------------ \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER CREATE TABLE test1.copy_test ( @@ -585,20 +606,6 @@ CREATE INDEX hyper_constr_mid_idx ON hyper_constr(mid, time) WITH (timescaledb.t SELECT indexname, tablename FROM pg_indexes WHERE indexname = 'hyper_constr_mid_idx'; DROP INDEX hyper_constr_mid_idx; ---TEST policy is applied on OSM chunk --- XXX this is to be updated once the hook for dropping chunks is added -CREATE OR REPLACE FUNCTION dummy_now_smallint() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 500::bigint' ; - -SELECT set_integer_now_func('hyper_constr', 'dummy_now_smallint'); -SELECT add_retention_policy('hyper_constr', 100::int) AS deljob_id \gset - -CALL run_job(:deljob_id); -CALL run_job(:deljob_id); -SELECT chunk_name, range_start, range_end -FROM chunk_view -WHERE hypertable_name = 'hyper_constr' -ORDER 
BY chunk_name; - -- test range of dimension slice for osm chunk for different datatypes CREATE TABLE osm_int2(time int2 NOT NULL); CREATE TABLE osm_int4(time int4 NOT NULL); diff --git a/tsl/test/src/test_chunk_stats.c b/tsl/test/src/test_chunk_stats.c index 2b4fd28ee..1a815b051 100644 --- a/tsl/test/src/test_chunk_stats.c +++ b/tsl/test/src/test_chunk_stats.c @@ -38,6 +38,7 @@ ts_test_chunk_stats_insert(PG_FUNCTION_ARGS) typedef int (*chunk_insert_check_hook_type)(Oid, int64, int64); typedef void (*hypertable_drop_hook_type)(const char *, const char *); +typedef List *(*hypertable_drop_chunks_hook_type)(Oid, const char *, const char *, int64, int64); static int osm_insert_hook_mock(Oid ht_oid, int64 range_start, int64 range_end) @@ -53,8 +54,27 @@ osm_ht_drop_hook_mock(const char *schema_name, const char *table_name) elog(NOTICE, "hypertable_drop_hook"); } -OsmCallbacks fake_osm_callbacks = { .chunk_insert_check_hook = osm_insert_hook_mock, - .hypertable_drop_hook = osm_ht_drop_hook_mock }; +static List * +osm_ht_drop_chunks_hook_mock(Oid osm_chunk_oid, const char *schema_name, const char *table_name, + int64 range_start, int64 range_end) +{ + List *ret = NIL; + elog(NOTICE, "hypertable_drop_chunks_hook "); + for (int i = 0; i < 2; i++) + { + char *chunk_name; + chunk_name = psprintf("%s%d", "_timescaledb_internal.dummy", i); + ret = lappend(ret, chunk_name); + } + return ret; +} + +OsmCallbacks_Versioned fake_osm_callbacks = { + .version_num = 1, + .chunk_insert_check_hook = osm_insert_hook_mock, + .hypertable_drop_hook = osm_ht_drop_hook_mock, + .hypertable_drop_chunks_hook = osm_ht_drop_chunks_hook_mock, +}; /* * Dummy function to mock OSM_INSERT hook called at chunk creation for tiered data @@ -63,7 +83,8 @@ TS_FUNCTION_INFO_V1(ts_setup_osm_hook); Datum ts_setup_osm_hook(PG_FUNCTION_ARGS) { - OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks"); + OsmCallbacks_Versioned **ptr = + (OsmCallbacks_Versioned **) 
find_rendezvous_variable("osm_callbacks_versioned"); *ptr = &fake_osm_callbacks; PG_RETURN_NULL(); @@ -73,7 +94,8 @@ TS_FUNCTION_INFO_V1(ts_undo_osm_hook); Datum ts_undo_osm_hook(PG_FUNCTION_ARGS) { - OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks"); + OsmCallbacks_Versioned **ptr = + (OsmCallbacks_Versioned **) find_rendezvous_variable("osm_callbacks_versioned"); *ptr = NULL; PG_RETURN_NULL();