From 8d49c98a412ab1d6c5d1d7ae49e28df9a2d85cd5 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Thu, 26 Aug 2021 13:47:31 -0500 Subject: [PATCH] Added simulation workload for blob granules and fixed some bugs --- fdbclient/BlobGranuleReader.actor.cpp | 27 +- fdbclient/DatabaseContext.h | 5 +- fdbclient/NativeAPI.actor.cpp | 81 +++-- fdbclient/ServerKnobs.cpp | 4 +- fdbserver/BlobManager.actor.cpp | 12 +- fdbserver/BlobWorker.actor.cpp | 38 +- fdbserver/CMakeLists.txt | 1 + fdbserver/SimulatedCluster.actor.cpp | 14 +- .../workloads/BlobGranuleVerifier.actor.cpp | 339 ++++++++++++++++++ tests/slow/BlobGranuleCorrectness.toml | 10 + 10 files changed, 451 insertions(+), 80 deletions(-) create mode 100644 fdbserver/workloads/BlobGranuleVerifier.actor.cpp create mode 100644 tests/slow/BlobGranuleCorrectness.toml diff --git a/fdbclient/BlobGranuleReader.actor.cpp b/fdbclient/BlobGranuleReader.actor.cpp index e6bc611854..b9ea93b6c0 100644 --- a/fdbclient/BlobGranuleReader.actor.cpp +++ b/fdbclient/BlobGranuleReader.actor.cpp @@ -32,6 +32,8 @@ // TODO could refactor the file reading code from here and the delta file function into another actor, // then this part would also be testable? 
but meh +#define BG_READ_DEBUG false + ACTOR Future readSnapshotFile(Reference bstore, BlobFilenameRef f, KeyRangeRef keyRange, @@ -88,11 +90,13 @@ ACTOR Future readSnapshotFile(Reference bstore i++; } }*/ - printf("Started with %d rows from snapshot file %s after pruning to [%s - %s)\n", - dataMap->size(), - f.toString().c_str(), - keyRange.begin.printable().c_str(), - keyRange.end.printable().c_str()); + if (BG_READ_DEBUG) { + printf("Started with %d rows from snapshot file %s after pruning to [%s - %s)\n", + dataMap->size(), + f.toString().c_str(), + keyRange.begin.printable().c_str(), + keyRange.end.printable().c_str()); + } return arena; } catch (Error& e) { @@ -126,8 +130,9 @@ ACTOR Future> readDeltaFile(Reference::value, result.contents(), parseArena); result.arena().dependsOn(parseArena); - // result.contents() = ObjectReader::fromStringRef(dataRef, Unversioned()); - printf("Parsed %d deltas from delta file %s\n", result.size(), f.toString().c_str()); + if (BG_READ_DEBUG) { + printf("Parsed %d deltas from delta file %s\n", result.size(), f.toString().c_str()); + } // TODO REMOVE sanity check for (int i = 0; i < result.size() - 1; i++) { @@ -276,14 +281,18 @@ ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk, Arena snapshotArena = wait(readSnapshotFuture); arena.dependsOn(snapshotArena); - printf("Applying %d delta files\n", readDeltaFutures.size()); + if (BG_READ_DEBUG) { + printf("Applying %d delta files\n", readDeltaFutures.size()); + } for (Future> deltaFuture : readDeltaFutures) { Standalone result = wait(deltaFuture); arena.dependsOn(result.arena()); applyDeltas(&dataMap, arena, result, keyRange, readVersion); wait(yield()); } - printf("Applying %d memory deltas\n", chunk.newDeltas.size()); + if (BG_READ_DEBUG) { + printf("Applying %d memory deltas\n", chunk.newDeltas.size()); + } applyDeltas(&dataMap, arena, chunk.newDeltas, keyRange, readVersion); wait(yield()); diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 
601d0d17ff..594eceee99 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -268,10 +268,11 @@ public: Future popRangeFeedMutations(StringRef rangeID, Version version); Future getBlobGranuleRangesStream(const PromiseStream& results, KeyRange range); + // TODO add optional for end version so it can do a GRV in the transaction it already has to do Future readBlobGranulesStream(const PromiseStream>& results, KeyRange range, - Version begin = 0, - Version end = std::numeric_limits::max()); + Version begin, + Version end); // private: explicit DatabaseContext(Reference>> connectionFile, diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 60a1e6cb24..1503021cf1 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -6771,6 +6771,8 @@ Future DatabaseContext::popRangeFeedMutations(StringRef rangeID, Version v return popRangeFeedMutationsActor(Reference::addRef(this), rangeID, version); } +#define BG_REQUEST_DEBUG false + // FIXME: code for discovering blob granules is similar enough that it could be refactored? 
It's pretty simple though ACTOR Future getBlobGranuleRangesStreamActor(Reference db, PromiseStream results, @@ -6778,10 +6780,14 @@ ACTOR Future getBlobGranuleRangesStreamActor(Reference db state Database cx(db); state Reference tr = makeReference(cx); state KeyRange currentRange = keyRange; - printf( - "Getting Blob Granules for [%s - %s)\n", keyRange.begin.printable().c_str(), keyRange.end.printable().c_str()); + if (BG_REQUEST_DEBUG) { + printf("Getting Blob Granules for [%s - %s)\n", + keyRange.begin.printable().c_str(), + keyRange.end.printable().c_str()); + } loop { try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); state RangeResult blobGranuleMapping = wait(krmGetRanges( tr, blobGranuleMappingKeys.begin, currentRange, 1000, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); @@ -6831,8 +6837,6 @@ ACTOR Future readBlobGranulesStreamActor(Reference db, loop { try { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - // state KeyRange keyRange = KeyRange(KeyRangeRef(LiteralStringRef("\x01"), LiteralStringRef("\x02"))); - // state KeyRange keyRange = KeyRange(KeyRangeRef()); if (end.present()) { endVersion = end.get(); } else { @@ -6859,9 +6863,10 @@ ACTOR Future readBlobGranulesStreamActor(Reference db, throw transaction_too_old(); } - printf("Doing blob granule request @ %lld\n", endVersion); - - printf("blob worker assignments:\n"); + if (BG_REQUEST_DEBUG) { + printf("Doing blob granule request @ %lld\n", endVersion); + printf("blob worker assignments:\n"); + } for (i = 0; i < blobGranuleMapping.size() - 1; i++) { granuleStartKey = blobGranuleMapping[i].key; @@ -6875,16 +6880,20 @@ ACTOR Future readBlobGranulesStreamActor(Reference db, } workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value); - printf(" [%s - %s): %s\n", - granuleStartKey.printable().c_str(), - granuleEndKey.printable().c_str(), - workerId.toString().c_str()); + if (BG_REQUEST_DEBUG) { + printf(" [%s - %s): %s\n", + granuleStartKey.printable().c_str(), + 
granuleEndKey.printable().c_str(), + workerId.toString().c_str()); + } if (!cx->blobWorker_interf.count(workerId)) { Optional workerInterface = wait(tr->get(blobWorkerListKeyFor(workerId))); ASSERT(workerInterface.present()); cx->blobWorker_interf[workerId] = decodeBlobWorkerListValue(workerInterface.get()); - printf(" decoded worker interface for %s\n", workerId.toString().c_str()); + if (BG_REQUEST_DEBUG) { + printf(" decoded worker interface for %s\n", workerId.toString().c_str()); + } } } break; @@ -6894,7 +6903,7 @@ ACTOR Future readBlobGranulesStreamActor(Reference db, } // Make request for each granule - for (i = 0; i < blobGranuleMapping.size(); i++) { + for (i = 0; i < blobGranuleMapping.size() - 1; i++) { granuleStartKey = blobGranuleMapping[i].key; granuleEndKey = blobGranuleMapping[i + 1].key; workerId = decodeBlobGranuleMappingValue(blobGranuleMapping[i].value); @@ -6929,29 +6938,35 @@ ACTOR Future readBlobGranulesStreamActor(Reference db, throw _rep.getError(); } BlobGranuleFileReply rep = _rep.get();*/ - printf("Blob granule request for [%s - %s) @ %lld - %lld got reply from %s:\n", - granuleStartKey.printable().c_str(), - granuleEndKey.printable().c_str(), - begin, - endVersion, - workerId.toString().c_str()); + if (BG_REQUEST_DEBUG) { + printf("Blob granule request for [%s - %s) @ %lld - %lld got reply from %s:\n", + granuleStartKey.printable().c_str(), + granuleEndKey.printable().c_str(), + begin, + endVersion, + workerId.toString().c_str()); + } for (auto& chunk : rep.chunks) { - printf("[%s - %s)\n", chunk.keyRange.begin.printable().c_str(), chunk.keyRange.end.printable().c_str()); + if (BG_REQUEST_DEBUG) { + printf("[%s - %s)\n", + chunk.keyRange.begin.printable().c_str(), + chunk.keyRange.end.printable().c_str()); - printf(" SnapshotFile:\n %s\n", - chunk.snapshotFile.present() ? 
chunk.snapshotFile.get().toString().c_str() : ""); - printf(" DeltaFiles:\n"); - for (auto& df : chunk.deltaFiles) { - printf(" %s\n", df.toString().c_str()); + printf(" SnapshotFile:\n %s\n", + chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : ""); + printf(" DeltaFiles:\n"); + for (auto& df : chunk.deltaFiles) { + printf(" %s\n", df.toString().c_str()); + } + printf(" Deltas: (%d)", chunk.newDeltas.size()); + if (chunk.newDeltas.size() > 0) { + printf(" with version [%lld - %lld]", + chunk.newDeltas[0].version, + chunk.newDeltas[chunk.newDeltas.size() - 1].version); + } + printf(" IncludedVersion: %lld\n", chunk.includedVersion); + printf("\n\n"); } - printf(" Deltas: (%d)", chunk.newDeltas.size()); - if (chunk.newDeltas.size() > 0) { - printf(" with version [%lld - %lld]", - chunk.newDeltas[0].version, - chunk.newDeltas[chunk.newDeltas.size() - 1].version); - } - printf(" IncludedVersion: %lld\n", chunk.includedVersion); - printf("\n\n"); Arena a; a.dependsOn(rep.arena); results.send(Standalone(chunk, a)); diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 2ff1d2bdff..234a2a1a85 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -752,8 +752,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi // Blob granlues init( BG_URL, "" ); // TODO CHANGE BACK - // init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); - init( BG_SNAPSHOT_FILE_TARGET_BYTES, 1000000 ); + init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); + // init( BG_SNAPSHOT_FILE_TARGET_BYTES, 1000000 ); init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 ); init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 ); diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 76637cccb9..3980af9b0b 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -260,17 +260,7 @@ ACTOR Future blobManager(LocalityData locality, Reference 
rangesToAdd; VectorRef rangesToRemove; // TODO hack for simulation - if (g_network->isSimulated()) { - printf("Hacking blob ranges!\n"); - RangeResult fakeResults; - KeyValueRef one = KeyValueRef(normalKeys.begin, StringRef(ar, LiteralStringRef("1"))); - KeyValueRef two = KeyValueRef(normalKeys.end, StringRef()); - fakeResults.push_back(fakeResults.arena(), one); - fakeResults.push_back(fakeResults.arena(), two); - updateClientBlobRanges(&knownBlobRanges, fakeResults, ar, &rangesToAdd, &rangesToRemove); - } else { - updateClientBlobRanges(&knownBlobRanges, results, ar, &rangesToAdd, &rangesToRemove); - } + updateClientBlobRanges(&knownBlobRanges, results, ar, &rangesToAdd, &rangesToRemove); for (KeyRangeRef range : rangesToRemove) { printf("BM Got range to revoke [%s - %s)\n", diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index c1c498055d..cbaead7699 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -117,8 +117,8 @@ ACTOR Future writeDeltaFile(BlobWorkerData* bwData, // update FDB with new file state Reference tr = makeReference(bwData->db); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { Tuple deltaFileKey; deltaFileKey.append(keyRange.begin).append(keyRange.end); @@ -197,9 +197,10 @@ ACTOR Future writeSnapshot(BlobWorkerData* bwData, snapshotFileKey.append(LiteralStringRef("snapshot")).append(version); state Reference tr = makeReference(bwData->db); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + try { loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), getFileValue(fname, 0, serialized.size())); @@ -230,9 +231,9 @@ ACTOR Future dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, K keyRange.begin.printable().c_str(), keyRange.end.printable().c_str()); state Reference tr = makeReference(bwData->db); 
- tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); try { state Version readVersion = wait(tr->getReadVersion()); state PromiseStream rowsStream; @@ -291,6 +292,7 @@ ACTOR Future compactFromBlob(BlobWorkerData* bwData, KeyRange key chunk.deltaFiles.emplace_back_deep(filenameArena, deltaF.filename, deltaF.offset, deltaF.length); deltaIdx++; } + chunk.includedVersion = version; printf("Re-snapshotting [%s - %s) @ %lld\n", keyRange.begin.printable().c_str(), @@ -328,9 +330,10 @@ ACTOR Future compactFromBlob(BlobWorkerData* bwData, KeyRange key ACTOR Future> createRangeFeed(BlobWorkerData* bwData, KeyRange keyRange) { state Key rangeFeedID = StringRef(deterministicRandom()->randomUniqueID().toString()); state Transaction tr(bwData->db); - tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + loop { try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); wait(tr.registerRangeFeed(rangeFeedID, keyRange)); wait(tr.commit()); return std::pair(rangeFeedID, tr.getCommittedVersion()); @@ -370,19 +373,14 @@ ACTOR Future blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference> mutations = waitNext(rangeFeedStream.getFuture()); for (auto& deltas : mutations) { if (!deltas.mutations.empty()) { - metadata->currentDeltas.emplace_back_deep(metadata->deltaArena, deltas); + metadata->currentDeltas.push_back_deep(metadata->deltaArena, deltas); + for (auto& delta : deltas.mutations) { + // FIXME: add mutation tracking here + // 8 for version, 1 for type, 4 for each param length then actual param size + metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size(); + } } - for (auto& delta : deltas.mutations) { - // TODO REMOVE!!! 
Just for initial debugging - /*printf("BlobWorker [%s - %s) Got Mutation @ %lld: %s\n", - metadata->keyRange.begin.printable().c_str(), - metadata->keyRange.end.printable().c_str(), - deltas.version, - delta.toString().c_str());*/ - // 8 for version, 1 for type, 4 for each param length then actual param size - metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size(); - } ASSERT(metadata->currentDeltaVersion <= deltas.version); metadata->currentDeltaVersion = deltas.version; @@ -632,9 +630,9 @@ static void handleRevokedRange(BlobWorkerData* bwData, KeyRange keyRange, Versio ACTOR Future registerBlobWorker(BlobWorkerData* bwData, BlobWorkerInterface interf) { state Reference tr = makeReference(bwData->db); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); try { Key blobWorkerListKey = blobWorkerListKeyFor(interf.id()); tr->addReadConflictRange(singleKeyRange(blobWorkerListKey)); @@ -654,11 +652,11 @@ ACTOR Future registerBlobWorker(BlobWorkerData* bwData, BlobWorkerInterfac // TODO list of key ranges in the future to batch ACTOR Future persistAssignWorkerRange(BlobWorkerData* bwData, KeyRange keyRange, Version assignVersion) { state Reference tr = makeReference(bwData->db); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + loop { try { - + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); wait(krmSetRangeCoalescing( tr, blobGranuleMappingKeys.begin, keyRange, KeyRange(allKeys), blobGranuleMappingValueFor(bwData->id))); diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index b5314e9787..c7ae08d229 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -153,6 
+153,7 @@ set(FDBSERVER_SRCS workloads/BackupToDBCorrectness.actor.cpp workloads/BackupToDBUpgrade.actor.cpp workloads/BlobStoreWorkload.h + workloads/BlobGranuleVerifier.actor.cpp workloads/BulkLoad.actor.cpp workloads/BulkSetup.actor.h workloads/Cache.actor.cpp diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 432ac561af..925db42854 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -254,7 +254,8 @@ public: // Refer to FDBTypes.h::TLogVersion. Defaults to the maximum supported version. int maxTLogVersion = TLogVersion::MAX_SUPPORTED; // Set true to simplify simulation configs for easier debugging - bool simpleConfig = false; + // TODO CHANGE BACK + bool simpleConfig = true; Optional generateFearless, buggify; Optional datacenters, desiredTLogCount, commitProxyCount, grvProxyCount, resolverCount, storageEngineType, stderrSeverity, machineCount, processesPerMachine, coordinators; @@ -1582,7 +1583,14 @@ void SimulationConfig::setRegions(const TestConfig& testConfig) { void SimulationConfig::setMachineCount(const TestConfig& testConfig) { if (testConfig.machineCount.present()) { machine_count = testConfig.machineCount.get(); - } else if (generateFearless && testConfig.minimumReplication > 1) { + } + /// TODO REMOVE! + else if (testConfig.simpleConfig) { + printf("Setting machine count to 1\n"); + machine_count = 1; + } + // + else if (generateFearless && testConfig.minimumReplication > 1) { // low latency tests in fearless configurations need 4 machines per datacenter (3 for triple replication, 1 that // is down during failures). 
machine_count = 16; @@ -1628,7 +1636,7 @@ void SimulationConfig::setCoordinators(const TestConfig& testConfig) { void SimulationConfig::setProcessesPerMachine(const TestConfig& testConfig) { if (testConfig.processesPerMachine.present()) { processes_per_machine = testConfig.processesPerMachine.get(); - } else if (generateFearless) { + } else if (generateFearless || testConfig.simpleConfig) { // TODO CHANGE BACK processes_per_machine = 1; } else { processes_per_machine = deterministicRandom()->randomInt(1, (extraDB ? 14 : 28) / machine_count + 2); diff --git a/fdbserver/workloads/BlobGranuleVerifier.actor.cpp b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp new file mode 100644 index 0000000000..2e852ac765 --- /dev/null +++ b/fdbserver/workloads/BlobGranuleVerifier.actor.cpp @@ -0,0 +1,339 @@ +/* + * BlobGranuleVerifier.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/BlobGranuleReader.actor.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbclient/ReadYourWrites.h" +#include "fdbserver/Knobs.h" +#include "fdbserver/TesterInterface.actor.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/IRandom.h" +#include "flow/genericactors.actor.h" + +#include "flow/actorcompiler.h" // This must be the last #include. 
+
+/*
+ * This workload is designed to verify the correctness of the blob data produced by the blob workers.
+ * As a read-only validation workload, it can piggyback off of other write or read/write workloads.
+ * To verify the data outside FDB's 5 second MVCC window, it tests time travel reads by doing an initial comparison at
+ * the latest read version, and then waiting a period of time to re-read the data from blob.
+ * To catch availability issues with the blob worker, it does a request to each granule at the end of the test.
+ *
+ * Options read in the constructor (defaults in parentheses): minDelay (0.0), maxDelay (60.0), testDuration (120.0),
+ * timeTravelLimit (60.0), timeTravelBufferSize (100000000), threads (1).
+ */
+struct BlobGranuleVerifierWorkload : TestWorkload {
+	// TODO add delay on start so it can start with data
+
+	// true only on the client whose clientId is 0; that client sets up the blob range in setup()
+	bool doSetup;
+	// bounds of the random delay applied before the blob range is set up
+	double minDelay;
+	double maxDelay;
+	double testDuration;
+	// upper bound on how far in the future a time travel re-read is scheduled
+	double timeTravelLimit;
+	// memory budget for results retained for time travel re-reads; split evenly across verifier threads
+	uint64_t timeTravelBufferSize;
+	int threads;
+	int64_t errors = 0;
+	int64_t mismatches = 0;
+	int64_t initialReads = 0;
+	int64_t timeTravelReads = 0;
+	int64_t rowsRead = 0;
+	int64_t bytesRead = 0;
+	vector<Future<Void>> clients;
+
+	Reference<BackupContainerFileSystem> bstore;
+	// most recent granule boundaries published by findGranules()
+	AsyncVar<std::vector<KeyRange>> granuleRanges;
+
+	// Reads the workload options and opens the backup container that blob files are read from.
+	// Rethrows any error raised while opening the container.
+	BlobGranuleVerifierWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
+		doSetup = !clientId; // only do this on the "first" client
+		minDelay = getOption(options, LiteralStringRef("minDelay"), 0.0);
+		// NOTE: this previously read the "minDelay" key, so a configured maxDelay was ignored
+		maxDelay = getOption(options, LiteralStringRef("maxDelay"), 60.0);
+		testDuration = getOption(options, LiteralStringRef("testDuration"), 120.0);
+		timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), 60.0);
+		timeTravelBufferSize = getOption(options, LiteralStringRef("timeTravelBufferSize"), 100000000);
+		threads = getOption(options, LiteralStringRef("threads"), 1);
+		ASSERT(threads >= 1);
+
+		printf("Initializing Blob Granule Verifier s3 stuff\n");
+		try {
+			if (g_network->isSimulated()) {
+				printf("Blob Granule Verifier constructing simulated backup container\n");
+				bstore = BackupContainerFileSystem::openContainerFS("file://fdbblob/");
+			} else {
+				printf("Blob Granule Verifier constructing backup container from %s\n", SERVER_KNOBS->BG_URL.c_str());
+				bstore = BackupContainerFileSystem::openContainerFS(SERVER_KNOBS->BG_URL);
+				printf("Blob Granule Verifier constructed backup container\n");
+			}
+		} catch (Error& e) {
+			printf("BW got backup container init error %s\n", e.name());
+			throw e;
+		}
+	}
+
+	// FIXME: run the actual FDBCLI command instead of copy/pasting its implementation
+	// Sets the whole user keyspace to be blobified
+	// Waits for waitForStart, then retries the transaction through onError() until the commit succeeds.
+	ACTOR Future<Void> setUpBlobRange(Database cx, Future<Void> waitForStart) {
+		state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
+		wait(waitForStart);
+		loop {
+			try {
+				// options are set inside the loop so that they are applied again on every retry
+				tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+				tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+				tr->set(blobRangeChangeKey, deterministicRandom()->randomUniqueID().toString());
+				wait(krmSetRangeCoalescing(
+				    tr, blobRangeKeys.begin, KeyRange(normalKeys), KeyRange(normalKeys), LiteralStringRef("1")));
+				wait(tr->commit());
+				printf("Successfully set up blob granule range for normalKeys\n");
+				return Void();
+			} catch (Error& e) {
+				wait(tr->onError(e));
+			}
+		}
+	}
+
+	std::string description() const override { return "BlobGranuleVerifier"; }
+
+	// On the setup client, blobifies normalKeys after a random delay in [minDelay, maxDelay); other clients do nothing.
+	Future<Void> setup(Database const& cx) override {
+		if (doSetup) {
+			/// TODO make only one client do this!!! others wait
+			double initialDelay = deterministicRandom()->random01() * (maxDelay - minDelay) + minDelay;
+			printf("BGW setup initial delay of %.3f\n", initialDelay);
+			return setUpBlobRange(cx, delay(initialDelay));
+		}
+		return delay(0);
+	}
+
+	// Runs forever: re-reads the granule ranges covering normalKeys and publishes them to self->granuleRanges,
+	// pausing a random time of up to 10 seconds between refreshes.
+	ACTOR Future<Void> findGranules(Database cx, BlobGranuleVerifierWorkload* self) {
+		// updates the current set of granules in the database, but on a delay, so there can be some mismatch if ranges
+		// change
+		// printf("BGV find granules starting\n");
+		loop {
+			state std::vector<KeyRange> allGranules;
+			state Transaction tr(cx);
+			state PromiseStream<KeyRange> stream;
+			state Future<Void> reader = cx->getBlobGranuleRangesStream(stream, normalKeys);
+			loop {
+				try {
+					KeyRange r = waitNext(stream.getFuture());
+					allGranules.push_back(r);
+				} catch (Error& e) {
+					// end_of_stream marks normal completion of the range stream
+					if (e.code() == error_code_end_of_stream) {
+						break;
+					}
+					throw e;
+				}
+			}
+			wait(reader);
+			// printf("BG find granules found %d granules\n", allGranules.size());
+			self->granuleRanges.set(allGranules);
+
+			wait(delay(deterministicRandom()->random01() * 10.0));
+		}
+	}
+
+	// Reads every row of range directly from FDB and returns the rows together with the transaction's read version.
+	// assumes we can read the whole range in one transaction at a single version
+	ACTOR Future<std::pair<RangeResult, Version>> readFromFDB(Database cx, KeyRange range) {
+		state Version v;
+		state RangeResult out;
+		state Transaction tr(cx);
+		state KeyRange currentRange = range;
+		loop {
+			try {
+				RangeResult r = wait(tr.getRange(currentRange, CLIENT_KNOBS->TOO_MANY));
+				out.arena().dependsOn(r.arena());
+				out.append(out.arena(), r.begin(), r.size());
+				if (r.more) {
+					// continue just past the last key returned so far
+					currentRange = KeyRangeRef(keyAfter(r.back().key), currentRange.end);
+				} else {
+					Version _v = wait(tr.getReadVersion());
+					v = _v;
+					break;
+				}
+			} catch (Error& e) {
+				// onError() resets the transaction, so the retry reads at a new version. Discard the rows
+				// gathered so far and start over; otherwise the result would mix rows from two versions
+				// while only the last version is reported to the caller.
+				out = RangeResult();
+				currentRange = range;
+				wait(tr.onError(e));
+			}
+		}
+		return std::pair(out, v);
+	}
+
+	// Reads range at version through the blob granule path: requests the chunks covering the range, materializes each
+	// chunk with readBlobGranule(), and concatenates the rows in the order the chunks arrive.
+	ACTOR Future<RangeResult> readFromBlob(Database cx,
+	                                       BlobGranuleVerifierWorkload* self,
+	                                       KeyRange range,
+	                                       Version version) {
+		state RangeResult out;
+		state PromiseStream<Standalone<BlobGranuleChunkRef>> chunkStream;
+		state Future<Void> requester = cx->readBlobGranulesStream(chunkStream, range, 0, version);
+		loop {
+			try {
+				Standalone<BlobGranuleChunkRef> nextChunk = waitNext(chunkStream.getFuture());
+				out.arena().dependsOn(
+				    nextChunk.arena()); // TODO this wastes extra memory but we'll see if it fixes segfault
+				RangeResult chunkRows = wait(readBlobGranule(nextChunk, range, version, self->bstore));
+				out.arena().dependsOn(chunkRows.arena());
+				out.append(out.arena(), chunkRows.begin(), chunkRows.size());
+			} catch (Error& e) {
+				// end_of_stream marks normal completion of the chunk stream
+				if (e.code() == error_code_end_of_stream) {
+					break;
+				}
+				throw e;
+			}
+		}
+		return out;
+	}
+
+	// Compares the FDB and blob results for range at version v. On a mismatch, increments the mismatch counter and
+	// logs a SevError trace event. Returns true when the two results are equal.
+	bool compareResult(RangeResult fdb, RangeResult blob, KeyRange range, Version v, bool initialRequest) {
+		bool correct = fdb == blob;
+		if (!correct) {
+			mismatches++;
+			TraceEvent ev(SevError, "GranuleMismatch");
+			ev.detail("RangeStart", range.begin)
+			    .detail("RangeEnd", range.end)
+			    .detail("Version", v)
+			    .detail("RequestType", initialRequest ? "RealTime" : "TimeTravel");
+			// TODO debugging details!
+		}
+		return correct;
+	}
+
+	// A verified read retained so the same range and version can be re-read from blob later (time travel check).
+	struct OldRead {
+		KeyRange range;
+		Version v;
+		RangeResult oldResult;
+
+		OldRead() {}
+		OldRead(KeyRange range, Version v, RangeResult oldResult) : range(range), v(v), oldResult(oldResult) {}
+		// OldRead(const OldRead& other) : range(other.range), v(other.v), oldResult(other.oldResult) {}
+	};
+
+	// One verifier thread. Each iteration first performs any time travel re-reads that have come due, then picks a
+	// random granule range, reads it from both FDB and blob at the same version, and compares the results.
+	// Matching results may be retained for a later time travel re-read, subject to the per-thread memory budget.
+	ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self) {
+		// TODO add time travel + verification
+		state double last = now();
+		state double endTime = last + self->testDuration;
+		// pending re-reads keyed by the time at which they become due
+		state std::map<double, OldRead> timeTravelChecks;
+		state int64_t timeTravelChecksMemory = 0;
+
+		printf("BGV thread starting\n");
+
+		// wait for first set of ranges to be loaded
+		wait(self->granuleRanges.onChange());
+
+		printf("BGV got ranges\n");
+
+		loop {
+			try {
+				state double currentTime = now();
+				state std::map<double, OldRead>::iterator timeTravelIt = timeTravelChecks.begin();
+				while (timeTravelIt != timeTravelChecks.end() && currentTime >= timeTravelIt->first) {
+					state OldRead oldRead = timeTravelIt->second;
+					RangeResult reReadResult = wait(self->readFromBlob(cx, self, oldRead.range, oldRead.v));
+					self->compareResult(oldRead.oldResult, reReadResult, oldRead.range, oldRead.v, false);
+
+					timeTravelChecksMemory -= oldRead.oldResult.expectedSize();
+					timeTravelIt = timeTravelChecks.erase(timeTravelIt);
+					self->timeTravelReads++;
+				}
+
+				// pick a random range
+				int rIndex = deterministicRandom()->randomInt(0, self->granuleRanges.get().size());
+				state KeyRange range = self->granuleRanges.get()[rIndex];
+
+				state std::pair<RangeResult, Version> fdb = wait(self->readFromFDB(cx, range));
+				RangeResult blob = wait(self->readFromBlob(cx, self, range, fdb.second));
+				if (self->compareResult(fdb.first, blob, range, fdb.second, true)) {
+					double reReadTime = currentTime + deterministicRandom()->random01() * self->timeTravelLimit;
+					int memory = fdb.first.expectedSize();
+					// only retain the result if the re-read would happen before the test ends and it fits in
+					// this thread's share of the buffer
+					if (reReadTime <= endTime &&
+					    timeTravelChecksMemory + memory <= (self->timeTravelBufferSize / self->threads)) {
+						timeTravelChecks[reReadTime] = OldRead(range, fdb.second, fdb.first);
+						timeTravelChecksMemory += memory;
+					}
+				}
+				self->rowsRead += fdb.first.size();
+				self->bytesRead += fdb.first.expectedSize();
+				self->initialReads++;
+
+				// TODO increase frequency a lot!! just for initial testing
+				wait(poisson(&last, 5.0));
+				// wait(poisson(&last, 0.1));
+			} catch (Error& e) {
+				printf("BGVerifier got error %s\n", e.name());
+				if (e.code() == error_code_operation_cancelled) {
+					return Void();
+				}
+				// any other error is counted and the loop carries on with the next iteration
+				self->errors++;
+			}
+		}
+	}
+
+	// Launches the granule finder plus `threads` verifier actors, each bounded by testDuration, and returns a future
+	// that completes after testDuration.
+	Future<Void> start(Database const& cx) override {
+		clients.reserve(threads + 1);
+		clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
+		for (int i = 0; i < threads; i++) {
+			clients.push_back(
+			    timeout(reportErrors(verifyGranules(cx, this), "BlobGranuleVerifier"), testDuration, Void()));
+		}
+
+		printf("BGF start launched\n");
+		return delay(testDuration);
+	}
+
+	// Final check: requests every known granule range at a fresh read version and rethrows any error other than
+	// end_of_stream. Prints the collected counters and returns true only if there were no mismatches and at least one
+	// chunk was received during the final requests.
+	ACTOR Future<bool> _check(Database cx, BlobGranuleVerifierWorkload* self) {
+		// check error counts, and do an availability check at the end
+
+		Transaction tr(cx);
+		state Version readVersion = wait(tr.getReadVersion());
+		state int checks = 0;
+
+		state vector<KeyRange> allRanges = self->granuleRanges.get();
+		for (auto& range : allRanges) {
+			state KeyRange r = range;
+			state PromiseStream<Standalone<BlobGranuleChunkRef>> chunkStream;
+			printf("Final availability check [%s - %s)\n", r.begin.printable().c_str(), r.end.printable().c_str());
+			state Future<Void> requester = cx->readBlobGranulesStream(chunkStream, r, 0, readVersion);
+			loop {
+				try {
+					// just make sure granule returns a non-error response, to ensure the range wasn't lost and the
+					// workers are all caught up. Kind of like a quiet database check, just for the blob workers
+					Standalone<BlobGranuleChunkRef> nextChunk = waitNext(chunkStream.getFuture());
+					checks++;
+				} catch (Error& e) {
+					if (e.code() == error_code_end_of_stream) {
+						break;
+					}
+					printf("BG Verifier failed final availability check for [%s - %s) @ %lld with error %s\n",
+					       r.begin.printable().c_str(),
+					       r.end.printable().c_str(),
+					       readVersion,
+					       e.name());
+					throw e;
+				}
+			}
+		}
+		printf("Blob Granule Verifier finished with:\n");
+		printf(" %lld mismatches\n", self->mismatches);
+		printf(" %lld errors\n", self->errors);
+		printf(" %lld initial reads\n", self->initialReads);
+		printf(" %lld time travel reads\n", self->timeTravelReads);
+		printf(" %lld rows\n", self->rowsRead);
+		printf(" %lld bytes\n", self->bytesRead);
+		printf(" %d final granule checks\n", checks);
+		return self->mismatches == 0 && checks > 0;
+	}
+
+	Future<bool> check(Database const& cx) override { return _check(cx, this); }
+	void getMetrics(vector<PerfMetric>& m) override {}
+};
+
+WorkloadFactory<BlobGranuleVerifierWorkload> BlobGranuleVerifierWorkloadFactory("BlobGranuleVerifier");
\ No newline at end of file
diff --git a/tests/slow/BlobGranuleCorrectness.toml b/tests/slow/BlobGranuleCorrectness.toml
new file mode 100644
index 0000000000..59d6be0364
--- /dev/null
+++ b/tests/slow/BlobGranuleCorrectness.toml
@@ -0,0 +1,10 @@
+[[test]]
+testTitle = 'BlobGranuleCorrectnessTest'
+
+    [[test.workload]]
+    testName = 'WriteDuringRead'
+    testDuration = 120.0
+
+    [[test.workload]]
+    testName = 'BlobGranuleVerifier'
+    testDuration = 120.0