Introduced serverKeysTrueEmptyRange to indicate to the SS that the new

shard is empty.
This commit is contained in:
helium 2021-09-09 14:15:07 -07:00
parent 8e0b572a18
commit 2ea055371b
6 changed files with 53 additions and 15 deletions

View File

@ -275,7 +275,7 @@ std::pair<vector<std::pair<UID, NetworkAddress>>, vector<std::pair<UID, NetworkA
const KeyRef serverKeysPrefix = LiteralStringRef("\xff/serverKeys/"); const KeyRef serverKeysPrefix = LiteralStringRef("\xff/serverKeys/");
const ValueRef serverKeysTrue = LiteralStringRef("1"), // compatible with what was serverKeysTrue const ValueRef serverKeysTrue = LiteralStringRef("1"), // compatible with what was serverKeysTrue
serverKeysFalse; serverKeysTrueEmptyRange = LiteralStringRef("3"), serverKeysFalse;
const Key serverKeysKey(UID serverID, const KeyRef& key) { const Key serverKeysKey(UID serverID, const KeyRef& key) {
BinaryWriter wr(Unversioned()); BinaryWriter wr(Unversioned());
@ -299,7 +299,7 @@ UID serverKeysDecodeServer(const KeyRef& key) {
return server_id; return server_id;
} }
bool serverHasKey(ValueRef storedValue) { bool serverHasKey(ValueRef storedValue) {
return storedValue == serverKeysTrue; return storedValue == serverKeysTrue || storedValue == serverKeysTrueEmptyRange;
} }
const KeyRef cacheKeysPrefix = LiteralStringRef("\xff\x02/cacheKeys/"); const KeyRef cacheKeysPrefix = LiteralStringRef("\xff\x02/cacheKeys/");

View File

@ -89,7 +89,7 @@ void decodeStorageCacheValue(const ValueRef& value, std::vector<uint16_t>& serve
// as the key, the value indicates whether the shard does or does not exist on the server. // as the key, the value indicates whether the shard does or does not exist on the server.
// These values can be changed as data movement occurs. // These values can be changed as data movement occurs.
extern const KeyRef serverKeysPrefix; extern const KeyRef serverKeysPrefix;
extern const ValueRef serverKeysTrue, serverKeysFalse; extern const ValueRef serverKeysTrue, serverKeysTrueEmptyRange, serverKeysFalse;
const Key serverKeysKey(UID serverID, const KeyRef& keys); const Key serverKeysKey(UID serverID, const KeyRef& keys);
const Key serverKeysPrefixFor(UID serverID); const Key serverKeysPrefixFor(UID serverID);
UID serverKeysDecodeServer(const KeyRef& key); UID serverKeysDecodeServer(const KeyRef& key);

View File

@ -35,7 +35,11 @@ Reference<StorageInfo> getStorageInfo(UID id,
auto cacheItr = storageCache->find(id); auto cacheItr = storageCache->find(id);
if (cacheItr == storageCache->end()) { if (cacheItr == storageCache->end()) {
storageInfo = makeReference<StorageInfo>(); storageInfo = makeReference<StorageInfo>();
storageInfo->tag = decodeServerTagValue(txnStateStore->readValue(serverTagKeyFor(id)).get().get()); Optional<Value> tag = txnStateStore->readValue(serverTagKeyFor(id)).get();
TraceEvent(SevWarn, "HeLiuDebuggetStorageInfo")
.detail("SSID", id)
.detail("Tag", tag.present() ? tag.get().toString() : "");
storageInfo->tag = decodeServerTagValue(tag.get());
storageInfo->interf = decodeServerListValue(txnStateStore->readValue(serverListKeyFor(id)).get().get()); storageInfo->interf = decodeServerListValue(txnStateStore->readValue(serverListKeyFor(id)).get().get());
(*storageCache)[id] = storageInfo; (*storageCache)[id] = storageInfo;
} else { } else {

View File

@ -1916,6 +1916,7 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
updateTagInfo(src, info.tags, info.src_info); updateTagInfo(src, info.tags, info.src_info);
info.dest_info.clear(); info.dest_info.clear();
TraceEvent(SevWarn, "HeLiuDebugUpdateDest").detail("Dest", describe(dest));
updateTagInfo(dest, info.tags, info.dest_info); updateTagInfo(dest, info.tags, info.dest_info);
uniquify(info.tags); uniquify(info.tags);

View File

@ -1259,6 +1259,7 @@ ACTOR Future<Void> removeStorageServer(Database cx,
allLocalities.insert(dcId_locality[decodeTLogDatacentersKey(it.key)]); allLocalities.insert(dcId_locality[decodeTLogDatacentersKey(it.key)]);
} }
// If the storage server is in an invalid DC, remove the DC?
if (locality >= 0 && !allLocalities.count(locality)) { if (locality >= 0 && !allLocalities.count(locality)) {
for (auto& it : fTagLocalities.get()) { for (auto& it : fTagLocalities.get()) {
if (locality == decodeTagLocalityListValue(it.value)) { if (locality == decodeTagLocalityListValue(it.value)) {
@ -1305,7 +1306,11 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
UID serverID, UID serverID,
MoveKeysLock lock, MoveKeysLock lock,
const DDEnabledState* ddEnabledState) { const DDEnabledState* ddEnabledState) {
state std::vector<UID> targetTeam;
state Key begin = allKeys.begin; state Key begin = allKeys.begin;
state vector<UID> src;
state vector<UID> dest;
// Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit // Multi-transactional removal in case of large number of shards, concern in violating 5s transaction limit
while (begin < allKeys.end) { while (begin < allKeys.end) {
state Transaction tr(cx); state Transaction tr(cx);
@ -1328,12 +1333,23 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
KeyRangeRef(begin, allKeys.end), KeyRangeRef(begin, allKeys.end),
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT,
SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES)); SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
for (int i = 0; i < keyServers.size() && targetTeam.empty(); ++i) {
decodeKeyServersValue(UIDtoTagMap, keyServers[i].value, src, dest);
if (std::find(dest.begin(), dest.end(), serverID) == dest.end()) {
targetTeam.insert(targetTeam.end(), dest.begin(), dest.end());
}
if (!targetTeam.empty()) {
break;
}
if (std::find(src.begin(), src.end(), serverID) == src.end()) {
targetTeam.insert(targetTeam.end(), src.begin(), src.end());
}
}
state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key); state KeyRange currentKeys = KeyRangeRef(begin, keyServers.end()[-1].key);
state int i = 0; state int i = 0;
for (; i < keyServers.size() - 1; ++i) { for (; i < keyServers.size() - 1; ++i) {
state vector<UID> src;
state vector<UID> dest;
state std::vector<UID> serversToRemoveRange({ serverID });
state KeyValueRef it = keyServers[i]; state KeyValueRef it = keyServers[i];
decodeKeyServersValue(UIDtoTagMap, it.value, src, dest); decodeKeyServersValue(UIDtoTagMap, it.value, src, dest);
@ -1368,15 +1384,29 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
// Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all // Remove the shard from keyServers/ if the src list is empty, and also remove the shard from all
// dest servers. // dest servers.
if (src.empty()) { if (src.empty()) {
ASSERT(!targetTeam.empty());
tr.set(keyServersKey(it.key), keyServersValue(UIDtoTagMap, targetTeam, {}));
vector<Future<Void>> actors;
for (const UID& id : dest) {
actors.push_back(krmSetRangeCoalescing(&tr,
serverKeysPrefixFor(id),
KeyRangeRef(it.key, keyServers[i + 1].key),
allKeys,
serverKeysFalse));
}
// Update serverKeys to include keys.
for (const UID& id : targetTeam) {
actors.push_back(krmSetRangeCoalescing(&tr,
serverKeysPrefixFor(id),
KeyRangeRef(it.key, keyServers[i + 1].key),
allKeys,
serverKeysTrueEmptyRange));
}
TraceEvent(SevWarn, "FailedServerRemoveRange", serverID) TraceEvent(SevWarn, "FailedServerRemoveRange", serverID)
.detail("Key", it.key) .detail("Key", it.key)
.detail("ValueDest", describe(dest)); .detail("OldDest", describe(dest))
serversToRemoveRange.insert(serversToRemoveRange.end(), dest.begin(), dest.end()); .detail("NewTeam", describe(targetTeam));
wait(krmSetRangeCoalescing(&tr, waitForAll(actors);
keyServersPrefix,
KeyRangeRef(it.key, keyServers[i + 1].key),
allKeys,
keyServers[i + 1].value));
} else { } else {
TraceEvent(SevDebug, "FailedServerSetKey", serverID) TraceEvent(SevDebug, "FailedServerSetKey", serverID)
.detail("Key", it.key) .detail("Key", it.key)

View File

@ -3470,6 +3470,7 @@ private:
KeyRef startKey; KeyRef startKey;
bool nowAssigned; bool nowAssigned;
bool emptyRange;
bool processedStartKey; bool processedStartKey;
KeyRef cacheStartKey; KeyRef cacheStartKey;
@ -3492,7 +3493,8 @@ private:
// The changes for version have already been received (and are being processed now). We need to fetch // The changes for version have already been received (and are being processed now). We need to fetch
// the data for change.version-1 (changes from versions < change.version) // the data for change.version-1 (changes from versions < change.version)
changeServerKeys(data, keys, nowAssigned, currentVersion - 1, CSK_UPDATE); const Version shardVersion = (emptyRange && nowAssigned) ? 1 : currentVersion - 1;
changeServerKeys(data, keys, nowAssigned, shardVersion, CSK_UPDATE);
} }
processedStartKey = false; processedStartKey = false;
@ -3502,6 +3504,7 @@ private:
// keys // keys
startKey = m.param1; startKey = m.param1;
nowAssigned = m.param2 != serverKeysFalse; nowAssigned = m.param2 != serverKeysFalse;
emptyRange = m.param2 == serverKeysTrueEmptyRange;
processedStartKey = true; processedStartKey = true;
} else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) { } else if (m.type == MutationRef::SetValue && m.param1 == lastEpochEndPrivateKey) {
// lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version) // lastEpochEnd transactions are guaranteed by the master to be alone in their own batch (version)