diff --git a/.unreleased/pr_6178 b/.unreleased/pr_6178
new file mode 100644
index 000000000..15e1037f6
--- /dev/null
+++ b/.unreleased/pr_6178
@@ -0,0 +1 @@
+Implements: #6178 Show batches/tuples decompressed during DML operations in EXPLAIN output
diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h
index e31ec3e20..eade22adf 100644
--- a/src/cross_module_fn.h
+++ b/src/cross_module_fn.h
@@ -35,6 +35,7 @@ typedef struct Hypertable Hypertable;
 typedef struct Chunk Chunk;
 typedef struct ChunkInsertState ChunkInsertState;
 typedef struct CopyChunkState CopyChunkState;
+typedef struct HypertableModifyState HypertableModifyState;
 
 typedef struct CrossModuleFunctions
 {
@@ -139,7 +140,7 @@ typedef struct CrossModuleFunctions
 	PGFunction decompress_chunk;
 	void (*decompress_batches_for_insert)(ChunkInsertState *state, Chunk *chunk,
 										  TupleTableSlot *slot);
-	bool (*decompress_target_segments)(ModifyTableState *ps);
+	bool (*decompress_target_segments)(HypertableModifyState *ht_state);
 	/* The compression functions below are not installed in SQL as part of create extension;
 	 * They are installed and tested during testing scripts. They are exposed in cross-module
 	 * functions because they may be very useful for debugging customer problems if the sql
diff --git a/src/nodes/chunk_dispatch/chunk_dispatch.h b/src/nodes/chunk_dispatch/chunk_dispatch.h
index c7c849fe8..85bed5d69 100644
--- a/src/nodes/chunk_dispatch/chunk_dispatch.h
+++ b/src/nodes/chunk_dispatch/chunk_dispatch.h
@@ -27,7 +27,7 @@ typedef struct ChunkDispatch
 {
 	/* Link to the executor state for INSERTs. This is not set for
 	 * COPY path. */
-	const struct ChunkDispatchState *dispatch_state;
+	struct ChunkDispatchState *dispatch_state;
 	Hypertable *hypertable;
 	SubspaceStore *cache;
 	EState *estate;
@@ -74,6 +74,8 @@ typedef struct ChunkDispatchState
 	ResultRelInfo *rri;
 	/* flag to represent dropped attributes */
 	bool is_dropped_attr_exists;
+	int64 batches_decompressed;
+	int64 tuples_decompressed;
 } ChunkDispatchState;
 
 extern TSDLLEXPORT bool ts_is_chunk_dispatch_state(PlanState *state);
diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.c b/src/nodes/chunk_dispatch/chunk_insert_state.c
index c44779a65..a65096b0a 100644
--- a/src/nodes/chunk_dispatch/chunk_insert_state.c
+++ b/src/nodes/chunk_dispatch/chunk_insert_state.c
@@ -595,6 +595,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
 	CheckValidResultRel(relinfo, chunk_dispatch_get_cmd_type(dispatch));
 
 	state = palloc0(sizeof(ChunkInsertState));
+	state->cds = dispatch->dispatch_state;
 	state->mctx = cis_context;
 	state->rel = rel;
 	state->result_relation_info = relinfo;
diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.h b/src/nodes/chunk_dispatch/chunk_insert_state.h
index fe3cf1316..d3428c314 100644
--- a/src/nodes/chunk_dispatch/chunk_insert_state.h
+++ b/src/nodes/chunk_dispatch/chunk_insert_state.h
@@ -15,6 +15,7 @@
 #include "cross_module_fn.h"
 
 typedef struct TSCopyMultiInsertBuffer TSCopyMultiInsertBuffer;
+typedef struct ChunkDispatchState ChunkDispatchState;
 
 typedef struct ChunkInsertState
 {
@@ -22,6 +23,7 @@ typedef struct ChunkInsertState
 	ResultRelInfo *result_relation_info;
 	/* Per-chunk arbiter indexes for ON CONFLICT handling */
 	List *arbiter_indexes;
+	ChunkDispatchState *cds;
 
 	/* When the tuple descriptors for the main hypertable (root) and a chunk
 	 * differs, it is necessary to convert tuples to chunk format before
diff --git a/src/nodes/hypertable_modify.c b/src/nodes/hypertable_modify.c
index afc541456..77135e3af 100644
--- a/src/nodes/hypertable_modify.c
+++ b/src/nodes/hypertable_modify.c
@@ -242,6 +242,32 @@ hypertable_modify_explain(CustomScanState *node, List *ancestors, ExplainState *
 	mtstate->ps.instrument = node->ss.ps.instrument;
 #endif
 
+	/*
+	 * For INSERT we have to read the number of decompressed batches and
+	 * tuples from the ChunkDispatchState below the ModifyTable.
+	 */
+	if ((mtstate->operation == CMD_INSERT
+#if PG15_GE
+		 || mtstate->operation == CMD_MERGE
+#endif
+		 ) &&
+		outerPlanState(mtstate))
+	{
+		List *chunk_dispatch_states = get_chunk_dispatch_states(outerPlanState(mtstate));
+		ListCell *lc;
+
+		foreach (lc, chunk_dispatch_states)
+		{
+			ChunkDispatchState *cds = (ChunkDispatchState *) lfirst(lc);
+			state->batches_decompressed += cds->batches_decompressed;
+			state->tuples_decompressed += cds->tuples_decompressed;
+		}
+	}
+	if (state->batches_decompressed > 0)
+		ExplainPropertyInteger("Batches decompressed", NULL, state->batches_decompressed, es);
+	if (state->tuples_decompressed > 0)
+		ExplainPropertyInteger("Tuples decompressed", NULL, state->tuples_decompressed, es);
+
 	if (NULL != state->fdwroutine)
 	{
 		appendStringInfo(es->str, "Insert on distributed hypertable");
@@ -793,7 +819,7 @@ ExecModifyTable(CustomScanState *cs_node, PlanState *pstate)
 	{
 		if (ts_cm_functions->decompress_target_segments)
 		{
-			ts_cm_functions->decompress_target_segments(node);
+			ts_cm_functions->decompress_target_segments(ht_state);
 			ht_state->comp_chunks_processed = true;
 			/*
 			 * save snapshot set during ExecutorStart(), since this is the same
diff --git a/src/nodes/hypertable_modify.h b/src/nodes/hypertable_modify.h
index e69df42a4..581c45ed8 100644
--- a/src/nodes/hypertable_modify.h
+++ b/src/nodes/hypertable_modify.h
@@ -30,6 +30,8 @@ typedef struct HypertableModifyState
 	bool comp_chunks_processed;
 	Snapshot snapshot;
 	FdwRoutine *fdwroutine;
+	int64 tuples_decompressed;
+	int64 batches_decompressed;
 } HypertableModifyState;
 
 extern void ts_hypertable_modify_fixup_tlist(Plan *plan);
diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index fcdf77417..5fa698b28 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -59,6 +59,7 @@
 #include "gorilla.h"
 #include "guc.h"
 #include "nodes/chunk_dispatch/chunk_insert_state.h"
+#include "nodes/hypertable_modify.h"
 #include "indexing.h"
 #include "segment_meta.h"
 #include "ts_catalog/compression_chunk_size.h"
@@ -1556,6 +1557,7 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
 					  decompressor->compressed_datums,
 					  decompressor->compressed_is_nulls);
 
+	decompressor->batches_decompressed++;
 	do
 	{
 		/* we're done if all the decompressors return NULL */
@@ -1578,6 +1580,7 @@ row_decompressor_decompress_row(RowDecompressor *decompressor, Tuplesortstate *t
 								decompressor->decompressed_datums,
 								decompressor->decompressed_is_nulls);
 		TupleTableSlot *slot = MakeSingleTupleTableSlot(decompressor->out_desc, &TTSOpsVirtual);
+		decompressor->tuples_decompressed++;
 
 		if (tuplesortstate == NULL)
 		{
@@ -2097,6 +2100,8 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 										  &tmfd,
 										  false);
 		Assert(result == TM_Ok);
+		cis->cds->batches_decompressed += decompressor.batches_decompressed;
+		cis->cds->tuples_decompressed += decompressor.tuples_decompressed;
 	}
 
 	table_endscan(heapScan);
@@ -3199,7 +3204,8 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
  * 4. Update catalog table to change status of moved chunk.
  */
 static void
-decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *estate)
+decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chunk,
+									 List *predicates, EState *estate)
 {
 	/* process each chunk with its corresponding predicates */
 
@@ -3292,6 +3298,8 @@ decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *est
 		filter = lfirst(lc);
 		pfree(filter);
 	}
+	ht_state->batches_decompressed += decompressor.batches_decompressed;
+	ht_state->tuples_decompressed += decompressor.tuples_decompressed;
 }
 
 /*
@@ -3299,19 +3307,32 @@ decompress_batches_for_update_delete(Chunk *chunk, List *predicates, EState *est
  * Once Scan node is found check if chunk is compressed, if so then
  * decompress those segments which match the filter conditions if present.
  */
-static bool decompress_chunk_walker(PlanState *ps, List *relids);
+
+struct decompress_chunk_context
+{
+	List *relids;
+	HypertableModifyState *ht_state;
+};
+
+static bool decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx);
 
 bool
-decompress_target_segments(ModifyTableState *ps)
+decompress_target_segments(HypertableModifyState *ht_state)
 {
-	List *relids = castNode(ModifyTable, ps->ps.plan)->resultRelations;
-	Assert(relids);
+	ModifyTableState *ps =
+		linitial_node(ModifyTableState, castNode(CustomScanState, ht_state)->custom_ps);
 
-	return decompress_chunk_walker(&ps->ps, relids);
+	struct decompress_chunk_context ctx = {
+		.ht_state = ht_state,
+		.relids = castNode(ModifyTable, ps->ps.plan)->resultRelations,
+	};
+	Assert(ctx.relids);
+
+	return decompress_chunk_walker(&ps->ps, &ctx);
 }
 
 static bool
-decompress_chunk_walker(PlanState *ps, List *relids)
+decompress_chunk_walker(PlanState *ps, struct decompress_chunk_context *ctx)
 {
 	RangeTblEntry *rte = NULL;
 	bool needs_decompression = false;
@@ -3330,7 +3351,7 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 		case T_IndexScanState:
 		{
 			/* Get the index quals on the original table and also include
-			 * any filters that are used to for filtering heap tuples
+			 * any filters that are used for filtering heap tuples
 			 */
 			predicates = list_union(((IndexScan *) ps->plan)->indexqualorig, ps->plan->qual);
 			needs_decompression = true;
@@ -3362,7 +3383,7 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 			 * even when it is a self join
 			 */
 			int scanrelid = ((Scan *) ps->plan)->scanrelid;
-			if (list_member_int(relids, scanrelid))
+			if (list_member_int(ctx->relids, scanrelid))
 			{
 				rte = rt_fetch(scanrelid, ps->state->es_range_table);
 				current_chunk = ts_chunk_get_by_relid(rte->relid, false);
@@ -3374,7 +3395,10 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 							 errmsg("UPDATE/DELETE is disabled on compressed chunks"),
 							 errhint("Set timescaledb.enable_dml_decompression to TRUE.")));
 
-				decompress_batches_for_update_delete(current_chunk, predicates, ps->state);
+				decompress_batches_for_update_delete(ctx->ht_state,
+													 current_chunk,
+													 predicates,
+													 ps->state);
 
 				/* This is a workaround specifically for bitmap heap scans:
 				 * during node initialization, initialize the scan state with the active snapshot
@@ -3400,7 +3424,7 @@ decompress_chunk_walker(PlanState *ps, List *relids)
 	if (predicates)
 		pfree(predicates);
 
-	return planstate_tree_walker(ps, decompress_chunk_walker, relids);
+	return planstate_tree_walker(ps, decompress_chunk_walker, ctx);
 }
 
 #endif
diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h
index fbfd6e49b..db850d405 100644
--- a/tsl/src/compression/compression.h
+++ b/tsl/src/compression/compression.h
@@ -150,6 +150,8 @@ typedef struct RowDecompressor
 	bool *decompressed_is_nulls;
 
 	MemoryContext per_compressed_row_ctx;
+	int64 batches_decompressed;
+	int64 tuples_decompressed;
 } RowDecompressor;
 
 /*
@@ -323,7 +325,8 @@ typedef struct ChunkInsertState ChunkInsertState;
 extern void decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk,
 										  TupleTableSlot *slot);
 #if PG14_GE
-extern bool decompress_target_segments(ModifyTableState *ps);
+typedef struct HypertableModifyState HypertableModifyState;
+extern bool decompress_target_segments(HypertableModifyState *ht_state);
 #endif
 /* CompressSingleRowState methods */
 struct CompressSingleRowState;
diff --git a/tsl/test/shared/expected/decompress_tracking.out b/tsl/test/shared/expected/decompress_tracking.out
new file mode 100644
index 000000000..86132c9ce
--- /dev/null
+++ b/tsl/test/shared/expected/decompress_tracking.out
@@ -0,0 +1,100 @@
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)'
+\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)'
+CREATE TABLE decompress_tracking(time timestamptz not null, device text, value float, primary key(time, device));
+SELECT table_name FROM create_hypertable('decompress_tracking','time');
+ table_name
+ decompress_tracking
+(1 row)
+
+ALTER TABLE decompress_tracking SET (timescaledb.compress, timescaledb.compress_segmentby='device');
+INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd1', random() FROM generate_series(1,10) g;
+INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd2', random() FROM generate_series(1,20) g;
+INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd3', random() FROM generate_series(1,30) g;
+SELECT count(compress_chunk(ch)) FROM show_chunks('decompress_tracking') ch;
+ count
+     2
+(1 row)
+
+-- no tracking without analyze
+:EXPLAIN UPDATE decompress_tracking SET value = value + 3;
+QUERY PLAN
+ Custom Scan (HypertableModify)
+   ->  Update on decompress_tracking
+         Update on _hyper_X_X_chunk decompress_tracking_1
+         Update on _hyper_X_X_chunk decompress_tracking_2
+         ->  Result
+               ->  Append
+                     ->  Seq Scan on _hyper_X_X_chunk decompress_tracking_1
+                     ->  Seq Scan on _hyper_X_X_chunk decompress_tracking_2
+(8 rows)
+
+BEGIN; :EXPLAIN_ANALYZE UPDATE decompress_tracking SET value = value + 3; ROLLBACK;
+QUERY PLAN
+ Custom Scan (HypertableModify) (actual rows=0 loops=1)
+   Batches decompressed: 5
+   Tuples decompressed: 60
+   ->  Update on decompress_tracking (actual rows=0 loops=1)
+         Update on _hyper_X_X_chunk decompress_tracking_1
+         Update on _hyper_X_X_chunk decompress_tracking_2
+         ->  Result (actual rows=60 loops=1)
+               ->  Append (actual rows=60 loops=1)
+                     ->  Seq Scan on _hyper_X_X_chunk decompress_tracking_1 (actual rows=40 loops=1)
+                     ->  Seq Scan on _hyper_X_X_chunk decompress_tracking_2 (actual rows=20 loops=1)
+(10 rows)
+
+BEGIN; :EXPLAIN_ANALYZE DELETE FROM decompress_tracking; ROLLBACK;
+QUERY PLAN
+ Custom Scan (HypertableModify) (actual rows=0 loops=1)
+   Batches decompressed: 5
+   Tuples decompressed: 60
+   ->  Delete on decompress_tracking (actual rows=0 loops=1)
+         Delete on _hyper_X_X_chunk decompress_tracking_1
+         Delete on _hyper_X_X_chunk decompress_tracking_2
+         ->  Append (actual rows=60 loops=1)
+               ->  Seq Scan on _hyper_X_X_chunk decompress_tracking_1 (actual rows=40 loops=1)
+               ->  Seq Scan on _hyper_X_X_chunk decompress_tracking_2 (actual rows=20 loops=1)
+(9 rows)
+
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01 1:30','d1',random(); ROLLBACK;
+QUERY PLAN
+ Custom Scan (HypertableModify) (actual rows=0 loops=1)
+   Batches decompressed: 1
+   Tuples decompressed: 10
+   ->  Insert on decompress_tracking (actual rows=0 loops=1)
+         ->  Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
+               ->  Subquery Scan on "*SELECT*" (actual rows=1 loops=1)
+                     ->  Result (actual rows=1 loops=1)
+(7 rows)
+
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d2',random(); ROLLBACK;
+QUERY PLAN
+ Custom Scan (HypertableModify) (actual rows=0 loops=1)
+   ->  Insert on decompress_tracking (actual rows=0 loops=1)
+         ->  Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
+               ->  Subquery Scan on "*SELECT*" (actual rows=1 loops=1)
+                     ->  Result (actual rows=1 loops=1)
+(5 rows)
+
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d4',random(); ROLLBACK;
+QUERY PLAN
+ Custom Scan (HypertableModify) (actual rows=0 loops=1)
+   ->  Insert on decompress_tracking (actual rows=0 loops=1)
+         ->  Custom Scan (ChunkDispatch) (actual rows=1 loops=1)
+               ->  Subquery Scan on "*SELECT*" (actual rows=1 loops=1)
+                     ->  Result (actual rows=1 loops=1)
+(5 rows)
+
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking (VALUES ('2020-01-01 1:30','d1',random()),('2020-01-01 1:30','d2',random())); ROLLBACK;
+QUERY PLAN
+ Custom Scan (HypertableModify) (actual rows=0 loops=1)
+   Batches decompressed: 2
+   Tuples decompressed: 25
+   ->  Insert on decompress_tracking (actual rows=0 loops=1)
+         ->  Custom Scan (ChunkDispatch) (actual rows=2 loops=1)
+               ->  Values Scan on "*VALUES*" (actual rows=2 loops=1)
+(6 rows)
+
+DROP TABLE decompress_tracking;
diff --git a/tsl/test/shared/sql/CMakeLists.txt b/tsl/test/shared/sql/CMakeLists.txt
index 3b3d6fbc4..6c6ca3204 100644
--- a/tsl/test/shared/sql/CMakeLists.txt
+++ b/tsl/test/shared/sql/CMakeLists.txt
@@ -15,7 +15,8 @@ set(TEST_TEMPLATES_SHARED
     transparent_decompress_chunk.sql.in space_constraint.sql.in)
 
 if((${PG_VERSION_MAJOR} GREATER_EQUAL "14"))
-  list(APPEND TEST_FILES_SHARED compression_dml.sql memoize.sql)
+  list(APPEND TEST_FILES_SHARED compression_dml.sql decompress_tracking.sql
+       memoize.sql)
 endif()
 
 # this test was changing the contents of tables in shared_setup.sql thus causing
diff --git a/tsl/test/shared/sql/decompress_tracking.sql b/tsl/test/shared/sql/decompress_tracking.sql
new file mode 100644
index 000000000..ab09d8ac9
--- /dev/null
+++ b/tsl/test/shared/sql/decompress_tracking.sql
@@ -0,0 +1,28 @@
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+
+\set EXPLAIN 'EXPLAIN (costs off,timing off,summary off)'
+\set EXPLAIN_ANALYZE 'EXPLAIN (analyze,costs off,timing off,summary off)'
+
+CREATE TABLE decompress_tracking(time timestamptz not null, device text, value float, primary key(time, device));
+SELECT table_name FROM create_hypertable('decompress_tracking','time');
+ALTER TABLE decompress_tracking SET (timescaledb.compress, timescaledb.compress_segmentby='device');
+
+INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd1', random() FROM generate_series(1,10) g;
+INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd2', random() FROM generate_series(1,20) g;
+INSERT INTO decompress_tracking SELECT '2020-01-01'::timestamptz + format('%s hour', g)::interval, 'd3', random() FROM generate_series(1,30) g;
+
+SELECT count(compress_chunk(ch)) FROM show_chunks('decompress_tracking') ch;
+
+-- no tracking without analyze
+:EXPLAIN UPDATE decompress_tracking SET value = value + 3;
+
+BEGIN; :EXPLAIN_ANALYZE UPDATE decompress_tracking SET value = value + 3; ROLLBACK;
+BEGIN; :EXPLAIN_ANALYZE DELETE FROM decompress_tracking; ROLLBACK;
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01 1:30','d1',random(); ROLLBACK;
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d2',random(); ROLLBACK;
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking SELECT '2020-01-01','d4',random(); ROLLBACK;
+BEGIN; :EXPLAIN_ANALYZE INSERT INTO decompress_tracking (VALUES ('2020-01-01 1:30','d1',random()),('2020-01-01 1:30','d2',random())); ROLLBACK;
+
+DROP TABLE decompress_tracking;