diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index ee67813e02..0a1e69ab36 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -41,8 +41,8 @@ typedef UID SpanID; enum { tagLocalitySpecial = -1, // tag with this locality means it is invalidTag (id=0), txsTag (id=1), or cacheTag (id=2) tagLocalityLogRouter = -2, - tagLocalityRemoteLog = -3, // tag created by log router for remote tLogs - tagLocalityUpgraded = -4, + tagLocalityRemoteLog = -3, // tag created by log router for remote (i.e., not in the primary DC) tLogs + tagLocalityUpgraded = -4, // tLogs with the old log format tagLocalitySatellite = -5, tagLocalityLogRouterMapped = -6, // The pseudo tag used by log routers to pop the real LogRouter tag (i.e., -2) tagLocalityTxs = -7, diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 3437186209..0873b072dc 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -64,6 +64,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( TLOG_MESSAGE_BLOCK_BYTES, 10e6 ); init( TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR, double(TLOG_MESSAGE_BLOCK_BYTES) / (TLOG_MESSAGE_BLOCK_BYTES - MAX_MESSAGE_SIZE) ); //1.0121466709838096006362758832473 init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = deterministicRandom()->coinflip() ? 0.1 : 120; + init( PEEK_USING_STREAMING, true ); init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2; init( MULTI_CURSOR_PRE_FETCH_LIMIT, 10 ); init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 82dbd227b0..c905720898 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -41,6 +41,7 @@ public: // often, so that versions always advance smoothly // TLogs + bool PEEK_USING_STREAMING; double TLOG_TIMEOUT; // tlog OR commit proxy failure - master's reaction time double TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS; // Warns if a tlog takes too long to rejoin double RECOVERY_TLOG_SMART_QUORUM_DELAY; // smaller might be better for bug amplification diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 4c6be0e56d..72cc459025 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -158,7 +158,10 @@ const Endpoint& EndpointMap::insert(NetworkAddressList localAddresses, NetworkMessageReceiver* EndpointMap::get(Endpoint::Token const& token) { uint32_t index = token.second(); if (index < wellKnownEndpointCount && data[index].receiver == nullptr) { - TraceEvent(SevWarnAlways, "WellKnownEndpointNotAdded").detail("Token", token).detail("Index", index).backtrace(); + TraceEvent(SevWarnAlways, "WellKnownEndpointNotAdded") + .detail("Token", token) + .detail("Index", index) + .backtrace(); } if (index < data.size() && data[index].token().first() == token.first() && ((data[index].token().second() & 0xffffffff00000000LL) | index) == token.second()) @@ -923,6 +926,7 @@ ACTOR static void deliver(TransportData* self, // ReadSocket) we can just upgrade. Otherwise we'll context switch so that we don't block other tasks that might run // with a higher priority. ReplyPromiseStream needs to guarentee that messages are recieved in the order they were // sent, so we are using orderedDelay. + // NOTE: don't skip the delay(0) for local delivery, since doing so could cause out-of-order object destruction.
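// [Editor's sketch, not part of this patch] One way to picture the ordering requirement above: two replies sent
// back-to-back on the same ReplyPromiseStream must be delivered in send order. If deliver() yielded with a plain
// delay(0), two deliveries queued at the same priority could be resumed in either order, e.g.:
//
//   stream.send(reply1); // arrives first; must be delivered first
//   stream.send(reply2); // must not overtake reply1 on the receiving side
//
// orderedDelay(0, priority) guarantees FIFO resumption among tasks scheduled at the same time and priority,
// which a plain delay(0) does not.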
if (priority < TaskPriority::ReadSocket || !inReadSocket) { wait(orderedDelay(0, priority)); } else { diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index eeac5e9bed..60b4c0168e 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -300,7 +300,7 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated<Acknowledgeme Promise<Void> hold = ready; hold.sendError(message.getError()); } else { - ASSERT(message.get().bytes > bytesAcknowledged); + ASSERT(message.get().bytes > bytesAcknowledged || (message.get().bytes < 0 && bytesAcknowledged > 0)); bytesAcknowledged = message.get().bytes; if (ready.isValid() && bytesSent - bytesAcknowledged < bytesLimit) { Promise<Void> hold = ready; @@ -393,7 +393,8 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>, false); } if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) { - // The ReplyPromiseStream was cancelled before sending an error, so the storage server must have died + // Notify the client that the ReplyPromiseStream was cancelled before sending an error, so the storage + // server must have died FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(broken_promise()), getEndpoint(TaskPriority::ReadSocket), false); @@ -413,6 +414,7 @@ public: void send(U&& value) const { if (queue->isRemoteEndpoint()) { if (!queue->acknowledgements.getRawEndpoint().isValid()) { + // Register the acknowledgement receiver on the sender and tell the receiver where to send acknowledgements value.acknowledgeToken = queue->acknowledgements.getEndpoint(TaskPriority::ReadSocket).token; } queue->acknowledgements.bytesSent += value.expectedSize(); @@ -474,6 +476,8 @@ public: errors->delPromiseRef(); } + // The endpoints of a ReplyPromiseStream must be initialized at TaskPriority::ReadSocket, because with lower + // priorities a delay(0) in FlowTransport's deliver can cause out-of-order delivery. const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::ReadSocket); } bool operator==(const ReplyPromiseStream<T>& rhs) const { return queue == rhs.queue; } diff --git a/fdbrpc/genericactors.actor.h b/fdbrpc/genericactors.actor.h index 23bd3e97c2..46a79d29cf 100644 --- a/fdbrpc/genericactors.actor.h +++ b/fdbrpc/genericactors.actor.h @@ -197,7 +197,7 @@ struct PeerHolder { } }; -// Implements getRepyStream, this a void actor with the same lifetime as the input ReplyPromiseStream. +// Implements getReplyStream; this is a void actor with the same lifetime as the input ReplyPromiseStream. // Because this actor holds a reference to the stream, normally it would be impossible to know when there are no other // references. To get around this, there is a SAV inside the stream that has one less promise reference than it should // (caused by getErrorFutureAndDelPromiseRef()). When that SAV gets a broken promise because no one besides this void diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index e6ac9fac46..7e3316b28d 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -117,6 +117,7 @@ struct LogRouterData { getMoreBlockedCount; // Increase by 1 if data is not available when LR tries to pull data from satellite tLog.
Future<Void> logger; Reference<EventCacheHolder> eventCacheHolder; + int activePeekStreams = 0; std::vector<Reference<TagData>> tag_data; // we only store data for the remote tag locality @@ -193,6 +194,7 @@ struct LogRouterData { return int64_t(1000 * val); }); specialCounter(cc, "Generation", [this]() { return this->generation; }); + specialCounter(cc, "ActivePeekStreams", [this]() { return this->activePeekStreams; }); logger = traceCounters("LogRouterMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, @@ -404,18 +406,15 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>>& get_version_messages(Lo return tagData->version_messages; }; -void peekMessagesFromMemory(LogRouterData* self, - TLogPeekRequest const& req, - BinaryWriter& messages, - Version& endVersion) { +void peekMessagesFromMemory(LogRouterData* self, Tag tag, Version begin, BinaryWriter& messages, Version& endVersion) { ASSERT(!messages.getLength()); - auto& deque = get_version_messages(self, req.tag); + auto& deque = get_version_messages(self, tag); //TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size()); auto it = std::lower_bound(deque.begin(), deque.end(), - std::make_pair(req.begin, LengthPrefixedStringRef()), + std::make_pair(begin, LengthPrefixedStringRef()), CompareFirst<std::pair<Version, LengthPrefixedStringRef>>()); Version currentVersion = -1; @@ -442,22 +441,30 @@ Version poppedVersion(LogRouterData* self, Tag tag) { return tagData->popped; } -ACTOR Future<Void> logRouterPeekMessages(LogRouterData* self, TLogPeekRequest req) { +// Common logic to peek the TLog and create a TLogPeekReply; serves both streaming and normal peek requests +ACTOR template <typename PromiseType> +Future<Void> logRouterPeekMessages(PromiseType replyPromise, + LogRouterData* self, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) { state BinaryWriter messages(Unversioned()); state int sequence = -1; state UID peekId; - if (req.sequence.present()) { + if (reqSequence.present()) { try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) { throw operation_obsolete(); } auto& trackerData = self->peekTracker[peekId]; if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); } auto seqBegin = trackerData.sequence_version.begin(); // The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
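// [Editor's sketch, not part of this patch] The handshake implemented by the tracker below, in terms of the names
// introduced above: request N carries reqSequence = (peekId, N); the server waits on
// trackerData.sequence_version[N] to learn the (begin, onlySpilled) pair established by request N - 1, serves the
// peek, then sends (reply.end, reply.onlySpilled) into sequence_version[N + 1] to unblock request N + 1. A retried
// request whose recorded end version disagrees with the earlier attempt is answered with operation_obsolete().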
@@ -476,12 +483,12 @@ ACTOR Future<Void> logRouterPeekMessages(LogRouterData* self, TLogPeekRequest re trackerData.lastUpdate = now(); std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture()); - req.begin = prevPeekData.first; - req.onlySpilled = prevPeekData.second; + reqBegin = prevPeekData.first; + reqOnlySpilled = prevPeekData.second; wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); + replyPromise.sendError(e); return Void(); } else { throw; @@ -489,62 +496,62 @@ ACTOR Future<Void> logRouterPeekMessages(LogRouterData* self, TLogPeekRequest re } } - //TraceEvent("LogRouterPeek1", self->dbgid).detail("From", req.reply.getEndpoint().getPrimaryAddress()).detail("Ver", self->version.get()).detail("Begin", req.begin); - if (req.returnIfBlocked && self->version.get() < req.begin) { + //TraceEvent("LogRouterPeek1", self->dbgid).detail("From", replyPromise.getEndpoint().getPrimaryAddress()).detail("Ver", self->version.get()).detail("Begin", reqBegin); + if (reqReturnIfBlocked && self->version.get() < reqBegin) { //TraceEvent("LogRouterPeek2", self->dbgid); - req.reply.sendError(end_of_stream()); - if (req.sequence.present()) { + replyPromise.sendError(end_of_stream()); + if (reqSequence.present()) { auto& trackerData = self->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } - if (self->version.get() < req.begin) { - wait(self->version.whenAtLeast(req.begin)); + if (self->version.get() < reqBegin) { + wait(self->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - Version poppedVer = poppedVersion(self, req.tag); + Version poppedVer = poppedVersion(self, reqTag); - if (poppedVer > req.begin || req.begin < self->startVersion) { + if (poppedVer > reqBegin || reqBegin < self->startVersion) { // This should only happen if a packet is sent multiple times and the reply is not needed. // Since we are using popped differently, do not send a reply. TraceEvent(SevWarnAlways, "LogRouterPeekPopped", self->dbgid) - .detail("Begin", req.begin) + .detail("Begin", reqBegin) .detail("Popped", poppedVer) .detail("Start", self->startVersion); - req.reply.send(Never()); - if (req.sequence.present()) { + replyPromise.send(Never()); + if (reqSequence.present()) { auto& trackerData = self->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } Version endVersion = self->version.get() + 1; - peekMessagesFromMemory(self, req, messages, endVersion); + peekMessagesFromMemory(self, reqTag, reqBegin, messages, endVersion); TLogPeekReply reply; reply.maxKnownVersion = self->version.get(); reply.minKnownCommittedVersion = self->poppedVersion; - reply.messages = messages.toValue(); + reply.messages = StringRef(reply.arena, messages.toValue()); reply.popped = self->minPopped.get() >= self->startVersion ? 
self->minPopped.get() : 0; reply.end = endVersion; reply.onlySpilled = false; - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = self->peekTracker[peekId]; trackerData.lastUpdate = now(); auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -552,20 +559,60 @@ ACTOR Future<Void> logRouterPeekMessages(LogRouterData* self, TLogPeekRequest re if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != reply.end) { TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } - reply.begin = req.begin; + reply.begin = reqBegin; } - req.reply.send(reply); + replyPromise.send(reply); //TraceEvent("LogRouterPeek4", self->dbgid); return Void(); } +// This actor keeps pushing TLogPeekStreamReply messages until it is removed from the cluster or must recover +ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) { + self->activePeekStreams++; + + state Version begin = req.begin; + state bool onlySpilled = false; + req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); + loop { + state TLogPeekStreamReply reply; + state Promise<TLogPeekReply> promise; + state Future<TLogPeekReply> future(promise.getFuture()); + try { + wait(req.reply.onReady() && store(reply.rep, future) && + logRouterPeekMessages(promise, self, begin, req.tag, req.returnIfBlocked, onlySpilled)); + + reply.rep.begin = begin; + req.reply.send(reply); + begin = reply.rep.end; + onlySpilled = reply.rep.onlySpilled; + if (reply.rep.end > self->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } + } catch (Error& e) { + self->activePeekStreams--; + TraceEvent(SevDebug, "TLogPeekStreamEnd", self->dbgid) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); + + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { + req.reply.sendError(e); + return Void(); + } else { + throw; + } + } + } +} + ACTOR Future<Void> cleanupPeekTrackers(LogRouterData* self) { loop { double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME; @@ -643,7 +690,13 @@ ACTOR Future<Void> logRouterCore(TLogInterface interf, logRouterData.logSystem->set(ILogSystem::fromServerDBInfo(logRouterData.dbgid, db->get(), true)); } when(TLogPeekRequest req = waitNext(interf.peekMessages.getFuture())) { - addActor.send(logRouterPeekMessages(&logRouterData, req)); + addActor.send(logRouterPeekMessages( + req.reply, &logRouterData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); + } + when(TLogPeekStreamRequest req = waitNext(interf.peekStreamMessages.getFuture())) { + TraceEvent(SevDebug, "LogRouterPeekStream", logRouterData.dbgid) + .detail("Token", interf.peekStreamMessages.getEndpoint().token); + addActor.send(logRouterPeekStream(&logRouterData, req)); } when(TLogPopRequest req = waitNext(interf.popMessages.getFuture())) { // Request from remote tLog to pop data from LR diff --git
a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 28c706d67f..2b4fa0c19b 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -427,7 +427,7 @@ struct ILogSystem { TLogPeekReply results; ArenaReader rd; - LogMessageVersion messageVersion, end; + LogMessageVersion messageVersion, end; // the version of the current message; the intended end version of this cursor Version poppedVersion; TagsAndMessage messageAndTags; bool hasMsg; @@ -437,9 +437,11 @@ struct ILogSystem { bool onlySpilled; bool parallelGetMore; + bool usePeekStream; int sequence; Deque<Future<TLogPeekReply>> futureResults; Future<Void> interfaceChanged; + Optional<ReplyPromiseStream<TLogPeekStreamReply>> peekReplyStream; double lastReset; Future<Void> resetCheck; diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 546f365953..429e803c02 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -25,6 +25,24 @@ #include "fdbrpc/ReplicationUtils.h" #include "flow/actorcompiler.h" // has to be last include +// Create a peek stream for the cursor if possible +ACTOR Future<Void> tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) { + if (self->peekReplyStream.present()) + return Void(); + else if (!self->interf || !self->interf->get().present()) { + self->peekReplyStream.reset(); + return Never(); + } + wait(IFailureMonitor::failureMonitor().onStateEqual(self->interf->get().interf().peekStreamMessages.getEndpoint(), + FailureStatus(false))); + self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest( + self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max())); + TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID) + .detail("PeerAddr", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress()) + .detail("PeerToken", self->interf->get().interf().peekStreamMessages.getEndpoint().token); + return Void(); +} + ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, @@ -33,11 +51,15 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterf bool parallelGetMore) : interf(interf), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(begin), end(end), poppedVersion(0), hasMsg(false), randomID(deterministicRandom()->randomUniqueID()), - returnIfBlocked(returnIfBlocked), onlySpilled(false), parallelGetMore(parallelGetMore), sequence(0), lastReset(0), - resetCheck(Void()), slowReplies(0), fastReplies(0), unknownReplies(0) { + returnIfBlocked(returnIfBlocked), onlySpilled(false), parallelGetMore(parallelGetMore), + usePeekStream(SERVER_KNOBS->PEEK_USING_STREAMING), sequence(0), lastReset(0), resetCheck(Void()), slowReplies(0), + fastReplies(0), unknownReplies(0) { this->results.maxKnownVersion = 0; this->results.minKnownCommittedVersion = 0; - //TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace(); + DisabledTraceEvent(SevDebug, "SPC_Starting", randomID) + .detail("Tag", tag.toString()) + .detail("Begin", begin) + .detail("End", end); } ILogSystem::ServerPeekCursor::ServerPeekCursor(TLogPeekReply const& results, @@ -50,8 +72,8 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(TLogPeekReply const& results, : tag(tag), results(results), rd(results.arena, results.messages, Unversioned()),
messageVersion(messageVersion), end(end), poppedVersion(poppedVersion), messageAndTags(message), hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()), returnIfBlocked(false), onlySpilled(false), - parallelGetMore(false), sequence(0), lastReset(0), resetCheck(Void()), slowReplies(0), fastReplies(0), - unknownReplies(0) { + parallelGetMore(false), usePeekStream(false), sequence(0), lastReset(0), resetCheck(Void()), slowReplies(0), + fastReplies(0), unknownReplies(0) { //TraceEvent("SPC_Clone", randomID); this->results.maxKnownVersion = 0; this->results.minKnownCommittedVersion = 0; @@ -153,6 +175,20 @@ void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) { } } +// This function is called after the cursor receives a TLogPeekReply, to update its members; this is the common logic +// shared by the getMore helper functions. +void updateCursorWithReply(ILogSystem::ServerPeekCursor* self, const TLogPeekReply& res) { + self->results = res; + self->onlySpilled = res.onlySpilled; + if (res.popped.present()) + self->poppedVersion = std::min(std::max(self->poppedVersion, res.popped.get()), self->end.version); + self->rd = ArenaReader(self->results.arena, self->results.messages, Unversioned()); + LogMessageVersion skipSeq = self->messageVersion; + self->hasMsg = true; + self->nextMessage(); + self->advanceTo(skipSeq); +} + ACTOR Future<Void> resetChecker(ILogSystem::ServerPeekCursor* self, NetworkAddress addr) { self->slowReplies = 0; self->unknownReplies = 0; @@ -208,11 +244,10 @@ ACTOR Future<TLogPeekReply> recordRequestMetrics(ILogSystem::ServerPeekCursor* s } ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) { - if (!self->interf || self->messageVersion >= self->end) { + if (!self->interf || self->isExhausted()) { if (self->hasMessage()) return Void(); - wait(Future<Void>(Never())); - throw internal_error(); + return Never(); } if (!self->interfaceChanged.isValid()) { @@ -253,16 +288,7 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self, } expectedBegin = res.end; self->futureResults.pop_front(); - self->results = res; - self->onlySpilled = res.onlySpilled; - if (res.popped.present()) - self->poppedVersion = - std::min(std::max(self->poppedVersion, res.popped.get()), self->end.version); - self->rd = ArenaReader(self->results.arena, self->results.messages, Unversioned()); - LogMessageVersion skipSeq = self->messageVersion; - self->hasMsg = true; - self->nextMessage(); - self->advanceTo(skipSeq); + updateCursorWithReply(self, res); //TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ? res.popped.get() : 0); return Void(); } @@ -296,10 +322,70 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self, } } +ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) { + if (!self->interf || self->isExhausted()) { + self->peekReplyStream.reset(); + if (self->hasMessage()) + return Void(); + return Never(); + } + + loop { + try { + state Version expectedBegin = self->messageVersion.version; + state Future<TLogPeekReply> fPeekReply = self->peekReplyStream.present() + ? map(waitAndForward(self->peekReplyStream.get().getFuture()), + [](const TLogPeekStreamReply& r) { return r.rep; }) + : Never(); + choose { + when(wait(self->peekReplyStream.present() ?
Never() : tryEstablishPeekStream(self))) {} + when(wait(self->interf->onChange())) { + self->onlySpilled = false; + self->peekReplyStream.reset(); + } + when(TLogPeekReply res = wait( + self->peekReplyStream.present() + ? recordRequestMetrics( + self, + self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress(), + fPeekReply) + : Never())) { + if (res.begin.get() != expectedBegin) { + throw operation_obsolete(); + } + updateCursorWithReply(self, res); + expectedBegin = res.end; + DisabledTraceEvent(SevDebug, "SPC_GetMoreB", self->randomID) + .detail("Has", self->hasMessage()) + .detail("End", res.end) + .detail("Popped", res.popped.present() ? res.popped.get() : 0); + + // NOTE: the delay is necessary here since ReplyPromiseStream delivers replies at high priority; here + // we change the priority to the intended one. + wait(delay(0, taskID)); + return Void(); + } + } + } catch (Error& e) { + DisabledTraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).error(e, true); + if (e.code() == error_code_connection_failed || e.code() == error_code_operation_obsolete) { + // NOTE: delay so that an endless retry loop does not block other tasks + self->peekReplyStream.reset(); + wait(delay(0)); + } else if (e.code() == error_code_end_of_stream) { + self->peekReplyStream.reset(); + self->end.reset(self->messageVersion.version); + return Void(); + } else { + throw; + } + } + } +} + ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) { - if (!self->interf || self->messageVersion >= self->end) { - wait(Future<Void>(Never())); - throw internal_error(); + if (!self->interf || self->isExhausted()) { + return Never(); } try { loop { @@ -313,16 +399,7 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri self->onlySpilled), taskID)) : Never())) { - self->results = res; - self->onlySpilled = res.onlySpilled; - if (res.popped.present()) - self->poppedVersion = - std::min(std::max(self->poppedVersion, res.popped.get()), self->end.version); - self->rd = ArenaReader(self->results.arena, self->results.messages, Unversioned()); - LogMessageVersion skipSeq = self->messageVersion; - self->hasMsg = true; - self->nextMessage(); - self->advanceTo(skipSeq); + updateCursorWithReply(self, res); //TraceEvent("SPC_GetMoreB", self->randomID).detail("Has", self->hasMessage()).detail("End", res.end).detail("Popped", res.popped.present() ?
res.popped.get() : 0); return Void(); } @@ -339,11 +416,17 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPri } Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) { - //TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString()); + // TraceEvent("SPC_GetMore", randomID) + // .detail("HasMessage", hasMessage()) + // .detail("More", !more.isValid() || more.isReady()) + // .detail("MessageVersion", messageVersion.toString()) + // .detail("End", end.toString()); if (hasMessage() && !parallelGetMore) return Void(); if (!more.isValid() || more.isReady()) { - if (parallelGetMore || onlySpilled || futureResults.size()) { + if (usePeekStream && (tag.locality >= 0 || tag.locality == tagLocalityLogRouter || tag.locality == tagLocalityRemoteLog)) { + more = serverPeekStreamGetMore(this, taskID); + } else if (parallelGetMore || onlySpilled || futureResults.size()) { more = serverPeekParallelGetMore(this, taskID); } else { more = serverPeekGetMore(this, taskID); @@ -361,6 +444,12 @@ ACTOR Future<Void> serverPeekOnFailed(ILogSystem::ServerPeekCursor* self) { : Never())) { return Void(); } + when(wait(self->interf->get().present() + ? IFailureMonitor::failureMonitor().onStateEqual( + self->interf->get().interf().peekStreamMessages.getEndpoint(), FailureStatus()) + : Never())) { + return Void(); + } when(wait(self->interf->onChange())) {} } } @@ -373,9 +462,14 @@ Future<Void> ILogSystem::ServerPeekCursor::onFailed() { bool ILogSystem::ServerPeekCursor::isActive() const { if (!interf->get().present()) return false; - if (messageVersion >= end) + if (isExhausted()) return false; - return IFailureMonitor::failureMonitor().getState(interf->get().interf().peekMessages.getEndpoint()).isAvailable(); + return IFailureMonitor::failureMonitor() + .getState(interf->get().interf().peekMessages.getEndpoint()) + .isAvailable() && + IFailureMonitor::failureMonitor() + .getState(interf->get().interf().peekStreamMessages.getEndpoint()) + .isAvailable(); } bool ILogSystem::ServerPeekCursor::isExhausted() const { diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 7a314a27a8..164826f449 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -798,8 +798,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ, // Wait for a durable quorum of servers in destServers to have keys available (readWrite) // They must also have at least the transaction read version so they can't "forget" the shard - // between - // now and when this transaction commits. + // between now and when this transaction commits. 
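// [Editor's note, not part of this patch] The read-version requirement matters because "ready" is only meaningful
// if it reflects state at or after this transaction's read version; otherwise a destination server could report
// ready, lose the shard, and the commit would still direct clients to it.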
state vector<Future<Void>> serverReady; // only for count below state vector<Future<Void>> tssReady; // for waiting in parallel with tss state vector<StorageServerInterface> tssReadyInterfs; diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index 45214ad3bf..7cb4c565bf 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -300,6 +300,7 @@ struct TLogData : NonCopyable { int64_t instanceID; int64_t bytesInput; int64_t bytesDurable; + int activePeekStreams = 0; Version prevVersion; @@ -477,6 +478,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> { }); specialCounter( cc, "QueueDiskBytesTotal", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().total; }); + specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; }); } ~LogData() { @@ -931,14 +933,15 @@ ACTOR Future<Void> tLogPop(TLogData* self, TLogPopRequest req, Reference<LogData } void peekMessagesFromMemory(Reference<LogData> self, - TLogPeekRequest const& req, + Tag tag, + Version reqBegin, BinaryWriter& messages, Version& endVersion) { - OldTag oldTag = convertTag(req.tag); + OldTag oldTag = convertTag(tag); ASSERT(!messages.getLength()); auto& deque = get_version_messages(self, oldTag); - Version begin = std::max(req.begin, self->persistentDataDurableVersion + 1); + Version begin = std::max(reqBegin, self->persistentDataDurableVersion + 1); auto it = std::lower_bound(deque.begin(), deque.end(), std::make_pair(begin, LengthPrefixedStringRef()), @@ -963,24 +966,33 @@ void peekMessagesFromMemory(Reference<LogData> self, uint32_t subVersion; rd >> messageLength >> subVersion; messageLength += sizeof(uint16_t) + sizeof(Tag); - messages << messageLength << subVersion << uint16_t(1) << req.tag; + messages << messageLength << subVersion << uint16_t(1) << tag; messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag)); messages.serializeBytes(rd.readBytes(messageLength), messageLength); } } } -ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) { +// Common logic to peek the TLog and create a TLogPeekReply; serves both streaming and normal peek requests +ACTOR template <typename PromiseType> +Future<Void> tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference<LogData> logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); state int sequence = -1; state UID peekId; - state OldTag oldTag = convertTag(req.tag); + state OldTag oldTag = convertTag(reqTag); - if (req.sequence.present()) { + if (reqSequence.present()) { try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->peekTracker.find(peekId) == self->peekTracker.end()) { throw operation_obsolete(); @@ -989,12 +1001,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen auto& trackerData = self->peekTracker[peekId]; trackerData.lastUpdate = now(); Version ver = wait(trackerData.sequence_version[sequence].getFuture()); - req.begin = std::max(ver, req.begin); + reqBegin = std::max(ver, reqBegin); wait(yield()); } } catch (Error& e) { if (e.code()
== error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); + replyPromise.sendError(e); return Void(); } else { throw; @@ -1002,22 +1014,22 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen } } - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); return Void(); } - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } state Version endVersion = logData->version.get() + 1; Version poppedVer = poppedVersion(logData, oldTag); - if (poppedVer > req.begin) { + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = 0; @@ -1025,12 +1037,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen rep.end = poppedVer; rep.onlySpilled = false; - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = self->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1038,37 +1050,37 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen if (sequenceData.isSet()) { if (sequenceData.getFuture().get() != rep.end) { TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(rep.end); } - rep.begin = req.begin; + rep.begin = reqBegin; } - req.reply.send(rep); + replyPromise.send(rep); return Void(); } // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting 
for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the // result? - peekMessagesFromMemory(logData, req, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, req.begin), + KeyRangeRef(persistTagMessagesKey(logData->logId, oldTag, reqBegin), persistTagMessagesKey(logData->logId, oldTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto& kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); @@ -1080,7 +1092,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen uint32_t subVersion; rd >> messageLength >> subVersion; messageLength += sizeof(uint16_t) + sizeof(Tag); - messages << messageLength << subVersion << uint16_t(1) << req.tag; + messages << messageLength << subVersion << uint16_t(1) << reqTag; messageLength -= (sizeof(subVersion) + sizeof(uint16_t) + sizeof(Tag)); messages.serializeBytes(rd.readBytes(messageLength), messageLength); } @@ -1091,39 +1103,79 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen else messages.serializeBytes(messages2.toValue()); } else { - peekMessagesFromMemory(logData, req, messages, endVersion); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; reply.maxKnownVersion = logData->version.get(); reply.minKnownCommittedVersion = 0; reply.onlySpilled = false; - reply.messages = 
messages.toValue(); + reply.messages = StringRef(reply.arena, messages.toValue()); reply.end = endVersion; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()); - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = self->peekTracker[peekId]; trackerData.lastUpdate = now(); auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (sequenceData.isSet()) { if (sequenceData.getFuture().get() != reply.end) { TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(reply.end); } - reply.begin = req.begin; + reply.begin = reqBegin; } - req.reply.send(reply); + replyPromise.send(reply); return Void(); } +// This actor keeps pushing TLogPeekStreamReply messages until it is removed from the cluster or must recover +ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) { + self->activePeekStreams++; + + state Version begin = req.begin; + state bool onlySpilled = false; + req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); + loop { + state TLogPeekStreamReply reply; + state Promise<TLogPeekReply> promise; + state Future<TLogPeekReply> future(promise.getFuture()); + try { + wait(req.reply.onReady() && store(reply.rep, future) && + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + + reply.rep.begin = begin; + req.reply.send(reply); + begin = reply.rep.end; + onlySpilled = reply.rep.onlySpilled; + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } + } catch (Error& e) { + self->activePeekStreams--; + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); + + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { + req.reply.sendError(e); + return Void(); + } else { + throw; + } + } + } +} + ACTOR Future<Void> doQueueCommit(TLogData* self, Reference<LogData> logData) { state Version ver = logData->version.get(); state Version commitNumber = self->queueCommitBegin + 1; @@ -1288,7 +1340,13 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self, PromiseStream<Void> warningCollectorInput) { loop choose { when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); + } + when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { + TraceEvent(SevDebug, "TLogPeekStream", logData->logId) + .detail("Token", tli.peekStreamMessages.getEndpoint().token); + logData->addActor.send(tLogPeekStream(self, req, logData)); } when(TLogPopRequest req = waitNext(tli.popMessages.getFuture())) { logData->addActor.send(tLogPop(self, req, logData)); @@ -1435,6
+1493,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality) recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); @@ -1574,7 +1633,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, state TLogData self(tlogId, workerID, persistentData, persistentQueue, db); state Future<Void> error = actorCollection(self.sharedActors.getFuture()); - TraceEvent("SharedTlog", tlogId).log(); + TraceEvent("SharedTlog", tlogId).detail("Version", "4.6"); try { wait(restorePersistentState(&self, locality)); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 192eaaaf29..cc561d1f3d 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -276,6 +276,7 @@ struct TLogData : NonCopyable { int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling. int64_t overheadBytesInput; int64_t overheadBytesDurable; + int activePeekStreams = 0; WorkerCache<TLogInterface> tlogCache; @@ -572,6 +573,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> { }); specialCounter( cc, "QueueDiskBytesTotal", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().total; }); + specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; }); } ~LogData() { @@ -1172,15 +1174,16 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>>& getVersionMessages(Refe }; void peekMessagesFromMemory(Reference<LogData> self, - TLogPeekRequest const& req, + Tag tag, + Version begin, BinaryWriter& messages, Version& endVersion) { ASSERT(!messages.getLength()); - auto& deque = getVersionMessages(self, req.tag); + auto& deque = getVersionMessages(self, tag); //TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 
0 : map1.begin()->value.size()); - Version begin = std::max(req.begin, self->persistentDataDurableVersion + 1); + begin = std::max(begin, self->persistentDataDurableVersion + 1); auto it = std::lower_bound(deque.begin(), deque.end(), std::make_pair(begin, LengthPrefixedStringRef()), @@ -1203,29 +1206,38 @@ void peekMessagesFromMemory(Reference<LogData> self, } } -ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) { +// Common logic to peek the TLog and create a TLogPeekReply; serves both streaming and normal peek requests +ACTOR template <typename PromiseType> +Future<Void> tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference<LogData> logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); state int sequence = -1; state UID peekId; state double queueStart = now(); - if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { - req.tag.id = req.tag.id % logData->txsTags; + if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) { + reqTag.id = reqTag.id % logData->txsTags; } - if (req.sequence.present()) { + if (reqSequence.present()) { try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) { throw operation_obsolete(); } auto& trackerData = logData->peekTracker[peekId]; if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = req.tag; - trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled)); + trackerData.tag = reqTag; + trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); } auto seqBegin = trackerData.sequence_version.begin(); while (trackerData.sequence_version.size() && @@ -1252,12 +1264,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen trackerData.lastUpdate = now(); std::pair<Version, bool> prevPeekData = wait(fPrevPeekData); - req.begin = std::max(prevPeekData.first, req.begin); - req.onlySpilled = prevPeekData.second; + reqBegin = std::max(prevPeekData.first, reqBegin); + reqOnlySpilled = prevPeekData.second; wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); + replyPromise.sendError(e); return Void(); } else { throw; @@ -1267,32 +1279,32 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen } state double blockStart = now(); - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); - if (req.sequence.present()) { + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } - //TraceEvent("TLogPeekMessages0",
self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) { + if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) { wait(self->concurrentLogRouterReads.take()); state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); wait(delay(0.0, TaskPriority::Low)); } - if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) { + if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) { // Reading spilled data will almost always imply that the storage server is >5s behind the rest // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up // slightly faster over keeping the rest of the cluster operating normally. @@ -1303,8 +1315,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state double workStart = now(); - Version poppedVer = poppedVersion(logData, req.tag); - if (poppedVer > req.begin) { + Version poppedVer = poppedVersion(logData, reqTag); + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; @@ -1312,12 +1324,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen rep.end = poppedVer; rep.onlySpilled = false; - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1325,16 +1337,16 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); } - rep.begin = req.begin; + rep.begin = reqBegin; } - req.reply.send(rep); + replyPromise.send(rep); return Void(); } @@ -1342,27 +1354,27 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state bool onlySpilled = false; // grab messages from disk - //TraceEvent("TLogPeekMessages", 
self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the // result? - if (req.onlySpilled) { + if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, req, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); } RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin), - persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin), + persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? 
kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); for (auto& kv : kvs) { auto ver = decodeTagMessagesKey(kv.key); @@ -1377,20 +1389,20 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen messages.serializeBytes(messages2.toValue()); } } else { - peekMessagesFromMemory(logData, req, messages, endVersion); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().address).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; reply.maxKnownVersion = logData->version.get(); reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; - reply.messages = messages.toValue(); + reply.messages = StringRef(reply.arena, messages.toValue()); reply.end = endVersion; reply.onlySpilled = onlySpilled; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().address); - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; trackerData.lastUpdate = now(); @@ -1414,7 +1426,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1423,19 +1435,59 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen trackerData.duplicatePeeks++; if (sequenceData.getFuture().get().first != reply.end) { TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } - reply.begin = req.begin; + reply.begin = reqBegin; } - req.reply.send(reply); + replyPromise.send(reply); return Void(); } +// This actor keeps pushing TLogPeekStreamReply messages until it is removed from the cluster or must recover +ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) { + self->activePeekStreams++; + + state Version begin = req.begin; + state bool onlySpilled = false; + req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); + loop { + state TLogPeekStreamReply reply; + state Promise<TLogPeekReply> promise; + state Future<TLogPeekReply> future(promise.getFuture()); + try {
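// [Editor's note, not part of this patch] The combined wait below does three things per iteration:
// req.reply.onReady() blocks while the stream's flow-control window (set via setByteLimit above) is full,
// tLogPeekMessages(promise, ...) runs the shared peek logic once and fulfills `promise`, and
// store(reply.rep, future) copies the fulfilled TLogPeekReply into this iteration's reply.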
wait(req.reply.onReady() && store(reply.rep, future) && + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + + reply.rep.begin = begin; + req.reply.send(reply); + begin = reply.rep.end; + onlySpilled = reply.rep.onlySpilled; + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } + } catch (Error& e) { + self->activePeekStreams--; + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); + + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { + req.reply.sendError(e); + return Void(); + } else { + throw; + } + } + } +} + ACTOR Future<Void> doQueueCommit(TLogData* self, Reference<LogData> logData, std::vector<Reference<LogData>> missingFinalCommit) { @@ -1930,7 +1982,13 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self, } } when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); + } + when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { + TraceEvent(SevDebug, "TLogPeekStream", logData->logId) + .detail("Token", tli.peekStreamMessages.getEndpoint().token); + logData->addActor.send(tLogPeekStream(self, req, logData)); } when(TLogPopRequest req = waitNext(tli.popMessages.getFuture())) { logData->addActor.send(tLogPop(self, req, logData)); @@ -2327,6 +2385,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); @@ -2537,6 +2596,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); @@ -2729,7 +2789,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, state TLogData self(tlogId, workerID, persistentData, persistentQueue, db, degraded, folder); state Future<Void> error = actorCollection(self.sharedActors.getFuture()); - TraceEvent("SharedTlog", tlogId).log(); + TraceEvent("SharedTlog", tlogId).detail("Version", "6.0"); + try { if (restoreFromDisk) { wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests)); diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index 0fcf76d5a3..284afb1da3 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -339,6 +339,7 @@ struct TLogData : NonCopyable { int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling. 
int64_t overheadBytesInput; int64_t overheadBytesDurable; + int activePeekStreams = 0; WorkerCache<TLogInterface> tlogCache; FlowLock peekMemoryLimiter; @@ -661,6 +662,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> { cc, "QueueDiskBytesTotal", [tLogData]() { return tLogData->rawPersistentQueue->getStorageBytes().total; }); specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); }); specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); }); + specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; }); } ~LogData() { @@ -1440,17 +1442,19 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen } uint64_t PoppedVersionLag = logData->persistentDataDurableVersion - logData->queuePoppedVersion; - if ( SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE && - (logData->queuePoppedVersion > 0) && //avoid generating massive events at beginning - (tagData->unpoppedRecovered || PoppedVersionLag >= SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { //when recovery or long lag + if (SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE && + (logData->queuePoppedVersion > 0) && // avoid generating massive events at beginning + (tagData->unpoppedRecovered || + PoppedVersionLag >= + SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { // when recovery or long lag TraceEvent("TLogPopDetails", logData->logId) - .detail("Tag", tagData->tag.toString()) - .detail("UpTo", upTo) - .detail("PoppedVersionLag", PoppedVersionLag) - .detail("MinPoppedTag", logData->minPoppedTag.toString()) - .detail("QueuePoppedVersion", logData->queuePoppedVersion) - .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False") - .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False"); + .detail("Tag", tagData->tag.toString()) + .detail("UpTo", upTo) + .detail("PoppedVersionLag", PoppedVersionLag) + .detail("MinPoppedTag", logData->minPoppedTag.toString()) + .detail("QueuePoppedVersion", logData->queuePoppedVersion) + .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False") + .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False"); } if (upTo > logData->persistentDataDurableVersion) wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop)); @@ -1487,15 +1491,16 @@ } void peekMessagesFromMemory(Reference<LogData> self, - TLogPeekRequest const& req, + Tag tag, + Version begin, BinaryWriter& messages, Version& endVersion) { ASSERT(!messages.getLength()); - auto& deque = getVersionMessages(self, req.tag); + auto& deque = getVersionMessages(self, tag); //TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 
0 : map1.begin()->value.size()); - Version begin = std::max(req.begin, self->persistentDataDurableVersion + 1); + begin = std::max(begin, self->persistentDataDurableVersion + 1); auto it = std::lower_bound(deque.begin(), deque.end(), std::make_pair(begin, LengthPrefixedStringRef()), @@ -1540,29 +1545,38 @@ ACTOR Future<std::vector<StringRef>> parseMessagesForTag(StringRef commitBlob, T return relevantMessages; } -ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) { +// Common logic to peek the TLog and build a TLogPeekReply; serves both streaming and normal peek requests +ACTOR template <typename PromiseType> +Future<Void> tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference<LogData> logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); state int sequence = -1; state UID peekId; state double queueStart = now(); - if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { - req.tag.id = req.tag.id % logData->txsTags; + if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) { + reqTag.id = reqTag.id % logData->txsTags; } - if (req.sequence.present()) { + if (reqSequence.present()) { try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) { throw operation_obsolete(); } auto& trackerData = logData->peekTracker[peekId]; if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = req.tag; - trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled)); + trackerData.tag = reqTag; + trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); } auto seqBegin = trackerData.sequence_version.begin(); // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. 
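The sequence_version map above is the core of the ordered-peek protocol: the request carrying sequence number n blocks on slot n, which the request carrying n-1 fulfills with the version it ended at, so replies are computed in order even when requests arrive out of order. A minimal single-process sketch of the idea, using std::promise as a stand-in for flow's Promise (PeekTracker, awaitTurn, and publishNext are illustrative names, not FDB APIs):

#include <cstdint>
#include <future>
#include <map>
#include <utility>

using Version = int64_t;

// Illustrative stand-in for logData->peekTracker[peekId]: request n waits on
// slot n, builds its reply, then fulfills slot n+1 with (endVersion, onlySpilled).
struct PeekTracker {
    std::map<int, std::promise<std::pair<Version, bool>>> sequence_version;
};

// Blocks until the request with the previous sequence number has published
// where it ended; that version becomes this request's effective begin.
std::pair<Version, bool> awaitTurn(PeekTracker& tracker, int sequence) {
    return tracker.sequence_version[sequence].get_future().get();
}

// Called once a reply is computed, unblocking the next request in the chain.
void publishNext(PeekTracker& tracker, int sequence, Version end, bool onlySpilled) {
    tracker.sequence_version[sequence + 1].set_value({ end, onlySpilled });
}

The PARALLEL_GET_MORE_REQUESTS comparison above then simply bounds how many such slots a single peek cursor may have outstanding at once.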
@@ -1589,12 +1603,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen } trackerData.lastUpdate = now(); std::pair<Version, bool> prevPeekData = wait(fPrevPeekData); - req.begin = std::max(prevPeekData.first, req.begin); - req.onlySpilled = prevPeekData.second; + reqBegin = std::max(prevPeekData.first, reqBegin); + reqOnlySpilled = prevPeekData.second; wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); + replyPromise.sendError(e); return Void(); } else { throw; @@ -1604,32 +1618,32 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state double blockStart = now(); - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); - if (req.sequence.present()) { + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - if (req.tag.locality == tagLocalityLogRouter) { + if (reqTag.locality == tagLocalityLogRouter) { wait(self->concurrentLogRouterReads.take()); state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); wait(delay(0.0, TaskPriority::Low)); } - if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) { + if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) { // Reading spilled data will almost always imply that the storage server is >5s behind the rest // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up // slightly faster over keeping the rest of the cluster operating normally. 
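The concurrentLogRouterReads FlowLock taken above is a counting semaphore that caps how many log-router peeks may do this low-priority work at once. A rough equivalent outside flow, sketched with C++20's std::counting_semaphore (the limit of 1 and the function names are illustrative; FDB sizes the real lock with the CONCURRENT_LOG_ROUTER_READS knob):

#include <semaphore>

// Hypothetical cap on concurrent log-router reads.
std::counting_semaphore<> concurrentLogRouterReads{ 1 };

// RAII releaser analogous to FlowLock::Releaser: the permit is returned even
// if the guarded read throws.
struct Releaser {
    std::counting_semaphore<>& sem;
    ~Releaser() { sem.release(); }
};

void logRouterPeek() {
    concurrentLogRouterReads.acquire(); // like wait(self->concurrentLogRouterReads.take())
    Releaser releaser{ concurrentLogRouterReads };
    // ... perform the low-priority disk read ...
}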
@@ -1640,8 +1654,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state double workStart = now(); - Version poppedVer = poppedVersion(logData, req.tag); - if (poppedVer > req.begin) { + Version poppedVer = poppedVersion(logData, reqTag); + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; @@ -1649,12 +1663,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen rep.end = poppedVer; rep.onlySpilled = false; - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1662,16 +1676,16 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); } - rep.begin = req.begin; + rep.begin = reqBegin; } - req.reply.send(rep); + replyPromise.send(rep); return Void(); } @@ -1679,24 +1693,24 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state bool onlySpilled = false; // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the // result? 
- if (req.onlySpilled) { + if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, req, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); } - if (req.tag.locality == tagLocalityTxs || req.tag == txsTag) { + if (reqTag.locality == tagLocalityTxs || reqTag == txsTag) { RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin), - persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin), + persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); @@ -1716,11 +1730,11 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. RangeResult kvrefs = wait(self->persistentData->readRange( KeyRangeRef( - persistTagMessageRefsKey(logData->logId, req.tag, req.begin), - persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), + persistTagMessageRefsKey(logData->logId, reqTag, reqBegin), + persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? 
kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations; state bool earlyEnd = false; @@ -1737,7 +1751,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen earlyEnd = true; break; } - if (sd.version >= req.begin) { + if (sd.version >= reqBegin) { firstVersion = std::min(firstVersion, sd.version); const IDiskQueue::location end = sd.start.lo + sd.length; commitLocations.emplace_back(sd.start, end); @@ -1779,7 +1793,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen messages << VERSION_HEADER << entry.version; std::vector<StringRef> rawMessages = - wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags)); + wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags)); for (const StringRef& msg : rawMessages) { messages.serializeBytes(msg); } @@ -1799,25 +1813,25 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen } } } else { - if (req.onlySpilled) { + if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, req, messages, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); } - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; reply.maxKnownVersion = logData->version.get(); reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; - reply.messages = messages.toValue(); + reply.messages = StringRef(reply.arena, messages.toValue()); reply.end = endVersion; reply.onlySpilled = onlySpilled; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()); - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; trackerData.lastUpdate = now(); @@ -1841,7 +1855,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1850,19 +1864,59 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen trackerData.duplicatePeeks++; if (sequenceData.getFuture().get().first != reply.end) { 
TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } - reply.begin = req.begin; + reply.begin = reqBegin; } - req.reply.send(reply); + replyPromise.send(reply); return Void(); } +// This actor keeps pushing TLogPeekStreamReply messages until the TLog is removed from the cluster or needs to recover +ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) { + self->activePeekStreams++; + + state Version begin = req.begin; + state bool onlySpilled = false; + req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); + loop { + state TLogPeekStreamReply reply; + state Promise<TLogPeekReply> promise; + state Future<TLogPeekReply> future(promise.getFuture()); + try { + wait(req.reply.onReady() && store(reply.rep, future) && + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + + reply.rep.begin = begin; + req.reply.send(reply); + begin = reply.rep.end; + onlySpilled = reply.rep.onlySpilled; + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } + } catch (Error& e) { + self->activePeekStreams--; + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); + + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { + req.reply.sendError(e); + return Void(); + } else { + throw; + } + } + } +} + ACTOR Future<Void> watchDegraded(TLogData* self) { if (g_network->isSimulated() && g_simulator.speedUpSimulation) { return Void(); } @@ -2373,7 +2427,13 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self, } } when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); + } + when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { + TraceEvent(SevDebug, "TLogPeekStream", logData->logId) + .detail("Token", tli.peekStreamMessages.getEndpoint().token); + logData->addActor.send(tLogPeekStream(self, req, logData)); } when(TLogPopRequest req = waitNext(tli.popMessages.getFuture())) { logData->addActor.send(tLogPop(self, req, logData)); @@ -2788,6 +2848,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); @@ -2826,9 +2887,9 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, logsByVersion.emplace_back(ver, id1); TraceEvent("TLogPersistentStateRestore", self->dbgid) - .detail("LogId", logData->logId) - .detail("Ver", ver) - .detail("RecoveryCount", logData->recoveryCount); + .detail("LogId", logData->logId) + .detail("Ver", ver) + .detail("RecoveryCount", logData->recoveryCount); // Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion // might be lost, but that is fine because we will get the corresponding data back, too. 
tagKeys = prefixRange(rawId.withPrefix(persistTagPoppedKeys.begin)); @@ -3019,6 +3080,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); @@ -3218,7 +3280,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, state TLogData self(tlogId, workerID, persistentData, persistentQueue, db, degraded, folder); state Future<Void> error = actorCollection(self.sharedActors.getFuture()); - TraceEvent("SharedTlog", tlogId).log(); + TraceEvent("SharedTlog", tlogId).detail("Version", "6.2"); + try { if (restoreFromDisk) { wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests)); diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp index 5e8ec3b976..6e89cb58a7 100644 --- a/fdbserver/QuietDatabase.actor.cpp +++ b/fdbserver/QuietDatabase.actor.cpp @@ -637,13 +637,12 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx, // In a simulated environment, wait 5 seconds so that workers can move to their optimal locations if (g_network->isSimulated()) wait(delay(5.0)); - // The quiet database check (which runs at the end of every test) will always time out due to active data movement. // To get around this, quiet Database will disable the perpetual wiggle in the setup phase. printf("Set perpetual_storage_wiggle=0 ...\n"); wait(setPerpetualStorageWiggle(cx, false, LockAware::True)); printf("Set perpetual_storage_wiggle=0 Done.\n"); - + // Require 3 consecutive successful quiet database checks spaced 2 seconds apart state int numSuccesses = 0; diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index 787e00ed2c..1ca049927d 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -38,6 +38,8 @@ struct TLogInterface { UID sharedTLogID; RequestStream<struct TLogPeekRequest> peekMessages; + RequestStream<struct TLogPeekStreamRequest> + peekStreamMessages; // request to establish a peek stream with the TLog server RequestStream<struct TLogPopRequest> popMessages; RequestStream<struct TLogCommitRequest> commit; @@ -68,6 +70,7 @@ struct TLogInterface { NetworkAddressList addresses() const { return peekMessages.getEndpoint().addresses; } void initEndpoints() { + // NOTE: streams must be added in the same order as the hardcoded endpoint indices in serialize() std::vector<std::pair<FlowReceiver*, TaskPriority>> streams; streams.push_back(peekMessages.getReceiver(TaskPriority::TLogPeek)); streams.push_back(popMessages.getReceiver(TaskPriority::TLogPop)); @@ -80,6 +83,7 @@ struct TLogInterface { streams.push_back(disablePopRequest.getReceiver()); streams.push_back(enablePopRequest.getReceiver()); streams.push_back(snapRequest.getReceiver()); + streams.push_back(peekStreamMessages.getReceiver(TaskPriority::TLogPeek)); FlowTransport::transport().addEndpoints(streams); } @@ -106,6 +110,8 @@ struct TLogInterface { enablePopRequest = RequestStream<struct TLogEnablePopRequest>(peekMessages.getEndpoint().getAdjustedEndpoint(9)); snapRequest = RequestStream<struct TLogSnapRequest>(peekMessages.getEndpoint().getAdjustedEndpoint(10)); + peekStreamMessages = + RequestStream<struct TLogPeekStreamRequest>(peekMessages.getEndpoint().getAdjustedEndpoint(11)); } } }; @@ -209,6 +215,40 @@ struct TLogPeekRequest { } }; +struct TLogPeekStreamReply : public ReplyPromiseStreamReply { + constexpr static FileIdentifier file_identifier = 10072848; + 
TLogPeekReply rep; + + TLogPeekStreamReply() = default; + explicit TLogPeekStreamReply(const TLogPeekReply& rep) : rep(rep) {} + + int expectedSize() const { return rep.messages.expectedSize() + sizeof(TLogPeekStreamReply); } + + template <class Ar> + void serialize(Ar& ar) { + serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, rep); + } +}; + +struct TLogPeekStreamRequest { + constexpr static FileIdentifier file_identifier = 10072821; + Arena arena; + Version begin; + Tag tag; + bool returnIfBlocked; + int limitBytes; + ReplyPromiseStream<TLogPeekStreamReply> reply; + + TLogPeekStreamRequest() {} + TLogPeekStreamRequest(Version version, Tag tag, bool returnIfBlocked, int limitBytes) + : begin(version), tag(tag), returnIfBlocked(returnIfBlocked), limitBytes(limitBytes) {} + + template <class Ar> + void serialize(Ar& ar) { + serializer(ar, arena, begin, tag, returnIfBlocked, limitBytes, reply); + } +}; + struct TLogPopRequest { constexpr static FileIdentifier file_identifier = 5556423; Arena arena; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index df0c3a07d3..6608468860 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -341,6 +341,7 @@ struct TLogData : NonCopyable { int64_t targetVolatileBytes; // The number of bytes of mutations this TLog should hold in memory before spilling. int64_t overheadBytesInput; int64_t overheadBytesDurable; + int activePeekStreams = 0; WorkerCache<TLogInterface> tlogCache; FlowLock peekMemoryLimiter; @@ -667,6 +668,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> { specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); }); specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); }); specialCounter(cc, "Generation", [this]() { return this->recoveryCount; }); + specialCounter(cc, "ActivePeekStreams", [tLogData]() { return tLogData->activePeekStreams; }); } ~LogData() { @@ -1166,17 +1168,19 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen } uint64_t PoppedVersionLag = logData->persistentDataDurableVersion - logData->queuePoppedVersion; - if ( SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE && - (logData->queuePoppedVersion > 0) && //avoid generating massive events at beginning - (tagData->unpoppedRecovered || PoppedVersionLag >= SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { //when recovery or long lag + if (SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE && + (logData->queuePoppedVersion > 0) && // avoid generating massive events at beginning + (tagData->unpoppedRecovered || + PoppedVersionLag >= + SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { // when recovery or long lag TraceEvent("TLogPopDetails", logData->logId) - .detail("Tag", tagData->tag.toString()) - .detail("UpTo", upTo) - .detail("PoppedVersionLag", PoppedVersionLag) - .detail("MinPoppedTag", logData->minPoppedTag.toString()) - .detail("QueuePoppedVersion", logData->queuePoppedVersion) - .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False") - .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False"); + .detail("Tag", tagData->tag.toString()) + .detail("UpTo", upTo) + .detail("PoppedVersionLag", PoppedVersionLag) + .detail("MinPoppedTag", logData->minPoppedTag.toString()) + .detail("QueuePoppedVersion", logData->queuePoppedVersion) + .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? 
"True" : "False") + .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False"); } if (upTo > logData->persistentDataDurableVersion) wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop)); @@ -1518,15 +1522,16 @@ std::deque<std::pair<Version, LengthPrefixedStringRef>>& getVersionMessages(Refe }; void peekMessagesFromMemory(Reference<LogData> self, - TLogPeekRequest const& req, + Tag tag, + Version begin, BinaryWriter& messages, Version& endVersion) { ASSERT(!messages.getLength()); - auto& deque = getVersionMessages(self, req.tag); + auto& deque = getVersionMessages(self, tag); //TraceEvent("TLogPeekMem", self->dbgid).detail("Tag", req.tag1).detail("PDS", self->persistentDataSequence).detail("PDDS", self->persistentDataDurableSequence).detail("Oldest", map1.empty() ? 0 : map1.begin()->key ).detail("OldestMsgCount", map1.empty() ? 0 : map1.begin()->value.size()); - Version begin = std::max(req.begin, self->persistentDataDurableVersion + 1); + begin = std::max(begin, self->persistentDataDurableVersion + 1); auto it = std::lower_bound(deque.begin(), deque.end(), std::make_pair(begin, LengthPrefixedStringRef()), @@ -1552,7 +1557,7 @@ void peekMessagesFromMemory(Reference<LogData> self, void* data = messages.getData(); DEBUG_TAGS_AND_MESSAGE( "TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId) - .detail("PeekTag", req.tag); + .detail("PeekTag", tag); } } @@ -1578,29 +1583,38 @@ ACTOR Future<std::vector<StringRef>> parseMessagesForTag(StringRef commitBlob, T return relevantMessages; } -ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) { +// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request +ACTOR template <typename PromiseType> +Future<Void> tLogPeekMessages(PromiseType replyPromise, + TLogData* self, + Reference<LogData> logData, + Version reqBegin, + Tag reqTag, + bool reqReturnIfBlocked = false, + bool reqOnlySpilled = false, + Optional<std::pair<UID, int>> reqSequence = Optional<std::pair<UID, int>>()) { state BinaryWriter messages(Unversioned()); state BinaryWriter messages2(Unversioned()); state int sequence = -1; state UID peekId; state double queueStart = now(); - if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) { - req.tag.id = req.tag.id % logData->txsTags; + if (reqTag.locality == tagLocalityTxs && reqTag.id >= logData->txsTags && logData->txsTags > 0) { + reqTag.id = reqTag.id % logData->txsTags; } - if (req.sequence.present()) { + if (reqSequence.present()) { try { - peekId = req.sequence.get().first; - sequence = req.sequence.get().second; + peekId = reqSequence.get().first; + sequence = reqSequence.get().second; if (sequence >= SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && logData->peekTracker.find(peekId) == logData->peekTracker.end()) { throw operation_obsolete(); } auto& trackerData = logData->peekTracker[peekId]; if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) { - trackerData.tag = req.tag; - trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled)); + trackerData.tag = reqTag; + trackerData.sequence_version[0].send(std::make_pair(reqBegin, reqOnlySpilled)); } auto seqBegin = trackerData.sequence_version.begin(); // The peek cursor and this comparison need to agree about the maximum number of in-flight requests. 
@@ -1627,12 +1641,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen } trackerData.lastUpdate = now(); std::pair<Version, bool> prevPeekData = wait(fPrevPeekData); - req.begin = std::max(prevPeekData.first, req.begin); - req.onlySpilled = prevPeekData.second; + reqBegin = std::max(prevPeekData.first, reqBegin); + reqOnlySpilled = prevPeekData.second; wait(yield()); } catch (Error& e) { if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete) { - req.reply.sendError(e); + replyPromise.sendError(e); return Void(); } else { throw; @@ -1642,33 +1656,33 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state double blockStart = now(); - if (req.returnIfBlocked && logData->version.get() < req.begin) { - req.reply.sendError(end_of_stream()); - if (req.sequence.present()) { + if (reqReturnIfBlocked && logData->version.get() < reqBegin) { + replyPromise.sendError(end_of_stream()); + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (!sequenceData.isSet()) { - sequenceData.send(std::make_pair(req.begin, req.onlySpilled)); + sequenceData.send(std::make_pair(reqBegin, reqOnlySpilled)); } } return Void(); } - //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); + //TraceEvent("TLogPeekMessages0", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); // Wait until we have something to return that the caller doesn't already have - if (logData->version.get() < req.begin) { - wait(logData->version.whenAtLeast(req.begin)); + if (logData->version.get() < reqBegin) { + wait(logData->version.whenAtLeast(reqBegin)); wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); } - if (logData->locality != tagLocalitySatellite && req.tag.locality == tagLocalityLogRouter) { + if (logData->locality != tagLocalitySatellite && reqTag.locality == tagLocalityLogRouter) { wait(self->concurrentLogRouterReads.take()); state FlowLock::Releaser globalReleaser(self->concurrentLogRouterReads); wait(delay(0.0, TaskPriority::Low)); } - if (req.begin <= logData->persistentDataDurableVersion && req.tag.locality != tagLocalityTxs && req.tag != txsTag) { + if (reqBegin <= logData->persistentDataDurableVersion && reqTag.locality != tagLocalityTxs && reqTag != txsTag) { // Reading spilled data will almost always imply that the storage server is >5s behind the rest // of the cluster. We shouldn't prioritize spending CPU on helping this server catch up // slightly faster over keeping the rest of the cluster operating normally. 
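The logData->version.whenAtLeast(reqBegin) wait above parks the peek until the TLog has advanced to the requested version. flow's NotifiedVersion does this asynchronously on a single-threaded event loop; a blocking toy equivalent of the same contract (ToyNotifiedVersion is an illustrative name) looks like:

#include <condition_variable>
#include <cstdint>
#include <mutex>

using Version = int64_t;

// Monotonically increasing version that readers can block on until it reaches
// a target, mirroring NotifiedVersion's get()/set()/whenAtLeast() contract.
class ToyNotifiedVersion {
    Version v{ 0 };
    std::mutex m;
    std::condition_variable cv;

public:
    Version get() {
        std::lock_guard<std::mutex> g(m);
        return v;
    }
    void set(Version newV) {
        std::lock_guard<std::mutex> g(m);
        if (newV > v) {
            v = newV;
            cv.notify_all();
        }
    }
    void whenAtLeast(Version target) {
        std::unique_lock<std::mutex> g(m);
        cv.wait(g, [&] { return v >= target; });
    }
};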
@@ -1679,8 +1693,8 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state double workStart = now(); - Version poppedVer = poppedVersion(logData, req.tag); - if (poppedVer > req.begin) { + Version poppedVer = poppedVersion(logData, reqTag); + if (poppedVer > reqBegin) { TLogPeekReply rep; rep.maxKnownVersion = logData->version.get(); rep.minKnownCommittedVersion = logData->minKnownCommittedVersion; @@ -1688,12 +1702,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen rep.end = poppedVer; rep.onlySpilled = false; - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; auto& sequenceData = trackerData.sequence_version[sequence + 1]; trackerData.lastUpdate = now(); if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) sequenceData.sendError(operation_obsolete()); return Void(); @@ -1701,16 +1715,16 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen if (sequenceData.isSet()) { if (sequenceData.getFuture().get().first != rep.end) { TEST(true); // tlog peek second attempt ended at a different version - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(rep.end, rep.onlySpilled)); } - rep.begin = req.begin; + rep.begin = reqBegin; } - req.reply.send(rep); + replyPromise.send(rep); return Void(); } @@ -1718,24 +1732,24 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen state bool onlySpilled = false; // grab messages from disk - //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2); - if (req.begin <= logData->persistentDataDurableVersion) { + //TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", reqBegin.epoch).detail("ReqBeginSeq", reqBegin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", reqTag1).detail("Tag2", reqTag2); + if (reqBegin <= logData->persistentDataDurableVersion) { // Just in case the durable version changes while we are waiting for the read, we grab this data from memory. We // may or may not actually send it depending on whether we get enough data from disk. SOMEDAY: Only do this if // an initial attempt to read from disk results in insufficient data and the required data is no longer in // memory SOMEDAY: Should we only send part of the messages we collected, to actually limit the size of the // result? 
- if (req.onlySpilled) { + if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, req, messages2, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages2, endVersion); } - if (logData->shouldSpillByValue(req.tag)) { + if (logData->shouldSpillByValue(reqTag)) { RangeResult kvs = wait(self->persistentData->readRange( - KeyRangeRef(persistTagMessagesKey(logData->logId, req.tag, req.begin), - persistTagMessagesKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), + KeyRangeRef(persistTagMessagesKey(logData->logId, reqTag, reqBegin), + persistTagMessagesKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->DESIRED_TOTAL_BYTES, SERVER_KNOBS->DESIRED_TOTAL_BYTES)); @@ -1755,11 +1769,11 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen // FIXME: Limit to approximately DESIRED_TOTATL_BYTES somehow. RangeResult kvrefs = wait(self->persistentData->readRange( KeyRangeRef( - persistTagMessageRefsKey(logData->logId, req.tag, req.begin), - persistTagMessageRefsKey(logData->logId, req.tag, logData->persistentDataDurableVersion + 1)), + persistTagMessageRefsKey(logData->logId, reqTag, reqBegin), + persistTagMessageRefsKey(logData->logId, reqTag, logData->persistentDataDurableVersion + 1)), SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_BATCHES_PER_PEEK + 1)); - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("Tag1Results", s1).detail("Tag2Results", s2).detail("Tag1ResultsLim", kv1.size()).detail("Tag2ResultsLim", kv2.size()).detail("Tag1ResultsLast", kv1.size() ? kv1[0].key : "").detail("Tag2ResultsLast", kv2.size() ? 
kv2[0].key : "").detail("Limited", limited).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowEpoch", self->epoch()).detail("NowSeq", self->sequence.getNextSequence()); state std::vector<std::pair<IDiskQueue::location, IDiskQueue::location>> commitLocations; state bool earlyEnd = false; @@ -1776,7 +1790,7 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen earlyEnd = true; break; } - if (sd.version >= req.begin) { + if (sd.version >= reqBegin) { firstVersion = std::min(firstVersion, sd.version); const IDiskQueue::location end = sd.start.lo + sd.length; commitLocations.emplace_back(sd.start, end); @@ -1818,12 +1832,12 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen messages << VERSION_HEADER << entry.version; std::vector<StringRef> rawMessages = - wait(parseMessagesForTag(entry.messages, req.tag, logData->logRouterTags)); + wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags)); for (const StringRef& msg : rawMessages) { messages.serializeBytes(msg); DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg, logData->logId) .detail("DebugID", self->dbgid) - .detail("PeekTag", req.tag); + .detail("PeekTag", reqTag); } lastRefMessageVersion = entry.version; @@ -1841,28 +1855,28 @@ ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Referen } } } else { - if (req.onlySpilled) { + if (reqOnlySpilled) { endVersion = logData->persistentDataDurableVersion + 1; } else { - peekMessagesFromMemory(logData, req, messages, endVersion); + peekMessagesFromMemory(logData, reqTag, reqBegin, messages, endVersion); } - //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); + //TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence()); } TLogPeekReply reply; reply.maxKnownVersion = logData->version.get(); reply.minKnownCommittedVersion = logData->minKnownCommittedVersion; - reply.messages = messages.toValue(); + reply.messages = StringRef(reply.arena, messages.toValue()); reply.end = endVersion; reply.onlySpilled = onlySpilled; - //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", req.tag.toString()). - // detail("BeginVer", req.begin).detail("EndVer", reply.end). + //TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("Tag", reqTag.toString()). + // detail("BeginVer", reqBegin).detail("EndVer", reply.end). // detail("MsgBytes", reply.messages.expectedSize()). 
- // detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()); + // detail("ForAddress", replyPromise.getEndpoint().getPrimaryAddress()); - if (req.sequence.present()) { + if (reqSequence.present()) { auto& trackerData = logData->peekTracker[peekId]; trackerData.lastUpdate = now(); @@ -1886,9 +1900,9 @@ auto& sequenceData = trackerData.sequence_version[sequence + 1]; if (trackerData.sequence_version.size() && sequence + 1 < trackerData.sequence_version.begin()->first) { - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); if (!sequenceData.isSet()) { - // It would technically be more correct to .send({req.begin, req.onlySpilled}), as the next + // It would technically be more correct to .send({reqBegin, reqOnlySpilled}), as the next // request might still be in the window of active requests, but LogSystemPeekCursor will // throw away all future responses upon getting an operation_obsolete(), so computing a // response will probably be a waste of CPU. @@ -1900,19 +1914,59 @@ trackerData.duplicatePeeks++; if (sequenceData.getFuture().get().first != reply.end) { TEST(true); // tlog peek second attempt ended at a different version (2) - req.reply.sendError(operation_obsolete()); + replyPromise.sendError(operation_obsolete()); return Void(); } } else { sequenceData.send(std::make_pair(reply.end, reply.onlySpilled)); } - reply.begin = req.begin; + reply.begin = reqBegin; } - req.reply.send(reply); + replyPromise.send(reply); return Void(); } +// This actor keeps pushing TLogPeekStreamReply messages until the TLog is removed from the cluster or needs to recover +ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) { + self->activePeekStreams++; + + state Version begin = req.begin; + state bool onlySpilled = false; + req.reply.setByteLimit(std::min(SERVER_KNOBS->MAXIMUM_PEEK_BYTES, req.limitBytes)); + loop { + state TLogPeekStreamReply reply; + state Promise<TLogPeekReply> promise; + state Future<TLogPeekReply> future(promise.getFuture()); + try { + wait(req.reply.onReady() && store(reply.rep, future) && + tLogPeekMessages(promise, self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)); + + reply.rep.begin = begin; + req.reply.send(reply); + begin = reply.rep.end; + onlySpilled = reply.rep.onlySpilled; + if (reply.rep.end > logData->version.get()) { + wait(delay(SERVER_KNOBS->TLOG_PEEK_DELAY, g_network->getCurrentTask())); + } else { + wait(delay(0, g_network->getCurrentTask())); + } + } catch (Error& e) { + self->activePeekStreams--; + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); + + if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { + req.reply.sendError(e); + return Void(); + } else { + throw; + } + } + } +} + ACTOR Future<Void> doQueueCommit(TLogData* self, Reference<LogData> logData, std::vector<Reference<LogData>> missingFinalCommit) { @@ -2408,8 +2462,14 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self, logData->logSystem->set(Reference<ILogSystem>()); } } + when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) { + TraceEvent(SevDebug, "TLogPeekStream", logData->logId) + .detail("Token", tli.peekStreamMessages.getEndpoint().token); + 
logData->addActor.send(tLogPeekStream(self, req, logData)); + } when(TLogPeekRequest req = waitNext(tli.peekMessages.getFuture())) { - logData->addActor.send(tLogPeekMessages(self, req, logData)); + logData->addActor.send(tLogPeekMessages( + req.reply, self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence)); } when(TLogPopRequest req = waitNext(tli.popMessages.getFuture())) { logData->addActor.send(tLogPop(self, req, logData)); @@ -2664,7 +2724,7 @@ ACTOR Future<Void> tLogCore(TLogData* self, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics", - [self=self](TraceEvent& te) { + [self = self](TraceEvent& te) { StorageBytes sbTlog = self->persistentData->getStorageBytes(); te.detail("KvstoreBytesUsed", sbTlog.used); te.detail("KvstoreBytesFree", sbTlog.free); @@ -2848,6 +2908,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); @@ -2894,9 +2955,9 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, logsByVersion.emplace_back(ver, id1); TraceEvent("TLogPersistentStateRestore", self->dbgid) - .detail("LogId", logData->logId) - .detail("Ver", ver) - .detail("RecoveryCount", logData->recoveryCount); + .detail("LogId", logData->logId) + .detail("Ver", ver) + .detail("RecoveryCount", logData->recoveryCount); // Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion // might be lost, but that is fine because we will get the corresponding data back, too. tagKeys = prefixRange(rawId.withPrefix(persistTagPoppedKeys.begin)); @@ -3109,6 +3170,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality recruited.initEndpoints(); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index a9760390a9..ea4de5e7af 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1912,6 +1912,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile, startRole(Role::LOG_ROUTER, recruited.id(), interf.id(), details); DUMPTOKEN(recruited.peekMessages); + DUMPTOKEN(recruited.peekStreamMessages); DUMPTOKEN(recruited.popMessages); DUMPTOKEN(recruited.commit); DUMPTOKEN(recruited.lock); diff --git a/flow/Trace.h b/flow/Trace.h index 467422b9cc..aeaabb4373 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -599,4 +599,5 @@ extern TraceBatch g_traceBatch; #define DUMPTOKEN(name) \ TraceEvent("DumpToken", recruited.id()).detail("Name", #name).detail("Token", name.getEndpoint().token) +#define DisabledTraceEvent(...) false && TraceEvent() #endif
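A closing note on the DisabledTraceEvent macro added to flow/Trace.h: it relies on &&'s short-circuit evaluation, so the right-hand operand is never evaluated at runtime; the event object is never constructed and its detail() chain never runs, yet the call site still parses and type-checks. A self-contained toy version of the trick (ToyEvent and DISABLED_TOY_EVENT are illustrative, not flow's TraceEvent):

#include <iostream>

struct ToyEvent {
    template <typename T>
    ToyEvent& detail(const char* key, const T& value) {
        std::cout << key << "=" << value << "\n";
        return *this;
    }
    operator bool() const { return true; } // lets the && expression type-check
};

#define DISABLED_TOY_EVENT(...) false && ToyEvent()

int main() {
    // Prints nothing: short-circuiting skips both the constructor and detail().
    bool evaluated = DISABLED_TOY_EVENT("Ignored").detail("Key", 42);
    return evaluated ? 1 : 0;
}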