Lock dimension slices when creating new chunk

This commit makes two changes to address issues that arise when
processes perform concurrent inserts and `drop_chunks` calls:

- When a new chunk is created, any dimension slices that existed prior
  to creating the new chunk are locked to prevent them from being
  dropped before the chunk-creating process commits (a rough SQL
  analogue of this lock is sketched below).

- When a chunk is being dropped, a concurrent insert into that chunk
  will try to lock the chunk's dimension slices. If the locking fails
  (because the slices are being concurrently deleted), the inserting
  process treats the chunk as not existing and recreates it instead.
  Previously, the chunk's slices (and thus the chunk) would be found,
  but the insert would fail at commit time since the chunk had been
  concurrently deleted.
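
Conceptually, the lock taken on each pre-existing slice corresponds to
a PostgreSQL `FOR KEY SHARE` row lock on the matching row in
`_timescaledb_catalog.dimension_slice` (the C code requests
`LockTupleKeyShare`). A rough SQL analogue, with placeholder values
for the dimension ID and range:

    -- Lock an existing slice so it cannot be deleted before this
    -- transaction commits (dimension/range values are placeholders):
    SELECT id
      FROM _timescaledb_catalog.dimension_slice
     WHERE dimension_id = 1 AND range_start = 0 AND range_end = 100000
       FOR KEY SHARE;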

A prior commit (PR #2150) partially solved a related problem, but
didn't lock all the slices of a chunk. That commit also threw an error
when a lock on a slice could not be taken because the slice had been
deleted by another transaction. That case is now treated as a missing
slice instead, causing the slice to be recreated.

Fixes #1986
Author: Erik Nordström, 2020-10-08 12:51:05 +02:00 (committed by Erik Nordström)
Parent: 5564ad8c6c
Commit: 2cc2df23bd
9 changed files with 227 additions and 51 deletions

@@ -113,7 +113,7 @@ static int chunk_scan_ctx_foreach_chunk_stub(ChunkScanCtx *ctx, on_chunk_stub_fu
uint16 limit);
static Datum chunks_return_srf(FunctionCallInfo fcinfo);
static int chunk_cmp(const void *ch1, const void *ch2);
static Chunk *chunk_find(Hypertable *ht, Point *p, bool resurrect);
static Chunk *chunk_find(Hypertable *ht, Point *p, bool resurrect, bool lock_slices);
static void init_scan_by_qualified_table_name(ScanIterator *iterator, const char *schema_name,
const char *table_name);
static Hypertable *find_hypertable_from_table_or_cagg(Cache *hcache, Oid relid);
@@ -1075,6 +1075,14 @@ ts_chunk_find_or_create_without_cuts(Hypertable *ht, Hypercube *hc, const char *
if (NULL == stub)
{
ScanTupLock tuplock = {
.lockmode = LockTupleKeyShare,
.waitpolicy = LockWaitBlock,
};
/* Lock all slices that already exist to ensure they remain when we
* commit since we won't create those slices ourselves. */
ts_hypercube_find_existing_slices(hc, &tuplock);
chunk = chunk_create_from_hypercube_after_lock(ht, hc, schema_name, table_name, NULL);
if (NULL != created)
@@ -1118,8 +1126,10 @@ ts_chunk_create_from_point(Hypertable *ht, Point *p, const char *schema, const c
*/
LockRelationOid(ht->main_table_relid, ShareUpdateExclusiveLock);
/* Recheck if someone else created the chunk before we got the table lock */
chunk = chunk_find(ht, p, true);
/* Recheck if someone else created the chunk before we got the table
* lock. The returned chunk will have all slices locked so that they
* aren't removed. */
chunk = chunk_find(ht, p, true, true);
if (NULL == chunk)
{
@@ -1366,7 +1376,7 @@ dimension_slice_and_chunk_constraint_join(ChunkScanCtx *scanctx, DimensionVec *v
* to two chunks.
*/
static void
chunk_point_scan(ChunkScanCtx *scanctx, Point *p)
chunk_point_scan(ChunkScanCtx *scanctx, Point *p, bool lock_slices)
{
int i;
@@ -1374,11 +1384,15 @@ chunk_point_scan(ChunkScanCtx *scanctx, Point *p)
for (i = 0; i < scanctx->space->num_dimensions; i++)
{
DimensionVec *vec;
ScanTupLock tuplock = {
.lockmode = LockTupleKeyShare,
.waitpolicy = LockWaitBlock,
};
vec = ts_dimension_slice_scan_limit(scanctx->space->dimensions[i].fd.id,
p->coordinates[i],
0,
NULL);
lock_slices ? &tuplock : NULL);
dimension_slice_and_chunk_constraint_join(scanctx, vec);
}
@@ -1610,7 +1624,7 @@ chunk_resurrect(Hypertable *ht, ChunkStub *stub)
* case it needs to live beyond the lifetime of the other data.
*/
static Chunk *
chunk_find(Hypertable *ht, Point *p, bool resurrect)
chunk_find(Hypertable *ht, Point *p, bool resurrect, bool lock_slices)
{
ChunkStub *stub;
Chunk *chunk = NULL;
@@ -1623,7 +1637,7 @@ chunk_find(Hypertable *ht, Point *p, bool resurrect)
ctx.early_abort = true;
/* Scan for the chunk matching the point */
chunk_point_scan(&ctx, p);
chunk_point_scan(&ctx, p, lock_slices);
/* Find the stub that has N matching dimension constraints */
stub = chunk_scan_ctx_get_chunk_stub(&ctx);
@@ -1657,9 +1671,9 @@ chunk_find(Hypertable *ht, Point *p, bool resurrect)
}
Chunk *
ts_chunk_find(Hypertable *ht, Point *p)
ts_chunk_find(Hypertable *ht, Point *p, bool lock_slices)
{
return chunk_find(ht, p, false);
return chunk_find(ht, p, false, lock_slices);
}
/*
@@ -3257,6 +3271,8 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
if (affected_data_nodes)
*affected_data_nodes = data_nodes;
DEBUG_WAITPOINT("drop_chunks_end");
return dropped_chunk_names;
}

@@ -110,7 +110,7 @@ extern Chunk *ts_chunk_create_from_point(Hypertable *ht, Point *p, const char *s
extern TSDLLEXPORT Chunk *ts_chunk_create_base(int32 id, int16 num_constraints, const char relkind);
extern TSDLLEXPORT ChunkStub *ts_chunk_stub_create(int32 id, int16 num_constraints);
extern Chunk *ts_chunk_find(Hypertable *ht, Point *p);
extern Chunk *ts_chunk_find(Hypertable *ht, Point *p, bool lock_slices);
extern Chunk **ts_chunk_find_all(Hypertable *ht, List *dimension_vecs, LOCKMODE lockmode,
unsigned int *num_chunks);
extern List *ts_chunk_find_all_oids(Hypertable *ht, List *dimension_vecs, LOCKMODE lockmode);

@@ -112,6 +112,12 @@ lock_result_ok_or_abort(TupleInfo *ti, DimensionSlice *slice)
#if PG12_GE
case TM_Deleted:
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("dimension slice %d deleted by other transaction", slice->fd.id),
errhint("Retry the operation again.")));
pg_unreachable();
break;
#endif
case TM_Updated:
ereport(ERROR,
@@ -146,9 +152,26 @@ static ScanTupleResult
dimension_vec_tuple_found(TupleInfo *ti, void *data)
{
DimensionVec **slices = data;
DimensionSlice *slice = dimension_slice_from_slot(ti->slot);
DimensionSlice *slice;
lock_result_ok_or_abort(ti, slice);
switch (ti->lockresult)
{
case TM_SelfModified:
case TM_Ok:
break;
#if PG12_GE
case TM_Deleted:
#endif
case TM_Updated:
/* Treat as not found */
return SCAN_CONTINUE;
default:
elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
pg_unreachable();
break;
}
slice = dimension_slice_from_slot(ti->slot);
*slices = ts_dimension_vec_add_slice(slices, slice);
return SCAN_CONTINUE;
@@ -524,14 +547,32 @@ ts_dimension_slice_delete_by_id(int32 dimension_slice_id, bool delete_constraint
static ScanTupleResult
dimension_slice_fill(TupleInfo *ti, void *data)
{
DimensionSlice **slice = data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
switch (ti->lockresult)
{
case TM_SelfModified:
case TM_Ok:
{
DimensionSlice **slice = data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
memcpy(&(*slice)->fd, GETSTRUCT(tuple), sizeof(FormData_dimension_slice));
memcpy(&(*slice)->fd, GETSTRUCT(tuple), sizeof(FormData_dimension_slice));
if (should_free)
heap_freetuple(tuple);
if (should_free)
heap_freetuple(tuple);
break;
}
#if PG12_GE
case TM_Deleted:
#endif
case TM_Updated:
/* Same as not found */
break;
default:
elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
pg_unreachable();
break;
}
return SCAN_DONE;
}
@@ -545,7 +586,7 @@ dimension_slice_fill(TupleInfo *ti, void *data)
* otherwise.
*/
bool
ts_dimension_slice_scan_for_existing(DimensionSlice *slice)
ts_dimension_slice_scan_for_existing(DimensionSlice *slice, ScanTupLock *tuplock)
{
ScanKeyData scankey[3];
@@ -573,7 +614,7 @@ ts_dimension_slice_scan_for_existing(DimensionSlice *slice)
&slice,
1,
AccessShareLock,
NULL,
tuplock,
CurrentMemoryContext);
}
@@ -747,7 +788,11 @@ dimension_slice_insert_relation(Relation rel, DimensionSlice *slice)
/*
* Insert slices into the catalog.
*
* Only slices that don't already exists in the catalog will be inserted.
* Only slices that don't already exist in the catalog will be inserted. Note
* that all slices that already exist (i.e., have a valid ID) MUST be locked
* with a tuple lock (e.g., FOR KEY SHARE) prior to calling this function
* since they won't be created. Otherwise it is not possible to guarantee that
* all slices still exist once the transaction commits.
*
* Returns the number of slices inserted.
*/
@@ -762,9 +807,6 @@ ts_dimension_slice_insert_multi(DimensionSlice **slices, Size num_slices)
for (i = 0; i < num_slices; i++)
{
slices[i]->fd.id = 0;
ts_dimension_slice_scan_for_existing(slices[i]);
if (slices[i]->fd.id == 0)
{
dimension_slice_insert_relation(rel, slices[i]);
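
A key-share lock is the weakest row lock that still protects the
slice row here: concurrent reads and non-key updates proceed, but a
concurrent `DELETE` blocks until the locking transaction ends. A
minimal two-session sketch against the slice catalog (the slice ID 42
is a placeholder):

    -- Session 1 (e.g., an inserting backend):
    BEGIN;
    SELECT id FROM _timescaledb_catalog.dimension_slice
      WHERE id = 42 FOR KEY SHARE;
    -- Session 2 (e.g., drop_chunks) blocks here until session 1 ends:
    DELETE FROM _timescaledb_catalog.dimension_slice WHERE id = 42;
    -- Session 1:
    COMMIT;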

@@ -50,7 +50,7 @@ ts_dimension_slice_scan_range_limit(int32 dimension_id, StrategyNumber start_str
int limit, ScanTupLock *tuplock);
extern DimensionVec *ts_dimension_slice_collision_scan_limit(int32 dimension_id, int64 range_start,
int64 range_end, int limit);
extern bool ts_dimension_slice_scan_for_existing(DimensionSlice *slice);
extern bool ts_dimension_slice_scan_for_existing(DimensionSlice *slice, ScanTupLock *tuplock);
extern DimensionSlice *ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id,
ScanTupLock *tuplock,
MemoryContext mctx);

@@ -211,6 +211,35 @@ ts_hypercube_from_constraints(ChunkConstraints *constraints, MemoryContext mctx)
return hc;
}
/*
* Find slices in the hypercube that already exist in metadata.
*
* If a slice exists in metadata, the slice ID will be filled in on the
* existing slice in the hypercube. Optionally, also lock the slice when
* found.
*/
int
ts_hypercube_find_existing_slices(Hypercube *cube, ScanTupLock *tuplock)
{
int i;
int num_found = 0;
for (i = 0; i < cube->num_slices; i++)
{
/*
* Check if there's already an existing slice with the calculated
* range. If a slice already exists, use that slice's ID instead
* of a new one.
*/
bool found = ts_dimension_slice_scan_for_existing(cube->slices[i], tuplock);
if (found)
num_found++;
}
return num_found;
}
/*
* Calculate the hypercube that encloses the given point.
*
@@ -278,11 +307,8 @@ ts_hypercube_calculate_from_point(Hyperspace *hs, Point *p, ScanTupLock *tuplock
* Check if there's already an existing slice with the calculated
* range. If a slice already exists, use that slice's ID instead
* of a new one.
*
* The tuples are already locked in
* `chunk_create_from_point_after_lock`, so nothing to do here.
*/
ts_dimension_slice_scan_for_existing(cube->slices[i]);
ts_dimension_slice_scan_for_existing(cube->slices[i], tuplock);
}
}

@@ -30,6 +30,7 @@ extern TSDLLEXPORT Hypercube *ts_hypercube_alloc(int16 num_dimensions);
extern void ts_hypercube_free(Hypercube *hc);
extern TSDLLEXPORT void ts_hypercube_add_slice(Hypercube *hc, DimensionSlice *slice);
extern Hypercube *ts_hypercube_from_constraints(ChunkConstraints *constraints, MemoryContext mctx);
extern int ts_hypercube_find_existing_slices(Hypercube *cube, ScanTupLock *tuplock);
extern Hypercube *ts_hypercube_calculate_from_point(Hyperspace *hs, Point *p, ScanTupLock *tuplock);
extern bool ts_hypercubes_collide(Hypercube *cube1, Hypercube *cube2);
extern TSDLLEXPORT DimensionSlice *ts_hypercube_get_slice_by_dimension_id(Hypercube *hc,

@@ -1108,7 +1108,7 @@ hypertable_chunk_store_add(Hypertable *h, Chunk *chunk)
}
static inline Chunk *
hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists)
hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists, bool lock_chunk_slices)
{
Chunk *chunk;
ChunkStoreEntry *cse = ts_subspace_store_get(h->chunk_cache, point);
@@ -1124,7 +1124,7 @@ hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists)
* allocates a lot of transient data. We don't want this allocated on
* the cache's memory context.
*/
chunk = ts_chunk_find(h, point);
chunk = ts_chunk_find(h, point, lock_chunk_slices);
if (NULL == chunk)
{
@@ -1149,14 +1149,16 @@ hypertable_get_chunk(Hypertable *h, Point *point, bool create_if_not_exists)
Chunk *
ts_hypertable_find_chunk_if_exists(Hypertable *h, Point *point)
{
return hypertable_get_chunk(h, point, false);
return hypertable_get_chunk(h, point, false, false);
}
/* gets the chunk for a given point, creating it if it does not exist */
/* gets the chunk for a given point, creating it if it does not exist. If the
 * chunk already exists, all its dimension slices will be locked in FOR KEY
 * SHARE mode. */
Chunk *
ts_hypertable_get_or_create_chunk(Hypertable *h, Point *point)
{
return hypertable_get_chunk(h, point, true);
return hypertable_get_chunk(h, point, true, true);
}
bool

@@ -1,22 +1,90 @@
Parsed test spec with 3 sessions
Parsed test spec with 5 sessions
starting permutation: s1a s2a s3a s3b
starting permutation: s3_chunks_found_wait s1_drop_chunks s2_drop_chunks s3_chunks_found_release s3_show_missing_slices s3_show_num_chunks s3_show_data
step s3_chunks_found_wait: SELECT debug_waitpoint_enable('drop_chunks_chunks_found');
debug_waitpoint_enable
step s1a: SELECT COUNT(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); <waiting ...>
step s2a: SELECT COUNT(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); <waiting ...>
step s3a: SELECT debug_waitpoint_release('drop_chunks_chunks_found');
step s1_drop_chunks: SELECT count(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); <waiting ...>
step s2_drop_chunks: SELECT count(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); <waiting ...>
step s3_chunks_found_release: SELECT debug_waitpoint_release('drop_chunks_chunks_found');
debug_waitpoint_release
step s1a: <... completed>
step s1_drop_chunks: <... completed>
count
1
step s2a: <... completed>
error in steps s3a s1a s2a: ERROR: some chunks could not be read since they are being concurrently updated
step s3b: SELECT COUNT(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
step s2_drop_chunks: <... completed>
count
0
step s3_show_missing_slices: SELECT count(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
count
0
step s3_show_num_chunks: SELECT count(*) FROM show_chunks('dropchunks_race_t1') ORDER BY 1;
count
0
step s3_show_data: SELECT * FROM dropchunks_race_t1 ORDER BY 1;
time device temp
starting permutation: s4_chunks_dropped_wait s1_drop_chunks s5_insert_new_chunk s4_chunks_dropped_release s3_show_missing_slices s3_show_num_chunks s3_show_data
step s4_chunks_dropped_wait: SELECT debug_waitpoint_enable('drop_chunks_end');
debug_waitpoint_enable
step s1_drop_chunks: SELECT count(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); <waiting ...>
step s5_insert_new_chunk: INSERT INTO dropchunks_race_t1 VALUES ('2020-03-01 10:30', 1, 2.2); <waiting ...>
step s4_chunks_dropped_release: SELECT debug_waitpoint_release('drop_chunks_end');
debug_waitpoint_release
step s1_drop_chunks: <... completed>
count
1
step s5_insert_new_chunk: <... completed>
step s3_show_missing_slices: SELECT count(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
count
0
step s3_show_num_chunks: SELECT count(*) FROM show_chunks('dropchunks_race_t1') ORDER BY 1;
count
1
step s3_show_data: SELECT * FROM dropchunks_race_t1 ORDER BY 1;
time device temp
Sun Mar 01 10:30:00 2020 PST  1  2.2
starting permutation: s4_chunks_dropped_wait s1_drop_chunks s5_insert_old_chunk s4_chunks_dropped_release s3_show_missing_slices s3_show_num_chunks s3_show_data
step s4_chunks_dropped_wait: SELECT debug_waitpoint_enable('drop_chunks_end');
debug_waitpoint_enable
step s1_drop_chunks: SELECT count(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); <waiting ...>
step s5_insert_old_chunk: INSERT INTO dropchunks_race_t1 VALUES ('2020-01-02 10:31', 1, 1.1); <waiting ...>
step s4_chunks_dropped_release: SELECT debug_waitpoint_release('drop_chunks_end');
debug_waitpoint_release
step s1_drop_chunks: <... completed>
count
1
step s5_insert_old_chunk: <... completed>
step s3_show_missing_slices: SELECT count(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice);
count
0
step s3_show_num_chunks: SELECT count(*) FROM show_chunks('dropchunks_race_t1') ORDER BY 1;
count
1
step s3_show_data: SELECT * FROM dropchunks_race_t1 ORDER BY 1;
time device temp
Thu Jan 02 10:31:00 2020 PST  1  1.1

@@ -8,10 +8,10 @@ setup {
SELECT create_hypertable('dropchunks_race_t1', 'time', 'device', 2);
INSERT INTO dropchunks_race_t1 VALUES ('2020-01-03 10:30', 1, 32.2);
CREATE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
CREATE OR REPLACE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
AS '@TS_MODULE_PATHNAME@', 'ts_debug_waitpoint_enable';
CREATE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
CREATE OR REPLACE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
AS '@TS_MODULE_PATHNAME@', 'ts_debug_waitpoint_release';
}
@@ -20,14 +20,35 @@ teardown {
}
session "s1"
step "s1a" { SELECT COUNT(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); }
step "s1_drop_chunks" { SELECT count(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); }
session "s2"
step "s2a" { SELECT COUNT(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); }
step "s2_drop_chunks" { SELECT count(*) FROM drop_chunks('dropchunks_race_t1', TIMESTAMPTZ '2020-03-01'); }
session "s3"
setup { SELECT debug_waitpoint_enable('drop_chunks_chunks_found'); }
step "s3a" { SELECT debug_waitpoint_release('drop_chunks_chunks_found'); }
step "s3b" { SELECT COUNT(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice); }
step "s3_chunks_found_wait" { SELECT debug_waitpoint_enable('drop_chunks_chunks_found'); }
step "s3_chunks_found_release" { SELECT debug_waitpoint_release('drop_chunks_chunks_found'); }
step "s3_show_missing_slices" { SELECT count(*) FROM _timescaledb_catalog.chunk_constraint WHERE dimension_slice_id NOT IN (SELECT id FROM _timescaledb_catalog.dimension_slice); }
step "s3_show_num_chunks" { SELECT count(*) FROM show_chunks('dropchunks_race_t1') ORDER BY 1; }
step "s3_show_data" { SELECT * FROM dropchunks_race_t1 ORDER BY 1; }
permutation "s1a" "s2a" "s3a" "s3b"
session "s4"
step "s4_chunks_dropped_wait" { SELECT debug_waitpoint_enable('drop_chunks_end'); }
step "s4_chunks_dropped_release" { SELECT debug_waitpoint_release('drop_chunks_end'); }
session "s5"
step "s5_insert_old_chunk" { INSERT INTO dropchunks_race_t1 VALUES ('2020-01-02 10:31', 1, 1.1); }
step "s5_insert_new_chunk" { INSERT INTO dropchunks_race_t1 VALUES ('2020-03-01 10:30', 1, 2.2); }
# Test race between two drop_chunks processes.
permutation "s3_chunks_found_wait" "s1_drop_chunks" "s2_drop_chunks" "s3_chunks_found_release" "s3_show_missing_slices" "s3_show_num_chunks" "s3_show_data"
# Test race between drop_chunks and an insert into a new chunk. The
# new chunk will share a slice with the chunk that is about to be
# dropped. The shared slice must persist after drop_chunks completes,
# otherwise the new chunk will be missing one of its slices.
permutation "s4_chunks_dropped_wait" "s1_drop_chunks" "s5_insert_new_chunk" "s4_chunks_dropped_release" "s3_show_missing_slices" "s3_show_num_chunks" "s3_show_data"
# Test race between drop_chunks and an insert into the chunk being
# concurrently dropped. The chunk and slices should be recreated.
permutation "s4_chunks_dropped_wait" "s1_drop_chunks" "s5_insert_old_chunk" "s4_chunks_dropped_release" "s3_show_missing_slices" "s3_show_num_chunks" "s3_show_data"