Add recompress_chunk function

After inserts go into a compressed chunk, the chunk is marked as
unordered. This PR adds a new function, recompress_chunk, that
recompresses the data and sets the status back to compressed. Further
optimizations for this function are planned but are not part of this PR.

This function can be invoked by calling
SELECT recompress_chunk(<chunk_name>).

The recompress_chunk function is invoked automatically by the compression
policy job when it sees that a chunk is in the unordered state.
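
For illustration, a minimal usage sketch (the chunk name is hypothetical;
the second argument is the optional if_not_compressed parameter added by
this PR, which turns the "already compressed" error into a notice):

-- recompress a single unordered chunk; errors if it is already compressed
SELECT recompress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass);
-- same call, but only raises a notice if the chunk is already compressed
SELECT recompress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, true);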
gayyappan 2021-05-24 23:12:44 +02:00 committed by gayyappan
parent 2c05ccf1e7
commit 4f865f7870
22 changed files with 758 additions and 32 deletions

View File

@@ -28,3 +28,9 @@ CREATE OR REPLACE FUNCTION decompress_chunk(
uncompressed_chunk REGCLASS,
if_compressed BOOLEAN = false
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_decompress_chunk' LANGUAGE C STRICT VOLATILE;
CREATE OR REPLACE FUNCTION recompress_chunk(
chunk REGCLASS,
if_not_compressed BOOLEAN = false
) RETURNS REGCLASS AS '@MODULE_PATHNAME@', 'ts_recompress_chunk' LANGUAGE C STRICT VOLATILE;

View File

@@ -3731,12 +3731,12 @@ ts_chunks_in(PG_FUNCTION_ARGS)
pg_unreachable();
}
/* Check if this chunk can be compressed, that it is not dropped and has not
* already been compressed. */
bool
ts_chunk_can_be_compressed(int32 chunk_id)
/* Return the compression status for the chunk
*/
ChunkCompressionStatus
ts_chunk_get_compression_status(int32 chunk_id)
{
bool can_be_compressed = false;
ChunkCompressionStatus st = CHUNK_COMPRESS_NONE;
ScanIterator iterator = ts_scan_iterator_create(CHUNK, AccessShareLock, CurrentMemoryContext);
iterator.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
ts_scan_iterator_scan_key_init(&iterator,
@@ -3748,30 +3748,50 @@ ts_chunk_can_be_compressed(int32 chunk_id)
ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
bool dropped_isnull, status_isnull, status_is_compressed;
Datum dropped, status;
bool dropped_isnull, status_isnull;
Datum status;
dropped = slot_getattr(ti->slot, Anum_chunk_dropped, &dropped_isnull);
bool dropped = DatumGetBool(slot_getattr(ti->slot, Anum_chunk_dropped, &dropped_isnull));
Assert(!dropped_isnull);
status = slot_getattr(ti->slot, Anum_chunk_status, &status_isnull);
Assert(!status_isnull);
status_is_compressed = ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED);
can_be_compressed = !DatumGetBool(dropped) && !status_is_compressed;
if (!dropped)
{
bool status_is_compressed =
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED);
bool status_is_unordered =
ts_flags_are_set_32(DatumGetInt32(status), CHUNK_STATUS_COMPRESSED_UNORDERED);
if (status_is_unordered)
{
Assert(status_is_compressed);
st = CHUNK_COMPRESS_UNORDERED;
}
else if (status_is_compressed)
st = CHUNK_COMPRESS_ORDERED;
else
st = CHUNK_COMPRESS_NONE;
}
else
st = CHUNK_DROPPED;
}
ts_scan_iterator_close(&iterator);
return can_be_compressed;
return st;
}
/* Note that only a compressed chunk can have the unordered flag set */
bool
ts_chunk_is_unordered(const Chunk *chunk)
{
/* only compressed Chunks can be unordered so we should never be
* called for uncompressed chunks */
Assert(ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED));
return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED_UNORDERED);
}
bool
ts_chunk_is_compressed(const Chunk *chunk)
{
return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
}
Datum
ts_chunk_show(PG_FUNCTION_ARGS)
{

View File

@@ -25,6 +25,17 @@
#define DROP_CHUNKS_NARGS 4
#define COMPRESS_CHUNK_FUNCNAME "compress_chunk"
#define COMPRESS_CHUNK_NARGS 2
#define DECOMPRESS_CHUNK_FUNCNAME "decompress_chunk"
#define RECOMPRESS_CHUNK_FUNCNAME "recompress_chunk"
#define RECOMPRESS_CHUNK_NARGS 2
typedef enum ChunkCompressionStatus
{
CHUNK_COMPRESS_NONE = 0,
CHUNK_COMPRESS_UNORDERED,
CHUNK_COMPRESS_ORDERED,
CHUNK_DROPPED
} ChunkCompressionStatus;
typedef struct Hypercube Hypercube;
typedef struct Point Point;
@@ -162,8 +173,9 @@ extern TSDLLEXPORT Chunk *ts_chunk_find_or_create_without_cuts(Hypertable *ht, H
bool *created);
extern TSDLLEXPORT Chunk *ts_chunk_get_compressed_chunk_parent(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_unordered(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_compressed(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_contains_compressed_data(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_can_be_compressed(int32 chunk_id);
extern TSDLLEXPORT ChunkCompressionStatus ts_chunk_get_compression_status(int32 chunk_id);
extern TSDLLEXPORT Datum ts_chunk_id_from_relid(PG_FUNCTION_ARGS);
extern TSDLLEXPORT List *ts_chunk_get_chunk_ids_by_hypertable_id(int32 hypertable_id);
extern TSDLLEXPORT List *ts_chunk_get_data_node_name_list(const Chunk *chunk);
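
The ChunkCompressionStatus enum above mirrors the status bit flags stored in
the chunk catalog. A sketch for inspecting them from SQL, assuming (consistent
with the chunk_status values 0, 1, and 3 in the regression output further down)
that the compressed bit is 1 and the unordered bit is 2:

-- decode the chunk status bits (bit values are assumptions, see above)
SELECT c.table_name,
       c.status,
       (c.status & 1) > 0 AS is_compressed, -- CHUNK_STATUS_COMPRESSED
       (c.status & 2) > 0 AS is_unordered   -- CHUNK_STATUS_COMPRESSED_UNORDERED
FROM _timescaledb_catalog.chunk c
WHERE NOT c.dropped;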

View File

@@ -64,6 +64,7 @@ CROSSMODULE_WRAPPER(array_compressor_append);
CROSSMODULE_WRAPPER(array_compressor_finish);
CROSSMODULE_WRAPPER(compress_chunk);
CROSSMODULE_WRAPPER(decompress_chunk);
CROSSMODULE_WRAPPER(recompress_chunk);
/* continuous aggregate */
CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
@@ -345,6 +346,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.process_compress_table = process_compress_table_default,
.compress_chunk = error_no_default_fn_pg_community,
.decompress_chunk = error_no_default_fn_pg_community,
.recompress_chunk = error_no_default_fn_pg_community,
.compressed_data_decompress_forward = error_no_default_fn_pg_community,
.compressed_data_decompress_reverse = error_no_default_fn_pg_community,
.deltadelta_compressor_append = error_no_default_fn_pg_community,

View File

@@ -103,6 +103,7 @@ typedef struct CrossModuleFunctions
void (*process_rename_cmd)(Hypertable *ht, const RenameStmt *stmt);
PGFunction compress_chunk;
PGFunction decompress_chunk;
PGFunction recompress_chunk;
/* The compression functions below are not installed in SQL as part of create extension;
* They are installed and tested during testing scripts. They are exposed in cross-module
* functions because they may be very useful for debugging customer problems if the sql

View File

@@ -897,7 +897,7 @@ dimension_slice_check_chunk_stats_tuple_found(TupleInfo *ti, void *data)
BgwPolicyChunkStats *chunk_stat = ts_bgw_policy_chunk_stats_find(info->job_id, chunk_id);
if ((chunk_stat == NULL || chunk_stat->fd.num_times_job_run == 0) &&
ts_chunk_can_be_compressed(chunk_id))
ts_chunk_get_compression_status(chunk_id) == CHUNK_COMPRESS_NONE)
{
/* Save the chunk_id */
info->chunk_id = chunk_id;
@@ -943,9 +943,12 @@ dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *dat
foreach (lc, chunk_ids)
{
int32 chunk_id = lfirst_int(lc);
if (ts_chunk_can_be_compressed(chunk_id))
ChunkCompressionStatus st = ts_chunk_get_compression_status(chunk_id);
if (st == CHUNK_COMPRESS_NONE || st == CHUNK_COMPRESS_UNORDERED)
{
/* found a chunk that has not yet been compressed */
/* found a chunk that is either uncompressed or needs recompression;
* the caller must check the exact chunk status
*/
*((int32 *) data) = chunk_id;
return SCAN_DONE;
}

View File

@@ -764,14 +764,28 @@ ts_chunk_insert_state_destroy(ChunkInsertState *state)
ExecCloseIndices(state->result_relation_info);
table_close(state->rel, NoLock);
/* TODO check if we need to keep rel open for this case. Is compress_rel
* sufficient?
*/
if (state->compress_rel)
{
ResultRelInfo *orig_chunk_rri = state->orig_result_relation_info;
Oid chunk_relid = RelationGetRelid(orig_chunk_rri->ri_RelationDesc);
table_close(state->compress_rel, NoLock);
ts_cm_functions->compress_row_end(state->compress_state);
ts_cm_functions->compress_row_destroy(state->compress_state);
Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
if (!ts_chunk_is_unordered(chunk))
ts_chunk_set_unordered(chunk);
}
else if (RelationGetForm(state->result_relation_info->ri_RelationDesc)->relkind ==
RELKIND_FOREIGN_TABLE)
{
/* If a distributed chunk shows compressed status on the access node,
* we mark it as unordered, because the insert now goes into
* a previously compressed chunk
*/
Oid chunk_relid = RelationGetRelid(state->result_relation_info->ri_RelationDesc);
Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
if (ts_chunk_is_compressed(chunk) && (!ts_chunk_is_unordered(chunk)))
ts_chunk_set_unordered(chunk);
}
if (NULL != state->slot)
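
The observable effect of this cleanup hook, sketched against a hypothetical
hypertable named metrics (status values as in the regression output further
down, where 1 is compressed and 3 is compressed plus unordered):

CREATE TABLE metrics(t timestamptz NOT NULL, v float);
SELECT create_hypertable('metrics', 't');
ALTER TABLE metrics SET (timescaledb.compress);
INSERT INTO metrics VALUES ('2020-01-01 00:00+00', 1.0);
-- compress the chunk: its status becomes 1 (compressed)
SELECT compress_chunk(c) FROM show_chunks('metrics') c;
-- insert into the now-compressed chunk: status flips to 3 (unordered)
INSERT INTO metrics VALUES ('2020-01-01 00:05+00', 2.0);
SELECT status FROM _timescaledb_catalog.chunk WHERE compressed_chunk_id IS NOT NULL;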

View File

@@ -55,6 +55,7 @@ WHERE oid IN (
last
locf
move_chunk
recompress_chunk
refresh_continuous_aggregate
remove_compression_policy
remove_continuous_aggregate_policy
@@ -75,5 +76,5 @@ WHERE oid IN (
timescaledb_fdw_validator
timescaledb_post_restore
timescaledb_pre_restore
(56 rows)
(57 rows)

View File

@@ -399,8 +399,65 @@ policy_invoke_compress_chunk(Chunk *chunk)
castNode(Const, makeBoolConst(true, false)),
};
Oid type_id[COMPRESS_CHUNK_NARGS] = { REGCLASSOID, BOOLOID };
char *const schema_name = ts_extension_schema_name();
List *const fqn = list_make2(makeString(schema_name), makeString(COMPRESS_CHUNK_FUNCNAME));
char *schema_name = ts_extension_schema_name();
List *fqn = list_make2(makeString(schema_name), makeString(COMPRESS_CHUNK_FUNCNAME));
StaticAssertStmt(lengthof(type_id) == lengthof(argarr),
"argarr and type_id should have matching lengths");
func_oid = LookupFuncName(fqn, lengthof(type_id), type_id, false);
Assert(func_oid); /* LookupFuncName should not return an invalid OID */
/* Prepare the function expr with argument list */
get_func_result_type(func_oid, &restype, NULL);
for (i = 0; i < lengthof(argarr); i++)
args = lappend(args, argarr[i]);
fexpr = makeFuncExpr(func_oid, restype, args, InvalidOid, InvalidOid, COERCE_EXPLICIT_CALL);
fexpr->funcretset = false;
estate = CreateExecutorState();
econtext = CreateExprContext(estate);
ExprState *exprstate = ExecInitExpr(&fexpr->xpr, NULL);
ExecEvalExprSwitchContext(exprstate, econtext, &isnull);
/* Cleanup */
FreeExprContext(econtext, false);
FreeExecutorState(estate);
}
/*
* Invoke recompress_chunk via fmgr so that the call can be deparsed and sent to
* remote data nodes.
*/
static void
policy_invoke_recompress_chunk(Chunk *chunk)
{
EState *estate;
ExprContext *econtext;
FuncExpr *fexpr;
Oid relid = chunk->table_id;
Oid restype;
Oid func_oid;
List *args = NIL;
int i;
bool isnull;
Const *argarr[RECOMPRESS_CHUNK_NARGS] = {
makeConst(REGCLASSOID,
-1,
InvalidOid,
sizeof(relid),
ObjectIdGetDatum(relid),
false,
false),
castNode(Const, makeBoolConst(true, false)),
};
Oid type_id[RECOMPRESS_CHUNK_NARGS] = { REGCLASSOID, BOOLOID };
char *schema_name = ts_extension_schema_name();
List *fqn = list_make2(makeString(schema_name), makeString(RECOMPRESS_CHUNK_FUNCNAME));
StaticAssertStmt(lengthof(type_id) == lengthof(argarr),
"argarr and type_id should have matching lengths");
@@ -450,9 +507,19 @@ policy_compression_execute(int32 job_id, Jsonb *config)
{
Chunk *chunk = ts_chunk_get_by_id(chunkid, true);
if (hypertable_is_distributed(policy_data.hypertable))
policy_invoke_compress_chunk(chunk);
{
if (ts_chunk_is_unordered(chunk))
policy_invoke_recompress_chunk(chunk);
else
policy_invoke_compress_chunk(chunk);
}
else
tsl_compress_chunk_wrapper(chunk, true);
{
if (ts_chunk_is_unordered(chunk))
tsl_recompress_chunk_wrapper(chunk);
else
tsl_compress_chunk_wrapper(chunk, true);
}
elog(LOG,
"completed compressing chunk %s.%s",
NameStr(chunk->fd.schema_name),
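
With this dispatch in place a single policy run handles both cases; a sketch
of the observable behavior, reusing the hypothetical metrics hypertable from
the earlier sketch:

SELECT add_compression_policy('metrics', INTERVAL '1 day') AS job_id \gset
-- compresses uncompressed chunks and recompresses unordered ones
CALL run_job(:job_id);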

View File

@@ -16,6 +16,7 @@
#include <nodes/makefuncs.h>
#include <nodes/pg_list.h>
#include <nodes/parsenodes.h>
#include <parser/parse_func.h>
#include <storage/lmgr.h>
#include <trigger.h>
#include <utils/builtins.h>
@@ -509,3 +510,115 @@ tsl_decompress_chunk(PG_FUNCTION_ARGS)
PG_RETURN_OID(uncompressed_chunk_id);
}
/* Set up FunctionCallInfo for compress_chunk/decompress_chunk:
* allocate memory for decompfn_fcinfo and initialize it.
*/
static void
get_compression_fcinfo(char *fname, FmgrInfo *decompfn, FunctionCallInfo *decompfn_fcinfo,
FunctionCallInfo orig_fcinfo)
{
/* compress_chunk, decompress_chunk have the same args */
Oid argtyp[] = { REGCLASSOID, BOOLOID };
fmNodePtr cxt =
orig_fcinfo->context; /* pass in the context from the current FunctionCallInfo */
Oid decomp_func_oid =
LookupFuncName(list_make1(makeString(fname)), lengthof(argtyp), argtyp, false);
fmgr_info(decomp_func_oid, decompfn);
*decompfn_fcinfo = HEAP_FCINFO(2);
InitFunctionCallInfoData(**decompfn_fcinfo,
decompfn,
2,
InvalidOid, /* collation */
cxt,
NULL);
FC_ARG(*decompfn_fcinfo, 0) = FC_ARG(orig_fcinfo, 0);
FC_NULL(*decompfn_fcinfo, 0) = FC_NULL(orig_fcinfo, 0);
FC_ARG(*decompfn_fcinfo, 1) = FC_ARG(orig_fcinfo, 1);
FC_NULL(*decompfn_fcinfo, 1) = FC_NULL(orig_fcinfo, 1);
}
static Datum
tsl_recompress_remote_chunk(Chunk *uncompressed_chunk, FunctionCallInfo fcinfo)
{
FmgrInfo decompfn;
FmgrInfo compfn;
FunctionCallInfo decompfn_fcinfo;
FunctionCallInfo compfn_fcinfo;
get_compression_fcinfo(DECOMPRESS_CHUNK_FUNCNAME, &decompfn, &decompfn_fcinfo, fcinfo);
FunctionCallInvoke(decompfn_fcinfo);
if (decompfn_fcinfo->isnull)
{
ereport(WARNING,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("decompression failed for recompress_chunk for %u",
uncompressed_chunk->fd.id)));
PG_RETURN_NULL();
}
get_compression_fcinfo(COMPRESS_CHUNK_FUNCNAME, &compfn, &compfn_fcinfo, fcinfo);
Datum compoid = FunctionCallInvoke(compfn_fcinfo);
if (compfn_fcinfo->isnull)
{
ereport(WARNING,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("compression failed for recompress_chunk for %u",
uncompressed_chunk->fd.id)));
PG_RETURN_NULL();
}
return compoid;
}
bool
tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk)
{
Oid uncompressed_chunk_relid = uncompressed_chunk->table_id;
if (ts_chunk_is_unordered(uncompressed_chunk))
{
if (!decompress_chunk_impl(uncompressed_chunk->hypertable_relid,
uncompressed_chunk_relid,
false))
return false;
}
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_relid, true);
Assert(!ts_chunk_is_compressed(chunk));
tsl_compress_chunk_wrapper(chunk, false);
return true;
}
Datum
tsl_recompress_chunk(PG_FUNCTION_ARGS)
{
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_compressed = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
Chunk *uncompressed_chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);
if (uncompressed_chunk == NULL)
elog(ERROR, "unknown chunk id %d", uncompressed_chunk_id);
if (!ts_chunk_is_unordered(uncompressed_chunk))
{
if (!ts_chunk_is_compressed(uncompressed_chunk))
{
ereport((if_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("call compress_chunk instead of recompress_chunk")));
PG_RETURN_NULL();
}
else
{
ereport((if_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed",
get_rel_name(uncompressed_chunk->table_id))));
PG_RETURN_NULL();
}
}
if (uncompressed_chunk->relkind == RELKIND_FOREIGN_TABLE)
return tsl_recompress_remote_chunk(uncompressed_chunk, fcinfo);
else
{
tsl_recompress_chunk_wrapper(uncompressed_chunk);
PG_RETURN_OID(uncompressed_chunk_id);
}
}
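
For a local chunk, the wrapper above is roughly equivalent to the manual
sequence below (a sketch with an illustrative chunk name; recompress_chunk
performs both steps inside a single call rather than as two separate
statements). For distributed chunks, tsl_recompress_remote_chunk instead
invokes the same two SQL functions via fmgr so that the calls can be deparsed
and shipped to the data nodes:

-- roughly what recompress_chunk does for a local unordered chunk
SELECT decompress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass);
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass);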

View File

@@ -11,6 +11,8 @@
extern Datum tsl_compress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_decompress_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_recompress_chunk(PG_FUNCTION_ARGS);
extern void tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed);
extern bool tsl_recompress_chunk_wrapper(Chunk *chunk);
#endif /* TIMESCALEDB_TSL_COMPRESSION_UTILS_H */

View File

@@ -1676,9 +1676,5 @@ compress_row_end(CompressSingleRowState *cr)
void
compress_row_destroy(CompressSingleRowState *cr)
{
Chunk *chunk = ts_chunk_get_by_relid(cr->in_rel->rd_id, true);
if (!ts_chunk_is_unordered(chunk))
ts_chunk_set_unordered(chunk);
ExecDropSingleTupleTableSlot(cr->out_slot);
}

View File

@@ -146,6 +146,7 @@ CrossModuleFunctions tsl_cm_functions = {
.process_rename_cmd = tsl_process_rename_cmd,
.compress_chunk = tsl_compress_chunk,
.decompress_chunk = tsl_decompress_chunk,
.recompress_chunk = tsl_recompress_chunk,
.compress_row_init = compress_row_init,
.compress_row_exec = compress_row_exec,
.compress_row_end = compress_row_end,

View File

@@ -223,3 +223,158 @@ SELECT COUNT(*) AS dropped_chunks_count
SELECT add_compression_policy AS job_id
FROM add_compression_policy('conditions', INTERVAL '1 day') \gset
CALL run_job(:job_id);
\i include/recompress_basic.sql
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
CREATE OR REPLACE VIEW compressed_chunk_info_view AS
SELECT
h.schema_name AS hypertable_schema,
h.table_name AS hypertable_name,
c.schema_name as chunk_schema,
c.table_name as chunk_name,
c.status as chunk_status,
comp.schema_name as compressed_chunk_schema,
comp.table_name as compressed_chunk_name
FROM
_timescaledb_catalog.hypertable h JOIN
_timescaledb_catalog.chunk c ON h.id = c.hypertable_id
LEFT JOIN _timescaledb_catalog.chunk comp
ON comp.id = c.compressed_chunk_id
;
CREATE TABLE test2 (timec timestamptz NOT NULL, i integer ,
b bigint, t text);
SELECT table_name from create_hypertable('test2', 'timec', chunk_time_interval=> INTERVAL '7 days');
table_name
------------
test2
(1 row)
INSERT INTO test2 SELECT q, 10, 11, 'hello' FROM generate_series( '2020-01-03 10:00:00+00', '2020-01-03 12:00:00+00' , '5 min'::interval) q;
ALTER TABLE test2 set (timescaledb.compress,
timescaledb.compress_segmentby = 'b',
timescaledb.compress_orderby = 'timec DESC');
SELECT compress_chunk(c)
FROM show_chunks('test2') c;
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_10_48_chunk
(1 row)
---insert into the middle of the range ---
INSERT INTO test2 values ( '2020-01-03 10:01:00+00', 20, 11, '2row');
INSERT INTO test2 values ( '2020-01-03 11:01:00+00', 20, 11, '3row');
INSERT INTO test2 values ( '2020-01-03 12:01:00+00', 20, 11, '4row');
--- insert a new segment by ---
INSERT INTO test2 values ( '2020-01-03 11:01:00+00', 20, 12, '12row');
SELECT time_bucket(INTERVAL '2 hour', timec), b, count(*)
FROM test2
GROUP BY time_bucket(INTERVAL '2 hour', timec), b
ORDER BY 1, 2;
time_bucket | b | count
------------------------------+----+-------
Fri Jan 03 02:00:00 2020 PST | 11 | 26
Fri Jan 03 02:00:00 2020 PST | 12 | 1
Fri Jan 03 04:00:00 2020 PST | 11 | 2
(3 rows)
--check status for chunk --
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+--------------------
3 | _hyper_10_48_chunk
(1 row)
SELECT compressed_chunk_schema || '.' || compressed_chunk_name as "COMP_CHUNK_NAME",
chunk_schema || '.' || chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' \gset
SELECT count(*) from test2;
count
-------
29
(1 row)
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
recompress_chunk
------------------------------------------
_timescaledb_internal._hyper_10_48_chunk
(1 row)
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+--------------------
1 | _hyper_10_48_chunk
(1 row)
--- insert into a compressed chunk again + a new chunk--
INSERT INTO test2 values ( '2020-01-03 11:01:03+00', 20, 11, '33row'),
( '2020-01-03 11:01:06+00', 20, 11, '36row'),
( '2020-01-03 11:02:00+00', 20, 12, '12row'),
( '2020-04-03 00:02:00+00', 30, 13, '3013row');
SELECT time_bucket(INTERVAL '2 hour', timec), b, count(*)
FROM test2
GROUP BY time_bucket(INTERVAL '2 hour', timec), b
ORDER BY 1, 2;
time_bucket | b | count
------------------------------+----+-------
Fri Jan 03 02:00:00 2020 PST | 11 | 28
Fri Jan 03 02:00:00 2020 PST | 12 | 2
Fri Jan 03 04:00:00 2020 PST | 11 | 2
Thu Apr 02 17:00:00 2020 PDT | 13 | 1
(4 rows)
--chunk status should be unordered for the previously compressed chunk
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+--------------------
3 | _hyper_10_48_chunk
0 | _hyper_10_51_chunk
(2 rows)
SELECT add_compression_policy AS job_id
FROM add_compression_policy('test2', '30d'::interval) \gset
CALL run_job(:job_id);
CALL run_job(:job_id);
-- status should be compressed ---
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
chunk_status | CHUNK_NAME
--------------+--------------------
1 | _hyper_10_48_chunk
1 | _hyper_10_51_chunk
(2 rows)
\set ON_ERROR_STOP 0
-- call recompress_chunk when status is not unordered
SELECT recompress_chunk(:'CHUNK_NAME'::regclass, true);
psql:include/recompress_basic.sql:95: NOTICE: chunk "_hyper_10_48_chunk" is already compressed
recompress_chunk
------------------
(1 row)
SELECT recompress_chunk(:'CHUNK_NAME'::regclass, false);
psql:include/recompress_basic.sql:96: ERROR: chunk "_hyper_10_48_chunk" is already compressed
--now decompress it, then try to recompress
SELECT decompress_chunk(:'CHUNK_NAME'::regclass);
decompress_chunk
------------------------------------------
_timescaledb_internal._hyper_10_48_chunk
(1 row)
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
psql:include/recompress_basic.sql:100: ERROR: call compress_chunk instead of recompress_chunk
\set ON_ERROR_STOP 1

View File

@@ -499,6 +499,25 @@ INSERT INTO test_ordering VALUES (105),(104),(103);
-> Index Only Scan Backward using _hyper_11_17_chunk_test_ordering_time_idx on _hyper_11_17_chunk
(7 rows)
--insert into compressed + uncompressed chunk
INSERT INTO test_ordering VALUES (21), (22),(113);
SELECT count(*) FROM test_ordering;
count
-------
10
(1 row)
INSERT INTO test_ordering VALUES (106) RETURNING *;
time
------
106
(1 row)
-- insert into compressed chunk does not support RETURNING
\set ON_ERROR_STOP 0
INSERT INTO test_ordering VALUES (23), (24),(115) RETURNING *;
ERROR: insert with ON CONFLICT and RETURNING clause is not supported on compressed chunks
\set ON_ERROR_STOP 1
SELECT compress_chunk(format('%I.%I',chunk_schema,chunk_name), true) FROM timescaledb_information.chunks WHERE hypertable_name = 'test_ordering';
NOTICE: chunk "_hyper_11_15_chunk" is already compressed
compress_chunk

View File

@@ -498,6 +498,25 @@ INSERT INTO test_ordering VALUES (105),(104),(103);
-> Index Only Scan Backward using _hyper_11_17_chunk_test_ordering_time_idx on _hyper_11_17_chunk
(7 rows)
--insert into compressed + uncompressed chunk
INSERT INTO test_ordering VALUES (21), (22),(113);
SELECT count(*) FROM test_ordering;
count
-------
10
(1 row)
INSERT INTO test_ordering VALUES (106) RETURNING *;
time
------
106
(1 row)
-- insert into compressed chunk does not support RETURNING
\set ON_ERROR_STOP 0
INSERT INTO test_ordering VALUES (23), (24),(115) RETURNING *;
ERROR: insert with ON CONFLICT and RETURNING clause is not supported on compressed chunks
\set ON_ERROR_STOP 1
SELECT compress_chunk(format('%I.%I',chunk_schema,chunk_name), true) FROM timescaledb_information.chunks WHERE hypertable_name = 'test_ordering';
NOTICE: chunk "_hyper_11_15_chunk" is already compressed
compress_chunk

View File

@@ -498,6 +498,25 @@ INSERT INTO test_ordering VALUES (105),(104),(103);
-> Index Only Scan Backward using _hyper_11_17_chunk_test_ordering_time_idx on _hyper_11_17_chunk
(7 rows)
--insert into compressed + uncompressed chunk
INSERT INTO test_ordering VALUES (21), (22),(113);
SELECT count(*) FROM test_ordering;
count
-------
10
(1 row)
INSERT INTO test_ordering VALUES (106) RETURNING *;
time
------
106
(1 row)
-- insert into compressed chunk does not support RETURNING
\set ON_ERROR_STOP 0
INSERT INTO test_ordering VALUES (23), (24),(115) RETURNING *;
ERROR: insert with ON CONFLICT and RETURNING clause is not supported on compressed chunks
\set ON_ERROR_STOP 1
SELECT compress_chunk(format('%I.%I',chunk_schema,chunk_name), true) FROM timescaledb_information.chunks WHERE hypertable_name = 'test_ordering';
NOTICE: chunk "_hyper_11_15_chunk" is already compressed
compress_chunk

View File

@@ -608,7 +608,7 @@ hypertable_id|attname |compression_algorithm_id|segmentby_column_index|orderby_c
(1 row)
-- insert data into compressed chunk
-- TEST insert data into compressed chunk
INSERT INTO compressed
SELECT '2019-08-01 01:00', 300, 300, 3, 'newcolv' ;
SELECT * from compressed where new_coli = 3;
@@ -773,3 +773,110 @@ from chunk_compression_stats('test_table_int') where compression_status like 'Co
_dist_hyper_3_9_chunk | db_dist_compression_2 | 24576 | 16384
(4 rows)
--TEST8 insert into compressed chunks on dist. hypertable
CREATE TABLE test_recomp_int(time bigint, val int);
SELECT create_distributed_hypertable('test_recomp_int', 'time', chunk_time_interval => 20);
NOTICE: adding not-null constraint to column "time"
create_distributed_hypertable
-------------------------------
(4,public,test_recomp_int,t)
(1 row)
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 100::BIGINT';
CALL distributed_exec($$
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 100::BIGINT'
$$);
select set_integer_now_func('test_recomp_int', 'dummy_now');
set_integer_now_func
----------------------
(1 row)
insert into test_recomp_int select generate_series(1,5), 10;
alter table test_recomp_int set (timescaledb.compress);
CREATE VIEW test_recomp_int_chunk_status as
SELECT
c.table_name as chunk_name,
c.status as chunk_status
FROM _timescaledb_catalog.hypertable h, _timescaledb_catalog.chunk c
WHERE h.id = c.hypertable_id and h.table_name = 'test_recomp_int';
--compress chunks
SELECT compress_chunk(chunk)
FROM show_chunks('test_recomp_int') AS chunk
ORDER BY chunk;
compress_chunk
----------------------------------------------
_timescaledb_internal._dist_hyper_4_14_chunk
(1 row)
--check the status
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
chunk_name | chunk_status
------------------------+--------------
_dist_hyper_4_14_chunk | 1
(1 row)
-- insert into compressed chunks of test_recomp_int (using same value for val)--
insert into test_recomp_int select 10, 10;
SELECT count(*) from test_recomp_int where val = 10;
count
-------
6
(1 row)
--chunk status should change ---
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
chunk_name | chunk_status
------------------------+--------------
_dist_hyper_4_14_chunk | 3
(1 row)
SELECT
c.schema_name || '.' || c.table_name as "CHUNK_NAME"
FROM _timescaledb_catalog.hypertable h, _timescaledb_catalog.chunk c
WHERE h.id = c.hypertable_id and h.table_name = 'test_recomp_int' \gset
--call recompress_chunk directly on distributed chunk
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
recompress_chunk
----------------------------------------------
_timescaledb_internal._dist_hyper_4_14_chunk
(1 row)
--check chunk status now, should be compressed
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
chunk_name | chunk_status
------------------------+--------------
_dist_hyper_4_14_chunk | 1
(1 row)
SELECT count(*) from test_recomp_int;
count
-------
6
(1 row)
--add a policy--
select add_compression_policy('test_recomp_int', 1::int) AS compressjob_id
\gset
--once again add data to the compressed chunk
insert into test_recomp_int select generate_series(5,7), 10;
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
chunk_name | chunk_status
------------------------+--------------
_dist_hyper_4_14_chunk | 3
(1 row)
--run the compression policy job, it will recompress chunks that are unordered
CALL run_job(:compressjob_id);
SELECT count(*) from test_recomp_int;
count
-------
9
(1 row)
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
chunk_name | chunk_status
------------------------+--------------
_dist_hyper_4_14_chunk | 1
(1 row)

View File

@@ -141,3 +141,5 @@ SELECT COUNT(*) AS dropped_chunks_count
SELECT add_compression_policy AS job_id
FROM add_compression_policy('conditions', INTERVAL '1 day') \gset
CALL run_job(:job_id);
\i include/recompress_basic.sql

View File

@@ -280,6 +280,15 @@ INSERT INTO test_ordering VALUES (105),(104),(103);
-- should be ordered append
:PREFIX SELECT * FROM test_ordering ORDER BY 1;
--insert into compressed + uncompressed chunk
INSERT INTO test_ordering VALUES (21), (22),(113);
SELECT count(*) FROM test_ordering;
INSERT INTO test_ordering VALUES (106) RETURNING *;
-- insert into compressed chunk does not support RETURNING
\set ON_ERROR_STOP 0
INSERT INTO test_ordering VALUES (23), (24),(115) RETURNING *;
\set ON_ERROR_STOP 1
SELECT compress_chunk(format('%I.%I',chunk_schema,chunk_name), true) FROM timescaledb_information.chunks WHERE hypertable_name = 'test_ordering';
-- should be ordered append

View File

@@ -197,7 +197,7 @@ SELECT * FROM test.remote_exec( NULL,
hypertable_id = (SELECT id from _timescaledb_catalog.hypertable
WHERE table_name = 'compressed' ) ORDER BY attname; $$ );
-- insert data into compressed chunk
-- TEST insert data into compressed chunk
INSERT INTO compressed
SELECT '2019-08-01 01:00', 300, 300, 3, 'newcolv' ;
SELECT * from compressed where new_coli = 3;
@@ -290,3 +290,60 @@ CALL run_job(:compressjob_id);
CALL run_job(:compressjob_id);
select chunk_name, node_name, before_compression_total_bytes, after_compression_total_bytes
from chunk_compression_stats('test_table_int') where compression_status like 'Compressed' order by chunk_name;
--TEST8 insert into compressed chunks on dist. hypertable
CREATE TABLE test_recomp_int(time bigint, val int);
SELECT create_distributed_hypertable('test_recomp_int', 'time', chunk_time_interval => 20);
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 100::BIGINT';
CALL distributed_exec($$
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 100::BIGINT'
$$);
select set_integer_now_func('test_recomp_int', 'dummy_now');
insert into test_recomp_int select generate_series(1,5), 10;
alter table test_recomp_int set (timescaledb.compress);
CREATE VIEW test_recomp_int_chunk_status as
SELECT
c.table_name as chunk_name,
c.status as chunk_status
FROM _timescaledb_catalog.hypertable h, _timescaledb_catalog.chunk c
WHERE h.id = c.hypertable_id and h.table_name = 'test_recomp_int';
--compress chunks
SELECT compress_chunk(chunk)
FROM show_chunks('test_recomp_int') AS chunk
ORDER BY chunk;
--check the status
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
-- insert into compressed chunks of test_recomp_int (using same value for val)--
insert into test_recomp_int select 10, 10;
SELECT count(*) from test_recomp_int where val = 10;
--chunk status should change ---
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
SELECT
c.schema_name || '.' || c.table_name as "CHUNK_NAME"
FROM _timescaledb_catalog.hypertable h, _timescaledb_catalog.chunk c
WHERE h.id = c.hypertable_id and h.table_name = 'test_recomp_int' \gset
--call recompress_chunk directly on distributed chunk
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
--check chunk status now, should be compressed
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
SELECT count(*) from test_recomp_int;
--add a policy--
select add_compression_policy('test_recomp_int', 1::int) AS compressjob_id
\gset
--once again add data to the compressed chunk
insert into test_recomp_int select generate_series(5,7), 10;
SELECT * from test_recomp_int_chunk_status ORDER BY 1;
--run the compression policy job, it will recompress chunks that are unordered
CALL run_job(:compressjob_id);
SELECT count(*) from test_recomp_int;
SELECT * from test_recomp_int_chunk_status ORDER BY 1;

View File

@@ -0,0 +1,101 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
CREATE OR REPLACE VIEW compressed_chunk_info_view AS
SELECT
h.schema_name AS hypertable_schema,
h.table_name AS hypertable_name,
c.schema_name as chunk_schema,
c.table_name as chunk_name,
c.status as chunk_status,
comp.schema_name as compressed_chunk_schema,
comp.table_name as compressed_chunk_name
FROM
_timescaledb_catalog.hypertable h JOIN
_timescaledb_catalog.chunk c ON h.id = c.hypertable_id
LEFT JOIN _timescaledb_catalog.chunk comp
ON comp.id = c.compressed_chunk_id
;
CREATE TABLE test2 (timec timestamptz NOT NULL, i integer ,
b bigint, t text);
SELECT table_name from create_hypertable('test2', 'timec', chunk_time_interval=> INTERVAL '7 days');
INSERT INTO test2 SELECT q, 10, 11, 'hello' FROM generate_series( '2020-01-03 10:00:00+00', '2020-01-03 12:00:00+00' , '5 min'::interval) q;
ALTER TABLE test2 set (timescaledb.compress,
timescaledb.compress_segmentby = 'b',
timescaledb.compress_orderby = 'timec DESC');
SELECT compress_chunk(c)
FROM show_chunks('test2') c;
---insert into the middle of the range ---
INSERT INTO test2 values ( '2020-01-03 10:01:00+00', 20, 11, '2row');
INSERT INTO test2 values ( '2020-01-03 11:01:00+00', 20, 11, '3row');
INSERT INTO test2 values ( '2020-01-03 12:01:00+00', 20, 11, '4row');
--- insert a new segment by ---
INSERT INTO test2 values ( '2020-01-03 11:01:00+00', 20, 12, '12row');
SELECT time_bucket(INTERVAL '2 hour', timec), b, count(*)
FROM test2
GROUP BY time_bucket(INTERVAL '2 hour', timec), b
ORDER BY 1, 2;
--check status for chunk --
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
SELECT compressed_chunk_schema || '.' || compressed_chunk_name as "COMP_CHUNK_NAME",
chunk_schema || '.' || chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' \gset
SELECT count(*) from test2;
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
--- insert into a compressed chunk again + a new chunk--
INSERT INTO test2 values ( '2020-01-03 11:01:03+00', 20, 11, '33row'),
( '2020-01-03 11:01:06+00', 20, 11, '36row'),
( '2020-01-03 11:02:00+00', 20, 12, '12row'),
( '2020-04-03 00:02:00+00', 30, 13, '3013row');
SELECT time_bucket(INTERVAL '2 hour', timec), b, count(*)
FROM test2
GROUP BY time_bucket(INTERVAL '2 hour', timec), b
ORDER BY 1, 2;
--chunk status should be unordered for the previously compressed chunk
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
SELECT add_compression_policy AS job_id
FROM add_compression_policy('test2', '30d'::interval) \gset
CALL run_job(:job_id);
CALL run_job(:job_id);
-- status should be compressed ---
SELECT chunk_status,
chunk_name as "CHUNK_NAME"
FROM compressed_chunk_info_view
WHERE hypertable_name = 'test2' ORDER BY chunk_name;
\set ON_ERROR_STOP 0
-- call recompress_chunk when status is not unordered
SELECT recompress_chunk(:'CHUNK_NAME'::regclass, true);
SELECT recompress_chunk(:'CHUNK_NAME'::regclass, false);
--now decompress it, then try to recompress
SELECT decompress_chunk(:'CHUNK_NAME'::regclass);
SELECT recompress_chunk(:'CHUNK_NAME'::regclass);
\set ON_ERROR_STOP 1