diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 2f0442b7d5..8d8362d4c9 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -521,9 +521,8 @@ struct ILogSystem { std::vector> cursors; std::vector epochEnds; Version poppedVersion; - bool needsPopped; - MultiCursor( std::vector> cursors, std::vector epochEnds, bool needsPopped = true ); + MultiCursor( std::vector> cursors, std::vector epochEnds ); virtual Reference cloneNoMore(); virtual void setProtocolVersion( ProtocolVersion version ); @@ -578,6 +577,9 @@ struct ILogSystem { Version end; bool hasNextMessage; bool withTags; + Version poppedVersion; + Version initialPoppedVersion; + bool hasReturnedData; //FIXME: collectTags is needed to support upgrades from 5.X to 6.0. Remove this code when we no longer support that upgrade. bool collectTags; @@ -781,7 +783,7 @@ struct LogPushData : NonCopyable { next_message_tags.insert(next_message_tags.end(), tags.begin(), tags.end()); } - void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations = false ) { + void addMessage( StringRef rawMessageWithoutLength, bool usePreviousLocations, Version commitVersion ) { if( !usePreviousLocations ) { prev_tags.clear(); if(logSystem->hasRemoteLogs()) { @@ -795,12 +797,14 @@ struct LogPushData : NonCopyable { next_message_tags.clear(); } uint32_t subseq = this->subsequence++; + uint32_t msgsize = rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag)*prev_tags.size(); for(int loc : msg_locations) { - messagesWriter[loc] << uint32_t(rawMessageWithoutLength.size() + sizeof(subseq) + sizeof(uint16_t) + sizeof(Tag)*prev_tags.size()) << subseq << uint16_t(prev_tags.size()); + messagesWriter[loc] << msgsize << subseq << uint16_t(prev_tags.size()); for(auto& tag : prev_tags) messagesWriter[loc] << tag; messagesWriter[loc].serializeBytes(rawMessageWithoutLength); } + TraceEvent("AddMessage").detail("Tags", describe(prev_tags)).detail("Version", commitVersion).detail("Subseq", subseq).detail("MsgSize", msgsize); } template diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index fc59c80bd9..147393fcfe 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -810,7 +810,7 @@ Version ILogSystem::SetPeekCursor::popped() { return poppedVersion; } -ILogSystem::MultiCursor::MultiCursor( std::vector> cursors, std::vector epochEnds, bool needsPopped ) : cursors(cursors), epochEnds(epochEnds), needsPopped(needsPopped), poppedVersion(0) { +ILogSystem::MultiCursor::MultiCursor( std::vector> cursors, std::vector epochEnds ) : cursors(cursors), epochEnds(epochEnds), poppedVersion(0) { for(int i = 0; i < std::min(cursors.size(),SERVER_KNOBS->MULTI_CURSOR_PRE_FETCH_LIMIT); i++) { cursors[cursors.size()-i-1]->getMore(); } @@ -854,7 +854,7 @@ const std::vector& ILogSystem::MultiCursor::getTags() { void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) { while( cursors.size() > 1 && n >= epochEnds.back() ) { - if(needsPopped) poppedVersion = std::max(poppedVersion, cursors.back()->popped()); + poppedVersion = std::max(poppedVersion, cursors.back()->popped()); cursors.pop_back(); epochEnds.pop_back(); } @@ -864,7 +864,7 @@ void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) { Future ILogSystem::MultiCursor::getMore(TaskPriority taskID) { LogMessageVersion startVersion = cursors.back()->version(); while( cursors.size() > 1 && cursors.back()->version() >= epochEnds.back() ) { - if(needsPopped) poppedVersion = std::max(poppedVersion, cursors.back()->popped()); + poppedVersion = std::max(poppedVersion, cursors.back()->popped()); cursors.pop_back(); epochEnds.pop_back(); } @@ -895,11 +895,10 @@ Version ILogSystem::MultiCursor::getMinKnownCommittedVersion() { } Version ILogSystem::MultiCursor::popped() { - ASSERT(needsPopped); return std::max(poppedVersion, cursors.back()->popped()); } -ILogSystem::BufferedCursor::BufferedCursor( std::vector> cursors, Version begin, Version end, bool withTags, bool collectTags ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0) { +ILogSystem::BufferedCursor::BufferedCursor( std::vector> cursors, Version begin, Version end, bool withTags, bool collectTags ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), hasReturnedData(false) { messages.reserve(10000); } @@ -994,6 +993,10 @@ ACTOR Future bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Refe } } wait(cursor->getMore(taskID)); + self->poppedVersion = std::max(self->poppedVersion, cursor->popped()); + if(!self->hasReturnedData) { + self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped()); + } } } @@ -1032,6 +1035,14 @@ ACTOR Future bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori } wait(yield()); + if(!self->hasReturnedData) { + while(self->hasNextMessage && self->version().version < self->poppedVersion) { + self->nextMessage(); + } + if(self->hasNextMessage) { + self->hasReturnedData = true; + } + } return Void(); } @@ -1069,6 +1080,8 @@ Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() { } Version ILogSystem::BufferedCursor::popped() { - ASSERT(false); - return invalidVersion; + if(initialPoppedVersion == poppedVersion) { + return 0; + } + return poppedVersion; } diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index caae64f284..5700ddac1a 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -807,7 +807,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted( new ILogSystem::BufferedCursor(allCursors, localEnd, end, false) ); epochEnds.emplace_back(localEnd); - return Reference( new ILogSystem::MultiCursor(cursors, epochEnds, false) ); + return Reference( new ILogSystem::MultiCursor(cursors, epochEnds) ); } catch( Error& e ) { if(e.code() == error_code_worker_removed) { std::vector< Reference > cursors;