Updated keyServers removal to be multi-transactional in order to avoid hitting the transaction time limit

This commit is contained in:
Jon Fu 2019-08-13 13:40:05 -07:00
parent ddfcbae929
commit 080fbc63dc
3 changed files with 48 additions and 51 deletions

View File

@ -2054,7 +2054,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
std::string errorStr = std::string errorStr =
"ERROR: It is unsafe to exclude the specified servers at this time.\n" "ERROR: It is unsafe to exclude the specified servers at this time.\n"
"Please try the exclude again in 30 seconds.\n" "Please try the exclude again in 30 seconds.\n"
"Type `exclude FORCE <ADDRESS>*' to exclude without performing safety checks.\n"; "Type `exclude FORCE permanent <ADDRESS>*' to exclude without performing safety checks.\n";
printf("%s", errorStr.c_str()); printf("%s", errorStr.c_str());
return true; return true;
} }

View File

@ -3137,9 +3137,6 @@ ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version add
//we cannot remove a server immediately after adding it, because a perfectly timed master recovery could cause us to not store the mutations sent to the short lived storage server. //we cannot remove a server immediately after adding it, because a perfectly timed master recovery could cause us to not store the mutations sent to the short lived storage server.
if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) { if(ver > addedVersion + SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS) {
bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) ); bool canRemove = wait( canRemoveStorageServer( &tr, serverID ) );
TraceEvent("FailedServerDataRemoved")
.detail("CanRemove", canRemove)
.detail("NumShards", teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID));
// Current implementation of server erasure is sort of a hack that sets # shards to 0 // Current implementation of server erasure is sort of a hack that sets # shards to 0
// Defensive check for negative values instead of just 0 // Defensive check for negative values instead of just 0
if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) <= 0) { if (canRemove && teams->shardsAffectedByTeamFailure->getNumberOfShards(serverID) <= 0) {
@ -3323,9 +3320,9 @@ ACTOR Future<Void> storageServerTracker(
status.isUndesired = true; status.isUndesired = true;
status.isWrongConfiguration = true; status.isWrongConfiguration = true;
if (self->failedServers.find(addr) != self->failedServers.end() || self->failedServers.find(ipaddr) != self->failedServers.end()) { if (self->failedServers.find(addr) != self->failedServers.end() || self->failedServers.find(ipaddr) != self->failedServers.end()) {
TraceEvent("FailedServerRemoveKeys") TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
.detail("Address", addr.toString()) .detail("Address", addr.toString())
.detail("ServerID", server->id); .detail("ServerID", server->id);
wait(removeKeysFromFailedServer(cx, server->id, self->lock)); wait(removeKeysFromFailedServer(cx, server->id, self->lock));
self->shardsAffectedByTeamFailure->eraseServer(server->id); self->shardsAffectedByTeamFailure->eraseServer(server->id);
} }

View File

@ -923,55 +923,55 @@ ACTOR Future<Void> removeStorageServer( Database cx, UID serverID, MoveKeysLock
} }
ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKeysLock lock) { ACTOR Future<Void> removeKeysFromFailedServer(Database cx, UID serverID, MoveKeysLock lock) {
state Transaction tr( cx ); state Key begin = allKeys.begin;
loop { // Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit
try { while (begin < allKeys.end) {
tr.info.taskID = TaskPriority::MoveKeys; state Transaction tr(cx);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); loop {
wait( checkMoveKeysLock(&tr, lock) ); try {
TraceEvent("RemoveKeysFromFailedServerLocked").detail("ServerID", serverID).detail("Version", tr.getReadVersion().get()); tr.info.taskID = TaskPriority::MoveKeys;
// Get all values of keyServers and remove serverID from every occurrence tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// FIXME: Very inefficient going over every entry in keyServers, concern in violating 5s transaction limit wait(checkMoveKeysLock(&tr, lock));
// No shortcut because keyServers and serverKeys are not guaranteed same shard boundaries TraceEvent("RemoveKeysFromFailedServerLocked")
state Standalone<RangeResultRef> keyServers = wait( krmGetRanges(&tr, keyServersPrefix, allKeys) ); .detail("ServerID", serverID)
state KeyValueRef* it = keyServers.begin(); .detail("Version", tr.getReadVersion().get())
for ( ; it != keyServers.end() ; ++it) { .detail("Begin", begin);
state vector<UID> src; // Get all values of keyServers and remove serverID from every occurrence
state vector<UID> dest; // Very inefficient going over every entry in keyServers
decodeKeyServersValue(it->value, src, dest); // No shortcut because keyServers and serverKeys are not guaranteed same shard boundaries
TraceEvent("FailedServerCheckpoint1.0") state Standalone<RangeResultRef> keyServers =
.detail("Key", keyServersKey(it->key)) wait(krmGetRanges(&tr, keyServersPrefix, KeyRangeRef(begin, allKeys.end),
.detail("SrcSize", src.size()) SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
.detail("Src", describe(src)) for (auto it : keyServers) {
.detail("DestSize", dest.size()) vector<UID> src;
.detail("Dest", describe(dest)); vector<UID> dest;
decodeKeyServersValue(it.value, src, dest);
// // The failed server is not present // The failed server is not present
// if (std::find(src.begin(), src.end(), serverID) == src.end() && std::find(dest.begin(), dest.end(), serverID) == dest.end() ) { if (std::find(src.begin(), src.end(), serverID) == src.end() &&
// continue; std::find(dest.begin(), dest.end(), serverID) == dest.end()) {
// } continue;
}
// Update the vectors to remove failed server then set the value again // Update the vectors to remove failed server then set the value again
// Dest is usually empty, but keep this in case there is parallel data movement // Dest is usually empty, but keep this in case there is parallel data movement
src.erase(std::remove(src.begin(), src.end(), serverID), src.end()); src.erase(std::remove(src.begin(), src.end(), serverID), src.end());
dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end()); dest.erase(std::remove(dest.begin(), dest.end(), serverID), dest.end());
TraceEvent("FailedServerCheckpoint1.1") tr.set(keyServersKey(it.key), keyServersValue(src, dest));
.detail("Key", keyServersKey(it->key)) }
.detail("SrcSize", src.size())
.detail("Src", describe(src)) // Set entire range for our serverID in serverKeys keyspace to false to signal erasure
.detail("DestSize", dest.size()) wait(krmSetRangeCoalescing(&tr, serverKeysPrefixFor(serverID), allKeys, allKeys, serverKeysFalse));
.detail("Dest", describe(dest));; wait(tr.commit());
tr.set(keyServersKey(it->key), keyServersValue(src, dest)); // Update beginning of next iteration's range
begin = keyServers.end()[-1].key;
break;
} catch (Error& e) {
wait(tr.onError(e));
} }
// Set entire range for our serverID in serverKeys keyspace to false to signal erasure
wait( krmSetRangeCoalescing( &tr, serverKeysPrefixFor(serverID), allKeys, allKeys, serverKeysFalse) );
wait( tr.commit() );
return Void();
} catch (Error& e) {
wait( tr.onError(e) );
} }
} }
return Void();
} }
ACTOR Future<Void> moveKeys( ACTOR Future<Void> moveKeys(