mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-18 11:45:11 +08:00
Add hooks for hypertable drops
To properly clean up the OSM catalog we need a way to reliably track hypertable deletion (including internal hypertables for CAGGS).
This commit is contained in:
parent
474b09bdfc
commit
4c0075010d
@ -31,6 +31,7 @@ set(SOURCES
|
|||||||
init.c
|
init.c
|
||||||
jsonb_utils.c
|
jsonb_utils.c
|
||||||
license_guc.c
|
license_guc.c
|
||||||
|
osm_callbacks.c
|
||||||
partitioning.c
|
partitioning.c
|
||||||
process_utility.c
|
process_utility.c
|
||||||
scanner.c
|
scanner.c
|
||||||
|
22
src/chunk.c
22
src/chunk.c
@ -54,6 +54,7 @@
|
|||||||
#include "hypercube.h"
|
#include "hypercube.h"
|
||||||
#include "hypertable.h"
|
#include "hypertable.h"
|
||||||
#include "hypertable_cache.h"
|
#include "hypertable_cache.h"
|
||||||
|
#include "osm_callbacks.h"
|
||||||
#include "partitioning.h"
|
#include "partitioning.h"
|
||||||
#include "process_utility.h"
|
#include "process_utility.h"
|
||||||
#include "scan_iterator.h"
|
#include "scan_iterator.h"
|
||||||
@ -1156,27 +1157,15 @@ ts_chunk_create_only_table(Hypertable *ht, Hypercube *cube, const char *schema_n
|
|||||||
return chunk;
|
return chunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if PG14_GE
|
|
||||||
#define OSM_CHUNK_INSERT_CHECK_HOOK "osm_chunk_insert_check_hook"
|
|
||||||
typedef int (*ts_osm_chunk_insert_hook_type)(Oid ht_oid, int64 range_start, int64 range_end);
|
|
||||||
static ts_osm_chunk_insert_hook_type
|
|
||||||
get_osm_chunk_insert_hook()
|
|
||||||
{
|
|
||||||
ts_osm_chunk_insert_hook_type *func_ptr =
|
|
||||||
(ts_osm_chunk_insert_hook_type *) find_rendezvous_variable(OSM_CHUNK_INSERT_CHECK_HOOK);
|
|
||||||
return *func_ptr;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
static Chunk *
|
static Chunk *
|
||||||
chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
|
chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
|
||||||
const char *schema_name, const char *table_name,
|
const char *schema_name, const char *table_name,
|
||||||
const char *prefix)
|
const char *prefix)
|
||||||
{
|
{
|
||||||
#if PG14_GE
|
#if PG14_GE
|
||||||
ts_osm_chunk_insert_hook_type insert_func_ptr = get_osm_chunk_insert_hook();
|
OsmCallbacks *callbacks = ts_get_osm_callbacks();
|
||||||
|
|
||||||
if (insert_func_ptr)
|
if (callbacks)
|
||||||
{
|
{
|
||||||
/* OSM only uses first dimension . doesn't work with multinode tables yet*/
|
/* OSM only uses first dimension . doesn't work with multinode tables yet*/
|
||||||
Dimension *dim = &ht->space->dimensions[0];
|
Dimension *dim = &ht->space->dimensions[0];
|
||||||
@ -1185,7 +1174,10 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
|
|||||||
ts_internal_to_time_int64(cube->slices[0]->fd.range_start, dim->fd.column_type);
|
ts_internal_to_time_int64(cube->slices[0]->fd.range_start, dim->fd.column_type);
|
||||||
int64 range_end =
|
int64 range_end =
|
||||||
ts_internal_to_time_int64(cube->slices[0]->fd.range_end, dim->fd.column_type);
|
ts_internal_to_time_int64(cube->slices[0]->fd.range_end, dim->fd.column_type);
|
||||||
int chunk_exists = insert_func_ptr(ht->main_table_relid, range_start, range_end);
|
|
||||||
|
int chunk_exists =
|
||||||
|
callbacks->chunk_insert_check_hook(ht->main_table_relid, range_start, range_end);
|
||||||
|
|
||||||
if (chunk_exists)
|
if (chunk_exists)
|
||||||
{
|
{
|
||||||
Oid outfuncid = InvalidOid;
|
Oid outfuncid = InvalidOid;
|
||||||
|
@ -63,6 +63,7 @@
|
|||||||
#include "cross_module_fn.h"
|
#include "cross_module_fn.h"
|
||||||
#include "scan_iterator.h"
|
#include "scan_iterator.h"
|
||||||
#include "debug_assert.h"
|
#include "debug_assert.h"
|
||||||
|
#include "osm_callbacks.h"
|
||||||
|
|
||||||
Oid
|
Oid
|
||||||
ts_rel_get_owner(Oid relid)
|
ts_rel_get_owner(Oid relid)
|
||||||
@ -625,6 +626,20 @@ hypertable_tuple_delete(TupleInfo *ti, void *data)
|
|||||||
ts_hypertable_drop(compressed_hypertable, DROP_RESTRICT);
|
ts_hypertable_drop(compressed_hypertable, DROP_RESTRICT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if PG14_GE
|
||||||
|
OsmCallbacks *callbacks = ts_get_osm_callbacks();
|
||||||
|
|
||||||
|
/* Invoke the OSM callback if set */
|
||||||
|
if (callbacks)
|
||||||
|
{
|
||||||
|
Name schema_name =
|
||||||
|
DatumGetName(slot_getattr(ti->slot, Anum_hypertable_schema_name, &isnull));
|
||||||
|
Name table_name = DatumGetName(slot_getattr(ti->slot, Anum_hypertable_table_name, &isnull));
|
||||||
|
|
||||||
|
callbacks->hypertable_drop_hook(NameStr(*schema_name), NameStr(*table_name));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
|
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
|
||||||
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
|
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
|
||||||
ts_catalog_restore_user(&sec_ctx);
|
ts_catalog_restore_user(&sec_ctx);
|
||||||
|
18
src/osm_callbacks.c
Normal file
18
src/osm_callbacks.c
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
/*
|
||||||
|
* This file and its contents are licensed under the Apache License 2.0.
|
||||||
|
* Please see the included NOTICE for copyright information and
|
||||||
|
* LICENSE-APACHE for a copy of the license.
|
||||||
|
*/
|
||||||
|
#include "osm_callbacks.h"
|
||||||
|
|
||||||
|
#include <fmgr.h>
|
||||||
|
|
||||||
|
#define OSM_CALLBACKS_VAR_NAME "osm_callbacks"
|
||||||
|
|
||||||
|
OsmCallbacks *
|
||||||
|
ts_get_osm_callbacks(void)
|
||||||
|
{
|
||||||
|
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable(OSM_CALLBACKS_VAR_NAME);
|
||||||
|
|
||||||
|
return *ptr;
|
||||||
|
}
|
29
src/osm_callbacks.h
Normal file
29
src/osm_callbacks.h
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
/*
|
||||||
|
* This file and its contents are licensed under the Apache License 2.0.
|
||||||
|
* Please see the included NOTICE for copyright information and
|
||||||
|
* LICENSE-APACHE for a copy of the license.
|
||||||
|
*/
|
||||||
|
#ifndef TIMESCALEDB_OSM_CALLBACKS_H
|
||||||
|
#define TIMESCALEDB_OSM_CALLBACKS_H
|
||||||
|
|
||||||
|
#include <postgres.h>
|
||||||
|
#include <catalog/objectaddress.h>
|
||||||
|
|
||||||
|
typedef int (*chunk_insert_check_hook_type)(Oid ht_oid, int64 range_start, int64 range_end);
|
||||||
|
typedef void (*hypertable_drop_hook_type)(const char *schema_name, const char *table_name);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Object Storage Manager callbacks.
|
||||||
|
*
|
||||||
|
* chunk_insert_check_hook - checks whether the specified range is managed by OSM
|
||||||
|
* hypertable_drop_hook - used for OSM catalog cleanups
|
||||||
|
*/
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
chunk_insert_check_hook_type chunk_insert_check_hook;
|
||||||
|
hypertable_drop_hook_type hypertable_drop_hook;
|
||||||
|
} OsmCallbacks;
|
||||||
|
|
||||||
|
extern OsmCallbacks *ts_get_osm_callbacks(void);
|
||||||
|
|
||||||
|
#endif /* TIMESCALEDB_OSM_CALLBACKS_H */
|
@ -439,6 +439,33 @@ SELECT * FROM hyper1_cagg ORDER BY 1;
|
|||||||
30 | 2
|
30 | 2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- check that dropping cagg triggers OSM callback
|
||||||
|
SELECT ts_setup_osm_hook();
|
||||||
|
ts_setup_osm_hook
|
||||||
|
-------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
DROP MATERIALIZED VIEW hyper1_cagg CASCADE;
|
||||||
|
NOTICE: drop cascades to table _timescaledb_internal._hyper_4_9_chunk
|
||||||
|
NOTICE: hypertable_drop_hook
|
||||||
|
DROP TABLE test1.hyper1;
|
||||||
|
NOTICE: hypertable_drop_hook
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
DROP TABLE test1.hyper1 CASCADE;
|
||||||
|
NOTICE: drop cascades to 3 other objects
|
||||||
|
NOTICE: drop cascades to table _timescaledb_internal._hyper_4_9_chunk
|
||||||
|
NOTICE: hypertable_drop_hook
|
||||||
|
NOTICE: hypertable_drop_hook
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT ts_undo_osm_hook();
|
||||||
|
ts_undo_osm_hook
|
||||||
|
------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
--TEST error case (un)freeze a non-chunk
|
--TEST error case (un)freeze a non-chunk
|
||||||
CREATE TABLE nochunk_tab( a timestamp, b integer);
|
CREATE TABLE nochunk_tab( a timestamp, b integer);
|
||||||
\set ON_ERROR_STOP 0
|
\set ON_ERROR_STOP 0
|
||||||
@ -629,6 +656,7 @@ SELECT ts_setup_osm_hook();
|
|||||||
\set ON_ERROR_STOP 0
|
\set ON_ERROR_STOP 0
|
||||||
--the mock hook returns true always. so cannot create a new chunk on the hypertable
|
--the mock hook returns true always. so cannot create a new chunk on the hypertable
|
||||||
INSERT INTO ht_try VALUES ('2022-06-05 01:00', 222, 222);
|
INSERT INTO ht_try VALUES ('2022-06-05 01:00', 222, 222);
|
||||||
|
NOTICE: chunk_insert_check_hook
|
||||||
ERROR: Cannot insert into tiered chunk range of public.ht_try - attempt to create new chunk with range [Sat Jun 04 17:00:00 2022 PDT Sun Jun 05 17:00:00 2022 PDT] failed
|
ERROR: Cannot insert into tiered chunk range of public.ht_try - attempt to create new chunk with range [Sat Jun 04 17:00:00 2022 PDT Sun Jun 05 17:00:00 2022 PDT] failed
|
||||||
\set ON_ERROR_STOP 1
|
\set ON_ERROR_STOP 1
|
||||||
SELECT ts_undo_osm_hook();
|
SELECT ts_undo_osm_hook();
|
||||||
|
@ -267,6 +267,17 @@ SELECT _timescaledb_internal.drop_chunk( :'CHNAME1');
|
|||||||
SELECT * from test1.hyper1 ORDER BY 1;
|
SELECT * from test1.hyper1 ORDER BY 1;
|
||||||
SELECT * FROM hyper1_cagg ORDER BY 1;
|
SELECT * FROM hyper1_cagg ORDER BY 1;
|
||||||
|
|
||||||
|
-- check that dropping cagg triggers OSM callback
|
||||||
|
SELECT ts_setup_osm_hook();
|
||||||
|
BEGIN;
|
||||||
|
DROP MATERIALIZED VIEW hyper1_cagg CASCADE;
|
||||||
|
DROP TABLE test1.hyper1;
|
||||||
|
ROLLBACK;
|
||||||
|
BEGIN;
|
||||||
|
DROP TABLE test1.hyper1 CASCADE;
|
||||||
|
ROLLBACK;
|
||||||
|
SELECT ts_undo_osm_hook();
|
||||||
|
|
||||||
--TEST error case (un)freeze a non-chunk
|
--TEST error case (un)freeze a non-chunk
|
||||||
CREATE TABLE nochunk_tab( a timestamp, b integer);
|
CREATE TABLE nochunk_tab( a timestamp, b integer);
|
||||||
\set ON_ERROR_STOP 0
|
\set ON_ERROR_STOP 0
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
#include "bgw/job.h"
|
#include "bgw/job.h"
|
||||||
#include "export.h"
|
#include "export.h"
|
||||||
#include "bgw_policy/chunk_stats.h"
|
#include "bgw_policy/chunk_stats.h"
|
||||||
|
#include "osm_callbacks.h"
|
||||||
|
|
||||||
TS_FUNCTION_INFO_V1(ts_test_chunk_stats_insert);
|
TS_FUNCTION_INFO_V1(ts_test_chunk_stats_insert);
|
||||||
|
|
||||||
@ -35,13 +36,26 @@ ts_test_chunk_stats_insert(PG_FUNCTION_ARGS)
|
|||||||
PG_RETURN_NULL();
|
PG_RETURN_NULL();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef int (*chunk_insert_check_hook_type)(Oid, int64, int64);
|
||||||
|
typedef void (*hypertable_drop_hook_type)(const char *, const char *);
|
||||||
|
|
||||||
static int
|
static int
|
||||||
osm_insert_hook_mock(Oid ht_oid, int64 range_start, int64 range_end)
|
osm_insert_hook_mock(Oid ht_oid, int64 range_start, int64 range_end)
|
||||||
{
|
{
|
||||||
/* always return true */
|
/* always return true */
|
||||||
|
elog(NOTICE, "chunk_insert_check_hook");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
osm_ht_drop_hook_mock(const char *schema_name, const char *table_name)
|
||||||
|
{
|
||||||
|
elog(NOTICE, "hypertable_drop_hook");
|
||||||
|
}
|
||||||
|
|
||||||
|
OsmCallbacks fake_osm_callbacks = { .chunk_insert_check_hook = osm_insert_hook_mock,
|
||||||
|
.hypertable_drop_hook = osm_ht_drop_hook_mock };
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Dummy function to mock OSM_INSERT hook called at chunk creation for tiered data
|
* Dummy function to mock OSM_INSERT hook called at chunk creation for tiered data
|
||||||
*/
|
*/
|
||||||
@ -49,10 +63,9 @@ TS_FUNCTION_INFO_V1(ts_setup_osm_hook);
|
|||||||
Datum
|
Datum
|
||||||
ts_setup_osm_hook(PG_FUNCTION_ARGS)
|
ts_setup_osm_hook(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
typedef int (*MOCK_OSM_INSERT_HOOK)(Oid, int64, int64);
|
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks");
|
||||||
MOCK_OSM_INSERT_HOOK *var =
|
*ptr = &fake_osm_callbacks;
|
||||||
(MOCK_OSM_INSERT_HOOK *) find_rendezvous_variable("osm_chunk_insert_check_hook");
|
|
||||||
*var = osm_insert_hook_mock;
|
|
||||||
PG_RETURN_NULL();
|
PG_RETURN_NULL();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,9 +73,8 @@ TS_FUNCTION_INFO_V1(ts_undo_osm_hook);
|
|||||||
Datum
|
Datum
|
||||||
ts_undo_osm_hook(PG_FUNCTION_ARGS)
|
ts_undo_osm_hook(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
typedef int (*MOCK_OSM_INSERT_HOOK)(Oid, int64, int64);
|
OsmCallbacks **ptr = (OsmCallbacks **) find_rendezvous_variable("osm_callbacks");
|
||||||
MOCK_OSM_INSERT_HOOK *var =
|
*ptr = NULL;
|
||||||
(MOCK_OSM_INSERT_HOOK *) find_rendezvous_variable("osm_chunk_insert_check_hook");
|
|
||||||
*var = NULL;
|
|
||||||
PG_RETURN_NULL();
|
PG_RETURN_NULL();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user