From ae2268f9f2bc1d0c89c1a65e54f336e1f000f114 Mon Sep 17 00:00:00 2001 From: Xiaoxi Wang Date: Sat, 31 Jul 2021 22:07:43 -0700 Subject: [PATCH] 200k simulation: check stream sequence; delay in GetMore loop --- fdbserver/LogSystemPeekCursor.actor.cpp | 12 ++++++++++-- fdbserver/OldTLogServer_4_6.actor.cpp | 1 + fdbserver/OldTLogServer_6_0.actor.cpp | 1 + fdbserver/OldTLogServer_6_2.actor.cpp | 1 + fdbserver/TLogServer.actor.cpp | 1 + 5 files changed, 14 insertions(+), 2 deletions(-) diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index c210f89f29..f719c7c83a 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -333,6 +333,7 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T loop { try { + state Version expectedBegin = self->messageVersion.version; state Future fPeekReply = self->peekReplyStream.present() ? map(waitAndForward(self->peekReplyStream.get().getFuture()), [](const TLogPeekStreamReply& r) { return r.rep; }) @@ -350,7 +351,11 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress(), fPeekReply) : Never())) { + if (res.begin.get() != expectedBegin) { + throw operation_obsolete(); + } updateCursorWithReply(self, res); + expectedBegin = res.end; TraceEvent("SPC_GetMoreB", self->randomID) .detail("Has", self->hasMessage()) .detail("End", res.end) @@ -364,8 +369,11 @@ ACTOR Future serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T } } catch (Error& e) { TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).detail("Error", e.what()); + + self->peekReplyStream.reset(); if (e.code() == error_code_connection_failed || e.code() == error_code_operation_obsolete) { - self->peekReplyStream.reset(); + // NOTE: delay so that the endless retry loop does not block other tasks + wait(delay(0)); } else if (e.code() == 
error_code_end_of_stream) { self->end.reset(self->messageVersion.version); return Void(); @@ -418,7 +426,7 @@ Future ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) { return Void(); if (!more.isValid() || more.isReady()) { // TODO: remove locality check when log router support streaming peek - if (false && usePeekStream && tag.locality >= 0) { + if (usePeekStream && tag.locality >= 0) { more = serverPeekStreamGetMore(this, taskID); } else if (parallelGetMore || onlySpilled || futureResults.size()) { more = serverPeekParallelGetMore(this, taskID); diff --git a/fdbserver/OldTLogServer_4_6.actor.cpp b/fdbserver/OldTLogServer_4_6.actor.cpp index 35d142b9f7..ea0f6ba22e 100644 --- a/fdbserver/OldTLogServer_4_6.actor.cpp +++ b/fdbserver/OldTLogServer_4_6.actor.cpp @@ -1157,6 +1157,7 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } reply.rep = future.get(); + reply.rep.begin = begin; req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 5c27581b2c..fa08f0e3f3 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -1469,6 +1469,7 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } reply.rep = future.get(); + reply.rep.begin = begin; req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; diff --git a/fdbserver/OldTLogServer_6_2.actor.cpp b/fdbserver/OldTLogServer_6_2.actor.cpp index 81025d93de..181960cec9 100644 --- a/fdbserver/OldTLogServer_6_2.actor.cpp +++ b/fdbserver/OldTLogServer_6_2.actor.cpp @@ -1898,6 +1898,7 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } reply.rep = future.get(); + reply.rep.begin = begin; req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled; diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 
aa2afd5406..746064ce91 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1951,6 +1951,7 @@ ACTOR Future tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref } reply.rep = future.get(); + reply.rep.begin = begin; req.reply.send(reply); begin = reply.rep.end; onlySpilled = reply.rep.onlySpilled;