diff --git a/fdbclient/BackupContainerLocalDirectory.actor.cpp b/fdbclient/BackupContainerLocalDirectory.actor.cpp index b1c24da69d..4e4d82edbb 100644 --- a/fdbclient/BackupContainerLocalDirectory.actor.cpp +++ b/fdbclient/BackupContainerLocalDirectory.actor.cpp @@ -21,6 +21,7 @@ #include "fdbclient/BackupContainerLocalDirectory.h" #include "fdbrpc/AsyncFileReadAhead.actor.h" #include "flow/IAsyncFile.h" +#include "flow/FaultInjection.h" #include "flow/Platform.actor.h" #include "flow/Platform.h" #include "fdbrpc/simulator.h" @@ -67,6 +68,8 @@ public: Future r = uncancellable(holdWhile(old, m_file->write(old.begin(), size, m_writeOffset))); m_writeOffset += size; + INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::flush"); + return r; } @@ -77,6 +80,9 @@ public: std::string name = f->m_file->getFilename(); f->m_file.clear(); wait(IAsyncFileSystem::filesystem()->renameFile(name, f->m_finalFullPath)); + + INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::finish"); + return Void(); } @@ -116,6 +122,8 @@ ACTOR static Future listFiles_impl(st results.push_back({ f.substr(m_path.size() + 1), ::fileSize(f) }); } + INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::listFiles"); + return results; } @@ -217,6 +225,7 @@ Future> BackupContainerLocalDirectory::readFile(const std: if (usesEncryption()) { flags |= IAsyncFile::OPEN_ENCRYPTED; } + INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::readFile"); // Simulation does not properly handle opening the same file from multiple machines using a shared filesystem, // so create a symbolic link to make each file opening appear to be unique. This could also work in production // but only if the source directory is writeable which shouldn't be required for a restore. @@ -268,6 +277,7 @@ Future> BackupContainerLocalDirectory::readFile(const std: } Future> BackupContainerLocalDirectory::writeFile(const std::string& path) { + INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::writeFile"); int flags = IAsyncFile::OPEN_NO_AIO | IAsyncFile::OPEN_UNCACHED | IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_ATOMIC_WRITE_AND_CREATE | IAsyncFile::OPEN_READWRITE; if (usesEncryption()) { @@ -286,6 +296,7 @@ Future BackupContainerLocalDirectory::writeEntireFile(const std::string& p Future BackupContainerLocalDirectory::deleteFile(const std::string& path) { ::deleteFile(joinPath(m_path, path)); + INJECT_BLOB_FAULT(http_request_failed, "BackupContainerLocalDirectory::deleteFile"); return Void(); } diff --git a/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h b/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h index 3fe0538a19..48654cb8df 100644 --- a/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h +++ b/fdbrpc/include/fdbrpc/SimulatorProcessInfo.h @@ -62,7 +62,7 @@ struct ProcessInfo : NonCopyable { INetworkConnections* network; uint64_t fault_injection_r; - double fault_injection_p1, fault_injection_p2; + double fault_injection_p1, fault_injection_p2, blob_inject_failure_rate; bool failedDisk; UID uid; @@ -82,7 +82,8 @@ struct ProcessInfo : NonCopyable { : name(name), coordinationFolder(coordinationFolder), dataFolder(dataFolder), machine(nullptr), addresses(addresses), address(addresses.address), locality(locality), startingClass(startingClass), failed(false), excluded(false), cleared(false), rebooting(false), drProcess(false), network(net), - fault_injection_r(0), fault_injection_p1(0), fault_injection_p2(0), failedDisk(false) { + fault_injection_r(0), fault_injection_p1(0), fault_injection_p2(0), blob_inject_failure_rate(0), + failedDisk(false) { uid = deterministicRandom()->randomUniqueID(); } diff --git a/fdbrpc/include/fdbrpc/simulator.h b/fdbrpc/include/fdbrpc/simulator.h index cebe52f60d..b07d1a5ebb 100644 --- a/fdbrpc/include/fdbrpc/simulator.h +++ b/fdbrpc/include/fdbrpc/simulator.h @@ -128,6 +128,8 @@ public: KillType* ktFinal = nullptr) = 0; virtual bool killAll(KillType kt, bool forceKill = false, KillType* ktFinal = nullptr) = 0; // virtual KillType getMachineKillState( UID zoneID ) = 0; + virtual void processInjectBlobFault(ProcessInfo* machine, double failureRate) = 0; + virtual void processStopInjectBlobFault(ProcessInfo* machine) = 0; virtual bool canKillProcesses(std::vector const& availableProcesses, std::vector const& deadProcesses, KillType kt, diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index a42392e9d1..20e10e81af 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -116,6 +116,29 @@ bool simulator_should_inject_fault(const char* context, const char* file, int li return false; } +bool simulator_should_inject_blob_fault(const char* context, const char* file, int line, int error_code) { + if (!g_network->isSimulated() || !faultInjectionActivated) + return false; + + auto p = g_simulator->getCurrentProcess(); + + if (!g_simulator->speedUpSimulation && deterministicRandom()->random01() < p->blob_inject_failure_rate) { + CODE_PROBE(true, "A blob fault was injected", probe::assert::simOnly, probe::context::sim2); + CODE_PROBE(error_code == error_code_http_request_failed, + "A failed http request was injected", + probe::assert::simOnly, + probe::context::sim2); + TraceEvent("BlobFaultInjected") + .detail("Context", context) + .detail("File", file) + .detail("Line", line) + .detail("ErrorCode", error_code); + return true; + } + + return false; +} + void ISimulator::disableFor(const std::string& desc, double time) { disabledMap[desc] = time; } @@ -1492,6 +1515,7 @@ public: // The following function will determine if a machine can be remove in case when it has a blob worker bool canKillMachineWithBlobWorkers(Optional> machineId, KillType kt, KillType* ktFinal) { // Allow if no blob workers, or it's a reboot(without removing the machine) + // FIXME: this should be || if (!blobGranulesEnabled && kt >= KillType::RebootAndDelete) { return true; } @@ -2337,6 +2361,18 @@ public: g_clogging.unclogPair(from, to); } + void processInjectBlobFault(ProcessInfo* machine, double failureRate) override { + CODE_PROBE(true, "Simulated process beginning blob fault", probe::context::sim2, probe::assert::simOnly); + should_inject_blob_fault = simulator_should_inject_blob_fault; + ASSERT(machine->blob_inject_failure_rate == 0.0); + machine->blob_inject_failure_rate = failureRate; + } + + void processStopInjectBlobFault(ProcessInfo* machine) override { + CODE_PROBE(true, "Simulated process stopping blob fault", probe::context::sim2, probe::assert::simOnly); + machine->blob_inject_failure_rate = 0.0; + } + std::vector getAllProcesses() const override { std::vector processes; for (auto& c : machines) { diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index a6473d5693..ece2431166 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -5230,7 +5230,8 @@ ACTOR Future monitorPurgeKeys(Reference self) { } catch (Error& e) { // These should not get an error that then causes a transaction retry loop. All error handling // should be done in the purge calls - if (e.code() == error_code_operation_cancelled || + // FIXME: retry purging if it gets blobstore errors instead of killing blob manager + if (e.code() == error_code_operation_cancelled || e.code() == error_code_http_request_failed || e.code() == error_code_blob_manager_replaced || e.code() == error_code_platform_error) { throw e; } diff --git a/fdbserver/workloads/BlobFailureInjection.actor.cpp b/fdbserver/workloads/BlobFailureInjection.actor.cpp new file mode 100644 index 0000000000..7fc381e18b --- /dev/null +++ b/fdbserver/workloads/BlobFailureInjection.actor.cpp @@ -0,0 +1,177 @@ +/* + * BlobFailureInjection.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbrpc/simulator.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/FaultInjection.h" +#include "flow/DeterministicRandom.h" +#include "fdbrpc/SimulatorProcessInfo.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +/* + * The BlobFailureInjection workload is designed to simulate blob storage becoming temporarily flaky or unavailable, + * from a single host to the whole cluster. + * TODO: add blob storage becoming permanently flaky or unavailable on a single host, to ensure the system moves work + * away accordingly. Could also handle that through attrition workload maybe? + * FIXME: make this work outside simulation. Talk to workers like DiskFailureInjection does and add S3BlobStore and + * AzureBlobStore fault injection points. + */ +struct BlobFailureInjectionWorkload : FailureInjectionWorkload { + static constexpr auto NAME = "BlobFailureInjection"; + + bool enabled; + double enableProbability = 0.5; + double testDuration = 10.0; + + std::vector currentlyAffected; + + BlobFailureInjectionWorkload(WorkloadContext const& wcx, NoOptions) : FailureInjectionWorkload(wcx) { + enabled = !clientId && g_network->isSimulated() && faultInjectionActivated; + } + + BlobFailureInjectionWorkload(WorkloadContext const& wcx) : FailureInjectionWorkload(wcx) { + // only do this on the "first" client, and only when in simulation and only when fault injection is enabled + enabled = !clientId && g_network->isSimulated() && faultInjectionActivated; + enableProbability = getOption(options, "enableProbability"_sr, enableProbability); + testDuration = getOption(options, "testDuration"_sr, testDuration); + enabled = (enabled && deterministicRandom()->random01() < enableProbability); + } + + Future setup(Database const& cx) override { return Void(); } + Future start(Database const& cx) override { return _start(cx, this); } + + bool shouldInject(DeterministicRandom& random, + const WorkloadRequest& work, + const unsigned alreadyAdded) const override { + return alreadyAdded < 1 && work.useDatabase && 0.1 / (1 + alreadyAdded) > random.random01(); + } + + void undoFaultInjection() { + if (!currentlyAffected.empty()) { + TraceEvent("BlobFailureInjectionUnFailing").detail("Count", currentlyAffected.size()); + } + for (auto& it : currentlyAffected) { + TraceEvent("BlobFailureInjectionUnFailingProcess").detail("Addr", it->address); + g_simulator->processStopInjectBlobFault(it); + } + currentlyAffected.clear(); + } + + ACTOR Future _start(Database cx, BlobFailureInjectionWorkload* self) { + if (!self->enabled) { + return Void(); + } + + CODE_PROBE(true, "Running workload with blob failure injection"); + TraceEvent("BlobFailureInjectionBegin").log(); + + auto processes = getServers(); + deterministicRandom()->randomShuffle(processes); + + wait(timeout(reportErrors(self->worker(cx, self, processes), "BlobFailureInjectionWorkerError"), + self->testDuration, + Void())); + + // Undo all fault injection before exiting, if worker didn't + self->undoFaultInjection(); + TraceEvent("BlobFailureInjectionEnd").log(); + + return Void(); + } + + // TODO: share code with machine attrition + static std::vector getServers() { + std::vector machines; + std::vector all = g_simulator->getAllProcesses(); + for (int i = 0; i < all.size(); i++) + if (!all[i]->failed && all[i]->name == std::string("Server") && + all[i]->startingClass != ProcessClass::TesterClass) + machines.push_back(all[i]); + return machines; + } + + ACTOR Future worker(Database cx, + BlobFailureInjectionWorkload* self, + std::vector processes) { + int minFailureDuration = 5; + int maxFailureDuration = std::max(10, (int)(self->testDuration / 2)); + + state double failureDuration = + deterministicRandom()->randomSkewedUInt32(minFailureDuration, maxFailureDuration); + // add a random amount between 0 and 1, otherwise it's a whole number + failureDuration += deterministicRandom()->random01(); + state double delayBefore = + deterministicRandom()->random01() * (std::max(0.0, self->testDuration - failureDuration)); + + wait(delay(delayBefore)); + + // TODO: pick one random worker, a subset of workers, or entire cluster randomly + + int amountToFail = 1; + if (deterministicRandom()->coinflip()) { + if (deterministicRandom()->coinflip()) { + // fail all processes + amountToFail = processes.size(); + } else if (processes.size() > 3) { + // fail a random amount of processes up to half + amountToFail = deterministicRandom()->randomInt(2, std::max(3, processes.size() / 2)); + } + } // fail 1 process 50% of the time + ASSERT(amountToFail <= processes.size()); + ASSERT(amountToFail > 0); + + double failureRate; + if (deterministicRandom()->coinflip()) { + // fail all requests - blob store is completely unreachable + failureRate = 1.0; + } else { + // fail a random percentage of requests, biasing towards low percentages. + // This is based on the intuition that failing 98% of requests is not very different than failing 99%, but + // failing 0.1% vs 1% is different + failureRate = deterministicRandom()->randomSkewedUInt32(1, 1000) / 1000.0; + } + + CODE_PROBE(true, "blob failure injection killing processes"); + + TraceEvent("BlobFailureInjectionFailing") + .detail("Count", amountToFail) + .detail("Duration", failureDuration) + .detail("FailureRate", failureRate) + .log(); + for (int i = 0; i < amountToFail; i++) { + TraceEvent("BlobFailureInjectionFailingProcess").detail("Addr", processes[i]->address); + self->currentlyAffected.push_back(processes[i]); + g_simulator->processInjectBlobFault(processes[i], failureRate); + } + + wait(delay(failureDuration)); + + self->undoFaultInjection(); + + return Void(); + } + + Future check(Database const& cx) override { return true; } + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory BlobFailureInjectionWorkloadFactory; +// TODO enable once bugs fixed! +// FailureInjectorFactory BlobFailureInjectionFailureWorkloadFactory; diff --git a/flow/FaultInjection.cpp b/flow/FaultInjection.cpp index d309574ae5..786c585d93 100644 --- a/flow/FaultInjection.cpp +++ b/flow/FaultInjection.cpp @@ -21,6 +21,7 @@ #include "flow/FaultInjection.h" bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code) = 0; +bool (*should_inject_blob_fault)(const char* context, const char* file, int line, int error_code) = 0; bool faultInjectionActivated = true; void enableFaultInjection(bool enabled) { diff --git a/flow/include/flow/FaultInjection.h b/flow/include/flow/FaultInjection.h index 0bbc6799c5..1125fd5f7f 100644 --- a/flow/include/flow/FaultInjection.h +++ b/flow/include/flow/FaultInjection.h @@ -31,11 +31,23 @@ #define SHOULD_INJECT_FAULT(context) (should_inject_fault && should_inject_fault(context, __FILE__, __LINE__, 0)) +#define INJECT_BLOB_FAULT(error_type, context) \ + do { \ + if (should_inject_blob_fault && \ + should_inject_blob_fault(context, __FILE__, __LINE__, error_code_##error_type)) \ + throw error_type().asInjectedFault(); \ + } while (0) + +#define SHOULD_INJECT_BLOB_FAULT(context) \ + (should_inject_blob_fault && should_inject_blob_fault(context, __FILE__, __LINE__, 0)) + extern bool (*should_inject_fault)(const char* context, const char* file, int line, int error_code); +extern bool (*should_inject_blob_fault)(const char* context, const char* file, int line, int error_code); extern bool faultInjectionActivated; extern void enableFaultInjection(bool enabled); // Enable fault injection called from fdbserver actor main function #else #define INJECT_FAULT(error_type, context) +#define INJECT_BLOB_FAULT(error_type, context) #endif #endif \ No newline at end of file diff --git a/tests/fast/BlobGranuleMoveVerifyCycle.toml b/tests/fast/BlobGranuleMoveVerifyCycle.toml index 421f3003c5..b7f99616a0 100644 --- a/tests/fast/BlobGranuleMoveVerifyCycle.toml +++ b/tests/fast/BlobGranuleMoveVerifyCycle.toml @@ -44,3 +44,7 @@ testTitle = 'BlobGranuleMoveVerifyCycle' machinesToLeave = 3 reboot = true testDuration = 60.0 + + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 60.0 diff --git a/tests/fast/BlobGranuleVerifyAtomicOps.toml b/tests/fast/BlobGranuleVerifyAtomicOps.toml index a9756043e9..5141a94c97 100644 --- a/tests/fast/BlobGranuleVerifyAtomicOps.toml +++ b/tests/fast/BlobGranuleVerifyAtomicOps.toml @@ -39,3 +39,7 @@ testTitle = 'BlobGranuleVerifyAtomicOps' machinesToLeave = 3 reboot = true testDuration = 30.0 + + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 30.0 diff --git a/tests/fast/BlobGranuleVerifyCycle.toml b/tests/fast/BlobGranuleVerifyCycle.toml index d5db339a34..86d6c0d902 100644 --- a/tests/fast/BlobGranuleVerifyCycle.toml +++ b/tests/fast/BlobGranuleVerifyCycle.toml @@ -42,3 +42,7 @@ testTitle = 'BlobGranuleVerifyCycle' machinesToLeave = 3 reboot = true testDuration = 60.0 + + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 60.0 \ No newline at end of file diff --git a/tests/fast/BlobGranuleVerifySmall.toml b/tests/fast/BlobGranuleVerifySmall.toml index c761441d8b..2c97c83017 100644 --- a/tests/fast/BlobGranuleVerifySmall.toml +++ b/tests/fast/BlobGranuleVerifySmall.toml @@ -41,3 +41,7 @@ testTitle = 'BlobGranuleVerifySmall' reboot = true testDuration = 60.0 + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 60.0 + diff --git a/tests/rare/BlobGranuleRanges.toml b/tests/rare/BlobGranuleRanges.toml index 3ae8e10876..f896f28b1f 100644 --- a/tests/rare/BlobGranuleRanges.toml +++ b/tests/rare/BlobGranuleRanges.toml @@ -34,3 +34,7 @@ testTitle = 'BlobGranuleRanges' reboot = true testDuration = 30.0 + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 30.0 + diff --git a/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml b/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml index ce915ae960..a06262654f 100644 --- a/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml +++ b/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml @@ -54,6 +54,10 @@ clearAfterTest=false reboot = true testDuration = 30.0 + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 30.0 + [[test.workload]] testName='SaveAndKill' restartInfoLocation='simfdb/restartInfo.ini' diff --git a/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-2.toml b/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-2.toml index 1bad4892d6..b94ba0d131 100644 --- a/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-2.toml +++ b/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-2.toml @@ -44,6 +44,10 @@ runSetup=false reboot = true testDuration = 30.0 + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 30.0 + [[test.workload]] testName = 'BlobGranuleVerifier' testDuration = 30.0 diff --git a/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml b/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml index 7d62d02f1d..49d2b3cee1 100644 --- a/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml +++ b/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml @@ -62,6 +62,10 @@ clearAfterTest=false reboot = true testDuration = 60.0 + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 60.0 + [[test.workload]] testName='SaveAndKill' restartInfoLocation='simfdb/restartInfo.ini' diff --git a/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-2.toml b/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-2.toml index 5aff485d3e..1dd539f091 100644 --- a/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-2.toml +++ b/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-2.toml @@ -52,6 +52,10 @@ runSetup=false reboot = true testDuration = 60.0 + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 60.0 + [[test.workload]] testName = 'BlobGranuleVerifier' testDuration = 60.0 diff --git a/tests/slow/BlobGranuleCorrectness.toml b/tests/slow/BlobGranuleCorrectness.toml index ff5758ed74..2614f60790 100644 --- a/tests/slow/BlobGranuleCorrectness.toml +++ b/tests/slow/BlobGranuleCorrectness.toml @@ -40,3 +40,7 @@ testTitle = 'BlobGranuleCorrectness' machinesToLeave = 3 reboot = true testDuration = 120.0 + + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 120.0 diff --git a/tests/slow/BlobGranuleVerifyBalance.toml b/tests/slow/BlobGranuleVerifyBalance.toml index 57956ef0f1..b5425a61b9 100644 --- a/tests/slow/BlobGranuleVerifyBalance.toml +++ b/tests/slow/BlobGranuleVerifyBalance.toml @@ -52,3 +52,7 @@ testTitle = 'BlobGranuleVerifyBalance' maxDelay = 100 kill1Timeout = 30 kill2Timeout = 6000 + + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 120.0 diff --git a/tests/slow/BlobGranuleVerifyLarge.toml b/tests/slow/BlobGranuleVerifyLarge.toml index 219724a66f..5056a6482a 100644 --- a/tests/slow/BlobGranuleVerifyLarge.toml +++ b/tests/slow/BlobGranuleVerifyLarge.toml @@ -49,3 +49,7 @@ testTitle = 'BlobGranuleVerifyLarge' reboot = true testDuration = 120.0 + [[test.workload]] + testName = 'BlobFailureInjection' + testDuration = 120.0 +