diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h
index 403d8d4dc2..b0443d2a23 100644
--- a/fdbrpc/fdbrpc.h
+++ b/fdbrpc/fdbrpc.h
@@ -277,9 +277,9 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated<Acknowledgeme
 	using FastAllocated<AcknowledgementReceiver>::operator new;
 	using FastAllocated<AcknowledgementReceiver>::operator delete;
 
-	int64_t bytesSent;
-	int64_t bytesAcknowledged;
-	int64_t bytesLimit;
+	uint64_t bytesSent;
+	uint64_t bytesAcknowledged;
+	uint64_t bytesLimit;
 	Promise<Void> ready;
 	Future<Void> failures;
 
@@ -358,11 +358,19 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
 			// send an ack immediately
 			if (acknowledgements.getRawEndpoint().isValid()) {
 				acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize();
-				FlowTransport::transport().sendUnreliable(
-				    SerializeSource<ErrorOr<AcknowledgementReply>>(
-				        AcknowledgementReply(acknowledgements.bytesAcknowledged)),
-				    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
-				    false);
+				// int64_t overflow: we need to reset this stream
+				if (acknowledgements.bytesAcknowledged > std::numeric_limits<int64_t>::max()) {
+					FlowTransport::transport().sendUnreliable(
+					    SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
+					    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+					    false);
+				} else {
+					FlowTransport::transport().sendUnreliable(
+					    SerializeSource<ErrorOr<AcknowledgementReply>>(
+					        AcknowledgementReply(acknowledgements.bytesAcknowledged)),
+					    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+					    false);
+				}
 			}
 		}
 
@@ -376,10 +384,17 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
 		// A reply that has been queued up is being consumed, so send an ack to the server
 		if (acknowledgements.getRawEndpoint().isValid()) {
 			acknowledgements.bytesAcknowledged += res.expectedSize();
-			FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
-			                                              AcknowledgementReply(acknowledgements.bytesAcknowledged)),
-			                                          acknowledgements.getEndpoint(TaskPriority::ReadSocket),
-			                                          false);
+			if (acknowledgements.bytesAcknowledged > std::numeric_limits<int64_t>::max()) {
+				FlowTransport::transport().sendUnreliable(
+				    SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
+				    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+				    false);
+			} else {
+				FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
+				                                              AcknowledgementReply(acknowledgements.bytesAcknowledged)),
+				                                          acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+				                                          false);
+			}
 		}
 		return res;
 	}
@@ -406,7 +421,6 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
 
 template <class T>
 class ReplyPromiseStream {
 public:
-	// stream.send( request )
 	// Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
@@ -475,8 +489,8 @@ public:
 		errors->delPromiseRef();
 	}
 
-	// The endpoints of a ReplyPromiseStream must be initialized at Task::ReadSocket, because with lower priorities a
-	// delay(0) in FlowTransport deliver can cause out of order delivery.
+	// The endpoints of a ReplyPromiseStream must be initialized at Task::ReadSocket, because with lower priorities a
+	// delay(0) in FlowTransport deliver can cause out of order delivery.
 	const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::ReadSocket); }
 
 	bool operator==(const ReplyPromiseStream<T>& rhs) const { return queue == rhs.queue; }
diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp
index 809b126e70..154d870f88 100644
--- a/fdbserver/LogSystemPeekCursor.actor.cpp
+++ b/fdbserver/LogSystemPeekCursor.actor.cpp
@@ -35,7 +35,7 @@ void tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
 	}
 	self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest(
 	    self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max()));
-	TraceEvent(SevDebug, "StreamCreated");
+	TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID);
 }
 
 ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf,
@@ -51,7 +51,11 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterf
     resetCheck(Void()), usePeekStream(SERVER_KNOBS->PEEK_USEING_STREAMING) {
 	this->results.maxKnownVersion = 0;
 	this->results.minKnownCommittedVersion = 0;
-	//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
+	TraceEvent("SPC_Starting", randomID)
+	    .detail("Tag", tag.toString())
+	    .detail("Begin", begin)
+	    .detail("End", end)
+	    .backtrace();
 }
 
 ILogSystem::ServerPeekCursor::ServerPeekCursor(TLogPeekReply const& results,
@@ -316,29 +320,34 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self,
 
 ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
 	if (!self->interf || self->isExhausted()) {
+		self->peekReplyStream.reset();
 		if (self->hasMessage())
 			return Void();
 		return Never();
 	}
 
-	tryEstablishPeekStream(self);
 	loop {
 		try {
+			tryEstablishPeekStream(self);
+			state Future<TLogPeekReply> fPeekReply = self->peekReplyStream.present()
+			                                             ? map(waitAndForward(self->peekReplyStream.get().getFuture()),
+			                                                   [](const TLogPeekStreamReply& r) { return r.rep; })
+			                                             : Never();
 			choose {
 				when(wait(self->interf->onChange())) {
 					self->onlySpilled = false;
 					self->peekReplyStream.reset();
-					tryEstablishPeekStream(self);
 				}
-				when(TLogPeekStreamReply res =
+				when(TLogPeekReply res =
 				         wait(self->peekReplyStream.present()
-				                  ? brokenPromiseToNever(waitAndForward(self->peekReplyStream.get().getFuture()))
+				                  ? recordRequestMetrics(
+				                        self, self->peekReplyStream.get().getEndpoint().getPrimaryAddress(), fPeekReply)
 				                  : Never())) {
-					updateCursorWithReply(self, res.rep);
+					updateCursorWithReply(self, res);
 					TraceEvent("SPC_GetMoreB", self->randomID)
 					    .detail("Has", self->hasMessage())
-					    .detail("End", res.rep.end)
-					    .detail("Popped", res.rep.popped.present() ? res.rep.popped.get() : 0);
+					    .detail("End", res.end)
+					    .detail("Popped", res.popped.present() ? res.popped.get() : 0);
					return Void();
				}
			}
@@ -388,19 +397,24 @@
 }
 
 Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
-	//TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
+	TraceEvent("SPC_GetMore", randomID)
+	    .detail("HasMessage", hasMessage())
+	    .detail("More", !more.isValid() || more.isReady())
+	    .detail("MessageVersion", messageVersion.toString())
+	    .detail("End", end.toString());
 	if (hasMessage() && !parallelGetMore)
 		return Void();
 	if (!more.isValid() || more.isReady()) {
-		if (usePeekStream && taskID == TaskPriority::TLogPeekReply) {
-			more = serverPeekStreamGetMore(this, taskID);
-		}
-		// if (parallelGetMore || onlySpilled || futureResults.size()) {
-		// 	more = serverPeekParallelGetMore(this, taskID);
-		// }
-		else {
-			more = serverPeekGetMore(this, taskID);
-		}
+		more = serverPeekStreamGetMore(this, taskID);
+//		if (usePeekStream && taskID == TaskPriority::TLogPeekReply) {
+//			more = serverPeekStreamGetMore(this, taskID);
+//		}
+//		if (parallelGetMore || onlySpilled || futureResults.size()) {
+//			more = serverPeekParallelGetMore(this, taskID);
+//		}
+//		else {
+//			more = serverPeekGetMore(this, taskID);
+//		}
 	}
 	return more;
 }
diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp
index a46d22d856..9ad416986e 100644
--- a/fdbserver/TLogServer.actor.cpp
+++ b/fdbserver/TLogServer.actor.cpp
@@ -557,6 +557,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	TLogData* tLogData;
 	Promise<Void> recoveryComplete, committingQueue;
 	Version unrecoveredBefore, recoveredAt;
+	int activePeekStreams = 0;
 
 	struct PeekTrackerData {
 		std::map<int, Promise<std::pair<Version, bool>>>
@@ -668,6 +669,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 		specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); });
 		specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); });
 		specialCounter(cc, "Generation", [this]() { return this->recoveryCount; });
+		specialCounter(cc, "ActivePeekStreams", [this]() { return this->activePeekStreams; });
 	}
 
 	~LogData() {
@@ -1167,17 +1169,19 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
 	}
 
 	uint64_t PoppedVersionLag = logData->persistentDataDurableVersion - logData->queuePoppedVersion;
-	if ( SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
-		 (logData->queuePoppedVersion > 0) && //avoid generating massive events at beginning
-		 (tagData->unpoppedRecovered || PoppedVersionLag >= SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { //when recovery or long lag
+	if (SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
+	    (logData->queuePoppedVersion > 0) && // avoid generating massive events at beginning
+	    (tagData->unpoppedRecovered ||
+	     PoppedVersionLag >=
+	         SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { // when recovery or long lag
 		TraceEvent("TLogPopDetails", logData->logId)
-			.detail("Tag", tagData->tag.toString())
-			.detail("UpTo", upTo)
-			.detail("PoppedVersionLag", PoppedVersionLag)
-			.detail("MinPoppedTag", logData->minPoppedTag.toString())
-			.detail("QueuePoppedVersion", logData->queuePoppedVersion)
-			.detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False")
-			.detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False");
+		    .detail("Tag", tagData->tag.toString())
+		    .detail("UpTo", upTo)
+		    .detail("PoppedVersionLag", PoppedVersionLag)
+		    .detail("MinPoppedTag", logData->minPoppedTag.toString())
+		    .detail("QueuePoppedVersion", logData->queuePoppedVersion)
+		    .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False")
+		    .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False");
 	}
 	if (upTo > logData->persistentDataDurableVersion)
 		wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop));
@@ -1915,6 +1919,7 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
 
 // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
 ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
+	logData->activePeekStreams++;
 	state Version begin = req.begin;
 	state bool onlySpilled = false;
 	if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
@@ -1929,18 +1934,17 @@
 			req.reply.send(reply);
 			begin = reply.rep.end;
 			onlySpilled = reply.rep.onlySpilled;
-
-			wait(delay(.05, g_network->getCurrentTask()));
+			wait(delay(0, g_network->getCurrentTask()));
 		} catch (Error& e) {
+			logData->activePeekStreams--;
 			if (e.code() == error_code_end_of_stream) {
 				req.reply.sendError(e);
 				return Void();
-			}
-			else if (e.code() == error_code_operation_obsolete) {
+			} else if (e.code() == error_code_operation_obsolete) {
 				// reply stream is cancelled on the client
-				return Void();
+				return Void();
 			} else {
-				throw;
+				throw;
 			}
 		}
 	}