mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-19 12:13:24 +08:00
Handle subtxn for cache pinning
This commit adds logic for cache pinning to handle subtxn. It also makes it easier to find cache pinning leaks. Finally, it fixes handling of cross-commit operations like VACUUM and CLUSTER. Previously, such operations incorrectly released that cache pin on the first commit even though the object was used after that.
This commit is contained in:
parent
26ef77fd25
commit
78d36b52d4
97
src/cache.c
97
src/cache.c
@ -25,6 +25,7 @@ cache_init(Cache *cache)
|
|||||||
cache->htab = hash_create(cache->name, cache->numelements,
|
cache->htab = hash_create(cache->name, cache->numelements,
|
||||||
&cache->hctl, cache->flags);
|
&cache->hctl, cache->flags);
|
||||||
cache->refcount = 1;
|
cache->refcount = 1;
|
||||||
|
cache->release_on_commit = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -155,33 +156,14 @@ cache_remove(Cache *cache, void *key)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Transaction end callback that cleans up any pinned caches. This is a
|
|
||||||
* safeguard that protects against indefinitely pinned caches (memory leaks)
|
|
||||||
* that may occur if a transaction ends (normally or abnormally) while a pin is
|
|
||||||
* held. Without this, a cache_pin() call always needs to be paired with a
|
|
||||||
* cache_release() call and wrapped in a PG_TRY() block to capture and handle
|
|
||||||
* any exceptions that occur.
|
|
||||||
*
|
|
||||||
* Note that this makes cache_release() optional, although timely
|
|
||||||
* cache_release() calls are still encouraged to release memory as early as
|
|
||||||
* possible during long-running transactions. PG_TRY() blocks are not needed,
|
|
||||||
* however.
|
|
||||||
*/
|
|
||||||
static void
|
static void
|
||||||
cache_xact_end(XactEvent event, void *arg)
|
release_all_pinned_caches()
|
||||||
{
|
{
|
||||||
switch (event)
|
|
||||||
{
|
|
||||||
case XACT_EVENT_COMMIT:
|
|
||||||
case XACT_EVENT_ABORT:
|
|
||||||
case XACT_EVENT_PARALLEL_ABORT:
|
|
||||||
{
|
|
||||||
ListCell *lc;
|
ListCell *lc;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* release once for every occurence of a cache in the pinned
|
* release once for every occurence of a cache in the pinned caches list.
|
||||||
* caches list
|
* On abort, release irrespective of cache->release_on_commit.
|
||||||
*/
|
*/
|
||||||
foreach(lc, pinned_caches)
|
foreach(lc, pinned_caches)
|
||||||
{
|
{
|
||||||
@ -190,23 +172,86 @@ cache_xact_end(XactEvent event, void *arg)
|
|||||||
cache->refcount--;
|
cache->refcount--;
|
||||||
cache_destroy(cache);
|
cache_destroy(cache);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
list_free(pinned_caches);
|
list_free(pinned_caches);
|
||||||
pinned_caches = NIL;
|
pinned_caches = NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Transaction end callback that cleans up any pinned caches. This is a
|
||||||
|
* safeguard that protects against indefinitely pinned caches (memory leaks)
|
||||||
|
* that may occur if a transaction ends (normally or abnormally) while a pin is
|
||||||
|
* held. Without this, a cache_pin() call always needs to be paired with a
|
||||||
|
* cache_release() call and wrapped in a PG_TRY() block to capture and handle
|
||||||
|
* any exceptions that occur.
|
||||||
|
*
|
||||||
|
* Note that this checks that cache_release() is always called by the end
|
||||||
|
* of a non-aborted transaction unless cache->release_on_commit is set to true.
|
||||||
|
* */
|
||||||
|
static void
|
||||||
|
cache_xact_end(XactEvent event, void *arg)
|
||||||
|
{
|
||||||
|
switch (event)
|
||||||
|
{
|
||||||
|
case XACT_EVENT_ABORT:
|
||||||
|
case XACT_EVENT_PARALLEL_ABORT:
|
||||||
|
release_all_pinned_caches();
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Only caches left should be marked as non-released
|
||||||
|
*/
|
||||||
|
foreach(lc, pinned_caches)
|
||||||
|
{
|
||||||
|
Cache *cache = lfirst(lc);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This assert makes sure that that we don't have a cache
|
||||||
|
* leak when running with debugging
|
||||||
|
*/
|
||||||
|
Assert(!cache->release_on_commit);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This may still happen in optimized environments where
|
||||||
|
* Assert is turned off. In that case, release.
|
||||||
|
*/
|
||||||
|
if (cache->release_on_commit)
|
||||||
|
cache_release(cache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
cache_subxact_abort(SubXactEvent event, SubTransactionId mySubid,
|
||||||
|
SubTransactionId parentSubid, void *arg)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Note that cache->release_on_commit is irrelevant here since can't have
|
||||||
|
* cross-commit operations in subtxns
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* In subtxns, caches should have already been released, unless an abort
|
||||||
|
* happened
|
||||||
|
*/
|
||||||
|
Assert(SUBXACT_EVENT_ABORT_SUB == event || list_length(pinned_caches) == 0);
|
||||||
|
release_all_pinned_caches();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
_cache_init(void)
|
_cache_init(void)
|
||||||
{
|
{
|
||||||
RegisterXactCallback(cache_xact_end, NULL);
|
RegisterXactCallback(cache_xact_end, NULL);
|
||||||
|
RegisterSubXactCallback(cache_subxact_abort, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
_cache_fini(void)
|
_cache_fini(void)
|
||||||
{
|
{
|
||||||
UnregisterXactCallback(cache_xact_end, NULL);
|
UnregisterXactCallback(cache_xact_end, NULL);
|
||||||
|
UnregisterSubXactCallback(cache_subxact_abort, NULL);
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,9 @@ typedef struct Cache
|
|||||||
void *(*create_entry) (struct Cache *, CacheQuery *);
|
void *(*create_entry) (struct Cache *, CacheQuery *);
|
||||||
void *(*update_entry) (struct Cache *, CacheQuery *);
|
void *(*update_entry) (struct Cache *, CacheQuery *);
|
||||||
void (*pre_destroy_hook) (struct Cache *);
|
void (*pre_destroy_hook) (struct Cache *);
|
||||||
|
bool release_on_commit; /* This should be false if doing
|
||||||
|
* cross-commit operations like
|
||||||
|
* CLUSTER or VACUUM */
|
||||||
} Cache;
|
} Cache;
|
||||||
|
|
||||||
extern void cache_init(Cache *cache);
|
extern void cache_init(Cache *cache);
|
||||||
|
@ -278,9 +278,14 @@ foreach_chunk_relid(Oid relid, process_chunk_t process_chunk, void *arg)
|
|||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
if (NULL == ht)
|
if (NULL == ht)
|
||||||
|
{
|
||||||
|
cache_release(hcache);
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
hcache->release_on_commit = false;
|
||||||
ret = foreach_chunk(ht, process_chunk, arg);
|
ret = foreach_chunk(ht, process_chunk, arg);
|
||||||
|
hcache->release_on_commit = true;
|
||||||
|
|
||||||
cache_release(hcache);
|
cache_release(hcache);
|
||||||
|
|
||||||
@ -477,6 +482,8 @@ process_drop_index(DropStmt *stmt)
|
|||||||
*/
|
*/
|
||||||
chunk_index_delete(chunk, idxrelid, false);
|
chunk_index_delete(chunk, idxrelid, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cache_release(hcache);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1132,6 +1139,8 @@ process_cluster_start(Node *parsetree, ProcessUtilityContext context)
|
|||||||
chunk_indexes = chunk_index_get_mappings(ht, index_relid);
|
chunk_indexes = chunk_index_get_mappings(ht, index_relid);
|
||||||
MemoryContextSwitchTo(old);
|
MemoryContextSwitchTo(old);
|
||||||
|
|
||||||
|
hcache->release_on_commit = false;
|
||||||
|
|
||||||
/* Commit to get out of starting transaction */
|
/* Commit to get out of starting transaction */
|
||||||
PopActiveSnapshot();
|
PopActiveSnapshot();
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
@ -1158,6 +1167,8 @@ process_cluster_start(Node *parsetree, ProcessUtilityContext context)
|
|||||||
PopActiveSnapshot();
|
PopActiveSnapshot();
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hcache->release_on_commit = true;
|
||||||
/* Start a new transaction for the cleanup work. */
|
/* Start a new transaction for the cleanup work. */
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
|
|
||||||
@ -1166,7 +1177,6 @@ process_cluster_start(Node *parsetree, ProcessUtilityContext context)
|
|||||||
}
|
}
|
||||||
|
|
||||||
cache_release(hcache);
|
cache_release(hcache);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user