more pruning bug fixes

This commit is contained in:
Evan Tschannen 2022-03-15 13:34:59 -07:00
parent d46e551f11
commit 2c88a189a9
3 changed files with 56 additions and 35 deletions

View File

@ -2092,7 +2092,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, U
* also removes the history entry for this granule from the system keyspace * also removes the history entry for this granule from the system keyspace
* TODO: ensure cannot fully delete granule that is still splitting! * TODO: ensure cannot fully delete granule that is still splitting!
*/ */
ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, KeyRef historyKey) { ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Key historyKey) {
if (BM_DEBUG) { if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: init\n", granuleId.toString()); fmt::print("Fully deleting granule {0}: init\n", granuleId.toString());
} }
@ -2194,8 +2194,9 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
} }
} }
// we would have only partially deleted the granule if such a snapshot existed if (latestSnapshotVersion == invalidVersion) {
ASSERT(latestSnapshotVersion != invalidVersion); return Void();
}
// delete all delta files older than latestSnapshotVersion // delete all delta files older than latestSnapshotVersion
for (auto deltaFile : files.deltaFiles) { for (auto deltaFile : files.deltaFiles) {
@ -2277,7 +2278,7 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
state std::queue<std::tuple<KeyRange, Version, Version>> historyEntryQueue; state std::queue<std::tuple<KeyRange, Version, Version>> historyEntryQueue;
// stacks of <granuleId, historyKey> and <granuleId> to track which granules to delete // stacks of <granuleId, historyKey> and <granuleId> to track which granules to delete
state std::vector<std::tuple<UID, KeyRef>> toFullyDelete; state std::vector<std::tuple<UID, Key>> toFullyDelete;
state std::vector<UID> toPartiallyDelete; state std::vector<UID> toPartiallyDelete;
// track which granules we have already added to traversal // track which granules we have already added to traversal
@ -2443,7 +2444,7 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
} }
for (i = toFullyDelete.size() - 1; i >= 0; --i) { for (i = toFullyDelete.size() - 1; i >= 0; --i) {
UID granuleId; UID granuleId;
KeyRef historyKey; Key historyKey;
std::tie(granuleId, historyKey) = toFullyDelete[i]; std::tie(granuleId, historyKey) = toFullyDelete[i];
// FIXME: consider batching into a single txn (need to take care of txn size limit) // FIXME: consider batching into a single txn (need to take care of txn size limit)
if (BM_DEBUG) { if (BM_DEBUG) {

View File

@ -1889,31 +1889,34 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
stopVersion = prev.value().isValid() ? prev.value()->startVersion : invalidVersion; stopVersion = prev.value().isValid() ? prev.value()->startVersion : invalidVersion;
state std::vector<Reference<GranuleHistoryEntry>> historyEntryStack; state std::vector<Reference<GranuleHistoryEntry>> historyEntryStack;
state bool foundHistory = true;
// while the start version of the current granule's parent not past the last known start version, // while the start version of the current granule's parent not past the last known start version,
// walk backwards // walk backwards
while (curHistory.value.parentGranules.size() > 0 && while (curHistory.value.parentGranules.size() > 0 &&
curHistory.value.parentGranules[0].second >= stopVersion) { curHistory.value.parentGranules[0].second >= stopVersion) {
state GranuleHistory next; state GranuleHistory next;
loop { loop {
try { try {
Optional<Value> v = wait(tr.get(blobGranuleHistoryKeyFor( Optional<Value> v = wait(tr.get(blobGranuleHistoryKeyFor(
curHistory.value.parentGranules[0].first, curHistory.value.parentGranules[0].second))); curHistory.value.parentGranules[0].first, curHistory.value.parentGranules[0].second)));
if (!v.present()) { if (!v.present()) {
printf("No granule history present for [%s - %s) @ %lld!!\n", foundHistory = false;
curHistory.value.parentGranules[0].first.begin.printable().c_str(), } else {
curHistory.value.parentGranules[0].first.end.printable().c_str(),
curHistory.value.parentGranules[0].first);
}
ASSERT(v.present());
next = GranuleHistory(curHistory.value.parentGranules[0].first, next = GranuleHistory(curHistory.value.parentGranules[0].first,
curHistory.value.parentGranules[0].second, curHistory.value.parentGranules[0].second,
decodeBlobGranuleHistoryValue(v.get())); decodeBlobGranuleHistoryValue(v.get()));
}
break; break;
} catch (Error& e) { } catch (Error& e) {
wait(tr.onError(e)); wait(tr.onError(e));
} }
} }
if (!foundHistory) {
break;
}
ASSERT(next.version != invalidVersion); ASSERT(next.version != invalidVersion);
// granule next.granuleID goes from the version range [next.version, curHistory.version] // granule next.granuleID goes from the version range [next.version, curHistory.version]
@ -1924,8 +1927,14 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
if (!historyEntryStack.empty()) { if (!historyEntryStack.empty()) {
Version oldestStartVersion = historyEntryStack.back()->startVersion; Version oldestStartVersion = historyEntryStack.back()->startVersion;
if (!foundHistory && stopVersion != invalidVersion) {
stopVersion = oldestStartVersion;
}
ASSERT(stopVersion == oldestStartVersion || stopVersion == invalidVersion); ASSERT(stopVersion == oldestStartVersion || stopVersion == invalidVersion);
} else { } else {
if (!foundHistory && stopVersion != invalidVersion) {
stopVersion = invalidVersion;
}
ASSERT(stopVersion == invalidVersion); ASSERT(stopVersion == invalidVersion);
} }
@ -1947,11 +1956,13 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
while (i >= 0) { while (i >= 0) {
auto prevRanges = bwData->granuleHistory.rangeContaining(historyEntryStack[i]->range.begin); auto prevRanges = bwData->granuleHistory.rangeContaining(historyEntryStack[i]->range.begin);
// sanity check if (prevRanges.value().isValid() &&
ASSERT(!prevRanges.value().isValid() || prevRanges.value()->endVersion != historyEntryStack[i]->startVersion) {
prevRanges.value()->endVersion == historyEntryStack[i]->startVersion); historyEntryStack[i]->parentGranule = Reference<GranuleHistoryEntry>();
} else {
historyEntryStack[i]->parentGranule = prevRanges.value(); historyEntryStack[i]->parentGranule = prevRanges.value();
}
bwData->granuleHistory.insert(historyEntryStack[i]->range, historyEntryStack[i]); bwData->granuleHistory.insert(historyEntryStack[i]->range, historyEntryStack[i]);
i--; i--;
} }
@ -2199,7 +2210,11 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); } when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
} }
ASSERT(!chunkFiles.snapshotFiles.empty()); if (chunkFiles.snapshotFiles.empty()) {
// a snapshot file must have been pruned
throw blob_granule_transaction_too_old();
}
ASSERT(!chunkFiles.deltaFiles.empty()); ASSERT(!chunkFiles.deltaFiles.empty());
ASSERT(chunkFiles.deltaFiles.back().version > req.readVersion); ASSERT(chunkFiles.deltaFiles.back().version > req.readVersion);
if (chunkFiles.snapshotFiles.front().version > req.readVersion) { if (chunkFiles.snapshotFiles.front().version > req.readVersion) {

View File

@ -39,8 +39,6 @@
#define BGV_DEBUG true #define BGV_DEBUG true
Version dbgPruneVersion = 0;
/* /*
* This workload is designed to verify the correctness of the blob data produced by the blob workers. * This workload is designed to verify the correctness of the blob data produced by the blob workers.
* As a read-only validation workload, it can piggyback off of other write or read/write workloads. * As a read-only validation workload, it can piggyback off of other write or read/write workloads.
@ -64,6 +62,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
int64_t rowsRead = 0; int64_t rowsRead = 0;
int64_t bytesRead = 0; int64_t bytesRead = 0;
std::vector<Future<Void>> clients; std::vector<Future<Void>> clients;
bool enablePruning;
DatabaseConfiguration config; DatabaseConfiguration config;
@ -79,6 +78,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), testDuration); timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), testDuration);
timeTravelBufferSize = getOption(options, LiteralStringRef("timeTravelBufferSize"), 100000000); timeTravelBufferSize = getOption(options, LiteralStringRef("timeTravelBufferSize"), 100000000);
threads = getOption(options, LiteralStringRef("threads"), 1); threads = getOption(options, LiteralStringRef("threads"), 1);
enablePruning = getOption(options, LiteralStringRef("enablePruning"), false /*sharedRandomNumber % 2 == 0*/);
ASSERT(threads >= 1); ASSERT(threads >= 1);
if (BGV_DEBUG) { if (BGV_DEBUG) {
@ -454,13 +454,19 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
try { try {
state Version newPruneVersion = 0; state Version newPruneVersion = 0;
state bool doPruning = state bool doPruning = allowPruning && deterministicRandom()->random01() < 0.5;
allowPruning && prevPruneVersion < oldRead.v && deterministicRandom()->random01() < 0.5;
if (doPruning) { if (doPruning) {
newPruneVersion = deterministicRandom()->randomInt64(prevPruneVersion, oldRead.v); Version maxPruneVersion = oldRead.v;
for (auto& it : timeTravelChecks) {
maxPruneVersion = std::min(it.second.v, maxPruneVersion);
}
if (prevPruneVersion < maxPruneVersion) {
newPruneVersion = deterministicRandom()->randomInt64(prevPruneVersion, maxPruneVersion);
prevPruneVersion = std::max(prevPruneVersion, newPruneVersion); prevPruneVersion = std::max(prevPruneVersion, newPruneVersion);
dbgPruneVersion = prevPruneVersion;
wait(self->pruneAtVersion(cx, normalKeys, newPruneVersion, false)); wait(self->pruneAtVersion(cx, normalKeys, newPruneVersion, false));
} else {
doPruning = false;
}
} }
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult = std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v)); wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
@ -487,7 +493,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
} }
} }
} catch (Error& e) { } catch (Error& e) {
if (e.code() == error_code_blob_granule_transaction_too_old && oldRead.v >= dbgPruneVersion) { if (e.code() == error_code_blob_granule_transaction_too_old) {
self->timeTravelTooOld++; self->timeTravelTooOld++;
// TODO: add debugging info for when this is a failure // TODO: add debugging info for when this is a failure
} }
@ -532,15 +538,14 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
Future<Void> start(Database const& cx) override { Future<Void> start(Database const& cx) override {
clients.reserve(threads + 1); clients.reserve(threads + 1);
clients.push_back(timeout(findGranules(cx, this), testDuration, Void())); clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
for (int i = 0; i < threads; i++) { if (enablePruning && clientId == 0) {
clients.push_back( clients.push_back(
timeout(reportErrors( timeout(reportErrors(verifyGranules(cx, this, true), "BlobGranuleVerifier"), testDuration, Void()));
// TODO change back } else if (!enablePruning) {
verifyGranules( for (int i = 0; i < threads; i++) {
cx, this, false /*clientId == 0 && i == 0 && deterministicRandom()->random01() < 0.5*/), clients.push_back(timeout(
"BlobGranuleVerifier"), reportErrors(verifyGranules(cx, this, false), "BlobGranuleVerifier"), testDuration, Void()));
testDuration, }
Void()));
} }
return delay(testDuration); return delay(testDuration);
} }