Refactor Continuous Aggregate catalog code

Get rid of `GETSTRUCT` to fill the form data and use `heap_deform_tuple`
instead. This is necessary specially when you have variable lenght
fields and/or fields that accept NULL values. This refactoring will be
specially usefull in a following PR for Nested Continuous Aggregates
where we'll add a new metadata to the catalog that can accept NULL
values.

Also refactor the rename view and schema code paths improving the
readability and maintainability.
This commit is contained in:
Fabrízio de Royes Mello 2022-10-26 16:44:59 -03:00
parent 9b157d5438
commit 8d1e165d7f
2 changed files with 271 additions and 160 deletions

View File

@ -275,6 +275,92 @@ ts_materialization_invalidation_log_delete(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
static HeapTuple
continuous_agg_formdata_make_tuple(const FormData_continuous_agg *fd, TupleDesc desc)
{
Datum values[Natts_continuous_agg];
bool nulls[Natts_continuous_agg] = { false };
memset(values, 0, sizeof(Datum) * Natts_continuous_agg);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_mat_hypertable_id)] =
Int32GetDatum(fd->mat_hypertable_id);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_raw_hypertable_id)] =
Int32GetDatum(fd->raw_hypertable_id);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_schema)] =
NameGetDatum(&fd->user_view_schema);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_name)] =
NameGetDatum(&fd->user_view_name);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_schema)] =
NameGetDatum(&fd->partial_view_schema);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_name)] =
NameGetDatum(&fd->partial_view_name);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_bucket_width)] =
Int64GetDatum(fd->bucket_width);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_schema)] =
NameGetDatum(&fd->direct_view_schema);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_name)] =
NameGetDatum(&fd->direct_view_name);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_materialize_only)] =
BoolGetDatum(fd->materialized_only);
values[AttrNumberGetAttrOffset(Anum_continuous_agg_finalized)] = BoolGetDatum(fd->finalized);
return heap_form_tuple(desc, values, nulls);
}
static void
continuous_agg_formdata_fill(FormData_continuous_agg *fd, const TupleInfo *ti)
{
bool should_free;
HeapTuple tuple;
Datum values[Natts_continuous_agg];
bool nulls[Natts_continuous_agg] = { false };
tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);
fd->mat_hypertable_id =
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_continuous_agg_mat_hypertable_id)]);
fd->raw_hypertable_id =
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_continuous_agg_raw_hypertable_id)]);
memcpy(&fd->user_view_schema,
DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_schema)]),
NAMEDATALEN);
memcpy(&fd->user_view_name,
DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_user_view_name)]),
NAMEDATALEN);
memcpy(&fd->partial_view_schema,
DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_schema)]),
NAMEDATALEN);
memcpy(&fd->partial_view_name,
DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_partial_view_name)]),
NAMEDATALEN);
fd->bucket_width =
DatumGetInt64(values[AttrNumberGetAttrOffset(Anum_continuous_agg_bucket_width)]);
memcpy(&fd->direct_view_schema,
DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_schema)]),
NAMEDATALEN);
memcpy(&fd->direct_view_name,
DatumGetName(values[AttrNumberGetAttrOffset(Anum_continuous_agg_direct_view_name)]),
NAMEDATALEN);
fd->materialized_only =
DatumGetBool(values[AttrNumberGetAttrOffset(Anum_continuous_agg_materialize_only)]);
fd->finalized = DatumGetBool(values[AttrNumberGetAttrOffset(Anum_continuous_agg_finalized)]);
if (should_free)
heap_freetuple(tuple);
}
static void
continuous_agg_fill_bucket_function(int32 mat_hypertable_id, ContinuousAggsBucketFunction *bf)
{
@ -637,18 +723,16 @@ ts_continuous_agg_hypertable_status(int32 hypertable_id)
ts_scanner_foreach(&iterator)
{
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple);
FormData_continuous_agg data;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
if (data->raw_hypertable_id == hypertable_id)
continuous_agg_formdata_fill(&data, ti);
if (data.raw_hypertable_id == hypertable_id)
status |= HypertableIsRawTable;
if (data->mat_hypertable_id == hypertable_id)
if (data.mat_hypertable_id == hypertable_id)
status |= HypertableIsMaterialization;
if (should_free)
heap_freetuple(tuple);
if (status == HypertableIsMaterializationAndRaw)
{
ts_scan_iterator_close(&iterator);
@ -670,19 +754,17 @@ ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id)
ts_scanner_foreach(&iterator)
{
ContinuousAgg *ca;
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
Form_continuous_agg data = (Form_continuous_agg) GETSTRUCT(tuple);
FormData_continuous_agg data;
MemoryContext oldmctx;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
continuous_agg_formdata_fill(&data, ti);
oldmctx = MemoryContextSwitchTo(ts_scan_iterator_get_result_memory_context(&iterator));
ca = palloc0(sizeof(*ca));
continuous_agg_init(ca, data);
continuous_agg_init(ca, &data);
continuous_aggs = lappend(continuous_aggs, ca);
MemoryContextSwitchTo(oldmctx);
if (should_free)
heap_freetuple(tuple);
}
return continuous_aggs;
@ -699,28 +781,27 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id)
init_scan_by_mat_hypertable_id(&iterator, mat_hypertable_id);
ts_scanner_foreach(&iterator)
{
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
Form_continuous_agg form = (Form_continuous_agg) GETSTRUCT(tuple);
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
FormData_continuous_agg form;
continuous_agg_formdata_fill(&form, ti);
/* Note that this scan can only match at most once, so we assert on
* `ca` here. */
Assert(ca == NULL);
ca = ts_scan_iterator_alloc_result(&iterator, sizeof(*ca));
continuous_agg_init(ca, form);
continuous_agg_init(ca, &form);
Assert(ca && ca->data.mat_hypertable_id == mat_hypertable_id);
if (should_free)
heap_freetuple(tuple);
}
ts_scan_iterator_close(&iterator);
return ca;
}
static bool
continuous_agg_fill_form_data(const char *schema, const char *name, ContinuousAggViewType type,
FormData_continuous_agg *fd)
continuous_agg_find_by_name(const char *schema, const char *name, ContinuousAggViewType type,
FormData_continuous_agg *fd)
{
ScanIterator iterator;
AttrNumber view_name_attrnum = 0;
@ -766,22 +847,20 @@ continuous_agg_fill_form_data(const char *schema, const char *name, ContinuousAg
ts_scanner_foreach(&iterator)
{
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple);
ContinuousAggViewType vtype = type;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
FormData_continuous_agg data;
continuous_agg_formdata_fill(&data, ti);
if (vtype == ContinuousAggAnyView)
vtype = ts_continuous_agg_view_type(data, schema, name);
vtype = ts_continuous_agg_view_type(&data, schema, name);
if (vtype != ContinuousAggAnyView)
{
memcpy(fd, data, sizeof(*fd));
memcpy(fd, &data, sizeof(*fd));
count++;
}
if (should_free)
heap_freetuple(tuple);
}
Assert(count <= 1);
@ -796,7 +875,7 @@ ts_continuous_agg_find_by_view_name(const char *schema, const char *name,
FormData_continuous_agg fd;
ContinuousAgg *ca;
if (!continuous_agg_fill_form_data(schema, name, type, &fd))
if (!continuous_agg_find_by_name(schema, name, type, &fd))
return NULL;
ca = palloc0(sizeof(ContinuousAgg));
@ -988,33 +1067,30 @@ drop_continuous_agg(FormData_continuous_agg *cadata, bool drop_user_view)
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
Form_continuous_agg form = (Form_continuous_agg) GETSTRUCT(tuple);
FormData_continuous_agg form;
continuous_agg_formdata_fill(&form, ti);
ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
/* Delete all related rows */
if (!raw_hypertable_has_other_caggs)
{
hypertable_invalidation_log_delete(form->raw_hypertable_id);
hypertable_invalidation_log_delete(form.raw_hypertable_id);
if (ts_cm_functions->remote_invalidation_log_delete)
ts_cm_functions->remote_invalidation_log_delete(form->raw_hypertable_id,
ts_cm_functions->remote_invalidation_log_delete(form.raw_hypertable_id,
HypertableIsRawTable);
}
ts_materialization_invalidation_log_delete_inner(form->mat_hypertable_id);
ts_materialization_invalidation_log_delete_inner(form.mat_hypertable_id);
if (ts_cm_functions->remote_invalidation_log_delete)
ts_cm_functions->remote_invalidation_log_delete(form->mat_hypertable_id,
ts_cm_functions->remote_invalidation_log_delete(form.mat_hypertable_id,
HypertableIsMaterialization);
if (!raw_hypertable_has_other_caggs)
{
invalidation_threshold_delete(form->raw_hypertable_id);
invalidation_threshold_delete(form.raw_hypertable_id);
}
if (should_free)
heap_freetuple(tuple);
}
if (cadata->bucket_width == BUCKET_WIDTH_VARIABLE)
@ -1063,21 +1139,19 @@ ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id)
ts_scanner_foreach(&iterator)
{
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple);
FormData_continuous_agg data;
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
if (data->raw_hypertable_id == hypertable_id)
drop_continuous_agg(data, true);
continuous_agg_formdata_fill(&data, ti);
if (data->mat_hypertable_id == hypertable_id)
if (data.raw_hypertable_id == hypertable_id)
drop_continuous_agg(&data, true);
if (data.mat_hypertable_id == hypertable_id)
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop the materialized table because it is required by a "
"continuous aggregate")));
if (should_free)
heap_freetuple(tuple);
}
}
@ -1127,7 +1201,7 @@ bool
ts_continuous_agg_drop(const char *view_schema, const char *view_name)
{
FormData_continuous_agg fd;
bool found = continuous_agg_fill_form_data(view_schema, view_name, ContinuousAggAnyView, &fd);
bool found = continuous_agg_find_by_name(view_schema, view_name, ContinuousAggAnyView, &fd);
if (found)
continuous_agg_drop_view_callback(&fd, view_schema, view_name);
@ -1169,124 +1243,160 @@ ts_continuous_agg_view_type(FormData_continuous_agg *data, const char *schema, c
return ContinuousAggAnyView;
}
static FormData_continuous_agg *
ensure_new_tuple(HeapTuple old_tuple, HeapTuple *new_tuple)
typedef struct CaggRenameCtx
{
if (*new_tuple == NULL)
*new_tuple = heap_copytuple(old_tuple);
const char *old_schema;
const char *old_name;
const char *new_schema;
const char *new_name;
ObjectType *object_type;
void (*process_rename)(FormData_continuous_agg *form, bool *do_update, void *data);
} CaggRenameCtx;
return (FormData_continuous_agg *) GETSTRUCT(*new_tuple);
static void
continuous_agg_rename_process_rename_schema(FormData_continuous_agg *form, bool *do_update,
void *data)
{
CaggRenameCtx *ctx = (CaggRenameCtx *) data;
if (ts_continuous_agg_is_user_view_schema(form, ctx->old_schema))
{
namestrcpy(&form->user_view_schema, ctx->new_schema);
*do_update = true;
}
if (ts_continuous_agg_is_partial_view_schema(form, ctx->old_schema))
{
namestrcpy(&form->partial_view_schema, ctx->new_schema);
*do_update = true;
}
if (ts_continuous_agg_is_direct_view_schema(form, ctx->old_schema))
{
namestrcpy(&form->direct_view_schema, ctx->new_schema);
*do_update = true;
}
}
static void
continuous_agg_rename_process_rename_view(FormData_continuous_agg *form, bool *do_update,
void *data)
{
CaggRenameCtx *ctx = (CaggRenameCtx *) data;
ContinuousAggViewType vtyp;
vtyp = ts_continuous_agg_view_type(form, ctx->old_schema, ctx->old_name);
switch (vtyp)
{
case ContinuousAggUserView:
{
if (*ctx->object_type == OBJECT_VIEW)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter continuous aggregate using ALTER VIEW"),
errhint("Use ALTER MATERIALIZED VIEW to alter a continuous aggregate.")));
Assert(*ctx->object_type == OBJECT_MATVIEW);
*ctx->object_type = OBJECT_VIEW;
namestrcpy(&form->user_view_schema, ctx->new_schema);
namestrcpy(&form->user_view_name, ctx->new_name);
*do_update = true;
break;
}
case ContinuousAggPartialView:
{
namestrcpy(&form->partial_view_schema, ctx->new_schema);
namestrcpy(&form->partial_view_name, ctx->new_name);
*do_update = true;
break;
}
case ContinuousAggDirectView:
{
namestrcpy(&form->direct_view_schema, ctx->new_schema);
namestrcpy(&form->direct_view_name, ctx->new_name);
*do_update = true;
break;
}
default:
break;
}
}
static ScanTupleResult
continuous_agg_rename(TupleInfo *ti, void *data)
{
CaggRenameCtx *ctx = (CaggRenameCtx *) data;
FormData_continuous_agg form;
bool do_update = false;
CatalogSecurityContext sec_ctx;
continuous_agg_formdata_fill(&form, ti);
ctx->process_rename(&form, &do_update, (void *) ctx);
if (do_update)
{
HeapTuple new_tuple =
continuous_agg_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
ts_catalog_restore_user(&sec_ctx);
heap_freetuple(new_tuple);
}
return SCAN_CONTINUE;
}
void
ts_continuous_agg_rename_schema_name(char *old_schema, char *new_schema)
ts_continuous_agg_rename_schema_name(const char *old_schema, const char *new_schema)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext);
CaggRenameCtx cagg_rename_ctx = {
.old_schema = old_schema,
.new_schema = new_schema,
.process_rename = continuous_agg_rename_process_rename_schema,
};
ts_scanner_foreach(&iterator)
{
TupleInfo *tinfo = ts_scan_iterator_tuple_info(&iterator);
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple);
HeapTuple new_tuple = NULL;
Catalog *catalog = ts_catalog_get();
if (ts_continuous_agg_is_user_view_schema(data, old_schema))
{
FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple);
namestrcpy(&new_data->user_view_schema, new_schema);
}
ScannerCtx scanctx = {
.table = catalog_get_table_id(catalog, CONTINUOUS_AGG),
.index = InvalidOid,
.tuple_found = continuous_agg_rename,
.data = &cagg_rename_ctx,
.lockmode = RowExclusiveLock,
.scandirection = ForwardScanDirection,
};
if (ts_continuous_agg_is_partial_view_schema(data, old_schema))
{
FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple);
namestrcpy(&new_data->partial_view_schema, new_schema);
}
if (ts_continuous_agg_is_direct_view_schema(data, old_schema))
{
FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple);
namestrcpy(&new_data->direct_view_schema, new_schema);
}
if (new_tuple != NULL)
{
ts_catalog_update(tinfo->scanrel, new_tuple);
heap_freetuple(new_tuple);
}
if (should_free)
heap_freetuple(tuple);
}
ts_scanner_scan(&scanctx);
}
extern void
ts_continuous_agg_rename_view(const char *old_schema, const char *name, const char *new_schema,
void
ts_continuous_agg_rename_view(const char *old_schema, const char *old_name, const char *new_schema,
const char *new_name, ObjectType *object_type)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext);
CaggRenameCtx cagg_rename_ctx = {
.old_schema = old_schema,
.old_name = old_name,
.new_schema = new_schema,
.new_name = new_name,
.object_type = object_type,
.process_rename = continuous_agg_rename_process_rename_view,
};
Assert(object_type);
Catalog *catalog = ts_catalog_get();
ts_scanner_foreach(&iterator)
{
TupleInfo *tinfo = ts_scan_iterator_tuple_info(&iterator);
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple);
HeapTuple new_tuple = NULL;
ContinuousAggViewType vtyp = ts_continuous_agg_view_type(data, old_schema, name);
ScannerCtx scanctx = {
.table = catalog_get_table_id(catalog, CONTINUOUS_AGG),
.index = InvalidOid,
.tuple_found = continuous_agg_rename,
.data = &cagg_rename_ctx,
.lockmode = RowExclusiveLock,
.scandirection = ForwardScanDirection,
};
switch (vtyp)
{
case ContinuousAggUserView:
{
FormData_continuous_agg *new_data;
if (*object_type == OBJECT_VIEW)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter continuous aggregate using ALTER VIEW"),
errhint(
"Use ALTER MATERIALIZED VIEW to alter a continuous aggregate.")));
Assert(*object_type == OBJECT_MATVIEW);
*object_type = OBJECT_VIEW;
new_data = ensure_new_tuple(tuple, &new_tuple);
namestrcpy(&new_data->user_view_schema, new_schema);
namestrcpy(&new_data->user_view_name, new_name);
break;
}
case ContinuousAggPartialView:
{
FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple);
namestrcpy(&new_data->partial_view_schema, new_schema);
namestrcpy(&new_data->partial_view_name, new_name);
break;
}
case ContinuousAggDirectView:
{
FormData_continuous_agg *new_data = ensure_new_tuple(tuple, &new_tuple);
namestrcpy(&new_data->direct_view_schema, new_schema);
namestrcpy(&new_data->direct_view_name, new_name);
break;
}
default:
break;
}
if (new_tuple != NULL)
{
ts_catalog_update(tinfo->scanrel, new_tuple);
heap_freetuple(new_tuple);
}
if (should_free)
heap_freetuple(tuple);
}
ts_scanner_scan(&scanctx);
}
TSDLLEXPORT int32

View File

@ -188,10 +188,11 @@ extern void ts_continuous_agg_drop_hypertable_callback(int32 hypertable_id);
extern TSDLLEXPORT ContinuousAggViewType ts_continuous_agg_view_type(FormData_continuous_agg *data,
const char *schema,
const char *name);
extern void ts_continuous_agg_rename_schema_name(char *old_schema, char *new_schema);
extern void ts_continuous_agg_rename_view(const char *old_schema, const char *name,
const char *new_schema, const char *new_name,
ObjectType *object_type);
extern TSDLLEXPORT void ts_continuous_agg_rename_schema_name(const char *old_schema,
const char *new_schema);
extern TSDLLEXPORT void ts_continuous_agg_rename_view(const char *old_schema, const char *old_name,
const char *new_schema, const char *new_name,
ObjectType *object_type);
extern TSDLLEXPORT int32 ts_number_of_continuous_aggs(void);