mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 18:32:18 +08:00
Merge pull request #5441 from xis19/master
Refactor ApplyMetadataMutation for better readability
This commit is contained in:
commit
5d84ffe019
@ -18,15 +18,15 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/MutationList.h"
|
||||
#include "fdbclient/KeyBackedTypes.h" // for key backed map codecs for tss mapping
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/KeyBackedTypes.h" // for key backed map codecs for tss mapping
|
||||
#include "fdbclient/MutationList.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/ApplyMetadataMutation.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
|
||||
Reference<StorageInfo> getStorageInfo(UID id,
|
||||
std::map<UID, Reference<StorageInfo>>* storageCache,
|
||||
@ -43,53 +43,115 @@ Reference<StorageInfo> getStorageInfo(UID id,
|
||||
}
|
||||
return storageInfo;
|
||||
}
|
||||
namespace {
|
||||
|
||||
inline bool isSystemKey(KeyRef key) {
|
||||
return key.size() && key[0] == systemKeys.begin[0];
|
||||
}
|
||||
|
||||
// It is incredibly important that any modifications to txnStateStore are done in such a way that the same operations
|
||||
// will be done on all commit proxies at the same time. Otherwise, the data stored in txnStateStore will become
|
||||
// corrupted.
|
||||
class ApplyMetadataMutationsImpl {
|
||||
|
||||
public:
|
||||
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
|
||||
const UID& dbgid_,
|
||||
Arena& arena_,
|
||||
const VectorRef<MutationRef>& mutations_,
|
||||
IKeyValueStore* txnStateStore_)
|
||||
: spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_),
|
||||
confChange(dummyConfChange) {}
|
||||
|
||||
ApplyMetadataMutationsImpl(const SpanID& spanContext_,
|
||||
Arena& arena_,
|
||||
const VectorRef<MutationRef>& mutations_,
|
||||
ProxyCommitData& proxyCommitData_,
|
||||
Reference<ILogSystem> logSystem_,
|
||||
LogPushData* toCommit_,
|
||||
bool& confChange_,
|
||||
Version popVersion_,
|
||||
bool initialCommit_)
|
||||
: spanContext(spanContext_), dbgid(proxyCommitData_.dbgid), arena(arena_), mutations(mutations_),
|
||||
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), confChange(confChange_),
|
||||
logSystem(logSystem_), popVersion(popVersion_), vecBackupKeys(&proxyCommitData_.vecBackupKeys),
|
||||
keyInfo(&proxyCommitData_.keyInfo), cacheInfo(&proxyCommitData_.cacheInfo),
|
||||
uid_applyMutationsData(proxyCommitData_.firstProxy ? &proxyCommitData_.uid_applyMutationsData : nullptr),
|
||||
commit(proxyCommitData_.commit), cx(proxyCommitData_.cx), commitVersion(&proxyCommitData_.committedVersion),
|
||||
storageCache(&proxyCommitData_.storageCache), tag_popped(&proxyCommitData_.tag_popped),
|
||||
tssMapping(&proxyCommitData_.tssMapping), initialCommit(initialCommit_) {}
|
||||
|
||||
private:
|
||||
// The following variables are incoming parameters
|
||||
|
||||
const SpanID& spanContext;
|
||||
|
||||
const UID& dbgid;
|
||||
|
||||
Arena& arena;
|
||||
|
||||
const VectorRef<MutationRef>& mutations;
|
||||
|
||||
// Transaction KV store
|
||||
IKeyValueStore* txnStateStore;
|
||||
|
||||
// non-null if these mutations were part of a new commit handled by this commit proxy
|
||||
LogPushData* toCommit = nullptr;
|
||||
|
||||
// Flag indicates if the configure is changed
|
||||
bool& confChange;
|
||||
|
||||
Reference<ILogSystem> logSystem = Reference<ILogSystem>();
|
||||
Version popVersion = 0;
|
||||
KeyRangeMap<std::set<Key>>* vecBackupKeys = nullptr;
|
||||
KeyRangeMap<ServerCacheInfo>* keyInfo = nullptr;
|
||||
KeyRangeMap<bool>* cacheInfo = nullptr;
|
||||
std::map<Key, ApplyMutationsData>* uid_applyMutationsData = nullptr;
|
||||
RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>();
|
||||
Database cx = Database();
|
||||
NotifiedVersion* commitVersion = nullptr;
|
||||
std::map<UID, Reference<StorageInfo>>* storageCache = nullptr;
|
||||
std::map<Tag, Version>* tag_popped = nullptr;
|
||||
std::unordered_map<UID, StorageServerInterface>* tssMapping = nullptr;
|
||||
|
||||
// true if the mutations were already written to the txnStateStore as part of recovery
|
||||
bool initialCommit = false;
|
||||
|
||||
private:
|
||||
// The following variables are used internally
|
||||
|
||||
// It is incredibly important that any modifications to txnStateStore are done in such a way that
|
||||
// the same operations will be done on all commit proxies at the same time. Otherwise, the data
|
||||
// stored in txnStateStore will become corrupted.
|
||||
void applyMetadataMutations(SpanID const& spanContext,
|
||||
UID const& dbgid,
|
||||
Arena& arena,
|
||||
VectorRef<MutationRef> const& mutations,
|
||||
IKeyValueStore* txnStateStore,
|
||||
LogPushData* toCommit, // non-null if these mutations were part of a new commit handled by this commit proxy
|
||||
bool& confChange,
|
||||
Reference<ILogSystem> logSystem,
|
||||
Version popVersion,
|
||||
KeyRangeMap<std::set<Key>>* vecBackupKeys,
|
||||
KeyRangeMap<ServerCacheInfo>* keyInfo,
|
||||
KeyRangeMap<bool>* cacheInfo,
|
||||
std::map<Key, ApplyMutationsData>* uid_applyMutationsData,
|
||||
RequestStream<CommitTransactionRequest> commit,
|
||||
Database cx,
|
||||
NotifiedVersion* commitVersion,
|
||||
std::map<UID, Reference<StorageInfo>>* storageCache,
|
||||
std::map<Tag, Version>* tag_popped,
|
||||
std::unordered_map<UID, StorageServerInterface>* tssMapping,
|
||||
bool initialCommit // true if the mutations were already written to the txnStateStore as part of recovery
|
||||
) {
|
||||
// std::map<keyRef, vector<uint16_t>> cacheRangeInfo;
|
||||
std::map<KeyRef, MutationRef> cachedRangeInfo;
|
||||
|
||||
// Testing Storage Server removal (clearing serverTagKey) needs to read tss server list value to determine it is a
|
||||
// tss + find partner's tag to send the private mutation. Since the removeStorageServer transaction clears both the
|
||||
// storage list and server tag, we have to enforce ordering, proccessing the server tag first, and postpone the
|
||||
// server list clear until the end;
|
||||
// Similarly, the TSS mapping change key needs to read the server list at the end of the commit
|
||||
std::vector<KeyRangeRef> tssServerListToRemove;
|
||||
|
||||
// Similar to tssServerListToRemove, the TSS mapping change key needs to read the server list at the end of the
|
||||
// commit
|
||||
std::vector<std::pair<UID, UID>> tssMappingToAdd;
|
||||
|
||||
for (auto const& m : mutations) {
|
||||
//TraceEvent("MetadataMutation", dbgid).detail("M", m.toString());
|
||||
if (toCommit) {
|
||||
toCommit->addTransactionInfo(spanContext);
|
||||
private:
|
||||
bool dummyConfChange = false;
|
||||
|
||||
private:
|
||||
void checkSetKeyServersPrefix(MutationRef m) {
|
||||
if (!m.param1.startsWith(keyServersPrefix)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (m.param1.size() && m.param1[0] == systemKeys.begin[0] && m.type == MutationRef::SetValue) {
|
||||
if (m.param1.startsWith(keyServersPrefix)) {
|
||||
if (keyInfo) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
|
||||
if (!keyInfo) {
|
||||
return;
|
||||
}
|
||||
KeyRef k = m.param1.removePrefix(keyServersPrefix);
|
||||
if (k != allKeys.end) {
|
||||
if (k == allKeys.end) {
|
||||
return;
|
||||
}
|
||||
|
||||
KeyRef end = keyInfo->rangeContaining(k).end();
|
||||
KeyRangeRef insertRange(k, end);
|
||||
vector<UID> src, dest;
|
||||
@ -121,10 +183,11 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
uniquify(info.tags);
|
||||
keyInfo->insert(insertRange, info);
|
||||
}
|
||||
|
||||
void checkSetServerKeysPrefix(MutationRef m) {
|
||||
if (!m.param1.startsWith(serverKeysPrefix)) {
|
||||
return;
|
||||
}
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
} else if (m.param1.startsWith(serverKeysPrefix)) {
|
||||
if (toCommit) {
|
||||
Tag tag = decodeServerTagValue(
|
||||
txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get());
|
||||
@ -140,7 +203,12 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
toCommit->addTag(tag);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
} else if (m.param1.startsWith(serverTagPrefix)) {
|
||||
}
|
||||
|
||||
void checkSetServerTagsPrefix(MutationRef m) {
|
||||
if (!m.param1.startsWith(serverTagPrefix)) {
|
||||
return;
|
||||
}
|
||||
UID id = decodeServerTagKey(m.param1);
|
||||
Tag tag = decodeServerTagValue(m.param2);
|
||||
|
||||
@ -175,7 +243,11 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (m.param1.startsWith(storageCachePrefix)) {
|
||||
}
|
||||
|
||||
void checkSetStorageCachePrefix(MutationRef m) {
|
||||
if (!m.param1.startsWith(storageCachePrefix))
|
||||
return;
|
||||
if (cacheInfo) {
|
||||
KeyRef k = m.param1.removePrefix(storageCachePrefix);
|
||||
|
||||
@ -196,17 +268,25 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
}
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
} else if (m.param1.startsWith(cacheKeysPrefix)) {
|
||||
}
|
||||
|
||||
void checkSetCacheKeysPrefix(MutationRef m) {
|
||||
if (!m.param1.startsWith(cacheKeysPrefix) || toCommit == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Create a private mutation for cache servers
|
||||
// This is done to make the cache servers aware of the cached key-ranges
|
||||
if (toCommit) {
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
|
||||
toCommit->addTag(cacheTag);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
} else if (m.param1.startsWith(configKeysPrefix) || m.param1 == coordinatorsKey) {
|
||||
|
||||
void checkSetConfigKeys(MutationRef m) {
|
||||
if (!m.param1.startsWith(configKeysPrefix) && m.param1 != coordinatorsKey) {
|
||||
return;
|
||||
}
|
||||
if (Optional<StringRef>(m.param2) !=
|
||||
txnStateStore->readValue(m.param1)
|
||||
.get()
|
||||
@ -226,7 +306,12 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
}
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
} else if (m.param1.startsWith(serverListPrefix)) {
|
||||
}
|
||||
|
||||
void checkSetServerListPrefix(MutationRef m) {
|
||||
if (!m.param1.startsWith(serverListPrefix)) {
|
||||
return;
|
||||
}
|
||||
if (!initialCommit) {
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
if (storageCache) {
|
||||
@ -247,7 +332,12 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (m.param1.startsWith(tssMappingKeys.begin)) {
|
||||
}
|
||||
|
||||
void checkSetTSSMappingKeys(MutationRef m) {
|
||||
if (!m.param1.startsWith(tssMappingKeys.begin)) {
|
||||
return;
|
||||
}
|
||||
// Normally uses key backed map, so have to use same unpacking code here.
|
||||
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
|
||||
UID tssId = Codec<UID>::unpack(Tuple::unpack(m.param2));
|
||||
@ -269,18 +359,27 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
}
|
||||
} else if (m.param1.startsWith(tssQuarantineKeys.begin)) {
|
||||
if (!initialCommit) {
|
||||
}
|
||||
|
||||
void checkSetTSSQuarantineKeys(MutationRef m) {
|
||||
if (!m.param1.startsWith(tssQuarantineKeys.begin) || initialCommit) {
|
||||
return;
|
||||
}
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
|
||||
if (toCommit) {
|
||||
if (!toCommit) {
|
||||
return;
|
||||
}
|
||||
UID tssId = decodeTssQuarantineKey(m.param1);
|
||||
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get();
|
||||
if (ssiV.present()) {
|
||||
if (!ssiV.present()) {
|
||||
return;
|
||||
}
|
||||
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
|
||||
if (ssi.isTss()) {
|
||||
Optional<Value> tagV =
|
||||
txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
||||
if (!ssi.isTss()) {
|
||||
return;
|
||||
}
|
||||
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
||||
if (tagV.present()) {
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
@ -288,51 +387,52 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
}
|
||||
|
||||
void checkSetApplyMutationsEndRange(MutationRef m) {
|
||||
if (!m.param1.startsWith(applyMutationsEndRange.begin)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (m.param1 == databaseLockedKey || m.param1 == metadataVersionKey ||
|
||||
m.param1 == mustContainSystemMutationsKey ||
|
||||
m.param1.startsWith(applyMutationsBeginRange.begin) ||
|
||||
m.param1.startsWith(applyMutationsAddPrefixRange.begin) ||
|
||||
m.param1.startsWith(applyMutationsRemovePrefixRange.begin) ||
|
||||
m.param1.startsWith(tagLocalityListPrefix) || m.param1.startsWith(serverTagHistoryPrefix) ||
|
||||
m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin)) {
|
||||
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
} else if (m.param1.startsWith(applyMutationsEndRange.begin)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
if (uid_applyMutationsData != nullptr) {
|
||||
|
||||
if (uid_applyMutationsData == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
Key uid = m.param1.removePrefix(applyMutationsEndRange.begin);
|
||||
auto& p = (*uid_applyMutationsData)[uid];
|
||||
p.endVersion = BinaryReader::fromStringRef<Version>(m.param2, Unversioned());
|
||||
if (p.keyVersion == Reference<KeyRangeMap<Version>>())
|
||||
p.keyVersion = makeReference<KeyRangeMap<Version>>();
|
||||
if (!p.worker.isValid() || p.worker.isReady()) {
|
||||
auto addPrefixValue =
|
||||
txnStateStore->readValue(uid.withPrefix(applyMutationsAddPrefixRange.begin)).get();
|
||||
auto removePrefixValue =
|
||||
txnStateStore->readValue(uid.withPrefix(applyMutationsRemovePrefixRange.begin)).get();
|
||||
auto beginValue =
|
||||
txnStateStore->readValue(uid.withPrefix(applyMutationsBeginRange.begin)).get();
|
||||
if (p.worker.isValid() && !p.worker.isReady()) {
|
||||
return;
|
||||
}
|
||||
auto addPrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsAddPrefixRange.begin)).get();
|
||||
auto removePrefixValue = txnStateStore->readValue(uid.withPrefix(applyMutationsRemovePrefixRange.begin)).get();
|
||||
auto beginValue = txnStateStore->readValue(uid.withPrefix(applyMutationsBeginRange.begin)).get();
|
||||
p.worker = applyMutations(
|
||||
cx,
|
||||
uid,
|
||||
addPrefixValue.present() ? addPrefixValue.get() : Key(),
|
||||
removePrefixValue.present() ? removePrefixValue.get() : Key(),
|
||||
beginValue.present() ? BinaryReader::fromStringRef<Version>(beginValue.get(), Unversioned())
|
||||
: 0,
|
||||
beginValue.present() ? BinaryReader::fromStringRef<Version>(beginValue.get(), Unversioned()) : 0,
|
||||
&p.endVersion,
|
||||
commit,
|
||||
commitVersion,
|
||||
p.keyVersion);
|
||||
}
|
||||
|
||||
void checkSetApplyMutationsKeyVersionMapRange(MutationRef m) {
|
||||
if (!m.param1.startsWith(applyMutationsKeyVersionMapRange.begin)) {
|
||||
return;
|
||||
}
|
||||
} else if (m.param1.startsWith(applyMutationsKeyVersionMapRange.begin)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
if (uid_applyMutationsData != nullptr) {
|
||||
|
||||
if (uid_applyMutationsData == nullptr) {
|
||||
return;
|
||||
}
|
||||
if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) {
|
||||
Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID));
|
||||
Key k = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID));
|
||||
@ -342,10 +442,16 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
p.keyVersion->rawInsert(k, BinaryReader::fromStringRef<Version>(m.param2, Unversioned()));
|
||||
}
|
||||
}
|
||||
} else if (m.param1.startsWith(logRangesRange.begin)) {
|
||||
|
||||
void checkSetLogRangesRange(MutationRef m) {
|
||||
if (!m.param1.startsWith(logRangesRange.begin)) {
|
||||
return;
|
||||
}
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
if (vecBackupKeys) {
|
||||
if (!vecBackupKeys) {
|
||||
return;
|
||||
}
|
||||
Key logDestination;
|
||||
KeyRef logRangeBegin = logRangesDecodeKey(m.param1, nullptr);
|
||||
Key logRangeEnd = logRangesDecodeValue(m.param2, &logDestination);
|
||||
@ -358,15 +464,21 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
logRange->value().insert(logDestination);
|
||||
}
|
||||
|
||||
// Log the modification
|
||||
TraceEvent("LogRangeAdd")
|
||||
.detail("LogRanges", vecBackupKeys->size())
|
||||
.detail("MutationKey", m.param1)
|
||||
.detail("LogRangeBegin", logRangeBegin)
|
||||
.detail("LogRangeEnd", logRangeEnd);
|
||||
}
|
||||
} else if (m.param1.startsWith(globalKeysPrefix)) {
|
||||
if (toCommit) {
|
||||
|
||||
void checkSetGlobalKeys(MutationRef m) {
|
||||
if (!m.param1.startsWith(globalKeysPrefix)) {
|
||||
return;
|
||||
}
|
||||
if (!toCommit) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Notifies all servers that a Master's server epoch ends
|
||||
auto allServers = txnStateStore->readRange(serverTagKeys).get();
|
||||
std::set<Tag> allTags;
|
||||
@ -396,27 +508,50 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
toCommit->addTags(allTags);
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
} else if (m.param1 == minRequiredCommitVersionKey) {
|
||||
|
||||
void checkSetOtherKeys(MutationRef m) {
|
||||
if (initialCommit)
|
||||
return;
|
||||
if (m.param1 == databaseLockedKey || m.param1 == metadataVersionKey ||
|
||||
m.param1 == mustContainSystemMutationsKey || m.param1.startsWith(applyMutationsBeginRange.begin) ||
|
||||
m.param1.startsWith(applyMutationsAddPrefixRange.begin) ||
|
||||
m.param1.startsWith(applyMutationsRemovePrefixRange.begin) || m.param1.startsWith(tagLocalityListPrefix) ||
|
||||
m.param1.startsWith(serverTagHistoryPrefix) ||
|
||||
m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin)) {
|
||||
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
}
|
||||
}
|
||||
|
||||
void checkSetMinRequiredCommitVersionKey(MutationRef m) {
|
||||
if (m.param1 != minRequiredCommitVersionKey) {
|
||||
return;
|
||||
}
|
||||
Version requested = BinaryReader::fromStringRef<Version>(m.param2, Unversioned());
|
||||
TraceEvent("MinRequiredCommitVersion", dbgid).detail("Min", requested).detail("Current", popVersion);
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
confChange = true;
|
||||
TEST(true); // Recovering at a higher version.
|
||||
} else if (m.param1 == writeRecoveryKey) {
|
||||
}
|
||||
|
||||
void checkSetWriteRecoverKey(MutationRef m) {
|
||||
if (m.param1 != writeRecoveryKey) {
|
||||
return;
|
||||
}
|
||||
TraceEvent("WriteRecoveryKeySet", dbgid).log();
|
||||
if (!initialCommit)
|
||||
txnStateStore->set(KeyValueRef(m.param1, m.param2));
|
||||
TEST(true); // Snapshot created, setting writeRecoveryKey in txnStateStore
|
||||
}
|
||||
} else if (m.param2.size() > 1 && m.param2[0] == systemKeys.begin[0] && m.type == MutationRef::ClearRange) {
|
||||
KeyRangeRef range(m.param1, m.param2);
|
||||
|
||||
if (keyServersKeys.intersects(range)) {
|
||||
void checkClearKeyServerKeys(KeyRangeRef range) {
|
||||
if (!keyServersKeys.intersects(range)) {
|
||||
return;
|
||||
}
|
||||
KeyRangeRef r = range & keyServersKeys;
|
||||
if (keyInfo) {
|
||||
KeyRangeRef clearRange(r.begin.removePrefix(keyServersPrefix),
|
||||
r.end.removePrefix(keyServersPrefix));
|
||||
KeyRangeRef clearRange(r.begin.removePrefix(keyServersPrefix), r.end.removePrefix(keyServersPrefix));
|
||||
keyInfo->insert(clearRange,
|
||||
clearRange.begin == StringRef()
|
||||
? ServerCacheInfo()
|
||||
@ -426,7 +561,11 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(r);
|
||||
}
|
||||
if (configKeys.intersects(range)) {
|
||||
|
||||
void checkClearConfigKeys(MutationRef m, KeyRangeRef range) {
|
||||
if (!configKeys.intersects(range)) {
|
||||
return;
|
||||
}
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(range & configKeys);
|
||||
if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range) &&
|
||||
@ -435,8 +574,14 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
confChange = true;
|
||||
}
|
||||
}
|
||||
if (serverListKeys.intersects(range)) {
|
||||
if (!initialCommit) {
|
||||
|
||||
void checkClearServerListKeys(KeyRangeRef range) {
|
||||
if (!serverListKeys.intersects(range)) {
|
||||
return;
|
||||
}
|
||||
if (initialCommit) {
|
||||
return;
|
||||
}
|
||||
KeyRangeRef rangeToClear = range & serverListKeys;
|
||||
if (rangeToClear.singleKeyRange()) {
|
||||
UID id = decodeServerListKey(rangeToClear.begin);
|
||||
@ -450,18 +595,28 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
txnStateStore->clear(rangeToClear);
|
||||
}
|
||||
}
|
||||
|
||||
void checkClearTagLocalityListKeys(KeyRangeRef range) {
|
||||
if (!tagLocalityListKeys.intersects(range)) {
|
||||
return;
|
||||
}
|
||||
if (initialCommit) {
|
||||
return;
|
||||
}
|
||||
if (tagLocalityListKeys.intersects(range)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(range & tagLocalityListKeys);
|
||||
}
|
||||
if (serverTagKeys.intersects(range)) {
|
||||
|
||||
void checkClearServerTagKeys(MutationRef m, KeyRangeRef range) {
|
||||
if (!serverTagKeys.intersects(range)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Storage server removal always happens in a separate version from any prior writes (or any subsequent
|
||||
// reuse of the tag) so we can safely destroy the tag here without any concern about intra-batch
|
||||
// ordering
|
||||
if (logSystem && popVersion) {
|
||||
auto serverKeysCleared = txnStateStore->readRange(range & serverTagKeys)
|
||||
.get(); // read is expected to be immediately available
|
||||
auto serverKeysCleared =
|
||||
txnStateStore->readRange(range & serverTagKeys).get(); // read is expected to be immediately available
|
||||
for (auto& kv : serverKeysCleared) {
|
||||
Tag tag = decodeServerTagValue(kv.value);
|
||||
TraceEvent("ServerTagRemove")
|
||||
@ -491,8 +646,7 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
if (ssiV.present()) {
|
||||
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
|
||||
if (ssi.isTss()) {
|
||||
Optional<Value> tagV =
|
||||
txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
||||
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
||||
if (tagV.present()) {
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = maybeTssRange.begin.withPrefix(systemKeys.begin, arena);
|
||||
@ -507,6 +661,7 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!initialCommit) {
|
||||
KeyRangeRef clearRange = range & serverTagKeys;
|
||||
txnStateStore->clear(clearRange);
|
||||
@ -515,7 +670,11 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
}
|
||||
}
|
||||
}
|
||||
if (serverTagHistoryKeys.intersects(range)) {
|
||||
|
||||
void checkClearServerTagHistoryKeys(KeyRangeRef range) {
|
||||
if (!serverTagHistoryKeys.intersects(range)) {
|
||||
return;
|
||||
}
|
||||
// Once a tag has been removed from history we should pop it, since we no longer have a record of the
|
||||
// tag once it has been removed from history
|
||||
if (logSystem && popVersion) {
|
||||
@ -534,97 +693,33 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(range & serverTagHistoryKeys);
|
||||
}
|
||||
if (tssMappingKeys.intersects(range)) {
|
||||
KeyRangeRef rangeToClear = range & tssMappingKeys;
|
||||
ASSERT(rangeToClear.singleKeyRange());
|
||||
|
||||
// Normally uses key backed map, so have to use same unpacking code here.
|
||||
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
|
||||
if (!initialCommit) {
|
||||
txnStateStore->clear(rangeToClear);
|
||||
void checkClearApplyMutationsEndRange(MutationRef m, KeyRangeRef range) {
|
||||
if (!range.intersects(applyMutationsEndRange)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (tssMapping) {
|
||||
tssMapping->erase(ssId);
|
||||
}
|
||||
|
||||
if (toCommit) {
|
||||
// send private mutation to SS to notify that it no longer has a tss pair
|
||||
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
|
||||
if (tagV.present()) {
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (tssQuarantineKeys.intersects(range)) {
|
||||
if (!initialCommit) {
|
||||
KeyRangeRef rangeToClear = range & tssQuarantineKeys;
|
||||
ASSERT(rangeToClear.singleKeyRange());
|
||||
txnStateStore->clear(rangeToClear);
|
||||
|
||||
if (toCommit) {
|
||||
UID tssId = decodeTssQuarantineKey(m.param1);
|
||||
Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get();
|
||||
if (ssiV.present()) {
|
||||
StorageServerInterface ssi = decodeServerListValue(ssiV.get());
|
||||
if (ssi.isTss()) {
|
||||
Optional<Value> tagV =
|
||||
txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
||||
if (tagV.present()) {
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (range.contains(coordinatorsKey)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(singleKeyRange(coordinatorsKey));
|
||||
}
|
||||
if (range.contains(databaseLockedKey)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(singleKeyRange(databaseLockedKey));
|
||||
}
|
||||
if (range.contains(metadataVersionKey)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(singleKeyRange(metadataVersionKey));
|
||||
}
|
||||
if (range.contains(mustContainSystemMutationsKey)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(singleKeyRange(mustContainSystemMutationsKey));
|
||||
}
|
||||
if (range.contains(writeRecoveryKey)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(singleKeyRange(writeRecoveryKey));
|
||||
}
|
||||
if (range.intersects(testOnlyTxnStateStorePrefixRange)) {
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(range & testOnlyTxnStateStorePrefixRange);
|
||||
}
|
||||
if (range.intersects(applyMutationsEndRange)) {
|
||||
KeyRangeRef commonEndRange(range & applyMutationsEndRange);
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(commonEndRange);
|
||||
if (uid_applyMutationsData != nullptr) {
|
||||
uid_applyMutationsData->erase(
|
||||
uid_applyMutationsData->lower_bound(m.param1.substr(applyMutationsEndRange.begin.size())),
|
||||
m.param2 == applyMutationsEndRange.end ? uid_applyMutationsData->end()
|
||||
: uid_applyMutationsData->lower_bound(m.param2.substr(
|
||||
applyMutationsEndRange.begin.size())));
|
||||
m.param2 == applyMutationsEndRange.end
|
||||
? uid_applyMutationsData->end()
|
||||
: uid_applyMutationsData->lower_bound(m.param2.substr(applyMutationsEndRange.begin.size())));
|
||||
}
|
||||
}
|
||||
if (range.intersects(applyMutationsKeyVersionMapRange)) {
|
||||
|
||||
void checkClearApplyMutationKeyVersionMapRange(MutationRef m, KeyRangeRef range) {
|
||||
if (!range.intersects(applyMutationsKeyVersionMapRange)) {
|
||||
return;
|
||||
}
|
||||
KeyRangeRef commonApplyRange(range & applyMutationsKeyVersionMapRange);
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(commonApplyRange);
|
||||
if (uid_applyMutationsData != nullptr) {
|
||||
if (uid_applyMutationsData == nullptr) {
|
||||
return;
|
||||
}
|
||||
if (m.param1.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID) &&
|
||||
m.param2.size() >= applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)) {
|
||||
Key uid = m.param1.substr(applyMutationsKeyVersionMapRange.begin.size(), sizeof(UID));
|
||||
@ -634,14 +729,17 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
auto& p = (*uid_applyMutationsData)[uid];
|
||||
if (p.keyVersion == Reference<KeyRangeMap<Version>>())
|
||||
p.keyVersion = makeReference<KeyRangeMap<Version>>();
|
||||
p.keyVersion->rawErase(KeyRangeRef(
|
||||
m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)),
|
||||
p.keyVersion->rawErase(
|
||||
KeyRangeRef(m.param1.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID)),
|
||||
m.param2.substr(applyMutationsKeyVersionMapRange.begin.size() + sizeof(UID))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void checkClearLogRangesRange(KeyRangeRef range) {
|
||||
if (!range.intersects(logRangesRange)) {
|
||||
return;
|
||||
}
|
||||
if (range.intersects(logRangesRange)) {
|
||||
KeyRangeRef commonLogRange(range & logRangesRange);
|
||||
|
||||
TraceEvent("LogRangeClear")
|
||||
@ -710,18 +808,85 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
if (!initialCommit)
|
||||
txnStateStore->clear(commonLogRange);
|
||||
}
|
||||
|
||||
void checkClearTssMappingKeys(MutationRef m, KeyRangeRef range) {
|
||||
if (!tssMappingKeys.intersects(range)) {
|
||||
return;
|
||||
}
|
||||
KeyRangeRef rangeToClear = range & tssMappingKeys;
|
||||
ASSERT(rangeToClear.singleKeyRange());
|
||||
|
||||
// Normally uses key backed map, so have to use same unpacking code here.
|
||||
UID ssId = Codec<UID>::unpack(Tuple::unpack(m.param1.removePrefix(tssMappingKeys.begin)));
|
||||
if (!initialCommit) {
|
||||
txnStateStore->clear(rangeToClear);
|
||||
}
|
||||
|
||||
if (tssMapping) {
|
||||
tssMapping->erase(ssId);
|
||||
}
|
||||
|
||||
if (!toCommit) {
|
||||
return;
|
||||
}
|
||||
// send private mutation to SS to notify that it no longer has a tss pair
|
||||
if (Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); tagV.present()) {
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
}
|
||||
|
||||
for (KeyRangeRef& range : tssServerListToRemove) {
|
||||
txnStateStore->clear(range);
|
||||
void checkClearTssQuarantineKeys(MutationRef m, KeyRangeRef range) {
|
||||
if (!tssQuarantineKeys.intersects(range) || initialCommit) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& tssPair : tssMappingToAdd) {
|
||||
// read tss server list from txn state store and add it to tss mapping
|
||||
StorageServerInterface tssi =
|
||||
decodeServerListValue(txnStateStore->readValue(serverListKeyFor(tssPair.second)).get().get());
|
||||
(*tssMapping)[tssPair.first] = tssi;
|
||||
KeyRangeRef rangeToClear = range & tssQuarantineKeys;
|
||||
ASSERT(rangeToClear.singleKeyRange());
|
||||
txnStateStore->clear(rangeToClear);
|
||||
|
||||
if (!toCommit) {
|
||||
return;
|
||||
}
|
||||
UID tssId = decodeTssQuarantineKey(m.param1);
|
||||
if (Optional<Value> ssiV = txnStateStore->readValue(serverListKeyFor(tssId)).get(); ssiV.present()) {
|
||||
if (StorageServerInterface ssi = decodeServerListValue(ssiV.get()); ssi.isTss()) {
|
||||
if (Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
|
||||
tagV.present()) {
|
||||
|
||||
MutationRef privatized = m;
|
||||
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
|
||||
toCommit->addTag(decodeServerTagValue(tagV.get()));
|
||||
toCommit->writeTypedMessage(privatized);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void checkClearMiscRangeKeys(KeyRangeRef range) {
|
||||
if (initialCommit) {
|
||||
return;
|
||||
}
|
||||
if (range.contains(coordinatorsKey)) {
|
||||
txnStateStore->clear(singleKeyRange(coordinatorsKey));
|
||||
}
|
||||
if (range.contains(databaseLockedKey)) {
|
||||
txnStateStore->clear(singleKeyRange(databaseLockedKey));
|
||||
}
|
||||
if (range.contains(metadataVersionKey)) {
|
||||
txnStateStore->clear(singleKeyRange(metadataVersionKey));
|
||||
}
|
||||
if (range.contains(mustContainSystemMutationsKey)) {
|
||||
txnStateStore->clear(singleKeyRange(mustContainSystemMutationsKey));
|
||||
}
|
||||
if (range.contains(writeRecoveryKey)) {
|
||||
txnStateStore->clear(singleKeyRange(writeRecoveryKey));
|
||||
}
|
||||
if (range.intersects(testOnlyTxnStateStorePrefixRange)) {
|
||||
txnStateStore->clear(range & testOnlyTxnStateStorePrefixRange);
|
||||
}
|
||||
}
|
||||
|
||||
// If we accumulated private mutations for cached key-ranges, we also need to
|
||||
@ -732,7 +897,11 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
// TODO Note that, we are currently not handling the case when cached key-ranges move out
|
||||
// to different storage servers. This would require some checking when keys in the keyServersPrefix change.
|
||||
// For the first implementation, we could just send the entire map to every storage server. Revisit!
|
||||
if (cachedRangeInfo.size() != 0 && toCommit) {
|
||||
void tagStorageServersForCachedKeyRanges() {
|
||||
if (cachedRangeInfo.size() == 0 || !toCommit) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::map<KeyRef, MutationRef>::iterator itr;
|
||||
KeyRef keyBegin, keyEnd;
|
||||
vector<uint16_t> serverIndices;
|
||||
@ -787,8 +956,66 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
toCommit->writeTypedMessage(mutationEnd);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
void apply() {
|
||||
for (auto const& m : mutations) {
|
||||
if (toCommit) {
|
||||
toCommit->addTransactionInfo(spanContext);
|
||||
}
|
||||
|
||||
if (m.type == MutationRef::SetValue && isSystemKey(m.param1)) {
|
||||
checkSetKeyServersPrefix(m);
|
||||
checkSetServerKeysPrefix(m);
|
||||
checkSetServerTagsPrefix(m);
|
||||
checkSetStorageCachePrefix(m);
|
||||
checkSetCacheKeysPrefix(m);
|
||||
checkSetConfigKeys(m);
|
||||
checkSetServerListPrefix(m);
|
||||
checkSetTSSMappingKeys(m);
|
||||
checkSetTSSQuarantineKeys(m);
|
||||
checkSetApplyMutationsEndRange(m);
|
||||
checkSetApplyMutationsKeyVersionMapRange(m);
|
||||
checkSetLogRangesRange(m);
|
||||
checkSetGlobalKeys(m);
|
||||
checkSetWriteRecoverKey(m);
|
||||
checkSetMinRequiredCommitVersionKey(m);
|
||||
checkSetOtherKeys(m);
|
||||
} else if (m.type == MutationRef::ClearRange && isSystemKey(m.param2)) {
|
||||
KeyRangeRef range(m.param1, m.param2);
|
||||
|
||||
checkClearKeyServerKeys(range);
|
||||
checkClearConfigKeys(m, range);
|
||||
checkClearServerListKeys(range);
|
||||
checkClearTagLocalityListKeys(range);
|
||||
checkClearServerTagKeys(m, range);
|
||||
checkClearServerTagHistoryKeys(range);
|
||||
checkClearApplyMutationsEndRange(m, range);
|
||||
checkClearApplyMutationKeyVersionMapRange(m, range);
|
||||
checkClearLogRangesRange(range);
|
||||
checkClearTssMappingKeys(m, range);
|
||||
checkClearTssQuarantineKeys(m, range);
|
||||
checkClearMiscRangeKeys(range);
|
||||
}
|
||||
}
|
||||
|
||||
for (KeyRangeRef& range : tssServerListToRemove) {
|
||||
txnStateStore->clear(range);
|
||||
}
|
||||
|
||||
for (auto& tssPair : tssMappingToAdd) {
|
||||
// read tss server list from txn state store and add it to tss mapping
|
||||
StorageServerInterface tssi =
|
||||
decodeServerListValue(txnStateStore->readValue(serverListKeyFor(tssPair.second)).get().get());
|
||||
(*tssMapping)[tssPair.first] = tssi;
|
||||
}
|
||||
|
||||
tagStorageServersForCachedKeyRanges();
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
void applyMetadataMutations(SpanID const& spanContext,
|
||||
ProxyCommitData& proxyCommitData,
|
||||
Arena& arena,
|
||||
@ -799,31 +1026,9 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
Version popVersion,
|
||||
bool initialCommit) {
|
||||
|
||||
std::map<Key, ApplyMutationsData>* uid_applyMutationsData = nullptr;
|
||||
if (proxyCommitData.firstProxy) {
|
||||
uid_applyMutationsData = &proxyCommitData.uid_applyMutationsData;
|
||||
}
|
||||
|
||||
applyMetadataMutations(spanContext,
|
||||
proxyCommitData.dbgid,
|
||||
arena,
|
||||
mutations,
|
||||
proxyCommitData.txnStateStore,
|
||||
toCommit,
|
||||
confChange,
|
||||
logSystem,
|
||||
popVersion,
|
||||
&proxyCommitData.vecBackupKeys,
|
||||
&proxyCommitData.keyInfo,
|
||||
&proxyCommitData.cacheInfo,
|
||||
uid_applyMutationsData,
|
||||
proxyCommitData.commit,
|
||||
proxyCommitData.cx,
|
||||
&proxyCommitData.committedVersion,
|
||||
&proxyCommitData.storageCache,
|
||||
&proxyCommitData.tag_popped,
|
||||
&proxyCommitData.tssMapping,
|
||||
initialCommit);
|
||||
ApplyMetadataMutationsImpl(
|
||||
spanContext, arena, mutations, proxyCommitData, logSystem, toCommit, confChange, popVersion, initialCommit)
|
||||
.apply();
|
||||
}
|
||||
|
||||
void applyMetadataMutations(SpanID const& spanContext,
|
||||
@ -831,27 +1036,5 @@ void applyMetadataMutations(SpanID const& spanContext,
|
||||
Arena& arena,
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
IKeyValueStore* txnStateStore) {
|
||||
|
||||
bool confChange; // Dummy variable, not used.
|
||||
|
||||
applyMetadataMutations(spanContext,
|
||||
dbgid,
|
||||
arena,
|
||||
mutations,
|
||||
txnStateStore,
|
||||
/* toCommit= */ nullptr,
|
||||
confChange,
|
||||
Reference<ILogSystem>(),
|
||||
/* popVersion= */ 0,
|
||||
/* vecBackupKeys= */ nullptr,
|
||||
/* keyInfo= */ nullptr,
|
||||
/* cacheInfo= */ nullptr,
|
||||
/* uid_applyMutationsData= */ nullptr,
|
||||
RequestStream<CommitTransactionRequest>(),
|
||||
Database(),
|
||||
/* commitVersion= */ nullptr,
|
||||
/* storageCache= */ nullptr,
|
||||
/* tag_popped= */ nullptr,
|
||||
/* tssMapping= */ nullptr,
|
||||
/* initialCommit= */ false);
|
||||
ApplyMetadataMutationsImpl(spanContext, dbgid, arena, mutations, txnStateStore).apply();
|
||||
}
|
@ -22,22 +22,27 @@
|
||||
#define FDBSERVER_APPLYMETADATAMUTATION_H
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/MutationList.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/MutationList.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/IKeyValueStore.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "fdbserver/LogProtocolMessage.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "fdbserver/ProxyCommitData.actor.h"
|
||||
|
||||
inline bool isMetadataMutation(MutationRef const& m) {
|
||||
// FIXME: This is conservative - not everything in system keyspace is necessarily processed by
|
||||
// applyMetadataMutations
|
||||
return (m.type == MutationRef::SetValue && m.param1.size() && m.param1[0] == systemKeys.begin[0] &&
|
||||
!m.param1.startsWith(nonMetadataSystemKeys.begin)) ||
|
||||
(m.type == MutationRef::ClearRange && m.param2.size() > 1 && m.param2[0] == systemKeys.begin[0] &&
|
||||
!nonMetadataSystemKeys.contains(KeyRangeRef(m.param1, m.param2)));
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
return m.param1.size() && m.param1[0] == systemKeys.begin[0] &&
|
||||
!m.param1.startsWith(nonMetadataSystemKeys.begin);
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
return m.param2.size() > 1 && m.param2[0] == systemKeys.begin[0] &&
|
||||
!nonMetadataSystemKeys.contains(KeyRangeRef(m.param1, m.param2));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
Reference<StorageInfo> getStorageInfo(UID id,
|
||||
|
Loading…
x
Reference in New Issue
Block a user