solve mis-spelling, trace log and format problems

This commit is contained in:
Xiaoxi Wang 2021-08-11 18:26:00 -07:00
parent 1f6cee89ab
commit a97570bd06
10 changed files with 25 additions and 16 deletions

View File

@ -64,7 +64,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( TLOG_MESSAGE_BLOCK_BYTES, 10e6 );
init( TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR, double(TLOG_MESSAGE_BLOCK_BYTES) / (TLOG_MESSAGE_BLOCK_BYTES - MAX_MESSAGE_SIZE) ); //1.0121466709838096006362758832473
init( PEEK_TRACKER_EXPIRATION_TIME, 600 ); if( randomize && BUGGIFY ) PEEK_TRACKER_EXPIRATION_TIME = deterministicRandom()->coinflip() ? 0.1 : 120;
init( PEEK_USEING_STREAMING, true );
init( PEEK_USING_STREAMING, true );
init( PARALLEL_GET_MORE_REQUESTS, 32 ); if( randomize && BUGGIFY ) PARALLEL_GET_MORE_REQUESTS = 2;
init( MULTI_CURSOR_PRE_FETCH_LIMIT, 10 );
init( MAX_QUEUE_COMMIT_BYTES, 15e6 ); if( randomize && BUGGIFY ) MAX_QUEUE_COMMIT_BYTES = 5000;

View File

@ -41,7 +41,7 @@ public:
// often, so that versions always advance smoothly
// TLogs
bool PEEK_USEING_STREAMING;
bool PEEK_USING_STREAMING;
double TLOG_TIMEOUT; // tlog OR commit proxy failure - master's reaction time
double TLOG_SLOW_REJOIN_WARN_TIMEOUT_SECS; // Warns if a tlog takes too long to rejoin
double RECOVERY_TLOG_SMART_QUORUM_DELAY; // smaller might be better for bug amplification

View File

@ -573,7 +573,6 @@ Future<Void> logRouterPeekMessages(PromiseType replyPromise,
return Void();
}
// TODO: enable streaming peek log from log router
// This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamRequest req) {
self->activePeekStreams++;
@ -600,7 +599,9 @@ ACTOR Future<Void> logRouterPeekStream(LogRouterData* self, TLogPeekStreamReques
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", self->dbgid).error(e, true);
TraceEvent(SevDebug, "TLogPeekStreamEnd", self->dbgid)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -38,7 +38,7 @@ ACTOR Future<Void> tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest(
self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max()));
TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID)
.detail("PeerAddress", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress())
.detail("PeerAddr", self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress())
.detail("PeerToken", self->interf->get().interf().peekStreamMessages.getEndpoint().token);
return Void();
}
@ -52,11 +52,11 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterf
: interf(interf), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(begin), end(end),
poppedVersion(0), hasMsg(false), randomID(deterministicRandom()->randomUniqueID()),
returnIfBlocked(returnIfBlocked), onlySpilled(false), parallelGetMore(parallelGetMore),
usePeekStream(SERVER_KNOBS->PEEK_USEING_STREAMING), sequence(0), lastReset(0), resetCheck(Void()), slowReplies(0),
usePeekStream(SERVER_KNOBS->PEEK_USING_STREAMING), sequence(0), lastReset(0), resetCheck(Void()), slowReplies(0),
fastReplies(0), unknownReplies(0) {
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
TraceEvent(SevDebug, "SPC_Starting", randomID)
DisabledTraceEvent(SevDebug, "SPC_Starting", randomID)
.detail("Tag", tag.toString())
.detail("Begin", begin)
.detail("End", end);
@ -355,7 +355,7 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
}
updateCursorWithReply(self, res);
expectedBegin = res.end;
TraceEvent(SevDebug, "SPC_GetMoreB", self->randomID)
DisabledTraceEvent(SevDebug, "SPC_GetMoreB", self->randomID)
.detail("Has", self->hasMessage())
.detail("End", res.end)
.detail("Popped", res.popped.present() ? res.popped.get() : 0);
@ -367,7 +367,7 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
}
}
} catch (Error& e) {
TraceEvent(SevDebug, "SPC_GetMoreB_Error").error(e, true);
TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).error(e, true);
if (e.code() == error_code_connection_failed || e.code() == error_code_operation_obsolete) {
// NOTE: delay in order to avoid the endless retry loop block other tasks
self->peekReplyStream.reset();
@ -424,7 +424,6 @@ Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
if (hasMessage() && !parallelGetMore)
return Void();
if (!more.isValid() || more.isReady()) {
// TODO: add tagLocalityRemoteLog when log router support streaming peek
if (usePeekStream && (tag.locality >= 0 || tag.locality == tagLocalityLogRouter || tag.locality == tagLocalityRemoteLog)) {
more = serverPeekStreamGetMore(this, taskID);
} else if (parallelGetMore || onlySpilled || futureResults.size()) {

View File

@ -1162,7 +1162,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -1474,7 +1474,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -1903,7 +1903,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -1953,7 +1953,9 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
} catch (Error& e) {
self->activePeekStreams--;
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId).error(e, true);
TraceEvent(SevDebug, "TLogPeekStreamEnd", logData->logId)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress())
.error(e, true);
if (e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
req.reply.sendError(e);

View File

@ -599,4 +599,5 @@ extern TraceBatch g_traceBatch;
#define DUMPTOKEN(name) \
TraceEvent("DumpToken", recruited.id()).detail("Name", #name).detail("Token", name.getEndpoint().token)
#define DisabledTraceEvent(...) false && TraceEvent()
#endif