From 17be3dc201b4614914e37e601f33d8c3922d281d Mon Sep 17 00:00:00 2001 From: Dan Lambright Date: Thu, 8 Jul 2021 15:59:07 -0400 Subject: [PATCH] fix bug related to misunderstanding of tag replicas. --- fdbserver/CommitProxyServer.actor.cpp | 20 ++++++++++---------- fdbserver/LogSystem.h | 14 ++++---------- fdbserver/MasterInterface.h | 14 +++++++------- fdbserver/masterserver.actor.cpp | 10 +++++----- 4 files changed, 26 insertions(+), 32 deletions(-) diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 6927eefc20..501b950394 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -480,8 +480,8 @@ struct CommitBatchContext { double commitStartTime; - std::set locSet; // the set of tlog locations written to in the mutation. - std::set tagSet; // the set of tags written to in the mutation. + std::set writtenTLogs; // the set of tlog locations written to in the mutation. + std::set writtenTags; // the set of tags written to in the mutation. CommitBatchContext(ProxyCommitData*, const std::vector*, const int); @@ -880,7 +880,7 @@ ACTOR Future applyMetadataToCommittedTransactions(CommitBatchContext* self ACTOR Future getTPCV(CommitBatchContext* self) { state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData; GetTlogPrevCommitVersionReply rep = wait(brokenPromiseToNever( - pProxyCommitData->master.getTlogPrevCommitVersion.getReply(GetTlogPrevCommitVersionRequest(self->locSet)))); + pProxyCommitData->master.getTlogPrevCommitVersion.getReply(GetTlogPrevCommitVersionRequest(self->writtenTLogs)))); // TraceEvent("GetTlogPrevCommitVersionRequest"); return Void(); } @@ -961,9 +961,9 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if (pProxyCommitData->cacheInfo[m.param1]) { self->toCommit.addTag(cacheTag); } - self->toCommit.saveTags(self->tagSet); + self->toCommit.saveTags(self->writtenTags); self->toCommit.writeTypedMessage(m); - self->toCommit.saveLocations(self->locSet, self->tagSet); + self->toCommit.saveLocations(self->writtenTLogs); } else if (m.type == MutationRef::ClearRange) { KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2)); auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange); @@ -1019,9 +1019,9 @@ ACTOR Future assignMutationsToStorageServers(CommitBatchContext* self) { if (pProxyCommitData->needsCacheTag(clearRange)) { self->toCommit.addTag(cacheTag); } - self->toCommit.saveTags(self->tagSet); + self->toCommit.saveTags(self->writtenTags); self->toCommit.writeTypedMessage(m); - self->toCommit.saveLocations(self->locSet, self->tagSet); + self->toCommit.saveLocations(self->writtenTLogs); } else { UNREACHABLE(); } @@ -1298,16 +1298,16 @@ ACTOR Future reply(CommitBatchContext* self) { // self->committedVersion. TEST(pProxyCommitData->committedVersion.get() > self->commitVersion); // A later version was reported committed first if (self->commitVersion >= pProxyCommitData->committedVersion.get()) { - state Optional> tagSet; + state Optional> writtenTags; if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) { - tagSet = self->tagSet; + writtenTags = self->writtenTags; } wait(pProxyCommitData->master.reportLiveCommittedVersion.getReply( ReportRawCommittedVersionRequest(self->commitVersion, self->lockedAfter, self->metadataVersionAfter, pProxyCommitData->minKnownCommittedVersion, - tagSet), + writtenTags), TaskPriority::ProxyMasterVersionReply)); } if (self->commitVersion > pProxyCommitData->committedVersion.get()) { diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 3413e094d7..e14a676777 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -996,19 +996,13 @@ struct LogPushData : NonCopyable { } // copy next_message_tags into given set - void saveTags(std::set& tagSet) { - tagSet.insert(next_message_tags.begin(), next_message_tags.end()); - return; + void saveTags(std::set& writtenTags) { + writtenTags.insert(next_message_tags.begin(), next_message_tags.end()); } // store tlogs as represented by index - // also store in tag set all replicas - void saveLocations(std::set& locSet, std::set& tagSet) { - locSet.insert(msg_locations.begin(), msg_locations.end()); - for (auto loc: msg_locations) { - tagSet.insert(Tag(0, loc)); // TODO POST DEMO support DC other than primary - } - return; + void saveLocations(std::set& writtenTLogs) { + writtenTLogs.insert(msg_locations.begin(), msg_locations.end()); } void writeMessage(StringRef rawMessageWithoutLength, bool usePreviousLocations) { diff --git a/fdbserver/MasterInterface.h b/fdbserver/MasterInterface.h index 950006c53b..26a57ba2ee 100644 --- a/fdbserver/MasterInterface.h +++ b/fdbserver/MasterInterface.h @@ -201,13 +201,13 @@ struct GetTlogPrevCommitVersionReply { struct GetTlogPrevCommitVersionRequest { constexpr static FileIdentifier file_identifier = 16683184; - std::set locSet; + std::set writtenTLogs; ReplyPromise reply; GetTlogPrevCommitVersionRequest() {} - GetTlogPrevCommitVersionRequest(std::set& locSet) : locSet(locSet) {} + GetTlogPrevCommitVersionRequest(std::set& writtenTLogs) : writtenTLogs(writtenTLogs) {} template void serialize(Ar& ar) { - serializer(ar, locSet, reply); + serializer(ar, writtenTLogs, reply); } }; @@ -217,7 +217,7 @@ struct ReportRawCommittedVersionRequest { bool locked; Optional metadataVersion; Version minKnownCommittedVersion; - Optional> tagSet; + Optional> writtenTags; ReplyPromise reply; ReportRawCommittedVersionRequest() : version(invalidVersion), locked(false), minKnownCommittedVersion(0) {} @@ -225,13 +225,13 @@ struct ReportRawCommittedVersionRequest { bool locked, Optional metadataVersion, Version minKnownCommittedVersion, - Optional> tagSet = Optional>()) + Optional> writtenTags = Optional>()) : version(version), locked(locked), metadataVersion(metadataVersion), - minKnownCommittedVersion(minKnownCommittedVersion), tagSet(tagSet) {} + minKnownCommittedVersion(minKnownCommittedVersion), writtenTags(writtenTags) {} template void serialize(Ar& ar) { - serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, tagSet, reply); + serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, writtenTags, reply); } }; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 7aa6bf3478..1d113260f4 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1246,10 +1246,10 @@ ACTOR Future serveLiveCommittedVersion(Reference self) { when(ReportRawCommittedVersionRequest req = waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) { self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion); - if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.tagSet.present()) { + if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) { if (req.version > self->ssVersionVector.maxVersion) { // TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version); - self->ssVersionVector.setVersions(req.tagSet.get(), req.version); + self->ssVersionVector.setVersions(req.writtenTags.get(), req.version); } } if (req.version > self->liveCommittedVersion) { @@ -1263,10 +1263,10 @@ ACTOR Future serveLiveCommittedVersion(Reference self) { when(GetTlogPrevCommitVersionRequest req = waitNext(self->myInterface.getTlogPrevCommitVersion.getFuture())) { GetTlogPrevCommitVersionReply reply; - for (uint16_t loc : req.locSet) { + for (uint16_t tLog : req.writtenTLogs) { // TraceEvent("Received GetTlogPrevCommitVersionRequest").detail("Loc", loc); - if (self->tpcvMap.find(loc) != self->tpcvMap.end()) { - reply.tpcvMap[loc] = self->tpcvMap[loc]; + if (self->tpcvMap.find(tLog) != self->tpcvMap.end()) { + reply.tpcvMap[tLog] = self->tpcvMap[tLog]; } } req.reply.send(reply);