mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 01:42:37 +08:00
fix: pops which were ignored during a snapshot would not be replayed on the proper tlogs within a shared tlog (#6892)
This commit is contained in:
parent
297d831192
commit
442d2b34c7
@ -291,8 +291,6 @@ struct TLogData : NonCopyable {
|
||||
// the set and for callers that unset will
|
||||
// be able to match it up
|
||||
std::string dataFolder; // folder where data is stored
|
||||
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
|
||||
// that came when ignorePopRequest was set
|
||||
Reference<AsyncVar<bool>> degraded;
|
||||
std::vector<TagsAndMessage> tempTagMessages;
|
||||
|
||||
@ -514,6 +512,9 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
bool execOpCommitInProgress;
|
||||
int txsTags;
|
||||
|
||||
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
|
||||
// that came when ignorePopRequest was set
|
||||
|
||||
explicit LogData(TLogData* tLogData,
|
||||
TLogInterface interf,
|
||||
Tag remoteTag,
|
||||
@ -819,8 +820,8 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
|
||||
if (self->ignorePopRequest) {
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
|
||||
|
||||
if (self->toBePopped.find(inputTag) == self->toBePopped.end() || to > self->toBePopped[inputTag]) {
|
||||
self->toBePopped[inputTag] = to;
|
||||
if (logData->toBePopped.find(inputTag) == logData->toBePopped.end() || to > logData->toBePopped[inputTag]) {
|
||||
logData->toBePopped[inputTag] = to;
|
||||
}
|
||||
// add the pop to the toBePopped map
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest")
|
||||
@ -882,11 +883,11 @@ ACTOR Future<Void> tLogPop(TLogData* self, TLogPopRequest req, Reference<LogData
|
||||
self->ignorePopRequest = false;
|
||||
self->ignorePopUid = "";
|
||||
self->ignorePopDeadline = 0.0;
|
||||
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
|
||||
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
|
||||
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
|
||||
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
}
|
||||
self->toBePopped.clear();
|
||||
logData->toBePopped.clear();
|
||||
wait(waitForAll(ignoredPops));
|
||||
TraceEvent("ResetIgnorePopRequest")
|
||||
.detail("Now", g_network->now())
|
||||
@ -1937,7 +1938,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
|
||||
self->ignorePopRequest = false;
|
||||
self->ignorePopDeadline = 0.0;
|
||||
self->ignorePopUid = "";
|
||||
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
|
||||
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
|
||||
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
|
||||
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
}
|
||||
@ -1951,7 +1952,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
|
||||
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
|
||||
.detail("Version", logData->version.get());
|
||||
wait(waitForAll(ignoredPops));
|
||||
self->toBePopped.clear();
|
||||
logData->toBePopped.clear();
|
||||
enablePopReq.reply.send(Void());
|
||||
return Void();
|
||||
}
|
||||
|
@ -355,8 +355,6 @@ struct TLogData : NonCopyable {
|
||||
// the set and for callers that unset will
|
||||
// be able to match it up
|
||||
std::string dataFolder; // folder where data is stored
|
||||
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
|
||||
// that came when ignorePopRequest was set
|
||||
Reference<AsyncVar<bool>> degraded;
|
||||
|
||||
TLogData(UID dbgid,
|
||||
@ -596,6 +594,9 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
bool execOpCommitInProgress;
|
||||
int txsTags;
|
||||
|
||||
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
|
||||
// that came when ignorePopRequest was set
|
||||
|
||||
explicit LogData(TLogData* tLogData,
|
||||
TLogInterface interf,
|
||||
Tag remoteTag,
|
||||
@ -1400,8 +1401,8 @@ ACTOR Future<Void> tLogPopCore(TLogData* self, Tag inputTag, Version to, Referen
|
||||
if (self->ignorePopRequest) {
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
|
||||
|
||||
if (self->toBePopped.find(inputTag) == self->toBePopped.end() || to > self->toBePopped[inputTag]) {
|
||||
self->toBePopped[inputTag] = to;
|
||||
if (logData->toBePopped.find(inputTag) == logData->toBePopped.end() || to > logData->toBePopped[inputTag]) {
|
||||
logData->toBePopped[inputTag] = to;
|
||||
}
|
||||
// add the pop to the toBePopped map
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest")
|
||||
@ -1478,11 +1479,11 @@ ACTOR Future<Void> tLogPop(TLogData* self, TLogPopRequest req, Reference<LogData
|
||||
self->ignorePopRequest = false;
|
||||
self->ignorePopUid = "";
|
||||
self->ignorePopDeadline = 0.0;
|
||||
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
|
||||
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
|
||||
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
|
||||
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
}
|
||||
self->toBePopped.clear();
|
||||
logData->toBePopped.clear();
|
||||
wait(waitForAll(ignoredPops));
|
||||
TraceEvent("ResetIgnorePopRequest")
|
||||
.detail("Now", g_network->now())
|
||||
@ -2382,7 +2383,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
|
||||
self->ignorePopRequest = false;
|
||||
self->ignorePopDeadline = 0.0;
|
||||
self->ignorePopUid = "";
|
||||
for (it = self->toBePopped.begin(); it != self->toBePopped.end(); it++) {
|
||||
for (it = logData->toBePopped.begin(); it != logData->toBePopped.end(); it++) {
|
||||
TraceEvent("PlayIgnoredPop").detail("Tag", it->first.toString()).detail("Version", it->second);
|
||||
ignoredPops.push_back(tLogPopCore(self, it->first, it->second, logData));
|
||||
}
|
||||
@ -2396,7 +2397,7 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
|
||||
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
|
||||
.detail("Version", logData->version.get());
|
||||
wait(waitForAll(ignoredPops));
|
||||
self->toBePopped.clear();
|
||||
logData->toBePopped.clear();
|
||||
enablePopReq.reply.send(Void());
|
||||
return Void();
|
||||
}
|
||||
|
@ -358,7 +358,6 @@ struct TLogData : NonCopyable {
|
||||
FlowLock persistentDataCommitLock;
|
||||
|
||||
// Beginning of fields used by snapshot based backup and restore
|
||||
bool ignorePopRequest; // ignore pop request from storage servers
|
||||
double ignorePopDeadline; // time until which the ignorePopRequest will be
|
||||
// honored
|
||||
std::string ignorePopUid; // callers that set ignorePopRequest will set this
|
||||
@ -366,8 +365,6 @@ struct TLogData : NonCopyable {
|
||||
// the set and for callers that unset will
|
||||
// be able to match it up
|
||||
std::string dataFolder; // folder where data is stored
|
||||
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
|
||||
// that came when ignorePopRequest was set
|
||||
Reference<AsyncVar<bool>> degraded;
|
||||
// End of fields used by snapshot based backup and restore
|
||||
|
||||
@ -388,11 +385,10 @@ struct TLogData : NonCopyable {
|
||||
instanceID(deterministicRandom()->randomUniqueID().first()), bytesInput(0), bytesDurable(0),
|
||||
targetVolatileBytes(SERVER_KNOBS->TLOG_SPILL_THRESHOLD), overheadBytesInput(0), overheadBytesDurable(0),
|
||||
peekMemoryLimiter(SERVER_KNOBS->TLOG_SPILL_REFERENCE_MAX_PEEK_MEMORY_BYTES),
|
||||
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopRequest(false),
|
||||
dataFolder(folder), degraded(degraded),
|
||||
commitLatencyDist(Histogram::getHistogram(LiteralStringRef("tLog"),
|
||||
LiteralStringRef("commit"),
|
||||
Histogram::Unit::microseconds)) {
|
||||
concurrentLogRouterReads(SERVER_KNOBS->CONCURRENT_LOG_ROUTER_READS), ignorePopDeadline(0), dataFolder(folder),
|
||||
degraded(degraded), commitLatencyDist(Histogram::getHistogram(LiteralStringRef("tLog"),
|
||||
LiteralStringRef("commit"),
|
||||
Histogram::Unit::microseconds)) {
|
||||
cx = openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True);
|
||||
}
|
||||
};
|
||||
@ -629,6 +625,9 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
||||
bool execOpCommitInProgress;
|
||||
int txsTags;
|
||||
|
||||
std::map<Tag, Version> toBePopped; // map of Tag->Version for all the pops
|
||||
// that came when ignorePopRequest was set
|
||||
|
||||
explicit LogData(TLogData* tLogData,
|
||||
TLogInterface interf,
|
||||
Tag remoteTag,
|
||||
@ -1234,14 +1233,17 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData
|
||||
state std::map<Tag, Version>::const_iterator it;
|
||||
state int ignoredPopsPlayed = 0;
|
||||
state std::map<Tag, Version> toBePopped;
|
||||
toBePopped = std::move(self->toBePopped);
|
||||
self->toBePopped.clear();
|
||||
self->ignorePopRequest = false;
|
||||
self->ignorePopDeadline = 0.0;
|
||||
|
||||
while (now() < self->ignorePopDeadline) {
|
||||
wait(delayUntil(self->ignorePopDeadline + 0.0001));
|
||||
}
|
||||
|
||||
toBePopped = std::move(logData->toBePopped);
|
||||
logData->toBePopped.clear();
|
||||
self->ignorePopUid = "";
|
||||
for (it = toBePopped.cbegin(); it != toBePopped.cend(); ++it) {
|
||||
const auto& [tag, version] = *it;
|
||||
TraceEvent("PlayIgnoredPop").detail("Tag", tag.toString()).detail("Version", version);
|
||||
TraceEvent("PlayIgnoredPop", logData->logId).detail("Tag", tag.toString()).detail("Version", version);
|
||||
ignoredPops.push_back(tLogPopCore(self, tag, version, logData));
|
||||
if (++ignoredPopsPlayed % SERVER_KNOBS->TLOG_POP_BATCH_SIZE == 0) {
|
||||
TEST(true); // Yielding while processing pop requests
|
||||
@ -1249,20 +1251,22 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData
|
||||
}
|
||||
}
|
||||
wait(waitForAll(ignoredPops));
|
||||
TraceEvent("ResetIgnorePopRequest")
|
||||
.detail("IgnorePopRequest", self->ignorePopRequest)
|
||||
.detail("IgnorePopDeadline", self->ignorePopDeadline);
|
||||
TraceEvent("ResetIgnorePopRequest", logData->logId).detail("IgnorePopDeadline", self->ignorePopDeadline);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> tLogPop(TLogData* self, TLogPopRequest req, Reference<LogData> logData) {
|
||||
if (self->ignorePopRequest) {
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);
|
||||
if (now() < self->ignorePopDeadline) {
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest", logData->logId).detail("IgnorePopDeadline", self->ignorePopDeadline);
|
||||
|
||||
auto& v = self->toBePopped[req.tag];
|
||||
if (logData->toBePopped.empty()) {
|
||||
logData->addActor.send(processPopRequests(self, logData));
|
||||
}
|
||||
|
||||
auto& v = logData->toBePopped[req.tag];
|
||||
v = std::max(v, req.to);
|
||||
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest")
|
||||
TraceEvent(SevDebug, "IgnoringPopRequest", logData->logId)
|
||||
.detail("IgnorePopDeadline", self->ignorePopDeadline)
|
||||
.detail("Tag", req.tag.toString())
|
||||
.detail("Version", req.to);
|
||||
@ -2571,15 +2575,15 @@ ACTOR Future<Void> tLogEnablePopReq(TLogEnablePopRequest enablePopReq, TLogData*
|
||||
enablePopReq.reply.sendError(operation_failed());
|
||||
return Void();
|
||||
}
|
||||
TraceEvent("EnableTLogPlayAllIgnoredPops2")
|
||||
TraceEvent("EnableTLogPlayAllIgnoredPops2", logData->logId)
|
||||
.detail("UidStr", enablePopReq.snapUID.toString())
|
||||
.detail("IgnorePopUid", self->ignorePopUid)
|
||||
.detail("IgnorePopRequest", self->ignorePopRequest)
|
||||
.detail("IgnorePopDeadline", self->ignorePopDeadline)
|
||||
.detail("PersistentDataVersion", logData->persistentDataVersion)
|
||||
.detail("PersistentDataDurableVersion", logData->persistentDataDurableVersion)
|
||||
.detail("QueueCommittedVersion", logData->queueCommittedVersion.get())
|
||||
.detail("Version", logData->version.get());
|
||||
self->ignorePopDeadline = 0;
|
||||
wait(processPopRequests(self, logData));
|
||||
enablePopReq.reply.send(Void());
|
||||
return Void();
|
||||
@ -2681,9 +2685,8 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
|
||||
req.reply.sendError(operation_failed());
|
||||
} else {
|
||||
// FIXME: As part of reverting snapshot V1, make ignorePopUid a UID instead of string
|
||||
self->ignorePopRequest = true;
|
||||
self->ignorePopUid = req.snapUID.toString();
|
||||
self->ignorePopDeadline = g_network->now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
|
||||
self->ignorePopDeadline = now() + SERVER_KNOBS->TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
|
||||
req.reply.send(Void());
|
||||
}
|
||||
}
|
||||
@ -2693,11 +2696,6 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
|
||||
when(TLogSnapRequest snapReq = waitNext(tli.snapRequest.getFuture())) {
|
||||
logData->addActor.send(tLogSnapCreate(snapReq, self, logData));
|
||||
}
|
||||
when(wait(self->ignorePopRequest ? delayUntil(self->ignorePopDeadline) : Never())) {
|
||||
TEST(true); // Hit ignorePopDeadline
|
||||
TraceEvent("EnableTLogPlayAllIgnoredPops").detail("IgnoredPopDeadline", self->ignorePopDeadline);
|
||||
logData->addActor.send(processPopRequests(self, logData));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user