200k simulation: check stream sequence; delay in GetMore loop

This commit is contained in:
Xiaoxi Wang 2021-07-31 22:07:43 -07:00
parent 2a88033800
commit ae2268f9f2
5 changed files with 14 additions and 2 deletions

View File

@ -333,6 +333,7 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
loop {
try {
state Version expectedBegin = self->messageVersion.version;
state Future<TLogPeekReply> fPeekReply = self->peekReplyStream.present()
? map(waitAndForward(self->peekReplyStream.get().getFuture()),
[](const TLogPeekStreamReply& r) { return r.rep; })
@ -350,7 +351,11 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
self->interf->get().interf().peekStreamMessages.getEndpoint().getPrimaryAddress(),
fPeekReply)
: Never())) {
if (res.begin.get() != expectedBegin) {
throw operation_obsolete();
}
updateCursorWithReply(self, res);
expectedBegin = res.end;
TraceEvent("SPC_GetMoreB", self->randomID)
.detail("Has", self->hasMessage())
.detail("End", res.end)
@ -364,8 +369,11 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
}
} catch (Error& e) {
TraceEvent(SevDebug, "SPC_GetMoreB_Error", self->randomID).detail("Error", e.what());
self->peekReplyStream.reset();
if (e.code() == error_code_connection_failed || e.code() == error_code_operation_obsolete) {
self->peekReplyStream.reset();
// NOTE: delay in order to avoid the endless retry loop block other tasks
wait(delay(0));
} else if (e.code() == error_code_end_of_stream) {
self->end.reset(self->messageVersion.version);
return Void();
@ -418,7 +426,7 @@ Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
return Void();
if (!more.isValid() || more.isReady()) {
// TODO: remove locality check when log router support streaming peek
if (false && usePeekStream && tag.locality >= 0) {
if (usePeekStream && tag.locality >= 0) {
more = serverPeekStreamGetMore(this, taskID);
} else if (parallelGetMore || onlySpilled || futureResults.size()) {
more = serverPeekParallelGetMore(this, taskID);

View File

@ -1157,6 +1157,7 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
reply.rep = future.get();
reply.rep.begin = begin;
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;

View File

@ -1469,6 +1469,7 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
reply.rep = future.get();
reply.rep.begin = begin;
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;

View File

@ -1898,6 +1898,7 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
reply.rep = future.get();
reply.rep.begin = begin;
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;

View File

@ -1951,6 +1951,7 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
}
reply.rep = future.get();
reply.rep.begin = begin;
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;