Ensuring BM split retry is idempotent

Josh Slocum 2022-03-10 11:54:57 -06:00
parent c8c97e0256
commit 4b254d259c
3 changed files with 71 additions and 18 deletions

View File

@@ -883,24 +883,15 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
coalescedRanges.push_back(coalescedRanges.arena(), newRanges.back());
ASSERT(coalescedRanges.size() == SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1);
if (BM_DEBUG) {
fmt::print(
"Downsampled split from {0} -> {1} granules", newRanges.size() - 1, SERVER_KNOBS->BG_MAX_SPLIT_FANOUT);
fmt::print("Downsampled split from {0} -> {1} granules\n",
newRanges.size() - 1,
SERVER_KNOBS->BG_MAX_SPLIT_FANOUT);
}
newRanges = coalescedRanges;
ASSERT(newRanges.size() <= SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1);
}
if (BM_DEBUG) {
fmt::print("Splitting range [{0} - {1}) into {2} granules @ {3}:\n",
granuleRange.begin.printable(),
granuleRange.end.printable(),
newRanges.size() - 1,
latestVersion);
for (int i = 0; i < newRanges.size(); i++) {
fmt::print(" {}\n", newRanges[i].printable());
}
}
ASSERT(granuleRange.begin == newRanges.front());
ASSERT(granuleRange.end == newRanges.back());
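The hunk above ends the downsampling path, where a proposed split with more boundaries than BG_MAX_SPLIT_FANOUT allows is coalesced down to at most BG_MAX_SPLIT_FANOUT granules before anything is persisted. A rough standalone illustration of that idea (plain C++ with hypothetical names; the actual coalescing code is only partially visible in this hunk):

// Illustrative only, not the FDB implementation: keep the first and last boundary and
// sample the interior evenly so at most maxFanout granules remain.
// Assumes boundaries is sorted and maxFanout >= 1.
#include <string>
#include <vector>

std::vector<std::string> coalesceBoundaries(const std::vector<std::string>& boundaries, int maxFanout) {
    int n = (int)boundaries.size();
    if (n <= maxFanout + 1) {
        return boundaries; // already within the fanout limit
    }
    std::vector<std::string> out;
    out.reserve(maxFanout + 1);
    for (int i = 0; i < maxFanout; i++) {
        // evenly spaced picks; i == 0 always selects boundaries.front()
        out.push_back(boundaries[(size_t)i * (n - 1) / maxFanout]);
    }
    out.push_back(boundaries.back()); // the last boundary is always preserved
    return out; // maxFanout + 1 boundaries, i.e. maxFanout granules
}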
@@ -913,6 +904,19 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
newGranuleIDs.push_back(deterministicRandom()->randomUniqueID());
}
if (BM_DEBUG) {
fmt::print("Splitting range [{0} - {1}) into {2} granules @ {3}:\n",
granuleRange.begin.printable(),
granuleRange.end.printable(),
newRanges.size() - 1,
latestVersion);
for (int i = 0; i < newRanges.size(); i++) {
fmt::print(" {}:{}\n",
(i < newGranuleIDs.size() ? newGranuleIDs[i] : UID()).toString().substr(0, 6).c_str(),
newRanges[i].printable());
}
}
// Need to split range. Persist intent to split and split metadata to DB BEFORE sending split assignments to blob
// workers, so that nothing is lost on blob manager recovery
loop {
@@ -925,6 +929,47 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
// make sure we're still manager when this transaction gets committed
wait(checkManagerLock(tr, bmData));
// TODO can do this + lock in parallel
// Read splitState to see if anything was committed instead of reading granule mapping because we don't want
// to conflict with mapping changes/reassignments
state RangeResult existingState =
wait(tr->getRange(blobGranuleSplitKeyRangeFor(granuleID), SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 2));
ASSERT_WE_THINK(!existingState.more && existingState.size() <= SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1);
// maybe someone decreased the knob, we should gracefully handle it not in simulation
if (existingState.more || existingState.size() > SERVER_KNOBS->BG_MAX_SPLIT_FANOUT) {
RangeResult tryAgain = wait(tr->getRange(blobGranuleSplitKeyRangeFor(granuleID), 10000));
ASSERT(!tryAgain.more);
existingState = tryAgain;
}
if (!existingState.empty()) {
// Something was previously committed, we must go with that decision.
// Read its boundaries and override our planned split boundaries
TEST(true); // Overriding split ranges with existing ones from DB
RangeResult existingBoundaries = wait(tr->getRange(
KeyRangeRef(granuleRange.begin.withPrefix(blobGranuleMappingKeys.begin),
keyAfter(granuleRange.end).withPrefix(blobGranuleMappingKeys.begin)),
existingState.size() + 1)); // +1 because this is boundaries and existingState was granules
ASSERT(!existingBoundaries.more);
ASSERT(existingBoundaries.size() == existingState.size() + 1);
newRanges.clear();
newRanges.arena().dependsOn(existingBoundaries.arena());
for (auto& it : existingBoundaries) {
newRanges.push_back(newRanges.arena(), it.key.removePrefix(blobGranuleMappingKeys.begin));
}
ASSERT(newRanges.front() == granuleRange.begin);
ASSERT(newRanges.back() == granuleRange.end);
if (BM_DEBUG) {
fmt::print("Replaced old range splits for [{0} - {1}) with {2}:\n",
granuleRange.begin.printable(),
granuleRange.end.printable(),
newRanges.size() - 1);
for (int i = 0; i < newRanges.size(); i++) {
fmt::print(" {}\n", newRanges[i].printable());
}
}
break;
}
// acquire lock for old granule to make sure nobody else modifies it
state Key lockKey = blobGranuleLockKeyFor(granuleRange);
Optional<Value> lockValue = wait(tr->get(lockKey));
@@ -955,8 +1000,8 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
if (!(bmData->epoch > ownerEpoch || (bmData->epoch == ownerEpoch && newLockSeqno > ownerSeqno))) {
fmt::print("BM seqno for granule [{0} - {1}) out of order for lock! manager: ({2}, {3}), owner: "
"({4}, {5}})\n",
granuleRange.begin.printable().c_str(),
granuleRange.end.printable().c_str(),
granuleRange.begin.printable(),
granuleRange.end.printable(),
bmData->epoch,
newLockSeqno,
ownerEpoch,
@@ -966,6 +1011,11 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
} else if (bmData->epoch == ownerEpoch && newLockSeqno < ownerSeqno) {
// we retried, and between retries we reassigned this range elsewhere. Cancel this split
TEST(true); // BM maybe split cancelled by subsequent move
if (BM_DEBUG) {
fmt::print("Splitting range [{0} - {1}) cancelled by move elsewhere!\n",
granuleRange.begin.printable(),
granuleRange.end.printable());
}
return Void();
}
@@ -1036,6 +1086,10 @@ ACTOR Future<Void> maybeSplitRange(Reference<BlobManagerData> bmData,
bmData->rangesToAssign.send(raAssignSplit);
}
// ensure the new assignments actually got processed and the split boundaries are reflected in the granule mapping
// before returning
wait(bmData->rangesToAssign.onEmpty());
return Void();
}
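The heart of the change is the block added to maybeSplitRange above: before persisting anything, a (possibly retried) transaction first reads the split state for the parent granule, and if an earlier attempt already committed a split, it adopts the boundaries recorded in the granule mapping instead of the ones it just computed, so every retry converges on the same child granules. A minimal standalone sketch of that decision (plain C++ with illustrative names, not the flow/actor code above):

// Minimal sketch of the idempotent-retry decision: if an earlier attempt already
// persisted split state for this parent granule, the retry must go with the committed
// boundaries rather than the ones it just planned. MockSplitStore is a stand-in for
// the split/mapping keyspaces read in the real transaction.
#include <map>
#include <string>
#include <vector>

struct MockSplitStore {
    // parent granule id -> committed boundary keys (missing/empty means nothing committed yet)
    std::map<std::string, std::vector<std::string>> committedBoundaries;
};

std::vector<std::string> resolveSplitBoundaries(const MockSplitStore& store,
                                                const std::string& parentGranuleId,
                                                const std::vector<std::string>& plannedBoundaries) {
    auto it = store.committedBoundaries.find(parentGranuleId);
    if (it != store.committedBoundaries.end() && !it->second.empty()) {
        // Something was previously committed; go with that decision so every retry
        // (and a recovered blob manager) lands on the same child granules.
        return it->second;
    }
    return plannedBoundaries; // first attempt: our planned boundaries stand
}

In the real transaction the same check is the read of blobGranuleSplitKeyRangeFor(granuleID), followed, when non-empty, by rebuilding newRanges from the blobGranuleMappingKeys entries covering [granuleRange.begin, keyAfter(granuleRange.end)).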

View File

@@ -329,12 +329,12 @@ ACTOR Future<Void> updateGranuleSplitState(Transaction* tr,
BlobGranuleSplitState newState) {
state KeyRange currentRange = blobGranuleSplitKeyRangeFor(parentGranuleID);
state RangeResult totalState = wait(tr->getRange(currentRange, SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 2));
state RangeResult totalState = wait(tr->getRange(currentRange, SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1));
// FIXME: remove above conflict range?
tr->addWriteConflictRange(currentRange);
ASSERT_WE_THINK(!totalState.more && totalState.size() <= SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1);
ASSERT_WE_THINK(!totalState.more && totalState.size() <= SERVER_KNOBS->BG_MAX_SPLIT_FANOUT);
// maybe someone decreased the knob, we should gracefully handle it not in simulation
if (totalState.more || totalState.size() > SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1) {
if (totalState.more || totalState.size() > SERVER_KNOBS->BG_MAX_SPLIT_FANOUT) {
RangeResult tryAgain = wait(tr->getRange(currentRange, 10000));
ASSERT(!tryAgain.more);
totalState = tryAgain;
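The hunk above adjusts the limit and assertions on updateGranuleSplitState's read of the split keyspace while keeping the graceful fallback for the case where BG_MAX_SPLIT_FANOUT was lowered after the split state was written: outside simulation the code re-reads with a large limit rather than asserting. A standalone sketch of that read-with-fallback pattern (plain C++, hypothetical names):

// Illustrative only: read with a limit derived from the current fanout knob, and if the
// result is truncated or larger than the knob now allows, re-read with a generous limit.
#include <string>
#include <vector>

struct RangeReadResult {
    std::vector<std::string> rows;
    bool more = false; // true if the read stopped at the row limit before the end of the range
};

template <class FetchFn> // FetchFn(limit) stands in for tr->getRange(range, limit)
RangeReadResult readAllSplitEntries(FetchFn fetch, int expectedMax) {
    RangeReadResult result = fetch(expectedMax + 1);
    if (result.more || (int)result.rows.size() > expectedMax) {
        result = fetch(10000); // knob may have been lowered; re-read with a large limit
    }
    return result;
}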

View File

@@ -2125,7 +2125,6 @@ ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
wait(tr->commit());
return newEpoch;
} catch (Error& e) {
printf("Acquiring blob manager lock got error %s\n", e.name());
wait(tr->onError(e));
}
}
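The ClusterController change simply drops a per-retry printf from the epoch-acquisition loop, presumably because wait(tr->onError(e)) already backs off on retryable errors and rethrows fatal ones, so the line only added noise on conflicts. For reference, a simplified sketch of that retry-loop shape (plain C++, not the actor code):

// Simplified sketch of the transaction retry loop the removed printf lived in.
// Illustrative names; in the actor code the error handling is wait(tr->onError(e)).
#include <exception>

template <class Body, class OnError>
auto runWithRetries(Body body, OnError onError) {
    for (;;) {
        try {
            return body(); // build the transaction, commit, and return its result
        } catch (const std::exception& e) {
            onError(e); // back off if retryable, rethrow if fatal; no per-retry logging needed
        }
    }
}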