mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 18:56:00 +08:00
Merge pull request #89 from yichic/share-log-mutations-5.2
Share log mutations 5.2
This commit is contained in:
commit
ede5cab192
@ -46,6 +46,7 @@ public:
|
||||
static const Key keyFolderId;
|
||||
static const Key keyBeginVersion;
|
||||
static const Key keyEndVersion;
|
||||
static const Key keyPrevBeginVersion;
|
||||
static const Key keyConfigBackupTag;
|
||||
static const Key keyConfigLogUid;
|
||||
static const Key keyConfigBackupRanges;
|
||||
@ -55,6 +56,8 @@ public:
|
||||
static const Key keyLastUid;
|
||||
static const Key keyBeginKey;
|
||||
static const Key keyEndKey;
|
||||
static const Key destUid;
|
||||
static const Key backupStartVersion;
|
||||
|
||||
static const Key keyTagName;
|
||||
static const Key keyStates;
|
||||
@ -353,6 +356,11 @@ public:
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getStateValue(tr, logUid); });
|
||||
}
|
||||
|
||||
Future<UID> getDestUid(Reference<ReadYourWritesTransaction> tr, UID logUid);
|
||||
Future<UID> getDestUid(Database cx, UID logUid) {
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getDestUid(tr, logUid); });
|
||||
}
|
||||
|
||||
Future<UID> getLogUid(Reference<ReadYourWritesTransaction> tr, Key tagName);
|
||||
Future<UID> getLogUid(Database cx, Key tagName) {
|
||||
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){ return getLogUid(tr, tagName); });
|
||||
@ -410,8 +418,9 @@ struct RCGroup {
|
||||
|
||||
bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key);
|
||||
Version getVersionFromString(std::string const& value);
|
||||
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key backupUid, int blockSize = CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
|
||||
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize = CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
|
||||
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid);
|
||||
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> beginVersion = Optional<Version>(), Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
|
||||
Key getApplyKey( Version version, Key backupUid );
|
||||
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key);
|
||||
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value);
|
||||
@ -500,9 +509,14 @@ public:
|
||||
|
||||
KeyBackedConfig(StringRef prefix, Reference<Task> task) : KeyBackedConfig(prefix, TaskParams.uid().get(task)) {}
|
||||
|
||||
Future<Void> toTask(Reference<ReadYourWritesTransaction> tr, Reference<Task> task) {
|
||||
Future<Void> toTask(Reference<ReadYourWritesTransaction> tr, Reference<Task> task, bool setValidation = true) {
|
||||
// Set the uid task parameter
|
||||
TaskParams.uid().set(task, uid);
|
||||
|
||||
if (!setValidation) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Set the validation condition for the task which is that the restore uid's tag's uid is the same as the restore uid.
|
||||
// Get this uid's tag, then get the KEY for the tag's uid but don't read it. That becomes the validation key
|
||||
// which TaskBucket will check, and its value must be this restore config's uid.
|
||||
@ -701,6 +715,10 @@ public:
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
KeyBackedProperty<Key> destUidValue() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
Future<Optional<Version>> getLatestRestorableVersion(Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||
@ -720,9 +738,9 @@ public:
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
void startMutationLogs(Reference<ReadYourWritesTransaction> tr, KeyRangeRef backupRange) {
|
||||
Key mutationLogsDestKey = uidPrefixKey(backupLogKeys.begin, getUid());
|
||||
tr->set(logRangesEncodeKey(backupRange.begin, getUid()), logRangesEncodeValue(backupRange.end, mutationLogsDestKey));
|
||||
void startMutationLogs(Reference<ReadYourWritesTransaction> tr, KeyRangeRef backupRange, Key destUidValue) {
|
||||
Key mutationLogsDestKey = destUidValue.withPrefix(backupLogKeys.begin);
|
||||
tr->set(logRangesEncodeKey(backupRange.begin, BinaryReader::fromStringRef<UID>(destUidValue, Unversioned())), logRangesEncodeValue(backupRange.end, mutationLogsDestKey));
|
||||
}
|
||||
|
||||
Future<Void> logError(Database cx, Error e, std::string details, void *taskInstance = nullptr) {
|
||||
|
@ -25,6 +25,7 @@
|
||||
const Key BackupAgentBase::keyFolderId = LiteralStringRef("config_folderid");
|
||||
const Key BackupAgentBase::keyBeginVersion = LiteralStringRef("beginVersion");
|
||||
const Key BackupAgentBase::keyEndVersion = LiteralStringRef("endVersion");
|
||||
const Key BackupAgentBase::keyPrevBeginVersion = LiteralStringRef("prevBeginVersion");
|
||||
const Key BackupAgentBase::keyConfigBackupTag = LiteralStringRef("config_backup_tag");
|
||||
const Key BackupAgentBase::keyConfigLogUid = LiteralStringRef("config_log_uid");
|
||||
const Key BackupAgentBase::keyConfigBackupRanges = LiteralStringRef("config_backup_ranges");
|
||||
@ -34,6 +35,8 @@ const Key BackupAgentBase::keyStateStatus = LiteralStringRef("state_status");
|
||||
const Key BackupAgentBase::keyLastUid = LiteralStringRef("last_uid");
|
||||
const Key BackupAgentBase::keyBeginKey = LiteralStringRef("beginKey");
|
||||
const Key BackupAgentBase::keyEndKey = LiteralStringRef("endKey");
|
||||
const Key BackupAgentBase::destUid = LiteralStringRef("destUid");
|
||||
const Key BackupAgentBase::backupStartVersion = LiteralStringRef("backupStartVersion");
|
||||
|
||||
const Key BackupAgentBase::keyTagName = LiteralStringRef("tagname");
|
||||
const Key BackupAgentBase::keyStates = LiteralStringRef("state");
|
||||
@ -68,12 +71,12 @@ Version getVersionFromString(std::string const& value) {
|
||||
// \xff / bklog / keyspace in a funny order for performance reasons.
|
||||
// Return the ranges of keys that contain the data for the given range
|
||||
// of versions.
|
||||
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key backupUid, int blockSize) {
|
||||
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize) {
|
||||
Standalone<VectorRef<KeyRangeRef>> ret;
|
||||
|
||||
Key baLogRangePrefix = backupUid.withPrefix(backupLogKeys.begin);
|
||||
Key baLogRangePrefix = destUidValue.withPrefix(backupLogKeys.begin);
|
||||
|
||||
//TraceEvent("getLogRanges").detail("backupUid", backupUid).detail("prefix", printable(StringRef(baLogRangePrefix)));
|
||||
//TraceEvent("getLogRanges").detail("destUidValue", destUidValue).detail("prefix", printable(StringRef(baLogRangePrefix)));
|
||||
|
||||
for (int64_t vblock = beginVersion / blockSize; vblock < (endVersion + blockSize - 1) / blockSize; ++vblock) {
|
||||
int64_t tb = vblock * blockSize / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
|
||||
@ -619,3 +622,147 @@ ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key remov
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool clearVersionHistory, Key logUidValue, Key destUidValue, Version beginVersion, Version endVersion) {
|
||||
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
|
||||
state Key backupLatestVersionsKey = logUidValue.withPrefix(backupLatestVersionsPath);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
// Make sure version history key does exist and lower the beginVersion if needed
|
||||
bool foundSelf = false;
|
||||
for (auto backupVersion : backupVersions) {
|
||||
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
|
||||
|
||||
if (currLogUidValue == logUidValue) {
|
||||
foundSelf = true;
|
||||
beginVersion = std::min(beginVersion, BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned()));
|
||||
}
|
||||
}
|
||||
|
||||
// Do not clear anything if version history key cannot be found
|
||||
if (!foundSelf) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
Version nextSmallestVersion = endVersion;
|
||||
bool clearLogRangesRequired = true;
|
||||
|
||||
// More than one backup/DR with the same range
|
||||
if (backupVersions.size() > 1) {
|
||||
for (auto backupVersion : backupVersions) {
|
||||
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
|
||||
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
|
||||
|
||||
if (currLogUidValue == logUidValue) {
|
||||
continue;
|
||||
} else if (currVersion > beginVersion) {
|
||||
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
|
||||
} else {
|
||||
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
|
||||
clearLogRangesRequired = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (clearVersionHistory && backupVersions.size() == 1) {
|
||||
// Clear version history
|
||||
tr->clear(prefixRange(backupLatestVersionsPath));
|
||||
|
||||
// Clear everything under blog/[destUid]
|
||||
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
|
||||
|
||||
// Disable committing mutations into blog
|
||||
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
|
||||
} else {
|
||||
if (clearVersionHistory) {
|
||||
// Clear current backup version history
|
||||
tr->clear(backupLatestVersionsKey);
|
||||
} else {
|
||||
// Update current backup latest version
|
||||
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(endVersion, Unversioned()));
|
||||
}
|
||||
|
||||
// Clear log ranges if needed
|
||||
if (clearLogRangesRequired) {
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, nextSmallestVersion, destUidValue);
|
||||
for (auto& range : ranges) {
|
||||
tr->clear(range);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// The difference between beginVersion and endVersion should not be too large
|
||||
Future<Void> clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool clearVersionHistory, Key logUidValue, Key destUidValue, Version beginVersion, Version endVersion) {
|
||||
return _clearLogRanges(tr, clearVersionHistory, logUidValue, destUidValue, beginVersion, endVersion);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> beginVersion, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
|
||||
if ((beginVersion.present() && endVersion.present() && endVersion.get() <= beginVersion.get()) || !destUidValue.size())
|
||||
return Void();
|
||||
|
||||
state Version currBeginVersion;
|
||||
state Version endVersionValue;
|
||||
state Version currEndVersion;
|
||||
state bool clearVersionHistory;
|
||||
|
||||
ASSERT(beginVersion.present() == endVersion.present());
|
||||
if (beginVersion.present()) {
|
||||
currBeginVersion = beginVersion.get();
|
||||
endVersionValue = endVersion.get();
|
||||
clearVersionHistory = false;
|
||||
} else {
|
||||
// If beginVersion and endVersion are not presented, it means backup is done and we need to clear version history.
|
||||
// Set currBeginVersion to INTMAX_MAX and it will be set to the correct version in clearLogRanges().
|
||||
// Set endVersionValue to INTMAX_MAX since we need to clear log ranges up to next smallest version.
|
||||
currBeginVersion = endVersionValue = currEndVersion = INTMAX_MAX;
|
||||
clearVersionHistory = true;
|
||||
}
|
||||
|
||||
|
||||
while (currBeginVersion < endVersionValue || clearVersionHistory) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
|
||||
loop{
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
if (checkBackupUid) {
|
||||
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
|
||||
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
|
||||
return Void();
|
||||
}
|
||||
|
||||
if (!clearVersionHistory) {
|
||||
currEndVersion = std::min(currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE, endVersionValue);
|
||||
}
|
||||
|
||||
Void _ = wait(clearLogRanges(tr, clearVersionHistory, logUidValue, destUidValue, currBeginVersion, currEndVersion));
|
||||
Void _ = wait(tr->commit());
|
||||
|
||||
if (clearVersionHistory) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
currBeginVersion = currEndVersion;
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> beginVersion, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
|
||||
return _eraseLogData(cx, logUidValue, destUidValue, beginVersion, endVersion, checkBackupUid, backupUid);
|
||||
}
|
@ -94,6 +94,7 @@ namespace dbBackup {
|
||||
if (source) {
|
||||
copyParameter(source, dest, BackupAgentBase::keyFolderId);
|
||||
copyParameter(source, dest, BackupAgentBase::keyConfigLogUid);
|
||||
copyParameter(source, dest, BackupAgentBase::destUid);
|
||||
|
||||
copyParameter(source, dest, DatabaseBackupAgent::keyAddPrefix);
|
||||
copyParameter(source, dest, DatabaseBackupAgent::keyRemovePrefix);
|
||||
@ -469,25 +470,6 @@ namespace dbBackup {
|
||||
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
|
||||
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
|
||||
|
||||
ACTOR static Future<Void> eraseLogData(Database cx, Reference<Task> task, Version beginVersion, Version endVersion) {
|
||||
if (endVersion <= beginVersion)
|
||||
return Void();
|
||||
state Transaction tr(cx);
|
||||
loop{
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, task->params[BackupAgentBase::keyConfigLogUid]);
|
||||
for (auto & rng : ranges)
|
||||
tr.clear(rng);
|
||||
Void _ = wait(tr.commit());
|
||||
return Void();
|
||||
}
|
||||
catch (Error &e) {
|
||||
Void _ = wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
state FlowLock lock(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
|
||||
|
||||
@ -496,7 +478,7 @@ namespace dbBackup {
|
||||
Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyBeginVersion], Unversioned());
|
||||
Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
|
||||
|
||||
Void _ = wait(eraseLogData(taskBucket->src, task, beginVersion, endVersion));
|
||||
Void _ = wait(eraseLogData(taskBucket->src, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], Optional<Version>(beginVersion), Optional<Version>(endVersion), true, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
|
||||
|
||||
return Void();
|
||||
}
|
||||
@ -520,8 +502,6 @@ namespace dbBackup {
|
||||
|
||||
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
|
||||
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyBeginVersion], Unversioned());
|
||||
state Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
|
||||
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
|
||||
|
||||
Void _ = wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task));
|
||||
@ -609,7 +589,7 @@ namespace dbBackup {
|
||||
tr.addReadConflictRange(singleKeyRange(kv.key));
|
||||
first = false;
|
||||
}
|
||||
tr.set(kv.key.removePrefix(backupLogKeys.begin).withPrefix(applyLogKeys.begin), kv.value);
|
||||
tr.set(kv.key.removePrefix(backupLogKeys.begin).removePrefix(task->params[BackupAgentBase::destUid]).withPrefix(task->params[BackupAgentBase::keyConfigLogUid]).withPrefix(applyLogKeys.begin), kv.value);
|
||||
bytesSet += kv.expectedSize() - backupLogKeys.begin.expectedSize() + applyLogKeys.begin.expectedSize();
|
||||
}
|
||||
}
|
||||
@ -644,7 +624,7 @@ namespace dbBackup {
|
||||
state Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
|
||||
state Version newEndVersion = std::min(endVersion, (((beginVersion-1) / CLIENT_KNOBS->BACKUP_BLOCK_SIZE) + 2 + (g_network->isSimulated() ? CLIENT_KNOBS->BACKUP_SIM_COPY_LOG_RANGES : 0)) * CLIENT_KNOBS->BACKUP_BLOCK_SIZE);
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, newEndVersion, task->params[BackupAgentBase::keyConfigLogUid], CLIENT_KNOBS->BACKUP_BLOCK_SIZE);
|
||||
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, newEndVersion, task->params[BackupAgentBase::destUid], CLIENT_KNOBS->BACKUP_BLOCK_SIZE);
|
||||
state std::vector<PromiseStream<RCGroup>> results;
|
||||
state std::vector<Future<Void>> rc;
|
||||
state std::vector<Future<Void>> dump;
|
||||
@ -695,12 +675,10 @@ namespace dbBackup {
|
||||
if (task->params.find(CopyLogRangeTaskFunc::keyNextBeginVersion) != task->params.end()) {
|
||||
state Version nextVersion = BinaryReader::fromStringRef<Version>(task->params[CopyLogRangeTaskFunc::keyNextBeginVersion], Unversioned());
|
||||
Void _ = wait(success(CopyLogRangeTaskFunc::addTask(tr, taskBucket, task, nextVersion, endVersion, TaskCompletionKey::signal(taskFuture->key))) &&
|
||||
success(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, beginVersion, nextVersion, TaskCompletionKey::noSignal())) &&
|
||||
taskBucket->finish(tr, task));
|
||||
}
|
||||
else {
|
||||
Void _ = wait(success(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion, TaskCompletionKey::noSignal())) &&
|
||||
taskFuture->set(tr, taskBucket) &&
|
||||
Void _ = wait(taskFuture->set(tr, taskBucket) &&
|
||||
taskBucket->finish(tr, task));
|
||||
}
|
||||
|
||||
@ -722,6 +700,7 @@ namespace dbBackup {
|
||||
Void _ = wait(checkTaskVersion(tr, task, CopyLogsTaskFunc::name, CopyLogsTaskFunc::version));
|
||||
|
||||
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyBeginVersion], Unversioned());
|
||||
state Version prevBeginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyPrevBeginVersion], Unversioned());
|
||||
state Future<Optional<Value>> fStopValue = tr->get(states.pack(DatabaseBackupAgent::keyCopyStop));
|
||||
state Future<Optional<Value>> fAppliedValue = tr->get(task->params[BackupAgentBase::keyConfigLogUid].withPrefix(applyMutationsBeginRange.begin));
|
||||
|
||||
@ -733,7 +712,7 @@ namespace dbBackup {
|
||||
|
||||
if (endVersion <= beginVersion) {
|
||||
Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
|
||||
Key _ = wait(CopyLogsTaskFunc::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::signal(onDone)));
|
||||
Key _ = wait(CopyLogsTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::signal(onDone)));
|
||||
Void _ = wait(taskBucket->finish(tr, task));
|
||||
return Void();
|
||||
}
|
||||
@ -758,7 +737,7 @@ namespace dbBackup {
|
||||
if ((stopVersionData == -1) || (stopVersionData >= applyVersion)) {
|
||||
state Reference<TaskFuture> allPartsDone = futureBucket->future(tr);
|
||||
std::vector<Future<Key>> addTaskVector;
|
||||
addTaskVector.push_back(CopyLogsTaskFunc::addTask(tr, taskBucket, task, endVersion, TaskCompletionKey::signal(onDone), allPartsDone));
|
||||
addTaskVector.push_back(CopyLogsTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion, TaskCompletionKey::signal(onDone), allPartsDone));
|
||||
int blockSize = std::max<int>(1, ((endVersion - beginVersion)/CLIENT_KNOBS->BACKUP_COPY_TASKS)/CLIENT_KNOBS->BACKUP_BLOCK_SIZE);
|
||||
for (int64_t vblock = beginVersion / CLIENT_KNOBS->BACKUP_BLOCK_SIZE; vblock < (endVersion + CLIENT_KNOBS->BACKUP_BLOCK_SIZE - 1) / CLIENT_KNOBS->BACKUP_BLOCK_SIZE; vblock += blockSize) {
|
||||
addTaskVector.push_back(CopyLogRangeTaskFunc::addTask(tr, taskBucket, task,
|
||||
@ -766,11 +745,17 @@ namespace dbBackup {
|
||||
std::min(endVersion, (vblock + blockSize) * CLIENT_KNOBS->BACKUP_BLOCK_SIZE),
|
||||
TaskCompletionKey::joinWith(allPartsDone)));
|
||||
}
|
||||
|
||||
// Do not erase at the first time
|
||||
if (prevBeginVersion > 0) {
|
||||
addTaskVector.push_back(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::joinWith(allPartsDone)));
|
||||
}
|
||||
|
||||
Void _ = wait(waitForAll(addTaskVector) && taskBucket->finish(tr, task));
|
||||
} else {
|
||||
if(appliedVersion <= stopVersionData) {
|
||||
Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
|
||||
Key _ = wait(CopyLogsTaskFunc::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::signal(onDone)));
|
||||
Key _ = wait(CopyLogsTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::signal(onDone)));
|
||||
Void _ = wait(taskBucket->finish(tr, task));
|
||||
return Void();
|
||||
}
|
||||
@ -782,12 +767,13 @@ namespace dbBackup {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version prevBeginVersion, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
|
||||
Key doneKey = wait(completionKey.get(tr, taskBucket));
|
||||
Reference<Task> task(new Task(CopyLogsTaskFunc::name, CopyLogsTaskFunc::version, doneKey, 1));
|
||||
|
||||
copyDefaultParameters(parentTask, task);
|
||||
task->params[BackupAgentBase::keyBeginVersion] = BinaryWriter::toValue(beginVersion, Unversioned());
|
||||
task->params[DatabaseBackupAgent::keyPrevBeginVersion] = BinaryWriter::toValue(prevBeginVersion, Unversioned());
|
||||
|
||||
if (!waitFor) {
|
||||
return taskBucket->addTask(tr, task, parentTask->params[Task::reservedTaskParamValidKey], task->params[BackupAgentBase::keyFolderId]);
|
||||
@ -839,29 +825,38 @@ namespace dbBackup {
|
||||
}
|
||||
}
|
||||
|
||||
state Transaction tr(taskBucket->src);
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(taskBucket->src));
|
||||
state Key logUidValue = task->params[DatabaseBackupAgent::keyConfigLogUid];
|
||||
state Key destUidValue = task->params[BackupAgentBase::destUid];
|
||||
state Version beginVersion;
|
||||
state Version endVersion;
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> v = wait( tr.get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
|
||||
return Void();
|
||||
|
||||
UID logUid = BinaryReader::fromStringRef<UID>(task->params[DatabaseBackupAgent::keyConfigLogUid], Unversioned());
|
||||
Key configPath = uidPrefixKey(logRangesRange.begin, logUid);
|
||||
Key logsPath = uidPrefixKey(backupLogKeys.begin, logUid);
|
||||
state Key latestVersionKey = logUidValue.withPrefix(task->params[BackupAgentBase::destUid].withPrefix(backupLatestVersionsPrefix));
|
||||
state Optional<Key> bVersion = wait(tr->get(latestVersionKey));
|
||||
|
||||
tr.set(sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_COMPLETED)));
|
||||
tr.clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
tr.clear(KeyRangeRef(logsPath, strinc(logsPath)));
|
||||
if (!bVersion.present()) {
|
||||
return Void();
|
||||
}
|
||||
beginVersion = BinaryReader::fromStringRef<Version>(bVersion.get(), Unversioned());
|
||||
|
||||
Void _ = wait(tr.commit());
|
||||
endVersion = tr->getReadVersion().get();
|
||||
break;
|
||||
} catch(Error &e) {
|
||||
Void _ = wait(tr.onError(e));
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
Version backupUid = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned());
|
||||
Void _ = wait(eraseLogData(taskBucket->src, logUidValue, destUidValue, Optional<Version>(), Optional<Version>(), true, backupUid));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
@ -920,6 +915,7 @@ namespace dbBackup {
|
||||
Void _ = wait(checkTaskVersion(tr, task, CopyDiffLogsTaskFunc::name, CopyDiffLogsTaskFunc::version));
|
||||
|
||||
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyBeginVersion], Unversioned());
|
||||
state Version prevBeginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyPrevBeginVersion], Unversioned());
|
||||
state Future<Optional<Value>> fStopWhenDone = tr->get(conf.pack(DatabaseBackupAgent::keyConfigStopWhenDoneKey));
|
||||
|
||||
Transaction srcTr(taskBucket->src);
|
||||
@ -930,7 +926,7 @@ namespace dbBackup {
|
||||
|
||||
if (endVersion <= beginVersion) {
|
||||
Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
|
||||
Key _ = wait(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::signal(onDone)));
|
||||
Key _ = wait(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::signal(onDone)));
|
||||
Void _ = wait(taskBucket->finish(tr, task));
|
||||
return Void();
|
||||
}
|
||||
@ -945,7 +941,7 @@ namespace dbBackup {
|
||||
if (!stopWhenDone.present()) {
|
||||
state Reference<TaskFuture> allPartsDone = futureBucket->future(tr);
|
||||
std::vector<Future<Key>> addTaskVector;
|
||||
addTaskVector.push_back(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, endVersion, TaskCompletionKey::signal(onDone), allPartsDone));
|
||||
addTaskVector.push_back(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion, TaskCompletionKey::signal(onDone), allPartsDone));
|
||||
int blockSize = std::max<int>(1, ((endVersion - beginVersion)/ CLIENT_KNOBS->BACKUP_COPY_TASKS)/CLIENT_KNOBS->BACKUP_BLOCK_SIZE);
|
||||
for (int64_t vblock = beginVersion / CLIENT_KNOBS->BACKUP_BLOCK_SIZE; vblock < (endVersion + CLIENT_KNOBS->BACKUP_BLOCK_SIZE - 1) / CLIENT_KNOBS->BACKUP_BLOCK_SIZE; vblock += blockSize) {
|
||||
addTaskVector.push_back(CopyLogRangeTaskFunc::addTask(tr, taskBucket, task,
|
||||
@ -953,6 +949,11 @@ namespace dbBackup {
|
||||
std::min(endVersion, (vblock + blockSize) * CLIENT_KNOBS->BACKUP_BLOCK_SIZE),
|
||||
TaskCompletionKey::joinWith(allPartsDone)));
|
||||
}
|
||||
|
||||
if (prevBeginVersion > 0) {
|
||||
addTaskVector.push_back(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::joinWith(allPartsDone)));
|
||||
}
|
||||
|
||||
Void _ = wait(waitForAll(addTaskVector) && taskBucket->finish(tr, task));
|
||||
} else {
|
||||
Void _ = wait(onDone->set(tr, taskBucket) && taskBucket->finish(tr, task));
|
||||
@ -960,13 +961,14 @@ namespace dbBackup {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version prevBeginVersion, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
|
||||
Key doneKey = wait(completionKey.get(tr, taskBucket));
|
||||
Reference<Task> task(new Task(CopyDiffLogsTaskFunc::name, CopyDiffLogsTaskFunc::version, doneKey, 1));
|
||||
|
||||
copyDefaultParameters(parentTask, task);
|
||||
|
||||
task->params[DatabaseBackupAgent::keyBeginVersion] = BinaryWriter::toValue(beginVersion, Unversioned());
|
||||
task->params[DatabaseBackupAgent::keyPrevBeginVersion] = BinaryWriter::toValue(prevBeginVersion, Unversioned());
|
||||
|
||||
if (!waitFor) {
|
||||
return taskBucket->addTask(tr, task, parentTask->params[Task::reservedTaskParamValidKey], task->params[BackupAgentBase::keyFolderId]);
|
||||
@ -998,6 +1000,15 @@ namespace dbBackup {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.addReadConflictRange(singleKeyRange(sourceStates.pack(DatabaseBackupAgent::keyStateStatus)));
|
||||
tr.set(sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_DIFFERENTIAL)));
|
||||
|
||||
Key versionKey = task->params[DatabaseBackupAgent::keyConfigLogUid].withPrefix(task->params[BackupAgentBase::destUid]).withPrefix(backupLatestVersionsPrefix);
|
||||
Optional<Key> prevBeginVersion = wait(tr.get(versionKey));
|
||||
if (!prevBeginVersion.present()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
task->params[DatabaseBackupAgent::keyPrevBeginVersion] = prevBeginVersion.get();
|
||||
|
||||
Void _ = wait(tr.commit());
|
||||
return Void();
|
||||
}
|
||||
@ -1033,7 +1044,9 @@ namespace dbBackup {
|
||||
tr->set(states.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_DIFFERENTIAL)));
|
||||
|
||||
allPartsDone = futureBucket->future(tr);
|
||||
Key _ = wait(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, restoreVersion, TaskCompletionKey::joinWith(allPartsDone)));
|
||||
|
||||
Version prevBeginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyPrevBeginVersion], Unversioned());
|
||||
Key _ = wait(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, restoreVersion, TaskCompletionKey::joinWith(allPartsDone)));
|
||||
|
||||
// After the Backup completes, clear the backup subspace and update the status
|
||||
Key _ = wait(FinishedFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), allPartsDone));
|
||||
@ -1071,48 +1084,119 @@ namespace dbBackup {
|
||||
static const uint32_t version;
|
||||
|
||||
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
state Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(task->params[BackupAgentBase::keyConfigLogUid]);
|
||||
state Key logUidValue = task->params[DatabaseBackupAgent::keyConfigLogUid];
|
||||
state Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
|
||||
Void _ = wait(checkTaskVersion(cx, task, StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version));
|
||||
|
||||
state UID logUid = BinaryReader::fromStringRef<UID>(task->params[DatabaseBackupAgent::keyConfigLogUid], Unversioned());
|
||||
state Key logUidDest = uidPrefixKey(backupLogKeys.begin, logUid);
|
||||
state Key destUidValue(logUidValue);
|
||||
state UID logUid = BinaryReader::fromStringRef<UID>(logUidValue, Unversioned());
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> backupRanges = BinaryReader::fromStringRef<Standalone<VectorRef<KeyRangeRef>>>(task->params[DatabaseBackupAgent::keyConfigBackupRanges], IncludeVersion());
|
||||
state Transaction tr(taskBucket->src);
|
||||
state Key beginVersionKey;
|
||||
|
||||
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(taskBucket->src));
|
||||
loop {
|
||||
try {
|
||||
srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
// Initialize destUid
|
||||
if (backupRanges.size() == 1) {
|
||||
state Key destUidLookupPath = BinaryWriter::toValue(backupRanges[0], IncludeVersion()).withPrefix(destUidLookupPrefix);
|
||||
Optional<Key> existingDestUidValue = wait(srcTr->get(destUidLookupPath));
|
||||
if (existingDestUidValue.present()) {
|
||||
destUidValue = existingDestUidValue.get();
|
||||
} else {
|
||||
destUidValue = BinaryWriter::toValue(g_random->randomUniqueID(), Unversioned());
|
||||
srcTr->set(destUidLookupPath, destUidValue);
|
||||
}
|
||||
}
|
||||
|
||||
Version bVersion = wait(srcTr->getReadVersion());
|
||||
beginVersionKey = BinaryWriter::toValue(bVersion, Unversioned());
|
||||
|
||||
task->params[BackupAgentBase::destUid] = destUidValue;
|
||||
|
||||
Void _ = wait(srcTr->commit());
|
||||
break;
|
||||
} catch(Error &e) {
|
||||
Void _ = wait(srcTr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> v = wait( tr.get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
task->params[DatabaseBackupAgent::keyBeginVersion] = BinaryWriter::toValue(tr.getReadVersion().get(), Unversioned());
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state Future<Void> verified = taskBucket->keepRunning(tr, task);
|
||||
Void _ = wait(verified);
|
||||
|
||||
// Set destUid at destination side
|
||||
state Subspace config = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(logUidValue);
|
||||
tr->set(config.pack(BackupAgentBase::destUid), task->params[BackupAgentBase::destUid]);
|
||||
|
||||
// Use existing beginVersion if we already have one
|
||||
Optional<Key> backupStartVersion = wait(tr->get(config.pack(BackupAgentBase::backupStartVersion)));
|
||||
if (backupStartVersion.present()) {
|
||||
beginVersionKey = backupStartVersion.get();
|
||||
} else {
|
||||
tr->set(config.pack(BackupAgentBase::backupStartVersion), beginVersionKey);
|
||||
}
|
||||
|
||||
task->params[BackupAgentBase::keyBeginVersion] = beginVersionKey;
|
||||
|
||||
Void _ = wait(tr->commit());
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
state Reference<ReadYourWritesTransaction> srcTr2(new ReadYourWritesTransaction(taskBucket->src));
|
||||
srcTr2->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
srcTr2->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
state Optional<Value> v = wait( srcTr2->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) >= BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
|
||||
return Void();
|
||||
|
||||
tr.set( Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceTagName).pack(task->params[BackupAgentBase::keyTagName]), task->params[BackupAgentBase::keyConfigLogUid] );
|
||||
tr.set( sourceStates.pack(DatabaseBackupAgent::keyFolderId), task->params[DatabaseBackupAgent::keyFolderId] );
|
||||
tr.set( sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_BACKUP)));
|
||||
Key versionKey = logUidValue.withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix);
|
||||
srcTr2->set(versionKey, beginVersionKey);
|
||||
|
||||
srcTr2->set( Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceTagName).pack(task->params[BackupAgentBase::keyTagName]), logUidValue );
|
||||
srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyFolderId), task->params[DatabaseBackupAgent::keyFolderId] );
|
||||
srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_BACKUP)));
|
||||
|
||||
state Key destPath = destUidValue.withPrefix(backupLogKeys.begin);
|
||||
// Start logging the mutations for the specified ranges of the tag
|
||||
for (auto &backupRange : backupRanges) {
|
||||
tr.set(logRangesEncodeKey(backupRange.begin, logUid), logRangesEncodeValue(backupRange.end, logUidDest));
|
||||
srcTr2->set(logRangesEncodeKey(backupRange.begin, BinaryReader::fromStringRef<UID>(destUidValue, Unversioned())), logRangesEncodeValue(backupRange.end, destPath));
|
||||
}
|
||||
|
||||
Void _ = wait(tr.commit());
|
||||
return Void();
|
||||
} catch(Error &e) {
|
||||
Void _ = wait(tr.onError(e));
|
||||
Void _ = wait(srcTr2->commit());
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(srcTr2->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
state Subspace states = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyStates).get(task->params[BackupAgentBase::keyConfigLogUid]);
|
||||
state Key logUidValue = task->params[BackupAgentBase::keyConfigLogUid];
|
||||
state Subspace states = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyStates).get(logUidValue);
|
||||
state Subspace config = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(logUidValue);
|
||||
|
||||
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyBeginVersion], Unversioned());
|
||||
state Standalone<VectorRef<KeyRangeRef>> backupRanges = BinaryReader::fromStringRef<Standalone<VectorRef<KeyRangeRef>>>(task->params[DatabaseBackupAgent::keyConfigBackupRanges], IncludeVersion());
|
||||
|
||||
TraceEvent("DBA_StartFullBackup").detail("beginVer", beginVersion);
|
||||
tr->set(task->params[BackupAgentBase::keyConfigLogUid].withPrefix(applyMutationsBeginRange.begin), BinaryWriter::toValue(beginVersion, Unversioned()));
|
||||
tr->set(task->params[BackupAgentBase::keyConfigLogUid].withPrefix(applyMutationsEndRange.begin), BinaryWriter::toValue(beginVersion, Unversioned()));
|
||||
tr->set(logUidValue.withPrefix(applyMutationsBeginRange.begin), BinaryWriter::toValue(beginVersion, Unversioned()));
|
||||
tr->set(logUidValue.withPrefix(applyMutationsEndRange.begin), BinaryWriter::toValue(beginVersion, Unversioned()));
|
||||
tr->set(states.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_BACKUP)));
|
||||
|
||||
state Reference<TaskFuture> kvBackupRangeComplete = futureBucket->future(tr);
|
||||
@ -1132,7 +1216,7 @@ namespace dbBackup {
|
||||
Key _ = wait(FinishFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), kvBackupRangeComplete));
|
||||
|
||||
// Backup the logs which will create BackupLogRange tasks
|
||||
Key _ = wait(CopyLogsTaskFunc::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::joinWith(kvBackupComplete)));
|
||||
Key _ = wait(CopyLogsTaskFunc::addTask(tr, taskBucket, task, 0, beginVersion, TaskCompletionKey::joinWith(kvBackupComplete)));
|
||||
|
||||
// After the Backup completes, clear the backup subspace and update the status
|
||||
Key _ = wait(BackupRestorableTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), kvBackupComplete));
|
||||
@ -1492,7 +1576,8 @@ public:
|
||||
|
||||
ACTOR static Future<Void> abortBackup(DatabaseBackupAgent* backupAgent, Database cx, Key tagName, bool partial) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state Key logUid;
|
||||
state Key logUidValue, destUidValue;
|
||||
state UID logUid, destUid;
|
||||
state Value backupUid;
|
||||
|
||||
loop {
|
||||
@ -1502,25 +1587,34 @@ public:
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
|
||||
UID _logUid = wait(backupAgent->getLogUid(tr, tagName));
|
||||
logUid = BinaryWriter::toValue(_logUid, Unversioned());
|
||||
logUid = _logUid;
|
||||
logUidValue = BinaryWriter::toValue(logUid, Unversioned());
|
||||
|
||||
int status = wait(backupAgent->getStateValue(tr, _logUid));
|
||||
state Future<int> statusFuture= backupAgent->getStateValue(tr, logUid);
|
||||
state Future<UID> destUidFuture = backupAgent->getDestUid(tr, logUid);
|
||||
Void _ = wait(success(statusFuture) && success(destUidFuture));
|
||||
|
||||
UID destUid = destUidFuture.get();
|
||||
if (destUid.isValid()) {
|
||||
destUidValue = BinaryWriter::toValue(destUid, Unversioned());
|
||||
}
|
||||
int status = statusFuture.get();
|
||||
if (!backupAgent->isRunnable((BackupAgentBase::enumState)status)) {
|
||||
throw backup_unneeded();
|
||||
}
|
||||
|
||||
Optional<Value> _backupUid = wait(tr->get(backupAgent->states.get(logUid).pack(DatabaseBackupAgent::keyFolderId)));
|
||||
Optional<Value> _backupUid = wait(tr->get(backupAgent->states.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId)));
|
||||
backupUid = _backupUid.get();
|
||||
|
||||
// Clearing the folder id will prevent future tasks from executing
|
||||
tr->clear(backupAgent->config.get(logUid).range());
|
||||
tr->clear(backupAgent->config.get(logUidValue).range());
|
||||
|
||||
// Clearing the end version of apply mutation cancels ongoing apply work
|
||||
tr->clear(logUid.withPrefix(applyMutationsEndRange.begin));
|
||||
tr->clear(logUidValue.withPrefix(applyMutationsEndRange.begin));
|
||||
|
||||
tr->clear(prefixRange(logUid.withPrefix(applyLogKeys.begin)));
|
||||
tr->clear(prefixRange(logUidValue.withPrefix(applyLogKeys.begin)));
|
||||
|
||||
tr->set(StringRef(backupAgent->states.get(logUid).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED)));
|
||||
tr->set(StringRef(backupAgent->states.get(logUidValue).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED)));
|
||||
|
||||
Void _ = wait(tr->commit());
|
||||
TraceEvent("DBA_Abort").detail("commitVersion", tr->getCommittedVersion());
|
||||
@ -1545,7 +1639,7 @@ public:
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
try {
|
||||
// Ensure that we're at a version higher than the data that we've written.
|
||||
Optional<Value> lastApplied = wait(tr->get(logUid.withPrefix(applyMutationsBeginRange.begin)));
|
||||
Optional<Value> lastApplied = wait(tr->get(logUidValue.withPrefix(applyMutationsBeginRange.begin)));
|
||||
if (lastApplied.present()) {
|
||||
Version current = tr->getReadVersion().get();
|
||||
Version applied = BinaryReader::fromStringRef<Version>(lastApplied.get(), Unversioned());
|
||||
@ -1576,23 +1670,37 @@ public:
|
||||
return Void();
|
||||
|
||||
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src));
|
||||
state Version beginVersion;
|
||||
state Version endVersion;
|
||||
state bool clearSrcDb = true;
|
||||
|
||||
loop {
|
||||
try {
|
||||
srcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> v = wait( srcTr->get( backupAgent->sourceStates.get(logUid).pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
Optional<Value> v = wait( srcTr->get( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned()))
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(backupUid, Unversioned())) {
|
||||
clearSrcDb = false;
|
||||
break;
|
||||
}
|
||||
|
||||
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_ABORTED) ));
|
||||
srcTr->set( backupAgent->sourceStates.get(logUid).pack(DatabaseBackupAgent::keyFolderId), backupUid );
|
||||
Key latestVersionKey = logUidValue.withPrefix(destUidValue.withPrefix(backupLatestVersionsPrefix));
|
||||
|
||||
srcTr->clear(prefixRange(logUid.withPrefix(backupLogKeys.begin)));
|
||||
srcTr->clear(prefixRange(logUid.withPrefix(logRangesRange.begin)));
|
||||
Optional<Key> bVersion = wait(srcTr->get(latestVersionKey));
|
||||
if (bVersion.present()) {
|
||||
beginVersion = BinaryReader::fromStringRef<Version>(bVersion.get(), Unversioned());
|
||||
} else {
|
||||
clearSrcDb = false;
|
||||
break;
|
||||
}
|
||||
|
||||
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) ));
|
||||
srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid );
|
||||
|
||||
Void _ = wait(srcTr->commit());
|
||||
endVersion = srcTr->getCommittedVersion() + 1;
|
||||
|
||||
break;
|
||||
}
|
||||
catch (Error &e) {
|
||||
@ -1600,18 +1708,22 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
if (clearSrcDb) {
|
||||
Void _ = wait(eraseLogData(backupAgent->taskBucket->src, logUidValue, destUidValue));
|
||||
}
|
||||
|
||||
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
Optional<Value> v = wait(tr->get(StringRef(backupAgent->config.get(logUid).pack(DatabaseBackupAgent::keyFolderId))));
|
||||
Optional<Value> v = wait(tr->get(StringRef(backupAgent->config.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId))));
|
||||
if(v.present()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
tr->set(StringRef(backupAgent->states.get(logUid).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_ABORTED)));
|
||||
tr->set(StringRef(backupAgent->states.get(logUidValue).pack(DatabaseBackupAgent::keyStateStatus)), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_ABORTED)));
|
||||
|
||||
Void _ = wait(tr->commit());
|
||||
|
||||
@ -1753,6 +1865,15 @@ public:
|
||||
return (!status.present()) ? DatabaseBackupAgent::STATE_NEVERRAN : BackupAgentBase::getState(status.get().toString());
|
||||
}
|
||||
|
||||
ACTOR static Future<UID> getDestUid(DatabaseBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, UID logUid) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
state Key destUidKey = backupAgent->config.get(BinaryWriter::toValue(logUid, Unversioned())).pack(BackupAgentBase::destUid);
|
||||
Optional<Value> destUid = wait(tr->get(destUidKey));
|
||||
|
||||
return (destUid.present()) ? BinaryReader::fromStringRef<UID>(destUid.get(), Unversioned()) : UID();
|
||||
}
|
||||
|
||||
ACTOR static Future<UID> getLogUid(DatabaseBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key tagName) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
@ -1790,6 +1911,10 @@ Future<int> DatabaseBackupAgent::getStateValue(Reference<ReadYourWritesTransacti
|
||||
return DatabaseBackupAgentImpl::getStateValue(this, tr, logUid);
|
||||
}
|
||||
|
||||
Future<UID> DatabaseBackupAgent::getDestUid(Reference<ReadYourWritesTransaction> tr, UID logUid) {
|
||||
return DatabaseBackupAgentImpl::getDestUid(this, tr, logUid);
|
||||
}
|
||||
|
||||
Future<UID> DatabaseBackupAgent::getLogUid(Reference<ReadYourWritesTransaction> tr, Key tagName) {
|
||||
return DatabaseBackupAgentImpl::getLogUid(this, tr, tagName);
|
||||
}
|
||||
|
@ -807,7 +807,8 @@ namespace fileBackup {
|
||||
BackupConfig config,
|
||||
Reference<TaskFuture> waitFor = Reference<TaskFuture>(),
|
||||
std::function<void(Reference<Task>)> setupTaskFn = NOP_SETUP_TASK_FN,
|
||||
int priority = 0) {
|
||||
int priority = 0,
|
||||
bool setValidation = true) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
@ -815,7 +816,7 @@ namespace fileBackup {
|
||||
state Reference<Task> task(new Task(name, version, doneKey, priority));
|
||||
|
||||
// Bind backup config to new task
|
||||
Void _ = wait(config.toTask(tr, task));
|
||||
Void _ = wait(config.toTask(tr, task, setValidation));
|
||||
|
||||
// Set task specific params
|
||||
setupTaskFn(task);
|
||||
@ -1680,7 +1681,8 @@ namespace fileBackup {
|
||||
}
|
||||
}
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, config.getUidAsKey());
|
||||
Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
|
||||
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, destUidValue);
|
||||
if (ranges.size() > CLIENT_KNOBS->BACKUP_MAX_LOG_RANGES) {
|
||||
Params.addBackupLogRangeTasks().set(task, true);
|
||||
return Void();
|
||||
@ -1804,12 +1806,6 @@ namespace fileBackup {
|
||||
Void _ = wait(taskFuture->set(tr, taskBucket));
|
||||
}
|
||||
|
||||
if(endVersion > beginVersion) {
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, endVersion, config.getUidAsKey());
|
||||
for (auto & rng : ranges)
|
||||
tr->clear(rng);
|
||||
}
|
||||
|
||||
Void _ = wait(taskBucket->finish(tr, task));
|
||||
return Void();
|
||||
}
|
||||
@ -1819,11 +1815,85 @@ namespace fileBackup {
|
||||
const uint32_t BackupLogRangeTaskFunc::version = 1;
|
||||
REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
|
||||
|
||||
struct EraseLogRangeTaskFunc : BackupTaskFuncBase {
|
||||
static StringRef name;
|
||||
static const uint32_t version;
|
||||
StringRef getName() const { return name; };
|
||||
|
||||
static struct {
|
||||
static TaskParam<Version> beginVersion() {
|
||||
return LiteralStringRef(__FUNCTION__);
|
||||
}
|
||||
static TaskParam<Version> endVersion() {
|
||||
return LiteralStringRef(__FUNCTION__);
|
||||
}
|
||||
static TaskParam<Key> destUidValue() {
|
||||
return LiteralStringRef(__FUNCTION__);
|
||||
}
|
||||
} Params;
|
||||
|
||||
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
|
||||
Void _ = wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
|
||||
|
||||
state Version beginVersion = Params.beginVersion().get(task);
|
||||
state Version endVersion = Params.endVersion().get(task);
|
||||
state Key destUidValue = Params.destUidValue().get(task);
|
||||
|
||||
state BackupConfig config(task);
|
||||
state Key logUidValue = config.getUidAsKey();
|
||||
|
||||
if (beginVersion == 0) {
|
||||
Void _ = wait(eraseLogData(cx, logUidValue, destUidValue));
|
||||
} else {
|
||||
Void _ = wait(eraseLogData(cx, logUidValue, destUidValue, Optional<Version>(beginVersion), Optional<Version>(endVersion)));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, UID logUid, TaskCompletionKey completionKey, Key destUidValue, Version beginVersion = 0, Version endVersion = 0, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
|
||||
Key key = wait(addBackupTask(EraseLogRangeTaskFunc::name,
|
||||
EraseLogRangeTaskFunc::version,
|
||||
tr, taskBucket, completionKey,
|
||||
BackupConfig(logUid),
|
||||
waitFor,
|
||||
[=](Reference<Task> task) {
|
||||
Params.beginVersion().set(task, beginVersion);
|
||||
Params.endVersion().set(task, endVersion);
|
||||
Params.destUidValue().set(task, destUidValue);
|
||||
},
|
||||
0, false));
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
|
||||
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
|
||||
|
||||
Void _ = wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
|
||||
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
|
||||
};
|
||||
StringRef EraseLogRangeTaskFunc::name = LiteralStringRef("file_backup_erase_logs");
|
||||
const uint32_t EraseLogRangeTaskFunc::version = 1;
|
||||
REGISTER_TASKFUNC(EraseLogRangeTaskFunc);
|
||||
|
||||
|
||||
|
||||
struct BackupLogsDispatchTask : BackupTaskFuncBase {
|
||||
static StringRef name;
|
||||
static const uint32_t version;
|
||||
|
||||
static struct {
|
||||
static TaskParam<Version> prevBeginVersion() {
|
||||
return LiteralStringRef(__FUNCTION__);
|
||||
}
|
||||
static TaskParam<Version> beginVersion() {
|
||||
return LiteralStringRef(__FUNCTION__);
|
||||
}
|
||||
@ -1836,6 +1906,7 @@ namespace fileBackup {
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
state Reference<TaskFuture> onDone = task->getDoneFuture(futureBucket);
|
||||
state Version prevBeginVersion = Params.prevBeginVersion().get(task);
|
||||
state Version beginVersion = Params.beginVersion().get(task);
|
||||
state BackupConfig config(task);
|
||||
config.latestLogEndVersion().set(tr, beginVersion);
|
||||
@ -1883,7 +1954,13 @@ namespace fileBackup {
|
||||
|
||||
// Add the next logs dispatch task which will run after this batch is done
|
||||
Key _ = wait(BackupLogRangeTaskFunc::addTask(tr, taskBucket, task, beginVersion, endVersion, TaskCompletionKey::joinWith(logDispatchBatchFuture)));
|
||||
Key _ = wait(BackupLogsDispatchTask::addTask(tr, taskBucket, task, endVersion, TaskCompletionKey::signal(onDone), logDispatchBatchFuture));
|
||||
|
||||
// Do not erase at the first time
|
||||
if (prevBeginVersion > 0) {
|
||||
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
|
||||
Key _ = wait(EraseLogRangeTaskFunc::addTask(tr, taskBucket, config.getUid(), TaskCompletionKey::joinWith(logDispatchBatchFuture), destUidValue, prevBeginVersion, beginVersion));
|
||||
}
|
||||
Key _ = wait(BackupLogsDispatchTask::addTask(tr, taskBucket, task, beginVersion, endVersion, TaskCompletionKey::signal(onDone), logDispatchBatchFuture));
|
||||
|
||||
Void _ = wait(taskBucket->finish(tr, task));
|
||||
|
||||
@ -1896,13 +1973,14 @@ namespace fileBackup {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version prevBeginVersion, Version beginVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
|
||||
Key key = wait(addBackupTask(BackupLogsDispatchTask::name,
|
||||
BackupLogsDispatchTask::version,
|
||||
tr, taskBucket, completionKey,
|
||||
BackupConfig(parentTask),
|
||||
waitFor,
|
||||
[=](Reference<Task> task) {
|
||||
Params.prevBeginVersion().set(task, prevBeginVersion);
|
||||
Params.beginVersion().set(task, beginVersion);
|
||||
}));
|
||||
return key;
|
||||
@ -1929,12 +2007,10 @@ namespace fileBackup {
|
||||
state BackupConfig backup(task);
|
||||
state UID uid = backup.getUid();
|
||||
|
||||
state Key configPath = uidPrefixKey(logRangesRange.begin, uid);
|
||||
state Key logsPath = uidPrefixKey(backupLogKeys.begin, uid);
|
||||
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
|
||||
state Key destUidValue = wait(backup.destUidValue().getOrThrow(tr));
|
||||
Key _ = wait(EraseLogRangeTaskFunc::addTask(tr, taskBucket, backup.getUid(), TaskCompletionKey::noSignal(), destUidValue));
|
||||
|
||||
backup.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
|
||||
|
||||
Void _ = wait(taskBucket->finish(tr, task));
|
||||
@ -2153,11 +2229,15 @@ namespace fileBackup {
|
||||
state BackupConfig config(task);
|
||||
state Version beginVersion = Params.beginVersion().get(task);
|
||||
|
||||
state std::vector<KeyRange> backupRanges = wait(config.backupRanges().getOrThrow(tr));
|
||||
state Future<std::vector<KeyRange>> backupRangesFuture = config.backupRanges().getOrThrow(tr);
|
||||
state Future<Key> destUidValueFuture = config.destUidValue().getOrThrow(tr);
|
||||
Void _ = wait(success(backupRangesFuture) && success(destUidValueFuture));
|
||||
std::vector<KeyRange> backupRanges = backupRangesFuture.get();
|
||||
Key destUidValue = destUidValueFuture.get();
|
||||
|
||||
// Start logging the mutations for the specified ranges of the tag
|
||||
for (auto &backupRange : backupRanges) {
|
||||
config.startMutationLogs(tr, backupRange);
|
||||
config.startMutationLogs(tr, backupRange, destUidValue);
|
||||
}
|
||||
|
||||
config.stateEnum().set(tr, EBackupState::STATE_BACKUP);
|
||||
@ -2168,7 +2248,7 @@ namespace fileBackup {
|
||||
// The initial snapshot has a desired duration of 0, meaning go as fast as possible.
|
||||
Void _ = wait(config.initNewSnapshot(tr, 0));
|
||||
Key _ = wait(BackupSnapshotDispatchTask::addTask(tr, taskBucket, task, TaskCompletionKey::joinWith(backupFinished)));
|
||||
Key _ = wait(BackupLogsDispatchTask::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::joinWith(backupFinished)));
|
||||
Key _ = wait(BackupLogsDispatchTask::addTask(tr, taskBucket, task, 0, beginVersion, TaskCompletionKey::joinWith(backupFinished)));
|
||||
|
||||
// If a clean stop is requested, the log and snapshot tasks will quit after the backup is restorable, then the following
|
||||
// task will clean up and set the completed state.
|
||||
@ -3307,6 +3387,21 @@ public:
|
||||
|
||||
config.clear(tr);
|
||||
|
||||
state Key destUidValue(BinaryWriter::toValue(uid, Unversioned()));
|
||||
if (normalizedRanges.size() == 1) {
|
||||
state Key destUidLookupPath = BinaryWriter::toValue(normalizedRanges[0], IncludeVersion()).withPrefix(destUidLookupPrefix);
|
||||
Optional<Key> existingDestUidValue = wait(tr->get(destUidLookupPath));
|
||||
if (existingDestUidValue.present()) {
|
||||
destUidValue = existingDestUidValue.get();
|
||||
} else {
|
||||
destUidValue = BinaryWriter::toValue(g_random->randomUniqueID(), Unversioned());
|
||||
tr->set(destUidLookupPath, destUidValue);
|
||||
}
|
||||
}
|
||||
Version initVersion = 1;
|
||||
tr->set(config.getUidAsKey().withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix), BinaryWriter::toValue<Version>(initVersion, Unversioned()));
|
||||
config.destUidValue().set(tr, destUidValue);
|
||||
|
||||
// Point the tag to this new uid
|
||||
tag.set(tr, {uid, false});
|
||||
|
||||
@ -3463,12 +3558,12 @@ public:
|
||||
// Cancel all backup tasks through tag
|
||||
Void _ = wait(tag.cancel(tr));
|
||||
|
||||
Key configPath = uidPrefixKey(logRangesRange.begin, config.getUid());
|
||||
Key logsPath = uidPrefixKey(backupLogKeys.begin, config.getUid());
|
||||
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
|
||||
|
||||
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
|
||||
state Version endVersion = wait(tr->getReadVersion());
|
||||
|
||||
Key _ = wait(fileBackup::EraseLogRangeTaskFunc::addTask(tr, backupAgent->taskBucket, config.getUid(), TaskCompletionKey::noSignal(), destUidValue));
|
||||
|
||||
config.stateEnum().set(tr, EBackupState::STATE_COMPLETED);
|
||||
|
||||
@ -3494,6 +3589,7 @@ public:
|
||||
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr, false, backup_unneeded()));
|
||||
|
||||
state BackupConfig config(current.first);
|
||||
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
|
||||
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
|
||||
|
||||
if (!backupAgent->isRunnable((BackupAgentBase::enumState)status)) {
|
||||
@ -3507,11 +3603,7 @@ public:
|
||||
// Cancel backup task through tag
|
||||
Void _ = wait(tag.cancel(tr));
|
||||
|
||||
Key configPath = uidPrefixKey(logRangesRange.begin, config.getUid());
|
||||
Key logsPath = uidPrefixKey(backupLogKeys.begin, config.getUid());
|
||||
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
|
||||
Key _ = wait(fileBackup::EraseLogRangeTaskFunc::addTask(tr, backupAgent->taskBucket, config.getUid(), TaskCompletionKey::noSignal(), destUidValue));
|
||||
|
||||
config.stateEnum().set(tr, EBackupState::STATE_ABORTED);
|
||||
|
||||
|
@ -129,6 +129,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
|
||||
init( BACKUP_ERROR_DELAY, 10.0 );
|
||||
init( BACKUP_STATUS_DELAY, 40.0 );
|
||||
init( BACKUP_STATUS_JITTER, 0.05 );
|
||||
init( CLEAR_LOG_RANGE_COUNT, 1500); // transaction size / (size of '\xff\x02/blog/' + size of UID + size of hash result) = 200,000 / (8 + 16 + 8)
|
||||
|
||||
// Configuration
|
||||
init( DEFAULT_AUTO_PROXIES, 3 );
|
||||
|
@ -115,6 +115,7 @@ public:
|
||||
int BACKUP_COPY_TASKS;
|
||||
int BACKUP_BLOCK_SIZE;
|
||||
int BACKUP_TASKS_PER_AGENT;
|
||||
int CLEAR_LOG_RANGE_COUNT;
|
||||
int SIM_BACKUP_TASKS_PER_AGENT;
|
||||
int BACKUP_RANGEFILE_BLOCK_SIZE;
|
||||
int BACKUP_LOGFILE_BLOCK_SIZE;
|
||||
|
@ -342,6 +342,11 @@ const KeyRangeRef fileBackupPrefixRange(LiteralStringRef("\xff\x02/backup-agent/
|
||||
// DR Agent configuration constant variables
|
||||
const KeyRangeRef databaseBackupPrefixRange(LiteralStringRef("\xff\x02/db-backup-agent/"), LiteralStringRef("\xff\x02/db-backup-agent0"));
|
||||
|
||||
// \xff\x02/sharedLogRangesConfig/destUidLookup/[keyRange]
|
||||
const KeyRef destUidLookupPrefix = LiteralStringRef("\xff\x02/sharedLogRangesConfig/destUidLookup/");
|
||||
// \xff\x02/sharedLogRangesConfig/backuplatestVersions/[destUid]/[logUid]
|
||||
const KeyRef backupLatestVersionsPrefix = LiteralStringRef("\xff\x02/sharedLogRangesConfig/backupLatestVersions/");
|
||||
|
||||
// Returns the encoded key comprised of begin key and log uid
|
||||
Key logRangesEncodeKey(KeyRef keyBegin, UID logUid) {
|
||||
return keyBegin.withPrefix(uidPrefixKey(logRangesRange.begin, logUid));
|
||||
@ -361,10 +366,10 @@ KeyRef logRangesDecodeKey(KeyRef key, UID* logUid) {
|
||||
return key.substr(logRangesRange.begin.size() + sizeof(UID));
|
||||
}
|
||||
|
||||
// Returns the encoded key value comprised of the end key and destination prefix
|
||||
Key logRangesEncodeValue(KeyRef keyEnd, KeyRef destKeyPrefix) {
|
||||
// Returns the encoded key value comprised of the end key and destination path
|
||||
Key logRangesEncodeValue(KeyRef keyEnd, KeyRef destPath) {
|
||||
BinaryWriter wr(IncludeVersion());
|
||||
wr << std::make_pair(keyEnd, destKeyPrefix);
|
||||
wr << std::make_pair(keyEnd, destPath);
|
||||
return wr.toStringRef();
|
||||
}
|
||||
|
||||
|
@ -149,7 +149,7 @@ KeyRef logRangesDecodeKey(KeyRef key, UID* logUid = NULL);
|
||||
Key logRangesDecodeValue(KeyRef keyValue, Key* destKeyPrefix = NULL);
|
||||
|
||||
// Returns the encoded key value comprised of the end key and destination prefix
|
||||
Key logRangesEncodeValue(KeyRef keyEnd, KeyRef destKeyPrefix);
|
||||
Key logRangesEncodeValue(KeyRef keyEnd, KeyRef destPath);
|
||||
|
||||
// Returns a key prefixed with the specified key with
|
||||
// the given uid encoded at the end
|
||||
@ -202,6 +202,9 @@ extern const KeyRangeRef fileRestorePrefixRange;
|
||||
// Key range reserved by database backup agent to storing configuration and state information
|
||||
extern const KeyRangeRef databaseBackupPrefixRange;
|
||||
|
||||
extern const KeyRef destUidLookupPrefix;
|
||||
extern const KeyRef backupLatestVersionsPrefix;
|
||||
|
||||
// Key range reserved by backup agent to storing mutations
|
||||
extern const KeyRangeRef backupLogKeys;
|
||||
extern const KeyRangeRef applyLogKeys;
|
||||
|
@ -34,8 +34,7 @@ enum ClogMode { ClogDefault, ClogAll, ClogSend, ClogReceive };
|
||||
|
||||
class ISimulator : public INetwork {
|
||||
public:
|
||||
ISimulator() : killedMachines(0), killableMachines(0), machinesNeededForProgress(3), neededDatacenters(1), killableDatacenters(0), killedDatacenters(0), maxCoordinatorsInDatacenter(0), desiredCoordinators(1), processesPerMachine(0), isStopped(false), lastConnectionFailure(0), connectionFailuresDisableDuration(0), speedUpSimulation(false), allSwapsDisabled(false), backupAgents(WaitForType), extraDB(NULL) {}
|
||||
|
||||
ISimulator() : killedMachines(0), killableMachines(0), machinesNeededForProgress(3), neededDatacenters(1), killableDatacenters(0), killedDatacenters(0), maxCoordinatorsInDatacenter(0), desiredCoordinators(1), processesPerMachine(0), isStopped(false), lastConnectionFailure(0), connectionFailuresDisableDuration(0), speedUpSimulation(false), allSwapsDisabled(false), backupAgents(WaitForType), drAgents(WaitForType), extraDB(NULL) {}
|
||||
// Order matters!
|
||||
enum KillType { None, KillInstantly, InjectFaults, RebootAndDelete, Reboot, RebootProcessAndDelete, RebootProcess };
|
||||
|
||||
@ -297,6 +296,7 @@ public:
|
||||
double connectionFailuresDisableDuration;
|
||||
bool speedUpSimulation;
|
||||
BackupAgentType backupAgents;
|
||||
BackupAgentType drAgents;
|
||||
|
||||
virtual flowGlobalType global(int id) { return getCurrentProcess()->global(id); };
|
||||
virtual void setGlobal(size_t id, flowGlobalType v) { getCurrentProcess()->setGlobal(id,v); };
|
||||
|
@ -146,7 +146,19 @@ ACTOR Future<Void> runBackup( Reference<ClusterConnectionFile> connFile ) {
|
||||
it.cancel();
|
||||
}
|
||||
}
|
||||
else if (g_simulator.backupAgents == ISimulator::BackupToDB) {
|
||||
|
||||
Void _= wait(Future<Void>(Never()));
|
||||
throw internal_error();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> runDr( Reference<ClusterConnectionFile> connFile ) {
|
||||
state std::vector<Future<Void>> agentFutures;
|
||||
|
||||
while (g_simulator.drAgents == ISimulator::WaitForType) {
|
||||
Void _ = wait(delay(1.0));
|
||||
}
|
||||
|
||||
if (g_simulator.drAgents == ISimulator::BackupToDB) {
|
||||
Reference<Cluster> cluster = Cluster::createCluster(connFile, -1);
|
||||
Database cx = cluster->createDatabase(LiteralStringRef("DB")).get();
|
||||
|
||||
@ -154,7 +166,7 @@ ACTOR Future<Void> runBackup( Reference<ClusterConnectionFile> connFile ) {
|
||||
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
|
||||
state Database extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
|
||||
|
||||
TraceEvent("StartingBackupAgents").detail("connFile", connFile->getConnectionString().toString()).detail("extraString", extraFile->getConnectionString().toString());
|
||||
TraceEvent("StartingDrAgents").detail("connFile", connFile->getConnectionString().toString()).detail("extraString", extraFile->getConnectionString().toString());
|
||||
|
||||
state DatabaseBackupAgent dbAgent = DatabaseBackupAgent(cx);
|
||||
state DatabaseBackupAgent extraAgent = DatabaseBackupAgent(extraDB);
|
||||
@ -165,11 +177,11 @@ ACTOR Future<Void> runBackup( Reference<ClusterConnectionFile> connFile ) {
|
||||
agentFutures.push_back(extraAgent.run(cx, &dr1PollDelay, CLIENT_KNOBS->SIM_BACKUP_TASKS_PER_AGENT));
|
||||
agentFutures.push_back(dbAgent.run(extraDB, &dr2PollDelay, CLIENT_KNOBS->SIM_BACKUP_TASKS_PER_AGENT));
|
||||
|
||||
while (g_simulator.backupAgents == ISimulator::BackupToDB) {
|
||||
while (g_simulator.drAgents == ISimulator::BackupToDB) {
|
||||
Void _ = wait(delay(1.0));
|
||||
}
|
||||
|
||||
TraceEvent("StoppingBackupAgents");
|
||||
TraceEvent("StoppingDrAgents");
|
||||
|
||||
for(auto it : agentFutures) {
|
||||
it.cancel();
|
||||
@ -243,8 +255,9 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
|
||||
Future<Void> listen = FlowTransport::transport().bind( n, n );
|
||||
Future<Void> fd = fdbd( connFile, localities, processClass, *dataFolder, *coordFolder, 500e6, "", "");
|
||||
Future<Void> backup = runBackupAgents ? runBackup(connFile) : Future<Void>(Never());
|
||||
Future<Void> dr = runBackupAgents ? runDr(connFile) : Future<Void>(Never());
|
||||
|
||||
Void _ = wait(listen || fd || success(onShutdown) || backup);
|
||||
Void _ = wait(listen || fd || success(onShutdown) || backup || dr);
|
||||
} catch (Error& e) {
|
||||
// If in simulation, if we make it here with an error other than io_timeout but enASIOTimedOut is set then somewhere an io_timeout was converted to a different error.
|
||||
if(g_network->isSimulated() && e.code() != error_code_io_timeout && (bool)g_network->global(INetwork::enASIOTimedOut))
|
||||
|
@ -943,13 +943,17 @@ vector<TestSpec> readTests( ifstream& ifs ) {
|
||||
g_simulator.connectionFailuresDisableDuration = spec.simConnectionFailuresDisableDuration;
|
||||
TraceEvent("TestParserTest").detail("ParsedSimConnectionFailuresDisableDuration", spec.simConnectionFailuresDisableDuration);
|
||||
} else if( attrib == "simBackupAgents" ) {
|
||||
if (value == "BackupToFile")
|
||||
if (value == "BackupToFile" || value == "BackupToFileAndDB")
|
||||
spec.simBackupAgents = ISimulator::BackupToFile;
|
||||
else if (value == "BackupToDB")
|
||||
spec.simBackupAgents = ISimulator::BackupToDB;
|
||||
else
|
||||
spec.simBackupAgents = ISimulator::NoBackupAgents;
|
||||
TraceEvent("TestParserTest").detail("ParsedSimBackupAgents", spec.simBackupAgents);
|
||||
|
||||
if (value == "BackupToDB" || value == "BackupToFileAndDB")
|
||||
spec.simDrAgents = ISimulator::BackupToDB;
|
||||
else
|
||||
spec.simDrAgents = ISimulator::NoBackupAgents;
|
||||
TraceEvent("TestParserTest").detail("ParsedSimDrAgents", spec.simDrAgents);
|
||||
} else if( attrib == "extraDB" ) {
|
||||
TraceEvent("TestParserTest").detail("ParsedExtraDB", "");
|
||||
} else if( attrib == "minimumReplication" ) {
|
||||
@ -1004,6 +1008,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
|
||||
state double startDelay = 0.0;
|
||||
state double databasePingDelay = 1e9;
|
||||
state ISimulator::BackupAgentType simBackupAgents = ISimulator::NoBackupAgents;
|
||||
state ISimulator::BackupAgentType simDrAgents = ISimulator::NoBackupAgents;
|
||||
if (tests.empty()) useDB = true;
|
||||
for( auto iter = tests.begin(); iter != tests.end(); ++iter ) {
|
||||
if( iter->useDB ) useDB = true;
|
||||
@ -1012,10 +1017,15 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
|
||||
startDelay = std::max( startDelay, iter->startDelay );
|
||||
databasePingDelay = std::min( databasePingDelay, iter->databasePingDelay );
|
||||
if (iter->simBackupAgents != ISimulator::NoBackupAgents) simBackupAgents = iter->simBackupAgents;
|
||||
|
||||
if (iter->simDrAgents != ISimulator::NoBackupAgents) {
|
||||
simDrAgents = iter->simDrAgents;
|
||||
}
|
||||
}
|
||||
|
||||
if (g_network->isSimulated()) {
|
||||
g_simulator.backupAgents = simBackupAgents;
|
||||
g_simulator.drAgents = simDrAgents;
|
||||
}
|
||||
|
||||
// turn off the database ping functionality if the suite of tests are not going to be using the database
|
||||
|
@ -176,8 +176,8 @@ struct AtomicSwitchoverWorkload : TestWorkload {
|
||||
TraceEvent("AS_Done");
|
||||
|
||||
// SOMEDAY: Remove after backup agents can exist quiescently
|
||||
if (g_simulator.backupAgents == ISimulator::BackupToDB) {
|
||||
g_simulator.backupAgents = ISimulator::NoBackupAgents;
|
||||
if (g_simulator.drAgents == ISimulator::BackupToDB) {
|
||||
g_simulator.drAgents = ISimulator::NoBackupAgents;
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -37,6 +37,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
||||
static int backupAgentRequests;
|
||||
bool locked;
|
||||
bool allowPauses;
|
||||
bool shareLogRange;
|
||||
|
||||
BackupAndRestoreCorrectnessWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx) {
|
||||
@ -53,12 +54,19 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
||||
differentialBackup ? g_random->random01() * (restoreAfter - std::max(abortAndRestartAfter,backupAfter)) + std::max(abortAndRestartAfter,backupAfter) : 0.0);
|
||||
agentRequest = getOption(options, LiteralStringRef("simBackupAgents"), true);
|
||||
allowPauses = getOption(options, LiteralStringRef("allowPauses"), true);
|
||||
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
|
||||
|
||||
KeyRef beginRange;
|
||||
KeyRef endRange;
|
||||
UID randomID = g_nondeterministic_random->randomUniqueID();
|
||||
|
||||
if(backupRangesCount <= 0) {
|
||||
if (shareLogRange) {
|
||||
bool beforePrefix = sharedRandomNumber & 1;
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, LiteralStringRef("\xfe\xff\xfe")));
|
||||
else
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(strinc(LiteralStringRef("\x00\x00\x01")), normalKeys.end));
|
||||
} else if (backupRangesCount <= 0) {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
|
||||
} else {
|
||||
// Add backup ranges
|
||||
@ -342,6 +350,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
||||
state KeyBackedTag keyBackedTag = makeBackupTag(self->backupTag.toString());
|
||||
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx));
|
||||
state UID logUid = uidFlag.first;
|
||||
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx));
|
||||
state Reference<IBackupContainer> lastBackupContainer = wait(BackupConfig(logUid).backupContainer().getD(cx));
|
||||
|
||||
// Occasionally start yet another backup that might still be running when we restore
|
||||
@ -431,8 +440,10 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
||||
}
|
||||
}
|
||||
|
||||
state Key backupAgentKey = uidPrefixKey(logRangesRange.begin, logUid);
|
||||
state Key backupLogValuesKey = uidPrefixKey(backupLogKeys.begin, logUid);
|
||||
state Key backupAgentKey = uidPrefixKey(logRangesRange.begin, logUid);
|
||||
state Key backupLogValuesKey = destUidValue.withPrefix(backupLogKeys.begin);
|
||||
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
|
||||
state Key backupLatestVersionsKey = uidPrefixKey(backupLatestVersionsPath, logUid);
|
||||
state int displaySystemKeys = 0;
|
||||
|
||||
// Ensure that there is no left over key within the backup subspace
|
||||
@ -444,39 +455,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
Standalone<RangeResultRef> agentValues = wait(tr->getRange(KeyRange(KeyRangeRef(backupAgentKey, strinc(backupAgentKey))), 100));
|
||||
|
||||
// Error if the system keyspace for the backup tag is not empty
|
||||
if (agentValues.size() > 0) {
|
||||
displaySystemKeys ++;
|
||||
printf("BackupCorrectnessLeftOverMutationKeys: (%d) %s\n", agentValues.size(), printable(backupAgentKey).c_str());
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverMutationKeys", randomID).detail("backupTag", printable(self->backupTag))
|
||||
.detail("LeftOverKeys", agentValues.size()).detail("keySpace", printable(backupAgentKey));
|
||||
for (auto & s : agentValues) {
|
||||
TraceEvent("BARW_LeftOverKey", randomID).detail("key", printable(StringRef(s.key.toString()))).detail("value", printable(StringRef(s.value.toString())));
|
||||
printf(" Key: %-50s Value: %s\n", printable(StringRef(s.key.toString())).c_str(), printable(StringRef(s.value.toString())).c_str());
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("No left over backup agent configuration keys\n");
|
||||
}
|
||||
|
||||
Standalone<RangeResultRef> logValues = wait(tr->getRange(KeyRange(KeyRangeRef(backupLogValuesKey, strinc(backupLogValuesKey))), 100));
|
||||
|
||||
// Error if the log/mutation keyspace for the backup tag is not empty
|
||||
if (logValues.size() > 0) {
|
||||
displaySystemKeys ++;
|
||||
printf("BackupCorrectnessLeftOverLogKeys: (%d) %s\n", logValues.size(), printable(backupLogValuesKey).c_str());
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverLogKeys", randomID).detail("backupTag", printable(self->backupTag))
|
||||
.detail("LeftOverKeys", logValues.size()).detail("keySpace", printable(backupLogValuesKey));
|
||||
for (auto & s : logValues) {
|
||||
TraceEvent("BARW_LeftOverKey", randomID).detail("key", printable(StringRef(s.key.toString()))).detail("value", printable(StringRef(s.value.toString())));
|
||||
printf(" Key: %-50s Value: %s\n", printable(StringRef(s.key.toString())).c_str(), printable(StringRef(s.value.toString())).c_str());
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("No left over backup log keys\n");
|
||||
}
|
||||
|
||||
// Check the left over tasks
|
||||
// We have to wait for the list to empty since an abort and get status
|
||||
@ -514,6 +493,48 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
||||
printf("BackupCorrectnessLeftOverLogTasks: %ld\n", (long) taskCount);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Standalone<RangeResultRef> agentValues = wait(tr->getRange(KeyRange(KeyRangeRef(backupAgentKey, strinc(backupAgentKey))), 100));
|
||||
|
||||
// Error if the system keyspace for the backup tag is not empty
|
||||
if (agentValues.size() > 0) {
|
||||
displaySystemKeys ++;
|
||||
printf("BackupCorrectnessLeftOverMutationKeys: (%d) %s\n", agentValues.size(), printable(backupAgentKey).c_str());
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverMutationKeys", randomID).detail("backupTag", printable(self->backupTag))
|
||||
.detail("LeftOverKeys", agentValues.size()).detail("keySpace", printable(backupAgentKey));
|
||||
for (auto & s : agentValues) {
|
||||
TraceEvent("BARW_LeftOverKey", randomID).detail("key", printable(StringRef(s.key.toString()))).detail("value", printable(StringRef(s.value.toString())));
|
||||
printf(" Key: %-50s Value: %s\n", printable(StringRef(s.key.toString())).c_str(), printable(StringRef(s.value.toString())).c_str());
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("No left over backup agent configuration keys\n");
|
||||
}
|
||||
|
||||
Optional<Value> latestVersion = wait(tr->get(backupLatestVersionsKey));
|
||||
if (latestVersion.present()) {
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverVersionKey", randomID).detail("backupTag", printable(self->backupTag)).detail("backupLatestVersionsKey", backupLatestVersionsKey.printable()).detail("destUidValue", destUidValue.printable());
|
||||
} else {
|
||||
printf("No left over backup version key\n");
|
||||
}
|
||||
|
||||
Standalone<RangeResultRef> versions = wait(tr->getRange(KeyRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath))), 1));
|
||||
if (!self->shareLogRange || !versions.size()) {
|
||||
Standalone<RangeResultRef> logValues = wait(tr->getRange(KeyRange(KeyRangeRef(backupLogValuesKey, strinc(backupLogValuesKey))), 100));
|
||||
|
||||
// Error if the log/mutation keyspace for the backup tag is not empty
|
||||
if (logValues.size() > 0) {
|
||||
displaySystemKeys ++;
|
||||
printf("BackupCorrectnessLeftOverLogKeys: (%d) %s\n", logValues.size(), printable(backupLogValuesKey).c_str());
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverLogKeys", randomID).detail("backupTag", printable(self->backupTag))
|
||||
.detail("LeftOverKeys", logValues.size()).detail("keySpace", printable(backupLogValuesKey));
|
||||
}
|
||||
else {
|
||||
printf("No left over backup log keys\n");
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
catch (Error &e) {
|
||||
|
@ -88,8 +88,8 @@ struct BackupToDBAbort : TestWorkload {
|
||||
TraceEvent("BDBA_End");
|
||||
|
||||
// SOMEDAY: Remove after backup agents can exist quiescently
|
||||
if (g_simulator.backupAgents == ISimulator::BackupToDB) {
|
||||
g_simulator.backupAgents = ISimulator::NoBackupAgents;
|
||||
if (g_simulator.drAgents == ISimulator::BackupToDB) {
|
||||
g_simulator.drAgents = ISimulator::NoBackupAgents;
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -34,9 +34,11 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
int backupRangesCount, backupRangeLengthMax;
|
||||
bool differentialBackup, performRestore, agentRequest;
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges;
|
||||
static int backupAgentRequests;
|
||||
static int drAgentRequests;
|
||||
Database extraDB;
|
||||
bool locked;
|
||||
bool shareLogRange;
|
||||
UID destUid;
|
||||
|
||||
BackupToDBCorrectnessWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx) {
|
||||
@ -53,13 +55,15 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
differentialBackup = getOption(options, LiteralStringRef("differentialBackup"), g_random->random01() < 0.5 ? true : false);
|
||||
stopDifferentialAfter = getOption(options, LiteralStringRef("stopDifferentialAfter"),
|
||||
differentialBackup ? g_random->random01() * (restoreAfter - std::max(abortAndRestartAfter,backupAfter)) + std::max(abortAndRestartAfter,backupAfter) : 0.0);
|
||||
agentRequest = getOption(options, LiteralStringRef("simBackupAgents"), true);
|
||||
agentRequest = getOption(options, LiteralStringRef("simDrAgents"), true);
|
||||
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
|
||||
|
||||
// Use sharedRandomNumber if shareLogRange is true so that we can ensure backup and DR both backup the same range
|
||||
beforePrefix = shareLogRange ? (sharedRandomNumber & 1) : (g_random->random01() < 0.5);
|
||||
|
||||
beforePrefix = g_random->random01() < 0.5;
|
||||
if (beforePrefix) {
|
||||
extraPrefix = backupPrefix.withPrefix(LiteralStringRef("\xfe\xff\xfe"));
|
||||
backupPrefix = backupPrefix.withPrefix(LiteralStringRef("\xfe\xff\xff"));
|
||||
|
||||
}
|
||||
else {
|
||||
extraPrefix = backupPrefix.withPrefix(LiteralStringRef("\x00\x00\x01"));
|
||||
@ -72,7 +76,12 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
KeyRef endRange;
|
||||
UID randomID = g_nondeterministic_random->randomUniqueID();
|
||||
|
||||
if(backupRangesCount <= 0) {
|
||||
if (shareLogRange) {
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, LiteralStringRef("\xfe\xff\xfe")));
|
||||
else
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(strinc(LiteralStringRef("\x00\x00\x01")), normalKeys.end));
|
||||
} else if(backupRangesCount <= 0) {
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, std::min(backupPrefix, extraPrefix)));
|
||||
else
|
||||
@ -249,6 +258,8 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
|
||||
submitted.send(Void());
|
||||
|
||||
state UID logUid = wait(backupAgent->getLogUid(cx, tag));
|
||||
|
||||
// Stop the differential backup, if enabled
|
||||
if (stopDifferentialDelay) {
|
||||
TEST(!stopDifferentialFuture.isReady()); //Restore starts at specified time
|
||||
@ -261,7 +272,6 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
TraceEvent("BARW_doBackup waitForRestorable", randomID).detail("tag", printable(tag));
|
||||
// Wait until the backup is in a restorable state
|
||||
state int resultWait = wait(backupAgent->waitBackup(cx, tag, false));
|
||||
state UID logUid = wait(backupAgent->getLogUid(cx, tag));
|
||||
|
||||
TraceEvent("BARW_lastBackupFolder", randomID).detail("backupTag", printable(tag))
|
||||
.detail("logUid", logUid).detail("waitStatus", resultWait);
|
||||
@ -296,6 +306,10 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
|
||||
// Wait for the backup to complete
|
||||
TraceEvent("BARW_doBackup waitBackup", randomID).detail("tag", printable(tag));
|
||||
|
||||
UID _destUid = wait(backupAgent->getDestUid(cx, logUid));
|
||||
self->destUid = _destUid;
|
||||
|
||||
state int statusValue = wait(backupAgent->waitBackup(cx, tag, true));
|
||||
Void _ = wait(backupAgent->unlockBackup(cx, tag));
|
||||
|
||||
@ -311,9 +325,11 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> checkData(Database cx, UID logUid, UID randomID, Key tag, DatabaseBackupAgent* backupAgent) {
|
||||
ACTOR static Future<Void> checkData(Database cx, UID logUid, UID destUid, UID randomID, Key tag, DatabaseBackupAgent* backupAgent, bool shareLogRange) {
|
||||
state Key backupAgentKey = uidPrefixKey(logRangesRange.begin, logUid);
|
||||
state Key backupLogValuesKey = uidPrefixKey(backupLogKeys.begin, logUid);
|
||||
state Key backupLogValuesKey = uidPrefixKey(backupLogKeys.begin, destUid);
|
||||
state Key backupLatestVersionsPath = uidPrefixKey(backupLatestVersionsPrefix, destUid);
|
||||
state Key backupLatestVersionsKey = uidPrefixKey(backupLatestVersionsPath, logUid);
|
||||
state int displaySystemKeys = 0;
|
||||
|
||||
// Ensure that there is no left over key within the backup subspace
|
||||
@ -378,21 +394,31 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
printf("No left over backup agent configuration keys\n");
|
||||
}
|
||||
|
||||
Standalone<RangeResultRef> logValues = wait(tr->getRange(KeyRange(KeyRangeRef(backupLogValuesKey, strinc(backupLogValuesKey))), 100));
|
||||
|
||||
// Error if the log/mutation keyspace for the backup tag is not empty
|
||||
if (logValues.size() > 0) {
|
||||
displaySystemKeys++;
|
||||
printf("BackupCorrectnessLeftOverLogKeys: (%d) %s\n", logValues.size(), printable(backupLogValuesKey).c_str());
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverLogKeys", randomID).detail("backupTag", printable(tag))
|
||||
.detail("LeftOverKeys", logValues.size()).detail("keySpace", printable(backupLogValuesKey)).detail("version", decodeBKMutationLogKey(logValues[0].key).first);
|
||||
for (auto & s : logValues) {
|
||||
TraceEvent("BARW_LeftOverKey", randomID).detail("key", printable(StringRef(s.key.toString()))).detail("value", printable(StringRef(s.value.toString())));
|
||||
printf(" Key: %-50s Value: %s\n", printable(StringRef(s.key.toString())).c_str(), printable(StringRef(s.value.toString())).c_str());
|
||||
}
|
||||
Optional<Value> latestVersion = wait(tr->get(backupLatestVersionsKey));
|
||||
if (latestVersion.present()) {
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverVersionKey", randomID).detail("backupTag", printable(tag)).detail("key", backupLatestVersionsKey.printable()).detail("value", BinaryReader::fromStringRef<Version>(latestVersion.get(), Unversioned()));
|
||||
} else {
|
||||
printf("No left over backup version key\n");
|
||||
}
|
||||
else {
|
||||
printf("No left over backup log keys\n");
|
||||
|
||||
Standalone<RangeResultRef> versions = wait(tr->getRange(KeyRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath))), 1));
|
||||
if (!shareLogRange || !versions.size()) {
|
||||
Standalone<RangeResultRef> logValues = wait(tr->getRange(KeyRange(KeyRangeRef(backupLogValuesKey, strinc(backupLogValuesKey))), 100));
|
||||
|
||||
// Error if the log/mutation keyspace for the backup tag is not empty
|
||||
if (logValues.size() > 0) {
|
||||
displaySystemKeys++;
|
||||
printf("BackupCorrectnessLeftOverLogKeys: (%d) %s\n", logValues.size(), printable(backupLogValuesKey).c_str());
|
||||
TraceEvent(SevError, "BackupCorrectnessLeftOverLogKeys", randomID).detail("backupTag", printable(tag))
|
||||
.detail("LeftOverKeys", logValues.size()).detail("keySpace", printable(backupLogValuesKey)).detail("version", decodeBKMutationLogKey(logValues[0].key).first);
|
||||
for (auto & s : logValues) {
|
||||
TraceEvent("BARW_LeftOverKey", randomID).detail("key", printable(StringRef(s.key.toString()))).detail("value", printable(StringRef(s.value.toString())));
|
||||
printf(" Key: %-50s Value: %s\n", printable(StringRef(s.key.toString())).c_str(), printable(StringRef(s.value.toString())).c_str());
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("No left over backup log keys\n");
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
@ -422,7 +448,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
|
||||
// Increment the backup agent requets
|
||||
if (self->agentRequest) {
|
||||
BackupToDBCorrectnessWorkload::backupAgentRequests++;
|
||||
BackupToDBCorrectnessWorkload::drAgentRequests++;
|
||||
}
|
||||
|
||||
try{
|
||||
@ -527,23 +553,23 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
}
|
||||
}
|
||||
|
||||
Void _ = wait( checkData(self->extraDB, logUid, randomID, self->backupTag, &backupAgent) );
|
||||
Void _ = wait( checkData(self->extraDB, logUid, self->destUid, randomID, self->backupTag, &backupAgent, self->shareLogRange) );
|
||||
|
||||
if (self->performRestore) {
|
||||
state UID restoreUid = wait(backupAgent.getLogUid(self->extraDB, self->restoreTag));
|
||||
Void _ = wait( checkData(cx, restoreUid, randomID, self->restoreTag, &restoreAgent) );
|
||||
Void _ = wait( checkData(cx, restoreUid, restoreUid, randomID, self->restoreTag, &restoreAgent, self->shareLogRange) );
|
||||
}
|
||||
|
||||
TraceEvent("BARW_complete", randomID).detail("backupTag", printable(self->backupTag));
|
||||
|
||||
// Decrement the backup agent requets
|
||||
if (self->agentRequest) {
|
||||
BackupToDBCorrectnessWorkload::backupAgentRequests--;
|
||||
BackupToDBCorrectnessWorkload::drAgentRequests--;
|
||||
}
|
||||
|
||||
// SOMEDAY: Remove after backup agents can exist quiescently
|
||||
if ((g_simulator.backupAgents == ISimulator::BackupToDB) && (!BackupToDBCorrectnessWorkload::backupAgentRequests)) {
|
||||
g_simulator.backupAgents = ISimulator::NoBackupAgents;
|
||||
if ((g_simulator.drAgents == ISimulator::BackupToDB) && (!BackupToDBCorrectnessWorkload::drAgentRequests)) {
|
||||
g_simulator.drAgents = ISimulator::NoBackupAgents;
|
||||
}
|
||||
}
|
||||
catch (Error& e) {
|
||||
@ -555,6 +581,6 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
||||
}
|
||||
};
|
||||
|
||||
int BackupToDBCorrectnessWorkload::backupAgentRequests = 0;
|
||||
int BackupToDBCorrectnessWorkload::drAgentRequests = 0;
|
||||
|
||||
WorkloadFactory<BackupToDBCorrectnessWorkload> BackupToDBCorrectnessWorkloadFactory("BackupToDBCorrectness");
|
||||
|
@ -164,6 +164,7 @@ public:
|
||||
simCheckRelocationDuration = false;
|
||||
simConnectionFailuresDisableDuration = 0;
|
||||
simBackupAgents = ISimulator::NoBackupAgents;
|
||||
simDrAgents = ISimulator::NoBackupAgents;
|
||||
}
|
||||
TestSpec( StringRef title, bool dump, bool clear, double startDelay = 30.0, bool useDB = true, double databasePingDelay = -1.0 ) :
|
||||
title( title ), dumpAfterTest( dump ),
|
||||
@ -171,7 +172,7 @@ public:
|
||||
useDB( useDB ), timeout( 600 ),
|
||||
databasePingDelay( databasePingDelay ), runConsistencyCheck( g_network->isSimulated() ),
|
||||
waitForQuiescenceBegin( true ), waitForQuiescenceEnd( true ), simCheckRelocationDuration( false ),
|
||||
simConnectionFailuresDisableDuration( 0 ), simBackupAgents( ISimulator::NoBackupAgents ) {
|
||||
simConnectionFailuresDisableDuration( 0 ), simBackupAgents( ISimulator::NoBackupAgents ), simDrAgents( ISimulator::NoBackupAgents ) {
|
||||
phases = TestWorkload::SETUP | TestWorkload::EXECUTION | TestWorkload::CHECK | TestWorkload::METRICS;
|
||||
if( databasePingDelay < 0 )
|
||||
databasePingDelay = g_network->isSimulated() ? 0.0 : 15.0;
|
||||
@ -193,6 +194,7 @@ public:
|
||||
bool simCheckRelocationDuration; //If set to true, then long duration relocations generate SevWarnAlways messages. Once any workload sets this to true, it will be true for the duration of the program. Can only be used in simulation.
|
||||
double simConnectionFailuresDisableDuration;
|
||||
ISimulator::BackupAgentType simBackupAgents; //If set to true, then the simulation runs backup agents on the workers. Can only be used in simulation.
|
||||
ISimulator::BackupAgentType simDrAgents;
|
||||
};
|
||||
|
||||
Future<DistributedTestResults> runWorkload(
|
||||
|
28
tests/slow/SharedBackupCorrectness.txt
Normal file
28
tests/slow/SharedBackupCorrectness.txt
Normal file
@ -0,0 +1,28 @@
|
||||
testTitle=BackupAndRestore
|
||||
testName=Cycle
|
||||
nodeCount=3000
|
||||
transactionsPerSecond=500.0
|
||||
testDuration=30.0
|
||||
expectedRate=0
|
||||
clearAfterTest=false
|
||||
|
||||
testName=BackupAndRestoreCorrectness
|
||||
backupTag=backup1
|
||||
backupAfter=10.0
|
||||
restoreAfter=60.0
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
performRestore=true
|
||||
allowPauses=false
|
||||
|
||||
testName=BackupToDBCorrectness
|
||||
backupTag=backup2
|
||||
backupPrefix=b1
|
||||
backupAfter=15.0
|
||||
restoreAfter=60.0
|
||||
performRestore=false
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
extraDB=1
|
27
tests/slow/SharedBackupToDBCorrectness.txt
Normal file
27
tests/slow/SharedBackupToDBCorrectness.txt
Normal file
@ -0,0 +1,27 @@
|
||||
testTitle=BackupAndRestore
|
||||
testName=Cycle
|
||||
nodeCount=3000
|
||||
transactionsPerSecond=500.0
|
||||
testDuration=30.0
|
||||
expectedRate=0
|
||||
clearAfterTest=false
|
||||
|
||||
testName=BackupAndRestoreCorrectness
|
||||
backupTag=backup1
|
||||
backupAfter=10.0
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
performRestore=false
|
||||
allowPauses=false
|
||||
|
||||
testName=BackupToDBCorrectness
|
||||
backupTag=backup2
|
||||
backupPrefix=b2
|
||||
backupAfter=15.0
|
||||
restoreAfter=60.0
|
||||
performRestore=true
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
extraDB=1
|
Loading…
x
Reference in New Issue
Block a user