Add new chunk state CHUNK_STATUS_COMPRESSED_PARTIAL

A chunk is in this state when it is compressed but also has
uncompressed data in the uncompressed chunk. Individual tuples
can only ever exist in either area. This is preparation patch
to add support for uncompressed staging area for DML operations.
This commit is contained in:
Sven Klemm 2022-11-04 20:44:53 +01:00 committed by Sven Klemm
parent 5b0bff384b
commit 3059290bea
3 changed files with 38 additions and 7 deletions

View File

@ -68,18 +68,24 @@ BEGIN
-- Chunk names are in the internal catalog, but we only care about
-- the chunk name here.
-- status bits:
-- 1: compressed
-- 2: compressed unordered
-- 4: frozen
-- 8: compressed partial
chunk_name := parse_ident(chunk::text);
CASE status
WHEN 0 THEN
CASE
WHEN status = 0 THEN
RAISE EXCEPTION 'call compress_chunk instead of recompress_chunk';
WHEN 1 THEN
WHEN status = 1 THEN
IF if_not_compressed THEN
RAISE NOTICE 'nothing to recompress in chunk "%"', chunk_name[array_upper(chunk_name,1)];
RETURN;
ELSE
RAISE EXCEPTION 'nothing to recompress in chunk "%"', chunk_name[array_upper(chunk_name,1)];
END IF;
WHEN 3 THEN
WHEN status = 3 OR status = 9 OR status = 11 THEN
PERFORM @extschema@.decompress_chunk(chunk);
COMMIT;
-- SET LOCAL is only active until end of transaction.
@ -87,6 +93,8 @@ BEGIN
-- want to bleed out search_path to caller, so we do SET LOCAL
-- again after COMMIT
SET LOCAL search_path TO pg_catalog, pg_temp;
ELSE
RAISE EXCEPTION 'unexpected chunk status % in chunk "%"', status, chunk_name[array_upper(chunk_name,1)];
END CASE;
PERFORM @extschema@.compress_chunk(chunk, if_not_compressed);
END

View File

@ -175,6 +175,11 @@ static Chunk *chunk_resurrect(const Hypertable *ht, int chunk_id);
*
*/
#define CHUNK_STATUS_FROZEN 4
/*
* A chunk is in this state when it is compressed but also has uncompressed tuples
* in the uncompressed chunk.
*/
#define CHUNK_STATUS_COMPRESSED_PARTIAL 8
static HeapTuple
chunk_formdata_make_tuple(const FormData_chunk *fd, TupleDesc desc)
@ -3507,9 +3512,17 @@ ts_chunk_set_schema(Chunk *chunk, const char *newschema)
bool
ts_chunk_set_unordered(Chunk *chunk)
{
Assert(ts_chunk_is_compressed(chunk));
return ts_chunk_add_status(chunk, CHUNK_STATUS_COMPRESSED_UNORDERED);
}
bool
ts_chunk_set_partial(Chunk *chunk)
{
Assert(ts_chunk_is_compressed(chunk));
return ts_chunk_add_status(chunk, CHUNK_STATUS_COMPRESSED_PARTIAL);
}
/*No inserts,updates and deletes are permitted on a frozen chunk.
* Compression policies etc do not run on a frozen chunk.
* Only valid operation is dropping the chunk
@ -3605,7 +3618,8 @@ chunk_change_compressed_status_in_tuple(TupleInfo *ti, int32 compressed_chunk_id
form.compressed_chunk_id = INVALID_CHUNK_ID;
form.status =
ts_clear_flags_32(form.status,
CHUNK_STATUS_COMPRESSED | CHUNK_STATUS_COMPRESSED_UNORDERED);
CHUNK_STATUS_COMPRESSED | CHUNK_STATUS_COMPRESSED_UNORDERED |
CHUNK_STATUS_COMPRESSED_PARTIAL);
}
new_tuple = chunk_formdata_make_tuple(&form, ts_scanner_get_tupledesc(ti));
@ -4373,7 +4387,7 @@ ts_chunk_get_compression_status(int32 chunk_id)
return st;
}
/*Note that only a compressed chunk can have unordered flag set */
/* Note that only a compressed chunk can have unordered flag set */
bool
ts_chunk_is_unordered(const Chunk *chunk)
{
@ -4386,6 +4400,13 @@ ts_chunk_is_compressed(const Chunk *chunk)
return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED);
}
/* Note that only a compressed chunk can have partial flag set */
bool
ts_chunk_is_partial(const Chunk *chunk)
{
return ts_flags_are_set_32(chunk->fd.status, CHUNK_STATUS_COMPRESSED_PARTIAL);
}
static const char *
get_chunk_operation_str(ChunkOperation cmd)
{

View File

@ -168,7 +168,7 @@ extern TSDLLEXPORT int32 ts_chunk_get_hypertable_id_by_relid(Oid relid);
extern TSDLLEXPORT int32 ts_chunk_get_compressed_chunk_id(int32 chunk_id);
extern bool ts_chunk_get_hypertable_id_and_status_by_relid(Oid relid, int32 *hypertable_id,
int32 *chunk_status);
extern Oid ts_chunk_get_relid(int32 chunk_id, bool missing_ok);
extern TSDLLEXPORT Oid ts_chunk_get_relid(int32 chunk_id, bool missing_ok);
extern Oid ts_chunk_get_schema_id(int32 chunk_id, bool missing_ok);
extern bool ts_chunk_get_id(const char *schema, const char *table, int32 *chunk_id,
bool missing_ok);
@ -187,6 +187,7 @@ extern TSDLLEXPORT List *ts_chunk_get_window(int32 dimension_id, int64 point, in
MemoryContext mctx);
extern void ts_chunks_rename_schema_name(char *old_schema, char *new_schema);
extern TSDLLEXPORT bool ts_chunk_set_partial(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_set_unordered(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_set_frozen(Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_unset_frozen(Chunk *chunk);
@ -202,6 +203,7 @@ ts_chunk_find_or_create_without_cuts(const Hypertable *ht, Hypercube *hc, const
const char *table_name, Oid chunk_table_relid, bool *created);
extern TSDLLEXPORT Chunk *ts_chunk_get_compressed_chunk_parent(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_unordered(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_partial(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_is_compressed(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_validate_chunk_status_for_operation(Oid chunk_relid,
int32 chunk_status,