fix bug related to misunderstanding of tag replicas.

This commit is contained in:
Dan Lambright 2021-07-08 15:59:07 -04:00
parent a107dd655e
commit 17be3dc201
4 changed files with 26 additions and 32 deletions

View File

@ -480,8 +480,8 @@ struct CommitBatchContext {
double commitStartTime;
std::set<uint16_t> locSet; // the set of tlog locations written to in the mutation.
std::set<Tag> tagSet; // the set of tags written to in the mutation.
std::set<uint16_t> writtenTLogs; // the set of tlog locations written to in the mutation.
std::set<Tag> writtenTags; // the set of tags written to in the mutation.
CommitBatchContext(ProxyCommitData*, const std::vector<CommitTransactionRequest>*, const int);
@ -880,7 +880,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
ACTOR Future<Void> getTPCV(CommitBatchContext* self) {
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
GetTlogPrevCommitVersionReply rep = wait(brokenPromiseToNever(
pProxyCommitData->master.getTlogPrevCommitVersion.getReply(GetTlogPrevCommitVersionRequest(self->locSet))));
pProxyCommitData->master.getTlogPrevCommitVersion.getReply(GetTlogPrevCommitVersionRequest(self->writtenTLogs))));
// TraceEvent("GetTlogPrevCommitVersionRequest");
return Void();
}
@ -961,9 +961,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->cacheInfo[m.param1]) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.saveTags(self->tagSet);
self->toCommit.saveTags(self->writtenTags);
self->toCommit.writeTypedMessage(m);
self->toCommit.saveLocations(self->locSet, self->tagSet);
self->toCommit.saveLocations(self->writtenTLogs);
} else if (m.type == MutationRef::ClearRange) {
KeyRangeRef clearRange(KeyRangeRef(m.param1, m.param2));
auto ranges = pProxyCommitData->keyInfo.intersectingRanges(clearRange);
@ -1019,9 +1019,9 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
if (pProxyCommitData->needsCacheTag(clearRange)) {
self->toCommit.addTag(cacheTag);
}
self->toCommit.saveTags(self->tagSet);
self->toCommit.saveTags(self->writtenTags);
self->toCommit.writeTypedMessage(m);
self->toCommit.saveLocations(self->locSet, self->tagSet);
self->toCommit.saveLocations(self->writtenTLogs);
} else {
UNREACHABLE();
}
@ -1298,16 +1298,16 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
// self->committedVersion.
TEST(pProxyCommitData->committedVersion.get() > self->commitVersion); // A later version was reported committed first
if (self->commitVersion >= pProxyCommitData->committedVersion.get()) {
state Optional<std::set<Tag>> tagSet;
state Optional<std::set<Tag>> writtenTags;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) {
tagSet = self->tagSet;
writtenTags = self->writtenTags;
}
wait(pProxyCommitData->master.reportLiveCommittedVersion.getReply(
ReportRawCommittedVersionRequest(self->commitVersion,
self->lockedAfter,
self->metadataVersionAfter,
pProxyCommitData->minKnownCommittedVersion,
tagSet),
writtenTags),
TaskPriority::ProxyMasterVersionReply));
}
if (self->commitVersion > pProxyCommitData->committedVersion.get()) {

View File

@ -996,19 +996,13 @@ struct LogPushData : NonCopyable {
}
// copy next_message_tags into given set
void saveTags(std::set<Tag>& tagSet) {
tagSet.insert(next_message_tags.begin(), next_message_tags.end());
return;
void saveTags(std::set<Tag>& writtenTags) {
writtenTags.insert(next_message_tags.begin(), next_message_tags.end());
}
// store tlogs as represented by index
// also store in tag set all replicas
void saveLocations(std::set<uint16_t>& locSet, std::set<Tag>& tagSet) {
locSet.insert(msg_locations.begin(), msg_locations.end());
for (auto loc: msg_locations) {
tagSet.insert(Tag(0, loc)); // TODO POST DEMO support DC other than primary
}
return;
void saveLocations(std::set<uint16_t>& writtenTLogs) {
writtenTLogs.insert(msg_locations.begin(), msg_locations.end());
}
void writeMessage(StringRef rawMessageWithoutLength, bool usePreviousLocations) {

View File

@ -201,13 +201,13 @@ struct GetTlogPrevCommitVersionReply {
struct GetTlogPrevCommitVersionRequest {
constexpr static FileIdentifier file_identifier = 16683184;
std::set<uint16_t> locSet;
std::set<uint16_t> writtenTLogs;
ReplyPromise<GetTlogPrevCommitVersionReply> reply;
GetTlogPrevCommitVersionRequest() {}
GetTlogPrevCommitVersionRequest(std::set<uint16_t>& locSet) : locSet(locSet) {}
GetTlogPrevCommitVersionRequest(std::set<uint16_t>& writtenTLogs) : writtenTLogs(writtenTLogs) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, locSet, reply);
serializer(ar, writtenTLogs, reply);
}
};
@ -217,7 +217,7 @@ struct ReportRawCommittedVersionRequest {
bool locked;
Optional<Value> metadataVersion;
Version minKnownCommittedVersion;
Optional<std::set<Tag>> tagSet;
Optional<std::set<Tag>> writtenTags;
ReplyPromise<Void> reply;
ReportRawCommittedVersionRequest() : version(invalidVersion), locked(false), minKnownCommittedVersion(0) {}
@ -225,13 +225,13 @@ struct ReportRawCommittedVersionRequest {
bool locked,
Optional<Value> metadataVersion,
Version minKnownCommittedVersion,
Optional<std::set<Tag>> tagSet = Optional<std::set<Tag>>())
Optional<std::set<Tag>> writtenTags = Optional<std::set<Tag>>())
: version(version), locked(locked), metadataVersion(metadataVersion),
minKnownCommittedVersion(minKnownCommittedVersion), tagSet(tagSet) {}
minKnownCommittedVersion(minKnownCommittedVersion), writtenTags(writtenTags) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, tagSet, reply);
serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, writtenTags, reply);
}
};

View File

@ -1246,10 +1246,10 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
when(ReportRawCommittedVersionRequest req =
waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.tagSet.present()) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) {
if (req.version > self->ssVersionVector.maxVersion) {
// TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version);
self->ssVersionVector.setVersions(req.tagSet.get(), req.version);
self->ssVersionVector.setVersions(req.writtenTags.get(), req.version);
}
}
if (req.version > self->liveCommittedVersion) {
@ -1263,10 +1263,10 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
when(GetTlogPrevCommitVersionRequest req =
waitNext(self->myInterface.getTlogPrevCommitVersion.getFuture())) {
GetTlogPrevCommitVersionReply reply;
for (uint16_t loc : req.locSet) {
for (uint16_t tLog : req.writtenTLogs) {
// TraceEvent("Received GetTlogPrevCommitVersionRequest").detail("Loc", loc);
if (self->tpcvMap.find(loc) != self->tpcvMap.end()) {
reply.tpcvMap[loc] = self->tpcvMap[loc];
if (self->tpcvMap.find(tLog) != self->tpcvMap.end()) {
reply.tpcvMap[tLog] = self->tpcvMap[tLog];
}
}
req.reply.send(reply);