From 2b5a96f745e23d557ce725a068ba5ace0145a2f6 Mon Sep 17 00:00:00 2001 From: negoyal Date: Wed, 7 Jul 2021 23:58:14 -0700 Subject: [PATCH] Single code path for sim and non-sim modes. --- fdbrpc/AsyncFileNonDurable.actor.h | 43 ++++++++++++++++---- fdbserver/worker.actor.cpp | 1 + fdbserver/workloads/DiskThrottling.actor.cpp | 20 ++++----- flow/Knobs.cpp | 2 +- flow/network.h | 8 ++-- 5 files changed, 50 insertions(+), 24 deletions(-) diff --git a/fdbrpc/AsyncFileNonDurable.actor.h b/fdbrpc/AsyncFileNonDurable.actor.h index f813c1a354..0c63846169 100644 --- a/fdbrpc/AsyncFileNonDurable.actor.h +++ b/fdbrpc/AsyncFileNonDurable.actor.h @@ -31,6 +31,7 @@ #include "flow/flow.h" #include "fdbrpc/IAsyncFile.h" #include "flow/ActorCollection.h" +#include "flow/network.h" #include "fdbrpc/simulator.h" #include "fdbrpc/TraceFileIO.h" #include "fdbrpc/RangeMap.h" @@ -61,7 +62,7 @@ private: Future shutdown; public: - explicit AsyncFileDetachable(Reference file) : file(file) { shutdown = doShutdown(this); } + explicit AsyncFileDetachable(Reference file) : file(file), diskFailureInjector(DiskFailureInjector::injector()) { shutdown = doShutdown(this); } ACTOR Future doShutdown(AsyncFileDetachable* self) { wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture())); @@ -84,12 +85,20 @@ public: Future read(void* data, int length, int64_t offset) override { if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady()) return io_error().asInjectedFault(); + // throttleDisk if enabled + auto throttleFor = diskFailureInjector->getDiskDelay(); + if (throttleFor > 0.0) { + TraceEvent("AsyncFileDetachable_Read").detail("ThrottleDelay", throttleFor); + //wait(delay(throttleFor)); + } return sendErrorOnShutdown(file->read(data, length, offset)); } Future write(void const* data, int length, int64_t offset) override { if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady()) return io_error().asInjectedFault(); + if (diskFailureInjector->getDiskDelay() > 0.0) + TraceEvent("AsyncFileDetachable_Write").detail("ThrottleDelay", diskFailureInjector->getDiskDelay()); return sendErrorOnShutdown(file->write(data, length, offset)); } @@ -121,6 +130,8 @@ public: throw io_error().asInjectedFault(); return file->getFilename(); } +public: + DiskFailureInjector* diskFailureInjector; }; // An async file implementation which wraps another async file and will randomly destroy sectors that it is writing when @@ -190,11 +201,12 @@ private: Reference diskParameters, NetworkAddress openedAddress, bool aio) - : filename(filename), initialFilename(initialFilename), file(file), diskParameters(diskParameters), - openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false), - aio(aio) { + : filename(filename), initialFilename(initialFilename), file(file), diskParameters(diskParameters), + openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false), + aio(aio), diskFailureInjector(DiskFailureInjector::injector()) + { - // This is only designed to work in simulation + // This is only designed to work in simulation ASSERT(g_network->isSimulated()); this->id = deterministicRandom()->randomUniqueID(); @@ -309,7 +321,7 @@ public: // Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the // results - Future read(void* data, int length, int64_t offset) override { return read(this, data, length, offset); } + Future read(void* data, int length, int64_t offset) override { return read(this, data, length, offset, diskFailureInjector->getDiskDelay()); } // Writes data to the file. Writes are delayed a random amount of time before being // passed to the underlying file @@ -324,7 +336,7 @@ public: Promise writeStarted; Promise> writeEnded; - writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset)); + writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset, diskFailureInjector->getDiskDelay())); return writeStarted.getFuture(); } @@ -432,7 +444,7 @@ private: return readFuture.get(); } - ACTOR Future read(AsyncFileNonDurable* self, void* data, int length, int64_t offset) { + ACTOR Future read(AsyncFileNonDurable* self, void* data, int length, int64_t offset, double throttleFor = 0.0) { state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess(); state TaskPriority currentTaskID = g_network->getCurrentTask(); wait(g_simulator.onMachine(currentProcess)); @@ -441,6 +453,11 @@ private: state int rep = wait(self->onRead(self, data, length, offset)); wait(g_simulator.onProcess(currentProcess, currentTaskID)); + // throttleDisk if enabled + if (throttleFor > 0.0) { + TraceEvent("AsyncFileNonDurable_ReadDone", self->id).detail("ThrottleDelay", throttleFor).detail("Filename", self->filename).detail("ReadLength", length).detail("Offset", offset); + wait(delay(throttleFor)); + } return rep; } catch (Error& e) { state Error err = e; @@ -457,7 +474,8 @@ private: Future> ownFuture, void const* data, int length, - int64_t offset) { + int64_t offset, + double throttleFor = 0.0) { state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess(); state TaskPriority currentTaskID = g_network->getCurrentTask(); wait(g_simulator.onMachine(currentProcess)); @@ -621,6 +639,11 @@ private: } wait(waitForAll(writeFutures)); + // throttleDisk if enabled + if (throttleFor > 0.0) { + TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("ThrottleDelay", throttleFor).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset); + wait(delay(throttleFor)); + } //TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset); return Void(); } @@ -866,6 +889,8 @@ private: throw err; } } +public: + DiskFailureInjector* diskFailureInjector; }; #include "flow/unactorcompiler.h" diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index dd6ee5e39d..4a99e02265 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -1516,6 +1516,7 @@ ACTOR Future workerServer(Reference connFile, when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) { if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) { if (req.throttleDisk.present()) { + TraceEvent("DiskThrottleRequest").detail("Delay", req.throttleDisk.get().time); DiskFailureInjector::injector()->throttleFor(req.throttleDisk.get().time); } req.reply.send(Void()); diff --git a/fdbserver/workloads/DiskThrottling.actor.cpp b/fdbserver/workloads/DiskThrottling.actor.cpp index 1eb4c1639d..61c465ae7f 100644 --- a/fdbserver/workloads/DiskThrottling.actor.cpp +++ b/fdbserver/workloads/DiskThrottling.actor.cpp @@ -28,12 +28,13 @@ struct DiskThrottlingWorkload : TestWorkload { Future setup(Database const& cx) override { return Void(); } Future start(Database const& cx) override { - if (&g_simulator == g_network && enabled) { - TraceEvent("DiskThrottlingStart").detail("For", throttleFor); - return timeout(reportErrors(throttleDiskClient(cx, this), "DiskThrottlingError"), - testDuration, - Void()); - } else if (enabled) { + //if (&g_simulator == g_network && enabled) { + // TraceEvent("DiskThrottlingStart").detail("For", throttleFor); + // return timeout(reportErrors(throttleDiskClient(cx, this), "DiskThrottlingError"), + // testDuration, + // Void()); + //} else + if (enabled) { return timeout(reportErrors(throttleDiskClient(cx, this), "DiskThrottlingError"), testDuration, Void()); @@ -45,7 +46,7 @@ struct DiskThrottlingWorkload : TestWorkload { void getMetrics(vector& m) override {} - ACTOR void doThrottle(ISimulator::ProcessInfo* machine, double t, double delay = 0.0) { + ACTOR void doThrottle_unused(ISimulator::ProcessInfo* machine, double t, double delay = 0.0) { wait(::delay(delay)); TraceEvent("ThrottleDisk").detail("For", t); g_simulator.throttleDisk(machine, t); @@ -79,13 +80,13 @@ struct DiskThrottlingWorkload : TestWorkload { checkDiskThrottleResult(res, worker); } - static Future getAllWorkers(DiskThrottlingWorkload* self, std::vector* result) { + static Future getAllWorkers_unused(DiskThrottlingWorkload* self, std::vector* result) { result->clear(); *result = g_simulator.getAllProcesses(); return Void(); } - static Future getAllStorageWorkers(Database cx, DiskThrottlingWorkload* self, std::vector* result) { + static Future getAllStorageWorkers_unused(Database cx, DiskThrottlingWorkload* self, std::vector* result) { vector all = g_simulator.getAllProcesses(); for (int i = 0; i < all.size(); i++) if (!all[i]->failed && @@ -123,7 +124,6 @@ struct DiskThrottlingWorkload : TestWorkload { loop { wait(poisson(&lastTime, 1)); wait(DiskThrottlingWorkload::getAllStorageWorkers(cx, self, &machines)); - //wait(DiskThrottlingWorkload::getAllWorkers(self, &machines)); auto machine = deterministicRandom()->randomChoice(machines); TraceEvent("DoThrottleDisk").detail("For", self->throttleFor); self->doThrottle(machine, self->throttleFor); diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 1d91a7e8da..12bc0d70c9 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -65,7 +65,7 @@ void FlowKnobs::initialize(Randomize _randomize, IsSimulated _isSimulated) { init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 ); // Chaos testing - init( ENABLE_CHAOS_FEATURES, false ); + init( ENABLE_CHAOS_FEATURES, true ); init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false; diff --git a/flow/network.h b/flow/network.h index d174601fec..651882d23e 100644 --- a/flow/network.h +++ b/flow/network.h @@ -657,25 +657,25 @@ struct DiskFailureInjector : FastAllocated { return static_cast(res); } - //double getSendDelay(NetworkAddress const& peer); - //double getReceiveDelay(NetworkAddress const& peer); - //virtual void throttleFor(double time) = 0; //virtual double getDiskDelay() = 0; void throttleFor(double time) { + TraceEvent("DiskFailureInjectorBefore").detail("ThrottleUntil", throttleUntil); throttleUntil = std::max(throttleUntil, timer_monotonic() + time); + TraceEvent("DiskFailureInjectorAfter").detail("ThrottleUntil", throttleUntil); } double getDiskDelay() { if (!FLOW_KNOBS->ENABLE_CHAOS_FEATURES) { return 0.0; } - return throttleUntil; + return std::max(0.0, throttleUntil - timer_monotonic()); } private: // members double throttleUntil = 0.0; + std::unordered_map throttleDisk; private: // construction DiskFailureInjector() = default;