Start backup with a wait on all backup workers running

This wait is to make sure that backup workers are already saving mutations so
that no mutations are missed. The idea is that the CLI sets a "backupStartedKey"
in the database and waits for allWorkerStarted() key of the backup to be set.

Backup workers monitor the changes to the "backupStartedKey" and start logging
mutations. Additionally, backup worker for Tag(-2,0) monitors all other workers
have started (checking their saved progress version is larger than the backup's
start version), and then sets the allWorkerStarted() key for the backup.
This commit is contained in:
Jingyu Zhou 2020-01-21 16:57:30 -08:00
parent e9c7ad82cc
commit 5a602f58e8
6 changed files with 153 additions and 5 deletions

View File

@ -782,6 +782,11 @@ public:
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Set to true when all backup workers for saving mutation logs have been started.
KeyBackedProperty<bool> allWorkerStarted() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Stop differntial logging if already started or don't start after completing KV ranges
KeyBackedProperty<bool> stopWhenDone() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));

View File

@ -23,6 +23,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/JsonBuilder.h"
@ -2346,8 +2347,8 @@ namespace fileBackup {
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
wait(checkTaskVersion(cx, task, StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version));
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop{
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -2361,7 +2362,43 @@ namespace fileBackup {
}
}
return Void();
// Set the "backupStartedKey" and wait for all backup worker started
tr->reset();
state BackupConfig config(task);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> value = wait(tr->get(backupStartedKey));
std::vector<std::pair<UID, Version>> ids;
if (value.present()) {
ids = decodeBackupStartedValue(value.get());
}
UID uid = config.getUid();
bool found = false;
for (int i = 0; i < ids.size(); i++) {
if (ids[i].first == uid) {
ids[i].second = Params.beginVersion().get(task);
found = true;
}
}
if (!found) {
ids.emplace_back(config.getUid(), Params.beginVersion().get(task));
}
for (auto p : ids) {
std::cout << "setBackupStartedKey UID: " << p.first.toString() << " Version: " << p.second << "\n";
}
tr->set(backupStartedKey, encodeBackupStartedValue(ids));
state Future<Void> watchFuture = tr->watch(config.allWorkerStarted().key);
wait(tr->commit());
wait(watchFuture);
return Void();
} catch (Error &e) {
wait(tr->onError(e));
}
}
}
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
@ -2396,6 +2433,8 @@ namespace fileBackup {
wait(success(FileBackupFinishedTask::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), backupFinished)));
wait(taskBucket->finish(tr, task));
// Clear the "backupStartedKey" to pause backup workers
return Void();
}

View File

@ -523,6 +523,19 @@ WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value) {
return status;
}
Value encodeBackupStartedValue(const std::vector<std::pair<UID, Version>>& ids) {
BinaryWriter wr(IncludeVersion());
wr << ids;
return wr.toValue();
}
std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& value) {
std::vector<std::pair<UID, Version>> ids;
BinaryReader reader(value, IncludeVersion());
if (value.size() > 0) reader >> ids;
return ids;
}
const KeyRef coordinatorsKey = LiteralStringRef("\xff/coordinators");
const KeyRef logsKey = LiteralStringRef("\xff/logs");
const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCommitVersion");

View File

@ -182,8 +182,10 @@ const Value backupProgressValue(const WorkerBackupStatus& status);
UID decodeBackupProgressKey(const KeyRef& key);
WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value);
// "\xff/backupStarted"
// "\xff/backupStarted" := "[[vector<UID,Version1>]]"
extern const KeyRef backupStartedKey;
Value encodeBackupStartedValue(const std::vector<std::pair<UID, Version>>& ids);
std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& value);
extern const KeyRef coordinatorsKey;
extern const KeyRef logsKey;

View File

@ -56,6 +56,13 @@ public:
backupStartedValue = value;
}
// Returns progress for an epoch.
std::map<Tag, Version> getEpochStatus(LogEpoch epoch) const {
const auto it = progress.find(epoch);
if (it == progress.end()) return {};
return it->second;
}
void addref() { ReferenceCounted<BackupProgress>::addref(); }
void delref() { ReferenceCounted<BackupProgress>::delref(); }

View File

@ -18,11 +18,13 @@
* limitations under the License.
*/
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/MasterProxyInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/BackupProgress.actor.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/ServerDBInfo.h"
@ -59,6 +61,8 @@ struct BackupData {
Reference<IBackupContainer> container;
AsyncVar<bool> pullFinished;
AsyncVar<std::vector<std::pair<UID, Version>>> backupUidVersions; // active backup (UID, StartVersion) pairs
CounterCollection cc;
Future<Void> logger;
@ -121,8 +125,20 @@ ACTOR Future<Void> monitorBackupChanges(BackupData* self, bool started) {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Standalone<StringRef>> value = wait(tr.get(backupStartedKey));
if ((value.present() && started) || (!value.present() && !started)) {
Optional<Value> value = wait(tr.get(backupStartedKey));
if (value.present() && started) {
self->backupUidVersions.set(decodeBackupStartedValue(value.get()));
TraceEvent e("BackupWorkerGotStartKey", self->myId);
int i = 1;
for (auto uidVersion : self->backupUidVersions.get()) {
e.detail(format("BackupID%d", i), uidVersion.first.toString())
.detail(format("Version%d", i), uidVersion.second);
i++;
}
return Void();
}
if (!value.present() && !started) {
TraceEvent("BackupWorkerNoStartKey", self->myId);
return Void();
}
@ -137,6 +153,69 @@ ACTOR Future<Void> monitorBackupChanges(BackupData* self, bool started) {
}
}
// Monitor all backup worker in the recruited epoch has been started. If so,
// set the "allWorkerStarted" key of the BackupConfig to true, which in turn
// unblocks StartFullBackupTaskFunc::_execute.
ACTOR Future<Void> monitorAllWorkerStarted(BackupData* self) {
loop {
while (self->backupUidVersions.get().empty()) {
wait(self->backupUidVersions.onChange());
}
// check all workers have started by checking their progress is larger
// than the backup's start version.
state Reference<BackupProgress> progress(new BackupProgress(self->myId, {}));
wait(getBackupProgress(self->cx, self->myId, progress));
std::map<Tag, Version> tagVersions = progress->getEpochStatus(self->recruitedEpoch);
state std::vector<UID> ready;
if (tagVersions.size() == self->logSystem.get()->getLogRouterTags()) {
// Check every version is larger than backup's startVersion
for (const auto uidVersion : self->backupUidVersions.get()) {
bool saved = true;
for (const std::pair<Tag, Version> tv : tagVersions) {
if (tv.second < uidVersion.second) {
saved = false;
break;
}
}
if (saved) {
ready.push_back(uidVersion.first);
}
}
// Set "allWorkerStarted" key for ready backups
loop {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state std::vector<Future<Optional<Value>>> readyValues;
state std::vector<BackupConfig> configs;
for (UID uid : ready) {
configs.emplace_back(uid);
readyValues.push_back(tr->get(configs.back().allWorkerStarted().key));
}
wait(waitForAll(readyValues));
for (int i = 0; i < readyValues.size(); i++) {
if (!readyValues[i].get().present()) {
configs[i].allWorkerStarted().set(tr, true);
TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", ready[i].toString());
}
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
wait(delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL / 2.0) || self->backupUidVersions.onChange());
}
}
ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
state Transaction tr(self->cx);
state Key key = backupProgressKeyFor(self->myId);
@ -426,6 +505,9 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
addActor.send(monitorBackupKeyOrPullData(&self));
addActor.send(checkRemoved(db, req.recruitedEpoch, &self));
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) {
addActor.send(monitorAllWorkerStarted(&self));
}
state Future<Void> done = uploadData(&self);