Merge pull request #7612 from sfc-gh-jslocum/granule_merging_batch

Granule merging batch
This commit is contained in:
Josh Slocum 2022-07-20 11:59:55 -05:00 committed by GitHub
commit cc5ba89fcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 369 additions and 365 deletions

View File

@ -916,7 +916,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BG_ENABLE_MERGING, true ); if (randomize && BUGGIFY) BG_ENABLE_MERGING = false;
init( BG_MERGE_CANDIDATE_THRESHOLD_SECONDS, isSimulated ? 20.0 : 30 * 60 ); if (randomize && BUGGIFY) BG_MERGE_CANDIDATE_THRESHOLD_SECONDS = 5.0;
init( BG_MERGE_CANDIDATE_DELAY_SECONDS, BG_MERGE_CANDIDATE_THRESHOLD_SECONDS / 10.0 );
init( BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM, 8 ); if( randomize && BUGGIFY ) BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM = 1;
init( BLOB_WORKER_TIMEOUT, 10.0 ); if( randomize && BUGGIFY ) BLOB_WORKER_TIMEOUT = 1.0;
@ -928,6 +928,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN, 0.1 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX, 5.0 );
init( BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT, 1.5 );
init( BLOB_MANAGER_CONCURRENT_MERGE_CHECKS, 64 ); if( randomize && BUGGIFY ) BLOB_MANAGER_CONCURRENT_MERGE_CHECKS = 1 << deterministicRandom()->randomInt(0, 7);
init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 );
init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 );

View File

@ -1524,9 +1524,9 @@ std::pair<BlobGranuleSplitState, Version> decodeBlobGranuleSplitValue(const Valu
const Value blobGranuleMergeValueFor(KeyRange mergeKeyRange,
std::vector<UID> parentGranuleIDs,
std::vector<KeyRange> parentGranuleRanges,
std::vector<Key> parentGranuleRanges,
std::vector<Version> parentGranuleStartVersions) {
ASSERT(parentGranuleIDs.size() == parentGranuleRanges.size());
ASSERT(parentGranuleIDs.size() == parentGranuleRanges.size() - 1);
ASSERT(parentGranuleIDs.size() == parentGranuleStartVersions.size());
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
@ -1536,12 +1536,12 @@ const Value blobGranuleMergeValueFor(KeyRange mergeKeyRange,
wr << parentGranuleStartVersions;
return addVersionStampAtEnd(wr.toValue());
}
std::tuple<KeyRange, Version, std::vector<UID>, std::vector<KeyRange>, std::vector<Version>>
decodeBlobGranuleMergeValue(ValueRef const& value) {
std::tuple<KeyRange, Version, std::vector<UID>, std::vector<Key>, std::vector<Version>> decodeBlobGranuleMergeValue(
ValueRef const& value) {
KeyRange range;
Version v;
std::vector<UID> parentGranuleIDs;
std::vector<KeyRange> parentGranuleRanges;
std::vector<Key> parentGranuleRanges;
std::vector<Version> parentGranuleStartVersions;
BinaryReader reader(value, IncludeVersion());
@ -1551,7 +1551,7 @@ decodeBlobGranuleMergeValue(ValueRef const& value) {
reader >> parentGranuleStartVersions;
reader >> v;
ASSERT(parentGranuleIDs.size() == parentGranuleRanges.size());
ASSERT(parentGranuleIDs.size() == parentGranuleRanges.size() - 1);
ASSERT(parentGranuleIDs.size() == parentGranuleStartVersions.size());
ASSERT(bigEndian64(v) >= 0);
@ -1581,6 +1581,8 @@ const KeyRange blobGranuleHistoryKeyRangeFor(KeyRangeRef const& range) {
}
const Value blobGranuleHistoryValueFor(Standalone<BlobGranuleHistoryValue> const& historyValue) {
ASSERT(historyValue.parentVersions.empty() ||
historyValue.parentBoundaries.size() - 1 == historyValue.parentVersions.size());
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << historyValue;
return wr.toValue();
@ -1590,6 +1592,8 @@ Standalone<BlobGranuleHistoryValue> decodeBlobGranuleHistoryValue(const ValueRef
Standalone<BlobGranuleHistoryValue> historyValue;
BinaryReader reader(value, IncludeVersion());
reader >> historyValue;
ASSERT(historyValue.parentVersions.empty() ||
historyValue.parentBoundaries.size() - 1 == historyValue.parentVersions.size());
return historyValue;
}

View File

@ -244,11 +244,13 @@ enum BlobGranuleSplitState { Unknown = 0, Initialized = 1, Assigned = 2, Done =
struct BlobGranuleHistoryValue {
constexpr static FileIdentifier file_identifier = 991434;
UID granuleID;
VectorRef<std::pair<KeyRangeRef, Version>> parentGranules;
// VectorRef<std::pair<KeyRangeRef, Version>> parentGranules;
VectorRef<KeyRef> parentBoundaries;
VectorRef<Version> parentVersions;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, granuleID, parentGranules);
serializer(ar, granuleID, parentBoundaries, parentVersions);
}
};

View File

@ -892,6 +892,7 @@ public:
int BG_CONSISTENCY_CHECK_TARGET_SPEED_KB;
bool BG_ENABLE_MERGING;
int BG_MERGE_CANDIDATE_THRESHOLD_SECONDS;
int BG_MERGE_CANDIDATE_DELAY_SECONDS;
int BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM;
double BLOB_WORKER_TIMEOUT; // Blob Manager's reaction time to a blob worker failure
@ -902,6 +903,7 @@ public:
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MIN;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_MAX;
double BLOB_MANAGER_STATUS_EXP_BACKOFF_EXPONENT;
int BLOB_MANAGER_CONCURRENT_MERGE_CHECKS;
double BGCC_TIMEOUT;
double BGCC_MIN_INTERVAL;

View File

@ -658,11 +658,11 @@ std::pair<BlobGranuleSplitState, Version> decodeBlobGranuleSplitValue(ValueRef c
const Value blobGranuleMergeValueFor(KeyRange mergeKeyRange,
std::vector<UID> parentGranuleIDs,
std::vector<KeyRange> parentGranuleRanges,
std::vector<Key> parentGranuleRanges,
std::vector<Version> parentGranuleStartVersions);
// FIXME: probably just define object type for this?
std::tuple<KeyRange, Version, std::vector<UID>, std::vector<KeyRange>, std::vector<Version>>
decodeBlobGranuleMergeValue(ValueRef const& value);
std::tuple<KeyRange, Version, std::vector<UID>, std::vector<Key>, std::vector<Version>> decodeBlobGranuleMergeValue(
ValueRef const& value);
const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version);
std::pair<KeyRange, Version> decodeBlobGranuleHistoryKey(KeyRef const& key);

View File

@ -165,4 +165,25 @@ bool compareFDBAndBlob(RangeResult fdb,
}
}
return correct;
}
// Test helper: repeatedly clears the given key range and polls getBlobGranuleRanges until the
// range is covered by a single granule, i.e. the blob granules over it have merged back together.
// Retries transparently on transaction errors via tr.onError.
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range) {
    // clear key range and check whether it is merged or not, repeatedly
    state Transaction tr(cx);
    loop {
        try {
            Standalone<VectorRef<KeyRangeRef>> ranges = wait(tr.getBlobGranuleRanges(range));
            if (ranges.size() == 1) {
                // exactly one granule covers the whole range: the merge has completed
                return Void();
            }
            CODE_PROBE(true, "ClearAndAwaitMerge doing clear");
            // clearing the data makes the granules small (and idle), which makes them merge candidates
            tr.clear(range);
            wait(tr.commit());
            wait(delay(30.0)); // sleep a bit before checking on merge again
            tr.reset();
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}

View File

@ -25,6 +25,7 @@
#include <vector>
#include <unordered_map>
#include "fdbrpc/simulator.h"
#include "fmt/format.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BlobGranuleCommon.h"
@ -281,6 +282,43 @@ struct BlobManagerStats {
}
};
// Lifecycle of a granule range in the blob manager's merge-candidate map.
enum MergeCandidateState {
    MergeCandidateCannotMerge, // granule is known to be ineligible for merging
    MergeCandidateCanMerge, // granule is merge-eligible; its id and start version are recorded
    MergeCandidateUnknown, // no eligibility information yet (initial/default state)
    MergeCandidateMerging // granule is part of an in-progress merge
};
// The current merge algorithm, skipping just granules that will be merge-eligible on the next pass, but not
// their neighbors, is optimal for guaranteeing merges to make progress where possible, with decently
// optimal but not globally optimal merge behavior.
// Alternative algorithms include not doing a two-pass consideration at all and immediately considering
// all merge candidates, which guarantees the most progress but pretty much guarantees undesirably
// suboptimal merge decisions, because of the time variance of granules becoming merge candidates. Or,
// also skipping adjacent eligible granules in addition to the one that will be eligible next pass,
// which ensures optimally large merges in a future pass, but adds decent delay to doing the merge. Or,
// smarter considering of merge candidates adjacent to the one that will be eligible next pass
// (depending on whether potential future merges with adjacent ones could include this candidate), which
// would be the best of both worlds, but would add a decent amount of code complexity.
struct MergeCandidateInfo {
MergeCandidateState st;
UID granuleID;
Version startVersion;
bool mergeNow;
MergeCandidateInfo() : st(MergeCandidateUnknown), startVersion(invalidVersion), mergeNow(false) {}
MergeCandidateInfo(MergeCandidateState st) : st(st), startVersion(invalidVersion), mergeNow(false) {
ASSERT(st != MergeCandidateCanMerge);
}
MergeCandidateInfo(UID granuleID, Version startVersion)
: st(MergeCandidateCanMerge), granuleID(granuleID), startVersion(startVersion), mergeNow(false) {}
bool canMerge() const { return st == MergeCandidateCanMerge; }
bool canMergeNow() const { return st == MergeCandidateCanMerge && mergeNow; }
};
struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
UID id;
Database db;
@ -301,11 +339,13 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
KeyRangeMap<BoundaryEvaluation> boundaryEvaluations;
KeyRangeMap<bool> knownBlobRanges;
BGTenantMap tenantData;
KeyRangeMap<Optional<std::pair<UID, Version>>> mergeCandidates; // granule range to granule id + start version.
KeyRangeMap<MergeCandidateInfo> mergeCandidates; // granule range to granule id + start version.
KeyRangeMap<Version> activeGranuleMerges; // range map of active granule merges, because range in boundaryEval
// doesn't correspond to merge range. invalidVersion is no merge,
// 0 is no merge version determined yet
FlowLock concurrentMergeChecks;
AsyncTrigger startRecruiting;
Debouncer restartRecruiting;
std::set<NetworkAddress> recruitingLocalities; // the addrs of the workers being recruited on
@ -321,9 +361,10 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
BlobManagerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db, Optional<Key> dcId)
: id(id), db(db), dcId(dcId), stats(id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &workersById),
knownBlobRanges(false, normalKeys.end), tenantData(BGTenantMap(dbInfo)),
mergeCandidates(Optional<std::pair<UID, Version>>(), normalKeys.end),
activeGranuleMerges(invalidVersion, normalKeys.end), restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY),
recruitingStream(0) {}
mergeCandidates(MergeCandidateInfo(MergeCandidateUnknown), normalKeys.end),
activeGranuleMerges(invalidVersion, normalKeys.end),
concurrentMergeChecks(SERVER_KNOBS->BLOB_MANAGER_CONCURRENT_MERGE_CHECKS),
restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0) {}
// only initialize blob store if actually needed
void initBStore() {
@ -347,6 +388,7 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
}
return false;
}
Version activeMergeVersion(const KeyRangeRef& range) {
auto ranges = activeGranuleMerges.intersectingRanges(range);
Version v = invalidVersion;
@ -355,6 +397,30 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
}
return v;
}
// Record the granule covering `range` as merge-eligible, with its id and start version.
void setMergeCandidate(const KeyRangeRef& range, UID granuleID, Version startVersion) {
    // Want this to be idempotent. If a granule was already reported as merge-eligible, we want to use the existing
    // merge and mergeNow state.
    auto it = mergeCandidates.rangeContaining(range.begin);

    if (it->begin() == range.begin && it.end() == range.end) {
        // existing map entry matches this granule's range exactly
        if (it->cvalue().st != MergeCandidateCanMerge) {
            // same range, just update
            // NOTE(review): this branch also replaces a MergeCandidateMerging entry with the same exact
            // range, while the mismatched-range branch below preserves Merging — confirm that is intended.
            it->value() = MergeCandidateInfo(granuleID, startVersion);
        } else {
            // else no-op, but validate data
            ASSERT(granuleID == it->cvalue().granuleID);
            ASSERT(startVersion == it->cvalue().startVersion);
        }
    } else if (it->cvalue().st != MergeCandidateMerging) {
        // boundaries differ from the stored entry; replace it unless an active merge owns this range
        mergeCandidates.insert(range, MergeCandidateInfo(granuleID, startVersion));
    }
}
// Transition the granules covering `range` out of the merge-eligible state. `st` describes why
// they are no longer eligible (CannotMerge / Unknown / Merging); passing CanMerge is invalid —
// use setMergeCandidate for that.
void clearMergeCandidate(const KeyRangeRef& range, MergeCandidateState st) {
    ASSERT(st != MergeCandidateCanMerge);
    mergeCandidates.insert(range, MergeCandidateInfo(st));
}
};
ACTOR Future<Standalone<VectorRef<KeyRef>>> splitRange(Reference<BlobManagerData> bmData,
@ -1272,8 +1338,9 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
Standalone<BlobGranuleHistoryValue> historyValue;
historyValue.granuleID = newGranuleIDs[i];
historyValue.parentGranules.push_back(historyValue.arena(),
std::pair(granuleRange, granuleStartVersion));
historyValue.parentBoundaries.push_back(historyValue.arena(), granuleRange.begin);
historyValue.parentBoundaries.push_back(historyValue.arena(), granuleRange.end);
historyValue.parentVersions.push_back(historyValue.arena(), granuleStartVersion);
tr->set(historyKey, blobGranuleHistoryValueFor(historyValue));
@ -1490,7 +1557,7 @@ ACTOR Future<Void> forceGranuleFlush(Reference<BlobManagerData> bmData, KeyRange
ACTOR Future<std::pair<UID, Version>> persistMergeGranulesStart(Reference<BlobManagerData> bmData,
KeyRange mergeRange,
std::vector<UID> parentGranuleIDs,
std::vector<KeyRange> parentGranuleRanges,
std::vector<Key> parentGranuleRanges,
std::vector<Version> parentGranuleStartVersions) {
state UID mergeGranuleID = deterministicRandom()->randomUniqueID();
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
@ -1548,7 +1615,7 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
KeyRange mergeRange,
Version mergeVersion,
std::vector<UID> parentGranuleIDs,
std::vector<KeyRange> parentGranuleRanges,
std::vector<Key> parentGranuleRanges,
std::vector<Version> parentGranuleStartVersions) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
// pick worker that has part of old range, it will soon get overridden anyway
@ -1579,13 +1646,14 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
state int parentIdx;
// TODO: could parallelize these
for (parentIdx = 0; parentIdx < parentGranuleIDs.size(); parentIdx++) {
state Key lockKey = blobGranuleLockKeyFor(parentGranuleRanges[parentIdx]);
KeyRange parentRange(KeyRangeRef(parentGranuleRanges[parentIdx], parentGranuleRanges[parentIdx + 1]));
state Key lockKey = blobGranuleLockKeyFor(parentRange);
state Future<Optional<Value>> oldLockFuture = tr->get(lockKey);
wait(updateChangeFeed(tr,
granuleIDToCFKey(parentGranuleIDs[parentIdx]),
ChangeFeedStatus::CHANGE_FEED_DESTROY,
parentGranuleRanges[parentIdx]));
parentRange));
if (BM_DEBUG) {
fmt::print("Granule merge destroying CF {0} ({1})!\n",
parentGranuleIDs[parentIdx].shortString().substr(0, 6),
@ -1614,10 +1682,10 @@ ACTOR Future<Void> persistMergeGranulesDone(Reference<BlobManagerData> bmData,
Standalone<BlobGranuleHistoryValue> historyValue;
historyValue.granuleID = mergeGranuleID;
for (parentIdx = 0; parentIdx < parentGranuleIDs.size(); parentIdx++) {
historyValue.parentGranules.push_back(
historyValue.arena(),
std::pair(parentGranuleRanges[parentIdx], parentGranuleStartVersions[parentIdx]));
historyValue.parentBoundaries.push_back(historyValue.arena(), parentGranuleRanges[parentIdx]);
historyValue.parentVersions.push_back(historyValue.arena(), parentGranuleStartVersions[parentIdx]);
}
historyValue.parentBoundaries.push_back(historyValue.arena(), parentGranuleRanges.back());
tr->set(historyKey, blobGranuleHistoryValueFor(historyValue));
@ -1645,7 +1713,7 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
KeyRange mergeRange,
Version mergeVersion,
std::vector<UID> parentGranuleIDs,
std::vector<KeyRange> parentGranuleRanges,
std::vector<Key> parentGranuleRanges,
std::vector<Version> parentGranuleStartVersions) {
// wait for BM to be fully recovered before starting actual merges
@ -1684,307 +1752,196 @@ ACTOR Future<Void> finishMergeGranules(Reference<BlobManagerData> bmData,
bmData->boundaryEvaluations.insert(mergeRange,
BoundaryEvaluation(bmData->epoch, seqnoForEval, BoundaryEvalType::MERGE, 0, 0));
bmData->clearMergeCandidate(mergeRange, MergeCandidateMerging);
return Void();
}
// Make a decision on whether to merge this granule with surrounding ones.
ACTOR Future<Void> maybeMergeRange(Reference<BlobManagerData> bmData,
UID granuleID,
KeyRange granuleRange,
Version granuleStartVersion) {
state std::deque<std::tuple<UID, KeyRange, Version>> beforeCandidates, afterCandidates;
// Execute a merge that has already been decided on: persist the merge intent for the granules in
// toMerge (consecutive granules exactly covering mergeRange), then drive the merge to completion.
ACTOR Future<Void> doMerge(Reference<BlobManagerData> bmData,
                           KeyRange mergeRange,
                           std::vector<std::tuple<UID, KeyRange, Version>> toMerge) {
    // switch to format persist merge wants: parallel vectors of parent ids and start versions,
    // plus the N+1 boundary keys of the N parents (each granule's begin key, then the last end key)
    state std::vector<UID> ids;
    state std::vector<Key> ranges;
    state std::vector<Version> startVersions;
    for (auto& it : toMerge) {
        ids.push_back(std::get<0>(it));
        ranges.push_back(std::get<1>(it).begin);
        startVersions.push_back(std::get<2>(it));
    }
    ranges.push_back(std::get<1>(toMerge.back()).end);

    try {
        std::pair<UID, Version> persistMerge =
            wait(persistMergeGranulesStart(bmData, mergeRange, ids, ranges, startVersions));
        wait(finishMergeGranules(
            bmData, persistMerge.first, mergeRange, persistMerge.second, ids, ranges, startVersions));
        return Void();
    } catch (Error& e) {
        // cancellation and manager replacement are expected ways for a merge actor to end;
        // any other error here indicates a bug, so log it at SevError before rethrowing
        if (e.code() == error_code_operation_cancelled || e.code() == error_code_blob_manager_replaced) {
            throw;
        }
        TraceEvent(SevError, "UnexpectedErrorGranuleMerge").error(e).detail("Range", mergeRange);
        throw e;
    }
}
// Needs to not be an actor to run synchronously for the race checking.
// Technically this could just be the first part of doMerge, but this guarantees no waits happen for the checks before
// the logic starts
static void attemptStartMerge(Reference<BlobManagerData> bmData,
const std::vector<std::tuple<UID, KeyRange, Version>>& toMerge) {
if (toMerge.size() < 2) {
return;
}
// TODO REMOVE validation eventually
for (int i = 0; i < toMerge.size() - 1; i++) {
ASSERT(std::get<1>(toMerge[i]).end == std::get<1>(toMerge[i + 1]).begin);
}
KeyRange mergeRange(KeyRangeRef(std::get<1>(toMerge.front()).begin, std::get<1>(toMerge.back()).end));
// merge/merge races should not be possible because granuleMergeChecker should only start attemptMerges() for
// disjoint ranges, and merge candidate is not updated if it is already in the state MergeCandidateMerging
ASSERT(!bmData->isMergeActive(mergeRange));
// Check to avoid races where a split eval came in while merge was evaluating. This also effectively checks
// boundaryEvals because they're both updated before maybeSplitRange is called. This handles split/merge races.
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(mergeRange);
for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().canMergeNow()) {
CODE_PROBE(true, " granule no longer merge candidate after checking metrics, aborting merge");
return;
}
}
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Start\n",
fmt::print("BM {0} Starting merge of [{1} - {2}) ({3})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
mergeRange.begin.printable(),
mergeRange.end.printable(),
toMerge.size());
}
CODE_PROBE(true, "Doing granule merge");
bmData->activeGranuleMerges.insert(mergeRange, 0);
bmData->clearMergeCandidate(mergeRange, MergeCandidateMerging);
// Now, after setting activeGranuleMerges, we have committed to doing the merge, so any subsequent split eval for
// any of the ranges will be ignored. This handles merge/split races.
bmData->addActor.send(doMerge(bmData, mergeRange, toMerge));
}
// look for candidates to the left
if (granuleRange.begin != normalKeys.begin) {
auto rangeBefore = bmData->mergeCandidates.rangeContainingKeyBefore(granuleRange.begin);
while (rangeBefore.cvalue().present() && beforeCandidates.size() < SERVER_KNOBS->BG_MAX_MERGE_FANIN - 1) {
// if it is a merge candidate, add it to the list
beforeCandidates.push_front(
std::tuple(rangeBefore.cvalue().get().first, rangeBefore.range(), rangeBefore.cvalue().get().second));
// Greedily merges any consecutive 2+ granules in a row that are mergeable
ACTOR Future<Void> attemptMerges(Reference<BlobManagerData> bmData,
std::vector<std::tuple<UID, KeyRange, Version>> candidates) {
ASSERT(candidates.size() >= 2);
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Before candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
rangeBefore.begin().printable(),
rangeBefore.end().printable());
}
ASSERT(rangeBefore.begin() >= normalKeys.begin);
if (rangeBefore.begin() == normalKeys.begin) {
break;
} else {
--rangeBefore;
}
}
// TODO REMOVE validation eventually
for (int i = 0; i < candidates.size() - 1; i++) {
ASSERT(std::get<1>(candidates[i]).end == std::get<1>(candidates[i + 1]).begin);
}
CODE_PROBE(true, "Candidate ranges to merge");
wait(bmData->concurrentMergeChecks.take());
state FlowLock::Releaser holdingDVL(bmData->concurrentMergeChecks);
// look for candidates to right
if (granuleRange.end != normalKeys.end) {
auto rangeAfter = bmData->mergeCandidates.rangeContaining(granuleRange.end);
while (rangeAfter.cvalue().present() && afterCandidates.size() < SERVER_KNOBS->BG_MAX_MERGE_FANIN - 1) {
// if it is a merge candidate, add it to the list
afterCandidates.push_back(
std::tuple(rangeAfter.cvalue().get().first, rangeAfter.range(), rangeAfter.cvalue().get().second));
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): After candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
rangeAfter.begin().printable(),
rangeAfter.end().printable());
}
ASSERT(rangeAfter.end() <= normalKeys.end);
if (rangeAfter.end() == normalKeys.end) {
break;
} else {
++rangeAfter;
}
}
}
if (beforeCandidates.empty() && afterCandidates.empty()) {
CODE_PROBE(true, "no consecutive merge candidates");
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): No merge candidates\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
}
return Void();
}
CODE_PROBE(true, "consecutive granule merge candidates");
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Checking metrics for {3} candidates ({4} - {5})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
beforeCandidates.size() + afterCandidates.size() + 1,
beforeCandidates.size(),
afterCandidates.size());
}
// get metrics for current granule to see if it is still mergeable
StorageMetrics targetGranuleMetrics = wait(bmData->db->getStorageMetrics(granuleRange, CLIENT_KNOBS->TOO_MANY));
if (targetGranuleMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC ||
targetGranuleMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
CODE_PROBE(true, "granule merge candidate no longer mergeable");
return Void();
}
// best set of granules to merge
state std::vector<UID> bestGranuleIDs;
state std::vector<KeyRange> bestGranuleRanges;
state std::vector<Version> bestGranuleStartVersions;
state KeyRange bestGranuleRange;
// current set of granules being evaluated
state std::deque<std::tuple<UID, KeyRange, Version, int64_t>> windowGranules;
state int64_t windowBytes = targetGranuleMetrics.bytes;
windowGranules.push_back(std::tuple(granuleID, granuleRange, granuleStartVersion, windowBytes));
// first walk backwards through before candidates until combined granule would be too large to merge, or we hit a
// granule that has too high bytesPerKSec and isn't mergeable
// start merging any set of 2+ consecutive granules that can be merged
state int64_t currentBytes = 0;
// large keys can cause a large number of granules in the merge to exceed the maximum value size
state int currentKeySumBytes = 0;
state std::vector<std::tuple<UID, KeyRange, Version>> currentCandidates;
state int i;
for (i = beforeCandidates.size() - 1; i >= 0; i--) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Checking before candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(beforeCandidates[i]).begin.printable(),
std::get<1>(beforeCandidates[i]).end.printable());
for (i = 0; i < candidates.size(); i++) {
StorageMetrics metrics =
wait(bmData->db->getStorageMetrics(std::get<1>(candidates[i]), CLIENT_KNOBS->TOO_MANY));
if (metrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
metrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
// This granule cannot be merged with any neighbors.
// If current candidates up to here can be merged, merge them and skip over this one
attemptStartMerge(bmData, currentCandidates);
currentCandidates.clear();
currentBytes = 0;
currentKeySumBytes = 0;
continue;
}
StorageMetrics beforeMetrics =
wait(bmData->db->getStorageMetrics(std::get<1>(beforeCandidates[i]), CLIENT_KNOBS->TOO_MANY));
if (windowBytes + beforeMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
beforeMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
break;
// if the current window is already at the maximum merge size, or adding this granule would push the window over
// the edge, merge the existing candidates if possible
ASSERT(currentCandidates.size() <= SERVER_KNOBS->BG_MAX_MERGE_FANIN);
if (currentCandidates.size() == SERVER_KNOBS->BG_MAX_MERGE_FANIN ||
currentBytes + metrics.bytes > SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
currentKeySumBytes >= CLIENT_KNOBS->VALUE_SIZE_LIMIT / 2) {
ASSERT(currentBytes <= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES);
CODE_PROBE(currentKeySumBytes >= CLIENT_KNOBS->VALUE_SIZE_LIMIT / 2, "merge early because of key size");
attemptStartMerge(bmData, currentCandidates);
currentCandidates.clear();
currentBytes = 0;
currentKeySumBytes = 0;
}
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Before Candidate [{3} - {4}): {5} bytes\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(beforeCandidates[i]).begin.printable(),
std::get<1>(beforeCandidates[i]).end.printable(),
beforeMetrics.bytes);
// add this granule to the window
if (currentCandidates.empty()) {
currentKeySumBytes += std::get<1>(candidates[i]).begin.size();
}
windowBytes += beforeMetrics.bytes;
windowGranules.push_front(std::tuple(std::get<0>(beforeCandidates[i]),
std::get<1>(beforeCandidates[i]),
std::get<2>(beforeCandidates[i]),
beforeMetrics.bytes));
currentKeySumBytes += std::get<1>(candidates[i]).end.size();
currentCandidates.push_back(candidates[i]);
}
// set first window as the best range
bestGranuleRange = KeyRangeRef(std::get<1>(windowGranules.front()).begin, std::get<1>(windowGranules.back()).end);
for (auto& it : windowGranules) {
bestGranuleIDs.push_back(std::get<0>(it));
bestGranuleRanges.push_back(std::get<1>(it));
bestGranuleStartVersions.push_back(std::get<2>(it));
}
// Do moving window algorithm where we add the next afterCandidate to the merge window, and then remove the tail end
// of beforeCandidates until we are down to a mergeable granule
for (i = 0; i < afterCandidates.size(); i++) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Checking after candidate [{3} - {4})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(afterCandidates[i]).begin.printable(),
std::get<1>(afterCandidates[i]).end.printable());
}
// include this granule in the window
StorageMetrics afterMetrics =
wait(bmData->db->getStorageMetrics(std::get<1>(afterCandidates[i]), CLIENT_KNOBS->TOO_MANY));
if (afterMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES ||
afterMetrics.bytesPerKSecond >= SERVER_KNOBS->SHARD_MIN_BYTES_PER_KSEC) {
break;
}
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): After Candidate [{3} - {4}): {5} bytes\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(afterCandidates[i]).begin.printable(),
std::get<1>(afterCandidates[i]).end.printable(),
afterMetrics.bytes);
}
windowBytes += afterMetrics.bytes;
windowGranules.push_back(std::tuple(std::get<0>(afterCandidates[i]),
std::get<1>(afterCandidates[i]),
std::get<2>(afterCandidates[i]),
afterMetrics.bytes));
// slide the window forward back down to mergeable size
while (windowBytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): window bytes {3} >= target {4}\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
windowBytes,
SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES);
}
ASSERT(!windowGranules.empty());
if (std::get<0>(windowGranules.front()) == granuleID) {
// merge must include target granule
break;
}
if (BM_DEBUG) {
fmt::print(
"BM {0} maybe merge [{1} - {2}): After Candidate [{3} - {4}) popping [{5} - {6}): {7} bytes\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
std::get<1>(afterCandidates[i]).begin.printable(),
std::get<1>(afterCandidates[i]).end.printable(),
std::get<1>(windowGranules.front()).begin.printable(),
std::get<1>(windowGranules.front()).end.printable(),
std::get<3>(windowGranules.front()));
}
windowBytes -= std::get<3>(windowGranules.front());
windowGranules.pop_front();
}
// compare this candidate window to previous best
if (windowBytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES) {
break;
} else if (windowGranules.size() > bestGranuleIDs.size()) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): new best granules {3}\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
windowGranules.size());
}
bestGranuleRange =
KeyRangeRef(std::get<1>(windowGranules.front()).begin, std::get<1>(windowGranules.back()).end);
bestGranuleIDs.clear();
bestGranuleRanges.clear();
bestGranuleStartVersions.clear();
for (auto& it : windowGranules) {
bestGranuleIDs.push_back(std::get<0>(it));
bestGranuleRanges.push_back(std::get<1>(it));
bestGranuleStartVersions.push_back(std::get<2>(it));
}
}
}
CODE_PROBE(bestGranuleIDs.size() == 1, "Cannot combine merge candidates into mergeable granule");
CODE_PROBE(bestGranuleIDs.size() > 1, "Granule ready for merge!");
if (bestGranuleIDs.size() > 1) {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): Found {3} consecutive granules in range [{4} - {5}):\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
bestGranuleIDs.size(),
bestGranuleRange.begin.printable(),
bestGranuleRange.end.printable());
}
// This code block must execute without a wait for the lock checks (isMergeActive, mergeCandidates) to not
// deadlock and to avoid merge-merge races.
if ((!g_network->isSimulated() || !g_simulator.speedUpSimulation) && !bmData->isMergeActive(bestGranuleRange)) {
// check to avoid races where a split eval came in while merge was evaluating
auto reCheckMergeCandidates = bmData->mergeCandidates.intersectingRanges(bestGranuleRange);
bool mergeStillOk = true;
for (auto it : reCheckMergeCandidates) {
if (!it->cvalue().present()) {
CODE_PROBE(true, "granule no longer merge candidate after checking metrics, because of split eval");
mergeStillOk = false;
break;
}
}
if (mergeStillOk) {
fmt::print("BM {0} maybe merge [{1} - {2}): Starting merge of [{3} - {4}) ({5})\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable(),
bestGranuleRange.begin.printable(),
bestGranuleRange.end.printable(),
bestGranuleIDs.size());
CODE_PROBE(true, "Doing granule merge!");
bmData->activeGranuleMerges.insert(bestGranuleRange, 0);
bmData->mergeCandidates.insert(bestGranuleRange, Optional<std::pair<UID, Version>>());
state std::pair<UID, Version> persistMerge = wait(persistMergeGranulesStart(
bmData, bestGranuleRange, bestGranuleIDs, bestGranuleRanges, bestGranuleStartVersions));
wait(finishMergeGranules(bmData,
persistMerge.first,
bestGranuleRange,
persistMerge.second,
bestGranuleIDs,
bestGranuleRanges,
bestGranuleStartVersions));
}
}
} else {
if (BM_DEBUG) {
fmt::print("BM {0} maybe merge [{1} - {2}): No mergeable granules after checking metrics\n",
bmData->epoch,
granuleRange.begin.printable(),
granuleRange.end.printable());
}
}
attemptStartMerge(bmData, currentCandidates);
return Void();
}
// Uses single-pass algorithm to identify mergeable sections of granules.
// To ensure each granule waits to see whether all of its neighbors are merge-eligible before merging it, a newly
// merge-eligible granule will be ignored on the first pass.
ACTOR Future<Void> granuleMergeChecker(Reference<BlobManagerData> bmData) {
    // initial sleep
    wait(delayJittered(SERVER_KNOBS->BG_MERGE_CANDIDATE_DELAY_SECONDS));
    // TODO could optimize to not check if there are no new merge-eligible granules and none in merge pending state
    loop {
        double sleepTime = SERVER_KNOBS->BG_MERGE_CANDIDATE_DELAY_SECONDS;
        // Check more frequently if speedUpSimulation is set. This may help complete pending merges sooner in
        // simulation. (NOTE(review): original comment was truncated at "This may" — confirm intent.)
        if (g_network->isSimulated() && g_simulator.speedUpSimulation) {
            sleepTime = std::min(5.0, sleepTime);
        }
        // start delay at the start of the loop, to account for time spend in calculation
        state Future<Void> intervalDelay = delayJittered(sleepTime);

        // Go over granule states and start an attemptMerges() for each run of consecutive merge-ready granules.
        // Break it up into parallel chunks. This makes it possible to process large ranges, but does mean the merges
        // can be slightly suboptimal at boundaries. Use relatively large chunks to minimize the impact of this.
        int maxRangeSize = SERVER_KNOBS->BG_MAX_MERGE_FANIN * 10;

        state std::vector<Future<Void>> mergeChecks;
        auto allRanges = bmData->mergeCandidates.ranges();
        std::vector<std::tuple<UID, KeyRange, Version>> currentCandidates;
        for (auto& it : allRanges) {
            // a non-ready granule (or hitting the chunk cap) ends the current run; dispatch it if it has 2+ granules
            if (!it->cvalue().canMergeNow() || currentCandidates.size() == maxRangeSize) {
                if (currentCandidates.size() >= 2) {
                    mergeChecks.push_back(attemptMerges(bmData, currentCandidates));
                }
                currentCandidates.clear();
            }

            if (it->cvalue().canMergeNow()) {
                currentCandidates.push_back(std::tuple(it->cvalue().granuleID, it->range(), it->cvalue().startVersion));
            } else if (it->cvalue().canMerge()) {
                // set flag so this can get merged on the next pass
                it->value().mergeNow = true;
            }
        }
        // dispatch the trailing run, if any
        if (currentCandidates.size() >= 2) {
            mergeChecks.push_back(attemptMerges(bmData, currentCandidates));
        }
        CODE_PROBE(mergeChecks.size() > 1, "parallel merge checks");
        wait(waitForAll(mergeChecks));
        // if the calculation took longer than the desired interval, still wait a bit
        wait(intervalDelay && delay(5.0));
    }
}
ACTOR Future<Void> deregisterBlobWorker(Reference<BlobManagerData> bmData, BlobWorkerInterface interf) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bmData->db);
loop {
@ -2309,34 +2266,22 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
// clear merge candidates for range, if not already merging
if (clearMergeCandidate) {
bmData->mergeCandidates.insert(rep.granuleRange, Optional<std::pair<UID, Version>>());
bmData->clearMergeCandidate(rep.granuleRange, MergeCandidateCannotMerge);
}
}
if (rep.mergeCandidate && !ignore) {
// mark granule as merge candidate
ASSERT(!rep.doSplit);
// TODO: do we need any sort of validation that this is coming from the worker that currently owns
// the granule?
if (existingInProgress.present()) {
// TODO LOG?
} else {
if (BM_DEBUG) {
fmt::print("Manager {0} evaluating [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
if (!bmData->isMergeActive(rep.granuleRange)) {
ASSERT(rep.mergeCandidate);
CODE_PROBE(true, "Granule merge candidate");
bmData->mergeCandidates.insert(rep.granuleRange,
std::pair(rep.granuleID, rep.startVersion));
newEval.inProgress =
maybeMergeRange(bmData, rep.granuleID, rep.granuleRange, rep.startVersion);
// still update epoch/seqno even if not doing a merge eval
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
}
CODE_PROBE(true, "Granule merge candidate");
if (BM_DEBUG) {
fmt::print("Manager {0} merge candidate granule [{1} - {2}) {3}\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
newEval.toString());
}
bmData->boundaryEvaluations.insert(rep.granuleRange, newEval);
bmData->setMergeCandidate(rep.granuleRange, rep.granuleID, rep.startVersion);
}
}
} catch (Error& e) {
@ -2579,7 +2524,7 @@ ACTOR Future<Void> resumeActiveMerges(Reference<BlobManagerData> bmData) {
UID mergeGranuleID = decodeBlobGranuleMergeKey(it.key);
KeyRange mergeRange;
std::vector<UID> parentGranuleIDs;
std::vector<KeyRange> parentGranuleRanges;
std::vector<Key> parentGranuleRanges;
std::vector<Version> parentGranuleStartVersions;
Version mergeVersion;
std::tie(mergeRange, mergeVersion, parentGranuleIDs, parentGranuleRanges, parentGranuleStartVersions) =
@ -2597,15 +2542,16 @@ ACTOR Future<Void> resumeActiveMerges(Reference<BlobManagerData> bmData) {
// report updated status. Start with early (epoch, seqno) to guarantee lower than later status
BoundaryEvaluation eval(1, 0, BoundaryEvalType::MERGE, 1, 0);
ASSERT(!bmData->isMergeActive(mergeRange));
eval.inProgress = finishMergeGranules(bmData,
mergeGranuleID,
mergeRange,
mergeVersion,
parentGranuleIDs,
parentGranuleRanges,
parentGranuleStartVersions);
bmData->addActor.send(finishMergeGranules(bmData,
mergeGranuleID,
mergeRange,
mergeVersion,
parentGranuleIDs,
parentGranuleRanges,
parentGranuleStartVersions));
bmData->boundaryEvaluations.insert(mergeRange, eval);
bmData->activeGranuleMerges.insert(mergeRange, mergeVersion);
bmData->clearMergeCandidate(mergeRange, MergeCandidateMerging);
}
if (result.more) {
@ -3563,27 +3509,30 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
}
// add all of the node's parents to the queue
for (auto& parent : currHistoryNode.parentGranules) {
for (int i = 0; i < currHistoryNode.parentVersions.size(); i++) {
// for (auto& parent : currHistoryNode.parentVersions.size()) {
// if we already added this node to queue, skip it; otherwise, mark it as visited
if (visited.count({ parent.first.begin.begin(), parent.second })) {
KeyRangeRef parentRange(currHistoryNode.parentBoundaries[i], currHistoryNode.parentBoundaries[i + 1]);
Version parentVersion = currHistoryNode.parentVersions[i];
if (visited.count({ parentRange.begin.begin(), parentVersion })) {
if (BM_DEBUG) {
fmt::print("Already added {0} to queue, so skipping it\n", currHistoryNode.granuleID.toString());
}
continue;
}
visited.insert({ parent.first.begin.begin(), parent.second });
visited.insert({ parentRange.begin.begin(), parentVersion });
if (BM_DEBUG) {
fmt::print("Adding parent [{0} - {1}) with versions [{2} - {3}) to queue\n",
parent.first.begin.printable(),
parent.first.end.printable(),
parent.second,
parentRange.begin.printable(),
parentRange.end.printable(),
parentVersion,
startVersion);
}
// the parent's end version is this node's startVersion,
// since this node must have started where it's parent finished
historyEntryQueue.push({ parent.first, parent.second, startVersion });
historyEntryQueue.push({ parentRange, parentVersion, startVersion });
}
}
@ -3983,6 +3932,9 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
if (SERVER_KNOBS->BG_CONSISTENCY_CHECK_ENABLED) {
self->addActor.send(bgConsistencyCheck(self));
}
if (SERVER_KNOBS->BG_ENABLE_MERGING) {
self->addActor.send(granuleMergeChecker(self));
}
if (BUGGIFY) {
self->addActor.send(chaosRangeMover(self));

View File

@ -1188,13 +1188,13 @@ ACTOR Future<Void> granuleCheckMergeCandidate(Reference<BlobWorkerData> bwData,
}
// wait for the last snapshot to finish, so that the delay is from the last snapshot
wait(waitStart);
wait(delayJittered(SERVER_KNOBS->BG_MERGE_CANDIDATE_THRESHOLD_SECONDS));
double jitter = deterministicRandom()->random01() * 0.8 * SERVER_KNOBS->BG_MERGE_CANDIDATE_DELAY_SECONDS;
wait(delay(SERVER_KNOBS->BG_MERGE_CANDIDATE_THRESHOLD_SECONDS + jitter));
loop {
// this actor will be cancelled if a split check happened, or if the granule was moved away, so this
// being here means that granule is cold enough during that period. Now we just need to check if it is
// also small enough to be a merge candidate.
StorageMetrics currentMetrics = wait(bwData->db->getStorageMetrics(metadata->keyRange, CLIENT_KNOBS->TOO_MANY));
state int64_t granuleBytes = currentMetrics.bytes;
// FIXME: maybe separate knob and/or value for write rate?
if (currentMetrics.bytes >= SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_BYTES / 2 ||
@ -2352,7 +2352,7 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
GranuleStartState startState = wait(assignFuture);
state Optional<GranuleHistory> activeHistory = startState.history;
if (activeHistory.present() && activeHistory.get().value.parentGranules.size() > 0) {
if (activeHistory.present() && activeHistory.get().value.parentVersions.size() > 0) {
state int64_t loadId = nextHistoryLoadId++;
if (BW_HISTORY_DEBUG) {
fmt::print("HL {0} {1}) Loading history data for [{2} - {3})\n",
@ -2368,7 +2368,7 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
std::priority_queue<OrderedHistoryKey, std::vector<OrderedHistoryKey>, std::greater<OrderedHistoryKey>>
rootGranules;
state Transaction tr(bwData->db);
if (!activeHistory.get().value.parentGranules.empty()) {
if (!activeHistory.get().value.parentVersions.empty()) {
if (BW_HISTORY_DEBUG) {
fmt::print("HL {0} {1}) Starting history [{2} - {3}) @ {4}\n",
bwData->id.shortString().substr(0, 5),
@ -2437,17 +2437,16 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
state bool noParentsPresent = true;
// FIXME: parallelize this for all parents/all entries in queue?
loop {
if (pIdx >= curHistory.value.parentGranules.size()) {
if (pIdx >= curHistory.value.parentVersions.size()) {
break;
}
try {
Optional<Value> v =
wait(tr.get(blobGranuleHistoryKeyFor(curHistory.value.parentGranules[pIdx].first,
curHistory.value.parentGranules[pIdx].second)));
state KeyRangeRef parentRange(curHistory.value.parentBoundaries[pIdx],
curHistory.value.parentBoundaries[pIdx + 1]);
state Version parentVersion = curHistory.value.parentVersions[pIdx];
Optional<Value> v = wait(tr.get(blobGranuleHistoryKeyFor(parentRange, parentVersion)));
if (v.present()) {
next = GranuleHistory(curHistory.value.parentGranules[pIdx].first,
curHistory.value.parentGranules[pIdx].second,
decodeBlobGranuleHistoryValue(v.get()));
next = GranuleHistory(parentRange, parentVersion, decodeBlobGranuleHistoryValue(v.get()));
ASSERT(next.version != invalidVersion);
auto inserted = forwardHistory.insert({ next.value.granuleID, ForwardHistoryValue() });
@ -3410,12 +3409,13 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
// If anything in previousGranules, need to do the handoff logic and set
// ret.previousChangeFeedId, and the previous durable version will come from the previous
// granules
if (info.history.present() && info.history.get().value.parentGranules.size() > 0) {
if (info.history.present() && info.history.get().value.parentVersions.size() > 0) {
CODE_PROBE(true, "Granule open found parent");
if (info.history.get().value.parentGranules.size() == 1) { // split
state Key parentHistoryKey =
blobGranuleHistoryKeyFor(info.history.get().value.parentGranules[0].first,
info.history.get().value.parentGranules[0].second);
if (info.history.get().value.parentVersions.size() == 1) { // split
state KeyRangeRef parentRange(info.history.get().value.parentBoundaries[0],
info.history.get().value.parentBoundaries[1]);
state Version parentVersion = info.history.get().value.parentVersions[0];
state Key parentHistoryKey = blobGranuleHistoryKeyFor(parentRange, parentVersion);
Optional<Value> historyParentValue = wait(tr.get(parentHistoryKey));
@ -3424,8 +3424,7 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
decodeBlobGranuleHistoryValue(historyParentValue.get());
UID parentGranuleID = val.granuleID;
info.splitParentGranule =
std::pair(info.history.get().value.parentGranules[0].first, parentGranuleID);
info.splitParentGranule = std::pair(parentRange, parentGranuleID);
state std::pair<BlobGranuleSplitState, Version> granuleSplitState =
std::pair(BlobGranuleSplitState::Initialized, invalidVersion);
@ -3479,8 +3478,12 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, As
// Can't roll back past re-snapshot version
info.changeFeedStartVersion = info.history.get().version;
for (auto& it : info.history.get().value.parentGranules) {
parentGranulesToSnapshot.push_back(loadParentGranuleForMergeSnapshot(&tr, it.first, it.second));
for (int i = 0; i < info.history.get().value.parentVersions.size(); i++) {
KeyRangeRef parentRange(info.history.get().value.parentBoundaries[i],
info.history.get().value.parentBoundaries[i + 1]);
Version parentVersion = info.history.get().value.parentVersions[i];
parentGranulesToSnapshot.push_back(
loadParentGranuleForMergeSnapshot(&tr, parentRange, parentVersion));
}
state int pIdx;

View File

@ -51,6 +51,8 @@ bool compareFDBAndBlob(RangeResult fdb,
Version v,
bool debug);
ACTOR Future<Void> clearAndAwaitMerge(Database cx, KeyRange range);
#include "flow/unactorcompiler.h"
#endif

View File

@ -149,6 +149,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
// parameters global across all clients
int64_t targetByteRate;
bool doMergeCheckAtEnd;
std::vector<Reference<ThreadData>> directories;
std::vector<Future<Void>> clients;
@ -162,6 +163,9 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
// different parameters within those constraints
int64_t randomness = sharedRandomNumber;
doMergeCheckAtEnd = randomness % 10 == 0;
randomness /= 10;
// randomize between low and high directory count
int64_t targetDirectories = 1 + (randomness % 8);
randomness /= 8;
@ -910,7 +914,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
}
wait(self->checkTenantRanges(self, cx, threadData));
bool initialCheck = result;
state bool initialCheck = result;
result &= threadData->mismatches == 0 && (threadData->timeTravelTooOld == 0);
fmt::print("Blob Granule Workload Directory {0} {1}:\n", threadData->directoryID, result ? "passed" : "failed");
@ -933,6 +937,12 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
// For some reason simulation is still passing when this fails?.. so assert for now
ASSERT(result);
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && self->doMergeCheckAtEnd) {
CODE_PROBE(true, "BGCorrectness clearing database and awaiting merge");
// TODO: Enable check
// wait(clearAndAwaitMerge(cx, threadData->directoryRange));
}
return result;
}

View File

@ -451,7 +451,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
if (BGV_DEBUG && startReadVersion != readVersion) {
fmt::print("Availability check updated read version from {0} to {1}\n", startReadVersion, readVersion);
}
bool result = availabilityPassed && self->mismatches == 0 && (checks > 0) && (self->timeTravelTooOld == 0);
state bool result =
availabilityPassed && self->mismatches == 0 && (checks > 0) && (self->timeTravelTooOld == 0);
fmt::print("Blob Granule Verifier {0} {1}:\n", self->clientId, result ? "passed" : "failed");
fmt::print(" {} successful final granule checks\n", checks);
fmt::print(" {} failed final granule checks\n", availabilityPassed ? 0 : 1);
@ -470,6 +471,12 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
// For some reason simulation is still passing when this fails?.. so assert for now
ASSERT(result);
if (self->clientId == 0 && SERVER_KNOBS->BG_ENABLE_MERGING && deterministicRandom()->random01() < 0.1) {
CODE_PROBE(true, "BGV clearing database and awaiting merge");
// TODO: Enable check
// wait(clearAndAwaitMerge(cx, normalKeys));
}
return result;
}