mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-26 09:22:34 +08:00
fmt
This commit is contained in:
parent
c8a3413820
commit
ef7fdc0781
@ -897,6 +897,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns a random healthy team, which does not contain excludeServer.
|
||||||
std::vector<UID> getRandomHealthyTeam(const UID& excludeServer) {
|
std::vector<UID> getRandomHealthyTeam(const UID& excludeServer) {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
Optional<int> idx;
|
Optional<int> idx;
|
||||||
@ -6144,6 +6145,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
|
|||||||
TraceEvent("DataDistributorDestroyTeamCollections").error(e);
|
TraceEvent("DataDistributorDestroyTeamCollections").error(e);
|
||||||
state std::vector<UID> teamForDroppedRange;
|
state std::vector<UID> teamForDroppedRange;
|
||||||
if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
|
if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
|
||||||
|
// Choose a random healthy team to host the to-be-dropped range.
|
||||||
const UID serverID = removeFailedServer.getFuture().get();
|
const UID serverID = removeFailedServer.getFuture().get();
|
||||||
std::vector<UID> pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID);
|
std::vector<UID> pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID);
|
||||||
teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end());
|
teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end());
|
||||||
|
@ -1301,6 +1301,8 @@ ACTOR Future<Void> removeStorageServer(Database cx,
|
|||||||
}
|
}
|
||||||
// Remove the server from keyServer list and set serverKeysFalse to the server's serverKeys list.
|
// Remove the server from keyServer list and set serverKeysFalse to the server's serverKeys list.
|
||||||
// Changes to keyServer and serverKey must happen symmetrically in a transaction.
|
// Changes to keyServer and serverKey must happen symmetrically in a transaction.
|
||||||
|
// If serverID is the last source server for a shard, the shard will be erased, and then be assigned
|
||||||
|
// to teamForDroppedRange.
|
||||||
ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
||||||
UID serverID,
|
UID serverID,
|
||||||
std::vector<UID> teamForDroppedRange,
|
std::vector<UID> teamForDroppedRange,
|
||||||
@ -1333,21 +1335,6 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||||||
KeyRangeRef(begin, allKeys.end),
|
KeyRangeRef(begin, allKeys.end),
|
||||||
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
|
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
|
||||||
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
|
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
|
||||||
|
|
||||||
// teamForDroppedRange.clear();
|
|
||||||
// for (int i = 0; i < keyServers.size() && teamForDroppedRange.empty(); ++i) {
|
|
||||||
// decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest);
|
|
||||||
// if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) {
|
|
||||||
// teamForDroppedRange.insert(teamForDroppedRange.end(), dest.begin(), dest.end());
|
|
||||||
// }
|
|
||||||
// if (!teamForDroppedRange.empty()) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// if (std::find(src.begin(), src.end(), serverID) == src.end()) {
|
|
||||||
// teamForDroppedRange.insert(teamForDroppedRange.end(), src.begin(), src.end());
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key);
|
state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key);
|
||||||
state int i = 0;
|
state int i = 0;
|
||||||
for (; i < keyServers.size() - 1; ++i) {
|
for (; i < keyServers.size() - 1; ++i) {
|
||||||
@ -1360,17 +1347,13 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent("FailedServerRemoveBegin", serverID)
|
|
||||||
.detail("Key", it.key)
|
|
||||||
.detail("ValueSrc", describe(src))
|
|
||||||
.detail("ValueDest", describe(dest));
|
|
||||||
|
|
||||||
// Update the vectors to remove failed server then set the value again
|
// Update the vectors to remove failed server then set the value again
|
||||||
// Dest is usually empty, but keep this in case there is parallel data movement
|
// Dest is usually empty, but keep this in case there is parallel data movement
|
||||||
src.erase(std::remove(src.begin(), src.end(), serverID), src.end());
|
src.erase(std::remove(src.begin(), src.end(), serverID), src.end());
|
||||||
dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end());
|
dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end());
|
||||||
|
|
||||||
// If the last src server is to be removed, first check if there are dest servers who is
|
// If the last src server is to be removed, first check if there are dest servers who is
|
||||||
// hosting a read-write copy of the data, and move such dest servers to the src list.
|
// hosting a read-write copy of the keyrange, and move such dest servers to the src list.
|
||||||
if (src.empty() && !dest.empty()) {
|
if (src.empty() && !dest.empty()) {
|
||||||
std::vector<UID> newSources =
|
std::vector<UID> newSources =
|
||||||
wait(pickReadWriteServers(&tr, dest, KeyRangeRef(it.key, keyServers[i + 1].key)));
|
wait(pickReadWriteServers(&tr, dest, KeyRangeRef(it.key, keyServers[i + 1].key)));
|
||||||
@ -1382,12 +1365,18 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||||||
src.push_back(id);
|
src.push_back(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all
|
|
||||||
// dest servers.
|
// Move the keyrange to teamForDroppedRange if the src list becomes empty, and also remove the shard
|
||||||
|
// from all dest servers.
|
||||||
if (src.empty()) {
|
if (src.empty()) {
|
||||||
assert(!teamForDroppedRange.empty());
|
assert(!teamForDroppedRange.empty());
|
||||||
|
|
||||||
|
// Assign the shard to teamFroDroppedRange in keyServer space.
|
||||||
tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {}));
|
tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, teamForDroppedRange, {}));
|
||||||
|
|
||||||
vector<Future<Void>> actors;
|
vector<Future<Void>> actors;
|
||||||
|
|
||||||
|
// Unassign the shard from the dest servers.
|
||||||
for (const UID& id : dest) {
|
for (const UID& id : dest) {
|
||||||
actors.push_back(krmSetRangeCoalescing(&tr,
|
actors.push_back(krmSetRangeCoalescing(&tr,
|
||||||
serverKeysPrefixFor(id),
|
serverKeysPrefixFor(id),
|
||||||
@ -1395,7 +1384,15 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||||||
allKeys,
|
allKeys,
|
||||||
serverKeysFalse));
|
serverKeysFalse));
|
||||||
}
|
}
|
||||||
// Update serverKeys to include keys.
|
if (!dest.empty()) {
|
||||||
|
TraceEvent(SevWarn, "FailedServerDropRangeFromDest", serverID)
|
||||||
|
.detail("Begin", it.key)
|
||||||
|
.detail("End", keyServers[i + 1].key)
|
||||||
|
.detail("Dest", describe(dest));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign the shard to the new team as an empty range.
|
||||||
|
// Note, there could be data loss.
|
||||||
for (const UID& id : teamForDroppedRange) {
|
for (const UID& id : teamForDroppedRange) {
|
||||||
actors.push_back(krmSetRangeCoalescing(&tr,
|
actors.push_back(krmSetRangeCoalescing(&tr,
|
||||||
serverKeysPrefixFor(id),
|
serverKeysPrefixFor(id),
|
||||||
@ -1403,13 +1400,14 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||||||
allKeys,
|
allKeys,
|
||||||
serverKeysTrueEmptyRange));
|
serverKeysTrueEmptyRange));
|
||||||
}
|
}
|
||||||
TraceEvent(SevWarn, "FailedServerRemoveRange", serverID)
|
|
||||||
.detail("Key", it.key)
|
|
||||||
.detail("OldDest", describe(dest))
|
|
||||||
.detail("NewTeam", describe(teamForDroppedRange));
|
|
||||||
wait(waitForAll(actors));
|
wait(waitForAll(actors));
|
||||||
|
TraceEvent(SevWarn, "FailedServerDropRange", serverID)
|
||||||
|
.detail("Begin", it.key)
|
||||||
|
.detail("End", keyServers[i + 1].key)
|
||||||
|
.detail("NewTeam", describe(teamForDroppedRange));
|
||||||
} else {
|
} else {
|
||||||
TraceEvent("FailedServerSetKey", serverID)
|
TraceEvent(SevDebug, "FailedServerSetKey", serverID)
|
||||||
.detail("Key", it.key)
|
.detail("Key", it.key)
|
||||||
.detail("ValueSrc", describe(src))
|
.detail("ValueSrc", describe(src))
|
||||||
.detail("ValueDest", describe(dest));
|
.detail("ValueDest", describe(dest));
|
||||||
@ -1418,12 +1416,12 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Set entire range for our serverID in serverKeys keyspace to false to signal erasure
|
// Set entire range for our serverID in serverKeys keyspace to false to signal erasure
|
||||||
TraceEvent("FailedServerSetRange", serverID)
|
TraceEvent(SevDebug, "FailedServerSetRange", serverID)
|
||||||
.detail("Begin", currentKeys.begin)
|
.detail("Begin", currentKeys.begin)
|
||||||
.detail("End", currentKeys.end);
|
.detail("End", currentKeys.end);
|
||||||
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), currentKeys, allKeys, serverKeysFalse));
|
wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), currentKeys, allKeys, serverKeysFalse));
|
||||||
wait(tr.commit());
|
wait(tr.commit());
|
||||||
TraceEvent("FailedServerCommitSuccess", serverID)
|
TraceEvent(SevDebug, "FailedServerCommitSuccess", serverID)
|
||||||
.detail("Begin", currentKeys.begin)
|
.detail("Begin", currentKeys.begin)
|
||||||
.detail("End", currentKeys.end)
|
.detail("End", currentKeys.end)
|
||||||
.detail("CommitVersion", tr.getCommittedVersion());
|
.detail("CommitVersion", tr.getCommittedVersion());
|
||||||
|
@ -3478,7 +3478,7 @@ private:
|
|||||||
bool processedCacheStartKey;
|
bool processedCacheStartKey;
|
||||||
|
|
||||||
void applyPrivateData(StorageServer* data, MutationRef const& m) {
|
void applyPrivateData(StorageServer* data, MutationRef const& m) {
|
||||||
TraceEvent("SSPrivateMutation", data->thisServerID).detail("Mutation", m);
|
TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m);
|
||||||
|
|
||||||
if (processedStartKey) {
|
if (processedStartKey) {
|
||||||
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
|
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
|
||||||
@ -3494,6 +3494,8 @@ private:
|
|||||||
|
|
||||||
// The changes for version have already been received (and are being processed now). We need to fetch
|
// The changes for version have already been received (and are being processed now). We need to fetch
|
||||||
// the data for change.version-1 (changes from versions < change.version)
|
// the data for change.version-1 (changes from versions < change.version)
|
||||||
|
// If emptyRange, treat the shard as empty, see removeKeysFromFailedServer() for more details about this
|
||||||
|
// scenario.
|
||||||
const Version shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1;
|
const Version shardVersion = (emptyRange && nowAssigned) ? 0 : currentVersion - 1;
|
||||||
changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE);
|
changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user