Address PR comments.

This commit is contained in:
Suraj Gupta 2021-12-10 16:46:22 -05:00 committed by Josh Slocum
parent 90a652ccfb
commit a674edaa62
2 changed files with 42 additions and 61 deletions

View File

@ -1380,21 +1380,6 @@ ACTOR Future<Void> haltBlobGranules(BlobManagerData* bmData) {
return Void();
}
// TODO: refactor this into a common file
// Reads at most one entry from the blob granule history key range derived from `range`,
// scanning in reverse, and decodes it into a GranuleHistory.
//
// tr:    transaction used for the (non-snapshot) range read
// range: granule key range whose history entry is looked up
//
// Returns an empty Optional when no history entry exists for `range`; otherwise the decoded
// entry. Asserts that the key range encoded in the stored key equals `range`.
// NOTE(review): presumably the reverse scan with limit 1 yields the highest-version entry
// because of how history keys are encoded -- confirm against blobGranuleHistoryKeyRangeFor.
ACTOR Future<Optional<GranuleHistory>> getLatestGranuleHistory(Transaction* tr, KeyRange range) {
	KeyRange historyKeys = blobGranuleHistoryKeyRangeFor(range);
	// limit of 1 + Reverse::True: only the last key in historyKeys is fetched
	RangeResult rows = wait(tr->getRange(historyKeys, 1, Snapshot::False, Reverse::True));
	ASSERT(rows.size() <= 1);

	// no history recorded for this range
	if (rows.empty()) {
		return Optional<GranuleHistory>();
	}

	// the key decodes to (granule range, version); the range must match the one requested
	std::pair<KeyRange, Version> rangeAndVersion = decodeBlobGranuleHistoryKey(rows[0].key);
	ASSERT(range == rangeAndVersion.first);

	return Optional<GranuleHistory>(
	    GranuleHistory(range, rangeAndVersion.second, decodeBlobGranuleHistoryValue(rows[0].value)));
}
ACTOR Future<GranuleFiles> loadHistoryFiles(BlobManagerData* bmData, UID granuleID) {
state Transaction tr(bmData->db);
state KeyRange range = blobGranuleFileKeyRangeFor(granuleID);
@ -1402,7 +1387,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(BlobManagerData* bmData, UID granule
state GranuleFiles files;
loop {
try {
wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID));
wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID, BM_DEBUG));
return files;
} catch (Error& e) {
wait(tr.onError(e));
@ -1480,6 +1465,10 @@ ACTOR Future<Void> fullyDeleteGranule(BlobManagerData* self, UID granuleId, KeyR
/*
* For the granule with id granuleId, finds the first snapshot file at a
* version <= pruneVersion and deletes all files older than it.
*
* Assumption: this granule's startVersion might change because the first snapshot
* file might be deleted. We will need to ensure we don't rely on the granule's startVersion
* (that's persisted as part of the key), but rather use the granule's first snapshot's version when needed
*/
ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId, Version pruneVersion) {
if (BM_DEBUG) {
@ -1551,12 +1540,6 @@ ACTOR Future<Void> partiallyDeleteGranule(BlobManagerData* self, UID granuleId,
wait(waitForAll(deletions));
// delete metadata in FDB (deleted file keys)
// TODO: do we need to also update the start version for the history entry?
// it would be a blind write here so might that cause a problem with history traversal in BW?
// do we gain any benefit from updating it? even if we keep the old start version, the worst is
// someone requests a version in [oldStartVersion, newStartVersion) and we fail to return
// any files for that request.
if (BM_DEBUG) {
printf("Partially deleting granule %s: deleting file keys\n", granuleId.toString().c_str());
}
@ -1603,8 +1586,10 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
state std::vector<std::tuple<UID, KeyRef>> toFullyDelete;
state std::vector<UID> toPartiallyDelete;
// set of granuleIds to track which granules we have already visited in traversal
state std::unordered_set<UID> visited; // track which granules we have already visited in traversal
// track which granules we have already added to traversal
// note: (startKey, startVersion) uniquely identifies a granule
state std::unordered_set<std::pair<const uint8_t*, Version>, boost::hash<std::pair<const uint8_t*, Version>>>
visited;
state KeyRange range(KeyRangeRef(startKey, endKey)); // range for [startKey, endKey)
@ -1646,6 +1631,7 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
if (BM_DEBUG) {
printf("Adding range to history queue\n");
}
visited.insert({ activeRange.range().begin.begin(), history.get().version });
historyEntryQueue.push({ activeRange.range(), history.get().version, MAX_VERSION });
}
break;
@ -1693,17 +1679,6 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
currHistoryNode.granuleID.toString().c_str());
}
// if we already saw this node, skip it; otherwise, mark it as visited
// TODO: doing the visited check here as opposed to when adding this node as a parent
// causes us to do another GET, but the parentGranules field does not contain parent gids
if (visited.count(currHistoryNode.granuleID)) {
if (BM_DEBUG) {
printf("Already processed %s, so skipping it\n", currHistoryNode.granuleID.toString().c_str());
}
continue;
}
visited.insert(currHistoryNode.granuleID);
// There are three cases this granule can fall into:
// - if the granule's end version is at or before the prune version or this is a force delete,
// this granule should be completely deleted
@ -1719,13 +1694,20 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
if (BM_DEBUG) {
printf("Granule %s will be partially deleted\n", currHistoryNode.granuleID.toString().c_str());
}
toPartiallyDelete.push_back({ currHistoryNode.granuleID });
toPartiallyDelete.push_back({ currHistoryNode.granuleID, currHistoryNode.contents().});
}
// add all of the node's parents to the queue
for (auto& parent : currHistoryNode.parentGranules) {
// the parent's end version is this node's startVersion,
// since this node must have started where its parent finished
// if we already added this node to queue, skip it; otherwise, mark it as visited
if (visited.count({ parent.first.begin.begin(), parent.second })) {
if (BM_DEBUG) {
printf("Already added %s to queue, so skipping it\n", currHistoryNode.granuleID.toString().c_str());
}
continue;
}
visited.insert({ parent.first.begin.begin(), parent.second });
if (BM_DEBUG) {
printf("Adding parent [%s-%s) with versions [%lld-%lld) to queue\n",
parent.first.begin.printable().c_str(),
@ -1733,6 +1715,9 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
parent.second,
startVersion);
}
// the parent's end version is this node's startVersion,
// since this node must have started where its parent finished
historyEntryQueue.push({ parent.first, parent.second, startVersion });
}
}
@ -1844,6 +1829,22 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
* case that the timer is up before any new prune intents arrive).
*/
ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
// setup bstore
try {
if (BM_DEBUG) {
printf("BM constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str());
}
self->bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
if (BM_DEBUG) {
printf("BM constructed backup container\n");
}
} catch (Error& e) {
if (BM_DEBUG) {
printf("BM got backup container init error %s\n", e.name());
}
throw e;
}
try {
state Value oldPruneWatchVal;
loop {
@ -1870,7 +1871,7 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
}
// TODO: debugging code, remove it
if (newPruneWatchVal.get().toString().substr(0, 6) == "random") {
if (newPruneWatchVal.get().toString().substr(0, 6) == "prune=") {
state Reference<ReadYourWritesTransaction> dummy =
makeReference<ReadYourWritesTransaction>(self->db);
loop {
@ -2035,21 +2036,6 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
fmt::print("Blob manager acquired lock at epoch {}\n", epoch);
}
try {
if (BM_DEBUG) {
printf("BM constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str());
}
self.bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
if (BM_DEBUG) {
printf("BM constructed backup container\n");
}
} catch (Error& e) {
if (BM_DEBUG) {
printf("BM got backup container init error %s\n", e.name());
}
throw e;
}
// although we start the recruiter, we wait until existing workers are ack'd
auto recruitBlobWorker = IAsyncListener<RequestStream<RecruitBlobWorkerRequest>>::create(
dbInfo, [](auto const& info) { return info.clusterInterface.recruitBlobWorker; });

View File

@ -60,7 +60,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
int64_t timeTravelTooOld = 0;
int64_t rowsRead = 0;
int64_t bytesRead = 0;
KeyRangeMap<Version> latestPruneVersions;
std::vector<Future<Void>> clients;
DatabaseConfiguration config;
@ -307,6 +306,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
OldRead(KeyRange range, Version v, RangeResult oldResult) : range(range), v(v), oldResult(oldResult) {}
};
// utility to prune <range> at pruneVersion=<version> with the <force> flag
ACTOR Future<Void> pruneAtVersion(Database cx, KeyRange range, Version version, bool force) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
@ -353,7 +353,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state double endTime = last + self->testDuration;
state std::map<double, OldRead> timeTravelChecks;
state int64_t timeTravelChecksMemory = 0;
state KeyRangeMap<Version> latestPruneVersions;
TraceEvent("BlobGranuleVerifierStart");
if (BGV_DEBUG) {
@ -378,11 +377,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// advance iterator before doing read, so if it gets error we don't retry it
try {
// before reading, prune at some version [0, readVersion)
Version pruneVersion = deterministicRandom()->randomInt(0, oldRead.v);
wait(self->pruneAtVersion(cx, oldRead.range, pruneVersion, false));
// FIXME: this doesn't actually guarantee that the prune executed. maybe add a delay?
// TODO: before reading, prune at some version [0, readVersion)
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);