mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-03 03:41:53 +08:00
support upgrades by merging tags associated with the different peek requests
This commit is contained in:
parent
b36e08f08f
commit
1a4ded1c99
@ -46,7 +46,7 @@ struct CoreTLogSet {
|
||||
int32_t logRouterCount;
|
||||
Version startVersion;
|
||||
|
||||
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalitySpecial), logRouterCount(0), startVersion(invalidVersion) {}
|
||||
CoreTLogSet() : tLogWriteAntiQuorum(0), tLogReplicationFactor(0), isLocal(true), hasBestPolicy(HasBestPolicyId), locality(tagLocalityUpgraded), logRouterCount(0), startVersion(invalidVersion) {}
|
||||
|
||||
bool operator == (CoreTLogSet const& rhs) const {
|
||||
return tLogs == rhs.tLogs && tLogWriteAntiQuorum == rhs.tLogWriteAntiQuorum && tLogReplicationFactor == rhs.tLogReplicationFactor && isLocal == rhs.isLocal && hasBestPolicy == rhs.hasBestPolicy &&
|
||||
|
@ -302,8 +302,10 @@ struct ILogSystem {
|
||||
int tLogReplicationFactor;
|
||||
IRepPolicyRef tLogPolicy;
|
||||
std::vector< LocalityData > tLogLocalities;
|
||||
bool collectTags;
|
||||
std::vector<Tag> tags;
|
||||
|
||||
MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin );
|
||||
MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin, bool collectTags );
|
||||
MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector<LocalityData> const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor );
|
||||
MergedPeekCursor( vector< Reference<IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, std::vector<LocalityData> const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor );
|
||||
|
||||
|
@ -256,14 +256,17 @@ LogMessageVersion ILogSystem::ServerPeekCursor::version() { return messageVersio
|
||||
|
||||
Version ILogSystem::ServerPeekCursor::popped() { return poppedVersion; }
|
||||
|
||||
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin )
|
||||
: serverCursors(serverCursors), bestServer(-1), readQuorum(serverCursors.size()), tag(invalidTag), currentCursor(0), hasNextMessage(false), messageVersion(begin), randomID(g_random->randomUniqueID()), tLogReplicationFactor(0) {
|
||||
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin, bool collectTags )
|
||||
: serverCursors(serverCursors), bestServer(-1), readQuorum(serverCursors.size()), tag(invalidTag), currentCursor(0), hasNextMessage(false),
|
||||
messageVersion(begin), randomID(g_random->randomUniqueID()), tLogReplicationFactor(0), collectTags(collectTags) {
|
||||
|
||||
sortedVersions.resize(serverCursors.size());
|
||||
}
|
||||
|
||||
ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor )
|
||||
: bestServer(bestServer), readQuorum(readQuorum), tag(tag), currentCursor(0), hasNextMessage(false), messageVersion(begin), randomID(g_random->randomUniqueID()), tLogLocalities(tLogLocalities), tLogPolicy(tLogPolicy), tLogReplicationFactor(tLogReplicationFactor) {
|
||||
ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end,
|
||||
bool parallelGetMore, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor )
|
||||
: bestServer(bestServer), readQuorum(readQuorum), tag(tag), currentCursor(0), hasNextMessage(false), messageVersion(begin), randomID(g_random->randomUniqueID()), tLogLocalities(tLogLocalities),
|
||||
tLogPolicy(tLogPolicy), tLogReplicationFactor(tLogReplicationFactor), collectTags(false) {
|
||||
for( int i = 0; i < logServers.size(); i++ ) {
|
||||
Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logServers[i], tag, begin, end, bestServer >= 0, parallelGetMore ) );
|
||||
//TraceEvent("MPC_starting", randomID).detail("cursor", cursor->randomID).detail("end", end);
|
||||
@ -272,8 +275,10 @@ ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector<Reference<AsyncVar<O
|
||||
sortedVersions.resize(serverCursors.size());
|
||||
}
|
||||
|
||||
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor )
|
||||
: serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0), hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), randomID(g_random->randomUniqueID()), tLogLocalities(tLogLocalities), tLogPolicy(tLogPolicy), tLogReplicationFactor(tLogReplicationFactor) {
|
||||
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional<LogMessageVersion> nextVersion,
|
||||
std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor )
|
||||
: serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0), hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion),
|
||||
randomID(g_random->randomUniqueID()), tLogLocalities(tLogLocalities), tLogPolicy(tLogPolicy), tLogReplicationFactor(tLogReplicationFactor), collectTags(false) {
|
||||
sortedVersions.resize(serverCursors.size());
|
||||
calcHasMessage();
|
||||
}
|
||||
@ -367,13 +372,19 @@ void ILogSystem::MergedPeekCursor::updateMessage(bool usePolicy) {
|
||||
break;
|
||||
}
|
||||
|
||||
tags.clear();
|
||||
for(int i = 0; i < serverCursors.size(); i++) {
|
||||
auto& c = serverCursors[i];
|
||||
ASSERT_WE_THINK( !c->hasMessage() || c->version() >= messageVersion ); // Seems like the loop above makes this unconditionally true
|
||||
if (c->version() == messageVersion && c->hasMessage()) {
|
||||
hasNextMessage = true;
|
||||
currentCursor = i;
|
||||
break;
|
||||
if(!collectTags) {
|
||||
break;
|
||||
}
|
||||
auto& addTags = c->getTags();
|
||||
ASSERT(addTags.size() == 1);
|
||||
tags.push_back(addTags[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -396,6 +407,9 @@ StringRef ILogSystem::MergedPeekCursor::getMessageWithTags() { return serverCurs
|
||||
|
||||
|
||||
const std::vector<Tag>& ILogSystem::MergedPeekCursor::getTags() {
|
||||
if(collectTags) {
|
||||
return tags;
|
||||
}
|
||||
return serverCursors[currentCursor]->getTags();
|
||||
}
|
||||
|
||||
|
@ -858,9 +858,9 @@ namespace oldTLog {
|
||||
int32_t messageLength;
|
||||
uint32_t subVersion;
|
||||
rd >> messageLength >> subVersion;
|
||||
messageLength += sizeof(uint16_t);
|
||||
messages << messageLength << subVersion << uint16_t(0);
|
||||
messageLength -= (sizeof(subVersion) + sizeof(uint16_t));
|
||||
messageLength += sizeof(uint16_t) + sizeof(Tag);
|
||||
messages << messageLength << subVersion << uint16_t(1) << req.tag;
|
||||
messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag));
|
||||
messages.serializeBytes(rd.readBytes(messageLength), messageLength);
|
||||
}
|
||||
}
|
||||
@ -934,9 +934,9 @@ namespace oldTLog {
|
||||
int32_t messageLength;
|
||||
uint32_t subVersion;
|
||||
rd >> messageLength >> subVersion;
|
||||
messageLength += sizeof(uint16_t);
|
||||
messages << messageLength << subVersion << uint16_t(0);
|
||||
messageLength -= (sizeof(subVersion) + sizeof(uint16_t));
|
||||
messageLength += sizeof(uint16_t) + sizeof(Tag);
|
||||
messages << messageLength << subVersion << uint16_t(1) << req.tag;
|
||||
messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag));
|
||||
messages.serializeBytes(rd.readBytes(messageLength), messageLength);
|
||||
}
|
||||
}
|
||||
|
@ -1847,7 +1847,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
|
||||
|
||||
if (req.recoverFrom.logSystemType == 2) {
|
||||
bool found = false;
|
||||
if(req.locality == tagLocalityInvalid || req.locality == tagLocalitySpecial) {
|
||||
if(req.locality == tagLocalityInvalid || req.locality == tagLocalitySpecial || req.locality == tagLocalityUpgraded) {
|
||||
logData->unrecoveredBefore = req.knownCommittedVersion;
|
||||
found = true;
|
||||
}
|
||||
|
@ -421,7 +421,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
||||
} else {
|
||||
int bestSet = -1;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial || (tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
if(tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || (tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
@ -437,7 +437,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
||||
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
|
||||
int bestOldSet = -1;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || (oldLogData[i].tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
if(oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || (oldLogData[i].tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
bestOldSet = t;
|
||||
break;
|
||||
}
|
||||
@ -465,13 +465,13 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
||||
for(auto tag : tags) {
|
||||
cursors.push_back(peek(begin, tag, parallelGetMore));
|
||||
}
|
||||
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor(cursors, begin) );
|
||||
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor(cursors, begin, oldLogData.size() && oldLogData[0].tLogs.size() && oldLogData[0].tLogs[0]->locality == tagLocalityUpgraded) );
|
||||
}
|
||||
|
||||
Reference<IPeekCursor> peekLocal( Tag tag, Version begin, Version end ) {
|
||||
int bestSet = -1;
|
||||
for(int t = 0; t < tLogs.size(); t++) {
|
||||
if(tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial || (tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
if(tLogs[t]->hasBestPolicy && (tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalitySpecial || tLogs[t]->locality == tagLocalityUpgraded || (tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
bestSet = t;
|
||||
break;
|
||||
}
|
||||
@ -491,7 +491,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
||||
for(int i = 0; i < oldLogData.size() && begin < oldLogData[i].epochEnd; i++) {
|
||||
int bestOldSet = -1;
|
||||
for(int t = 0; t < oldLogData[i].tLogs.size(); t++) {
|
||||
if(oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || (oldLogData[i].tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
if(oldLogData[i].tLogs[t]->hasBestPolicy && (oldLogData[i].tLogs[t]->locality == tag.locality || tag.locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalitySpecial || oldLogData[i].tLogs[t]->locality == tagLocalityUpgraded || (oldLogData[i].tLogs[t]->isLocal && tag.locality == tagLocalityLogRouter))) {
|
||||
bestOldSet = t;
|
||||
break;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user