Add correctness test for point-in-time restore (#9185)

This commit is contained in:
Hui Liu 2023-03-14 08:56:34 -07:00 committed by GitHub
parent 4e0a46b3d4
commit 499a4cab93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 204 additions and 21 deletions

View File

@ -163,11 +163,10 @@ public:
const BlobManifestFile& firstFile = *iter;
result.push_back(firstFile);
// search all following files belonging to same manifest
for (auto it = iter + 1; it != allFiles.end(); ++it) {
if (it->belongToSameManifest(firstFile)) {
result.push_back(*it);
for (++iter; iter != allFiles.end(); ++iter) {
if (iter->belongToSameManifest(firstFile)) {
result.push_back(*iter);
} else {
iter = it; // start point for next search
break;
}
}

View File

@ -94,6 +94,16 @@ private:
.detail("Version", granule.version)
.detail("SizeInBytes", granule.sizeInBytes);
}
// Restore version is expected to be greater than max version from blob granule files.
state Version max = maxVersion(self);
Version targetVersion = wait(BlobRestoreController::getTargetVersion(restoreController, max));
if (targetVersion < max) {
TraceEvent("UnsupportedRestoreVersion", self->interf_.id())
.detail("MaxBlobGranulesVersion", max)
.detail("TargetVersion", targetVersion);
throw restore_missing_data();
}
wait(BlobRestoreController::updateState(restoreController, COPYING_DATA, LOADED_MANIFEST));
return Void();
}
@ -327,13 +337,16 @@ private:
// check last version in mutation logs
state std::string mlogsUrl = wait(getMutationLogUrl());
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {});
state double beginTs = now();
BackupDescription desc = wait(bc->describeBackup(true));
TraceEvent("DescribeBackupLatency", self->interf_.id()).detail("Seconds", now() - beginTs);
if (!desc.contiguousLogEnd.present()) {
TraceEvent("InvalidMutationLogs").detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL);
TraceEvent("InvalidMutationLogs", self->interf_.id()).detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL);
throw blob_restore_missing_logs();
}
if (!desc.minLogBegin.present()) {
TraceEvent("InvalidMutationLogs").detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL);
TraceEvent("InvalidMutationLogs", self->interf_.id()).detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL);
throw blob_restore_missing_logs();
}
state Version minLogVersion = desc.minLogBegin.get();
@ -343,20 +356,20 @@ private:
state Version targetVersion = wait(BlobRestoreController::getTargetVersion(restoreController, maxLogVersion));
if (targetVersion < maxLogVersion) {
if (!needApplyLogs(self, targetVersion)) {
TraceEvent("SkipMutationLogs").detail("TargetVersion", targetVersion);
TraceEvent("SkipMutationLogs", self->interf_.id()).detail("TargetVersion", targetVersion);
dprint("Skip mutation logs as all granules are at version {}\n", targetVersion);
return Void();
}
}
if (targetVersion < minLogVersion) {
TraceEvent("MissingMutationLogs")
TraceEvent("MissingMutationLogs", self->interf_.id())
.detail("MinLogVersion", minLogVersion)
.detail("TargetVersion", targetVersion);
throw blob_restore_missing_logs();
}
if (targetVersion > maxLogVersion) {
TraceEvent("SkipTargetVersion")
TraceEvent("SkipTargetVersion", self->interf_.id())
.detail("MaxLogVersion", maxLogVersion)
.detail("TargetVersion", targetVersion);
}
@ -366,7 +379,7 @@ private:
state Standalone<VectorRef<Version>> beginVersions;
for (auto& granule : self->blobGranules_) {
if (granule.version < minLogVersion || granule.version > maxLogVersion) {
TraceEvent("InvalidMutationLogs")
TraceEvent("InvalidMutationLogs", self->interf_.id())
.detail("Granule", granule.granuleID)
.detail("GranuleVersion", granule.version)
.detail("MinLogVersion", minLogVersion)
@ -380,13 +393,15 @@ private:
// Blob granule ends at granule.version(inclusive), so we need to apply mutation logs
// after granule.version(exclusive).
beginVersions.push_back(beginVersions.arena(), granule.version);
TraceEvent("ApplyMutationLogVersion").detail("GID", granule.granuleID).detail("Ver", granule.version);
TraceEvent("ApplyMutationLogVersion", self->interf_.id())
.detail("GID", granule.granuleID)
.detail("Ver", granule.version);
}
}
Optional<RestorableFileSet> restoreSet =
wait(bc->getRestoreSet(maxLogVersion, ranges, OnlyApplyMutationLogs::True, minLogVersion));
if (!restoreSet.present()) {
TraceEvent("InvalidMutationLogs")
TraceEvent("InvalidMutationLogs", self->interf_.id())
.detail("MinLogVersion", minLogVersion)
.detail("MaxLogVersion", maxLogVersion);
throw blob_restore_corrupted_logs();

View File

@ -26,6 +26,8 @@
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
#include "flow/Error.h"
@ -52,7 +54,11 @@ struct BlobRestoreWorkload : TestWorkload {
extraDb_ = Database::createSimulatedExtraDatabase(g_simulator->extraDatabases[0]);
setupBlob_ = getOption(options, "setupBlob"_sr, false);
performRestore_ = getOption(options, "performRestore"_sr, false);
restoreToVersion_ = getOption(options, "restoreToVersion"_sr, false);
readBatchSize_ = getOption(options, "readBatchSize"_sr, 3000);
if (!blobConn_.isValid() && SERVER_KNOBS->BG_METADATA_SOURCE != "tenant") {
blobConn_ = BlobConnectionProvider::newBlobConnectionProvider(SERVER_KNOBS->BG_URL);
}
}
Future<Void> setup(Database const& cx) override { return Void(); }
@ -76,7 +82,15 @@ struct BlobRestoreWorkload : TestWorkload {
KnobValueRef knobFalse = KnobValueRef::create(bool{ false });
IKnobCollection::getMutableGlobalKnobCollection().setKnob("blob_manifest_backup", knobFalse);
wait(store(result, self->extraDb_->blobRestore(normalKeys, {})));
wait(store(self->restoreTargetVersion_, getRestoreVersion(cx, self)));
fmt::print("Restore target version {}\n", self->restoreTargetVersion_);
// Only need to pass the version if we are trying to restore to a previous version
Optional<Version> targetVersion;
if (self->restoreToVersion_) {
targetVersion = self->restoreTargetVersion_;
}
wait(store(result, self->extraDb_->blobRestore(normalKeys, targetVersion)));
state std::vector<Future<Void>> futures;
futures.push_back(self->runBackupAgent(self));
@ -86,6 +100,28 @@ struct BlobRestoreWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Version> getRestoreVersion(Database cx, BlobRestoreWorkload* self) {
state Version targetVersion;
state std::string baseUrl = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
state std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, {}));
if (containers.size() == 0) {
fmt::print("missing mutation logs {}\n", baseUrl);
throw restore_missing_data();
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(containers.front(), {}, {});
BackupDescription desc = wait(bc->describeBackup(true));
if (!desc.contiguousLogEnd.present()) {
fmt::print("missing mutation logs {}\n", baseUrl);
throw restore_missing_data();
}
targetVersion = desc.contiguousLogEnd.get() - 1;
if (self->restoreToVersion_) {
// restore to a previous version
targetVersion -= deterministicRandom()->randomInt(1, 100000);
}
return targetVersion;
}
// Start backup agent on the extra db
ACTOR Future<Void> runBackupAgent(BlobRestoreWorkload* self) {
state FileBackupAgent backupAgent;
@ -116,13 +152,14 @@ struct BlobRestoreWorkload : TestWorkload {
}
}
ACTOR static Future<Standalone<VectorRef<KeyValueRef>>> read(Database cx,
KeySelectorRef begin,
BlobRestoreWorkload* self) {
ACTOR static Future<Standalone<VectorRef<KeyValueRef>>> readFromStorageServer(Database cx,
KeySelectorRef begin,
BlobRestoreWorkload* self) {
state Standalone<VectorRef<KeyValueRef>> data;
state Transaction tr(cx);
state KeySelectorRef end = firstGreaterOrEqual(normalKeys.end);
state Arena arena;
loop {
try {
GetRangeLimits limits(self->readBatchSize_ - data.size());
@ -149,6 +186,41 @@ struct BlobRestoreWorkload : TestWorkload {
return data;
}
ACTOR static Future<Standalone<VectorRef<KeyValueRef>>> readFromBlob(Database cx,
KeySelectorRef begin,
Version readVersion,
BlobRestoreWorkload* self) {
state Transaction tr(cx);
state Standalone<VectorRef<KeyValueRef>> data;
state KeyRangeRef keys(begin.getKey(), normalKeys.end);
state int count = 0;
loop {
try {
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks =
wait(tr.readBlobGranules(keys, 0, readVersion));
state int i;
for (i = 0; i < chunks.size(); ++i) {
state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, readVersion, self->blobConn_));
for (auto& r : rows) {
if (begin.isDefinitelyGreater(r.key)) {
continue;
}
data.push_back_deep(data.arena(), r);
count++;
if (count >= self->readBatchSize_) {
return data;
}
}
}
return data;
} catch (Error& e) {
fmt::print("read blob error {} \n", e.what());
wait(tr.onError(e));
}
}
}
static bool compare(VectorRef<KeyValueRef> src, VectorRef<KeyValueRef> dest) {
if (src.size() != dest.size()) {
fmt::print("Size mismatch src {} dest {}\n", src.size(), dest.size());
@ -198,11 +270,20 @@ struct BlobRestoreWorkload : TestWorkload {
state Arena arena;
state KeySelectorRef srcBegin = firstGreaterOrEqual(normalKeys.begin);
state KeySelectorRef destBegin = firstGreaterOrEqual(normalKeys.begin);
// flush src db
bool flush = wait(cx->flushBlobRange(normalKeys, false, self->restoreTargetVersion_));
if (!flush) {
fmt::print("Cannot flush to version {} \n", self->restoreTargetVersion_);
throw internal_error();
}
loop {
// restore src. data before restore
state Standalone<VectorRef<KeyValueRef>> src = wait(read(cx, srcBegin, self));
// restore dest. data after restore
state Standalone<VectorRef<KeyValueRef>> dest = wait(read(self->extraDb_, destBegin, self));
state Standalone<VectorRef<KeyValueRef>> src =
wait(readFromBlob(cx, srcBegin, self->restoreTargetVersion_, self));
// restore dest. data after restore
state Standalone<VectorRef<KeyValueRef>> dest =
wait(readFromStorageServer(self->extraDb_, destBegin, self));
if (src.size() == 0 && dest.size() == 0) {
break;
}
@ -225,6 +306,9 @@ private:
bool setupBlob_;
bool performRestore_;
int readBatchSize_;
bool restoreToVersion_;
Version restoreTargetVersion_;
Reference<BlobConnectionProvider> blobConn_;
};
WorkloadFactory<BlobRestoreWorkload> BlobRestoreWorkloadFactory;

View File

@ -151,6 +151,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/BlobGranuleMoveVerifyCycle.toml IGNORE)
add_fdb_test(TEST_FILES fast/BlobRestoreBasic.toml)
add_fdb_test(TEST_FILES fast/BlobRestoreLarge.toml)
add_fdb_test(TEST_FILES fast/BlobRestoreToVersion.toml)
add_fdb_test(TEST_FILES fast/CacheTest.toml)
add_fdb_test(TEST_FILES fast/CloggedSideband.toml)
add_fdb_test(TEST_FILES fast/CompressionUtilsUnit.toml IGNORE)

View File

@ -46,7 +46,7 @@ clearAfterTest = false
testName = 'Cycle'
nodeCount = 3000
transactionsPerSecond = 3000.0
testDuration = 10.0
testDuration = 30.0
expectedRate = 0
[[test.workload]]
@ -75,5 +75,5 @@ checkOnly = true
testName = 'Cycle'
nodeCount = 3000
transactionsPerSecond = 3000.0
testDuration = 10.0
testDuration = 30.0
expectedRate = 0

View File

@ -0,0 +1,84 @@
[configuration]
testClass = "BlobRestore"
blobGranulesEnabled = true
extraDatabaseMode = 'Single'
allowDefaultTenant = false
disableTss = true
storageEngineExcludeTypes = [4, 5]
[[knobs]]
bg_consistency_check_enabled = 0
blob_manifest_backup = true
shard_encode_location_metadata = false
bw_throttling_enabled = false
[[test]]
testTitle = 'SetupBlob'
simBackupAgents = 'BackupToFile'
runConsistencyCheck = false
clearAfterTest = false
waitForQuiescence = false
[[test.workload]]
testName = 'BlobRestoreWorkload'
setupBlob = true
[[test]]
testTitle = 'BackupMutationLogs'
simBackupAgents = 'BackupToFile'
runConsistencyCheck = false
waitForQuiescence = false
[[test.workload]]
testName = 'IncrementalBackup'
tag = 'default'
submitOnly = true
waitForBackup = true
[[test]]
testTitle = 'WriteTest'
simBackupAgents = 'BackupToFile'
runConsistencyCheck = false
waitForQuiescence = false
clearAfterTest = false
[[test.workload]]
testName = 'ReadWrite'
testDuration = 60.0
transactionsPerSecond = 200
writesPerTransactionA = 5
readsPerTransactionA = 1
writesPerTransactionB = 10
readsPerTransactionB = 1
alpha = 0.5
nodeCount = 10000
valueBytes = 128
discardEdgeMeasurements = false
warmingDelay = 10.0
setup = false
[[test]]
testTitle = 'StopBackup'
simBackupAgents = 'BackupToFile'
clearAfterTest = false
runConsistencyCheck = false
waitForQuiescence = false
[[test.workload]]
testName = 'IncrementalBackup'
tag = 'default'
stopBackup = true
waitForBackup = true
[[test]]
testTitle = 'BlobRestore'
simBackupAgents = 'BackupToFile'
clearAfterTest = false
runConsistencyCheck = false
waitForQuiescence = false
[[test.workload]]
testName = 'BlobRestoreWorkload'
performRestore = true
restoreToVersion = true