From a2114550e07a27d85a5cb37257f79cb2819b0c67 Mon Sep 17 00:00:00 2001 From: Xiaoge Su Date: Sun, 22 Aug 2021 15:43:09 -0700 Subject: [PATCH] Refactor ApplyMetadataMutation for better readability --- fdbserver/ApplyMetadataMutation.cpp | 1603 +++++++++++++++------------ fdbserver/ApplyMetadataMutation.h | 19 +- 2 files changed, 907 insertions(+), 715 deletions(-) diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index a3e70e123a..852b11a47b 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -18,15 +18,15 @@ * limitations under the License. */ -#include "fdbclient/MutationList.h" -#include "fdbclient/KeyBackedTypes.h" // for key backed map codecs for tss mapping -#include "fdbclient/SystemData.h" #include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/KeyBackedTypes.h" // for key backed map codecs for tss mapping +#include "fdbclient/MutationList.h" #include "fdbclient/Notified.h" +#include "fdbclient/SystemData.h" #include "fdbserver/ApplyMetadataMutation.h" #include "fdbserver/IKeyValueStore.h" -#include "fdbserver/LogSystem.h" #include "fdbserver/LogProtocolMessage.h" +#include "fdbserver/LogSystem.h" Reference getStorageInfo(UID id, std::map>* storageCache, @@ -43,685 +43,828 @@ Reference getStorageInfo(UID id, } return storageInfo; } +namespace { + +inline bool isSystemKey(KeyRef key) { + return key.size() && key[0] == systemKeys.begin[0]; +} + +// It is incredibly important that any modifications to txnStateStore are done in such a way that the same operations +// will be done on all commit proxies at the same time. Otherwise, the data stored in txnStateStore will become +// corrupted. +class ApplyMetadataMutationsImpl { +private: + // The following variables are incoming parameters + + const SpanID& spanContext; + + const UID& dbgid; + + Arena& arena; + + const VectorRef& mutations; + + // Transaction KV store + IKeyValueStore* txnStateStore; + + // non-null if these mutations were part of a new commit handled by this commit proxy + LogPushData* toCommit = nullptr; + + // Flag indicates if the configure is changed + bool& confChange; + + Reference logSystem = Reference(); + Version popVersion = 0; + KeyRangeMap>* vecBackupKeys = nullptr; + KeyRangeMap* keyInfo = nullptr; + KeyRangeMap* cacheInfo = nullptr; + std::map* uid_applyMutationsData = nullptr; + RequestStream commit = RequestStream(); + Database cx = Database(); + NotifiedVersion* commitVersion = nullptr; + std::map>* storageCache = nullptr; + std::map* tag_popped = nullptr; + std::unordered_map* tssMapping = nullptr; + + // true if the mutations were already written to the txnStateStore as part of recovery + bool initialCommit = false; + +private: + // The following variables are used internally -// It is incredibly important that any modifications to txnStateStore are done in such a way that -// the same operations will be done on all commit proxies at the same time. Otherwise, the data -// stored in txnStateStore will become corrupted. -void applyMetadataMutations(SpanID const& spanContext, - UID const& dbgid, - Arena& arena, - VectorRef const& mutations, - IKeyValueStore* txnStateStore, - LogPushData* toCommit, // non-null if these mutations were part of a new commit handled by this commit proxy - bool& confChange, - Reference logSystem, - Version popVersion, - KeyRangeMap>* vecBackupKeys, - KeyRangeMap* keyInfo, - KeyRangeMap* cacheInfo, - std::map* uid_applyMutationsData, - RequestStream commit, - Database cx, - NotifiedVersion* commitVersion, - std::map>* storageCache, - std::map* tag_popped, - std::unordered_map* tssMapping, - bool initialCommit // true if the mutations were already written to the txnStateStore as part of recovery -) { - // std::map> cacheRangeInfo; std::map cachedRangeInfo; // Testing Storage Server removal (clearing serverTagKey) needs to read tss server list value to determine it is a // tss + find partner's tag to send the private mutation. Since the removeStorageServer transaction clears both the // storage list and server tag, we have to enforce ordering, proccessing the server tag first, and postpone the // server list clear until the end; - // Similarly, the TSS mapping change key needs to read the server list at the end of the commit std::vector tssServerListToRemove; + + // Similar to tssServerListToRemove, the TSS mapping change key needs to read the server list at the end of the + // commit std::vector> tssMappingToAdd; - for (auto const& m : mutations) { - //TraceEvent("MetadataMutation", dbgid).detail("M", m.toString()); +private: + bool dummyConfChange = false; + +private: + void checkSetKeyServersPrefix(MutationRef m) { + if (!m.param1.startsWith(keyServersPrefix)) { + return; + } + + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + + if (!keyInfo) { + return; + } + KeyRef k = m.param1.removePrefix(keyServersPrefix); + if (k == allKeys.end) { + return; + } + + KeyRef end = keyInfo->rangeContaining(k).end(); + KeyRangeRef insertRange(k, end); + vector src, dest; + // txnStateStore is always an in-memory KVS, and must always be recovered before + // applyMetadataMutations is called, so a wait here should never be needed. + Future fResult = txnStateStore->readRange(serverTagKeys); + decodeKeyServersValue(fResult.get(), m.param2, src, dest); + + ASSERT(storageCache); + ServerCacheInfo info; + info.tags.reserve(src.size() + dest.size()); + info.src_info.reserve(src.size()); + info.dest_info.reserve(dest.size()); + + for (const auto& id : src) { + auto storageInfo = getStorageInfo(id, storageCache, txnStateStore); + ASSERT(!storageInfo->interf.isTss()); + ASSERT(storageInfo->tag != invalidTag); + info.tags.push_back(storageInfo->tag); + info.src_info.push_back(storageInfo); + } + for (const auto& id : dest) { + auto storageInfo = getStorageInfo(id, storageCache, txnStateStore); + ASSERT(!storageInfo->interf.isTss()); + ASSERT(storageInfo->tag != invalidTag); + info.tags.push_back(storageInfo->tag); + info.dest_info.push_back(storageInfo); + } + uniquify(info.tags); + keyInfo->insert(insertRange, info); + } + + void checkSetServerKeysPrefix(MutationRef m) { + if (!m.param1.startsWith(serverKeysPrefix)) { + return; + } if (toCommit) { - toCommit->addTransactionInfo(spanContext); + Tag tag = decodeServerTagValue( + txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get()); + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent(SevDebug, "SendingPrivateMutation", dbgid) + .detail("Original", m) + .detail("Privatized", privatized) + .detail("Server", serverKeysDecodeServer(m.param1)) + .detail("TagKey", serverTagKeyFor(serverKeysDecodeServer(m.param1))) + .detail("Tag", tag.toString()); + + toCommit->addTag(tag); + toCommit->writeTypedMessage(privatized); + } + } + + void checkSetServerTagsPrefix(MutationRef m) { + if (!m.param1.startsWith(serverTagPrefix)) { + return; + } + UID id = decodeServerTagKey(m.param1); + Tag tag = decodeServerTagValue(m.param2); + + if (toCommit) { + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString()); + + toCommit->addTag(tag); + toCommit->writeTypedMessage(LogProtocolMessage(), true); + toCommit->addTag(tag); + toCommit->writeTypedMessage(privatized); + } + if (!initialCommit) { + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + if (storageCache) { + auto cacheItr = storageCache->find(id); + if (cacheItr == storageCache->end()) { + Reference storageInfo = makeReference(); + storageInfo->tag = tag; + Optional interfKey = txnStateStore->readValue(serverListKeyFor(id)).get(); + if (interfKey.present()) { + storageInfo->interf = decodeServerListValue(interfKey.get()); + } + (*storageCache)[id] = storageInfo; + } else { + cacheItr->second->tag = tag; + // These tag vectors will be repopulated by the proxy when it detects their sizes are 0. + for (auto& it : keyInfo->ranges()) { + it.value().tags.clear(); + } + } + } + } + } + + void checkSetStorageCachePrefix(MutationRef m) { + if (!m.param1.startsWith(storageCachePrefix)) + return; + if (cacheInfo) { + KeyRef k = m.param1.removePrefix(storageCachePrefix); + + // Create a private mutation for storage servers + // This is done to make the storage servers aware of the cached key-ranges + if (toCommit) { + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + //TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString()); + cachedRangeInfo[k] = privatized; + } + if (k != allKeys.end) { + KeyRef end = cacheInfo->rangeContaining(k).end(); + vector serverIndices; + decodeStorageCacheValue(m.param2, serverIndices); + cacheInfo->insert(KeyRangeRef(k, end), serverIndices.size() > 0); + } + } + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + } + + void checkSetCacheKeysPrefix(MutationRef m) { + if (!m.param1.startsWith(cacheKeysPrefix) || toCommit == nullptr) { + return; } - if (m.param1.size() && m.param1[0] == systemKeys.begin[0] && m.type == MutationRef::SetValue) { - if (m.param1.startsWith(keyServersPrefix)) { - if (keyInfo) { - KeyRef k = m.param1.removePrefix(keyServersPrefix); - if (k != allKeys.end) { - KeyRef end = keyInfo->rangeContaining(k).end(); - KeyRangeRef insertRange(k, end); - vector src, dest; - // txnStateStore is always an in-memory KVS, and must always be recovered before - // applyMetadataMutations is called, so a wait here should never be needed. - Future fResult = txnStateStore->readRange(serverTagKeys); - decodeKeyServersValue(fResult.get(), m.param2, src, dest); + // Create a private mutation for cache servers + // This is done to make the cache servers aware of the cached key-ranges + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + toCommit->addTag(cacheTag); + toCommit->writeTypedMessage(privatized); + } - ASSERT(storageCache); - ServerCacheInfo info; - info.tags.reserve(src.size() + dest.size()); - info.src_info.reserve(src.size()); - info.dest_info.reserve(dest.size()); - - for (const auto& id : src) { - auto storageInfo = getStorageInfo(id, storageCache, txnStateStore); - ASSERT(!storageInfo->interf.isTss()); - ASSERT(storageInfo->tag != invalidTag); - info.tags.push_back(storageInfo->tag); - info.src_info.push_back(storageInfo); - } - for (const auto& id : dest) { - auto storageInfo = getStorageInfo(id, storageCache, txnStateStore); - ASSERT(!storageInfo->interf.isTss()); - ASSERT(storageInfo->tag != invalidTag); - info.tags.push_back(storageInfo->tag); - info.dest_info.push_back(storageInfo); - } - uniquify(info.tags); - keyInfo->insert(insertRange, info); - } - } - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - } else if (m.param1.startsWith(serverKeysPrefix)) { - if (toCommit) { - Tag tag = decodeServerTagValue( - txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get()); - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - TraceEvent(SevDebug, "SendingPrivateMutation", dbgid) - .detail("Original", m) - .detail("Privatized", privatized) - .detail("Server", serverKeysDecodeServer(m.param1)) - .detail("TagKey", serverTagKeyFor(serverKeysDecodeServer(m.param1))) - .detail("Tag", tag.toString()); - - toCommit->addTag(tag); - toCommit->writeTypedMessage(privatized); - } - } else if (m.param1.startsWith(serverTagPrefix)) { - UID id = decodeServerTagKey(m.param1); - Tag tag = decodeServerTagValue(m.param2); - - if (toCommit) { - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString()); - - toCommit->addTag(tag); - toCommit->writeTypedMessage(LogProtocolMessage(), true); - toCommit->addTag(tag); - toCommit->writeTypedMessage(privatized); - } - if (!initialCommit) { - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - if (storageCache) { - auto cacheItr = storageCache->find(id); - if (cacheItr == storageCache->end()) { - Reference storageInfo = makeReference(); - storageInfo->tag = tag; - Optional interfKey = txnStateStore->readValue(serverListKeyFor(id)).get(); - if (interfKey.present()) { - storageInfo->interf = decodeServerListValue(interfKey.get()); - } - (*storageCache)[id] = storageInfo; - } else { - cacheItr->second->tag = tag; - // These tag vectors will be repopulated by the proxy when it detects their sizes are 0. - for (auto& it : keyInfo->ranges()) { - it.value().tags.clear(); - } - } - } - } - } else if (m.param1.startsWith(storageCachePrefix)) { - if (cacheInfo) { - KeyRef k = m.param1.removePrefix(storageCachePrefix); - - // Create a private mutation for storage servers - // This is done to make the storage servers aware of the cached key-ranges - if (toCommit) { - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - //TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString()); - cachedRangeInfo[k] = privatized; - } - if (k != allKeys.end) { - KeyRef end = cacheInfo->rangeContaining(k).end(); - vector serverIndices; - decodeStorageCacheValue(m.param2, serverIndices); - cacheInfo->insert(KeyRangeRef(k, end), serverIndices.size() > 0); - } - } - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - } else if (m.param1.startsWith(cacheKeysPrefix)) { - // Create a private mutation for cache servers - // This is done to make the cache servers aware of the cached key-ranges - if (toCommit) { - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - //TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString()); - toCommit->addTag(cacheTag); - toCommit->writeTypedMessage(privatized); - } - } else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) { - if (Optional(m.param2) != - txnStateStore->readValue(m.param1) - .get() - .castTo()) { // FIXME: Make this check more specific, here or by reading - // configuration whenever there is a change - if ((!m.param1.startsWith(excludedServersPrefix) && m.param1 != excludedServersVersionKey) && - (!m.param1.startsWith(failedServersPrefix) && m.param1 != failedServersVersionKey) && - (!m.param1.startsWith(excludedLocalityPrefix) && m.param1 != excludedLocalityVersionKey) && - (!m.param1.startsWith(failedLocalityPrefix) && m.param1 != failedLocalityVersionKey)) { - auto t = txnStateStore->readValue(m.param1).get(); - TraceEvent("MutationRequiresRestart", dbgid) - .detail("M", m) - .detail("PrevValue", t.orDefault("(none)"_sr)) - .detail("ToCommit", toCommit != nullptr); - confChange = true; - } - } - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - } else if (m.param1.startsWith(serverListPrefix)) { - if (!initialCommit) { - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - if (storageCache) { - UID id = decodeServerListKey(m.param1); - StorageServerInterface interf = decodeServerListValue(m.param2); - - auto cacheItr = storageCache->find(id); - if (cacheItr == storageCache->end()) { - Reference storageInfo = makeReference(); - storageInfo->interf = interf; - Optional tagKey = txnStateStore->readValue(serverTagKeyFor(id)).get(); - if (tagKey.present()) { - storageInfo->tag = decodeServerTagValue(tagKey.get()); - } - (*storageCache)[id] = storageInfo; - } else { - cacheItr->second->interf = interf; - } - } - } - } else if (m.param1.startsWith(tssMappingKeys.begin)) { - // Normally uses key backed map, so have to use same unpacking code here. - UID ssId = Codec::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin))); - UID tssId = Codec::unpack(Tuple::unpack(m.param2)); - if (!initialCommit) { - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - } - if (tssMapping) { - tssMappingToAdd.push_back(std::pair(ssId, tssId)); - } - - if (toCommit) { - // send private mutation to SS that it now has a TSS pair - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - - Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); - if (tagV.present()) { - toCommit->addTag(decodeServerTagValue(tagV.get())); - toCommit->writeTypedMessage(privatized); - } - } - } else if (m.param1.startsWith(tssQuarantineKeys.begin)) { - if (!initialCommit) { - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - - if (toCommit) { - UID tssId = decodeTssQuarantineKey(m.param1); - Optional ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get(); - if (ssiV.present()) { - StorageServerInterface ssi = decodeServerListValue(ssiV.get()); - if (ssi.isTss()) { - Optional tagV = - txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get(); - if (tagV.present()) { - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - toCommit->addTag(decodeServerTagValue(tagV.get())); - toCommit->writeTypedMessage(privatized); - } - } - } - } - } - } else if (m.param1 == databaseLockedKey || m.param1 == metadataVersionKey || - m.param1 == mustContainSystemMutationsKey || - m.param1.startsWith(applyMutationsBeginRange.begin) || - m.param1.startsWith(applyMutationsAddPrefixRange.begin) || - m.param1.startsWith(applyMutationsRemovePrefixRange.begin) || - m.param1.startsWith(tagLocalityListPrefix) || m.param1.startsWith(serverTagHistoryPrefix) || - m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin)) { - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - } else if (m.param1.startsWith(applyMutationsEndRange.begin)) { - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - if (uid_applyMutationsData != nullptr) { - Key uid = m.param1.removePrefix(applyMutationsEndRange.begin); - auto& p = (*uid_applyMutationsData)[uid]; - p.endVersion = BinaryReader::fromStringRef(m.param2, Unversioned()); - if (p.keyVersion == Reference>()) - p.keyVersion = makeReference>(); - if (!p.worker.isValid() || p.worker.isReady()) { - auto addPrefixValue = - txnStateStore->readValue(uid.withPrefix(applyMutationsAddPrefixRange.begin)).get(); - auto removePrefixValue = - txnStateStore->readValue(uid.withPrefix(applyMutationsRemovePrefixRange.begin)).get(); - auto beginValue = - txnStateStore->readValue(uid.withPrefix(applyMutationsBeginRange.begin)).get(); - p.worker = applyMutations( - cx, - uid, - addPrefixValue.present() ? addPrefixValue.get() : Key(), - removePrefixValue.present() ? removePrefixValue.get() : Key(), - beginValue.present() ? BinaryReader::fromStringRef(beginValue.get(), Unversioned()) - : 0, - &p.endVersion, - commit, - commitVersion, - p.keyVersion); - } - } - } else if (m.param1.startsWith(applyMutationsKeyVersionMapRange.begin)) { - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - if (uid_applyMutationsData != nullptr) { - if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) { - Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID)); - Key k = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)); - auto& p = (*uid_applyMutationsData)[uid]; - if (p.keyVersion == Reference>()) - p.keyVersion = makeReference>(); - p.keyVersion->rawInsert(k, BinaryReader::fromStringRef(m.param2, Unversioned())); - } - } - } else if (m.param1.startsWith(logRangesRange.begin)) { - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - if (vecBackupKeys) { - Key logDestination; - KeyRef logRangeBegin = logRangesDecodeKey(m.param1, nullptr); - Key logRangeEnd = logRangesDecodeValue(m.param2, &logDestination); - - // Insert the logDestination into each range of vecBackupKeys overlapping the decoded range - for (auto& logRange : vecBackupKeys->modify(KeyRangeRef(logRangeBegin, logRangeEnd))) { - logRange->value().insert(logDestination); - } - for (auto& logRange : vecBackupKeys->modify(singleKeyRange(metadataVersionKey))) { - logRange->value().insert(logDestination); - } - - // Log the modification - TraceEvent("LogRangeAdd") - .detail("LogRanges", vecBackupKeys->size()) - .detail("MutationKey", m.param1) - .detail("LogRangeBegin", logRangeBegin) - .detail("LogRangeEnd", logRangeEnd); - } - } else if (m.param1.startsWith(globalKeysPrefix)) { - if (toCommit) { - // Notifies all servers that a Master's server epoch ends - auto allServers = txnStateStore->readRange(serverTagKeys).get(); - std::set allTags; - - if (m.param1 == killStorageKey) { - int8_t safeLocality = BinaryReader::fromStringRef(m.param2, Unversioned()); - for (auto& kv : allServers) { - Tag t = decodeServerTagValue(kv.value); - if (t.locality != safeLocality) { - allTags.insert(t); - } - } - } else { - for (auto& kv : allServers) { - allTags.insert(decodeServerTagValue(kv.value)); - } - } - allTags.insert(cacheTag); - - if (m.param1 == lastEpochEndKey) { - toCommit->addTags(allTags); - toCommit->writeTypedMessage(LogProtocolMessage(), true); - } - - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - toCommit->addTags(allTags); - toCommit->writeTypedMessage(privatized); - } - } else if (m.param1 == minRequiredCommitVersionKey) { - Version requested = BinaryReader::fromStringRef(m.param2, Unversioned()); - TraceEvent("MinRequiredCommitVersion", dbgid).detail("Min", requested).detail("Current", popVersion); - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); + void checkSetConfigKeys(MutationRef m) { + if (!m.param1.startsWith(configKeysPrefix) && m.param1 != coordinatorsKey) { + return; + } + if (Optional(m.param2) != + txnStateStore->readValue(m.param1) + .get() + .castTo()) { // FIXME: Make this check more specific, here or by reading + // configuration whenever there is a change + if ((!m.param1.startsWith(excludedServersPrefix) && m.param1 != excludedServersVersionKey) && + (!m.param1.startsWith(failedServersPrefix) && m.param1 != failedServersVersionKey) && + (!m.param1.startsWith(excludedLocalityPrefix) && m.param1 != excludedLocalityVersionKey) && + (!m.param1.startsWith(failedLocalityPrefix) && m.param1 != failedLocalityVersionKey)) { + auto t = txnStateStore->readValue(m.param1).get(); + TraceEvent("MutationRequiresRestart", dbgid) + .detail("M", m) + .detail("PrevValue", t.orDefault("(none)"_sr)) + .detail("ToCommit", toCommit != nullptr); confChange = true; - TEST(true); // Recovering at a higher version. - } else if (m.param1 == writeRecoveryKey) { - TraceEvent("WriteRecoveryKeySet", dbgid).log(); - if (!initialCommit) - txnStateStore->set(KeyValueRef(m.param1, m.param2)); - TEST(true); // Snapshot created, setting writeRecoveryKey in txnStateStore } - } else if (m.param2.size() > 1 && m.param2[0] == systemKeys.begin[0] && m.type == MutationRef::ClearRange) { - KeyRangeRef range(m.param1, m.param2); + } + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + } - if (keyServersKeys.intersects(range)) { - KeyRangeRef r = range & keyServersKeys; - if (keyInfo) { - KeyRangeRef clearRange(r.begin.removePrefix(keyServersPrefix), - r.end.removePrefix(keyServersPrefix)); - keyInfo->insert(clearRange, - clearRange.begin == StringRef() - ? ServerCacheInfo() - : keyInfo->rangeContainingKeyBefore(clearRange.begin).value()); - } + void checkSetServerListPrefix(MutationRef m) { + if (!m.param1.startsWith(serverListPrefix)) { + return; + } + if (!initialCommit) { + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + if (storageCache) { + UID id = decodeServerListKey(m.param1); + StorageServerInterface interf = decodeServerListValue(m.param2); - if (!initialCommit) - txnStateStore->clear(r); - } - if (configKeys.intersects(range)) { - if (!initialCommit) - txnStateStore->clear(range & configKeys); - if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range) && - !excludedLocalityKeys.contains(range) && !failedLocalityKeys.contains(range)) { - TraceEvent("MutationRequiresRestart", dbgid).detail("M", m); - confChange = true; - } - } - if (serverListKeys.intersects(range)) { - if (!initialCommit) { - KeyRangeRef rangeToClear = range & serverListKeys; - if (rangeToClear.singleKeyRange()) { - UID id = decodeServerListKey(rangeToClear.begin); - Optional ssiV = txnStateStore->readValue(serverListKeyFor(id)).get(); - if (ssiV.present() && decodeServerListValue(ssiV.get()).isTss()) { - tssServerListToRemove.push_back(rangeToClear); - } else { - txnStateStore->clear(rangeToClear); - } - } else { - txnStateStore->clear(rangeToClear); + auto cacheItr = storageCache->find(id); + if (cacheItr == storageCache->end()) { + Reference storageInfo = makeReference(); + storageInfo->interf = interf; + Optional tagKey = txnStateStore->readValue(serverTagKeyFor(id)).get(); + if (tagKey.present()) { + storageInfo->tag = decodeServerTagValue(tagKey.get()); } + (*storageCache)[id] = storageInfo; + } else { + cacheItr->second->interf = interf; } } - if (tagLocalityListKeys.intersects(range)) { - if (!initialCommit) - txnStateStore->clear(range & tagLocalityListKeys); - } - if (serverTagKeys.intersects(range)) { - // Storage server removal always happens in a separate version from any prior writes (or any subsequent - // reuse of the tag) so we can safely destroy the tag here without any concern about intra-batch - // ordering - if (logSystem && popVersion) { - auto serverKeysCleared = txnStateStore->readRange(range & serverTagKeys) - .get(); // read is expected to be immediately available - for (auto& kv : serverKeysCleared) { - Tag tag = decodeServerTagValue(kv.value); - TraceEvent("ServerTagRemove") - .detail("PopVersion", popVersion) - .detail("Tag", tag.toString()) - .detail("Server", decodeServerTagKey(kv.key)); - logSystem->pop(popVersion, decodeServerTagValue(kv.value)); - (*tag_popped)[tag] = popVersion; - - if (toCommit) { - MutationRef privatized = m; - privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena); - privatized.param2 = keyAfter(kv.key, arena).withPrefix(systemKeys.begin, arena); - - toCommit->addTag(decodeServerTagValue(kv.value)); - toCommit->writeTypedMessage(privatized); - } - } - // Might be a tss removal, which doesn't store a tag there. - // Chained if is a little verbose, but avoids unecessary work - if (toCommit && !initialCommit && !serverKeysCleared.size()) { - KeyRangeRef maybeTssRange = range & serverTagKeys; - if (maybeTssRange.singleKeyRange()) { - UID id = decodeServerTagKey(maybeTssRange.begin); - Optional ssiV = txnStateStore->readValue(serverListKeyFor(id)).get(); - - if (ssiV.present()) { - StorageServerInterface ssi = decodeServerListValue(ssiV.get()); - if (ssi.isTss()) { - Optional tagV = - txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get(); - if (tagV.present()) { - MutationRef privatized = m; - privatized.param1 = maybeTssRange.begin.withPrefix(systemKeys.begin, arena); - privatized.param2 = - keyAfter(maybeTssRange.begin, arena).withPrefix(systemKeys.begin, arena); - - toCommit->addTag(decodeServerTagValue(tagV.get())); - toCommit->writeTypedMessage(privatized); - } - } - } - } - } - } - if (!initialCommit) { - KeyRangeRef clearRange = range & serverTagKeys; - txnStateStore->clear(clearRange); - if (storageCache && clearRange.singleKeyRange()) { - storageCache->erase(decodeServerTagKey(clearRange.begin)); - } - } - } - if (serverTagHistoryKeys.intersects(range)) { - // Once a tag has been removed from history we should pop it, since we no longer have a record of the - // tag once it has been removed from history - if (logSystem && popVersion) { - auto serverKeysCleared = txnStateStore->readRange(range & serverTagHistoryKeys) - .get(); // read is expected to be immediately available - for (auto& kv : serverKeysCleared) { - Tag tag = decodeServerTagValue(kv.value); - TraceEvent("ServerTagHistoryRemove") - .detail("PopVersion", popVersion) - .detail("Tag", tag.toString()) - .detail("Version", decodeServerTagHistoryKey(kv.key)); - logSystem->pop(popVersion, tag); - (*tag_popped)[tag] = popVersion; - } - } - if (!initialCommit) - txnStateStore->clear(range & serverTagHistoryKeys); - } - if (tssMappingKeys.intersects(range)) { - KeyRangeRef rangeToClear = range & tssMappingKeys; - ASSERT(rangeToClear.singleKeyRange()); - - // Normally uses key backed map, so have to use same unpacking code here. - UID ssId = Codec::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin))); - if (!initialCommit) { - txnStateStore->clear(rangeToClear); - } - - if (tssMapping) { - tssMapping->erase(ssId); - } - - if (toCommit) { - // send private mutation to SS to notify that it no longer has a tss pair - Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); - if (tagV.present()) { - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - toCommit->addTag(decodeServerTagValue(tagV.get())); - toCommit->writeTypedMessage(privatized); - } - } - } - if (tssQuarantineKeys.intersects(range)) { - if (!initialCommit) { - KeyRangeRef rangeToClear = range & tssQuarantineKeys; - ASSERT(rangeToClear.singleKeyRange()); - txnStateStore->clear(rangeToClear); - - if (toCommit) { - UID tssId = decodeTssQuarantineKey(m.param1); - Optional ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get(); - if (ssiV.present()) { - StorageServerInterface ssi = decodeServerListValue(ssiV.get()); - if (ssi.isTss()) { - Optional tagV = - txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get(); - if (tagV.present()) { - MutationRef privatized = m; - privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); - toCommit->addTag(decodeServerTagValue(tagV.get())); - toCommit->writeTypedMessage(privatized); - } - } - } - } - } - } - if (range.contains(coordinatorsKey)) { - if (!initialCommit) - txnStateStore->clear(singleKeyRange(coordinatorsKey)); - } - if (range.contains(databaseLockedKey)) { - if (!initialCommit) - txnStateStore->clear(singleKeyRange(databaseLockedKey)); - } - if (range.contains(metadataVersionKey)) { - if (!initialCommit) - txnStateStore->clear(singleKeyRange(metadataVersionKey)); - } - if (range.contains(mustContainSystemMutationsKey)) { - if (!initialCommit) - txnStateStore->clear(singleKeyRange(mustContainSystemMutationsKey)); - } - if (range.contains(writeRecoveryKey)) { - if (!initialCommit) - txnStateStore->clear(singleKeyRange(writeRecoveryKey)); - } - if (range.intersects(testOnlyTxnStateStorePrefixRange)) { - if (!initialCommit) - txnStateStore->clear(range & testOnlyTxnStateStorePrefixRange); - } - if (range.intersects(applyMutationsEndRange)) { - KeyRangeRef commonEndRange(range & applyMutationsEndRange); - if (!initialCommit) - txnStateStore->clear(commonEndRange); - if (uid_applyMutationsData != nullptr) { - uid_applyMutationsData->erase( - uid_applyMutationsData->lower_bound(m.param1.substr(applyMutationsEndRange.begin.size())), - m.param2 == applyMutationsEndRange.end ? uid_applyMutationsData->end() - : uid_applyMutationsData->lower_bound(m.param2.substr( - applyMutationsEndRange.begin.size()))); - } - } - if (range.intersects(applyMutationsKeyVersionMapRange)) { - KeyRangeRef commonApplyRange(range & applyMutationsKeyVersionMapRange); - if (!initialCommit) - txnStateStore->clear(commonApplyRange); - if (uid_applyMutationsData != nullptr) { - if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID) && - m.param2.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) { - Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID)); - Key uid2 = m.param2.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID)); - - if (uid == uid2) { - auto& p = (*uid_applyMutationsData)[uid]; - if (p.keyVersion == Reference>()) - p.keyVersion = makeReference>(); - p.keyVersion->rawErase(KeyRangeRef( - m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)), - m.param2.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)))); - } - } - } - } - if (range.intersects(logRangesRange)) { - KeyRangeRef commonLogRange(range & logRangesRange); - - TraceEvent("LogRangeClear") - .detail("RangeBegin", range.begin) - .detail("RangeEnd", range.end) - .detail("IntersectBegin", commonLogRange.begin) - .detail("IntersectEnd", commonLogRange.end); - - // Remove the key range from the vector, if defined - if (vecBackupKeys) { - KeyRef logKeyBegin; - Key logKeyEnd, logDestination; - - // Identify the backup keys being removed - // read is expected to be immediately available - auto logRangesAffected = txnStateStore->readRange(commonLogRange).get(); - - TraceEvent("LogRangeClearBegin").detail("AffectedLogRanges", logRangesAffected.size()); - - // Add the backup name to the backup locations that do not have it - for (auto logRangeAffected : logRangesAffected) { - // Parse the backup key and name - logKeyBegin = logRangesDecodeKey(logRangeAffected.key, nullptr); - - // Decode the log destination and key value - logKeyEnd = logRangesDecodeValue(logRangeAffected.value, &logDestination); - - TraceEvent("LogRangeErase") - .detail("AffectedKey", logRangeAffected.key) - .detail("AffectedValue", logRangeAffected.value) - .detail("LogKeyBegin", logKeyBegin) - .detail("LogKeyEnd", logKeyEnd) - .detail("LogDestination", logDestination); - - // Identify the locations to place the backup key - auto logRanges = vecBackupKeys->modify(KeyRangeRef(logKeyBegin, logKeyEnd)); - - // Remove the log prefix from the ranges which include it - for (auto logRange : logRanges) { - auto& logRangeMap = logRange->value(); - - // Remove the backup name from the range - logRangeMap.erase(logDestination); - } - - bool foundKey = false; - for (auto& it : vecBackupKeys->intersectingRanges(normalKeys)) { - if (it.value().count(logDestination) > 0) { - foundKey = true; - break; - } - } - if (!foundKey) { - auto logRanges = vecBackupKeys->modify(singleKeyRange(metadataVersionKey)); - for (auto logRange : logRanges) { - auto& logRangeMap = logRange->value(); - logRangeMap.erase(logDestination); - } - } - } - - // Coallesce the entire range - vecBackupKeys->coalesce(allKeys); - } - - if (!initialCommit) - txnStateStore->clear(commonLogRange); - } } } - for (KeyRangeRef& range : tssServerListToRemove) { - txnStateStore->clear(range); + void checkSetTSSMappingKeys(MutationRef m) { + if (!m.param1.startsWith(tssMappingKeys.begin)) { + return; + } + // Normally uses key backed map, so have to use same unpacking code here. + UID ssId = Codec::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin))); + UID tssId = Codec::unpack(Tuple::unpack(m.param2)); + if (!initialCommit) { + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + } + if (tssMapping) { + tssMappingToAdd.push_back(std::pair(ssId, tssId)); + } + + if (toCommit) { + // send private mutation to SS that it now has a TSS pair + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + + Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); + if (tagV.present()) { + toCommit->addTag(decodeServerTagValue(tagV.get())); + toCommit->writeTypedMessage(privatized); + } + } } - for (auto& tssPair : tssMappingToAdd) { - // read tss server list from txn state store and add it to tss mapping - StorageServerInterface tssi = - decodeServerListValue(txnStateStore->readValue(serverListKeyFor(tssPair.second)).get().get()); - (*tssMapping)[tssPair.first] = tssi; + void checkSetTSSQuarantineKeys(MutationRef m) { + if (!m.param1.startsWith(tssQuarantineKeys.begin)) { + return; + } + if (initialCommit) { + return; + } + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + + if (!toCommit) { + return; + } + UID tssId = decodeTssQuarantineKey(m.param1); + Optional ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get(); + if (!ssiV.present()) { + return; + } + StorageServerInterface ssi = decodeServerListValue(ssiV.get()); + if (!ssi.isTss()) { + return; + } + Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get(); + if (tagV.present()) { + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + toCommit->addTag(decodeServerTagValue(tagV.get())); + toCommit->writeTypedMessage(privatized); + } + } + + void checkSetApplyMutationsEndRange(MutationRef m) { + if (!m.param1.startsWith(applyMutationsEndRange.begin)) { + return; + } + + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + + if (uid_applyMutationsData == nullptr) { + return; + } + + Key uid = m.param1.removePrefix(applyMutationsEndRange.begin); + auto& p = (*uid_applyMutationsData)[uid]; + p.endVersion = BinaryReader::fromStringRef(m.param2, Unversioned()); + if (p.keyVersion == Reference>()) + p.keyVersion = makeReference>(); + if (p.worker.isValid() && !p.worker.isReady()) { + return; + } + auto addPrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsAddPrefixRange.begin)).get(); + auto removePrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsRemovePrefixRange.begin)).get(); + auto beginValue = txnStateStore->readValue(uid.withPrefix(applyMutationsBeginRange.begin)).get(); + p.worker = applyMutations( + cx, + uid, + addPrefixValue.present() ? addPrefixValue.get() : Key(), + removePrefixValue.present() ? removePrefixValue.get() : Key(), + beginValue.present() ? BinaryReader::fromStringRef(beginValue.get(), Unversioned()) : 0, + &p.endVersion, + commit, + commitVersion, + p.keyVersion); + } + + void checkSetApplyMutationsKeyVersionMapRange(MutationRef m) { + if (!m.param1.startsWith(applyMutationsKeyVersionMapRange.begin)) { + return; + } + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + + if (uid_applyMutationsData == nullptr) { + return; + } + if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) { + Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID)); + Key k = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)); + auto& p = (*uid_applyMutationsData)[uid]; + if (p.keyVersion == Reference>()) + p.keyVersion = makeReference>(); + p.keyVersion->rawInsert(k, BinaryReader::fromStringRef(m.param2, Unversioned())); + } + } + + void checkSetLogRangesRange(MutationRef m) { + if (!m.param1.startsWith(logRangesRange.begin)) { + return; + } + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + if (!vecBackupKeys) { + return; + } + Key logDestination; + KeyRef logRangeBegin = logRangesDecodeKey(m.param1, nullptr); + Key logRangeEnd = logRangesDecodeValue(m.param2, &logDestination); + + // Insert the logDestination into each range of vecBackupKeys overlapping the decoded range + for (auto& logRange : vecBackupKeys->modify(KeyRangeRef(logRangeBegin, logRangeEnd))) { + logRange->value().insert(logDestination); + } + for (auto& logRange : vecBackupKeys->modify(singleKeyRange(metadataVersionKey))) { + logRange->value().insert(logDestination); + } + + TraceEvent("LogRangeAdd") + .detail("LogRanges", vecBackupKeys->size()) + .detail("MutationKey", m.param1) + .detail("LogRangeBegin", logRangeBegin) + .detail("LogRangeEnd", logRangeEnd); + } + + void checkSetGlobalKeys(MutationRef m) { + if (!m.param1.startsWith(globalKeysPrefix)) { + return; + } + if (!toCommit) { + return; + } + + // Notifies all servers that a Master's server epoch ends + auto allServers = txnStateStore->readRange(serverTagKeys).get(); + std::set allTags; + + if (m.param1 == killStorageKey) { + int8_t safeLocality = BinaryReader::fromStringRef(m.param2, Unversioned()); + for (auto& kv : allServers) { + Tag t = decodeServerTagValue(kv.value); + if (t.locality != safeLocality) { + allTags.insert(t); + } + } + } else { + for (auto& kv : allServers) { + allTags.insert(decodeServerTagValue(kv.value)); + } + } + allTags.insert(cacheTag); + + if (m.param1 == lastEpochEndKey) { + toCommit->addTags(allTags); + toCommit->writeTypedMessage(LogProtocolMessage(), true); + } + + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + toCommit->addTags(allTags); + toCommit->writeTypedMessage(privatized); + } + + void checkSetOtherKeys(MutationRef m) { + if (initialCommit) + return; + if (m.param1 == databaseLockedKey || m.param1 == metadataVersionKey || + m.param1 == mustContainSystemMutationsKey || m.param1.startsWith(applyMutationsBeginRange.begin) || + m.param1.startsWith(applyMutationsAddPrefixRange.begin) || + m.param1.startsWith(applyMutationsRemovePrefixRange.begin) || m.param1.startsWith(tagLocalityListPrefix) || + m.param1.startsWith(serverTagHistoryPrefix) || + m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin)) { + + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + } + } + + void checkSetMinRequiredCommitVersionKey(MutationRef m) { + if (m.param1 != minRequiredCommitVersionKey) { + return; + } + Version requested = BinaryReader::fromStringRef(m.param2, Unversioned()); + TraceEvent("MinRequiredCommitVersion", dbgid).detail("Min", requested).detail("Current", popVersion); + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + confChange = true; + TEST(true); // Recovering at a higher version. + } + + void checkSetWriteRecoverKey(MutationRef m) { + if (m.param1 != writeRecoveryKey) { + return; + } + TraceEvent("WriteRecoveryKeySet", dbgid).log(); + if (!initialCommit) + txnStateStore->set(KeyValueRef(m.param1, m.param2)); + TEST(true); // Snapshot created, setting writeRecoveryKey in txnStateStore + } + + void checkClearKeyServerKeys(KeyRangeRef range) { + if (!keyServersKeys.intersects(range)) { + return; + } + KeyRangeRef r = range & keyServersKeys; + if (keyInfo) { + KeyRangeRef clearRange(r.begin.removePrefix(keyServersPrefix), r.end.removePrefix(keyServersPrefix)); + keyInfo->insert(clearRange, + clearRange.begin == StringRef() + ? ServerCacheInfo() + : keyInfo->rangeContainingKeyBefore(clearRange.begin).value()); + } + + if (!initialCommit) + txnStateStore->clear(r); + } + + void checkClearConfigKeys(MutationRef m, KeyRangeRef range) { + if (!configKeys.intersects(range)) { + return; + } + if (!initialCommit) + txnStateStore->clear(range & configKeys); + if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range) && + !excludedLocalityKeys.contains(range) && !failedLocalityKeys.contains(range)) { + TraceEvent("MutationRequiresRestart", dbgid).detail("M", m); + confChange = true; + } + } + + void checkClearServerListKeys(KeyRangeRef range) { + if (!serverListKeys.intersects(range)) { + return; + } + if (initialCommit) { + return; + } + KeyRangeRef rangeToClear = range & serverListKeys; + if (rangeToClear.singleKeyRange()) { + UID id = decodeServerListKey(rangeToClear.begin); + Optional ssiV = txnStateStore->readValue(serverListKeyFor(id)).get(); + if (ssiV.present() && decodeServerListValue(ssiV.get()).isTss()) { + tssServerListToRemove.push_back(rangeToClear); + } else { + txnStateStore->clear(rangeToClear); + } + } else { + txnStateStore->clear(rangeToClear); + } + } + + void checkClearTagLocalityListKeys(KeyRangeRef range) { + if (!tagLocalityListKeys.intersects(range)) { + return; + } + if (initialCommit) { + return; + } + txnStateStore->clear(range & tagLocalityListKeys); + } + + void checkClearServerTagKeys(MutationRef m, KeyRangeRef range) { + if (!serverTagKeys.intersects(range)) { + return; + } + + // Storage server removal always happens in a separate version from any prior writes (or any subsequent + // reuse of the tag) so we can safely destroy the tag here without any concern about intra-batch + // ordering + if (logSystem && popVersion) { + auto serverKeysCleared = + txnStateStore->readRange(range & serverTagKeys).get(); // read is expected to be immediately available + for (auto& kv : serverKeysCleared) { + Tag tag = decodeServerTagValue(kv.value); + TraceEvent("ServerTagRemove") + .detail("PopVersion", popVersion) + .detail("Tag", tag.toString()) + .detail("Server", decodeServerTagKey(kv.key)); + logSystem->pop(popVersion, decodeServerTagValue(kv.value)); + (*tag_popped)[tag] = popVersion; + + if (toCommit) { + MutationRef privatized = m; + privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena); + privatized.param2 = keyAfter(kv.key, arena).withPrefix(systemKeys.begin, arena); + + toCommit->addTag(decodeServerTagValue(kv.value)); + toCommit->writeTypedMessage(privatized); + } + } + // Might be a tss removal, which doesn't store a tag there. + // Chained if is a little verbose, but avoids unecessary work + if (toCommit && !initialCommit && !serverKeysCleared.size()) { + KeyRangeRef maybeTssRange = range & serverTagKeys; + if (maybeTssRange.singleKeyRange()) { + UID id = decodeServerTagKey(maybeTssRange.begin); + Optional ssiV = txnStateStore->readValue(serverListKeyFor(id)).get(); + + if (ssiV.present()) { + StorageServerInterface ssi = decodeServerListValue(ssiV.get()); + if (ssi.isTss()) { + Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get(); + if (tagV.present()) { + MutationRef privatized = m; + privatized.param1 = maybeTssRange.begin.withPrefix(systemKeys.begin, arena); + privatized.param2 = + keyAfter(maybeTssRange.begin, arena).withPrefix(systemKeys.begin, arena); + + toCommit->addTag(decodeServerTagValue(tagV.get())); + toCommit->writeTypedMessage(privatized); + } + } + } + } + } + } + + if (!initialCommit) { + KeyRangeRef clearRange = range & serverTagKeys; + txnStateStore->clear(clearRange); + if (storageCache && clearRange.singleKeyRange()) { + storageCache->erase(decodeServerTagKey(clearRange.begin)); + } + } + } + + void checkClearServerTagHistoryKeys(KeyRangeRef range) { + if (!serverTagHistoryKeys.intersects(range)) { + return; + } + // Once a tag has been removed from history we should pop it, since we no longer have a record of the + // tag once it has been removed from history + if (logSystem && popVersion) { + auto serverKeysCleared = txnStateStore->readRange(range & serverTagHistoryKeys) + .get(); // read is expected to be immediately available + for (auto& kv : serverKeysCleared) { + Tag tag = decodeServerTagValue(kv.value); + TraceEvent("ServerTagHistoryRemove") + .detail("PopVersion", popVersion) + .detail("Tag", tag.toString()) + .detail("Version", decodeServerTagHistoryKey(kv.key)); + logSystem->pop(popVersion, tag); + (*tag_popped)[tag] = popVersion; + } + } + if (!initialCommit) + txnStateStore->clear(range & serverTagHistoryKeys); + } + + void checkClearApplyMutationsEndRange(MutationRef m, KeyRangeRef range) { + if (!range.intersects(applyMutationsEndRange)) { + return; + } + KeyRangeRef commonEndRange(range & applyMutationsEndRange); + if (!initialCommit) + txnStateStore->clear(commonEndRange); + if (uid_applyMutationsData != nullptr) { + uid_applyMutationsData->erase( + uid_applyMutationsData->lower_bound(m.param1.substr(applyMutationsEndRange.begin.size())), + m.param2 == applyMutationsEndRange.end + ? uid_applyMutationsData->end() + : uid_applyMutationsData->lower_bound(m.param2.substr(applyMutationsEndRange.begin.size()))); + } + } + + void checkClearApplyMutationKeyVersionMapRange(MutationRef m, KeyRangeRef range) { + if (!range.intersects(applyMutationsKeyVersionMapRange)) { + return; + } + KeyRangeRef commonApplyRange(range & applyMutationsKeyVersionMapRange); + if (!initialCommit) + txnStateStore->clear(commonApplyRange); + if (uid_applyMutationsData == nullptr) { + return; + } + if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID) && + m.param2.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) { + Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID)); + Key uid2 = m.param2.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID)); + + if (uid == uid2) { + auto& p = (*uid_applyMutationsData)[uid]; + if (p.keyVersion == Reference>()) + p.keyVersion = makeReference>(); + p.keyVersion->rawErase( + KeyRangeRef(m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)), + m.param2.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)))); + } + } + } + + void checkClearLogRangesRange(KeyRangeRef range) { + if (!range.intersects(logRangesRange)) { + return; + } + KeyRangeRef commonLogRange(range & logRangesRange); + + TraceEvent("LogRangeClear") + .detail("RangeBegin", range.begin) + .detail("RangeEnd", range.end) + .detail("IntersectBegin", commonLogRange.begin) + .detail("IntersectEnd", commonLogRange.end); + + // Remove the key range from the vector, if defined + if (vecBackupKeys) { + KeyRef logKeyBegin; + Key logKeyEnd, logDestination; + + // Identify the backup keys being removed + // read is expected to be immediately available + auto logRangesAffected = txnStateStore->readRange(commonLogRange).get(); + + TraceEvent("LogRangeClearBegin").detail("AffectedLogRanges", logRangesAffected.size()); + + // Add the backup name to the backup locations that do not have it + for (auto logRangeAffected : logRangesAffected) { + // Parse the backup key and name + logKeyBegin = logRangesDecodeKey(logRangeAffected.key, nullptr); + + // Decode the log destination and key value + logKeyEnd = logRangesDecodeValue(logRangeAffected.value, &logDestination); + + TraceEvent("LogRangeErase") + .detail("AffectedKey", logRangeAffected.key) + .detail("AffectedValue", logRangeAffected.value) + .detail("LogKeyBegin", logKeyBegin) + .detail("LogKeyEnd", logKeyEnd) + .detail("LogDestination", logDestination); + + // Identify the locations to place the backup key + auto logRanges = vecBackupKeys->modify(KeyRangeRef(logKeyBegin, logKeyEnd)); + + // Remove the log prefix from the ranges which include it + for (auto logRange : logRanges) { + auto& logRangeMap = logRange->value(); + + // Remove the backup name from the range + logRangeMap.erase(logDestination); + } + + bool foundKey = false; + for (auto& it : vecBackupKeys->intersectingRanges(normalKeys)) { + if (it.value().count(logDestination) > 0) { + foundKey = true; + break; + } + } + if (!foundKey) { + auto logRanges = vecBackupKeys->modify(singleKeyRange(metadataVersionKey)); + for (auto logRange : logRanges) { + auto& logRangeMap = logRange->value(); + logRangeMap.erase(logDestination); + } + } + } + + // Coallesce the entire range + vecBackupKeys->coalesce(allKeys); + } + + if (!initialCommit) + txnStateStore->clear(commonLogRange); + } + + void checkClearTssMappingKeys(MutationRef m, KeyRangeRef range) { + if (!tssMappingKeys.intersects(range)) { + return; + } + KeyRangeRef rangeToClear = range & tssMappingKeys; + ASSERT(rangeToClear.singleKeyRange()); + + // Normally uses key backed map, so have to use same unpacking code here. + UID ssId = Codec::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin))); + if (!initialCommit) { + txnStateStore->clear(rangeToClear); + } + + if (tssMapping) { + tssMapping->erase(ssId); + } + + if (!toCommit) { + return; + } + // send private mutation to SS to notify that it no longer has a tss pair + if (Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); tagV.present()) { + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + toCommit->addTag(decodeServerTagValue(tagV.get())); + toCommit->writeTypedMessage(privatized); + } + } + + void checkClearTssQuarantineKeys(MutationRef m, KeyRangeRef range) { + if (!tssQuarantineKeys.intersects(range)) { + return; + } + if (initialCommit) { + return; + } + + KeyRangeRef rangeToClear = range & tssQuarantineKeys; + ASSERT(rangeToClear.singleKeyRange()); + txnStateStore->clear(rangeToClear); + + if (!toCommit) { + return; + } + UID tssId = decodeTssQuarantineKey(m.param1); + if (Optional ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get(); ssiV.present()) { + if (StorageServerInterface ssi = decodeServerListValue(ssiV.get()); ssi.isTss()) { + if (Optional tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get(); + tagV.present()) { + + MutationRef privatized = m; + privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena); + toCommit->addTag(decodeServerTagValue(tagV.get())); + toCommit->writeTypedMessage(privatized); + } + } + } + } + + void checkClearMiscRangeKeys(KeyRangeRef range) { + if (initialCommit) { + return; + } + if (range.contains(coordinatorsKey)) { + txnStateStore->clear(singleKeyRange(coordinatorsKey)); + } + if (range.contains(databaseLockedKey)) { + txnStateStore->clear(singleKeyRange(databaseLockedKey)); + } + if (range.contains(metadataVersionKey)) { + txnStateStore->clear(singleKeyRange(metadataVersionKey)); + } + if (range.contains(mustContainSystemMutationsKey)) { + txnStateStore->clear(singleKeyRange(mustContainSystemMutationsKey)); + } + if (range.contains(writeRecoveryKey)) { + txnStateStore->clear(singleKeyRange(writeRecoveryKey)); + } + if (range.intersects(testOnlyTxnStateStorePrefixRange)) { + txnStateStore->clear(range & testOnlyTxnStateStorePrefixRange); + } } // If we accumulated private mutations for cached key-ranges, we also need to @@ -732,7 +875,11 @@ void applyMetadataMutations(SpanID const& spanContext, // TODO Note that, we are currently not handling the case when cached key-ranges move out // to different storage servers. This would require some checking when keys in the keyServersPrefix change. // For the first implementation, we could just send the entire map to every storage server. Revisit! - if (cachedRangeInfo.size() != 0 && toCommit) { + void tagStorageServersForCachedKeyRanges() { + if (cachedRangeInfo.size() == 0 || !toCommit) { + return; + } + std::map::iterator itr; KeyRef keyBegin, keyEnd; vector serverIndices; @@ -787,7 +934,91 @@ void applyMetadataMutations(SpanID const& spanContext, toCommit->writeTypedMessage(mutationEnd); } } -} + +public: + ApplyMetadataMutationsImpl(const SpanID& spanContext_, + const UID& dbgid_, + Arena& arena_, + const VectorRef& mutations_, + IKeyValueStore* txnStateStore_) + : spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_), + confChange(dummyConfChange) {} + + ApplyMetadataMutationsImpl(const SpanID& spanContext_, + Arena& arena_, + const VectorRef& mutations_, + ProxyCommitData& proxyCommitData_, + Reference logSystem_, + LogPushData* toCommit_, + bool& confChange_, + Version popVersion_, + bool initialCommit_) + : spanContext(spanContext_), dbgid(proxyCommitData_.dbgid), arena(arena_), mutations(mutations_), + txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), confChange(confChange_), + logSystem(logSystem_), popVersion(popVersion_), vecBackupKeys(&proxyCommitData_.vecBackupKeys), + keyInfo(&proxyCommitData_.keyInfo), cacheInfo(&proxyCommitData_.cacheInfo), + uid_applyMutationsData(proxyCommitData_.firstProxy ? &proxyCommitData_.uid_applyMutationsData : nullptr), + commit(proxyCommitData_.commit), cx(proxyCommitData_.cx), commitVersion(&proxyCommitData_.committedVersion), + storageCache(&proxyCommitData_.storageCache), tag_popped(&proxyCommitData_.tag_popped), + tssMapping(&proxyCommitData_.tssMapping), initialCommit(initialCommit_) {} + + void apply() { + for (auto const& m : mutations) { + if (toCommit) { + toCommit->addTransactionInfo(spanContext); + } + + if (m.type == MutationRef::SetValue && isSystemKey(m.param1)) { + checkSetKeyServersPrefix(m); + checkSetServerKeysPrefix(m); + checkSetServerTagsPrefix(m); + checkSetStorageCachePrefix(m); + checkSetCacheKeysPrefix(m); + checkSetConfigKeys(m); + checkSetServerListPrefix(m); + checkSetTSSMappingKeys(m); + checkSetTSSQuarantineKeys(m); + checkSetApplyMutationsEndRange(m); + checkSetApplyMutationsKeyVersionMapRange(m); + checkSetLogRangesRange(m); + checkSetGlobalKeys(m); + checkSetWriteRecoverKey(m); + checkSetMinRequiredCommitVersionKey(m); + checkSetOtherKeys(m); + } else if (m.type == MutationRef::ClearRange && isSystemKey(m.param2)) { + KeyRangeRef range(m.param1, m.param2); + + checkClearKeyServerKeys(range); + checkClearConfigKeys(m, range); + checkClearServerListKeys(range); + checkClearTagLocalityListKeys(range); + checkClearServerTagKeys(m, range); + checkClearServerTagHistoryKeys(range); + checkClearApplyMutationsEndRange(m, range); + checkClearApplyMutationKeyVersionMapRange(m, range); + checkClearLogRangesRange(range); + checkClearTssMappingKeys(m, range); + checkClearTssQuarantineKeys(m, range); + checkClearMiscRangeKeys(range); + } + } + + for (KeyRangeRef& range : tssServerListToRemove) { + txnStateStore->clear(range); + } + + for (auto& tssPair : tssMappingToAdd) { + // read tss server list from txn state store and add it to tss mapping + StorageServerInterface tssi = + decodeServerListValue(txnStateStore->readValue(serverListKeyFor(tssPair.second)).get().get()); + (*tssMapping)[tssPair.first] = tssi; + } + + tagStorageServersForCachedKeyRanges(); + } +}; + +} // anonymous namespace void applyMetadataMutations(SpanID const& spanContext, ProxyCommitData& proxyCommitData, @@ -799,31 +1030,9 @@ void applyMetadataMutations(SpanID const& spanContext, Version popVersion, bool initialCommit) { - std::map* uid_applyMutationsData = nullptr; - if (proxyCommitData.firstProxy) { - uid_applyMutationsData = &proxyCommitData.uid_applyMutationsData; - } - - applyMetadataMutations(spanContext, - proxyCommitData.dbgid, - arena, - mutations, - proxyCommitData.txnStateStore, - toCommit, - confChange, - logSystem, - popVersion, - &proxyCommitData.vecBackupKeys, - &proxyCommitData.keyInfo, - &proxyCommitData.cacheInfo, - uid_applyMutationsData, - proxyCommitData.commit, - proxyCommitData.cx, - &proxyCommitData.committedVersion, - &proxyCommitData.storageCache, - &proxyCommitData.tag_popped, - &proxyCommitData.tssMapping, - initialCommit); + ApplyMetadataMutationsImpl( + spanContext, arena, mutations, proxyCommitData, logSystem, toCommit, confChange, popVersion, initialCommit) + .apply(); } void applyMetadataMutations(SpanID const& spanContext, @@ -831,27 +1040,5 @@ void applyMetadataMutations(SpanID const& spanContext, Arena& arena, const VectorRef& mutations, IKeyValueStore* txnStateStore) { - - bool confChange; // Dummy variable, not used. - - applyMetadataMutations(spanContext, - dbgid, - arena, - mutations, - txnStateStore, - /* toCommit= */ nullptr, - confChange, - Reference(), - /* popVersion= */ 0, - /* vecBackupKeys= */ nullptr, - /* keyInfo= */ nullptr, - /* cacheInfo= */ nullptr, - /* uid_applyMutationsData= */ nullptr, - RequestStream(), - Database(), - /* commitVersion= */ nullptr, - /* storageCache= */ nullptr, - /* tag_popped= */ nullptr, - /* tssMapping= */ nullptr, - /* initialCommit= */ false); -} + ApplyMetadataMutationsImpl(spanContext, dbgid, arena, mutations, txnStateStore).apply(); +} \ No newline at end of file diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index 26aa4435aa..f3cc17e07d 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -22,22 +22,27 @@ #define FDBSERVER_APPLYMETADATAMUTATION_H #pragma once -#include "fdbclient/MutationList.h" -#include "fdbclient/SystemData.h" #include "fdbclient/BackupAgent.actor.h" +#include "fdbclient/MutationList.h" #include "fdbclient/Notified.h" +#include "fdbclient/SystemData.h" #include "fdbserver/IKeyValueStore.h" -#include "fdbserver/LogSystem.h" #include "fdbserver/LogProtocolMessage.h" +#include "fdbserver/LogSystem.h" #include "fdbserver/ProxyCommitData.actor.h" inline bool isMetadataMutation(MutationRef const& m) { // FIXME: This is conservative - not everything in system keyspace is necessarily processed by // applyMetadataMutations - return (m.type == MutationRef::SetValue && m.param1.size() && m.param1[0] == systemKeys.begin[0] && - !m.param1.startsWith(nonMetadataSystemKeys.begin)) || - (m.type == MutationRef::ClearRange && m.param2.size() > 1 && m.param2[0] == systemKeys.begin[0] && - !nonMetadataSystemKeys.contains(KeyRangeRef(m.param1, m.param2))); + if (m.type == MutationRef::SetValue) { + return m.param1.size() && m.param1[0] == systemKeys.begin[0] && + !m.param1.startsWith(nonMetadataSystemKeys.begin); + } else if (m.type == MutationRef::ClearRange) { + return m.param2.size() > 1 && m.param2[0] == systemKeys.begin[0] && + !nonMetadataSystemKeys.contains(KeyRangeRef(m.param1, m.param2)); + } else { + return false; + } } Reference getStorageInfo(UID id,