/*
 * LogSystemPeekCursor.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbserver/LogSystem.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/Knobs.h"
#include "fdbrpc/ReplicationUtils.h"
#include "flow/actorcompiler.h" // has to be last include

// NOTE(review): in this copy of the file the template argument lists appear to have been
// stripped (e.g. `Reference>>`, `Future`, `VectorRef`, `std::numeric_limitssequence)>`).
// The code tokens below are left exactly as found; restore the `<...>` arguments from the
// authoritative source before compiling.

// Constructs a cursor that peeks messages for `tag` from the log server referenced by
// `interf`, positioned at version `begin` and bounded by `end`.
// The cursor starts with no message (hasMsg == false), all reply counters at zero,
// sequence 0, and the known-version fields of the (empty) `results` zeroed.
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
	: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0),
	  returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
	this->results.maxKnownVersion = 0;
	this->results.minKnownCommittedVersion = 0;
	//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
}

// Constructs a cursor over an already-received reply (`results`). This constructor does not
// initialize `interf`; it is the one used by cloneNoMore().
// The reader is re-created over the copied reply, so the position is re-established by loading
// the first message (when `hasMsg` is set) and then advancing to `messageVersion`.
// Note that inside the body `messageVersion` and `hasMsg` name the constructor parameters,
// which shadow the members of the same name.
ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, TagsAndMessage const& message, bool hasMsg, Version poppedVersion, Tag tag )
	: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageAndTags(message), hasMsg(hasMsg),
	  randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
	//TraceEvent("SPC_Clone", randomID);
	this->results.maxKnownVersion = 0;
	this->results.minKnownCommittedVersion = 0;
	if(hasMsg)
		nextMessage();

	advanceTo(messageVersion);
}

// Returns a new cursor built from this cursor's current reply and position via the
// reply-based constructor above (which sets no `interf`).
Reference ILogSystem::ServerPeekCursor::cloneNoMore() {
	return Reference( new ILogSystem::ServerPeekCursor( results, messageVersion, end, messageAndTags, hasMsg, poppedVersion, tag ) );
}

// Forwards the protocol version to the underlying reader.
void ILogSystem::ServerPeekCursor::setProtocolVersion( ProtocolVersion version ) {
	rd.setProtocolVersion(version);
}

// Returns the arena of the current reply.
Arena& ILogSystem::ServerPeekCursor::arena() { return results.arena; }

// Returns a pointer to the reader over the current reply's messages.
ArenaReader* ILogSystem::ServerPeekCursor::reader() {
	return &rd;
}

// True when the cursor is positioned on a message.
bool ILogSystem::ServerPeekCursor::hasMessage() {
	//TraceEvent("SPC_HasMessage", randomID).detail("HasMsg", hasMsg);
	return hasMsg;
}

// Moves to the next message in the current reply buffer. Requires hasMsg to be true on entry.
//  - Buffer exhausted: messageVersion becomes min(results.end, end.version) and hasMsg is cleared.
//  - Next 4 bytes equal VERSION_HEADER: the header and the version are consumed and
//    messageVersion is reset to that version; if this reaches `end`, the cursor is clamped to
//    `end` and hasMsg is cleared.
//  - Otherwise (or after a version header): the message and its tags are loaded, and the reader
//    is left positioned just past the message header.
void ILogSystem::ServerPeekCursor::nextMessage() {
	//TraceEvent("SPC_NextMessage", randomID).detail("MessageVersion", messageVersion.toString());
	ASSERT(hasMsg);
	if (rd.empty()) {
		messageVersion.reset(std::min(results.end, end.version));
		hasMsg = false;
		return;
	}
	if (*(int32_t*)rd.peekBytes(4) == VERSION_HEADER) {
		// A version
		int32_t dummy;
		Version ver;
		rd >> dummy >> ver;

		//TraceEvent("SPC_ProcessSeq", randomID).detail("MessageVersion", messageVersion.toString()).detail("Ver", ver).detail("Tag", tag.toString());
		//ASSERT( ver >= messageVersion.version );

		messageVersion.reset(ver);

		if( messageVersion >= end ) {
			messageVersion = end;
			hasMsg = false;
			return;
		}
		// A version header must be followed by at least one message.
		ASSERT(!rd.empty());
	}

	messageAndTags.loadFromArena(&rd, &messageVersion.sub);
	// Rewind and consume the header so that reader() starts from the message.
	rd.rewind();
	rd.readBytes(messageAndTags.getHeaderSize());
	hasMsg = true;
	//TraceEvent("SPC_NextMessageB", randomID).detail("MessageVersion", messageVersion.toString());
}

// Returns the current message without its tags and advances the reader past it.
StringRef ILogSystem::ServerPeekCursor::getMessage() {
	//TraceEvent("SPC_GetMessage", randomID);
	StringRef message = messageAndTags.getMessageWithoutTags();
	rd.readBytes(message.size()); // Consumes the message.
	return message;
}

// Returns the current raw message (header and tags included) and advances the reader past it.
// The header bytes were already consumed by nextMessage(), hence the subtraction.
StringRef ILogSystem::ServerPeekCursor::getMessageWithTags() {
	StringRef rawMessage = messageAndTags.getRawMessage();
	rd.readBytes(rawMessage.size() - messageAndTags.getHeaderSize()); // Consumes the message.
	return rawMessage;
}

// Returns the tags of the current message.
VectorRef ILogSystem::ServerPeekCursor::getTags() {
	return messageAndTags.tags;
}

// Skips buffered messages until the cursor's version is at least `n` or the buffer runs out.
// If no message remains and the cursor is still behind `n`, the version is set to `n` directly.
void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
	//TraceEvent("SPC_AdvanceTo", randomID).detail("N", n.toString());
	while( messageVersion < n && hasMessage() ) {
		getMessage();
		nextMessage();
	}

	if( hasMessage() )
		return;

	//if( more.isValid() && !more.isReady() ) more.cancel();

	if( messageVersion < n ) {
		messageVersion = n;
	}
}

// Zeroes the cursor's slow/unknown/fast reply counters, waits PEEK_STATS_INTERVAL, then logs
// the counters ("SlowPeekStats"). If at least PEEK_STATS_SLOW_AMOUNT slow replies were seen and
// slow / (slow + fast) is at least PEEK_STATS_SLOW_RATIO, it logs "ConnectionResetSlowPeek",
// resets the transport connection to `addr` and records the time in lastReset.
// Unknown replies are logged but are not part of the ratio.
ACTOR Future resetChecker( ILogSystem::ServerPeekCursor* self, NetworkAddress addr ) {
	self->slowReplies = 0;
	self->unknownReplies = 0;
	self->fastReplies = 0;
	wait(delay(SERVER_KNOBS->PEEK_STATS_INTERVAL));
	TraceEvent("SlowPeekStats", self->randomID)
		.detail("PeerAddress", addr)
		.detail("SlowReplies", self->slowReplies)
		.detail("FastReplies", self->fastReplies)
		.detail("UnknownReplies", self->unknownReplies);

	if (self->slowReplies >= SERVER_KNOBS->PEEK_STATS_SLOW_AMOUNT && self->slowReplies / double(self->slowReplies + self->fastReplies) >= SERVER_KNOBS->PEEK_STATS_SLOW_RATIO) {
		TraceEvent("ConnectionResetSlowPeek", self->randomID)
			.detail("PeerAddress", addr)
			.detail("SlowReplies", self->slowReplies)
			.detail("FastReplies", self->fastReplies)
			.detail("UnknownReplies", self->unknownReplies);
		FlowTransport::transport().resetConnection(addr);
		self->lastReset = now();
	}
	return Void();
}

// Passes through the peek reply from `in` while classifying its latency on the cursor.
// Classification only happens when more than PEEK_RESET_INTERVAL has elapsed since lastReset:
//  - latency <= PEEK_MAX_LATENCY: counted as fast;
//  - latency >  PEEK_MAX_LATENCY and (reply has at least DESIRED_TOTAL_BYTES of messages or
//    PEEK_COUNT_SMALL_MESSAGES is set): counted as slow, starting a resetChecker if the previous
//    one has finished;
//  - otherwise: counted as unknown.
// A broken_promise error is converted into a future that never becomes ready; any other error
// is rethrown.
ACTOR Future recordRequestMetrics( ILogSystem::ServerPeekCursor* self, NetworkAddress addr, Future in ) {
	try {
		state double startTime = now();
		TLogPeekReply t = wait(in);
		if(now()-self->lastReset > SERVER_KNOBS->PEEK_RESET_INTERVAL) {
			if(now()-startTime > SERVER_KNOBS->PEEK_MAX_LATENCY) {
				if(t.messages.size() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES || SERVER_KNOBS->PEEK_COUNT_SMALL_MESSAGES) {
					// The checker is started before the increment below; resetChecker begins by
					// zeroing the counters.
					if(self->resetCheck.isReady()) {
						self->resetCheck = resetChecker(self, addr);
					}
					self->slowReplies++;
				} else {
					self->unknownReplies++;
				}
			} else {
				self->fastReplies++;
			}
		}
		return t;
	} catch (Error& e) {
		if (e.code() != error_code_broken_promise)
			throw;
		wait(Never()); // never return
		throw internal_error(); // does not happen
	}
}

// Fetches more data for `self` using a pipeline of outstanding peek requests.
//
// With no interface, or once the cursor has reached `end`, it returns immediately if a message
// is buffered and otherwise never completes.
//
// Each loop iteration:
//  1. When parallelGetMore or onlySpilled is set, tops the futureResults queue up to
//     PARALLEL_GET_MORE_REQUESTS requests (while the interface is present), each carrying
//     (randomID, sequence++). If `sequence` reaches its maximum value, operation_obsolete is
//     thrown, which the handler below treats as a request to restart the pipeline.
//     When neither flag is set and the queue is empty, returns.
//  2. Returns if a message is already buffered.
//  3. Waits for either the oldest outstanding reply or an interface change.
//     A reply whose begin differs from the expected version raises operation_obsolete.
//     Otherwise the reply is installed, the reader is re-created over it, and the cursor is
//     re-positioned at the version it had before the reply arrived.
//     An interface change restarts the pipeline with a new randomID and sequence 0.
//
// Errors: end_of_stream sets `end` to the current version and returns; timed_out and
// operation_obsolete restart the pipeline; everything else is rethrown.
ACTOR Future serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) {
	if( !self->interf || self->messageVersion >= self->end ) {
		if( self->hasMessage() )
			return Void();
		wait( Future(Never()));
		throw internal_error();
	}

	// Armed once and kept on the cursor so that it survives across calls to this actor.
	if(!self->interfaceChanged.isValid()) {
		self->interfaceChanged = self->interf->onChange();
	}

	loop {
		state Version expectedBegin = self->messageVersion.version;
		try {
			if (self->parallelGetMore || self->onlySpilled) {
				while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
					self->futureResults.push_back( recordRequestMetrics( self, self->interf->get().interf().peekMessages.getEndpoint().getPrimaryAddress(), self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
				}
				if (self->sequence == std::numeric_limitssequence)>::max()) {
					throw operation_obsolete();
				}
			} else if (self->futureResults.size() == 0) {
				return Void();
			}

			if( self->hasMessage() )
				return Void();

			choose {
				when( TLogPeekReply res = wait( self->interf->get().present() ? self->futureResults.front() : Never() ) ) {
					if(res.begin.get() != expectedBegin) {
						throw operation_obsolete();
					}
					expectedBegin = res.end;
					self->futureResults.pop_front();
					self->results = res;
					self->onlySpilled = res.onlySpilled;
					// poppedVersion never decreases and is capped at end.version.
					if(res.popped.present())
						self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
					self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
					// Remember where the cursor was; nextMessage() may move messageVersion back to
					// the first version in the reply, and advanceTo() skips forward again.
					LogMessageVersion skipSeq = self->messageVersion;
					self->hasMsg = true;
					self->nextMessage();
					self->advanceTo(skipSeq);
					//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
					return Void();
				}
				when( wait( self->interfaceChanged ) ) {
					self->interfaceChanged = self->interf->onChange();
					self->randomID = deterministicRandom()->randomUniqueID();
					self->sequence = 0;
					self->onlySpilled = false;
					self->futureResults.clear();
				}
			}
		} catch( Error &e ) {
			if(e.code() == error_code_end_of_stream) {
				self->end.reset( self->messageVersion.version );
				return Void();
			} else if(e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) {
				TraceEvent("PeekCursorTimedOut", self->randomID).error(e);
				// We *should* never get timed_out(), as it means the TLog got stuck while handling a parallel peek,
				// and thus we've likely just wasted 10min.
				// timed_out() is sent by cleanupPeekTrackers as value PEEK_TRACKER_EXPIRATION_TIME
				ASSERT_WE_THINK(e.code() == error_code_operation_obsolete || SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME < 10);
				// NOTE(review): unlike the interface-change branch above, onlySpilled is not
				// reset here — confirm that this is intentional.
				self->interfaceChanged = self->interf->onChange();
				self->randomID = deterministicRandom()->randomUniqueID();
				self->sequence = 0;
				self->futureResults.clear();
			} else {
				throw e;
			}
		}
	}
}

// Fetches more data for `self` with a single outstanding peek request at a time.
// With no interface, or once the cursor has reached `end`, it never completes.
// Otherwise it waits for either a reply (installed exactly as in the parallel variant, but
// without sequence numbers or begin-version checking) or an interface change, which clears
// onlySpilled and re-issues the request on the next loop iteration.
// end_of_stream sets `end` to the current version and returns; other errors are rethrown.
ACTOR Future serverPeekGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) {
	if( !self->interf || self->messageVersion >= self->end ) {
		wait( Future(Never()));
		throw internal_error();
	}
	try {
		loop {
			choose {
				when( TLogPeekReply res = wait( self->interf->get().present() ? brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled), taskID) ) : Never() ) ) {
					self->results = res;
					self->onlySpilled = res.onlySpilled;
					// poppedVersion never decreases and is capped at end.version.
					if(res.popped.present())
						self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
					self->rd = ArenaReader( self->results.arena, self->results.messages, Unversioned() );
					// Re-position at the version the cursor had before the reply arrived.
					LogMessageVersion skipSeq = self->messageVersion;
					self->hasMsg = true;
					self->nextMessage();
					self->advanceTo(skipSeq);
					//TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0);
					return Void();
				}
				when( wait( self->interf->onChange() ) ) {
					self->onlySpilled = false;
				}
			}
		}
	} catch( Error &e ) {
		if(e.code() == error_code_end_of_stream) {
			self->end.reset( self->messageVersion.version );
			return Void();
		}
		throw e;
	}
}

// Returns a future that becomes ready when more data may be available.
// Returns immediately when a message is buffered and parallelGetMore is off. Otherwise an
// existing unfinished `more` future is reused; a new fetch uses the parallel actor whenever
// parallelGetMore or onlySpilled is set or requests are still queued in futureResults, and the
// single-request actor in all other cases.
Future ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
	//TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
	if( hasMessage() && !parallelGetMore )
		return Void();
	if( !more.isValid() || more.isReady() ) {
		if (parallelGetMore || onlySpilled || futureResults.size()) {
			more = serverPeekParallelGetMore(this, taskID);
		} else {
			more = serverPeekGetMore(this, taskID);
		}
	}
	return more;
}

// Completes when the failure monitor reports the peek endpoint of the current interface as
// equal to a default-constructed FailureStatus. While the interface is absent it only waits
// for the interface to change, then re-evaluates.
// NOTE(review): `self->interf` is dereferenced without a null check here.
ACTOR Future serverPeekOnFailed( ILogSystem::ServerPeekCursor* self ) {
	loop {
		choose {
			when( wait( self->interf->get().present() ? IFailureMonitor::failureMonitor().onStateEqual( self->interf->get().interf().peekMessages.getEndpoint(), FailureStatus() ) : Never() ) ) { return Void(); }
			when( wait( self->interf->onChange() ) ) {}
		}
	}
}

// Returns a future from serverPeekOnFailed for this cursor.
Future ILogSystem::ServerPeekCursor::onFailed() {
	return serverPeekOnFailed(this);
}

// True when the interface is present, the cursor has not reached `end`, and the failure
// monitor reports the peek endpoint as available.
// NOTE(review): `interf` is dereferenced without the null check used in
// getPrimaryPeekLocation() and the getMore actors — confirm this is never called on a cursor
// built without an interface.
bool ILogSystem::ServerPeekCursor::isActive() {
	if( !interf->get().present() )
		return false;
	if( messageVersion >= end )
		return false;
	return IFailureMonitor::failureMonitor().getState( interf->get().interf().peekMessages.getEndpoint() ).isAvailable();
}

// True once the cursor's version has reached `end`.
bool ILogSystem::ServerPeekCursor::isExhausted() {
	return messageVersion >= end;
}

const LogMessageVersion& ILogSystem::ServerPeekCursor::version() { return messageVersion; } // Call only after nextMessage(). The sequence of the current message, or results.end if nextMessage() has returned false.
// Returns minKnownCommittedVersion from the current reply. Both constructors set it to 0;
// it is replaced whenever a new reply is installed into `results`.
Version ILogSystem::ServerPeekCursor::getMinKnownCommittedVersion() {
	return results.minKnownCommittedVersion;
}

// Returns the id of the current interface, or an empty Optional when the cursor has no `interf`.
Optional ILogSystem::ServerPeekCursor::getPrimaryPeekLocation() {
	if(interf) {
		return interf->get().id();
	}
	return Optional();
}

// Returns the popped version recorded by this cursor.
Version ILogSystem::ServerPeekCursor::popped() { return poppedVersion; }

// NOTE(review): as elsewhere in this copy of the file, template argument lists appear to have
// been stripped (`vector< Reference >`, `Optional`, `std::pair(...)`, `vector> q`, ...). Code
// tokens are reproduced exactly as found.

// Wraps an existing list of server cursors. No best server is designated (bestServer == -1),
// the read quorum is the number of cursors, the tag is invalidTag and the replication factor 0.
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference > const& serverCursors, Version begin )
	: serverCursors(serverCursors), bestServer(-1), readQuorum(serverCursors.size()), tag(invalidTag), currentCursor(0), hasNextMessage(false),
	  messageVersion(begin), randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(0) {
	sortedVersions.resize(serverCursors.size());
}

// Creates one ServerPeekCursor per entry of `logServers`, all peeking `tag` over [begin, end).
// When a `tLogPolicy` is supplied, a LogSet holding the policy and the policy-filtered
// localities is built for use by the policy-based branch of updateMessage().
// Each server cursor is created with returnIfBlocked set exactly when a best server is
// designated (bestServer >= 0).
ILogSystem::MergedPeekCursor::MergedPeekCursor( std::vector>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end,
	bool parallelGetMore, std::vector< LocalityData > const& tLogLocalities, Reference const tLogPolicy, int tLogReplicationFactor )
	: bestServer(bestServer), readQuorum(readQuorum), tag(tag), currentCursor(0), hasNextMessage(false), messageVersion(begin), randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(tLogReplicationFactor) {
	if(tLogPolicy) {
		logSet = Reference( new LogSet() );
		logSet->tLogPolicy = tLogPolicy;
		logSet->tLogLocalities = tLogLocalities;
		filterLocalityDataForPolicy(logSet->tLogPolicy, &logSet->tLogLocalities);
		logSet->updateLocalitySet(logSet->tLogLocalities);
	}

	for( int i = 0; i < logServers.size(); i++ ) {
		Reference cursor( new ILogSystem::ServerPeekCursor( logServers[i], tag, begin, end, bestServer >= 0, parallelGetMore ) );
		//TraceEvent("MPC_Starting", randomID).detail("Cursor", cursor->randomID).detail("End", end);
		serverCursors.push_back( cursor );
	}
	sortedVersions.resize(serverCursors.size());
}

// Builds a merged cursor from already-constructed server cursors and explicit state, then
// recomputes the current message. This is the constructor used by cloneNoMore(); it does not
// initialize `tag`.
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference > const& serverCursors, LogMessageVersion const& messageVersion, int bestServer, int readQuorum, Optional nextVersion, Reference logSet, int tLogReplicationFactor )
	: serverCursors(serverCursors), bestServer(bestServer), readQuorum(readQuorum), currentCursor(0), hasNextMessage(false), messageVersion(messageVersion), nextVersion(nextVersion), logSet(logSet),
	  randomID(deterministicRandom()->randomUniqueID()), tLogReplicationFactor(tLogReplicationFactor) {
	sortedVersions.resize(serverCursors.size());
	calcHasMessage();
}

// Returns a merged cursor over cloneNoMore() copies of every server cursor, carrying over the
// current position and merge configuration.
Reference ILogSystem::MergedPeekCursor::cloneNoMore() {
	vector< Reference > cursors;
	for( auto it : serverCursors ) {
		cursors.push_back(it->cloneNoMore());
	}
	return Reference( new ILogSystem::MergedPeekCursor( cursors, messageVersion, bestServer, readQuorum, nextVersion, logSet, tLogReplicationFactor ) );
}

// Forwards the protocol version to every server cursor that currently has a message.
void ILogSystem::MergedPeekCursor::setProtocolVersion( ProtocolVersion version ) {
	for( auto it : serverCursors )
		if( it->hasMessage() )
			it->setProtocolVersion( version );
}

// Returns the arena of the server cursor currently selected by the merge.
Arena& ILogSystem::MergedPeekCursor::arena() { return serverCursors[currentCursor]->arena(); }

// Returns the reader of the server cursor currently selected by the merge.
ArenaReader* ILogSystem::MergedPeekCursor::reader() { return serverCursors[currentCursor]->reader(); }

// Recomputes messageVersion, currentCursor and hasNextMessage.
// If a best server is designated and has a message, that message wins and every cursor is
// advanced to its version. Otherwise every cursor is advanced to the best server's version and
// the merge falls back to the quorum rule, and then (if still no message and a logSet exists)
// to the policy rule.
void ILogSystem::MergedPeekCursor::calcHasMessage() {
	if(bestServer >= 0) {
		if(nextVersion.present()) serverCursors[bestServer]->advanceTo( nextVersion.get() );
		if( serverCursors[bestServer]->hasMessage() ) {
			messageVersion = serverCursors[bestServer]->version();
			currentCursor = bestServer;
			hasNextMessage = true;

			for (auto& c : serverCursors)
				c->advanceTo(messageVersion);

			return;
		}

		auto bestVersion = serverCursors[bestServer]->version();
		for (auto& c : serverCursors)
			c->advanceTo(bestVersion);
	}

	hasNextMessage = false;
	updateMessage(false);

	if(!hasNextMessage && logSet) {
		updateMessage(true);
	}
}

// Chooses messageVersion from the versions of the server cursors and selects a cursor that
// holds a message at exactly that version.
//  - usePolicy == false: messageVersion is the readQuorum-th largest cursor version.
//  - usePolicy == true: cursor versions are sorted ascending and locations are accumulated in
//    that order; messageVersion is the version at which at least tLogReplicationFactor
//    locations have been collected and they satisfy the policy. If that never happens,
//    messageVersion is left unchanged.
// Advancing a cursor to messageVersion can make it jump past that version; in that case the
// selection is repeated with the new versions.
// hasNextMessage is only ever set to true here; callers clear it beforehand.
void ILogSystem::MergedPeekCursor::updateMessage(bool usePolicy) {
	loop {
		bool advancedPast = false;
		sortedVersions.clear();
		for(int i = 0; i < serverCursors.size(); i++) {
			auto& serverCursor = serverCursors[i];
			if (nextVersion.present()) serverCursor->advanceTo(nextVersion.get());
			sortedVersions.push_back(std::pair(serverCursor->version(), i));
		}

		if(usePolicy) {
			ASSERT(logSet->tLogPolicy);
			std::sort(sortedVersions.begin(), sortedVersions.end());

			locations.clear();
			for(auto sortedVersion : sortedVersions) {
				locations.push_back(logSet->logEntryArray[sortedVersion.second]);
				if( locations.size() >= tLogReplicationFactor && logSet->satisfiesPolicy(locations) ) {
					messageVersion = sortedVersion.first;
					break;
				}
			}
		} else {
			// Partition so that the element at index (size - readQuorum) is the one that would
			// be there in sorted order.
			std::nth_element(sortedVersions.begin(), sortedVersions.end()-readQuorum, sortedVersions.end());
			messageVersion = sortedVersions[sortedVersions.size()-readQuorum].first;
		}

		for(int i = 0; i < serverCursors.size(); i++) {
			auto& c = serverCursors[i];
			auto start = c->version();
			c->advanceTo(messageVersion);
			if( start <= messageVersion && messageVersion < c->version() ) {
				advancedPast = true;
				TEST(true); //Merge peek cursor advanced past desired sequence
			}
		}

		if(!advancedPast)
			break;
	}

	for(int i = 0; i < serverCursors.size(); i++) {
		auto& c = serverCursors[i];
		ASSERT_WE_THINK( !c->hasMessage() || c->version() >= messageVersion ); // Seems like the loop above makes this unconditionally true
		if (c->version() == messageVersion && c->hasMessage()) {
			hasNextMessage = true;
			currentCursor = i;
			break;
		}
	}
}

// True when the merge has selected a message.
bool ILogSystem::MergedPeekCursor::hasMessage() {
	return hasNextMessage;
}

// Moves past the current message: records nextVersion as the current version with `sub`
// incremented, advances the selected server cursor, and recomputes the merge.
void ILogSystem::MergedPeekCursor::nextMessage() {
	nextVersion = version();
	nextVersion.get().sub++;
	serverCursors[currentCursor]->nextMessage();
	calcHasMessage();
	ASSERT(hasMessage() || !version().sub);
}

// Returns the current message (without tags) from the selected server cursor.
StringRef ILogSystem::MergedPeekCursor::getMessage() { return serverCursors[currentCursor]->getMessage(); }

// Returns the current raw message (with tags) from the selected server cursor.
StringRef ILogSystem::MergedPeekCursor::getMessageWithTags() {
	return serverCursors[currentCursor]->getMessageWithTags();
}

// Returns the tags of the current message from the selected server cursor.
VectorRef ILogSystem::MergedPeekCursor::getTags() {
	return serverCursors[currentCursor]->getTags();
}

// Advances every server cursor that is behind `n` to `n`; the merge is recomputed only if at
// least one cursor actually moved.
void ILogSystem::MergedPeekCursor::advanceTo(LogMessageVersion n) {
	bool canChange = false;
	for (auto& c : serverCursors) {
		if(c->version() < n) {
			canChange = true;
			c->advanceTo(n);
		}
	}
	if(canChange) {
		calcHasMessage();
	}
}

// Repeatedly fetches more data until the merged cursor has a message or its version has moved
// past `startVersion`.
// While the best server is designated and active, only that server is waited on (its getMore
// or its failure). Otherwise the actor waits for any one of the cursors without a buffered
// message to return from getMore.
ACTOR Future mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion, TaskPriority taskID) {
	loop {
		//TraceEvent("MPC_GetMoreA", self->randomID).detail("Start", startVersion.toString());
		if(self->bestServer >= 0 && self->serverCursors[self->bestServer]->isActive()) {
			ASSERT(!self->serverCursors[self->bestServer]->hasMessage());
			wait( self->serverCursors[self->bestServer]->getMore(taskID) || self->serverCursors[self->bestServer]->onFailed() );
		} else {
			vector> q;
			for (auto& c : self->serverCursors)
				if (!c->hasMessage())
					q.push_back(c->getMore(taskID));
			wait(quorum(q, 1));
		}
		self->calcHasMessage();
		//TraceEvent("MPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
		if (self->hasMessage() || self->version() > startVersion) {
			return Void();
		}
	}
}

// Returns a future that becomes ready when the merged cursor has a message or has advanced.
// An unfinished `more` future is reused. With no server cursors the result never becomes ready.
// Otherwise the merge is recomputed first and the call returns immediately if that produced a
// message or moved the version forward; only then is a new mergedPeekGetMore started.
Future ILogSystem::MergedPeekCursor::getMore(TaskPriority taskID) {
	if( more.isValid() && !more.isReady() ) {
		return more;
	}

	if(!serverCursors.size())
		return Never();

	auto startVersion = version();
	calcHasMessage();
	if( hasMessage() )
		return Void();
	if (nextVersion.present())
		advanceTo(nextVersion.get());
	ASSERT(!hasMessage());
	if (version() > startVersion)
		return Void();

	more = mergedPeekGetMore(this, startVersion, taskID);
	return more;
}

// Not supported on a merged cursor: asserts unconditionally.
Future ILogSystem::MergedPeekCursor::onFailed() {
	ASSERT(false);
	return Never();
}

// Not supported on a merged cursor: asserts unconditionally.
bool ILogSystem::MergedPeekCursor::isActive() {
	ASSERT(false);
	return false;
}

// Reports exhaustion of the currently selected server cursor.
// NOTE(review): indexes serverCursors without the empty check that getMore() performs —
// confirm this is never called on a merged cursor with no server cursors.
bool ILogSystem::MergedPeekCursor::isExhausted() {
	return serverCursors[currentCursor]->isExhausted();
}

// Returns the version chosen by the merge.
const LogMessageVersion& ILogSystem::MergedPeekCursor::version() { return messageVersion; }

// Returns the min known committed version of the currently selected server cursor.
Version ILogSystem::MergedPeekCursor::getMinKnownCommittedVersion() {
	return serverCursors[currentCursor]->getMinKnownCommittedVersion();
}

// Returns the best server's peek location, or an empty Optional when no best server is set.
Optional ILogSystem::MergedPeekCursor::getPrimaryPeekLocation() {
	if(bestServer >= 0) {
		return serverCursors[bestServer]->getPrimaryPeekLocation();
	}
	return Optional();
}

// Returns the largest popped version reported by any server cursor (0 if there are none).
Version ILogSystem::MergedPeekCursor::popped() {
	Version poppedVersion = 0;
	for (auto& c : serverCursors)
		poppedVersion = std::max(poppedVersion, c->popped());
	return poppedVersion;
}

// Creates one ServerPeekCursor per log server of every log set, all peeking `tag` over
// [begin, end) with returnIfBlocked == true. The cursor starts out preferring the best set
// (useBestSet == true, currentSet == bestSet). sortedVersions is sized for the largest set.
ILogSystem::SetPeekCursor::SetPeekCursor( std::vector> const& logSets, int bestSet, int bestServer, Tag tag, Version begin, Version end, bool parallelGetMore )
	: logSets(logSets), bestSet(bestSet), bestServer(bestServer), tag(tag), currentCursor(0), currentSet(bestSet), hasNextMessage(false), messageVersion(begin), useBestSet(true), randomID(deterministicRandom()->randomUniqueID()) {
	serverCursors.resize(logSets.size());
	int maxServers = 0;
	for( int i = 0; i < logSets.size(); i++ ) {
		for( int j = 0; j < logSets[i]->logServers.size(); j++) {
			Reference cursor( new ILogSystem::ServerPeekCursor( logSets[i]->logServers[j], tag, begin, end, true, parallelGetMore ) );
			serverCursors[i].push_back( cursor );
		}
		maxServers = std::max(maxServers, serverCursors[i].size());
	}
	sortedVersions.resize(maxServers);
}

// Builds a set cursor from already-constructed server cursors and explicit state, then
// recomputes the current message. This is the constructor used by cloneNoMore(); it does not
// initialize `tag`.
ILogSystem::SetPeekCursor::SetPeekCursor( std::vector> const& logSets, std::vector< std::vector< Reference > > const& serverCursors, LogMessageVersion const& messageVersion, int bestSet, int bestServer,
	Optional nextVersion, bool useBestSet ) : logSets(logSets), serverCursors(serverCursors), messageVersion(messageVersion), bestSet(bestSet), bestServer(bestServer), nextVersion(nextVersion), currentSet(bestSet), currentCursor(0),
	hasNextMessage(false), useBestSet(useBestSet), randomID(deterministicRandom()->randomUniqueID()) {
	int maxServers = 0;
	for( int i = 0; i < logSets.size(); i++ ) {
		maxServers = std::max(maxServers, serverCursors[i].size());
	}
	sortedVersions.resize(maxServers);
	calcHasMessage();
}

// Returns a set cursor over cloneNoMore() copies of every server cursor in every set, carrying
// over the current position and merge configuration.
Reference ILogSystem::SetPeekCursor::cloneNoMore() {
	vector< vector< Reference > > cursors;
	cursors.resize(logSets.size());
	for( int i = 0; i < logSets.size(); i++ ) {
		for( int j = 0; j < logSets[i]->logServers.size(); j++) {
			cursors[i].push_back( serverCursors[i][j]->cloneNoMore() );
		}
	}
	return Reference( new ILogSystem::SetPeekCursor( logSets, cursors, messageVersion, bestSet, bestServer, nextVersion, useBestSet ) );
}

// Forwards the protocol version to every server cursor, in any set, that has a message.
void ILogSystem::SetPeekCursor::setProtocolVersion( ProtocolVersion version ) {
	for( auto& cursors : serverCursors ) {
		for( auto& it : cursors ) {
			if( it->hasMessage() ) {
				it->setProtocolVersion( version );
			}
		}
	}
}

// Returns the arena of the server cursor currently selected by the merge.
Arena& ILogSystem::SetPeekCursor::arena() { return serverCursors[currentSet][currentCursor]->arena(); }

// Returns the reader of the server cursor currently selected by the merge.
ArenaReader* ILogSystem::SetPeekCursor::reader() { return serverCursors[currentSet][currentCursor]->reader(); }

// Recomputes messageVersion, currentSet, currentCursor and hasNextMessage.
// If a best set and best server are designated and that server has a message, it wins and
// every cursor in every set is advanced to its version. Otherwise every cursor is advanced to
// the best server's version and:
//  - with useBestSet, the best set is merged by the quorum rule and then by the policy rule;
//  - without it, every set other than the best set is tried, first all by the quorum rule and
//    then all by the policy rule, stopping at the first one that yields a message.
void ILogSystem::SetPeekCursor::calcHasMessage() {
	if(bestSet >= 0 && bestServer >= 0) {
		if(nextVersion.present()) {
			//TraceEvent("LPC_CalcNext").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("NextVersion", nextVersion.get().toString());
			serverCursors[bestSet][bestServer]->advanceTo( nextVersion.get() );
		}
		if( serverCursors[bestSet][bestServer]->hasMessage() ) {
			messageVersion = serverCursors[bestSet][bestServer]->version();
			currentSet = bestSet;
			currentCursor = bestServer;
			hasNextMessage = true;

			//TraceEvent("LPC_Calc1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);

			for (auto& cursors : serverCursors) {
				for(auto& c : cursors) {
					c->advanceTo(messageVersion);
				}
			}

			return;
		}

		auto bestVersion = serverCursors[bestSet][bestServer]->version();
		for (auto& cursors : serverCursors) {
			for (auto& c : cursors) {
				c->advanceTo(bestVersion);
			}
		}
	}

	hasNextMessage = false;
	if(useBestSet) {
		updateMessage(bestSet, false); // Use Quorum logic

		//TraceEvent("LPC_Calc2").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
		if(!hasNextMessage) {
			updateMessage(bestSet, true);
			//TraceEvent("LPC_Calc3").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
		}
	} else {
		for(int i = 0; i < logSets.size() && !hasNextMessage; i++) {
			if(i != bestSet) {
				updateMessage(i, false); // Use Quorum logic
			}
		}
		//TraceEvent("LPC_Calc4").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
		for(int i = 0; i < logSets.size() && !hasNextMessage; i++) {
			if(i != bestSet) {
				updateMessage(i, true);
			}
		}
		//TraceEvent("LPC_Calc5").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage);
	}
}

// Chooses messageVersion from the cursor versions of log set `logIdx` and selects a cursor in
// that set that holds a message at exactly that version.
//  - usePolicy == false: messageVersion is the k-th largest cursor version, where
//    k = logServers.size() + 1 - tLogReplicationFactor for that set.
//  - usePolicy == true: versions are sorted ascending and locations accumulated in that order;
//    messageVersion is the version at which at least tLogReplicationFactor locations have been
//    collected and they satisfy the set's policy. If that never happens, messageVersion is
//    left unchanged.
// After choosing, the cursors of ALL sets (not only `logIdx`) are advanced to messageVersion;
// if any of them jumps past it, the selection is repeated.
// hasNextMessage is only ever set to true here; callers clear it beforehand.
void ILogSystem::SetPeekCursor::updateMessage(int logIdx, bool usePolicy) {
	loop {
		bool advancedPast = false;
		sortedVersions.clear();
		for(int i = 0; i < serverCursors[logIdx].size(); i++) {
			auto& serverCursor = serverCursors[logIdx][i];
			if (nextVersion.present()) serverCursor->advanceTo(nextVersion.get());
			sortedVersions.push_back(std::pair(serverCursor->version(), i));
			//TraceEvent("LPC_Update1").detail("Ver", messageVersion.toString()).detail("Tag", tag.toString()).detail("HasNextMessage", hasNextMessage).detail("ServerVer", serverCursor->version().toString()).detail("I", i);
		}

		if(usePolicy) {
			std::sort(sortedVersions.begin(), sortedVersions.end());
			locations.clear();
			for(auto sortedVersion : sortedVersions) {
				locations.push_back(logSets[logIdx]->logEntryArray[sortedVersion.second]);
				if( locations.size() >= logSets[logIdx]->tLogReplicationFactor && logSets[logIdx]->satisfiesPolicy(locations) ) {
					messageVersion = sortedVersion.first;
					break;
				}
			}
		} else {
			//(int)oldLogData[i].logServers.size() + 1 - oldLogData[i].tLogReplicationFactor
			std::nth_element(sortedVersions.begin(), sortedVersions.end()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor), sortedVersions.end());
			messageVersion = sortedVersions[sortedVersions.size()-(logSets[logIdx]->logServers.size()+1-logSets[logIdx]->tLogReplicationFactor)].first;
		}

		for (auto& cursors : serverCursors) {
			for (auto& c : cursors) {
				auto start = c->version();
				c->advanceTo(messageVersion);
				if( start <= messageVersion && messageVersion < c->version() ) {
					advancedPast = true;
					TEST(true); //Merge peek cursor advanced past desired sequence
				}
			}
		}

		if(!advancedPast)
			break;
	}

	for(int i = 0; i < serverCursors[logIdx].size(); i++) {
		auto& c = serverCursors[logIdx][i];
		ASSERT_WE_THINK( !c->hasMessage() || c->version() >= messageVersion ); // Seems like the loop above makes this unconditionally true
		if (c->version() == messageVersion && c->hasMessage()) {
			hasNextMessage = true;
			currentSet = logIdx;
			currentCursor = i;
			break;
		}
	}
}

// True when the merge has selected a message.
bool ILogSystem::SetPeekCursor::hasMessage() {
	return hasNextMessage;
}

// Moves past the current message: records nextVersion as the current version with `sub`
// incremented, advances the selected server cursor, and recomputes the merge.
void ILogSystem::SetPeekCursor::nextMessage() {
	nextVersion = version();
	nextVersion.get().sub++;
	serverCursors[currentSet][currentCursor]->nextMessage();
	calcHasMessage();
	ASSERT(hasMessage() || !version().sub);
}

// Returns the current message (without tags) from the selected server cursor.
StringRef ILogSystem::SetPeekCursor::getMessage() { return serverCursors[currentSet][currentCursor]->getMessage(); }

// Returns the current raw message (with tags) from the selected server cursor.
StringRef ILogSystem::SetPeekCursor::getMessageWithTags() { return serverCursors[currentSet][currentCursor]->getMessageWithTags(); }

// Returns the tags of the current message from the selected server cursor.
VectorRef ILogSystem::SetPeekCursor::getTags() {
	return serverCursors[currentSet][currentCursor]->getTags();
}

// Advances every server cursor (in every set) that is behind `n` to `n`; the merge is
// recomputed only if at least one cursor actually moved.
void ILogSystem::SetPeekCursor::advanceTo(LogMessageVersion n) {
	bool canChange = false;
	for( auto& cursors : serverCursors ) {
		for (auto& c : cursors) {
			if(c->version() < n) {
				canChange = true;
				c->advanceTo(n);
			}
		}
	}
	if(canChange) {
		calcHasMessage();
	}
}

// Repeatedly fetches more data until the set cursor has a message or its version has moved
// past `startVersion`.
//
// Each loop iteration picks one of three strategies:
//  1. Best server designated and active: wait only on it (getMore or failure), then prefer the
//     best set.
//  2. Otherwise the best set is still considered usable when the inactive best-set servers at
//     or behind the current version number fewer than tLogReplicationFactor or do not satisfy
//     the policy on their own. In that case, or when there is only one log set, switch back to
//     the best set if necessary (returning early if that alone yields progress) and wait for
//     any best-set cursor without a message to return from getMore or to fail.
//  3. Otherwise wait on every cursor of every set that has no message, and stop preferring the
//     best set.
ACTOR Future setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVersion startVersion, TaskPriority taskID) {
	loop {
		//TraceEvent("LPC_GetMore1", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
		if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isActive()) {
			ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage());
			//TraceEvent("LPC_GetMore2", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
			wait( self->serverCursors[self->bestSet][self->bestServer]->getMore(taskID) || self->serverCursors[self->bestSet][self->bestServer]->onFailed() );
			self->useBestSet = true;
		} else {
			//FIXME: if best set is exhausted, do not peek remote servers
			bool bestSetValid = self->bestSet >= 0;
			if(bestSetValid) {
				self->locations.clear();
				for( int i = 0; i < self->serverCursors[self->bestSet].size(); i++) {
					if(!self->serverCursors[self->bestSet][i]->isActive() && self->serverCursors[self->bestSet][i]->version() <= self->messageVersion) {
						self->locations.push_back(self->logSets[self->bestSet]->logEntryArray[i]);
					}
				}
				bestSetValid = self->locations.size() < self->logSets[self->bestSet]->tLogReplicationFactor || !self->logSets[self->bestSet]->satisfiesPolicy(self->locations);
			}
			// NOTE(review): when bestSet is negative and logSets.size() == 1, the branch below
			// indexes serverCursors[bestSet] with a negative index — confirm callers never
			// construct that combination.
			if(bestSetValid || self->logSets.size() == 1) {
				if(!self->useBestSet) {
					self->useBestSet = true;
					self->calcHasMessage();
					if (self->hasMessage() || self->version() > startVersion)
						return Void();
				}

				//TraceEvent("LPC_GetMore3", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag.toString()).detail("BestSetSize", self->serverCursors[self->bestSet].size());
				vector> q;
				for (auto& c : self->serverCursors[self->bestSet]) {
					if (!c->hasMessage()) {
						q.push_back(c->getMore(taskID));
						if(c->isActive()) {
							q.push_back(c->onFailed());
						}
					}
				}
				wait(quorum(q, 1));
			} else {
				//FIXME: this will peek way too many cursors when satellites exist, and does not need to peek bestSet cursors since we cannot get any more data from them
				vector> q;
				//TraceEvent("LPC_GetMore4", self->randomID).detail("Start", startVersion.toString()).detail("Tag", self->tag);
				for(auto& cursors : self->serverCursors) {
					for (auto& c :cursors) {
						if (!c->hasMessage()) {
							q.push_back(c->getMore(taskID));
						}
					}
				}
				wait(quorum(q, 1));
				self->useBestSet = false;
			}
		}
		self->calcHasMessage();
		//TraceEvent("LPC_GetMoreB", self->randomID).detail("HasMessage", self->hasMessage()).detail("Start", startVersion.toString()).detail("Seq", self->version().toString());
		if (self->hasMessage() || self->version() > startVersion)
			return Void();
	}
}

// Returns a future that becomes ready when the set cursor has a message or has advanced.
// An unfinished `more` future is reused. Otherwise the merge is recomputed first and the call
// returns immediately if that produced a message or moved the version forward; only then is a
// new setPeekGetMore started.
Future ILogSystem::SetPeekCursor::getMore(TaskPriority taskID) {
	if( more.isValid() && !more.isReady() ) {
		return more;
	}

	auto startVersion = version();
	calcHasMessage();
	if( hasMessage() )
		return Void();
	if (nextVersion.present())
		advanceTo(nextVersion.get());
	ASSERT(!hasMessage());
	if (version() > startVersion)
		return Void();

	more = setPeekGetMore(this, startVersion, taskID);
	return more;
}

// Not supported on a set cursor: asserts unconditionally.
Future ILogSystem::SetPeekCursor::onFailed() {
	ASSERT(false);
	return Never();
}

// Not supported on a set cursor: asserts unconditionally.
bool ILogSystem::SetPeekCursor::isActive() {
	ASSERT(false);
	return false;
}

// Reports exhaustion of the currently selected server cursor.
bool ILogSystem::SetPeekCursor::isExhausted() {
	return serverCursors[currentSet][currentCursor]->isExhausted();
}

// Returns the version chosen by the merge.
const LogMessageVersion& ILogSystem::SetPeekCursor::version() { return messageVersion; }

// Returns the min known committed version of the currently selected server cursor.
Version ILogSystem::SetPeekCursor::getMinKnownCommittedVersion() {
	return serverCursors[currentSet][currentCursor]->getMinKnownCommittedVersion();
}

// Returns the best server's peek location, or an empty Optional when no best set/server is set.
Optional ILogSystem::SetPeekCursor::getPrimaryPeekLocation() {
	if(bestServer >= 0 && bestSet >= 0) {
		return serverCursors[bestSet][bestServer]->getPrimaryPeekLocation();
	}
	return Optional();
}

// Returns the largest popped version reported by any server cursor in any set (0 if none).
Version ILogSystem::SetPeekCursor::popped() {
	Version poppedVersion = 0;
	for (auto& cursors : serverCursors) {
		for(auto& c : cursors) {
			poppedVersion = std::max(poppedVersion, c->popped());
		}
	}
	return poppedVersion;
}

// Stores the given cursors and epoch ends, then calls getMore() on up to
// MULTI_CURSOR_PRE_FETCH_LIMIT cursors, starting from the back of the vector.
// The returned futures are not kept here.
ILogSystem::MultiCursor::MultiCursor( std::vector> cursors, std::vector epochEnds ) : cursors(cursors), epochEnds(epochEnds), poppedVersion(0) {
	for(int i = 0; i < std::min(cursors.size(),SERVER_KNOBS->MULTI_CURSOR_PRE_FETCH_LIMIT); i++) {
		cursors[cursors.size()-i-1]->getMore();
	}
}

// Returns a cloneNoMore() copy of the last cursor only.
Reference ILogSystem::MultiCursor::cloneNoMore() {
	return cursors.back()->cloneNoMore();
}

void ILogSystem::MultiCursor::setProtocolVersion( ProtocolVersion version ) {
cursors.back()->setProtocolVersion(version); } Arena& ILogSystem::MultiCursor::arena() { return cursors.back()->arena(); } ArenaReader* ILogSystem::MultiCursor::reader() { return cursors.back()->reader(); } bool ILogSystem::MultiCursor::hasMessage() { return cursors.back()->hasMessage(); } void ILogSystem::MultiCursor::nextMessage() { cursors.back()->nextMessage(); } StringRef ILogSystem::MultiCursor::getMessage() { return cursors.back()->getMessage(); } StringRef ILogSystem::MultiCursor::getMessageWithTags() { return cursors.back()->getMessageWithTags(); } VectorRef ILogSystem::MultiCursor::getTags() { return cursors.back()->getTags(); } void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) { while( cursors.size() > 1 && n >= epochEnds.back() ) { poppedVersion = std::max(poppedVersion, cursors.back()->popped()); cursors.pop_back(); epochEnds.pop_back(); } cursors.back()->advanceTo(n); } Future ILogSystem::MultiCursor::getMore(TaskPriority taskID) { LogMessageVersion startVersion = cursors.back()->version(); while( cursors.size() > 1 && cursors.back()->version() >= epochEnds.back() ) { poppedVersion = std::max(poppedVersion, cursors.back()->popped()); cursors.pop_back(); epochEnds.pop_back(); } if(cursors.back()->version() > startVersion) { return Void(); } return cursors.back()->getMore(taskID); } Future ILogSystem::MultiCursor::onFailed() { return cursors.back()->onFailed(); } bool ILogSystem::MultiCursor::isActive() { return cursors.back()->isActive(); } bool ILogSystem::MultiCursor::isExhausted() { return cursors.back()->isExhausted(); } const LogMessageVersion& ILogSystem::MultiCursor::version() { return cursors.back()->version(); } Version ILogSystem::MultiCursor::getMinKnownCommittedVersion() { return cursors.back()->getMinKnownCommittedVersion(); } Optional ILogSystem::MultiCursor::getPrimaryPeekLocation() { return cursors.back()->getPrimaryPeekLocation(); } Version ILogSystem::MultiCursor::popped() { return std::max(poppedVersion, 
cursors.back()->popped()); } ILogSystem::BufferedCursor::BufferedCursor( std::vector> cursors, Version begin, Version end, bool withTags, bool collectTags, bool canDiscardPopped ) : cursors(cursors), messageVersion(begin), end(end), withTags(withTags), collectTags(collectTags), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(canDiscardPopped), knownUnique(false), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) { targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/cursors.size(); messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES); cursorMessages.resize(cursors.size()); } ILogSystem::BufferedCursor::BufferedCursor( std::vector>>> const& logServers, Tag tag, Version begin, Version end, bool parallelGetMore ) : messageVersion(begin), end(end), withTags(true), collectTags(false), hasNextMessage(false), messageIndex(0), poppedVersion(0), initialPoppedVersion(0), canDiscardPopped(false), knownUnique(true), minKnownCommittedVersion(0), randomID(deterministicRandom()->randomUniqueID()) { targetQueueSize = SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES/logServers.size(); messages.reserve(SERVER_KNOBS->DESIRED_OUTSTANDING_MESSAGES); cursorMessages.resize(logServers.size()); for( int i = 0; i < logServers.size(); i++ ) { Reference cursor( new ILogSystem::ServerPeekCursor( logServers[i], tag, begin, end, false, parallelGetMore ) ); cursors.push_back( cursor ); } } void ILogSystem::BufferedCursor::combineMessages() { if(!hasNextMessage) { return; } std::vector tags; tags.push_back(messages[messageIndex].tags[0]); for(int i = messageIndex + 1; i < messages.size() && messages[messageIndex].version == messages[i].version; i++) { tags.push_back(messages[i].tags[0]); messageIndex = i; } auto& msg = messages[messageIndex]; BinaryWriter messageWriter(Unversioned()); messageWriter << uint32_t(msg.message.size() + sizeof(uint32_t) + sizeof(uint16_t) + tags.size()*sizeof(Tag)) << 
msg.version.sub << uint16_t(tags.size()); for(auto t : tags) { messageWriter << t; } messageWriter.serializeBytes(msg.message); Standalone val = messageWriter.toValue(); msg.arena = val.arena(); msg.message = val; msg.tags = VectorRef(); for(auto t : tags) { msg.tags.push_back(msg.arena, t); } } Reference ILogSystem::BufferedCursor::cloneNoMore() { ASSERT(false); return Reference(); } void ILogSystem::BufferedCursor::setProtocolVersion( ProtocolVersion version ) { for(auto& c : cursors) { c->setProtocolVersion(version); } } Arena& ILogSystem::BufferedCursor::arena() { return messages[messageIndex].arena; } ArenaReader* ILogSystem::BufferedCursor::reader() { ASSERT(false); return cursors[0]->reader(); } bool ILogSystem::BufferedCursor::hasMessage() { return hasNextMessage; } void ILogSystem::BufferedCursor::nextMessage() { messageIndex++; if(messageIndex == messages.size()) { hasNextMessage = false; } if(collectTags) { combineMessages(); } } StringRef ILogSystem::BufferedCursor::getMessage() { ASSERT(!withTags); return messages[messageIndex].message; } StringRef ILogSystem::BufferedCursor::getMessageWithTags() { ASSERT(withTags); return messages[messageIndex].message; } VectorRef ILogSystem::BufferedCursor::getTags() { ASSERT(withTags); return messages[messageIndex].tags; } void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) { ASSERT(false); } ACTOR Future bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference cursor, int idx, TaskPriority taskID ) { loop { wait(yield()); if(cursor->version().version >= self->end || self->cursorMessages[idx].size() > self->targetQueueSize) { return Void(); } wait(cursor->getMore(taskID)); self->poppedVersion = std::max(self->poppedVersion, cursor->popped()); self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, cursor->getMinKnownCommittedVersion()); if(self->canDiscardPopped) { self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped()); } 
if(cursor->version().version >= self->end) { return Void(); } while(cursor->hasMessage()) { self->cursorMessages[idx].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? VectorRef() : cursor->getTags(), cursor->version())); cursor->nextMessage(); } } } ACTOR Future bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriority taskID ) { if( self->messageVersion.version >= self->end ) { wait( Future(Never())); throw internal_error(); } self->messages.clear(); std::vector> loaders; loaders.reserve(self->cursors.size()); for(int i = 0; i < self->cursors.size(); i++) { loaders.push_back(bufferedGetMoreLoader(self, self->cursors[i], i, taskID)); } state Future allLoaders = waitForAll(loaders); state Version minVersion; loop { wait( allLoaders || delay(SERVER_KNOBS->DESIRED_GET_MORE_DELAY, taskID) ); minVersion = self->end; for(int i = 0; i < self->cursors.size(); i++) { auto cursor = self->cursors[i]; while(cursor->hasMessage()) { self->cursorMessages[i].push_back(ILogSystem::BufferedCursor::BufferedMessage(cursor->arena(), (!self->withTags || self->collectTags) ? cursor->getMessage() : cursor->getMessageWithTags(), !self->withTags ? 
VectorRef() : cursor->getTags(), cursor->version())); cursor->nextMessage(); } minVersion = std::min(minVersion, cursor->version().version); } if(minVersion > self->messageVersion.version) { break; } if(allLoaders.isReady()) { wait(Future(Never())); } } wait( yield() ); for(auto &it : self->cursorMessages) { while(!it.empty() && it.front().version.version < minVersion) { self->messages.push_back(it.front()); it.pop_front(); } } if(self->collectTags || self->knownUnique) { std::sort(self->messages.begin(), self->messages.end()); } else { uniquify(self->messages); } self->messageVersion = LogMessageVersion(minVersion); self->messageIndex = 0; self->hasNextMessage = self->messages.size() > 0; if(self->collectTags) { self->combineMessages(); } wait(yield()); if(self->canDiscardPopped && self->poppedVersion > self->version().version) { TraceEvent(SevWarn, "DiscardingPoppedData", self->randomID).detail("Version", self->version().version).detail("Popped", self->poppedVersion); self->messageVersion = std::max(self->messageVersion, LogMessageVersion(self->poppedVersion)); for(auto cursor : self->cursors) { cursor->advanceTo(self->messageVersion); } self->messageIndex = self->messages.size(); if (self->messages.size() > 0 && self->messages[self->messages.size()-1].version.version < self->poppedVersion) { self->hasNextMessage = false; } else { auto iter = std::lower_bound(self->messages.begin(), self->messages.end(), ILogSystem::BufferedCursor::BufferedMessage(self->poppedVersion)); self->hasNextMessage = iter != self->messages.end(); if(self->hasNextMessage) { self->messageIndex = iter - self->messages.begin(); } } } if(self->hasNextMessage) { self->canDiscardPopped = false; } return Void(); } Future ILogSystem::BufferedCursor::getMore(TaskPriority taskID) { if( hasMessage() ) { return Void(); } if( !more.isValid() || more.isReady() ) { more = bufferedGetMore(this, taskID); } return more; } Future ILogSystem::BufferedCursor::onFailed() { ASSERT(false); return Never(); } bool 
ILogSystem::BufferedCursor::isActive() { ASSERT(false); return false; } bool ILogSystem::BufferedCursor::isExhausted() { ASSERT(false); return false; } const LogMessageVersion& ILogSystem::BufferedCursor::version() { if(hasNextMessage) { return messages[messageIndex].version; } return messageVersion; } Version ILogSystem::BufferedCursor::getMinKnownCommittedVersion() { return minKnownCommittedVersion; } Optional ILogSystem::BufferedCursor::getPrimaryPeekLocation() { return Optional(); } Version ILogSystem::BufferedCursor::popped() { if(initialPoppedVersion == poppedVersion) { return 0; } return poppedVersion; }