Single code path for sim and non-sim modes.

This commit is contained in:
negoyal 2021-07-07 23:58:14 -07:00
parent 957eceb14c
commit 2b5a96f745
5 changed files with 50 additions and 24 deletions

View File

@ -31,6 +31,7 @@
#include "flow/flow.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/ActorCollection.h"
#include "flow/network.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/TraceFileIO.h"
#include "fdbrpc/RangeMap.h"
@ -61,7 +62,7 @@ private:
Future<Void> shutdown;
public:
explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file) { shutdown = doShutdown(this); }
explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file), diskFailureInjector(DiskFailureInjector::injector()) { shutdown = doShutdown(this); }
ACTOR Future<Void> doShutdown(AsyncFileDetachable* self) {
wait(success(g_simulator.getCurrentProcess()->shutdownSignal.getFuture()));
@ -84,12 +85,20 @@ public:
Future<int> read(void* data, int length, int64_t offset) override {
if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady())
return io_error().asInjectedFault();
// throttleDisk if enabled
auto throttleFor = diskFailureInjector->getDiskDelay();
if (throttleFor > 0.0) {
TraceEvent("AsyncFileDetachable_Read").detail("ThrottleDelay", throttleFor);
//wait(delay(throttleFor));
}
return sendErrorOnShutdown(file->read(data, length, offset));
}
Future<Void> write(void const* data, int length, int64_t offset) override {
if (!file.getPtr() || g_simulator.getCurrentProcess()->shutdownSignal.getFuture().isReady())
return io_error().asInjectedFault();
if (diskFailureInjector->getDiskDelay() > 0.0)
TraceEvent("AsyncFileDetachable_Write").detail("ThrottleDelay", diskFailureInjector->getDiskDelay());
return sendErrorOnShutdown(file->write(data, length, offset));
}
@ -121,6 +130,8 @@ public:
throw io_error().asInjectedFault();
return file->getFilename();
}
public:
DiskFailureInjector* diskFailureInjector;
};
// An async file implementation which wraps another async file and will randomly destroy sectors that it is writing when
@ -190,11 +201,12 @@ private:
Reference<DiskParameters> diskParameters,
NetworkAddress openedAddress,
bool aio)
: filename(filename), initialFilename(initialFilename), file(file), diskParameters(diskParameters),
openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false),
aio(aio) {
: filename(filename), initialFilename(initialFilename), file(file), diskParameters(diskParameters),
openedAddress(openedAddress), pendingModifications(uint64_t(-1)), approximateSize(0), reponses(false),
aio(aio), diskFailureInjector(DiskFailureInjector::injector())
{
// This is only designed to work in simulation
// This is only designed to work in simulation
ASSERT(g_network->isSimulated());
this->id = deterministicRandom()->randomUniqueID();
@ -309,7 +321,7 @@ public:
// Passes along reads straight to the underlying file, waiting for any outstanding changes that could affect the
// results
Future<int> read(void* data, int length, int64_t offset) override { return read(this, data, length, offset); }
Future<int> read(void* data, int length, int64_t offset) override { return read(this, data, length, offset, diskFailureInjector->getDiskDelay()); }
// Writes data to the file. Writes are delayed a random amount of time before being
// passed to the underlying file
@ -324,7 +336,7 @@ public:
Promise<Void> writeStarted;
Promise<Future<Void>> writeEnded;
writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset));
writeEnded.send(write(this, writeStarted, writeEnded.getFuture(), data, length, offset, diskFailureInjector->getDiskDelay()));
return writeStarted.getFuture();
}
@ -432,7 +444,7 @@ private:
return readFuture.get();
}
ACTOR Future<int> read(AsyncFileNonDurable* self, void* data, int length, int64_t offset) {
ACTOR Future<int> read(AsyncFileNonDurable* self, void* data, int length, int64_t offset, double throttleFor = 0.0) {
state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
state TaskPriority currentTaskID = g_network->getCurrentTask();
wait(g_simulator.onMachine(currentProcess));
@ -441,6 +453,11 @@ private:
state int rep = wait(self->onRead(self, data, length, offset));
wait(g_simulator.onProcess(currentProcess, currentTaskID));
// throttleDisk if enabled
if (throttleFor > 0.0) {
TraceEvent("AsyncFileNonDurable_ReadDone", self->id).detail("ThrottleDelay", throttleFor).detail("Filename", self->filename).detail("ReadLength", length).detail("Offset", offset);
wait(delay(throttleFor));
}
return rep;
} catch (Error& e) {
state Error err = e;
@ -457,7 +474,8 @@ private:
Future<Future<Void>> ownFuture,
void const* data,
int length,
int64_t offset) {
int64_t offset,
double throttleFor = 0.0) {
state ISimulator::ProcessInfo* currentProcess = g_simulator.getCurrentProcess();
state TaskPriority currentTaskID = g_network->getCurrentTask();
wait(g_simulator.onMachine(currentProcess));
@ -621,6 +639,11 @@ private:
}
wait(waitForAll(writeFutures));
// throttleDisk if enabled
if (throttleFor > 0.0) {
TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("ThrottleDelay", throttleFor).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
wait(delay(throttleFor));
}
//TraceEvent("AsyncFileNonDurable_WriteDone", self->id).detail("Delay", delayDuration).detail("Filename", self->filename).detail("WriteLength", length).detail("Offset", offset);
return Void();
}
@ -866,6 +889,8 @@ private:
throw err;
}
}
public:
DiskFailureInjector* diskFailureInjector;
};
#include "flow/unactorcompiler.h"

View File

@ -1516,6 +1516,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) {
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
if (req.throttleDisk.present()) {
TraceEvent("DiskThrottleRequest").detail("Delay", req.throttleDisk.get().time);
DiskFailureInjector::injector()->throttleFor(req.throttleDisk.get().time);
}
req.reply.send(Void());

View File

@ -28,12 +28,13 @@ struct DiskThrottlingWorkload : TestWorkload {
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
if (&g_simulator == g_network && enabled) {
TraceEvent("DiskThrottlingStart").detail("For", throttleFor);
return timeout(reportErrors(throttleDiskClient<ISimulator::ProcessInfo*>(cx, this), "DiskThrottlingError"),
testDuration,
Void());
} else if (enabled) {
//if (&g_simulator == g_network && enabled) {
// TraceEvent("DiskThrottlingStart").detail("For", throttleFor);
// return timeout(reportErrors(throttleDiskClient<ISimulator::ProcessInfo*>(cx, this), "DiskThrottlingError"),
// testDuration,
// Void());
//} else
if (enabled) {
return timeout(reportErrors(throttleDiskClient<WorkerInterface>(cx, this), "DiskThrottlingError"),
testDuration,
Void());
@ -45,7 +46,7 @@ struct DiskThrottlingWorkload : TestWorkload {
void getMetrics(vector<PerfMetric>& m) override {}
ACTOR void doThrottle(ISimulator::ProcessInfo* machine, double t, double delay = 0.0) {
ACTOR void doThrottle_unused(ISimulator::ProcessInfo* machine, double t, double delay = 0.0) {
wait(::delay(delay));
TraceEvent("ThrottleDisk").detail("For", t);
g_simulator.throttleDisk(machine, t);
@ -79,13 +80,13 @@ struct DiskThrottlingWorkload : TestWorkload {
checkDiskThrottleResult(res, worker);
}
static Future<Void> getAllWorkers(DiskThrottlingWorkload* self, std::vector<ISimulator::ProcessInfo*>* result) {
static Future<Void> getAllWorkers_unused(DiskThrottlingWorkload* self, std::vector<ISimulator::ProcessInfo*>* result) {
result->clear();
*result = g_simulator.getAllProcesses();
return Void();
}
static Future<Void> getAllStorageWorkers(Database cx, DiskThrottlingWorkload* self, std::vector<ISimulator::ProcessInfo*>* result) {
static Future<Void> getAllStorageWorkers_unused(Database cx, DiskThrottlingWorkload* self, std::vector<ISimulator::ProcessInfo*>* result) {
vector<ISimulator::ProcessInfo*> all = g_simulator.getAllProcesses();
for (int i = 0; i < all.size(); i++)
if (!all[i]->failed &&
@ -123,7 +124,6 @@ struct DiskThrottlingWorkload : TestWorkload {
loop {
wait(poisson(&lastTime, 1));
wait(DiskThrottlingWorkload::getAllStorageWorkers(cx, self, &machines));
//wait(DiskThrottlingWorkload::getAllWorkers(self, &machines));
auto machine = deterministicRandom()->randomChoice(machines);
TraceEvent("DoThrottleDisk").detail("For", self->throttleFor);
self->doThrottle(machine, self->throttleFor);

View File

@ -65,7 +65,7 @@ void FlowKnobs::initialize(Randomize _randomize, IsSimulated _isSimulated) {
init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
// Chaos testing
init( ENABLE_CHAOS_FEATURES, false );
init( ENABLE_CHAOS_FEATURES, true );
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;

View File

@ -657,25 +657,25 @@ struct DiskFailureInjector : FastAllocated<DiskFailureInjector> {
return static_cast<DiskFailureInjector*>(res);
}
//double getSendDelay(NetworkAddress const& peer);
//double getReceiveDelay(NetworkAddress const& peer);
//virtual void throttleFor(double time) = 0;
//virtual double getDiskDelay() = 0;
void throttleFor(double time) {
TraceEvent("DiskFailureInjectorBefore").detail("ThrottleUntil", throttleUntil);
throttleUntil = std::max(throttleUntil, timer_monotonic() + time);
TraceEvent("DiskFailureInjectorAfter").detail("ThrottleUntil", throttleUntil);
}
double getDiskDelay() {
if (!FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
return 0.0;
}
return throttleUntil;
return std::max(0.0, throttleUntil - timer_monotonic());
}
private: // members
double throttleUntil = 0.0;
std::unordered_map<NetworkAddress, double> throttleDisk;
private: // construction
DiskFailureInjector() = default;