mirror of https://github.com/apple/foundationdb.git synced 2025-05-21 05:53:02 +08:00

trace log and reset changes; byteAcknowledge overflow

Xiaoxi Wang 2021-07-15 21:30:14 +00:00
parent 1584ed5853
commit 227570357a
3 changed files with 82 additions and 50 deletions

@@ -277,9 +277,9 @@ struct AcknowledgementReceiver final : FlowReceiver, FastAllocated<AcknowledgementReceiver>
 	using FastAllocated<AcknowledgementReceiver>::operator new;
 	using FastAllocated<AcknowledgementReceiver>::operator delete;

-	int64_t bytesSent;
-	int64_t bytesAcknowledged;
-	int64_t bytesLimit;
+	uint64_t bytesSent;
+	uint64_t bytesAcknowledged;
+	uint64_t bytesLimit;
 	Promise<Void> ready;
 	Future<Void> failures;
@@ -358,11 +358,19 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
 			// send an ack immediately
 			if (acknowledgements.getRawEndpoint().isValid()) {
 				acknowledgements.bytesAcknowledged += message.get().asUnderlyingType().expectedSize();
-				FlowTransport::transport().sendUnreliable(
-				    SerializeSource<ErrorOr<AcknowledgementReply>>(
-				        AcknowledgementReply(acknowledgements.bytesAcknowledged)),
-				    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
-				    false);
+				// int64_t overflow: we need to reset this stream
+				if (acknowledgements.bytesAcknowledged > std::numeric_limits<int64_t>::max()) {
+					FlowTransport::transport().sendUnreliable(
+					    SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
+					    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+					    false);
+				} else {
+					FlowTransport::transport().sendUnreliable(
+					    SerializeSource<ErrorOr<AcknowledgementReply>>(
+					        AcknowledgementReply(acknowledgements.bytesAcknowledged)),
+					    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+					    false);
+				}
 			}
 		}
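The guard added here only works because the counters in the first hunk were widened from int64_t to uint64_t: the total can now pass int64_t's maximum without signed-overflow undefined behavior, and the comparison detects the crossing so the server can send operation_obsolete() and force the stream to be re-established. A minimal standalone sketch of that detection logic (AckCounter and acknowledge are illustrative names, not from the FoundationDB sources):

```cpp
#include <cstdint>
#include <iostream>
#include <limits>

// Illustrative stand-in for the widened counter in AcknowledgementReceiver.
struct AckCounter {
	uint64_t bytesAcknowledged = 0;

	// Accumulate and report whether the total has crossed the int64_t
	// ceiling; the real code then sends operation_obsolete() to the peer
	// instead of a normal AcknowledgementReply.
	bool acknowledge(uint64_t bytes) {
		bytesAcknowledged += bytes; // unsigned: wraparound is well-defined, never UB
		return bytesAcknowledged > static_cast<uint64_t>(std::numeric_limits<int64_t>::max());
	}
};

int main() {
	AckCounter c;
	c.bytesAcknowledged = static_cast<uint64_t>(std::numeric_limits<int64_t>::max()) - 10;
	std::cout << c.acknowledge(5) << '\n';  // 0: still within int64_t range
	std::cout << c.acknowledge(10) << '\n'; // 1: crossed the ceiling, reset the stream
}
```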
@@ -376,10 +384,17 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
 		// A reply that has been queued up is being consumed, so send an ack to the server
 		if (acknowledgements.getRawEndpoint().isValid()) {
 			acknowledgements.bytesAcknowledged += res.expectedSize();
-			FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
-			                                              AcknowledgementReply(acknowledgements.bytesAcknowledged)),
-			                                          acknowledgements.getEndpoint(TaskPriority::ReadSocket),
-			                                          false);
+			if (acknowledgements.bytesAcknowledged > std::numeric_limits<int64_t>::max()) {
+				FlowTransport::transport().sendUnreliable(
+				    SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
+				    acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+				    false);
+			} else {
+				FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<AcknowledgementReply>>(
+				                                              AcknowledgementReply(acknowledgements.bytesAcknowledged)),
+				                                          acknowledgements.getEndpoint(TaskPriority::ReadSocket),
+				                                          false);
+			}
 		}
 		return res;
 	}
@@ -406,7 +421,6 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
 template <class T>
 class ReplyPromiseStream {
 public:
-
 	// stream.send( request )
 	// Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
@@ -475,8 +489,8 @@ public:
 		errors->delPromiseRef();
 	}

 	// The endpoints of a ReplyPromiseStream must be initialized at Task::ReadSocket, because with lower priorities a
 	// delay(0) in FlowTransport deliver can cause out of order delivery.
 	const Endpoint& getEndpoint() const { return queue->getEndpoint(TaskPriority::ReadSocket); }

 	bool operator==(const ReplyPromiseStream<T>& rhs) const { return queue == rhs.queue; }

@@ -35,7 +35,7 @@ void tryEstablishPeekStream(ILogSystem::ServerPeekCursor* self) {
 	}
 	self->peekReplyStream = self->interf->get().interf().peekStreamMessages.getReplyStream(TLogPeekStreamRequest(
 	    self->messageVersion.version, self->tag, self->returnIfBlocked, std::numeric_limits<int>::max()));
-	TraceEvent(SevDebug, "StreamCreated");
+	TraceEvent(SevDebug, "SPC_StreamCreated", self->randomID);
 }

 ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf,
@@ -51,7 +51,11 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor(Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf,
     resetCheck(Void()), usePeekStream(SERVER_KNOBS->PEEK_USEING_STREAMING) {
 	this->results.maxKnownVersion = 0;
 	this->results.minKnownCommittedVersion = 0;
-	//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
+	TraceEvent("SPC_Starting", randomID)
+	    .detail("Tag", tag.toString())
+	    .detail("Begin", begin)
+	    .detail("End", end)
+	    .backtrace();
 }

 ILogSystem::ServerPeekCursor::ServerPeekCursor(TLogPeekReply const& results,
@@ -316,29 +320,34 @@ ACTOR Future<Void> serverPeekParallelGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
 ACTOR Future<Void> serverPeekStreamGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
 	if (!self->interf || self->isExhausted()) {
-		self->peekReplyStream.reset();
 		if (self->hasMessage())
 			return Void();
 		return Never();
 	}

-	tryEstablishPeekStream(self);
 	loop {
 		try {
+			tryEstablishPeekStream(self);
+			state Future<TLogPeekReply> fPeekReply = self->peekReplyStream.present()
+			                                             ? map(waitAndForward(self->peekReplyStream.get().getFuture()),
+			                                                   [](const TLogPeekStreamReply& r) { return r.rep; })
+			                                             : Never();
 			choose {
 				when(wait(self->interf->onChange())) {
 					self->onlySpilled = false;
 					self->peekReplyStream.reset();
-					tryEstablishPeekStream(self);
 				}
-				when(TLogPeekStreamReply res =
-				         wait(self->peekReplyStream.present()
-				                  ? brokenPromiseToNever(waitAndForward(self->peekReplyStream.get().getFuture()))
-				                  : Never())) {
-					updateCursorWithReply(self, res.rep);
+				when(TLogPeekReply res =
+				         wait(self->peekReplyStream.present()
+				                  ? recordRequestMetrics(
+				                        self, self->peekReplyStream.get().getEndpoint().getPrimaryAddress(), fPeekReply)
+				                  : Never())) {
+					updateCursorWithReply(self, res);
 					TraceEvent("SPC_GetMoreB", self->randomID)
 					    .detail("Has", self->hasMessage())
-					    .detail("End", res.rep.end)
-					    .detail("Popped", res.rep.popped.present() ? res.rep.popped.get() : 0);
+					    .detail("End", res.end)
+					    .detail("Popped", res.popped.present() ? res.popped.get() : 0);
 					return Void();
 				}
 			}
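The new fPeekReply future shows Flow's map combinator: it turns the Future<TLogPeekStreamReply> coming off the reply stream into a Future<TLogPeekReply> by projecting out the nested rep field, which is why the when clause can switch from TLogPeekStreamReply to TLogPeekReply. A rough standard-C++ analogue of that projection, using std::future in place of Flow futures (mapFuture and the two reply structs here are illustrative sketches, not the Flow API):

```cpp
#include <future>
#include <iostream>
#include <utility>

// Rough analogue of Flow's map(future, f): a future holding f(input's value).
// Flow's version is non-blocking; this deferred-task sketch only shows the shape.
template <class T, class F>
auto mapFuture(std::future<T> in, F f) {
	return std::async(std::launch::deferred,
	                  [in = std::move(in), f]() mutable { return f(in.get()); });
}

// Illustrative stand-ins for the reply types named in the diff.
struct TLogPeekReply { long end = 0; };
struct TLogPeekStreamReply { TLogPeekReply rep; };

int main() {
	std::promise<TLogPeekStreamReply> p;
	// Same projection as the diff's lambda: [](const TLogPeekStreamReply& r) { return r.rep; }
	auto fRep = mapFuture(p.get_future(), [](const TLogPeekStreamReply& r) { return r.rep; });
	p.set_value(TLogPeekStreamReply{ { 42 } });
	std::cout << fRep.get().end << '\n'; // 42
}
```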
@@ -388,19 +397,24 @@ ACTOR Future<Void> serverPeekGetMore(ILogSystem::ServerPeekCursor* self, TaskPriority taskID) {
 }

 Future<Void> ILogSystem::ServerPeekCursor::getMore(TaskPriority taskID) {
-	//TraceEvent("SPC_GetMore", randomID).detail("HasMessage", hasMessage()).detail("More", !more.isValid() || more.isReady()).detail("MessageVersion", messageVersion.toString()).detail("End", end.toString());
+	TraceEvent("SPC_GetMore", randomID)
+	    .detail("HasMessage", hasMessage())
+	    .detail("More", !more.isValid() || more.isReady())
+	    .detail("MessageVersion", messageVersion.toString())
+	    .detail("End", end.toString());
 	if (hasMessage() && !parallelGetMore)
 		return Void();
 	if (!more.isValid() || more.isReady()) {
-		if (usePeekStream && taskID == TaskPriority::TLogPeekReply) {
-			more = serverPeekStreamGetMore(this, taskID);
-		}
-		// if (parallelGetMore || onlySpilled || futureResults.size()) {
-		// 	more = serverPeekParallelGetMore(this, taskID);
-		// }
-		else {
-			more = serverPeekGetMore(this, taskID);
-		}
+		more = serverPeekStreamGetMore(this, taskID);
+		// if (usePeekStream && taskID == TaskPriority::TLogPeekReply) {
+		// 	more = serverPeekStreamGetMore(this, taskID);
+		// }
+		// if (parallelGetMore || onlySpilled || futureResults.size()) {
+		// 	more = serverPeekParallelGetMore(this, taskID);
+		// }
+		// else {
+		// 	more = serverPeekGetMore(this, taskID);
+		// }
 	}
 	return more;
 }

@@ -557,6 +557,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	TLogData* tLogData;
 	Promise<Void> recoveryComplete, committingQueue;
 	Version unrecoveredBefore, recoveredAt;
+	int activePeekStreams = 0;

 	struct PeekTrackerData {
 		std::map<int, Promise<std::pair<Version, bool>>>
@@ -668,6 +669,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 		specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); });
 		specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); });
 		specialCounter(cc, "Generation", [this]() { return this->recoveryCount; });
+		specialCounter(cc, "ActivePeekStreams", [this]() { return this->activePeekStreams; });
 	}

 	~LogData() {
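ActivePeekStreams is registered the same way as the surrounding counters: specialCounter takes a lambda that is sampled whenever metrics are emitted, so the code only has to keep the plain activePeekStreams integer up to date. A small sketch of that lambda-backed gauge pattern (the gauges registry below is hypothetical, standing in for the Counter/CounterCollection machinery):

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>

// Hypothetical registry standing in for specialCounter(): each gauge is a
// lambda sampled at report time rather than a stored value.
std::map<std::string, std::function<int64_t()>> gauges;

struct LogDataSketch {
	int activePeekStreams = 0;
	LogDataSketch() {
		// In spirit: specialCounter(cc, "ActivePeekStreams", [this]() { return this->activePeekStreams; });
		gauges["ActivePeekStreams"] = [this] { return int64_t(this->activePeekStreams); };
	}
};

int main() {
	LogDataSketch logData;
	logData.activePeekStreams++; // a peek stream opened
	for (const auto& [name, sample] : gauges)
		std::cout << name << " = " << sample() << '\n'; // ActivePeekStreams = 1
}
```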
@@ -1167,17 +1169,19 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Reference<LogData> logData) {
 	}

 	uint64_t PoppedVersionLag = logData->persistentDataDurableVersion - logData->queuePoppedVersion;
-	if ( SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
-	    (logData->queuePoppedVersion > 0) && //avoid generating massive events at beginning
-	    (tagData->unpoppedRecovered || PoppedVersionLag >= SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { //when recovery or long lag
+	if (SERVER_KNOBS->ENABLE_DETAILED_TLOG_POP_TRACE &&
+	    (logData->queuePoppedVersion > 0) && // avoid generating massive events at beginning
+	    (tagData->unpoppedRecovered ||
+	     PoppedVersionLag >=
+	         SERVER_KNOBS->TLOG_POPPED_VER_LAG_THRESHOLD_FOR_TLOGPOP_TRACE)) { // when recovery or long lag
 		TraceEvent("TLogPopDetails", logData->logId)
 		    .detail("Tag", tagData->tag.toString())
 		    .detail("UpTo", upTo)
 		    .detail("PoppedVersionLag", PoppedVersionLag)
 		    .detail("MinPoppedTag", logData->minPoppedTag.toString())
 		    .detail("QueuePoppedVersion", logData->queuePoppedVersion)
 		    .detail("UnpoppedRecovered", tagData->unpoppedRecovered ? "True" : "False")
 		    .detail("NothingPersistent", tagData->nothingPersistent ? "True" : "False");
 	}
 	if (upTo > logData->persistentDataDurableVersion)
 		wait(tagData->eraseMessagesBefore(upTo, self, logData, TaskPriority::TLogPop));
@@ -1915,6 +1919,7 @@ ACTOR Future<TLogPeekReply> peekTLog(TLogData* self,
 // This actor keep pushing TLogPeekStreamReply until it's removed from the cluster or should recover
 ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
+	logData->activePeekStreams++;
 	state Version begin = req.begin;
 	state bool onlySpilled = false;
 	if (req.tag.locality == tagLocalityTxs && req.tag.id >= logData->txsTags && logData->txsTags > 0) {
@@ -1929,18 +1934,17 @@ ACTOR Future<Void> tLogPeekStream(TLogData* self, TLogPeekStreamRequest req, Reference<LogData> logData) {
 			req.reply.send(reply);
 			begin = reply.rep.end;
 			onlySpilled = reply.rep.onlySpilled;
-			wait(delay(0, g_network->getCurrentTask()));
+			wait(delay(.05, g_network->getCurrentTask()));
 		} catch (Error& e) {
+			logData->activePeekStreams--;
 			if (e.code() == error_code_end_of_stream) {
 				req.reply.sendError(e);
 				return Void();
-			}
-			else if (e.code() == error_code_operation_obsolete) {
+			} else if (e.code() == error_code_operation_obsolete) {
 				// reply stream is cancelled on the client
 				return Void();
 			} else {
 				throw;
 			}
 		}
 	}
 }
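Taken together, the last two hunks give the stream actor its lifecycle: bump activePeekStreams on entry, push replies in a loop with a small pause between batches (raised from delay(0) to delay(.05)), and in the catch block decrement the gauge before deciding whether the error ends the stream cleanly (end_of_stream, or operation_obsolete when the client cancelled) or must propagate. A plain-C++ sketch of that control flow, with exception classes standing in for Flow's error codes:

```cpp
#include <iostream>
#include <stdexcept>

// Illustrative stand-ins for error_code_end_of_stream / error_code_operation_obsolete.
struct EndOfStream : std::runtime_error { EndOfStream() : std::runtime_error("end_of_stream") {} };
struct OperationObsolete : std::runtime_error { OperationObsolete() : std::runtime_error("operation_obsolete") {} };

int activePeekStreams = 0;

// Shape of tLogPeekStream: count the stream while it lives, loop sending
// replies, and on any error fix the gauge first, then classify the failure.
void peekStreamSketch(int repliesBeforeEnd) {
	activePeekStreams++;
	try {
		for (int sent = 0;; ++sent) {
			if (sent == repliesBeforeEnd)
				throw EndOfStream(); // nothing left to push
			std::cout << "reply " << sent << '\n';
			// real code: wait(delay(.05, g_network->getCurrentTask())) between batches
		}
	} catch (const EndOfStream&) {
		activePeekStreams--; // clean shutdown; the real actor forwards this error to the client
	} catch (const OperationObsolete&) {
		activePeekStreams--; // reply stream cancelled on the client; just stop
	} catch (...) {
		activePeekStreams--; // unexpected error: fix the gauge, then propagate
		throw;
	}
}

int main() {
	peekStreamSketch(2);
	std::cout << "ActivePeekStreams = " << activePeekStreams << '\n'; // back to 0
}
```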