add returnIfBlocked in stream request

This commit is contained in:
Xiaoxi Wang 2021-07-08 19:32:58 +00:00
parent 15347773d9
commit 5a43a8c367
3 changed files with 25 additions and 28 deletions

View File

@ -33,8 +33,8 @@ void tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
self->peekReplyStream.reset();
return;
}
self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(
TLogPeekStreamRequest(self->messageVersion.version, self->tag, std::numeric_limits<int>::max()));
self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest(
self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max()));
}
ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf,
@ -339,12 +339,10 @@ ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, T
} catch (Error& e) {
if (e.code() == error_code_connection_failed) {
self->peekReplyStream.reset();
}
else if(e.code() == error_code_end_of_stream) {
} else if (e.code() == error_code_end_of_stream) {
self->end.reset(self->messageVersion.version);
return Void();
}
else {
} else {
throw;
}
}

View File

@ -235,16 +235,17 @@ struct TLogPeekStreamRequest {
Arena arena;
Version begin;
Tag tag;
bool returnIfBlocked;
int limitBytes;
ReplyPromiseStream<TLogPeekStreamReply> reply;
TLogPeekStreamRequest() {}
TLogPeekStreamRequest(Version version, Tag tag, int limitBytes)
: begin(version), tag(tag), limitBytes(limitBytes) {}
TLogPeekStreamRequest(Version version, Tag tag, bool returnIfBlocked, int limitBytes)
: begin(version), tag(tag), returnIfBlocked(returnIfBlocked), limitBytes(limitBytes) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, arena, begin, tag, limitBytes, reply);
serializer(ar, arena, begin, tag, returnIfBlocked, limitBytes, reply);
}
};

View File

@ -1570,14 +1570,13 @@ ACTOR Future<std::vector<StringRef>> parseMessagesForTag(StringRef commitBlob, T
}
// Common logics to peek TLog and create TLogPeekReply that serves both streaming peek or normal peek request
ACTOR Future<TLogPeekReply> peekTLog(
TLogData* self,
Reference<LogData> logData,
Version begin,
Tag tag,
bool returnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
Reference<LogData> logData,
Version begin,
Tag tag,
bool returnIfBlocked = false,
bool reqOnlySpilled = false,
Optional<std::pair<UID, int>> sequence = Optional<std::pair<UID, int>>()) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequenceNum = -1;
@ -1680,7 +1679,8 @@ ACTOR Future<TLogPeekReply> peekTLog(
rep.end = poppedVer;
rep.onlySpilled = false;
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence requests.
// TODO: once the fake stream is replaced by ReplyPromiseStream, we can remove the code handling sequence
// requests.
if (sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequenceNum + 1];
@ -1911,31 +1911,29 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Ref
loop {
state TLogPeekStreamReply reply;
try {
wait(req.reply.onReady() && store(reply.rep, peekTLog(self, logData, begin, req.tag, false, onlySpilled)));
wait(req.reply.onReady() &&
store(reply.rep, peekTLog(self, logData, begin, req.tag, req.returnIfBlocked, onlySpilled)));
req.reply.send(reply);
begin = reply.rep.end;
onlySpilled = reply.rep.onlySpilled;
wait(delay(0.005, TaskPriority::TLogPeekReply));
// return Void();
} catch (Error& e) {
if(e.code() == error_code_end_of_stream || e.code() == error_code_operation_obsolete) {
if (e.code() == error_code_end_of_stream) {
req.reply.sendError(e);
return Void();
} else {
throw;
}
throw;
}
}
}
ACTOR Future<Void> tLogPeekMessages(TLogData* self, TLogPeekRequest req, Reference<LogData> logData) {
state BinaryWriter messages(Unversioned());
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
try {
TLogPeekReply reply = wait(peekTLog(self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
TLogPeekReply reply =
wait(peekTLog(self, logData, req.begin, req.tag, req.returnIfBlocked, req.onlySpilled, req.sequence));
req.reply.send(reply);
} catch (Error& e) {
if (e.code() == error_code_timed_out || e.code() == error_code_operation_obsolete ||