mirror of https://github.com/apple/foundationdb.git
reworked blob manager recovery to be more efficient
commit 9d9cb961a1
parent e4e7b638c8
@@ -1129,6 +1129,7 @@ const KeyRangeRef blobGranuleFileKeys(LiteralStringRef("\xff\x02/bgf/"), Literal
 const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), LiteralStringRef("\xff\x02/bgm0"));
 const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0"));
 const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), LiteralStringRef("\xff\x02/bgs0"));
+const KeyRangeRef blobGranuleSplitBoundaryKeys(LiteralStringRef("\xff\x02/bgsb/"), LiteralStringRef("\xff\x02/bgsb0"));
 const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0"));
 const KeyRangeRef blobGranulePruneKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0"));
 const KeyRef blobGranulePruneChangeKey = LiteralStringRef("\xff\x02/bgpChange");
@@ -1283,6 +1284,34 @@ std::pair<BlobGranuleSplitState, Version> decodeBlobGranuleSplitValue(const Valu
     return std::pair(st, bigEndian64(v));
 }
 
+const Key blobGranuleSplitBoundaryKeyFor(UID const& parentGranuleID, KeyRef const& granuleStart) {
+    BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
+    wr.serializeBytes(blobGranuleSplitBoundaryKeys.begin);
+    wr << parentGranuleID;
+    wr << granuleStart;
+    return wr.toValue();
+}
+
+std::pair<UID, Key> decodeBlobGranuleSplitBoundaryKey(KeyRef const& key) {
+    UID parentGranuleID;
+    Key granuleStart;
+    BinaryReader reader(key.removePrefix(blobGranuleSplitBoundaryKeys.begin),
+                        AssumeVersion(ProtocolVersion::withBlobGranule()));
+
+    reader >> parentGranuleID;
+    reader >> granuleStart;
+    return std::pair(parentGranuleID, granuleStart);
+}
+
+const KeyRange blobGranuleSplitBoundaryKeyRangeFor(UID const& parentGranuleID) {
+    BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
+    wr.serializeBytes(blobGranuleSplitBoundaryKeys.begin);
+    wr << parentGranuleID;
+
+    Key startKey = wr.toValue();
+    return KeyRangeRef(startKey, strinc(startKey));
+}
+
 const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version) {
     BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
     wr.serializeBytes(blobGranuleHistoryKeys.begin);
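The boundary key orders by parent granule UID first and child start key second, so one prefix scan returns a parent's child boundaries in key order. A minimal round-trip sketch of the encode/decode pair above (illustrative only, not part of the commit; assumes the fdbclient headers):

    // Sketch: boundary key round-trip (illustrative).
    UID parent = deterministicRandom()->randomUniqueID();
    Key start = LiteralStringRef("childGranuleStartKey"); // hypothetical start key
    Key k = blobGranuleSplitBoundaryKeyFor(parent, start);
    ASSERT(k.startsWith(blobGranuleSplitBoundaryKeys.begin));
    UID decodedParent;
    Key decodedStart;
    std::tie(decodedParent, decodedStart) = decodeBlobGranuleSplitBoundaryKey(k);
    ASSERT(decodedParent == parent && decodedStart == start);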
@@ -556,6 +556,9 @@ extern const KeyRangeRef blobGranuleLockKeys;
 // \xff\x02/bgs/(parentGranuleUID, granuleUID) = [[BlobGranuleSplitState]]
 extern const KeyRangeRef blobGranuleSplitKeys;
 
+// \xff\x02/bgsb/(parentGranuleID, granuleStartKey) = []
+extern const KeyRangeRef blobGranuleSplitBoundaryKeys;
+
 // \xff\x02/bgh/(beginKey,endKey,startVersion) = { granuleUID, [parentGranuleHistoryKeys] }
 extern const KeyRangeRef blobGranuleHistoryKeys;
 
@@ -589,6 +592,10 @@ const KeyRange blobGranuleSplitKeyRangeFor(UID const& parentGranuleID);
 const Value blobGranuleSplitValueFor(BlobGranuleSplitState st);
 std::pair<BlobGranuleSplitState, Version> decodeBlobGranuleSplitValue(ValueRef const& value);
 
+const Key blobGranuleSplitBoundaryKeyFor(UID const& parentGranuleID, KeyRef const& granuleStart);
+std::pair<UID, Key> decodeBlobGranuleSplitBoundaryKey(KeyRef const& key);
+const KeyRange blobGranuleSplitBoundaryKeyRangeFor(UID const& parentGranuleID);
+
 const Key blobGranuleHistoryKeyFor(KeyRangeRef const& range, Version version);
 std::pair<KeyRange, Version> decodeBlobGranuleHistoryKey(KeyRef const& key);
 const KeyRange blobGranuleHistoryKeyRangeFor(KeyRangeRef const& range);
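These declarations support one access pattern: write one boundary key per child at split time, then recover every boundary for a parent with a single prefix range read. A hedged usage sketch (illustrative; tr is assumed to be a Transaction with system-key access already set):

    // Sketch: scan every recorded split boundary for one parent granule.
    KeyRange r = blobGranuleSplitBoundaryKeyRangeFor(parentGranuleID);
    RangeResult boundaries = wait(tr->getRange(r, CLIENT_KNOBS->TOO_MANY));
    for (auto& kv : boundaries) {
        UID parent;
        Key childStart;
        std::tie(parent, childStart) = decodeBlobGranuleSplitBoundaryKey(kv.key);
        ASSERT(parent == parentGranuleID); // the range is scoped to this parent's prefix
    }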
@@ -832,6 +832,7 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
         UID newGranuleID = deterministicRandom()->randomUniqueID();
 
         Key splitKey = blobGranuleSplitKeyFor(granuleID, newGranuleID);
+        tr->set(blobGranuleSplitBoundaryKeyFor(granuleID, newRanges[i]), Value());
 
         tr->atomicOp(splitKey,
                      blobGranuleSplitValueFor(BlobGranuleSplitState::Initialized),
@@ -851,6 +852,7 @@ ACTOR Future<Void> maybeSplitRange(BlobManagerData* bmData,
                            latestVersion);*/
         tr->set(historyKey, blobGranuleHistoryValueFor(historyValue));
     }
+    tr->set(blobGranuleSplitBoundaryKeyFor(granuleID, newRanges.back()), Value());
 
     wait(tr->commit());
     break;
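Together with the per-child write above, a split that produces N child ranges persists N split-state keys and N+1 boundary keys (newRanges carries N+1 edge keys, including the final end key); the recovery path below asserts exactly this invariant (splitBoundaries.size() - 1 == splitStates.size()). A hedged sketch of the writes for one parent (illustrative only; parentID and tr are assumed):

    // Sketch: persisting a split of parent P into [a,b), [b,c), [c,d).
    std::vector<Key> newRanges = { LiteralStringRef("a"),
                                   LiteralStringRef("b"),
                                   LiteralStringRef("c"),
                                   LiteralStringRef("d") };
    for (int i = 0; i < newRanges.size() - 1; i++) {
        UID childID = deterministicRandom()->randomUniqueID();
        tr->set(blobGranuleSplitBoundaryKeyFor(parentID, newRanges[i]), Value());
        tr->atomicOp(blobGranuleSplitKeyFor(parentID, childID),
                     blobGranuleSplitValueFor(BlobGranuleSplitState::Initialized),
                     MutationRef::SetVersionstampedValue);
    }
    tr->set(blobGranuleSplitBoundaryKeyFor(parentID, newRanges.back()), Value()); // trailing end boundary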
@@ -1185,15 +1187,13 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
     //    BM is recovering. Now the mapping at this time looks like G->deadBW. But the rangeAssigner handles this:
     //    we'll try to assign a range to a dead worker and fail and reassign it to the next best worker.
     //
-    // 2. We get all granule history entries, to get a mapping from granule id to key range, for step 3.
-    //
-    // 3. We get the existing split intentions that were Started but not acknowledged by any blob workers and
-    //    add them to our key range map, bmData->granuleAssignments. Note that we are adding them on top of
-    //    the granule mappings and since we are using a key range map, we end up with the same set of shard
-    //    boundaries as the old blob manager had. For these splits, we simply assign the range to the next
-    //    best worker. This is not any worst than what the old blob manager would have done.
-    //    Details: Note that this means that if a worker we intended to give a splitted range to dies
-    //    before the new BM recovers, then we'll simply assign the range to the next best worker.
+    // 3. We get the existing split intentions and boundaries that were Started but not acknowledged by any blob workers
+    //    and add them to our key range map, bmData->granuleAssignments. Note that we are adding them on top of the
+    //    granule mappings and since we are using a key range map, we end up with the same set of shard boundaries as
+    //    the old blob manager had. For these splits, we simply assign the range to the next best worker. This is not
+    //    any worst than what the old blob manager would have done. Details: Note that this means that if a worker we
+    //    intended to give a splitted range to dies before the new BM recovers, then we'll simply assign the range to
+    //    the next best worker.
     //
     // 4. For every range in our granuleAssignments, we send an assign request to the stream of requests,
     //    ultimately giving every range back to some worker (trying to mimic the state of the old BM).
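The reasoning in step 3 leans on KeyRangeMap semantics: inserting a sub-range on top of an existing entry splits the covering entry rather than replacing it, so layering split intentions over the recovered granule mapping reproduces the old manager's shard boundaries. A small illustrative sketch (worker ID and keys hypothetical):

    // Sketch: a KeyRangeMap insert splits the covering entry (illustrative).
    KeyRangeMap<Optional<UID>> assignments;
    UID workerA = deterministicRandom()->randomUniqueID();
    assignments.insert(KeyRangeRef(LiteralStringRef("a"), LiteralStringRef("z")), workerA); // recovered mapping
    assignments.insert(KeyRangeRef(LiteralStringRef("m"), LiteralStringRef("p")), UID());   // split intent, needs owner
    // Resulting entries: [a,m) -> workerA, [m,p) -> UID() (unowned), [p,z) -> workerA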
@@ -1208,16 +1208,17 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
     state int rowLimit = BUGGIFY ? deterministicRandom()->randomInt(2, 10) : 10000;
 
     if (BM_DEBUG) {
-        printf("BM %lld recovering:\n", bmData->epoch);
-        printf("BM %lld found old assignments:\n", bmData->epoch);
+        fmt::print("BM {0} recovering:\n", bmData->epoch);
+        fmt::print("BM {0} found old assignments:\n", bmData->epoch);
     }
-    // Step 1. Get the latest known mapping of granules to blob workers (i.e. assignments)
+
+    // TODO could populate most/all of this list by just asking existing blob workers for their range sets to reduce DB
+    // read load on BM restart Step 1. Get the latest known mapping of granules to blob workers (i.e. assignments)
     state KeyRef beginKey = normalKeys.begin;
     loop {
         try {
             tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
             tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-            wait(checkManagerLock(tr, bmData));
 
             // TODO: replace row limit with knob
             KeyRange nextRange(KeyRangeRef(beginKey, normalKeys.end));
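The step-1 loop pages through the mapping in bounded batches, resuming each batch from the previous result's readThrough key. A distilled sketch of that cursor pattern (illustrative only; rangeToRead and rowLimit are assumed names):

    // Sketch: paging a large range read with a row limit and a readThrough cursor.
    state Key cursor = rangeToRead.begin;
    loop {
        try {
            RangeResult batch = wait(tr->getRange(KeyRangeRef(cursor, rangeToRead.end), rowLimit));
            // ... process batch ...
            if (!batch.more) {
                break;
            }
            ASSERT(batch.readThrough.present());
            cursor = batch.readThrough.get(); // resume where the last read stopped
        } catch (Error& e) {
            wait(tr->onError(e)); // resets the transaction; options must be re-set by the caller
        }
    }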
@@ -1256,98 +1257,197 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
 
             beginKey = lastEndKey;
         } catch (Error& e) {
+            if (BM_DEBUG) {
+                fmt::print("BM {0} got error reading granule mapping during recovery: {1}\n", bmData->epoch, e.name());
+            }
             wait(tr->onError(e));
         }
     }
 
-    // TODO could avoid if no splits in progress
-    // Step 2. Read all history entries, so we can know the range of each sub-granule that is splitting
+    // TODO use range stream instead
+    state UID currentParentID = UID();
+    state Optional<UID> nextParentID;
+    state std::vector<Key> splitBoundaries;
+    state std::vector<std::pair<UID, BlobGranuleSplitState>> splitStates;
+
+    state Key splitBeginKey = blobGranuleSplitKeys.begin;
+    state RangeResult splitResult;
+    splitResult.readThrough = splitBeginKey;
+    splitResult.more = true;
+    state int splitResultIdx = 0;
+
+    state Key boundaryBeginKey = blobGranuleSplitBoundaryKeys.begin;
+    state RangeResult boundaryResult;
+    boundaryResult.readThrough = boundaryBeginKey;
+    boundaryResult.more = true;
+    state int boundaryResultIdx = 0;
+
+    // Step 3. Get the latest known split intentions and boundaries
     tr->reset();
-    beginKey = blobGranuleHistoryKeys.begin;
+    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+    tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+
     if (BM_DEBUG) {
-        printf("BM %lld found history entries:\n", bmData->epoch);
+        fmt::print("BM {0} found in progress splits:\n", bmData->epoch);
     }
     loop {
-        try {
-            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-            wait(checkManagerLock(tr, bmData));
-
-            RangeResult results = wait(tr->getRange(KeyRangeRef(beginKey, blobGranuleHistoryKeys.end), rowLimit));
-
-            // Add the granules for the started split intentions to the in-memory key range map
-            for (auto history : results) {
-                KeyRange granuleRange;
-                Version version;
-                std::tie(granuleRange, version) = decodeBlobGranuleHistoryKey(history.key);
-                Standalone<BlobGranuleHistoryValue> v = decodeBlobGranuleHistoryValue(history.value);
-                granuleIdToRange[v.granuleID] = granuleRange;
-                if (BM_DEBUG) {
-                    fmt::print("  {0}=[{1} - {2})\n",
-                               v.granuleID,
-                               granuleRange.begin.printable(),
-                               granuleRange.end.printable());
-                }
-            }
-
-            if (!results.more) {
-                break;
-            }
-
-            beginKey = results.readThrough.get();
-        } catch (Error& e) {
-            wait(tr->onError(e));
-        }
-    }
-
-    // Step 3. Get the latest known split intentions
-    tr->reset();
-    beginKey = blobGranuleSplitKeys.begin;
-    if (BM_DEBUG) {
-        printf("BM %lld found in progress splits:\n", bmData->epoch);
-    }
-    loop {
-        try {
-            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
-            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
-            wait(checkManagerLock(tr, bmData));
-
-            // TODO: replace row limit with knob
-            RangeResult results = wait(tr->getRange(KeyRangeRef(beginKey, blobGranuleSplitKeys.end), rowLimit));
-
-            // Add the granules for the started split intentions to the in-memory key range map
-            for (auto split : results) {
-                UID parentGranuleID, granuleID;
-                BlobGranuleSplitState splitState;
-                Version version;
-
-                std::tie(parentGranuleID, granuleID) = decodeBlobGranuleSplitKey(split.key);
-                if (split.value.size() == 0) {
-                    printf("No value for %s/%s split??\n",
-                           parentGranuleID.toString().c_str(),
-                           granuleID.toString().c_str());
-                    ASSERT(split.value.size() > 0);
-                }
-                std::tie(splitState, version) = decodeBlobGranuleSplitValue(split.value);
-
-                // TODO THIS RANGE IS WRONG
-                ASSERT(granuleIdToRange.count(granuleID) == 1);
-                const KeyRange range = granuleIdToRange[granuleID];
-                if (splitState <= BlobGranuleSplitState::Initialized) {
-                    // the empty UID signifies that we need to find an owner (worker) for this range
-                    workerAssignments.insert(range, UID());
-                    if (BM_DEBUG) {
-                        fmt::print("    [{0} - {1})\n", range.begin.printable(), range.end.printable());
-                    }
-                }
-            }
-
-            if (!results.more) {
-                break;
-            }
-
-            beginKey = results.readThrough.get();
+        // Advance both split and boundary readers until we hit another granule or EOS, to get the full state for one
+        // granule split. Effectively a stream merge.
+
+        // Advance split reader
+        loop {
+            if (splitResultIdx >= splitResult.size()) {
+                if (!splitResult.more) {
+                    break;
+                }
+                ASSERT(splitResult.readThrough.present());
+                splitBeginKey = splitResult.readThrough.get();
+                loop {
+                    try {
+                        RangeResult r =
+                            wait(tr->getRange(KeyRangeRef(splitBeginKey, blobGranuleSplitKeys.end), rowLimit));
+                        ASSERT(r.size() > 0 || !r.more);
+                        splitResult = r;
+                        splitResultIdx = 0;
+                        break;
+                    } catch (Error& e) {
+                        if (BM_DEBUG) {
+                            fmt::print("BM {0} got error advancing split cursor: {1}\n", bmData->epoch, e.name());
+                        }
+                        wait(tr->onError(e));
+                        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                        tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+                    }
+                }
+                // if we got a response and there are zero rows, we are done
+                if (splitResult.empty()) {
+                    ASSERT(!splitResult.more);
+                    break;
+                }
+            } else {
+                break;
+            }
+            while (splitResultIdx < splitResult.size()) {
+                UID parentGranuleID, granuleID;
+
+                std::tie(parentGranuleID, granuleID) = decodeBlobGranuleSplitKey(splitResult[splitResultIdx].key);
+                if (parentGranuleID != currentParentID) {
+                    nextParentID = parentGranuleID;
+                    break;
+                }
+                BlobGranuleSplitState splitState;
+                Version version;
+                std::tie(splitState, version) = decodeBlobGranuleSplitValue(splitResult[splitResultIdx].value);
+                splitStates.push_back(std::pair(granuleID, splitState));
+                splitResultIdx++;
+            }
+        }
+
+        // Advance boundary reader
+        loop {
+            if (boundaryResultIdx >= boundaryResult.size()) {
+                if (!boundaryResult.more) {
+                    break;
+                }
+                ASSERT(boundaryResult.readThrough.present());
+                boundaryBeginKey = boundaryResult.readThrough.get();
+                loop {
+                    try {
+                        RangeResult r = wait(
+                            tr->getRange(KeyRangeRef(boundaryBeginKey, blobGranuleSplitBoundaryKeys.end), rowLimit));
+                        ASSERT(r.size() > 0 || !r.more);
+                        boundaryResult = r;
+                        boundaryResultIdx = 0;
+                        break;
+                    } catch (Error& e) {
+                        if (BM_DEBUG) {
+                            fmt::print("BM {0} got error advancing boundary cursor: {1}\n", bmData->epoch, e.name());
+                        }
+                        wait(tr->onError(e));
+                        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                        tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+                    }
+                }
+                // if we got a response and there are zero rows, we are done
+                if (boundaryResult.empty()) {
+                    break;
+                }
+            } else {
+                break;
+            }
+            while (boundaryResultIdx < boundaryResult.size()) {
+                UID parentGranuleID;
+                Key boundaryKey;
+                std::tie(parentGranuleID, boundaryKey) =
+                    decodeBlobGranuleSplitBoundaryKey(boundaryResult[boundaryResultIdx].key);
+                if (parentGranuleID != currentParentID) {
+                    // nextParentID should have already been set by split reader
+                    ASSERT(nextParentID.present());
+                    ASSERT(nextParentID.get() == parentGranuleID);
+                    break;
+                }
+                splitBoundaries.push_back(boundaryKey);
+                boundaryResultIdx++;
+            }
+        }
+
+        // process this split
+        if (currentParentID != UID()) {
+            if (BM_DEBUG) {
+                fmt::print("  [{0} - {1}) {2}:\n",
+                           splitBoundaries.front().printable(),
+                           splitBoundaries.back().printable(),
+                           currentParentID.toString().substr(0, 6));
+            }
+            ASSERT(splitBoundaries.size() - 1 == splitStates.size());
+            for (int i = 0; i < splitStates.size(); i++) {
+                // if this split boundary had not been opened by a blob worker before the last manager crashed, we must
+                // ensure it gets assigned to one
+                KeyRange range = KeyRange(KeyRangeRef(splitBoundaries[i], splitBoundaries[i + 1]));
+
+                if (BM_DEBUG) {
+                    printf("    ");
+                }
+                if (splitStates[i].second <= BlobGranuleSplitState::Initialized) {
+                    // the empty UID signifies that we need to find an owner (worker) for this range
+                    if (BM_DEBUG) {
+                        printf("*** ");
+                    }
+                    workerAssignments.insert(range, UID());
+                }
+                if (BM_DEBUG) {
+                    fmt::print("[{0} - {1}) {2}\n",
+                               range.begin.printable(),
+                               range.end.printable(),
+                               splitStates[i].first.toString().substr(0, 6));
+                }
+            }
+        }
+        splitBoundaries.clear();
+        splitStates.clear();
+
+        if (!nextParentID.present()) {
+            break;
+        }
+        currentParentID = nextParentID.get();
+        nextParentID.reset();
+    }
+
+    // Step 4. Send assign requests for all the granules and transfer assignments
+    // from local workerAssignments to bmData
+    // before we take ownership of all of the ranges, check the manager lock again
+    tr->reset();
+    loop {
+        try {
+            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+            wait(checkManagerLock(tr, bmData));
+            break;
         } catch (Error& e) {
+            if (BM_DEBUG) {
+                fmt::print("BM {0} got error checking lock after recovery: {1}\n", bmData->epoch, e.name());
+            }
             wait(tr->onError(e));
         }
     }
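The rework above is a two-cursor stream merge: split-state keys and boundary keys are both ordered by parent granule UID, so advancing each cursor until the parent changes assembles one complete split per outer iteration, with bounded memory instead of a full in-memory granule-to-range map. A distilled, non-actor sketch of that control flow (illustrative only, plain C++; names are hypothetical):

    #include <algorithm>
    #include <utility>
    #include <vector>

    // Sketch: merge two streams that share an ordering on the same group key.
    template <class K, class T>
    struct GroupCursor {
        std::vector<std::pair<K, T>> rows; // (groupKey, payload), sorted by groupKey
        size_t idx = 0;
        bool atEnd() const { return idx >= rows.size(); }
        const K& peekGroup() const { return rows[idx].first; }
    };

    template <class K, class A, class B, class F>
    void mergeByGroup(GroupCursor<K, A>& a, GroupCursor<K, B>& b, F processGroup) {
        while (!a.atEnd() || !b.atEnd()) {
            // The smallest front group across both streams is the next one to assemble.
            K group;
            if (a.atEnd()) {
                group = b.peekGroup();
            } else if (b.atEnd()) {
                group = a.peekGroup();
            } else {
                group = std::min(a.peekGroup(), b.peekGroup());
            }
            std::vector<A> as;
            std::vector<B> bs;
            while (!a.atEnd() && a.peekGroup() == group)
                as.push_back(a.rows[a.idx++].second);
            while (!b.atEnd() && b.peekGroup() == group)
                bs.push_back(b.rows[b.idx++].second);
            processGroup(group, as, bs); // e.g. split states plus boundaries for one parent granule
        }
    }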
@@ -1355,8 +1455,7 @@ ACTOR Future<Void> recoverBlobManager(BlobManagerData* bmData) {
     if (BM_DEBUG) {
         fmt::print("BM {0} final ranges:\n", bmData->epoch);
     }
-    // Step 4. Send assign requests for all the granules and transfer assignments
-    // from local workerAssignments to bmData
+
     for (auto& range : workerAssignments.intersectingRanges(normalKeys)) {
         if (!range.value().present()) {
             continue;
@@ -1619,6 +1718,7 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(BlobManagerData* bmData, UID granule
 /*
  * Deletes all files pertaining to the granule with id granuleId and
  * also removes the history entry for this granule from the system keyspace
+ * TODO ensure cannot fully delete granule that is still splitting!
  */
 ACTOR Future<Void> fullyDeleteGranule(BlobManagerData* self, UID granuleId, KeyRef historyKey) {
     if (BM_DEBUG) {
@@ -1849,7 +1949,8 @@ ACTOR Future<Void> pruneRange(BlobManagerData* self, KeyRef startKey, KeyRef end
                                        activeRange.end().printable());
                 }
                 Optional<GranuleHistory> history = wait(getLatestGranuleHistory(&tr, activeRange.range()));
-                // TODO: can we tell from the krm that this range is not valid, so that we don't need to do a get
+                // TODO: can we tell from the krm that this range is not valid, so that we don't need to do a
+                // get
                 if (history.present()) {
                     if (BM_DEBUG) {
                         printf("Adding range to history queue\n");
@@ -2187,15 +2288,16 @@ ACTOR Future<Void> monitorPruneKeys(BlobManagerData* self) {
                         prunes.emplace_back(pruneRange(self, rangeStartKey, rangeEndKey, pruneVersion, force));
                     }
 
-                    // wait for this set of prunes to complete before starting the next ones since if we prune
-                    // a range R at version V and while we are doing that, the time expires, we will end up
-                    // trying to prune the same range again since the work isn't finished and the prunes will
-                    // race
+                    // wait for this set of prunes to complete before starting the next ones since if we
+                    // prune a range R at version V and while we are doing that, the time expires, we will
+                    // end up trying to prune the same range again since the work isn't finished and the
+                    // prunes will race
                     //
                     // TODO: this isn't that efficient though. Instead we could keep metadata as part of the
                     // BM's memory that tracks which prunes are active. Once done, we can mark that work as
-                    // done. If the BM fails then all prunes will fail and so the next BM will have a clear set
-                    // of metadata (i.e. no work in progress) so we will end up doing the work in the new BM
+                    // done. If the BM fails then all prunes will fail and so the next BM will have a clear
+                    // set of metadata (i.e. no work in progress) so we will end up doing the work in the
+                    // new BM
                     wait(waitForAll(prunes));
 
                     if (!pruneIntents.more) {
@@ -2304,8 +2406,8 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
 // DB has [B - D). It should show up coalesced in knownBlobRanges, and [C - D) should be removed.
 // DB has [A - D). It should show up coalesced in knownBlobRanges, and [A - B) should be removed.
 // DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed.
-// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D) should
-// be in removed.
+// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D)
+// should be in removed.
 TEST_CASE(":/blobmanager/updateranges") {
     KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
     Arena ar;
@@ -376,6 +376,7 @@ ACTOR Future<Void> updateGranuleSplitState(Transaction* tr,
         Key oldGranuleLockKey = blobGranuleLockKeyFor(parentGranuleRange);
         tr->clear(singleKeyRange(oldGranuleLockKey));
         tr->clear(currentRange);
+        tr->clear(blobGranuleSplitBoundaryKeyRangeFor(parentGranuleID));
     } else {
         tr->atomicOp(myStateKey, blobGranuleSplitValueFor(newState), MutationRef::SetVersionstampedValue);
         if (newState == BlobGranuleSplitState::Assigned && currentState == BlobGranuleSplitState::Initialized &&