Merge pull request #11908 from jzhou77/fix

Handle cases when backup worker pulling may miss mutations
This commit is contained in:
Jingyu Zhou 2025-02-17 16:02:57 -08:00 committed by GitHub
commit 63c6539f1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 182 additions and 18 deletions

View File

@ -1119,6 +1119,7 @@ const KeyRangeRef backupProgressKeys("\xff\x02/backupProgress/"_sr, "\xff\x02/ba
const KeyRef backupProgressPrefix = backupProgressKeys.begin;
const KeyRef backupStartedKey = "\xff\x02/backupStarted"_sr;
extern const KeyRef backupPausedKey = "\xff\x02/backupPaused"_sr;
extern const KeyRef backupWorkerMaxNoopVersionKey = "\xff\x02/backupWorkerMaxNoopVersion"_sr;
const Key backupProgressKeyFor(UID workerID) {
BinaryWriter wr(Unversioned());

View File

@ -472,6 +472,9 @@ std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& va
// 1 = Send a signal to pause/already paused.
extern const KeyRef backupPausedKey;
// The key to store the maximum version that backup workers popped in NOOP mode.
extern const KeyRef backupWorkerMaxNoopVersionKey;
// "\xff/previousCoordinators" = "[[ClusterConnectionString]]"
// Set to the encoded structure of the cluster's previous set of coordinators.
// Changed when performing quorumChange.

View File

@ -275,6 +275,8 @@ struct BackupData {
CounterCollection cc;
Future<Void> logger;
Future<Void> noopPopper; // holds actor to save progress in NOOP mode
AsyncVar<Version> popTrigger; // trigger to pop version in NOOP mode
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo> const> db, const InitializeBackupRequest& req)
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
@ -291,6 +293,8 @@ struct BackupData {
specialCounter(cc, "AvailableBytes", [this]() { return this->lock->available(); });
logger =
cc.traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, "BackupWorkerMetrics");
popTrigger.set(invalidVersion);
noopPopper = _noopPopper(this);
}
bool pullFinished() const { return endVersion.present() && pulledVersion.get() > endVersion.get(); }
@ -438,6 +442,56 @@ struct BackupData {
changedTrigger.trigger();
}
// Update the NOOP popped version so that when backup is started or resumed,
// the worker can ignore any versions that are already popped.
ACTOR static Future<Void> _saveNoopVersion(BackupData* self, Version poppedVersion) {
state Transaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> noopValue = wait(tr.get(backupWorkerMaxNoopVersionKey));
if (noopValue.present()) {
Version noopVersion = BinaryReader::fromStringRef<Version>(noopValue.get(), Unversioned());
if (poppedVersion > noopVersion) {
tr.set(backupWorkerMaxNoopVersionKey, BinaryWriter::toValue(poppedVersion, Unversioned()));
}
} else {
tr.set(backupWorkerMaxNoopVersionKey, BinaryWriter::toValue(poppedVersion, Unversioned()));
}
wait(tr.commit());
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<Void> _noopPopper(BackupData* self) {
state Future<Void> onChange = self->popTrigger.onChange();
loop {
wait(onChange);
onChange = self->popTrigger.onChange();
if (!self->pulling) {
// Save the noop pop version, which sets min version for
// the next backup job. Note this version may change after the wait.
state Version popVersion = self->popTrigger.get();
wait(_saveNoopVersion(self, popVersion));
self->popVersion = popVersion;
TraceEvent("BackupWorkerNoopPop", self->myId)
.detail("Tag", self->tag)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", popVersion);
self->pop();
}
}
}
ACTOR static Future<Void> _waitAllInfoReady(BackupData* self) {
std::vector<Future<Void>> all;
for (auto it = self->backups.begin(); it != self->backups.end();) {
@ -942,14 +996,47 @@ ACTOR Future<Void> uploadData(BackupData* self) {
}
}
ACTOR static Future<Version> getNoopVersion(BackupData* self) {
state Transaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
Optional<Value> noopValue = wait(tr.get(backupWorkerMaxNoopVersionKey));
if (noopValue.present()) {
return BinaryReader::fromStringRef<Version>(noopValue.get(), Unversioned());
} else {
return invalidVersion;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Pulls data from TLog servers using LogRouter tag.
ACTOR Future<Void> pullAsyncData(BackupData* self) {
state Future<Void> logSystemChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion));
state Arena prev;
TraceEvent("BackupWorkerPull", self->myId).log();
// Going out of noop mode, the popVersion could be larger than
// savedVersion or ongoing pop version, i.e., popTrigger.get(),
// and we can't peek messages between savedVersion and popVersion.
state Version tagAt = std::max({ self->pulledVersion.get(),
self->startVersion,
self->savedVersion,
self->popVersion,
self->popTrigger.get() });
TraceEvent("BackupWorkerPull", self->myId)
.detail("Tag", self->tag)
.detail("Version", tagAt)
.detail("PopVersion", self->popVersion)
.detail("SavedVersion", self->savedVersion);
loop {
while (self->paused.get()) {
wait(self->paused.onChange());
@ -957,6 +1044,9 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
loop choose {
when(wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) {
DisabledTraceEvent("BackupWorkerGotMore", self->myId)
.detail("Tag", self->tag)
.detail("CursorVersion", r->version().version);
break;
}
when(wait(logSystemChange)) {
@ -969,6 +1059,25 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
logSystemChange = self->logSystem.onChange();
}
}
// When TLog sets popped version, it means mutations between popped() and tagAt are unavailable
// on the TLog. So, we should stop pulling data from the TLog.
if (r->popped() > 0) {
Version maxNoopVersion = wait(getNoopVersion(self));
Severity sev = maxNoopVersion != invalidVersion && maxNoopVersion < r->popped() ? SevError : SevWarnAlways;
TraceEvent(sev, "BackupWorkerPullMissingMutations", self->myId)
.detail("Tag", self->tag)
.detail("BackupEpoch", self->backupEpoch)
.detail("Popped", r->popped())
.detail("NoopPoppedVersion", maxNoopVersion)
.detail("ExpectedPeekVersion", tagAt);
ASSERT(self->backupEpoch < self->recruitedEpoch && maxNoopVersion >= r->popped());
// This can only happen when the backup was in NOOP mode in the previous epoch,
// where NOOP mode popped version is larger than the expected peek version.
// CC recruits this worker from epoch's begin version, which is lower than the
// noop popped version. So it's ok for this worker to continue from the popped
// version. Max noop popped version (maybe from a different tag) should be larger
// than the popped version.
}
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
// Note we aggressively peek (uncommitted) messages, but only committed
@ -1036,14 +1145,11 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
}
when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) {
if (committedVersion.isReady()) {
self->popVersion =
std::max(self->popVersion, std::max(committedVersion.get(), self->savedVersion));
Version newPopVersion =
std::max({ self->popVersion, self->savedVersion, committedVersion.get() });
self->minKnownCommittedVersion =
std::max(committedVersion.get(), self->minKnownCommittedVersion);
TraceEvent("BackupWorkerNoopPop", self->myId)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", self->popVersion);
self->pop(); // Pop while the worker is in this NOOP state.
self->popTrigger.set(newPopVersion);
committedVersion = Never();
} else {
committedVersion = self->getMinKnownCommittedVersion();

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <utility>
#include "fdbclient/FDBTypes.h"
#include "fdbclient/MetaclusterRegistration.h"
#include "fdbrpc/sim_validation.h"
@ -625,15 +627,22 @@ ACTOR Future<Void> configurationMonitor(Reference<ClusterRecoveryData> self, Dat
}
}
ACTOR static Future<Optional<Version>> getMinBackupVersion(Reference<ClusterRecoveryData> self, Database cx) {
// Returns the minimum backup version and the maximum backup worker noop version.
ACTOR static Future<std::pair<Optional<Version>, Optional<Version>>> getMinBackupVersion(
Reference<ClusterRecoveryData> self,
Database cx) {
loop {
state ReadYourWritesTransaction tr(cx);
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> value = wait(tr.get(backupStartedKey));
Optional<Version> minVersion;
state Future<Optional<Value>> fValue = tr.get(backupStartedKey);
state Future<Optional<Value>> fNoopValue = tr.get(backupWorkerMaxNoopVersionKey);
wait(success(fValue) && success(fNoopValue));
Optional<Value> value = fValue.get();
Optional<Value> noopValue = fNoopValue.get();
Optional<Version> minVersion, noopVersion;
if (value.present()) {
auto uidVersions = decodeBackupStartedValue(value.get());
TraceEvent e("GotBackupStartKey", self->dbgid);
@ -646,7 +655,10 @@ ACTOR static Future<Optional<Version>> getMinBackupVersion(Reference<ClusterReco
} else {
TraceEvent("EmptyBackupStartKey", self->dbgid).log();
}
return minVersion;
if (noopValue.present()) {
noopVersion = BinaryReader::fromStringRef<Version>(noopValue.get(), Unversioned());
}
return std::make_pair(minVersion, noopVersion);
} catch (Error& e) {
wait(tr.onError(e));
@ -695,17 +707,23 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<ClusterRecoveryData> se
backup_worker_failed()));
}
state Future<Optional<Version>> fMinVersion = getMinBackupVersion(self, cx);
state Future<std::pair<Optional<Version>, Optional<Version>>> fMinVersion = getMinBackupVersion(self, cx);
wait(gotProgress && success(fMinVersion));
TraceEvent("MinBackupVersion", self->dbgid).detail("Version", fMinVersion.get().present() ? fMinVersion.get() : -1);
Optional<Version> minVersion = fMinVersion.get().first;
Optional<Version> noopVersion = fMinVersion.get().second;
TraceEvent("MinBackupVersion", self->dbgid)
.detail("Version", minVersion.present() ? minVersion.get() : -1)
.detail("NoopVersion", noopVersion.present() ? noopVersion.get() : -1);
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> toRecruit =
backupProgress->getUnfinishedBackup();
for (const auto& [epochVersionTags, tagVersions] : toRecruit) {
const Version oldEpochEnd = std::get<1>(epochVersionTags);
if (!fMinVersion.get().present() || fMinVersion.get().get() + 1 >= oldEpochEnd) {
if ((!minVersion.present() || minVersion.get() + 1 >= oldEpochEnd) ||
(noopVersion.present() && noopVersion.get() >= oldEpochEnd)) {
TraceEvent("SkipBackupRecruitment", self->dbgid)
.detail("MinVersion", fMinVersion.get().present() ? fMinVersion.get() : -1)
.detail("MinVersion", minVersion.present() ? minVersion.get() : -1)
.detail("NoopVersion", noopVersion.present() ? noopVersion.get() : -1)
.detail("Epoch", epoch)
.detail("OldEpoch", std::get<0>(epochVersionTags))
.detail("OldEpochEnd", oldEpochEnd);

View File

@ -1015,6 +1015,8 @@ ACTOR Future<Void> disableConsistencyScanInSim(Database db, bool waitForCompleti
return Void();
}
ACTOR Future<Void> disableBackupWorker(Database cx);
// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx,
@ -1058,6 +1060,10 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
state Version version = wait(setPerpetualStorageWiggle(cx, false, LockAware::True));
printf("Set perpetual_storage_wiggle=0 Done.\n");
printf("Disabling backup worker ...\n");
wait(disableBackupWorker(cx));
printf("Disabled backup worker.\n");
wait(disableConsistencyScanInSim(cx, false));
// Require 3 consecutive successful quiet database checks spaced 2 second apart

View File

@ -2658,6 +2658,24 @@ ACTOR Future<Void> disableConnectionFailuresAfter(double seconds, std::string co
return Void();
}
ACTOR Future<Void> disableBackupWorker(Database cx) {
ConfigurationResult res = wait(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=0", true));
if (res != ConfigurationResult::SUCCESS) {
TraceEvent("BackupWorkerDisableFailed").detail("Result", res);
throw operation_failed();
}
return Void();
}
ACTOR Future<Void> enableBackupWorker(Database cx) {
ConfigurationResult res = wait(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=1", true));
if (res != ConfigurationResult::SUCCESS) {
TraceEvent("BackupWorkerEnableFailed").detail("Result", res);
throw operation_failed();
}
return Void();
}
/**
* \brief Test orchestrator: sends test specification to testers in the right order and collects the results.
*
@ -2694,6 +2712,7 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
state bool waitForQuiescenceEnd = false;
state bool restorePerpetualWiggleSetting = false;
state bool perpetualWiggleEnabled = false;
state bool backupWorkerEnabled = false;
state double startDelay = 0.0;
state double databasePingDelay = 1e9;
state ISimulator::BackupAgentType simBackupAgents = ISimulator::BackupAgentType::NoBackupAgents;
@ -2757,15 +2776,20 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
} catch (Error& e) {
TraceEvent(SevError, "TestFailure").error(e).detail("Reason", "Unable to set starting configuration");
}
std::string_view confView(reinterpret_cast<const char*>(startingConfiguration.begin()),
startingConfiguration.size());
if (restorePerpetualWiggleSetting) {
std::string_view confView(reinterpret_cast<const char*>(startingConfiguration.begin()),
startingConfiguration.size());
const std::string setting = "perpetual_storage_wiggle:=";
auto pos = confView.find(setting);
if (pos != confView.npos && confView.at(pos + setting.size()) == '1') {
perpetualWiggleEnabled = true;
}
}
const std::string bwSetting = "backup_worker_enabled:=";
auto pos = confView.find(bwSetting);
if (pos != confView.npos && confView.at(pos + bwSetting.size()) == '1') {
backupWorkerEnabled = true;
}
}
// Read cluster configuration
@ -2865,6 +2889,12 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
printf("Set perpetual_storage_wiggle=1 Done.\n");
}
if (backupWorkerEnabled) {
printf("Enabling backup worker ...\n");
wait(enableBackupWorker(cx));
printf("Enabled backup worker.\n");
}
// TODO: Move this to a BehaviorInjection workload once that concept exists.
if (consistencyScanState.enabled && !consistencyScanState.enableAfter) {
printf("Enabling consistency scan ...\n");