Handle TRUNCATE without upcall and handle ONLY modifier

This change refactors the handling of TRUNCATE so
that it is performed directly in process utility without
doing an upcall to PL/pgSQL.

It also adds handling for the ONLY modifier to TRUNCATE,
which shouldn't work on a hypertable. TRUNCATE now generates
an error if TRUNCATE ONLY is used on a hypertable.
This commit is contained in:
Erik Nordström 2018-01-31 00:25:59 +01:00 committed by Erik Nordström
parent b7ebe06f2e
commit 6adce4cbd8
9 changed files with 130 additions and 92 deletions

View File

@ -112,26 +112,6 @@ BEGIN
END
$BODY$;
CREATE OR REPLACE FUNCTION _timescaledb_internal.truncate_hypertable(
schema_name NAME,
table_name NAME,
cascade BOOLEAN = FALSE
)
RETURNS VOID
LANGUAGE PLPGSQL VOLATILE
SET search_path = '_timescaledb_internal'
AS
$BODY$
DECLARE
hypertable_row _timescaledb_catalog.hypertable;
chunk_row _timescaledb_catalog.chunk;
BEGIN
--TODO: should this cascade?
PERFORM _timescaledb_internal.drop_chunks_impl(NULL, table_name, schema_name, cascade, true);
END
$BODY$;
--documentation of these function located in chunk_index.h
CREATE OR REPLACE FUNCTION _timescaledb_internal.chunk_index_clone(chunk_index_oid OID) RETURNS OID
AS '@MODULE_PATHNAME@', 'chunk_index_clone' LANGUAGE C VOLATILE STRICT;

View File

@ -39,6 +39,7 @@ DROP FUNCTION _timescaledb_internal.verify_hypertable_indexes(regclass);
DROP FUNCTION _timescaledb_internal.validate_triggers(regclass);
DROP FUNCTION _timescaledb_internal.chunk_create_table(int, name);
DROP FUNCTION _timescaledb_internal.ddl_change_owner(oid, name);
DROP FUNCTION _timescaledb_internal.truncate_hypertable(name,name,boolean);
-- Remove redundant index
DROP INDEX _timescaledb_catalog.dimension_slice_dimension_id_range_start_range_end_idx;

View File

@ -110,10 +110,6 @@ const static InternalFunctionDef internal_function_definitions[_MAX_INTERNAL_FUN
[DDL_ADD_CHUNK_CONSTRAINT] = {
.name = "chunk_constraint_add_table_constraint",
.args = 1,
},
[TRUNCATE_HYPERTABLE] = {
.name = "truncate_hypertable",
.args = 3
}
};

View File

@ -47,7 +47,6 @@ typedef enum CatalogTable
typedef enum InternalFunction
{
DDL_ADD_CHUNK_CONSTRAINT,
TRUNCATE_HYPERTABLE,
_MAX_INTERNAL_FUNCTIONS,
} InternalFunction;

View File

@ -22,6 +22,7 @@
#include <utils/lsyscache.h>
#include <utils/syscache.h>
#include <utils/builtins.h>
#include <utils/guc.h>
#include <utils/snapmgr.h>
#include <parser/parse_utilcmd.h>
@ -50,14 +51,6 @@ static ProcessUtility_hook_type prev_ProcessUtility_hook;
static bool expect_chunk_modification = false;
/* Macros for DDL upcalls to PL/pgSQL */
#define process_truncate_hypertable(hypertable, cascade) \
CatalogInternalCall3(TRUNCATE_HYPERTABLE, \
NameGetDatum(&(hypertable)->fd.schema_name), \
NameGetDatum(&(hypertable)->fd.table_name), \
BoolGetDatum(cascade))
typedef struct ProcessUtilityArgs
{
#if PG10
@ -147,37 +140,6 @@ relation_not_only(RangeVar *rv)
errmsg("ONLY option not supported on hypertable operations")));
}
/* Truncate a hypertable */
static void
process_truncate(Node *parsetree)
{
TruncateStmt *truncatestmt = (TruncateStmt *) parsetree;
Cache *hcache = hypertable_cache_pin();
ListCell *cell;
foreach(cell, truncatestmt->relations)
{
RangeVar *relation = lfirst(cell);
Oid relid;
if (NULL == relation)
continue;
relid = RangeVarGetRelid(relation, NoLock, true);
if (OidIsValid(relid))
{
Hypertable *ht = hypertable_cache_get_entry(hcache, relid);
if (ht != NULL)
{
process_truncate_hypertable(ht, truncatestmt->behavior == DROP_CASCADE);
}
}
}
cache_release(hcache);
}
/* Change the schema of a hypertable */
static void
process_alterobjectschema(Node *parsetree)
@ -358,6 +320,77 @@ process_vacuum(Node *parsetree, ProcessUtilityContext context)
return true;
}
static void
process_truncate_chunk(Hypertable *ht, Oid chunk_relid, void *arg)
{
TruncateStmt *stmt = arg;
ObjectAddress objaddr = {
.classId = RelationRelationId,
.objectId = chunk_relid,
};
performDeletion(&objaddr, stmt->behavior, 0);
}
#if PG10
#define TRUNCATE_RECURSE(rv) \
(rv)->inh
#elif PG96
#define TRUNCATE_RECURSE(rv) \
((rv)->inhOpt == INH_DEFAULT ? SQL_inheritance : ((rv)->inhOpt == INH_YES))
#endif
/*
* Truncate a hypertable.
*/
static bool
process_truncate(ProcessUtilityArgs *args)
{
TruncateStmt *stmt = (TruncateStmt *) args->parsetree;
Cache *hcache = hypertable_cache_pin();
ListCell *cell;
/* Call standard process utility first to truncate all tables */
prev_ProcessUtility(args);
/* For all hypertables, we drop the now empty chunks */
foreach(cell, stmt->relations)
{
RangeVar *rv = lfirst(cell);
Oid relid;
if (NULL == rv)
continue;
relid = RangeVarGetRelid(rv, NoLock, true);
if (OidIsValid(relid))
{
Hypertable *ht = hypertable_cache_get_entry(hcache, relid);
if (ht != NULL)
{
if (!TRUNCATE_RECURSE(rv))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot truncate only a hypertable"),
errhint("Do not specify the ONLY keyword, or use truncate"
" only on the chunks directly.")));
/* Delete the metadata */
chunk_delete_by_hypertable_id(ht->fd.id);
/* Drop the chunk tables */
foreach_chunk(ht, process_truncate_chunk, stmt);
}
}
}
cache_release(hcache);
return true;
}
static void
process_drop_table_chunk(Hypertable *ht, Oid chunk_relid, void *arg)
{
@ -1628,32 +1661,29 @@ process_create_trigger_end(Node *parsetree)
* Handle DDL commands before they have been processed by PostgreSQL.
*/
static bool
process_ddl_command_start(Node *parsetree,
const char *query_string,
ProcessUtilityContext context,
char *completion_tag)
process_ddl_command_start(ProcessUtilityArgs *args)
{
bool handled = false;
switch (nodeTag(parsetree))
switch (nodeTag(args->parsetree))
{
case T_TruncateStmt:
process_truncate(parsetree);
break;
case T_AlterObjectSchemaStmt:
process_alterobjectschema(parsetree);
process_alterobjectschema(args->parsetree);
break;
case T_TruncateStmt:
handled = process_truncate(args);
break;
case T_AlterTableStmt:
process_altertable_start(parsetree);
process_altertable_start(args->parsetree);
break;
case T_RenameStmt:
process_rename(parsetree);
process_rename(args->parsetree);
break;
case T_IndexStmt:
process_index_start(parsetree);
process_index_start(args->parsetree);
break;
case T_CreateTrigStmt:
process_create_trigger_start(parsetree);
process_create_trigger_start(args->parsetree);
break;
case T_DropStmt:
@ -1663,19 +1693,19 @@ process_ddl_command_start(Node *parsetree,
* table is dropped, the drop respects CASCADE in the expected
* way.
*/
process_drop(parsetree);
process_drop(args->parsetree);
break;
case T_CopyStmt:
handled = process_copy(parsetree, query_string, completion_tag);
handled = process_copy(args->parsetree, args->query_string, args->completion_tag);
break;
case T_VacuumStmt:
handled = process_vacuum(parsetree, context);
handled = process_vacuum(args->parsetree, args->context);
break;
case T_ReindexStmt:
handled = process_reindex(parsetree);
handled = process_reindex(args->parsetree);
break;
case T_ClusterStmt:
handled = process_cluster_start(parsetree, context);
handled = process_cluster_start(args->parsetree, args->context);
break;
default:
break;
@ -1762,10 +1792,7 @@ timescaledb_ddl_command_start(
return;
}
if (!process_ddl_command_start(args.parsetree,
args.query_string,
args.context,
args.completion_tag))
if (!process_ddl_command_start(&args))
prev_ProcessUtility(&args);
}

View File

@ -49,7 +49,7 @@ SELECT count(*)
AND refobjid = (SELECT oid FROM pg_extension WHERE extname = 'timescaledb');
count
-------
95
94
(1 row)
SELECT * FROM test.show_columns('public."two_Partitions"');
@ -235,7 +235,7 @@ SELECT count(*)
AND refobjid = (SELECT oid FROM pg_extension WHERE extname = 'timescaledb');
count
-------
95
94
(1 row)
--main table and chunk schemas should be the same

View File

@ -133,9 +133,29 @@ CREATE TRIGGER _test_truncate_after
TRUNCATE "two_Partitions";
WARNING: FIRING trigger when: BEFORE level: STATEMENT op: TRUNCATE cnt: <NULL> trigger_name _test_truncate_before
WARNING: FIRING trigger when: AFTER level: STATEMENT op: TRUNCATE cnt: <NULL> trigger_name _test_truncate_after
ERROR: cannot drop table _hyper_1_5_chunk because other objects depend on it
ERROR: cannot drop table _timescaledb_internal._hyper_1_5_chunk because other objects depend on it
-- cannot TRUNCATE ONLY a hypertable
TRUNCATE ONLY "two_Partitions" CASCADE;
ERROR: cannot truncate only a hypertable
\set ON_ERROR_STOP 1
TRUNCATE "two_Partitions" CASCADE;
-- create a regular table to make sure we can truncate it in the same call
CREATE TABLE truncate_normal (color int);
INSERT INTO truncate_normal VALUES (1);
SELECT * FROM truncate_normal;
color
-------
1
(1 row)
SELECT * FROM test.show_subtables('"two_Partitions"');
Child | Tablespace
----------------------------------------+------------
_timescaledb_internal._hyper_1_5_chunk |
_timescaledb_internal._hyper_1_6_chunk |
_timescaledb_internal._hyper_1_7_chunk |
(3 rows)
TRUNCATE "two_Partitions", truncate_normal CASCADE;
WARNING: FIRING trigger when: BEFORE level: STATEMENT op: TRUNCATE cnt: <NULL> trigger_name _test_truncate_before
WARNING: FIRING trigger when: AFTER level: STATEMENT op: TRUNCATE cnt: <NULL> trigger_name _test_truncate_after
-- should be empty
@ -149,3 +169,8 @@ SELECT * FROM "two_Partitions";
------------+-----------+----------+----------+----------+-------------
(0 rows)
SELECT * FROM truncate_normal;
color
-------
(0 rows)

View File

@ -42,7 +42,7 @@ set(TEST_FILES
tablespace.sql
timestamp.sql
triggers.sql
truncate_hypertable.sql
truncate.sql
update.sql
upsert.sql
util.sql

View File

@ -53,9 +53,19 @@ CREATE TRIGGER _test_truncate_after
\set ON_ERROR_STOP 0
TRUNCATE "two_Partitions";
-- cannot TRUNCATE ONLY a hypertable
TRUNCATE ONLY "two_Partitions" CASCADE;
\set ON_ERROR_STOP 1
TRUNCATE "two_Partitions" CASCADE;
-- create a regular table to make sure we can truncate it in the same call
CREATE TABLE truncate_normal (color int);
INSERT INTO truncate_normal VALUES (1);
SELECT * FROM truncate_normal;
SELECT * FROM test.show_subtables('"two_Partitions"');
TRUNCATE "two_Partitions", truncate_normal CASCADE;
-- should be empty
SELECT * FROM test.show_subtables('"two_Partitions"');
SELECT * FROM "two_Partitions";
SELECT * FROM truncate_normal;