Add epochBegin version to OldTLogCoreData/OldLogData/OldTLogConf

This is to simplify the backup process so that whenever there is an old epoch
in the log system, we always know its begin version and can backup from that
version if no progress is known for that old epoch.
This commit is contained in:
Jingyu Zhou 2019-10-01 13:38:52 -07:00
parent 250137a52f
commit 42430e8f5e
3 changed files with 27 additions and 19 deletions

View File

@ -80,16 +80,17 @@ struct OldTLogCoreData {
std::vector<CoreTLogSet> tLogs;
int32_t logRouterTags;
int32_t txsTags;
Version epochEnd;
Version epochBegin, epochEnd;
std::set<int8_t> pseudoLocalities;
LogEpoch epoch;
OldTLogCoreData() : epochEnd(0), logRouterTags(0), txsTags(0), epoch(0) {}
OldTLogCoreData() : epochBegin(0), epochEnd(0), logRouterTags(0), txsTags(0), epoch(0) {}
explicit OldTLogCoreData(const OldLogData&);
bool operator==(const OldTLogCoreData& rhs) const {
return tLogs == rhs.tLogs && logRouterTags == rhs.logRouterTags && txsTags == rhs.txsTags &&
epochEnd == rhs.epochEnd && pseudoLocalities == rhs.pseudoLocalities && epoch == rhs.epoch;
epochBegin == rhs.epochBegin && epochEnd == rhs.epochEnd && pseudoLocalities == rhs.pseudoLocalities &&
epoch == rhs.epoch;
}
template <class Archive>
@ -109,7 +110,7 @@ struct OldTLogCoreData {
serializer(ar, txsTags);
}
if (ar.protocolVersion().hasBackupWorker()) {
serializer(ar, epoch);
serializer(ar, epoch, epochBegin);
}
}
};

View File

@ -172,22 +172,23 @@ struct TLogSet {
struct OldTLogConf {
constexpr static FileIdentifier file_identifier = 16233772;
std::vector<TLogSet> tLogs;
Version epochEnd;
Version epochBegin, epochEnd;
int32_t logRouterTags;
int32_t txsTags;
std::set<int8_t> pseudoLocalities; // Tracking pseudo localities, e.g., tagLocalityLogRouterMapped, used in the old epoch.
LogEpoch epoch;
OldTLogConf() : epochEnd(0), logRouterTags(0), txsTags(0), epoch(0) {}
OldTLogConf() : epochBegin(0), epochEnd(0), logRouterTags(0), txsTags(0), epoch(0) {}
explicit OldTLogConf(const OldLogData&);
std::string toString() const {
return format("end: %d tags: %d %s", epochEnd, logRouterTags, describe(tLogs).c_str());
}
bool operator == ( const OldTLogConf& rhs ) const {
return tLogs == rhs.tLogs && epochEnd == rhs.epochEnd && logRouterTags == rhs.logRouterTags &&
txsTags == rhs.txsTags && pseudoLocalities == rhs.pseudoLocalities && epoch == rhs.epoch;
bool operator==(const OldTLogConf& rhs) const {
return tLogs == rhs.tLogs && epochBegin == rhs.epochBegin && epochEnd == rhs.epochEnd &&
logRouterTags == rhs.logRouterTags && txsTags == rhs.txsTags &&
pseudoLocalities == rhs.pseudoLocalities && epoch == rhs.epoch;
}
bool isEqualIds(OldTLogConf const& r) const {
@ -204,7 +205,7 @@ struct OldTLogConf {
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, tLogs, epochEnd, logRouterTags, pseudoLocalities, txsTags, epoch);
serializer(ar, tLogs, epochBegin, epochEnd, logRouterTags, pseudoLocalities, txsTags, epoch);
}
};

View File

@ -45,16 +45,16 @@ struct OldLogData {
std::vector<Reference<LogSet>> tLogs;
int32_t logRouterTags;
int32_t txsTags; // The number of txsTags, which may change across generations.
Version epochEnd;
Version epochBegin, epochEnd;
std::set<int8_t> pseudoLocalities;
LogEpoch epoch;
OldLogData() : epochEnd(0), logRouterTags(0), txsTags(0), epoch(0) {}
OldLogData() : epochBegin(0), epochEnd(0), logRouterTags(0), txsTags(0), epoch(0) {}
// Constructor for T of OldTLogConf and OldTLogCoreData
template <class T>
explicit OldLogData(const T& conf)
: logRouterTags(conf.logRouterTags), txsTags(conf.txsTags), epochEnd(conf.epochEnd),
: logRouterTags(conf.logRouterTags), txsTags(conf.txsTags), epochBegin(conf.epochBegin), epochEnd(conf.epochEnd),
pseudoLocalities(conf.pseudoLocalities), epoch(conf.epoch) {
tLogs.resize(conf.tLogs.size());
for (int j = 0; j < conf.tLogs.size(); j++) {
@ -130,8 +130,8 @@ TLogSet::TLogSet(const LogSet& rhs) :
}
OldTLogConf::OldTLogConf(const OldLogData& oldLogData)
: logRouterTags(oldLogData.logRouterTags), txsTags(oldLogData.txsTags), epochEnd(oldLogData.epochEnd),
pseudoLocalities(oldLogData.pseudoLocalities), epoch(oldLogData.epoch) {
: logRouterTags(oldLogData.logRouterTags), txsTags(oldLogData.txsTags), epochBegin(oldLogData.epochBegin),
epochEnd(oldLogData.epochEnd), pseudoLocalities(oldLogData.pseudoLocalities), epoch(oldLogData.epoch) {
for (const Reference<LogSet>& logSet : oldLogData.tLogs) {
tLogs.emplace_back(*logSet);
}
@ -153,8 +153,8 @@ CoreTLogSet::CoreTLogSet(const LogSet& logset) :
}
OldTLogCoreData::OldTLogCoreData(const OldLogData& oldData)
: logRouterTags(oldData.logRouterTags), txsTags(oldData.txsTags), epochEnd(oldData.epochEnd),
pseudoLocalities(oldData.pseudoLocalities), epoch(oldData.epoch) {
: logRouterTags(oldData.logRouterTags), txsTags(oldData.txsTags), epochBegin(oldData.epochBegin),
epochEnd(oldData.epochEnd), pseudoLocalities(oldData.pseudoLocalities), epoch(oldData.epoch) {
for (const Reference<LogSet>& logSet : oldData.tLogs) {
if (logSet->logServers.size()) {
tLogs.emplace_back(*logSet);
@ -365,7 +365,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
newState.oldTLogData.emplace_back(oldData);
TraceEvent("BWToCore")
.detail("Epoch", newState.oldTLogData.back().epoch)
.detail("Version", newState.oldTLogData.back().epochEnd);
.detail("BeginVersion", newState.oldTLogData.back().epochBegin)
.detail("EndVersion", newState.oldTLogData.back().epochEnd);
}
}
@ -1607,7 +1608,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
modifiedState.oldTLogData[i].tLogs.push_back(coreSet);
modifiedState.oldTLogData[i].tLogs[0].isLocal = true;
modifiedState.oldTLogData[i].logRouterTags = 0;
modifiedState.oldTLogData[i].epochEnd = ( i == 0 ? modifiedState.tLogs[0].startVersion : modifiedState.oldTLogData[i-1].tLogs[0].startVersion );
modifiedState.oldTLogData[i].epochBegin =
modifiedState.oldTLogData[i].tLogs[0].startVersion;
modifiedState.oldTLogData[i].epochEnd =
(i == 0 ? modifiedState.tLogs[0].startVersion
: modifiedState.oldTLogData[i - 1].tLogs[0].startVersion);
modifiedLogSets++;
}
break;
@ -2193,6 +2198,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(oldLogSystem->tLogs.size()) {
logSystem->oldLogData.emplace_back();
logSystem->oldLogData[0].tLogs = oldLogSystem->tLogs;
logSystem->oldLogData[0].epochBegin = oldLogSystem->tLogs[0]->startVersion;
logSystem->oldLogData[0].epochEnd = oldLogSystem->knownCommittedVersion + 1;
logSystem->oldLogData[0].logRouterTags = oldLogSystem->logRouterTags;
logSystem->oldLogData[0].txsTags = oldLogSystem->txsTags;