Delete related rows for compression

This fixes deletion of related rows when we have compressed
hypertables. Namely, we now delete rows from:

- compression_chunk_size
- hypertable_compression

We also fix hypertable_compression to handle NULLs correctly (see the
sketch below).
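
For context on that fix: the old code memcpy'd the struct portion of the
catalog tuple out of GETSTRUCT(), but when a tuple contains NULLs the later
attributes are physically absent from the on-disk tuple, so fixed struct
offsets read shifted bytes. A minimal sketch of the safe pattern using the
standard PostgreSQL heap_deform_tuple() API (the constants and function
name here are hypothetical, for illustration only):

#include <postgres.h>
#include <access/htup_details.h>

#define EXAMPLE_NATTS 16	   /* hypothetical: >= number of table columns */
#define SEGMENTBY_ATT_OFFSET 3 /* hypothetical 0-based attribute offset */

static int16
example_read_segmentby_index(HeapTuple tuple, TupleDesc desc)
{
	Datum values[EXAMPLE_NATTS];
	bool isnull[EXAMPLE_NATTS];

	/* expands every attribute; isnull[i] is set for columns absent from
	 * the on-disk tuple, which a raw memcpy would silently misread */
	heap_deform_tuple(tuple, desc, values, isnull);

	if (isnull[SEGMENTBY_ATT_OFFSET])
		return 0; /* map NULL to 0, i.e. "not a segmentby column" */
	return DatumGetInt16(values[SEGMENTBY_ATT_OFFSET]);
}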

We also add a stub for tests that exercise continuous aggs together
with compression. That test is broken for now, so it is commented out;
it will be fixed in another PR.
Matvey Arye 2019-09-06 17:20:33 -04:00 committed by Matvey Arye
parent 06557257f5
commit df4c444551
13 changed files with 178 additions and 65 deletions

View File

@@ -17,6 +17,7 @@ set(SOURCES
constraint_aware_append.c
cross_module_fn.c
copy.c
compression_chunk_size.c
compression_with_clause.c
compression_segment_meta_min_max.c
dimension.c

View File

@@ -55,6 +55,7 @@
#include "cache.h"
#include "bgw_policy/chunk_stats.h"
#include "scan_iterator.h"
#include "compression_chunk_size.h"
TS_FUNCTION_INFO_V1(ts_chunk_show_chunks);
TS_FUNCTION_INFO_V1(ts_chunk_drop_chunks);
@@ -1701,6 +1702,7 @@ chunk_tuple_delete(TupleInfo *ti, void *data)
chunk_formdata_fill(&form, ti->tuple, ti->desc);
ts_chunk_constraint_delete_by_chunk_id(form.id, ccs);
ts_chunk_index_delete_by_chunk_id(form.id, true);
ts_compression_chunk_size_delete(form.id);
/* Check for dimension slices that are orphaned by the chunk deletion */
for (i = 0; i < ccs->num_constraints; i++)

View File

@@ -0,0 +1,39 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include "compression_chunk_size.h"
#include "catalog.h"
#include "scanner.h"
#include "scan_iterator.h"
static void
init_scan_by_uncompressed_chunk_id(ScanIterator *iterator, int32 uncompressed_chunk_id)
{
	iterator->ctx.index =
		catalog_get_index(ts_catalog_get(), COMPRESSION_CHUNK_SIZE, COMPRESSION_CHUNK_SIZE_PKEY);
	ts_scan_iterator_scan_key_init(iterator,
								   Anum_compression_chunk_size_pkey_chunk_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(uncompressed_chunk_id));
}

TSDLLEXPORT int
ts_compression_chunk_size_delete(int32 uncompressed_chunk_id)
{
	ScanIterator iterator =
		ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, RowExclusiveLock, CurrentMemoryContext);
	int count = 0;

	init_scan_by_uncompressed_chunk_id(&iterator, uncompressed_chunk_id);
	ts_scanner_foreach(&iterator)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);

		ts_catalog_delete(ti->scanrel, ti->tuple);
		/* track deletions so callers can detect the no-op case */
		count++;
	}
	return count;
}
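
Note that ts_compression_chunk_size_delete returns the number of catalog
rows it removed, so a caller can distinguish the no-op case. A hedged usage
sketch (the DEBUG1 message and the chunk_id variable are illustrative, not
part of this commit):

/* A chunk that was never compressed has no compression_chunk_size row. */
if (ts_compression_chunk_size_delete(chunk_id) == 0)
	elog(DEBUG1, "no compression_chunk_size entry for chunk %d", chunk_id);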

View File

@@ -0,0 +1,13 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#ifndef TIMESCALEDB_COMPRESSION_CHUNK_SIZE_H
#define TIMESCALEDB_COMPRESSION_CHUNK_SIZE_H
#include <postgres.h>
#include <compat.h>
extern TSDLLEXPORT int ts_compression_chunk_size_delete(int32 uncompressed_chunk_id);
#endif

View File

@@ -40,6 +40,7 @@
#include "dimension.h"
#include "chunk.h"
#include "chunk_adaptive.h"
#include "hypertable_compression.h"
#include "subspace_store.h"
#include "hypertable_cache.h"
@@ -565,6 +566,9 @@ hypertable_tuple_delete(TupleInfo *ti, void *data)
/* Remove any dependent continuous aggs */
ts_continuous_agg_drop_hypertable_callback(hypertable_id);
/* remove any associated compression definitions */
ts_hypertable_compression_delete_by_hypertable_id(hypertable_id);
if (!compressed_hypertable_id_isnull)
{
Hypertable *compressed_hypertable = ts_hypertable_get_by_id(compressed_hypertable_id);

View File

@@ -15,33 +15,48 @@
static void
hypertable_compression_fill_from_tuple(FormData_hypertable_compression *fd, TupleInfo *ti)
{
HeapTuple tuple = ti->tuple;
TupleDesc desc = ti->desc;
Datum val;
bool isnull;
memcpy((void *) fd, GETSTRUCT(tuple), sizeof(FormData_hypertable_compression));
/* copy the part that could have null values explicitly */
val = heap_getattr(tuple, Anum_hypertable_compression_segmentby_column_index, desc, &isnull);
if (isnull)
Datum values[Natts_hypertable_compression];
bool isnulls[Natts_hypertable_compression];
heap_deform_tuple(ti->tuple, ti->desc, values, isnulls);
Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_hypertable_id)]);
Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_attname)]);
Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_algo_id)]);
fd->hypertable_id =
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_hypertable_id)]);
memcpy(&fd->attname,
DatumGetName(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_attname)]),
NAMEDATALEN);
fd->algo_id =
DatumGetInt16(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_algo_id)]);
if (isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_segmentby_column_index)])
fd->segmentby_column_index = 0;
else
fd->segmentby_column_index = DatumGetInt16(val);
val = heap_getattr(tuple, Anum_hypertable_compression_orderby_column_index, desc, &isnull);
if (isnull)
fd->segmentby_column_index = DatumGetInt16(
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_segmentby_column_index)]);
if (isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_column_index)])
fd->orderby_column_index = 0;
else
{
fd->orderby_column_index = DatumGetInt16(val);
val = heap_getattr(tuple, Anum_hypertable_compression_orderby_asc, desc, &isnull);
fd->orderby_asc = BoolGetDatum(val);
val = heap_getattr(tuple, Anum_hypertable_compression_orderby_nullsfirst, desc, &isnull);
fd->orderby_nullsfirst = BoolGetDatum(val);
Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_asc)]);
Assert(!isnulls[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_nullsfirst)]);
fd->orderby_column_index = DatumGetInt16(
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_column_index)]);
fd->orderby_asc =
DatumGetBool(values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_asc)]);
fd->orderby_nullsfirst = DatumGetBool(
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_orderby_nullsfirst)]);
}
}
void
hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
bool *nulls)
TSDLLEXPORT void
ts_hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
bool *nulls)
{
memset(nulls, 0, sizeof(bool) * Natts_hypertable_compression);
values[AttrNumberGetAttrOffset(Anum_hypertable_compression_hypertable_id)] =
@@ -80,8 +95,8 @@ hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Da
/* returns a list of pointers to FormData_hypertable_compression
 * for the given hypertable
 */
List *
get_hypertablecompression_info(int32 htid)
TSDLLEXPORT List *
ts_hypertable_compression_get(int32 htid)
{
List *fdlist = NIL;
FormData_hypertable_compression *colfd = NULL;
@@ -109,8 +124,8 @@ get_hypertablecompression_info(int32 htid)
return fdlist;
}
bool
hypertable_compression_delete_by_hypertable_id(int32 htid)
TSDLLEXPORT bool
ts_hypertable_compression_delete_by_hypertable_id(int32 htid)
{
int count = 0;
ScanIterator iterator =

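
For reference, callers of ts_hypertable_compression_get consume the result
with PostgreSQL's List API, as the compression code in the files below does.
A minimal sketch (the function and its logging are hypothetical):

#include <postgres.h>
#include <nodes/pg_list.h>
#include "hypertable_compression.h"

static void
example_log_segmentby_columns(int32 hypertable_id)
{
	List *htcols_list = ts_hypertable_compression_get(hypertable_id);
	ListCell *lc;

	foreach (lc, htcols_list)
	{
		FormData_hypertable_compression *fd = lfirst(lc);

		/* segmentby_column_index == 0 encodes the NULL catalog value,
		 * i.e. "not a segmentby column" */
		if (fd->segmentby_column_index > 0)
			elog(DEBUG1, "column %s is segmentby #%d",
				 NameStr(fd->attname), fd->segmentby_column_index);
	}
}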
View File

@@ -11,12 +11,11 @@
#include <catalog.h>
#include <chunk.h>
extern TSDLLEXPORT List *get_hypertablecompression_info(int32 htid);
extern TSDLLEXPORT List *ts_hypertable_compression_get(int32 htid);
extern TSDLLEXPORT void
hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
bool *nulls);
ts_hypertable_compression_fill_tuple_values(FormData_hypertable_compression *fd, Datum *values,
bool *nulls);
extern TSDLLEXPORT bool hypertable_compression_delete_by_hypertable_id(int32 htid);
extern TSDLLEXPORT bool ts_is_compression_hypertable(int32 hypertable_id);
extern TSDLLEXPORT bool ts_hypertable_compression_delete_by_hypertable_id(int32 htid);
#endif

View File

@@ -25,6 +25,7 @@
#include "scanner.h"
#include "scan_iterator.h"
#include "license.h"
#include "compression_chunk_size.h"
#if !PG96
#include <utils/fmgrprotos.h>
@@ -67,34 +68,6 @@ compute_chunk_size(Oid chunk_relid)
return ret;
}
static void
init_scan_by_uncompressed_chunk_id(ScanIterator *iterator, int32 uncompressed_chunk_id)
{
iterator->ctx.index =
catalog_get_index(ts_catalog_get(), COMPRESSION_CHUNK_SIZE, COMPRESSION_CHUNK_SIZE_PKEY);
ts_scan_iterator_scan_key_init(iterator,
Anum_compression_chunk_size_pkey_chunk_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(uncompressed_chunk_id));
}
static int
compression_chunk_size_delete(int32 uncompressed_chunk_id)
{
ScanIterator iterator =
ts_scan_iterator_create(COMPRESSION_CHUNK_SIZE, RowExclusiveLock, CurrentMemoryContext);
int count = 0;
init_scan_by_uncompressed_chunk_id(&iterator, uncompressed_chunk_id);
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
ts_catalog_delete(ti->scanrel, ti->tuple);
}
return count;
}
static void
compression_chunk_size_catalog_insert(int32 src_chunk_id, ChunkSize *src_size,
int32 compress_chunk_id, ChunkSize *compress_size)
@@ -197,7 +170,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
LockRelationOid(catalog_get_table_id(ts_catalog_get(), CHUNK), RowExclusiveLock);
// get compression properties for hypertable
htcols_list = get_hypertablecompression_info(cxt.srcht->fd.id);
htcols_list = ts_hypertable_compression_get(cxt.srcht->fd.id);
htcols_listlen = list_length(htcols_list);
// create compressed chunk DDL and compress the data
compress_ht_chunk = create_compress_chunk_table(cxt.compress_ht, cxt.srcht_chunk);
@@ -274,7 +247,7 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
LockRelationOid(catalog_get_table_id(ts_catalog_get(), CHUNK), RowExclusiveLock);
decompress_chunk(compressed_chunk->table_id, uncompressed_chunk->table_id);
compression_chunk_size_delete(uncompressed_chunk->fd.id);
ts_compression_chunk_size_delete(uncompressed_chunk->fd.id);
ts_chunk_set_compressed_chunk(uncompressed_chunk, INVALID_CHUNK_ID, true);
ts_chunk_drop(compressed_chunk, DROP_RESTRICT, -1);

View File

@@ -328,7 +328,7 @@ compresscolinfo_add_catalog_entries(CompressColInfo *compress_cols, int32 htid)
{
FormData_hypertable_compression *fd = &compress_cols->col_meta[i];
fd->hypertable_id = htid;
hypertable_compression_fill_tuple_values(fd, &values[0], &nulls[0]);
ts_hypertable_compression_fill_tuple_values(fd, &values[0], &nulls[0]);
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
ts_catalog_restore_user(&sec_ctx);
@@ -561,7 +561,7 @@ tsl_process_compress_table(AlterTableCmd *cmd, Hypertable *ht,
/* need to drop the old compressed hypertable in case the segmentby columns changed (and
 * thus the column types of the compressed hypertable need to change) */
ts_hypertable_drop(compressed, DROP_RESTRICT);
hypertable_compression_delete_by_hypertable_id(ht->fd.id);
ts_hypertable_compression_delete_by_hypertable_id(ht->fd.id);
}
compress_htid = create_compression_table(ownerid, &compress_cols);

View File

@@ -53,7 +53,7 @@ ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hyp
Index compressed_index;
RelOptInfo *compressed_rel;
Path *path;
List *compression_info = get_hypertablecompression_info(ht->fd.id);
List *compression_info = ts_hypertable_compression_get(ht->fd.id);
/*
* since we rely on parallel coordination from the scan below
* this node it is probably not beneficial to have more

View File

@@ -170,7 +170,7 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
if (eflags & EXEC_FLAG_BACKWARD)
state->reverse = !state->reverse;
state->hypertable_compression_info = get_hypertablecompression_info(state->hypertable_id);
state->hypertable_compression_info = ts_hypertable_compression_get(state->hypertable_id);
initialize_column_state(state);

View File

@@ -84,6 +84,16 @@ WHERE hypertable.table_name like 'test1';
26
(1 row)
--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
SELECT count(*) as orphaned_compression_chunk_size
FROM _timescaledb_catalog.compression_chunk_size size
LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
WHERE chunk.id IS NULL;
orphaned_compression_chunk_size
---------------------------------
0
(1 row)
SELECT count(*) as count_chunks_compressed
FROM _timescaledb_catalog.chunk chunk
INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id = comp_hyper.id)
@@ -232,6 +242,16 @@ WHERE uncomp_hyper.table_name like 'test1';
1
(1 row)
--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
SELECT count(*) as orphaned_compression_chunk_size
FROM _timescaledb_catalog.compression_chunk_size size
LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
WHERE chunk.id IS NULL;
orphaned_compression_chunk_size
---------------------------------
0
(1 row)
--
-- DROP HYPERTABLE
--
@@ -262,6 +282,12 @@ SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
0
(1 row)
SELECT count(*) FROM _timescaledb_catalog.hypertable_compression;
count
-------
0
(1 row)
ROLLBACK;
--create a dependent object on the compressed hypertable to test cascade behaviour
CREATE VIEW dependent_1 AS SELECT * FROM :COMPRESSED_HYPER_NAME;
@@ -269,12 +295,25 @@ CREATE VIEW dependent_1 AS SELECT * FROM :COMPRESSED_HYPER_NAME;
DROP TABLE :UNCOMPRESSED_HYPER_NAME;
ERROR: cannot drop table _timescaledb_internal._compressed_hypertable_2 because other objects depend on it
\set ON_ERROR_STOP 1
BEGIN;
DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
NOTICE: drop cascades to 2 other objects
--verify that there are no more hypertables remaining
SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable
SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
count
-------
0
(1 row)
ROLLBACK;
DROP VIEW dependent_1;
--create a cont agg view on the ht as well then the drop should nuke everything
--TODO put back when cont aggs work
--CREATE VIEW test1_cont_view WITH ( timescaledb.continuous, timescaledb.refresh_interval='72 hours')
--AS SELECT time_bucket('1 hour', "Time"), SUM(i)
-- FROM test1
-- GROUP BY 1;
--REFRESH MATERIALIZED VIEW test1_cont_view;
--SELECT count(*) FROM test1_cont_view;
--DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
--verify that there are no more hypertables remaining
--SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;

View File

@@ -48,6 +48,12 @@ FROM _timescaledb_catalog.chunk chunk
INNER JOIN _timescaledb_catalog.hypertable hypertable ON (chunk.hypertable_id = hypertable.id)
WHERE hypertable.table_name like 'test1';
--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
SELECT count(*) as orphaned_compression_chunk_size
FROM _timescaledb_catalog.compression_chunk_size size
LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
WHERE chunk.id IS NULL;
SELECT count(*) as count_chunks_compressed
FROM _timescaledb_catalog.chunk chunk
INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id = comp_hyper.id)
@@ -144,6 +150,11 @@ INNER JOIN _timescaledb_catalog.hypertable comp_hyper ON (chunk.hypertable_id =
INNER JOIN _timescaledb_catalog.hypertable uncomp_hyper ON (comp_hyper.id = uncomp_hyper.compressed_hypertable_id)
WHERE uncomp_hyper.table_name like 'test1';
--make sure there are no orphaned _timescaledb_catalog.compression_chunk_size entries (should be 0)
SELECT count(*) as orphaned_compression_chunk_size
FROM _timescaledb_catalog.compression_chunk_size size
LEFT JOIN _timescaledb_catalog.chunk chunk ON (chunk.id = size.chunk_id)
WHERE chunk.id IS NULL;
--
-- DROP HYPERTABLE
@@ -170,6 +181,7 @@ DROP TABLE :UNCOMPRESSED_HYPER_NAME;
--verify that there are no more hypertables remaining
SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
SELECT count(*) FROM _timescaledb_catalog.hypertable_compression;
ROLLBACK;
--create a dependent object on the compressed hypertable to test cascade behaviour
@@ -179,7 +191,23 @@ CREATE VIEW dependent_1 AS SELECT * FROM :COMPRESSED_HYPER_NAME;
DROP TABLE :UNCOMPRESSED_HYPER_NAME;
\set ON_ERROR_STOP 1
BEGIN;
DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;
ROLLBACK;
DROP VIEW dependent_1;
--create a cont agg view on the ht as well then the drop should nuke everything
--TODO put back when cont aggs work
--CREATE VIEW test1_cont_view WITH ( timescaledb.continuous, timescaledb.refresh_interval='72 hours')
--AS SELECT time_bucket('1 hour', "Time"), SUM(i)
-- FROM test1
-- GROUP BY 1;
--REFRESH MATERIALIZED VIEW test1_cont_view;
--SELECT count(*) FROM test1_cont_view;
--DROP TABLE :UNCOMPRESSED_HYPER_NAME CASCADE;
--verify that there are no more hypertables remaining
SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable
--SELECT count(*) FROM _timescaledb_catalog.hypertable hypertable;