mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 10:22:20 +08:00
Move role UIDs for MutationTracking TraceEvents from various inconsistent detail fields into the TraceEvent UID field.
This commit is contained in:
parent
e5e8a56b66
commit
54c7036eaf
@ -953,8 +953,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||||||
pProxyCommitData->singleKeyMutationEvent->log();
|
pProxyCommitData->singleKeyMutationEvent->log();
|
||||||
}
|
}
|
||||||
|
|
||||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
|
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
|
||||||
.detail("Dbgid", pProxyCommitData->dbgid)
|
|
||||||
.detail("To", tags);
|
.detail("To", tags);
|
||||||
self->toCommit.addTags(tags);
|
self->toCommit.addTags(tags);
|
||||||
if (pProxyCommitData->cacheInfo[m.param1]) {
|
if (pProxyCommitData->cacheInfo[m.param1]) {
|
||||||
@ -968,8 +967,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||||||
++firstRange;
|
++firstRange;
|
||||||
if (firstRange == ranges.end()) {
|
if (firstRange == ranges.end()) {
|
||||||
// Fast path
|
// Fast path
|
||||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
|
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
|
||||||
.detail("Dbgid", pProxyCommitData->dbgid)
|
|
||||||
.detail("To", ranges.begin().value().tags);
|
.detail("To", ranges.begin().value().tags);
|
||||||
|
|
||||||
ranges.begin().value().populateTags();
|
ranges.begin().value().populateTags();
|
||||||
@ -1005,8 +1003,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||||||
trCost->get().clearIdxCosts.pop_front();
|
trCost->get().clearIdxCosts.pop_front();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
|
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
|
||||||
.detail("Dbgid", pProxyCommitData->dbgid)
|
|
||||||
.detail("To", allSources);
|
.detail("To", allSources);
|
||||||
|
|
||||||
self->toCommit.addTags(allSources);
|
self->toCommit.addTags(allSources);
|
||||||
@ -1540,7 +1537,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
|
|||||||
// We can't respond to these requests until we have valid txnStateStore
|
// We can't respond to these requests until we have valid txnStateStore
|
||||||
wait(commitData->validState.getFuture());
|
wait(commitData->validState.getFuture());
|
||||||
|
|
||||||
TraceEvent("ProxyReadyForReads", proxy.id());
|
TraceEvent("ProxyReadyForReads", proxy.id()).log();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture());
|
GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture());
|
||||||
|
@ -111,8 +111,7 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
messageAndTags.loadFromArena(&rd, &messageVersion.sub);
|
messageAndTags.loadFromArena(&rd, &messageVersion.sub);
|
||||||
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage())
|
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage(), this->randomID);
|
||||||
.detail("CursorID", this->randomID);
|
|
||||||
// Rewind and consume the header so that reader() starts from the message.
|
// Rewind and consume the header so that reader() starts from the message.
|
||||||
rd.rewind();
|
rd.rewind();
|
||||||
rd.readBytes(messageAndTags.getHeaderSize());
|
rd.readBytes(messageAndTags.getHeaderSize());
|
||||||
|
@ -31,32 +31,32 @@
|
|||||||
// Track any of these keys in simulation via enabling MUTATION_TRACKING_ENABLED and setting the keys here.
|
// Track any of these keys in simulation via enabling MUTATION_TRACKING_ENABLED and setting the keys here.
|
||||||
std::vector<KeyRef> debugKeys = { ""_sr, "\xff\xff"_sr };
|
std::vector<KeyRef> debugKeys = { ""_sr, "\xff\xff"_sr };
|
||||||
|
|
||||||
TraceEvent debugMutationEnabled(const char* context, Version version, MutationRef const& mutation) {
|
TraceEvent debugMutationEnabled(const char* context, Version version, MutationRef const& mutation, UID id) {
|
||||||
if (std::any_of(debugKeys.begin(), debugKeys.end(), [&mutation](const KeyRef& debugKey) {
|
if (std::any_of(debugKeys.begin(), debugKeys.end(), [&mutation](const KeyRef& debugKey) {
|
||||||
return ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) &&
|
return ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) &&
|
||||||
mutation.param1 <= debugKey && mutation.param2 > debugKey) ||
|
mutation.param1 <= debugKey && mutation.param2 > debugKey) ||
|
||||||
mutation.param1 == debugKey;
|
mutation.param1 == debugKey;
|
||||||
})) {
|
})) {
|
||||||
|
|
||||||
TraceEvent event("MutationTracking");
|
TraceEvent event("MutationTracking", id);
|
||||||
event.detail("At", context).detail("Version", version).detail("Mutation", mutation);
|
event.detail("At", context).detail("Version", version).detail("Mutation", mutation);
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
return TraceEvent();
|
return TraceEvent();
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent debugKeyRangeEnabled(const char* context, Version version, KeyRangeRef const& keys) {
|
TraceEvent debugKeyRangeEnabled(const char* context, Version version, KeyRangeRef const& keys, UID id) {
|
||||||
if (std::any_of(
|
if (std::any_of(
|
||||||
debugKeys.begin(), debugKeys.end(), [&keys](const KeyRef& debugKey) { return keys.contains(debugKey); })) {
|
debugKeys.begin(), debugKeys.end(), [&keys](const KeyRef& debugKey) { return keys.contains(debugKey); })) {
|
||||||
|
|
||||||
TraceEvent event("MutationTracking");
|
TraceEvent event("MutationTracking", id);
|
||||||
event.detail("At", context).detail("Version", version).detail("Mutation", MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end));
|
event.detail("At", context).detail("Version", version).detail("Mutation", MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end));
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
return TraceEvent();
|
return TraceEvent();
|
||||||
}
|
}
|
||||||
|
|
||||||
TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, StringRef commitBlob) {
|
TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, StringRef commitBlob, UID id) {
|
||||||
BinaryReader rdr(commitBlob, AssumeVersion(g_network->protocolVersion()));
|
BinaryReader rdr(commitBlob, AssumeVersion(g_network->protocolVersion()));
|
||||||
while (!rdr.empty()) {
|
while (!rdr.empty()) {
|
||||||
if (*(int32_t*)rdr.peekBytes(4) == VERSION_HEADER) {
|
if (*(int32_t*)rdr.peekBytes(4) == VERSION_HEADER) {
|
||||||
@ -86,7 +86,7 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
|
|||||||
MutationRef m;
|
MutationRef m;
|
||||||
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
|
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
|
||||||
br >> m;
|
br >> m;
|
||||||
TraceEvent event = debugMutation(context, version, m);
|
TraceEvent event = debugMutation(context, version, m, id);
|
||||||
if (event.isEnabled()) {
|
if (event.isEnabled()) {
|
||||||
event.detail("MessageTags", msg.tags);
|
event.detail("MessageTags", msg.tags);
|
||||||
return event;
|
return event;
|
||||||
@ -97,23 +97,23 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
#if MUTATION_TRACKING_ENABLED
|
#if MUTATION_TRACKING_ENABLED
|
||||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation) {
|
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id) {
|
||||||
return debugMutationEnabled(context, version, mutation);
|
return debugMutationEnabled(context, version, mutation, id);
|
||||||
}
|
}
|
||||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys) {
|
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id) {
|
||||||
return debugKeyRangeEnabled(context, version, keys);
|
return debugKeyRangeEnabled(context, version, keys, id);
|
||||||
}
|
}
|
||||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob) {
|
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id) {
|
||||||
return debugTagsAndMessageEnabled(context, version, commitBlob);
|
return debugTagsAndMessageEnabled(context, version, commitBlob, id);
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation) {
|
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id) {
|
||||||
return TraceEvent();
|
return TraceEvent();
|
||||||
}
|
}
|
||||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys) {
|
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id) {
|
||||||
return TraceEvent();
|
return TraceEvent();
|
||||||
}
|
}
|
||||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob) {
|
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id) {
|
||||||
return TraceEvent();
|
return TraceEvent();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -28,19 +28,18 @@
|
|||||||
#define MUTATION_TRACKING_ENABLED 0
|
#define MUTATION_TRACKING_ENABLED 0
|
||||||
// The keys to track are defined in the .cpp file to limit recompilation.
|
// The keys to track are defined in the .cpp file to limit recompilation.
|
||||||
|
|
||||||
#define DEBUG_MUTATION(context, version, mutation) MUTATION_TRACKING_ENABLED&& debugMutation(context, version, mutation)
|
#define DEBUG_MUTATION(...) MUTATION_TRACKING_ENABLED && debugMutation(__VA_ARGS__)
|
||||||
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation);
|
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id = UID());
|
||||||
|
|
||||||
// debugKeyRange and debugTagsAndMessage only log the *first* occurrence of a key in their range/commit.
|
// debugKeyRange and debugTagsAndMessage only log the *first* occurrence of a key in their range/commit.
|
||||||
// TODO: Create a TraceEventGroup that forwards all calls to each element of a vector<TraceEvent>,
|
// TODO: Create a TraceEventGroup that forwards all calls to each element of a vector<TraceEvent>,
|
||||||
// to allow "multiple" TraceEvents to be returned.
|
// to allow "multiple" TraceEvents to be returned.
|
||||||
|
|
||||||
#define DEBUG_KEY_RANGE(context, version, keys) MUTATION_TRACKING_ENABLED&& debugKeyRange(context, version, keys)
|
#define DEBUG_KEY_RANGE(...) MUTATION_TRACKING_ENABLED && debugKeyRange(__VA_ARGS__)
|
||||||
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys);
|
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id = UID());
|
||||||
|
|
||||||
#define DEBUG_TAGS_AND_MESSAGE(context, version, commitBlob) \
|
#define DEBUG_TAGS_AND_MESSAGE(...) MUTATION_TRACKING_ENABLED && debugTagsAndMessage(__VA_ARGS__)
|
||||||
MUTATION_TRACKING_ENABLED&& debugTagsAndMessage(context, version, commitBlob)
|
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id = UID());
|
||||||
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob);
|
|
||||||
|
|
||||||
// TODO: Version Tracking. If the bug is in handling a version rather than a key, then it'd be good to be able to log
|
// TODO: Version Tracking. If the bug is in handling a version rather than a key, then it'd be good to be able to log
|
||||||
// each time that version is handled within simulation. A similar set of functions should be implemented.
|
// each time that version is handled within simulation. A similar set of functions should be implemented.
|
||||||
|
@ -1419,9 +1419,8 @@ void commitMessages(TLogData* self,
|
|||||||
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
|
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage())
|
DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage(), logData->logId)
|
||||||
.detail("UID", self->dbgid)
|
.detail("DebugID", self->dbgid);
|
||||||
.detail("LogId", logData->logId);
|
|
||||||
block.append(block.arena(), msg.message.begin(), msg.message.size());
|
block.append(block.arena(), msg.message.begin(), msg.message.size());
|
||||||
for (auto tag : msg.tags) {
|
for (auto tag : msg.tags) {
|
||||||
if (logData->locality == tagLocalitySatellite) {
|
if (logData->locality == tagLocalitySatellite) {
|
||||||
@ -1552,8 +1551,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
|
|||||||
messages << it->second.toStringRef();
|
messages << it->second.toStringRef();
|
||||||
void* data = messages.getData();
|
void* data = messages.getData();
|
||||||
DEBUG_TAGS_AND_MESSAGE(
|
DEBUG_TAGS_AND_MESSAGE(
|
||||||
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset))
|
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId)
|
||||||
.detail("LogId", self->logId)
|
|
||||||
.detail("PeekTag", req.tag);
|
.detail("PeekTag", req.tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1823,9 +1821,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen
|
|||||||
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
|
wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags));
|
||||||
for (const StringRef& msg : rawMessages) {
|
for (const StringRef& msg : rawMessages) {
|
||||||
messages.serializeBytes(msg);
|
messages.serializeBytes(msg);
|
||||||
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg)
|
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg, logData->logId)
|
||||||
.detail("UID", self->dbgid)
|
.detail("DebugID", self->dbgid)
|
||||||
.detail("LogId", logData->logId)
|
|
||||||
.detail("PeekTag", req.tag);
|
.detail("PeekTag", req.tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1267,14 +1267,16 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||||||
|
|
||||||
DEBUG_MUTATION("ShardGetValue",
|
DEBUG_MUTATION("ShardGetValue",
|
||||||
version,
|
version,
|
||||||
MutationRef(MutationRef::DebugKey, req.key, v.present() ? v.get() : LiteralStringRef("<null>")));
|
MutationRef(MutationRef::DebugKey, req.key, v.present() ? v.get() : LiteralStringRef("<null>")),
|
||||||
|
data->thisServerID);
|
||||||
DEBUG_MUTATION("ShardGetPath",
|
DEBUG_MUTATION("ShardGetPath",
|
||||||
version,
|
version,
|
||||||
MutationRef(MutationRef::DebugKey,
|
MutationRef(MutationRef::DebugKey,
|
||||||
req.key,
|
req.key,
|
||||||
path == 0 ? LiteralStringRef("0")
|
path == 0 ? LiteralStringRef("0")
|
||||||
: path == 1 ? LiteralStringRef("1")
|
: path == 1 ? LiteralStringRef("1")
|
||||||
: LiteralStringRef("2")));
|
: LiteralStringRef("2")),
|
||||||
|
data->thisServerID);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
StorageMetrics m;
|
StorageMetrics m;
|
||||||
@ -1382,7 +1384,8 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanID parent
|
|||||||
latest,
|
latest,
|
||||||
MutationRef(MutationRef::DebugKey,
|
MutationRef(MutationRef::DebugKey,
|
||||||
metadata->key,
|
metadata->key,
|
||||||
reply.value.present() ? StringRef(reply.value.get()) : LiteralStringRef("<null>")));
|
reply.value.present() ? StringRef(reply.value.get()) : LiteralStringRef("<null>")),
|
||||||
|
data->thisServerID);
|
||||||
|
|
||||||
if (metadata->debugID.present())
|
if (metadata->debugID.present())
|
||||||
g_traceBatch.addEvent(
|
g_traceBatch.addEvent(
|
||||||
@ -2835,7 +2838,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||||||
wait(data->coreStarted.getFuture() && delay(0));
|
wait(data->coreStarted.getFuture() && delay(0));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys);
|
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys, data->thisServerID);
|
||||||
|
|
||||||
TraceEvent(SevDebug, interval.begin(), data->thisServerID)
|
TraceEvent(SevDebug, interval.begin(), data->thisServerID)
|
||||||
.detail("KeyBegin", shard->keys.begin)
|
.detail("KeyBegin", shard->keys.begin)
|
||||||
@ -2926,10 +2929,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||||||
.detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
|
.detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
|
||||||
.detail("Version", fetchVersion)
|
.detail("Version", fetchVersion)
|
||||||
.detail("More", this_block.more);
|
.detail("More", this_block.more);
|
||||||
DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys);
|
|
||||||
|
DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys, data->thisServerID);
|
||||||
if(MUTATION_TRACKING_ENABLED) {
|
if(MUTATION_TRACKING_ENABLED) {
|
||||||
for (auto k = this_block.begin(); k != this_block.end(); ++k)
|
for (auto k = this_block.begin(); k != this_block.end(); ++k) {
|
||||||
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
|
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value), data->thisServerID);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
metricReporter.addFetchedBytes(expectedBlockSize, this_block.size());
|
metricReporter.addFetchedBytes(expectedBlockSize, this_block.size());
|
||||||
@ -3094,8 +3099,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
|
|||||||
ASSERT(b->version >= checkv);
|
ASSERT(b->version >= checkv);
|
||||||
checkv = b->version;
|
checkv = b->version;
|
||||||
if(MUTATION_TRACKING_ENABLED) {
|
if(MUTATION_TRACKING_ENABLED) {
|
||||||
for (auto& m : b->mutations)
|
for (auto& m : b->mutations) {
|
||||||
DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m);
|
DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m, data->thisServerID);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3222,7 +3228,7 @@ void changeServerKeys(StorageServer* data,
|
|||||||
validate(data);
|
validate(data);
|
||||||
|
|
||||||
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
|
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
|
||||||
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys);
|
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys, data->thisServerID);
|
||||||
|
|
||||||
bool isDifferent = false;
|
bool isDifferent = false;
|
||||||
auto existingShards = data->shards.intersectingRanges(keys);
|
auto existingShards = data->shards.intersectingRanges(keys);
|
||||||
@ -3330,7 +3336,7 @@ void changeServerKeys(StorageServer* data,
|
|||||||
|
|
||||||
void rollback(StorageServer* data, Version rollbackVersion, Version nextVersion) {
|
void rollback(StorageServer* data, Version rollbackVersion, Version nextVersion) {
|
||||||
TEST(true); // call to shard rollback
|
TEST(true); // call to shard rollback
|
||||||
DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys);
|
DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys, data->thisServerID);
|
||||||
|
|
||||||
// We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
|
// We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
|
||||||
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
|
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
|
||||||
@ -3355,8 +3361,7 @@ void StorageServer::addMutation(Version version,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
expanded = addMutationToMutationLog(mLog, expanded);
|
expanded = addMutationToMutationLog(mLog, expanded);
|
||||||
DEBUG_MUTATION("applyMutation", version, expanded)
|
DEBUG_MUTATION("applyMutation", version, expanded, thisServerID)
|
||||||
.detail("UID", thisServerID)
|
|
||||||
.detail("ShardBegin", shard.begin)
|
.detail("ShardBegin", shard.begin)
|
||||||
.detail("ShardEnd", shard.end);
|
.detail("ShardEnd", shard.end);
|
||||||
applyMutation(this, expanded, mLog.arena(), mutableData());
|
applyMutation(this, expanded, mLog.arena(), mutableData());
|
||||||
@ -3427,7 +3432,7 @@ public:
|
|||||||
} else {
|
} else {
|
||||||
// FIXME: enable when DEBUG_MUTATION is active
|
// FIXME: enable when DEBUG_MUTATION is active
|
||||||
// for(auto m = changes[c].mutations.begin(); m; ++m) {
|
// for(auto m = changes[c].mutations.begin(); m; ++m) {
|
||||||
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m);
|
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m, data->thisServerID);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
splitMutation(data, data->shards, m, ver);
|
splitMutation(data, data->shards, m, ver);
|
||||||
@ -3834,11 +3839,13 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
|
|||||||
.suppressFor(10.0)
|
.suppressFor(10.0)
|
||||||
.detail("Version", cloneCursor2->version().toString());
|
.detail("Version", cloneCursor2->version().toString());
|
||||||
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
|
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
|
||||||
DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
|
DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
|
||||||
if (ver == 1) {
|
if (ver == 1) {
|
||||||
//TraceEvent("SSPeekMutation", data->thisServerID).log();
|
//TraceEvent("SSPeekMutation", data->thisServerID).log();
|
||||||
// The following trace event may produce a value with special characters
|
// The following trace event may produce a value with special characters
|
||||||
TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg).detail("Version", cloneCursor2->version().toString());
|
TraceEvent("SSPeekMutation", data->thisServerID)
|
||||||
|
.detail("Mutation", msg)
|
||||||
|
.detail("Version", cloneCursor2->version().toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
updater.applyMutation(data, msg, ver);
|
updater.applyMutation(data, msg, ver);
|
||||||
@ -4158,7 +4165,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
|
|||||||
Version debugVersion,
|
Version debugVersion,
|
||||||
const char* debugContext) {
|
const char* debugContext) {
|
||||||
for (const auto& m : mutations) {
|
for (const auto& m : mutations) {
|
||||||
DEBUG_MUTATION(debugContext, debugVersion, m).detail("UID", data->thisServerID);
|
DEBUG_MUTATION(debugContext, debugVersion, m, data->thisServerID);
|
||||||
if (m.type == MutationRef::SetValue) {
|
if (m.type == MutationRef::SetValue) {
|
||||||
storage->set(KeyValueRef(m.param1, m.param2));
|
storage->set(KeyValueRef(m.param1, m.param2));
|
||||||
} else if (m.type == MutationRef::ClearRange) {
|
} else if (m.type == MutationRef::ClearRange) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user