diff --git a/fdbclient/ClientWorkerInterface.h b/fdbclient/ClientWorkerInterface.h index cff4172387..b73c43ebd7 100644 --- a/fdbclient/ClientWorkerInterface.h +++ b/fdbclient/ClientWorkerInterface.h @@ -31,8 +31,10 @@ // A ClientWorkerInterface is embedded as the first element of a WorkerInterface. struct ClientWorkerInterface { constexpr static FileIdentifier file_identifier = 12418152; + RequestStream<struct RebootRequest> reboot; RequestStream<struct ProfilerRequest> profiler; + RequestStream<struct SetFailureInjection> setFailureInjection; bool operator==(ClientWorkerInterface const& r) const { return id() == r.id(); } bool operator!=(ClientWorkerInterface const& r) const { return id() != r.id(); } @@ -43,7 +45,7 @@ struct ClientWorkerInterface { template <class Ar> void serialize(Ar& ar) { - serializer(ar, reboot, profiler); + serializer(ar, reboot, profiler, setFailureInjection); } }; @@ -88,4 +90,23 @@ struct ProfilerRequest { } }; +struct SetFailureInjection { + constexpr static FileIdentifier file_identifier = 15439864; + ReplyPromise<Void> reply; + struct ThrottleDiskCommand { + double time; + Optional<NetworkAddress> address; // TODO: NEELAM: how do we identify the machine + + template <class Ar> + void serialize(Ar& ar) { + serializer(ar, time, address); + } + }; + Optional<ThrottleDiskCommand> throttleDisk; + + template <class Ar> + void serialize(Ar& ar) { + serializer(ar, reply, throttleDisk); + } +}; #endif diff --git a/fdbrpc/AsyncFileEIO.actor.h b/fdbrpc/AsyncFileEIO.actor.h index 44fe6448db..c962e60098 100644 --- a/fdbrpc/AsyncFileEIO.actor.h +++ b/fdbrpc/AsyncFileEIO.actor.h @@ -162,14 +162,16 @@ public: Future<int> read(void* data, int length, int64_t offset) override { ++countFileLogicalReads; ++countLogicalReads; - return read_impl(fd, data, length, offset); + double throttleFor = diskFailureInjector->getDiskDelay(); + return read_impl(fd, data, length, offset, throttleFor); } Future<Void> write(void const* data, int length, int64_t offset) override // Copies data synchronously { ++countFileLogicalWrites; ++countLogicalWrites; + double throttleFor = diskFailureInjector->getDiskDelay(); // Standalone<StringRef> copy = StringRef((const uint8_t*)data, length); - return write_impl(fd, err, StringRef((const uint8_t*)data, length), offset); + return write_impl(fd, err, StringRef((const uint8_t*)data, length), offset, throttleFor); } Future<Void> truncate(int64_t size) override { ++countFileLogicalWrites; @@ -270,6 +272,7 @@ private: int fd, flags; Reference<ErrorInfo> err; std::string filename; + //DiskFailureInjector* diskFailureInjector; mutable Int64MetricHandle countFileLogicalWrites; mutable Int64MetricHandle countFileLogicalReads; @@ -277,7 +280,8 @@ private: mutable Int64MetricHandle countLogicalReads; AsyncFileEIO(int fd, int flags, std::string const& filename) - : fd(fd), flags(flags), filename(filename), err(new ErrorInfo) { + : fd(fd), flags(flags), filename(filename), err(new ErrorInfo), + diskFailureInjector(DiskFailureInjector::injector()) { if (!g_network->isSimulated()) { countFileLogicalWrites.init(LiteralStringRef("AsyncFile.CountFileLogicalWrites"), filename); countFileLogicalReads.init(LiteralStringRef("AsyncFile.CountFileLogicalReads"), filename); @@ -329,13 +333,18 @@ private: TraceEvent("AsyncFileClosed").suppressFor(1.0).detail("Fd", fd); } - ACTOR static Future<int> read_impl(int fd, void* data, int length, int64_t offset) { + ACTOR static Future<int> read_impl(int fd, void* data, int length, int64_t offset, double throttleFor) { state TaskPriority taskID = g_network->getCurrentTask(); state Promise<Void> p; // fprintf(stderr, "eio_read (fd=%d length=%d offset=%lld)\n", fd, length, offset); state eio_req* r = eio_read(fd, data, length, offset, 0, eio_callback, &p); try { wait(p.getFuture()); + // throttleDisk if enabled + //double throttleFor = diskFailureInjector->getDiskDelay(); + if (throttleFor > 0.0) { + wait(delay(throttleFor)); + } } catch (...) { g_network->setCurrentTask(taskID); eio_cancel(r); @@ -358,12 +367,17 @@ private: } } - ACTOR static Future<Void> write_impl(int fd, Reference<ErrorInfo> err, StringRef data, int64_t offset) { + ACTOR static Future<Void> write_impl(int fd, Reference<ErrorInfo> err, StringRef data, int64_t offset, double throttleFor) { state TaskPriority taskID = g_network->getCurrentTask(); state Promise<Void> p; state eio_req* r = eio_write(fd, (void*)data.begin(), data.size(), offset, 0, eio_callback, &p); try { wait(p.getFuture()); + // throttleDisk if enabled + //double throttleFor = diskFailureInjector->getDiskDelay(); + if (throttleFor > 0.0) { + wait(delay(throttleFor)); + } } catch (...) { g_network->setCurrentTask(taskID); eio_cancel(r); @@ -553,6 +567,8 @@ private: static void apple_fsync(eio_req* req) { req->result = fcntl(req->int1, F_FULLFSYNC, 0); } static void free_req(eio_req* req) { free(req); } #endif +public: + DiskFailureInjector* diskFailureInjector; }; #ifdef FILESYSTEM_IMPL diff --git a/fdbrpc/AsyncFileKAIO.actor.h b/fdbrpc/AsyncFileKAIO.actor.h index 5e6592e6ba..15553a85e2 100644 --- a/fdbrpc/AsyncFileKAIO.actor.h +++ b/fdbrpc/AsyncFileKAIO.actor.h @@ -195,7 +195,10 @@ public: void addref() override { ReferenceCounted<AsyncFileKAIO>::addref(); } void delref() override { ReferenceCounted<AsyncFileKAIO>::delref(); } - + ACTOR static void throttleDisk(double throttleFor) { + if (throttleFor > 0.0) + wait(delay(throttleFor)); + } Future<int> read(void* data, int length, int64_t offset) override { ++countFileLogicalReads; ++countLogicalReads; @@ -213,6 +216,9 @@ public: enqueue(io, "read", this); Future<int> result = io->result.getFuture(); + // throttleDisk if enabled + throttleDisk(diskFailureInjector->getDiskDelay()); + #if KAIO_LOGGING // result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; }); #endif @@ -238,6 +244,9 @@ public: enqueue(io, "write", this); Future<int> result = io->result.getFuture(); + // throttleDisk if enabled + throttleDisk(diskFailureInjector->getDiskDelay()); + #if KAIO_LOGGING // result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; }); #endif @@ -749,6 +758,9 @@ private: } } } + +public: + DiskFailureInjector* diskFailureInjector; }; #if KAIO_LOGGING diff --git a/fdbrpc/IAsyncFile.h b/fdbrpc/IAsyncFile.h index ed703514c6..f21760cb00 100644 --- a/fdbrpc/IAsyncFile.h +++ b/fdbrpc/IAsyncFile.h @@ -34,6 +34,7 @@ // must complete or cancel, but you should probably look at the file implementations you'll be using. class IAsyncFile { public: + //explicit IAsyncFile() : diskFailureInjector(DiskFailureInjector::injector()) {} virtual ~IAsyncFile(); // Pass these to g_network->open to get an IAsyncFile enum { @@ -95,6 +96,9 @@ public: // Used for rate control, at present, only AsyncFileCached supports it virtual Reference<IRateControl> const& getRateControl() { throw unsupported_operation(); } virtual void setRateControl(Reference<IRateControl> const& rc) { throw unsupported_operation(); } + +//public: + //DiskFailureInjector* diskFailureInjector; }; typedef void (*runCycleFuncPtr)(); diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index ee735b963a..093ef389ac 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -1949,6 +1949,13 @@ public: void clogPair(const IPAddress& from, const IPAddress& to, double seconds) override { g_clogging.clogPairFor(from, to, seconds); } + void throttleDisk(ProcessInfo* machine, double seconds) override { + machine->throttleDiskFor = seconds; + TraceEvent("ThrottleDisk").detail("Delay", seconds). + detail("Roles", getRoles(machine->address)). + detail("Address", machine->address). + detail("StartingClass", machine->startingClass.toString()); + } std::vector<ProcessInfo*> getAllProcesses() const override { std::vector<ProcessInfo*> processes; for (auto& c : machines) { @@ -2390,11 +2397,19 @@ Future<Void> waitUntilDiskReady(Reference<DiskParameters> diskParameters, int64_ diskParameters->nextOperation += (1.0 / diskParameters->iops) + (size / diskParameters->bandwidth); double randomLatency; - if (sync) { + if (g_simulator.getCurrentProcess()->throttleDiskFor) { + randomLatency = g_simulator.getCurrentProcess()->throttleDiskFor; + TraceEvent("WaitUntilDiskReadyThrottling") + .detail("Delay", randomLatency); + } else if (sync) { randomLatency = .005 + deterministicRandom()->random01() * (BUGGIFY ? 1.0 : .010); } else randomLatency = 10 * deterministicRandom()->random01() / diskParameters->iops; + TraceEvent("WaitUntilDiskReady").detail("Delay", randomLatency). + detail("Roles", g_simulator.getRoles(g_simulator.getCurrentProcess()->address)). + detail("Address", g_simulator.getCurrentProcess()->address). + detail("ThrottleDiskFor", g_simulator.getCurrentProcess()->throttleDiskFor); return delayUntil(diskParameters->nextOperation + randomLatency); } diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 6404eafc17..1da850e48c 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -87,6 +87,7 @@ public: uint64_t fault_injection_r; double fault_injection_p1, fault_injection_p2; bool failedDisk; + double throttleDiskFor; UID uid; @@ -102,7 +103,7 @@ public: : name(name), locality(locality), startingClass(startingClass), addresses(addresses), address(addresses.address), dataFolder(dataFolder), network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), rebooting(false), fault_injection_p1(0), fault_injection_p2(0), - fault_injection_r(0), machine(0), cleared(false), failedDisk(false) { + fault_injection_r(0), machine(0), cleared(false), failedDisk(false), throttleDiskFor(0) { uid = deterministicRandom()->randomUniqueID(); } @@ -374,6 +375,7 @@ public: virtual void clogInterface(const IPAddress& ip, double seconds, ClogMode mode = ClogDefault) = 0; virtual void clogPair(const IPAddress& from, const IPAddress& to, double seconds) = 0; + virtual void throttleDisk(ProcessInfo* machine, double seconds) = 0; virtual std::vector<ProcessInfo*> getAllProcesses() const = 0; virtual ProcessInfo* getProcessByAddress(NetworkAddress const& address) = 0; virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) = 0; @@ -462,8 +464,9 @@ struct DiskParameters : ReferenceCounted<DiskParameters> { double nextOperation; int64_t iops; int64_t bandwidth; + double throttleFor; - DiskParameters(int64_t iops, int64_t bandwidth) : nextOperation(0), iops(iops), bandwidth(bandwidth) {} + DiskParameters(int64_t iops, int64_t bandwidth) : nextOperation(0), iops(iops), bandwidth(bandwidth), throttleFor(0) {} }; // Simulates delays for performing operations on disk diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 0f7d5dc860..efa2c7fbf1 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -158,6 +158,7 @@ set(FDBSERVER_SRCS workloads/ChangeConfig.actor.cpp workloads/ClientTransactionProfileCorrectness.actor.cpp workloads/TriggerRecovery.actor.cpp + workloads/DiskThrottling.actor.cpp workloads/SuspendProcesses.actor.cpp workloads/CommitBugCheck.actor.cpp workloads/ConfigureDatabase.actor.cpp diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index ad91d4dd34..dd6ee5e39d 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1209,6 +1209,10 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile, state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>()); + if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) { + TraceEvent(SevWarnAlways, "ChaosFeaturesEnabled"); + } + folder = abspath(folder); if (metricsPrefix.size() > 0) { @@ -1509,6 +1513,16 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile, flushAndExit(0); } } + when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) { + if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) { + if (req.throttleDisk.present()) { + DiskFailureInjector::injector()->throttleFor(req.throttleDisk.get().time); + } + req.reply.send(Void()); + } else { + req.reply.sendError(client_invalid_operation()); + } + } when(ProfilerRequest req = waitNext(interf.clientInterface.profiler.getFuture())) { state ProfilerRequest profilerReq = req; // There really isn't a great "filepath sanitizer" or "filepath escape" function available, diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 7ceeb95801..1d91a7e8da 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -64,6 +64,10 @@ void FlowKnobs::initialize(Randomize _randomize, IsSimulated _isSimulated) { init( HUGE_ARENA_LOGGING_BYTES, 100e6 ); init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 ); + // Chaos testing + init( ENABLE_CHAOS_FEATURES, false ); + + init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false; init( TRACING_UDP_LISTENER_PORT, 8889 ); // Only applicable if TracerType is set to a network option. diff --git a/flow/Knobs.h b/flow/Knobs.h index ef4fdcf2af..340848b68f 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -98,6 +98,9 @@ public: double HUGE_ARENA_LOGGING_BYTES; double HUGE_ARENA_LOGGING_INTERVAL; + // Chaos testing + bool ENABLE_CHAOS_FEATURES; + bool WRITE_TRACING_ENABLED; int TRACING_UDP_LISTENER_PORT; diff --git a/flow/network.h b/flow/network.h index 00f430fb86..d174601fec 100644 --- a/flow/network.h +++ b/flow/network.h @@ -486,7 +486,8 @@ public: enNetworkAddressesFunc = 11, enClientFailureMonitor = 12, enSQLiteInjectedError = 13, - enGlobalConfig = 14 + enGlobalConfig = 14, + enFailureInjector = 15 }; virtual void longTaskCheck(const char* name) {} @@ -646,4 +647,39 @@ public: // Returns the interface that should be used to make and accept socket connections }; +struct DiskFailureInjector : FastAllocated<DiskFailureInjector> { + static DiskFailureInjector* injector() { + auto res = g_network->global(INetwork::enFailureInjector); + if (!res) { + res = new DiskFailureInjector(); + g_network->setGlobal(INetwork::enFailureInjector, res); + } + return static_cast<DiskFailureInjector*>(res); + } + + //double getSendDelay(NetworkAddress const& peer); + //double getReceiveDelay(NetworkAddress const& peer); + + //virtual void throttleFor(double time) = 0; + //virtual double getDiskDelay() = 0; + + void throttleFor(double time) { + throttleUntil = std::max(throttleUntil, timer_monotonic() + time); + } + + double getDiskDelay() { + if (!FLOW_KNOBS->ENABLE_CHAOS_FEATURES) { + return 0.0; + } + return throttleUntil; + } + +private: // members + double throttleUntil = 0.0; + +private: // construction + DiskFailureInjector() = default; + DiskFailureInjector(DiskFailureInjector const&) = delete; +}; + #endif diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 913b39413b..5a5bf2c208 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -124,6 +124,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/ConstrainedRandomSelector.toml) add_fdb_test(TEST_FILES fast/CycleAndLock.toml) add_fdb_test(TEST_FILES fast/CycleTest.toml) + add_fdb_test(TEST_FILES fast/DiskThrottledCycle.toml IGNORE) add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml) add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml) add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)