Merge pull request #440 from etschannen/release-5.2

backup created large transactions when erasing log ranges
This commit is contained in:
A.J. Beamon 2018-06-06 13:45:43 -07:00 committed by GitHub
commit 59caa968dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 108 additions and 147 deletions

View File

@ -8,6 +8,7 @@ Release Notes
Fixes
-----
* Backup would attempt to clear too many ranges in a single transaction when erasing log ranges. `(PR #440) https://github.com/apple/foundationdb/pull/440`_
* A read-only transaction using the ``READ_LOCK_AWARE`` option would fail if committed. `(PR #437) https://github.com/apple/foundationdb/pull/437`_
5.2.2

View File

@ -423,7 +423,7 @@ bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key);
Version getVersionFromString(std::string const& value);
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize = CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid);
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> beginVersion = Optional<Version>(), Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Key getApplyKey( Version version, Key backupUid );
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key);
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value);

View File

@ -624,146 +624,112 @@ ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key remov
}
}
ACTOR Future<Void> _clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool clearVersionHistory, Key logUidValue, Key destUidValue, Version beginVersion, Version endVersion) {
ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Key backupLatestVersionsKey = logUidValue.withPrefix(backupLatestVersionsPath);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
// Make sure version history key does exist and lower the beginVersion if needed
bool foundSelf = false;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
foundSelf = true;
beginVersion = std::min(beginVersion, BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned()));
}
}
// Do not clear anything if version history key cannot be found
if (!foundSelf) {
if (!destUidValue.size()) {
return Void();
}
Version nextSmallestVersion = endVersion;
bool clearLogRangesRequired = true;
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop{
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > beginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
}
}
}
if (clearVersionHistory && backupVersions.size() == 1) {
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
} else {
if (clearVersionHistory) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(endVersion, Unversioned()));
}
// Clear log ranges if needed
if (clearLogRangesRequired) {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
return Void();
}
// The difference between beginVersion and endVersion should not be too large
Future<Void> clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool clearVersionHistory, Key logUidValue, Key destUidValue, Version beginVersion, Version endVersion) {
return _clearLogRanges(tr, clearVersionHistory, logUidValue, destUidValue, beginVersion, endVersion);
}
ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> beginVersion, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
if ((beginVersion.present() && endVersion.present() && endVersion.get() <= beginVersion.get()) || !destUidValue.size())
return Void();
state Version currBeginVersion;
state Version endVersionValue;
state Version currEndVersion;
state bool clearVersionHistory;
ASSERT(beginVersion.present() == endVersion.present());
if (beginVersion.present()) {
currBeginVersion = beginVersion.get();
endVersionValue = endVersion.get();
clearVersionHistory = false;
} else {
// If beginVersion and endVersion are not presented, it means backup is done and we need to clear version history.
// Set currBeginVersion to INTMAX_MAX and it will be set to the correct version in clearLogRanges().
// Set endVersionValue to INTMAX_MAX since we need to clear log ranges up to next smallest version.
currBeginVersion = endVersionValue = currEndVersion = INTMAX_MAX;
clearVersionHistory = true;
}
while (currBeginVersion < endVersionValue || clearVersionHistory) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop{
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (checkBackupUid) {
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
if (!clearVersionHistory) {
currEndVersion = std::min(currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE, endVersionValue);
}
Void _ = wait(clearLogRanges(tr, clearVersionHistory, logUidValue, destUidValue, currBeginVersion, currEndVersion));
Void _ = wait(tr->commit());
if (clearVersionHistory) {
if (checkBackupUid) {
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
// Make sure version history key does exist and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
break;
}
}
// Do not clear anything if version history key cannot be found
if (currBeginVersion == invalidVersion) {
return Void();
}
state Version currEndVersion = currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
if(endVersion.present()) {
currEndVersion = std::min(currEndVersion, endVersion.get());
}
state Version nextSmallestVersion = currEndVersion;
bool clearLogRangesRequired = true;
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > currBeginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
}
}
}
if (!endVersion.present() && backupVersions.size() == 1) {
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
} else {
if (!endVersion.present() && currEndVersion >= nextSmallestVersion) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(currEndVersion, Unversioned()));
}
currBeginVersion = currEndVersion;
break;
} catch (Error &e) {
Void _ = wait(tr->onError(e));
// Clear log ranges if needed
if (clearLogRangesRequired) {
Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
Void _ = wait(tr->commit());
if (!endVersion.present() && (backupVersions.size() == 1 || currEndVersion >= nextSmallestVersion)) {
return Void();
}
if(endVersion.present() && currEndVersion == endVersion.get()) {
return Void();
}
tr->reset();
} catch (Error &e) {
Void _ = wait(tr->onError(e));
}
}
return Void();
}
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> beginVersion, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
return _eraseLogData(cx, logUidValue, destUidValue, beginVersion, endVersion, checkBackupUid, backupUid);
}
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, Optional<Version> endVersion, bool checkBackupUid, Version backupUid) {
return _eraseLogData(cx, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
}

View File

@ -476,21 +476,20 @@ namespace dbBackup {
Void _ = wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyBeginVersion], Unversioned());
Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
Void _ = wait(eraseLogData(taskBucket->src, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], Optional<Version>(beginVersion), Optional<Version>(endVersion), true, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
Void _ = wait(eraseLogData(taskBucket->src, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], Optional<Version>(endVersion), true, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version beginVersion, Version endVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, Version endVersion, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
Key doneKey = wait(completionKey.get(tr, taskBucket));
Reference<Task> task(new Task(EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version, doneKey, 1));
copyDefaultParameters(parentTask, task);
task->params[DatabaseBackupAgent::keyBeginVersion] = BinaryWriter::toValue(beginVersion, Unversioned());
task->params[DatabaseBackupAgent::keyBeginVersion] = BinaryWriter::toValue(1, Unversioned()); //FIXME: remove in 6.X, only needed for 5.2 backward compatibility
task->params[DatabaseBackupAgent::keyEndVersion] = BinaryWriter::toValue(endVersion, Unversioned());
if (!waitFor) {
@ -749,7 +748,7 @@ namespace dbBackup {
// Do not erase at the first time
if (prevBeginVersion > 0) {
addTaskVector.push_back(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::joinWith(allPartsDone)));
addTaskVector.push_back(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::joinWith(allPartsDone)));
}
Void _ = wait(waitForAll(addTaskVector) && taskBucket->finish(tr, task));
@ -856,7 +855,7 @@ namespace dbBackup {
}
Version backupUid = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned());
Void _ = wait(eraseLogData(taskBucket->src, logUidValue, destUidValue, Optional<Version>(), Optional<Version>(), true, backupUid));
Void _ = wait(eraseLogData(taskBucket->src, logUidValue, destUidValue, Optional<Version>(), true, backupUid));
return Void();
}
@ -952,7 +951,7 @@ namespace dbBackup {
}
if (prevBeginVersion > 0) {
addTaskVector.push_back(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::joinWith(allPartsDone)));
addTaskVector.push_back(EraseLogRangeTaskFunc::addTask(tr, taskBucket, task, beginVersion, TaskCompletionKey::joinWith(allPartsDone)));
}
Void _ = wait(waitForAll(addTaskVector) && taskBucket->finish(tr, task));

View File

@ -1911,30 +1911,25 @@ namespace fileBackup {
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
Void _ = wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
state Version beginVersion = Params.beginVersion().get(task);
state Version endVersion = Params.endVersion().get(task);
state Key destUidValue = Params.destUidValue().get(task);
state BackupConfig config(task);
state Key logUidValue = config.getUidAsKey();
if (beginVersion == 0) {
Void _ = wait(eraseLogData(cx, logUidValue, destUidValue));
} else {
Void _ = wait(eraseLogData(cx, logUidValue, destUidValue, Optional<Version>(beginVersion), Optional<Version>(endVersion)));
}
Void _ = wait(eraseLogData(cx, logUidValue, destUidValue, endVersion != 0 ? Optional<Version>(endVersion) : Optional<Version>()));
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, UID logUid, TaskCompletionKey completionKey, Key destUidValue, Version beginVersion = 0, Version endVersion = 0, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, UID logUid, TaskCompletionKey completionKey, Key destUidValue, Version endVersion = 0, Reference<TaskFuture> waitFor = Reference<TaskFuture>()) {
Key key = wait(addBackupTask(EraseLogRangeTaskFunc::name,
EraseLogRangeTaskFunc::version,
tr, taskBucket, completionKey,
BackupConfig(logUid),
waitFor,
[=](Reference<Task> task) {
Params.beginVersion().set(task, beginVersion);
Params.beginVersion().set(task, 1); //FIXME: remove in 6.X, only needed for 5.2 backward compatibility
Params.endVersion().set(task, endVersion);
Params.destUidValue().set(task, destUidValue);
},
@ -2039,7 +2034,7 @@ namespace fileBackup {
// Do not erase at the first time
if (prevBeginVersion > 0) {
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
Key _ = wait(EraseLogRangeTaskFunc::addTask(tr, taskBucket, config.getUid(), TaskCompletionKey::joinWith(logDispatchBatchFuture), destUidValue, prevBeginVersion, beginVersion));
Key _ = wait(EraseLogRangeTaskFunc::addTask(tr, taskBucket, config.getUid(), TaskCompletionKey::joinWith(logDispatchBatchFuture), destUidValue, beginVersion));
}
Void _ = wait(taskBucket->finish(tr, task));
@ -3481,8 +3476,8 @@ public:
tr->set(destUidLookupPath, destUidValue);
}
}
Version initVersion = 1;
tr->set(config.getUidAsKey().withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix), BinaryWriter::toValue<Version>(initVersion, Unversioned()));
tr->set(config.getUidAsKey().withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix), BinaryWriter::toValue<Version>(tr->getReadVersion().get(), Unversioned()));
config.destUidValue().set(tr, destUidValue);
// Point the tag to this new uid