diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 290452012a..1e90c5a38b 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -64,7 +64,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( TLOG_MESSAGE_BLOCK_BYTES, 10e6 ); init( TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR, double(TLOG_MESSAGE_BLOCK_BYTES) / (TLOG_MESSAGE_BLOCK_BYTES - MAX_MESSAGE_SIZE) ); //1.0121466709838096006362758832473 init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = deterministicRandom()->coinflip() ? 0.1 : 120; - init( PEEK_USEING_STREAMING, true ); + init( PEEK_USING_STREAMING, true ); init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2; init( MULTI_CURSOR_PRE_FETCH_LIMIT, 10 ); init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000; diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 23b3049668..c905720898 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -41,7 +41,7 @@ public: // often, so that versions always advance smoothly // TLogs - bool PEEK_USEING_STREAMING; + bool PEEK_USING_STREAMING; double TLOG_TIMEOUT; // tlog OR commit proxy failure - master's reaction time double TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS; // Warns if a tlog takes too long to rejoin double RECOVERY_TLOG_SMART_QUORUM_DELAY; // smaller might be better for bug amplification diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index f26ea539af..22348fe8b3 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -573,7 +573,6 @@ Future logRouterPeekMessages(PromiseType replyPromise, return Void(); } -// TODO: enable streaming peek log from log router // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover ACTOR Future logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) { self->activePeekStreams++; @@ -600,7 +599,9 @@ ACTOR Future logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques } } catch (Error& e) { self->activePeekStreams--; - TraceEvent(SevDebug, "TLogPeekStreamEnd", self->dbgid).error(e, true); + TraceEvent(SevDebug, "TLogPeekStreamEnd", self->dbgid) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 769fd2ee57..4ab5aaf56f 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -38,7 +38,7 @@ ACTOR Future tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) { self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest( self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits::max())); TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID) - .detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress()) + .detail("PeerAddr", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress()) .detail("PeerToken", self->interf->get().interf().peekStreamMessages.getEndpoint().token); return Void(); } @@ -52,11 +52,11 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(ReferencerandomUniqueID()), returnIfBlocked(returnIfBlocked), onlySpilled(false), parallelGetMore(parallelGetMore), - usePeekStream(SERVER_KNOBS->PEEK_USEING_STREAMING), sequence(0), lastReset(0), resetCheck(Void()), slowReplies(0), + usePeekStream(SERVER_KNOBS->PEEK_USING_STREAMING), sequence(0), lastReset(0), resetCheck(Void()), slowReplies(0), fastReplies(0), unknownReplies(0) { this->results.maxKnownVersion = 0; this->results.minKnownCommittedVersion = 0; - TraceEvent(SevDebug, "SPC_Starting", randomID) + DisabledTraceEvent(SevDebug, "SPC_Starting", randomID) .detail("Tag", tag.toString()) .detail("Begin", begin) .detail("End", end); @@ -355,7 +355,7 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T } updateCursorWithReply(self, res); expectedBegin = res.end; - TraceEvent(SevDebug, "SPC_GetMoreB", self->randomID) + DisabledTraceEvent(SevDebug, "SPC_GetMoreB", self->randomID) .detail("Has", self->hasMessage()) .detail("End", res.end) .detail("Popped", res.popped.present() ? res.popped.get() : 0); @@ -367,7 +367,7 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T } } } catch (Error& e) { - TraceEvent(SevDebug, "SPC_GetMoreB_Error").error(e, true); + TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).error(e, true); if (e.code() == error_code_connection_failed || e.code() == error_code_operation_obsolete) { // NOTE: delay in order to avoid the endless retry loop block other tasks self->peekReplyStream.reset(); @@ -424,7 +424,6 @@ Future ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) { if (hasMessage() && !parallelGetMore) return Void(); if (!more.isValid() || more.isReady()) { - // TODO: add tagLocalityRemoteLog when log router support streaming peek if (usePeekStream && (tag.locality >= 0 || tag.locality == tagLocalityLogRouter || tag.locality == tagLocalityRemoteLog)) { more = serverPeekStreamGetMore(this, taskID); } else if (parallelGetMore || onlySpilled || futureResults.size()) { diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index f50744cc1a..7cb4c565bf 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -1162,7 +1162,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } } catch (Error& e) { self->activePeekStreams--; - TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 5d77b4e12f..cc561d1f3d 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1474,7 +1474,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } } catch (Error& e) { self->activePeekStreams--; - TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index 687ea0e638..284afb1da3 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -1903,7 +1903,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } } catch (Error& e) { self->activePeekStreams--; - TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index 322c7ddb35..1ca049927d 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -83,7 +83,7 @@ struct TLogInterface { streams.push_back(disablePopRequest.getReceiver()); streams.push_back(enablePopRequest.getReceiver()); streams.push_back(snapRequest.getReceiver()); - streams.push_back(peekStreamMessages.getReceiver(TaskPriority::TLogPeek)); + streams.push_back(peekStreamMessages.getReceiver(TaskPriority::TLogPeek)); FlowTransport::transport().addEndpoints(streams); } @@ -235,7 +235,7 @@ struct TLogPeekStreamRequest { Arena arena; Version begin; Tag tag; - bool returnIfBlocked; + bool returnIfBlocked; int limitBytes; ReplyPromiseStream reply; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index f7853e23b2..6608468860 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1953,7 +1953,9 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } } catch (Error& e) { self->activePeekStreams--; - TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true); + TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId) + .detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress()) + .error(e, true); if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) { req.reply.sendError(e); diff --git a/flow/Trace.h b/flow/Trace.h index 467422b9cc..aeaabb4373 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -599,4 +599,5 @@ extern TraceBatch g_traceBatch; #define DUMPTOKEN(name) \ TraceEvent("DumpToken", recruited.id()).detail("Name", #name).detail("Token", name.getEndpoint().token) +#define DisabledTraceEvent(...) false && TraceEvent() #endif