remote logs use bufferedCursor when peeking from log routers to improve performance

bufferedCursor performance has been improved
This commit is contained in:
Evan Tschannen 2019-11-04 19:47:45 -08:00
parent c805ed11e8
commit 457896b80d
3 changed files with 90 additions and 44 deletions

View File

@ -438,6 +438,7 @@ struct ILogSystem {
bool hasNextMessage;
UID randomID;
int tLogReplicationFactor;
Future<Void> more;
MergedPeekCursor( std::vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin );
MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector<LocalityData> const& tLogLocalities, Reference<IReplicationPolicy> const tLogPolicy, int tLogReplicationFactor );
@ -484,6 +485,7 @@ struct ILogSystem {
bool hasNextMessage;
bool useBestSet;
UID randomID;
Future<Void> more;
SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore );
SetPeekCursor( std::vector<Reference<LogSet>> const& logSets, std::vector< std::vector< Reference<IPeekCursor> > > const& serverCursors, LogMessageVersion const& messageVersion, int bestSet, int bestServer, Optional<LogMessageVersion> nextVersion, bool useBestSet );
@ -572,16 +574,20 @@ struct ILogSystem {
};
std::vector<Reference<IPeekCursor>> cursors;
std::vector<Deque<BufferedMessage>> cursorMessages;
std::vector<BufferedMessage> messages;
int messageIndex;
LogMessageVersion messageVersion;
Version end;
bool hasNextMessage;
bool withTags;
bool knownUnique;
Version poppedVersion;
Version initialPoppedVersion;
bool canDiscardPopped;
Future<Void> more;
int targetQueueSize;
UID randomID;
//FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade.
bool collectTags;
@ -589,6 +595,7 @@ struct ILogSystem {
void combineMessages();
BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped );
BufferedCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore );
virtual Reference<IPeekCursor> cloneNoMore();
virtual void setProtocolVersion( ProtocolVersion version );
@ -644,7 +651,7 @@ struct ILogSystem {
// Returns when the preceding changes are durable. (Later we will need multiple return signals for different durability levels)
// If the current epoch has ended, push will not return, and the pushed messages will not be visible in any subsequent epoch (but may become visible in this epoch)
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Tag tag, bool parallelGetMore = false ) = 0;
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Optional<Version> end, Tag tag, bool parallelGetMore = false ) = 0;
// Returns (via cursor interface) a stream of messages with the given tag and message versions >= (begin, 0), ordered by message version
// If pop was previously or concurrently called with upTo > begin, the cursor may not return all such messages. In that case cursor->popped() will
// be greater than begin to reflect that.

View File

@ -477,6 +477,10 @@ ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMess
}
Future<Void> ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
if( more.isValid() && !more.isReady() ) {
return more;
}
if(!serverCursors.size())
return Never();
@ -490,7 +494,8 @@ Future<Void> ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
if (version() > startVersion)
return Void();
return mergedPeekGetMore(this, startVersion, taskID);
more = mergedPeekGetMore(this, startVersion, taskID);
return more;
}
Future<Void> ILogSystem::MergedPeekCursor::onFailed() {
@ -778,6 +783,10 @@ ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVer
}
Future<Void> ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
if( more.isValid() && !more.isReady() ) {
return more;
}
auto startVersion = version();
calcHasMessage();
if( hasMessage() )
@ -788,7 +797,8 @@ Future<Void> ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
if (version() > startVersion)
return Void();
return setPeekGetMore(this, startVersion, taskID);
more = setPeekGetMore(this, startVersion, taskID);
return more;
}
Future<Void> ILogSystem::SetPeekCursor::onFailed() {
@ -909,8 +919,20 @@ Version ILogSystem::MultiCursor::popped() {
return std::max(poppedVersion, cursors.back()->popped());
}
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped) {
// Builds a BufferedCursor on top of an existing set of peek cursors.
//   cursors          - underlying cursors whose messages are buffered and merged
//   begin            - initial message version of this cursor
//   end              - version at which the per-cursor loaders stop fetching
//   withTags         - when false, buffered messages carry no tags
//   collectTags      - upgrade-compatibility mode (see the FIXME on the member declaration)
//   canDiscardPopped - when true, data below the popped version may be skipped
// knownUnique is false, so bufferedGetMore de-duplicates the merged messages
// (unless collectTags is set) rather than only sorting them.
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<IPeekCursor>> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped), knownUnique(false), randomID(deterministicRandom()->randomUniqueID()) {
	// The parameter `cursors` shadows the member inside this body, so read the member explicitly.
	const size_t cursorCount = this->cursors.size();
	// Split a budget of 5000 buffered messages across the per-cursor queues. The divisor is
	// clamped to 1 so that an empty cursor list cannot trigger an integer division by zero.
	targetQueueSize = 5000 / std::max<size_t>(1, cursorCount);
	messages.reserve(10000);
	cursorMessages.resize(cursorCount);
}
// Builds a BufferedCursor that peeks `tag` from every entry of `logServers`, wrapping each
// entry in its own ServerPeekCursor constructed with the same begin/end versions.
//   logServers     - log server interfaces to read from; may be empty
//   tag            - tag to peek
//   begin / end    - version bounds handed to every ServerPeekCursor and kept on this cursor
//   parallelGetMore- forwarded unchanged to every ServerPeekCursor
// knownUnique is true, so bufferedGetMore only sorts the merged messages instead of
// de-duplicating them. withTags is on; collectTags and canDiscardPopped are off.
ILogSystem::BufferedCursor::BufferedCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore ) : messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), randomID(deterministicRandom()->randomUniqueID()) {
	const size_t serverCount = logServers.size();
	// Split a budget of 5000 buffered messages across the per-server queues. The divisor is
	// clamped to 1 so that an empty logServers list cannot trigger an integer division by zero.
	targetQueueSize = 5000 / std::max<size_t>(1, serverCount);
	messages.reserve(10000);
	cursorMessages.resize(serverCount);
	cursors.reserve(serverCount);
	for( const auto& logServer : logServers ) {
		// NOTE(review): the meaning of the literal `false` argument is not visible from here.
		Reference<ILogSystem::ServerPeekCursor> cursor( new ILogSystem::ServerPeekCursor( logServer, tag, begin, end, false, parallelGetMore ) );
		cursors.push_back( cursor );
	}
}
void ILogSystem::BufferedCursor::combineMessages() {
@ -990,26 +1012,23 @@ void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) {
ASSERT(false);
}
// Loader actor started by bufferedGetMore, one per underlying cursor. It repeatedly calls
// cursor->getMore(), copies the cursor's pending messages into the BufferedCursor's buffers,
// and records the highest popped version reported by the cursor.
//
// NOTE(review): this listing appears to interleave the pre-change lines (the `maxVersion`
// parameter and its checks, and the push into self->messages) with their replacements (the
// `idx` parameter, the self->end / targetQueueSize checks, and the push into
// self->cursorMessages[idx]). Confirm against the checked-in file which lines are live.
ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, Version maxVersion, TaskPriority taskID ) {
if(cursor->version().version >= maxVersion) {
return Void();
}
ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, int idx, TaskPriority taskID ) {
loop {
wait(yield());
// Stop once the cursor has reached the end version, or once this cursor's queue already
// holds more than targetQueueSize messages.
if(cursor->version().version >= self->end || self->cursorMessages[idx].size() > self->targetQueueSize) {
return Void();
}
wait(cursor->getMore(taskID));
// Keep the largest popped version seen so far.
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
if(cursor->version().version >= maxVersion) {
if(cursor->version().version >= self->end) {
return Void();
}
// Drain every message currently available on the cursor. The payload is getMessage() when
// tags are not wanted or are being collected, otherwise getMessageWithTags(); the tag list is
// empty when withTags is false.
while(cursor->hasMessage()) {
self->messages.push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? std::vector<Tag>() : cursor->getTags(), cursor->version()));
self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? std::vector<Tag>() : cursor->getTags(), cursor->version()));
cursor->nextMessage();
if(cursor->version().version >= maxVersion) {
return Void();
}
}
}
}
@ -1020,29 +1039,47 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
throw internal_error();
}
state Version targetVersion = std::min(self->end, self->messageVersion.version + SERVER_KNOBS->VERSIONS_PER_BATCH);
self->messages.clear();
std::vector<Future<Void>> loaders;
loaders.reserve(self->cursors.size());
for(auto& cursor : self->cursors) {
loaders.push_back(bufferedGetMoreLoader(self, cursor, targetVersion, taskID));
}
wait( waitForAll(loaders) );
wait(yield());
if(self->collectTags) {
for(int i = 0; i < self->cursors.size(); i++) {
loaders.push_back(bufferedGetMoreLoader(self, self->cursors[i], i, taskID));
}
state Future<Void> allLoaders = waitForAll(loaders);
state Version minVersion;
loop {
wait( allLoaders || delay(0.005, taskID) );
minVersion = self->end;
for(auto& cursor : self->cursors) {
minVersion = std::min(minVersion, cursor->version().version);
}
if(minVersion > self->messageVersion.version) {
break;
}
if(allLoaders.isReady()) {
wait(Future<Void>(Never()));
}
}
wait( yield() );
for(auto &it : self->cursorMessages) {
while(!it.empty() && it.front().version.version < minVersion) {
self->messages.push_back(it.front());
it.pop_front();
}
}
if(self->collectTags || self->knownUnique) {
std::sort(self->messages.begin(), self->messages.end());
} else {
uniquify(self->messages);
}
self->messageVersion = LogMessageVersion(minVersion);
self->messageIndex = 0;
self->hasNextMessage = self->messages.size() > 0;
Version minVersion = self->end;
for(auto& cursor : self->cursors) {
minVersion = std::min(minVersion, cursor->version().version);
}
self->messageVersion = LogMessageVersion(minVersion);
if(self->collectTags) {
self->combineMessages();
@ -1050,7 +1087,7 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
wait(yield());
if(self->canDiscardPopped && self->poppedVersion > self->version().version) {
TraceEvent(SevWarn, "DiscardingPoppedData").detail("Version", self->version().version).detail("Popped", self->poppedVersion);
TraceEvent(SevWarn, "DiscardingPoppedData", self->randomID).detail("Version", self->version().version).detail("Popped", self->poppedVersion);
self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion));
for(auto& cursor : self->cursors) {
cursor->advanceTo(self->messageVersion);
@ -1107,8 +1144,11 @@ const LogMessageVersion& ILogSystem::BufferedCursor::version() {
}
// Reports a min-known-committed version for the buffered cursor.
// NOTE(review): this listing appears to interleave the removed body
// (ASSERT(false); return invalidVersion;) with its replacement, which returns the largest
// getMinKnownCommittedVersion() across `cursors` (0 when there are none). As listed, the
// aggregation would be unreachable; confirm that only the replacement is present in the
// checked-in file.
Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() {
ASSERT(false);
return invalidVersion;
Version res = 0;
for(auto& cursor : cursors) {
res = std::max(res, cursor->getMinKnownCommittedVersion());
}
return res;
}
Version ILogSystem::BufferedCursor::popped() {

View File

@ -538,7 +538,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
Reference<IPeekCursor> peekRemote( UID dbgid, Version begin, Tag tag, bool parallelGetMore ) {
Reference<IPeekCursor> peekRemote( UID dbgid, Version begin, Optional<Version> end, Tag tag, bool parallelGetMore ) {
int bestSet = -1;
Version lastBegin = recoveredAt.present() ? recoveredAt.get() + 1 : 0;
for(int t = 0; t < tLogs.size(); t++) {
@ -552,21 +552,21 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if(bestSet == -1) {
TraceEvent("TLogPeekRemoteNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
TraceEvent("TLogPeekRemoteNoBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
if(begin >= lastBegin) {
TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
return Reference<ILogSystem::MergedPeekCursor>( new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, begin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
TraceEvent("TLogPeekRemoteBestOnly", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor( tLogs[bestSet]->logRouters, tag, begin, end.present() ? end.get() + 1 : getPeekEnd(), parallelGetMore ) );
} else {
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
std::vector< LogMessageVersion > epochEnds;
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
cursors.emplace_back(new ILogSystem::MergedPeekCursor( tLogs[bestSet]->logRouters, -1, (int)tLogs[bestSet]->logRouters.size(), tag, lastBegin, getPeekEnd(), parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0 ) );
TraceEvent("TLogPeekRemoteAddingBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("BestSet", bestSet).detail("BestSetStart", lastBegin).detail("LogRouterIds", tLogs[bestSet]->logRouterString());
cursors.emplace_back(new ILogSystem::BufferedCursor( tLogs[bestSet]->logRouters, tag, lastBegin, end.present() ? end.get() + 1 : getPeekEnd(), parallelGetMore ) );
int i = 0;
while(begin < lastBegin) {
if(i == oldLogData.size()) {
TraceEvent("TLogPeekRemoteDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
TraceEvent("TLogPeekRemoteDead", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("LastBegin", lastBegin).detail("OldLogDataSize", oldLogData.size());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
@ -583,15 +583,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
if(bestOldSet == -1) {
TraceEvent("TLogPeekRemoteNoOldBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
TraceEvent("TLogPeekRemoteNoOldBestSet", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd());
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, parallelGetMore ) );
}
if(thisBegin < lastBegin) {
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
TraceEvent("TLogPeekRemoteAddingOldBest", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end.present() ? end.get() : getPeekEnd()).detail("BestOldSet", bestOldSet).detail("LogRouterIds", oldLogData[i].tLogs[bestOldSet]->logRouterString())
.detail("LastBegin", lastBegin).detail("ThisBegin", thisBegin).detail("BestStartVer", oldLogData[i].tLogs[bestOldSet]->startVersion);
cursors.emplace_back(new ILogSystem::MergedPeekCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, -1, (int)oldLogData[i].tLogs[bestOldSet]->logRouters.size(), tag,
thisBegin, lastBegin, parallelGetMore, std::vector<LocalityData>(), Reference<IReplicationPolicy>(), 0));
cursors.emplace_back(new ILogSystem::BufferedCursor(oldLogData[i].tLogs[bestOldSet]->logRouters, tag, thisBegin, lastBegin, parallelGetMore));
epochEnds.emplace_back(lastBegin);
lastBegin = thisBegin;
}
@ -602,14 +601,14 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Tag tag, bool parallelGetMore ) {
virtual Reference<IPeekCursor> peek( UID dbgid, Version begin, Optional<Version> end, Tag tag, bool parallelGetMore ) {
if(!tLogs.size()) {
TraceEvent("TLogPeekNoLogSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin);
return Reference<ILogSystem::ServerPeekCursor>( new ILogSystem::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>>(), tag, begin, getPeekEnd(), false, false ) );
}
if(tag.locality == tagLocalityRemoteLog) {
return peekRemote(dbgid, begin, tag, parallelGetMore);
return peekRemote(dbgid, begin, end, tag, parallelGetMore);
} else {
return peekAll(dbgid, begin, getPeekEnd(), tag, parallelGetMore);
}
@ -622,12 +621,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
if(tags.size() == 1) {
return peek(dbgid, begin, tags[0], parallelGetMore);
return peek(dbgid, begin, end, tags[0], parallelGetMore);
}
std::vector< Reference<ILogSystem::IPeekCursor> > cursors;
for(auto tag : tags) {
cursors.push_back(peek(dbgid, begin, tag, parallelGetMore));
cursors.push_back(peek(dbgid, begin, end, tag, parallelGetMore));
}
return Reference<ILogSystem::BufferedCursor>( new ILogSystem::BufferedCursor(cursors, begin, end.present() ? end.get() + 1 : getPeekEnd(), true, tLogs[0]->locality == tagLocalityUpgraded, false) );
}