more cleanup

Josh Slocum 2022-03-10 13:53:06 -06:00
parent c35e020da7
commit 903f7adbc4
6 changed files with 34 additions and 35 deletions

View File

@@ -6747,6 +6747,7 @@ Future<Standalone<VectorRef<KeyRef>>> Transaction::getRangeSplitPoints(KeyRange
return ::getRangeSplitPoints(trState, keys, chunkSize);
}
// TODO REMOVE when correctness clean
#define BG_REQUEST_DEBUG false
// the blob granule requests are a bit funky because they piggyback off the existing transaction to read from the system

View File

@@ -45,12 +45,7 @@ ACTOR Future<Optional<GranuleHistory>> getLatestGranuleHistory(Transaction* tr,
// Gets the files based on the file key range [startKey, endKey)
// and populates the files object accordingly
-ACTOR Future<Void> readGranuleFiles(Transaction* tr,
-Key* startKey,
-Key endKey,
-GranuleFiles* files,
-UID granuleID,
-bool debug) {
+ACTOR Future<Void> readGranuleFiles(Transaction* tr, Key* startKey, Key endKey, GranuleFiles* files, UID granuleID) {
loop {
int lim = BUGGIFY ? 2 : 1000;
@@ -85,18 +80,12 @@ ACTOR Future<Void> readGranuleFiles(Transaction* tr,
break;
}
}
-if (debug) {
-printf("Loaded %d snapshot and %d delta files for %s\n",
-files->snapshotFiles.size(),
-files->deltaFiles.size(),
-granuleID.toString().c_str());
-}
return Void();
}
// Wrapper around readGranuleFiles
// Gets all files belonging to the granule with id granule ID
-ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID, bool debug) {
+ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID) {
state KeyRange range = blobGranuleFileKeyRangeFor(granuleID);
state Key startKey = range.begin;
state GranuleFiles files;
@@ -106,7 +95,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID, bool deb
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID, debug));
+wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID));
return files;
} catch (Error& e) {
wait(tr.onError(e));

View File

@@ -64,12 +64,7 @@ struct GranuleFiles {
class Transaction;
ACTOR Future<Optional<GranuleHistory>> getLatestGranuleHistory(Transaction* tr, KeyRange range);
-ACTOR Future<Void> readGranuleFiles(Transaction* tr,
-Key* startKey,
-Key endKey,
-GranuleFiles* files,
-UID granuleID,
-bool debug);
+ACTOR Future<Void> readGranuleFiles(Transaction* tr, Key* startKey, Key endKey, GranuleFiles* files, UID granuleID);
-ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID, bool debug);
+ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID);
#endif

View File

@@ -2043,7 +2043,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, U
state GranuleFiles files;
loop {
try {
-wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID, BM_DEBUG));
+wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID));
return files;
} catch (Error& e) {
wait(tr.onError(e));
@@ -2062,7 +2062,7 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granu
}
// get files
-GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId, BM_DEBUG));
+GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId));
std::vector<Future<Void>> deletions;
std::vector<std::string> filesToDelete; // TODO: remove, just for debugging
@@ -2134,7 +2134,7 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
}
// get files
-GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId, BM_DEBUG));
+GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId));
// represents the version of the latest snapshot file in this granule with G.version < pruneVersion
Version latestSnapshotVersion = invalidVersion;
@@ -2605,8 +2605,8 @@ ACTOR Future<Void> doLockChecks(Reference<BlobManagerData> bmData) {
}
}
-ACTOR Future<Void> blobManagerExclusionSafetyCheck(Reference<BlobManagerData> self,
-BlobManagerExclusionSafetyCheckRequest req) {
+static void blobManagerExclusionSafetyCheck(Reference<BlobManagerData> self,
+BlobManagerExclusionSafetyCheckRequest req) {
TraceEvent("BMExclusionSafetyCheckBegin", self->id).log();
BlobManagerExclusionSafetyCheckReply reply(true);
// make sure at least one blob worker remains after exclusions
@@ -2632,8 +2632,6 @@ ACTOR Future<Void> blobManagerExclusionSafetyCheck(Reference<BlobManagerData> se
TraceEvent("BMExclusionSafetyCheckEnd", self->id).log();
req.reply.send(reply);
-return Void();
}
ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
@@ -2692,7 +2690,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
}
when(BlobManagerExclusionSafetyCheckRequest exclCheckReq =
waitNext(bmInterf.blobManagerExclCheckReq.getFuture())) {
-self->addActor.send(blobManagerExclusionSafetyCheck(self, exclCheckReq));
+blobManagerExclusionSafetyCheck(self, exclCheckReq);
}
when(wait(collection)) {
TraceEvent("BlobManagerActorCollectionError");

View File

@@ -281,7 +281,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobWorkerData> bwData, UI
state GranuleFiles files;
loop {
try {
-wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID, BW_DEBUG));
+wait(readGranuleFiles(&tr, &startKey, range.end, &files, granuleID));
return files;
} catch (Error& e) {
wait(tr.onError(e));
@@ -296,7 +296,7 @@ ACTOR Future<GranuleFiles> loadPreviousFiles(Transaction* tr, UID granuleID) {
// no need to add conflict range for read b/c of granule lock
state Key startKey = range.begin;
state GranuleFiles files;
-wait(readGranuleFiles(tr, &startKey, range.end, &files, granuleID, BW_DEBUG));
+wait(readGranuleFiles(tr, &startKey, range.end, &files, granuleID));
return files;
}
@@ -967,7 +967,9 @@ ACTOR Future<Void> handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
if (completedDeltaFile.version > cfStartVersion) {
if (BW_DEBUG) {
fmt::print("Popping change feed {0} at {1}\n", cfKey.printable(), completedDeltaFile.version);
fmt::print("Popping change feed {0} at {1}\n",
cfKeyToGranuleID(cfKey).toString().c_str(),
completedDeltaFile.version);
}
// FIXME: for a write-hot shard, we could potentially batch these and only pop the largest one after several
// have completed
@@ -1595,7 +1597,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
DEBUG_MUTATION("BlobWorkerBuffer", deltas.version, delta, bwData->id)
.detail("Granule", metadata->keyRange)
.detail("ChangeFeedID", readOldChangeFeed ? oldCFKey.get() : cfKey)
.detail("ChangeFeedID",
cfKeyToGranuleID(readOldChangeFeed ? oldCFKey.get() : cfKey))
.detail("OldChangeFeed", readOldChangeFeed ? "T" : "F");
}
metadata->currentDeltas.push_back_deep(metadata->currentDeltas.arena(), deltas);
@@ -1990,6 +1993,16 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
// if we don't have to wait for change feed version to catch up or wait for any pending file writes to complete,
// nothing to do
+if (BW_REQUEST_DEBUG) {
+printf("WFV %lld) CF=%lld, pendingD=%lld, durableD=%lld, pendingS=%lld, durableS=%lld\n",
+v,
+metadata->activeCFData.get()->getVersion(),
+metadata->pendingDeltaVersion,
+metadata->durableDeltaVersion.get(),
+metadata->pendingSnapshotVersion,
+metadata->durableSnapshotVersion.get());
+}
ASSERT(metadata->activeCFData.get().isValid());
if (v <= metadata->activeCFData.get()->getVersion() &&
@@ -2163,7 +2176,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// lazily load files for old granule if not present
chunkRange = cur->range;
if (!cur->files.isValid() || cur->files.isError()) {
-cur->files = loadHistoryFiles(bwData->db, cur->granuleID, BW_DEBUG);
+cur->files = loadHistoryFiles(bwData->db, cur->granuleID);
}
choose {

View File

@@ -1204,7 +1204,10 @@ public:
auto& clientVersions = changeFeedClientVersions[addr];
Version minVersion = version.get();
for (auto& it : clientVersions) {
// fmt::print("Blocked client {0} @ {1}\n", it.first.toString().substr(0, 8), it.second);
/*fmt::print("SS {0} Blocked client {1} @ {2}\n",
thisServerID.toString().substr(0, 4),
it.first.toString().substr(0, 8),
it.second);*/
minVersion = std::min(minVersion, it.second);
}
return minVersion;