Refactor process utility

This breaks up the handling of process utility statements into a clean
and efficient switch statement with separate functions for each
statement.
This commit is contained in:
Erik Nordström 2017-08-07 13:35:21 +02:00 committed by Erik Nordström
parent 69ac1a37d8
commit a1a28ecf6f

View File

@ -37,6 +37,123 @@ prev_ProcessUtility(Node *parsetree,
} }
} }
/* Truncate a hypertable */
static void
process_truncate(Node *parsetree)
{
TruncateStmt *truncatestmt = (TruncateStmt *) parsetree;
ListCell *cell;
foreach(cell, truncatestmt->relations)
{
Oid relId = RangeVarGetRelid(lfirst(cell), NoLock, true);
if (OidIsValid(relId))
{
Cache *hcache = hypertable_cache_pin();
Hypertable *hentry = hypertable_cache_get_entry(hcache, relId);
if (hentry != NULL)
{
executor_level_enter();
spi_hypertable_truncate(hentry);
executor_level_exit();
}
cache_release(hcache);
}
}
}
/* Change the schema of a hypertable */
static void
process_alterobjectschema(Node *parsetree)
{
AlterObjectSchemaStmt *alterstmt = (AlterObjectSchemaStmt *) parsetree;
Oid relId = RangeVarGetRelid(alterstmt->relation, NoLock, true);
if (OidIsValid(relId))
{
Cache *hcache = hypertable_cache_pin();
Hypertable *hentry = hypertable_cache_get_entry(hcache, relId);
if (hentry != NULL && alterstmt->objectType == OBJECT_TABLE)
{
executor_level_enter();
spi_hypertable_rename(hentry,
alterstmt->newschema,
NameStr(hentry->fd.table_name));
executor_level_exit();
}
cache_release(hcache);
}
}
/* Rename hypertable */
static void
process_rename(Node *parsetree)
{
RenameStmt *renamestmt = (RenameStmt *) parsetree;
Oid relid = RangeVarGetRelid(renamestmt->relation, NoLock, true);
if (OidIsValid(relid))
{
Cache *hcache = hypertable_cache_pin();
Hypertable *hentry = hypertable_cache_get_entry(hcache, relid);
if (hentry != NULL && renamestmt->renameType == OBJECT_TABLE)
{
executor_level_enter();
spi_hypertable_rename(hentry,
NameStr(hentry->fd.schema_name),
renamestmt->newname);
executor_level_exit();
}
cache_release(hcache);
}
}
static void
process_copy(Node *parsetree, const char *query_string, char *completion_tag)
{
CopyStmt *stmt = (CopyStmt *) parsetree;
/*
* Needed to add the appropriate number of tuples to the completion tag
*/
uint64 processed;
Hypertable *ht = NULL;
Cache *hcache = NULL;
executor_level_enter();
if (stmt->is_from)
{
Oid relid = RangeVarGetRelid(stmt->relation, NoLock, true);
if (OidIsValid(relid))
{
hcache = hypertable_cache_pin();
ht = hypertable_cache_get_entry(hcache, relid);
}
}
if (stmt->is_from && ht != NULL)
timescaledb_DoCopy(stmt, query_string, &processed, ht);
else
DoCopy(stmt, query_string, &processed);
executor_level_exit();
processed += executor_get_additional_tuples_processed();
if (completion_tag)
snprintf(completion_tag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
if (NULL != hcache)
cache_release(hcache);
}
/* Hook-intercept for ProcessUtility. Used to set COPY completion tag and */ /* Hook-intercept for ProcessUtility. Used to set COPY completion tag and */
/* renaming of hypertables. */ /* renaming of hypertables. */
static void static void
@ -53,128 +170,22 @@ timescaledb_ProcessUtility(Node *parsetree,
return; return;
} }
/* Truncate a hypertable */ switch (nodeTag(parsetree))
if (IsA(parsetree, TruncateStmt))
{ {
TruncateStmt *truncatestmt = (TruncateStmt *) parsetree; case T_TruncateStmt:
ListCell *cell; process_truncate(parsetree);
break;
foreach(cell, truncatestmt->relations) case T_AlterObjectSchemaStmt:
{ process_alterobjectschema(parsetree);
Oid relId = RangeVarGetRelid(lfirst(cell), NoLock, true); break;
case T_RenameStmt:
if (OidIsValid(relId)) process_rename(parsetree);
{ break;
Cache *hcache = hypertable_cache_pin(); case T_CopyStmt:
Hypertable *hentry = hypertable_cache_get_entry(hcache, relId); process_copy(parsetree, query_string, completion_tag);
return;
if (hentry != NULL) default:
{ break;
executor_level_enter();
spi_hypertable_truncate(hentry);
executor_level_exit();
}
cache_release(hcache);
}
}
prev_ProcessUtility((Node *) truncatestmt, query_string, context, params, dest, completion_tag);
return;
}
/* Change the schema of hypertable */
if (IsA(parsetree, AlterObjectSchemaStmt))
{
AlterObjectSchemaStmt *alterstmt = (AlterObjectSchemaStmt *) parsetree;
Oid relId = RangeVarGetRelid(alterstmt->relation, NoLock, true);
if (OidIsValid(relId))
{
Cache *hcache = hypertable_cache_pin();
Hypertable *hentry = hypertable_cache_get_entry(hcache, relId);
if (hentry != NULL && alterstmt->objectType == OBJECT_TABLE)
{
executor_level_enter();
spi_hypertable_rename(hentry,
alterstmt->newschema,
NameStr(hentry->fd.table_name));
executor_level_exit();
}
cache_release(hcache);
}
prev_ProcessUtility((Node *) alterstmt, query_string, context, params, dest, completion_tag);
return;
}
/* Rename hypertable */
if (IsA(parsetree, RenameStmt))
{
RenameStmt *renamestmt = (RenameStmt *) parsetree;
Oid relid = RangeVarGetRelid(renamestmt->relation, NoLock, true);
if (OidIsValid(relid))
{
Cache *hcache = hypertable_cache_pin();
Hypertable *hentry = hypertable_cache_get_entry(hcache, relid);
if (hentry != NULL && renamestmt->renameType == OBJECT_TABLE)
{
executor_level_enter();
spi_hypertable_rename(hentry,
NameStr(hentry->fd.schema_name),
renamestmt->newname);
executor_level_exit();
}
cache_release(hcache);
}
prev_ProcessUtility((Node *) renamestmt, query_string, context, params, dest, completion_tag);
return;
}
if (IsA(parsetree, CopyStmt))
{
CopyStmt *stmt = (CopyStmt *) parsetree;
/*
* Needed to add the appropriate number of tuples to the completion
* tag
*/
uint64 processed;
Hypertable *ht = NULL;
Cache *hcache = NULL;
executor_level_enter();
if (stmt->is_from)
{
Oid relid = RangeVarGetRelid(stmt->relation, NoLock, true);
if (OidIsValid(relid))
{
hcache = hypertable_cache_pin();
ht = hypertable_cache_get_entry(hcache, relid);
}
}
if (stmt->is_from && ht != NULL)
timescaledb_DoCopy(stmt, query_string, &processed, ht);
else
DoCopy(stmt, query_string, &processed);
executor_level_exit();
processed += executor_get_additional_tuples_processed();
if (completion_tag)
snprintf(completion_tag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
if (NULL != hcache)
cache_release(hcache);
return;
} }
prev_ProcessUtility(parsetree, query_string, context, params, dest, completion_tag); prev_ProcessUtility(parsetree, query_string, context, params, dest, completion_tag);