Merge pull request #5217 from sfc-gh-ngoyal/bit-flipping-workload

Chaos workloads
Steve Atherton 2021-11-17 10:45:36 -08:00 committed by GitHub
commit 874c418459
24 changed files with 807 additions and 24 deletions

View File

@ -1,3 +1 @@
DisableFormat: true
SortIncludes: Never

View File

@ -31,8 +31,10 @@
// A ClientWorkerInterface is embedded as the first element of a WorkerInterface.
struct ClientWorkerInterface {
constexpr static FileIdentifier file_identifier = 12418152;
RequestStream<struct RebootRequest> reboot;
RequestStream<struct ProfilerRequest> profiler;
RequestStream<struct SetFailureInjection> setFailureInjection;
bool operator==(ClientWorkerInterface const& r) const { return id() == r.id(); }
bool operator!=(ClientWorkerInterface const& r) const { return id() != r.id(); }
@ -43,7 +45,7 @@ struct ClientWorkerInterface {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reboot, profiler);
serializer(ar, reboot, profiler, setFailureInjection);
}
};
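
As noted above, a ClientWorkerInterface is embedded as the first element of a WorkerInterface. A minimal sketch of that relationship, hypothetical except for the clientInterface member name (which matches the interf.clientInterface usage in worker.actor.cpp later in this change):

struct WorkerInterface {
    ClientWorkerInterface clientInterface; // first member; setFailureInjection is reached via interf.clientInterface
    // ... remaining request streams and serialization elided ...
};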
@ -88,4 +90,39 @@ struct ProfilerRequest {
}
};
struct SetFailureInjection {
constexpr static FileIdentifier file_identifier = 15439864;
ReplyPromise<Void> reply;
struct DiskFailureCommand {
// How often the disk should be stalled (0 means stall once; 10 means stall every 10 seconds)
double stallInterval;
// Period of time disk stalls will be injected for
double stallPeriod;
// Period of time the disk will be slowed down for
double throttlePeriod;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, stallInterval, stallPeriod, throttlePeriod);
}
};
struct FlipBitsCommand {
// Percentage chance that a write will have a random bit flipped (see AsyncFileChaos::write)
double percentBitFlips;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, percentBitFlips);
}
};
Optional<DiskFailureCommand> diskFailure;
Optional<FlipBitsCommand> flipBits;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply, diskFailure, flipBits);
}
};
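
For illustration, a minimal sketch of how a caller might build and send this request, mirroring the pattern used by DiskFailureInjection.actor.cpp later in this change (the worker variable and the concrete values are assumptions):

// Sketch: ask one worker to stall its disk every 5 seconds during a 5-second window
// and slow it down for 30 seconds.
SetFailureInjection::DiskFailureCommand diskFailure;
diskFailure.stallInterval = 5.0;
diskFailure.stallPeriod = 5.0;
diskFailure.throttlePeriod = 30.0;
SetFailureInjection req;
req.diskFailure = diskFailure;
Future<Void> reply = worker.clientInterface.setFailureInjection.getReply(req);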
#endif

View File

@ -0,0 +1,155 @@
/*
* AsyncFileChaos.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/flow.h"
#include "flow/serialize.h"
#include "flow/genericactors.actor.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/network.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h"
// template <class AsyncFileType>
class AsyncFileChaos final : public IAsyncFile, public ReferenceCounted<AsyncFileChaos> {
private:
Reference<IAsyncFile> file;
bool enabled;
public:
explicit AsyncFileChaos(Reference<IAsyncFile> file) : file(file) {
// We only allow chaos events on storage files
enabled = (file->getFilename().find("storage-") != std::string::npos);
}
void addref() override { ReferenceCounted<AsyncFileChaos>::addref(); }
void delref() override { ReferenceCounted<AsyncFileChaos>::delref(); }
double getDelay() const {
double delayFor = 0.0;
if (!enabled)
return delayFor;
auto res = g_network->global(INetwork::enDiskFailureInjector);
if (res) {
DiskFailureInjector* delayInjector = static_cast<DiskFailureInjector*>(res);
delayFor = delayInjector->getDiskDelay();
// increment the metric for disk delays
if (delayFor > 0.0) {
auto res = g_network->global(INetwork::enChaosMetrics);
if (res) {
ChaosMetrics* chaosMetrics = static_cast<ChaosMetrics*>(res);
chaosMetrics->diskDelays++;
}
}
}
return delayFor;
}
Future<int> read(void* data, int length, int64_t offset) override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->read(data, length, offset);
// Wait for diskDelay before submitting the I/O
// Template types are specified explicitly because they cannot be deduced automatically here.
return mapAsync<Void, std::function<Future<int>(Void)>, int>(
delay(diskDelay), [=](Void _) -> Future<int> { return file->read(data, length, offset); });
}
Future<Void> write(void const* data, int length, int64_t offset) override {
Arena arena;
char* pdata = nullptr;
// Check whether a bit flip event was injected; if so, copy the buffer contents
// into a new buffer with a random bit flipped and use that copy for the write
auto res = g_network->global(INetwork::enBitFlipper);
if (enabled && res) {
auto bitFlipPercentage = static_cast<BitFlipper*>(res)->getBitFlipPercentage();
if (bitFlipPercentage > 0.0) {
auto bitFlipProb = bitFlipPercentage / 100;
if (deterministicRandom()->random01() < bitFlipProb) {
pdata = (char*)arena.allocate4kAlignedBuffer(length);
memcpy(pdata, data, length);
// flip a random bit in the copied buffer
pdata[deterministicRandom()->randomInt(0, length)] ^= (1 << deterministicRandom()->randomInt(0, 8));
// increment the metric for bit flips
auto res = g_network->global(INetwork::enChaosMetrics);
if (res) {
ChaosMetrics* chaosMetrics = static_cast<ChaosMetrics*>(res);
chaosMetrics->bitFlips++;
}
}
}
}
double diskDelay = getDelay();
if (diskDelay == 0.0) {
if (pdata)
return holdWhile(arena, file->write(pdata, length, offset));
return file->write(data, length, offset);
}
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(delay(diskDelay), [=](Void _) -> Future<Void> {
if (pdata)
return holdWhile(arena, file->write(pdata, length, offset));
return file->write(data, length, offset);
});
}
Future<Void> truncate(int64_t size) override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->truncate(size);
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
delay(diskDelay), [=](Void _) -> Future<Void> { return file->truncate(size); });
}
Future<Void> sync() override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->sync();
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<Void>(Void)>, Void>(
delay(diskDelay), [=](Void _) -> Future<Void> { return file->sync(); });
}
Future<int64_t> size() const override {
double diskDelay = getDelay();
if (diskDelay == 0.0)
return file->size();
// Wait for diskDelay before submitting the I/O
return mapAsync<Void, std::function<Future<int64_t>(Void)>, int64_t>(
delay(diskDelay), [=](Void _) -> Future<int64_t> { return file->size(); });
}
int64_t debugFD() const override { return file->debugFD(); }
std::string getFilename() const override { return file->getFilename(); }
};

View File

@ -195,7 +195,6 @@ public:
void addref() override { ReferenceCounted<AsyncFileKAIO>::addref(); }
void delref() override { ReferenceCounted<AsyncFileKAIO>::delref(); }
Future<int> read(void* data, int length, int64_t offset) override {
++countFileLogicalReads;
++countLogicalReads;

View File

@ -439,7 +439,6 @@ private:
try {
state int rep = wait(self->onRead(self, data, length, offset));
wait(g_simulator.onProcess(currentProcess, currentTaskID));
return rep;
} catch (Error& e) {
state Error err = e;

View File

@ -31,6 +31,7 @@
#define FILESYSTEM_IMPL 1
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileChaos.actor.h"
#include "fdbrpc/AsyncFileEIO.actor.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include "fdbrpc/AsyncFileWinASIO.actor.h"
@ -77,6 +78,8 @@ Future<Reference<class IAsyncFile>> Net2FileSystem::open(const std::string& file
static_cast<boost::asio::io_service*>((void*)g_network->global(INetwork::enASIOService)));
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileChaos(r)); });
#if ENCRYPTION_ENABLED
if (flags & IAsyncFile::OPEN_ENCRYPTED)
f = map(f, [flags](Reference<IAsyncFile> r) {

View File

@ -36,6 +36,7 @@
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include "fdbrpc/AsyncFileNonDurable.actor.h"
#include "fdbrpc/AsyncFileChaos.actor.h"
#include "flow/crc32c.h"
#include "fdbrpc/TraceFileIO.h"
#include "flow/FaultInjection.h"
@ -1205,6 +1206,9 @@ public:
m->protocolVersion = protocol;
m->setGlobal(enTDMetrics, (flowGlobalType)&m->tdmetrics);
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
m->setGlobal(enChaosMetrics, (flowGlobalType)&m->chaosMetrics);
}
m->setGlobal(enNetworkConnections, (flowGlobalType)m->network);
m->setGlobal(enASIOTimedOut, (flowGlobalType) false);
@ -2501,6 +2505,8 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
f = AsyncFileDetachable::open(f);
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileChaos(r)); });
#if ENCRYPTION_ENABLED
if (flags & IAsyncFile::OPEN_ENCRYPTED)
f = map(f, [flags](Reference<IAsyncFile> r) {

View File

@ -73,6 +73,7 @@ public:
LocalityData locality;
ProcessClass startingClass;
TDMetricCollection tdmetrics;
ChaosMetrics chaosMetrics;
HistogramRegistry histograms;
std::map<NetworkAddress, Reference<IListener>> listenerMap;
std::map<NetworkAddress, Reference<IUDPSocket>> boundUDPSockets;
@ -411,6 +412,7 @@ public:
std::vector<Optional<Standalone<StringRef>>> primarySatelliteDcIds;
std::vector<Optional<Standalone<StringRef>>> remoteSatelliteDcIds;
TSSMode tssMode;
std::map<NetworkAddress, bool> corruptWorkerMap;
ConfigDBType configDBType;
// Used by workloads that perform reconfigurations
@ -444,6 +446,13 @@ public:
static thread_local ProcessInfo* currentProcess;
bool checkInjectedCorruption() {
auto iter = corruptWorkerMap.find(currentProcess->address);
if (iter != corruptWorkerMap.end())
return iter->second;
return false;
}
protected:
Mutex mutex;

View File

@ -164,6 +164,7 @@ set(FDBSERVER_SRCS
workloads/BulkSetup.actor.h
workloads/Cache.actor.cpp
workloads/ChangeConfig.actor.cpp
workloads/ClearSingleRange.actor.cpp
workloads/ClientLibManagementWorkload.actor.cpp
workloads/ClientTransactionProfileCorrectness.actor.cpp
workloads/TriggerRecovery.actor.cpp
@ -182,6 +183,7 @@ set(FDBSERVER_SRCS
workloads/DDMetricsExclude.actor.cpp
workloads/DiskDurability.actor.cpp
workloads/DiskDurabilityTest.actor.cpp
workloads/DiskFailureInjection.actor.cpp
workloads/DummyWorkload.actor.cpp
workloads/ExternalWorkload.actor.cpp
workloads/FastTriggeredWatches.actor.cpp

View File

@ -96,7 +96,6 @@ extern int limitReasonEnd;
extern const char* limitReasonName[];
extern const char* limitReasonDesc[];
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
typedef std::map<std::string, TraceEventFields> EventMap;
ACTOR static Future<Optional<TraceEventFields>> latestEventOnWorker(WorkerInterface worker, std::string eventName) {
@ -116,7 +115,7 @@ ACTOR static Future<Optional<TraceEventFields>> latestEventOnWorker(WorkerInterf
}
}
ACTOR static Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
ACTOR Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
std::vector<WorkerDetails> workers,
std::string eventName) {
try {

View File

@ -46,4 +46,8 @@ Future<StatusReply> clusterGetStatus(
Version const& datacenterVersionDifference,
ConfigBroadcaster const* const& conifgBroadcaster);
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventOnWorkers(
std::vector<WorkerDetails> const& workers,
std::string const& eventName);
#endif

View File

@ -22,6 +22,7 @@
#include <string>
#include <map>
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/simulator.h"
/*
** When using this VFS, the sqlite3_file* handles that SQLite uses are
@ -67,11 +68,13 @@ struct VFSAsyncFile {
// Error code is only checked for non-zero because the SQLite API error code after an injected error
// may not match the error code returned by VFSAsyncFile when the injected error occurred.
bool e = g_network->global(INetwork::enSQLiteInjectedError) != (flowGlobalType)0;
bool f = g_simulator.checkInjectedCorruption();
TraceEvent("VFSCheckInjectedError")
.detail("Found", e)
.detail("InjectedIOError", e)
.detail("InjectedCorruption", f)
.detail("ErrorCode", (int64_t)g_network->global(INetwork::enSQLiteInjectedError))
.backtrace();
return e;
return e || f;
}
uint32_t* const pLockCount; // +1 for each SHARED_LOCK, or 1+X_COUNT for lock level X

View File

@ -27,6 +27,7 @@
#include <limits>
#include <random>
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/IPager.h"
#include "fdbclient/Tuple.h"
#include "flow/serialize.h"
@ -2895,6 +2896,8 @@ public:
debug_printf(
"DWALPager(%s) checksum failed for %s\n", self->filename.c_str(), toString(pageID).c_str());
Error e = checksum_failed();
if (g_network->isSimulated() && g_simulator.checkInjectedCorruption())
e = e.asInjectedFault();
TraceEvent(SevError, "RedwoodChecksumFailed")
.detail("Filename", self->filename.c_str())
.detail("PageID", pageID)

View File

@ -1334,6 +1334,27 @@ struct SharedLogsValue {
: actor(actor), uid(uid), requests(requests) {}
};
ACTOR Future<Void> chaosMetricsLogger() {
auto res = g_network->global(INetwork::enChaosMetrics);
if (!res)
return Void();
state ChaosMetrics* chaosMetrics = static_cast<ChaosMetrics*>(res);
chaosMetrics->clear();
loop {
wait(delay(FLOW_KNOBS->CHAOS_LOGGING_INTERVAL));
TraceEvent e("ChaosMetrics");
double elapsed = now() - chaosMetrics->startTime;
e.detail("Elapsed", elapsed);
chaosMetrics->getFields(&e);
e.trackLatest("ChaosMetrics");
chaosMetrics->clear();
}
}
ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
LocalityData locality,
@ -1363,6 +1384,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state Promise<Void> stopping;
state WorkerCache<InitializeStorageReply> storageCache;
state Future<Void> metricsLogger;
state Future<Void> chaosMetricsActor;
state Reference<AsyncVar<bool>> degraded = FlowTransport::transport().getDegraded();
// tLogFnForOptions() can return a function that doesn't correspond with the FDB version that the
// TLogVersion represents. This can be done if the newer TLog doesn't support a requested option.
@ -1381,6 +1403,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
TraceEvent(SevWarnAlways, "ChaosFeaturesEnabled");
chaosMetricsActor = chaosMetricsLogger();
}
folder = abspath(folder);
if (metricsPrefix.size() > 0) {
@ -1698,6 +1725,22 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
flushAndExit(0);
}
}
when(SetFailureInjection req = waitNext(interf.clientInterface.setFailureInjection.getFuture())) {
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
if (req.diskFailure.present()) {
auto diskFailureInjector = DiskFailureInjector::injector();
diskFailureInjector->setDiskFailure(req.diskFailure.get().stallInterval,
req.diskFailure.get().stallPeriod,
req.diskFailure.get().throttlePeriod);
} else if (req.flipBits.present()) {
auto bitFlipper = BitFlipper::flipper();
bitFlipper->setBitFlipPercentage(req.flipBits.get().percentBitFlips);
}
req.reply.send(Void());
} else {
req.reply.sendError(client_invalid_operation());
}
}
when(ProfilerRequest req = waitNext(interf.clientInterface.profiler.getFuture())) {
state ProfilerRequest profilerReq = req;
// There really isn't a great "filepath sanitizer" or "filepath escape" function available,

View File

@ -0,0 +1,67 @@
/*
* ClearSingleRange.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct ClearSingleRange : TestWorkload {
Key begin;
Key end;
double startDelay;
ClearSingleRange(WorkloadContext const& wcx) : TestWorkload(wcx) {
begin = getOption(options, LiteralStringRef("begin"), normalKeys.begin);
end = getOption(options, LiteralStringRef("end"), normalKeys.end);
startDelay = getOption(options, LiteralStringRef("beginClearRange"), 10.0);
}
std::string description() const override { return "ClearSingleRangeWorkload"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return clientId != 0 ? Void() : fdbClientClearRange(cx, this); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
ACTOR static Future<Void> fdbClientClearRange(Database db, ClearSingleRange* self) {
state Transaction tr(db);
try {
TraceEvent("ClearSingleRange")
.detail("Begin", printable(self->begin))
.detail("End", printable(self->end))
.detail("StartDelay", self->startDelay);
tr.setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE);
wait(delay(self->startDelay));
tr.clear(KeyRangeRef(self->begin, self->end));
wait(tr.commit());
} catch (Error& e) {
TraceEvent("ClearRangeError").error(e);
wait(tr.onError(e));
}
return Void();
}
};
WorkloadFactory<ClearSingleRange> ClearSingleRangeWorkloadFactory("ClearSingleRange");

View File

@ -0,0 +1,273 @@
/*
* DiskFailureInjection.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/Status.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct DiskFailureInjectionWorkload : TestWorkload {
bool enabled;
double testDuration;
double startDelay;
bool throttleDisk;
int workersToThrottle;
double stallInterval;
double stallPeriod;
double throttlePeriod;
bool corruptFile;
int workersToCorrupt;
double percentBitFlips;
double periodicBroadcastInterval;
std::vector<NetworkAddress> chosenWorkers;
std::vector<Future<Void>> clients;
// Verification mode: the workload runs indefinitely in this mode. The idea is to keep going until we
// see a non-zero chaosMetric, to ensure that the chaos event hasn't been lost. testDuration is ignored
// in this mode.
bool verificationMode;
DiskFailureInjectionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
enabled = !clientId; // only do this on the "first" client
startDelay = getOption(options, LiteralStringRef("startDelay"), 0.0);
testDuration = getOption(options, LiteralStringRef("testDuration"), 60.0);
verificationMode = getOption(options, LiteralStringRef("verificationMode"), false);
throttleDisk = getOption(options, LiteralStringRef("throttleDisk"), false);
workersToThrottle = getOption(options, LiteralStringRef("workersToThrottle"), 3);
stallInterval = getOption(options, LiteralStringRef("stallInterval"), 0.0);
stallPeriod = getOption(options, LiteralStringRef("stallPeriod"), 60.0);
throttlePeriod = getOption(options, LiteralStringRef("throttlePeriod"), 60.0);
corruptFile = getOption(options, LiteralStringRef("corruptFile"), false);
workersToCorrupt = getOption(options, LiteralStringRef("workersToCorrupt"), 1);
percentBitFlips = getOption(options, LiteralStringRef("percentBitFlips"), 10.0);
periodicBroadcastInterval = getOption(options, LiteralStringRef("periodicBroadcastInterval"), 5.0);
}
std::string description() const override {
if (&g_simulator == g_network)
return "DiskFailureInjection";
else
return "NoSimDiskFailureInjection";
}
Future<Void> setup(Database const& cx) override { return Void(); }
// Starts the workload by -
// 1. Starting the actor to periodically check chaosMetrics and re-broadcast chaos events, and
// 2. Starting the actor that injects failures on chosen storage servers
Future<Void> start(Database const& cx) override {
if (enabled) {
clients.push_back(timeout(diskFailureInjectionClient<WorkerInterface>(cx, this), testDuration, Void()));
// In verification mode, we wait until the periodicEventBroadcast actor returns, which indicates that
// a non-zero chaosMetric was found.
if (verificationMode) {
clients.push_back(periodicEventBroadcast(this));
} else
// Else we honor the testDuration
clients.push_back(timeout(periodicEventBroadcast(this), testDuration, Void()));
return waitForAll(clients);
} else
return Void();
}
Future<bool> check(Database const& cx) override {
clients.clear();
return true;
}
void getMetrics(std::vector<PerfMetric>& m) override {}
static void checkDiskFailureInjectionResult(Future<Void> res, WorkerInterface worker) {
if (res.isError()) {
auto err = res.getError();
if (err.code() == error_code_client_invalid_operation) {
TraceEvent(SevError, "ChaosDisabled")
.detail("OnEndpoint", worker.waitFailure.getEndpoint().addresses.address.toString());
} else {
TraceEvent(SevError, "DiskFailureInjectionFailed")
.detail("OnEndpoint", worker.waitFailure.getEndpoint().addresses.address.toString())
.error(err);
}
}
}
// Sends the disk delay (stall/throttle) request to the given worker
ACTOR void injectDiskDelays(WorkerInterface worker,
double stallInterval,
double stallPeriod,
double throttlePeriod) {
state Future<Void> res;
SetFailureInjection::DiskFailureCommand diskFailure;
diskFailure.stallInterval = stallInterval;
diskFailure.stallPeriod = stallPeriod;
diskFailure.throttlePeriod = throttlePeriod;
SetFailureInjection req;
req.diskFailure = diskFailure;
res = worker.clientInterface.setFailureInjection.getReply(req);
wait(ready(res));
checkDiskFailureInjectionResult(res, worker);
}
// Sends the disk corruption (bit flip) request to the given worker
ACTOR void injectBitFlips(WorkerInterface worker, double percentage) {
state Future<Void> res;
SetFailureInjection::FlipBitsCommand flipBits;
flipBits.percentBitFlips = percentage;
SetFailureInjection req;
req.flipBits = flipBits;
res = worker.clientInterface.setFailureInjection.getReply(req);
wait(ready(res));
checkDiskFailureInjectionResult(res, worker);
}
// Chooses random storage servers to inject disk failures into.
// We currently only inject disk failures on storage servers; this can be expanded to include
// other worker types in the future
ACTOR template <class W>
Future<Void> diskFailureInjectionClient(Database cx, DiskFailureInjectionWorkload* self) {
wait(::delay(self->startDelay));
state double lastTime = now();
state std::vector<W> machines;
state int throttledWorkers = 0;
state int corruptedWorkers = 0;
loop {
wait(poisson(&lastTime, 1));
try {
wait(store(machines, getStorageWorkers(cx, self->dbInfo, false)));
} catch (Error& e) {
// If we failed to get a list of storage servers, we can't inject failure events,
// but don't throw the error in that case
continue;
}
auto machine = deterministicRandom()->randomChoice(machines);
// If we have already chosen this worker, then just continue
if (find(self->chosenWorkers.begin(), self->chosenWorkers.end(), machine.address()) !=
self->chosenWorkers.end()) {
continue;
}
// Keep track of chosen workers for verification purposes
self->chosenWorkers.emplace_back(machine.address());
if (self->throttleDisk && (throttledWorkers++ < self->workersToThrottle))
self->injectDiskDelays(machine, self->stallInterval, self->stallPeriod, self->throttlePeriod);
if (self->corruptFile && (corruptedWorkers++ < self->workersToCorrupt)) {
if (&g_simulator == g_network)
g_simulator.corruptWorkerMap[machine.address()] = true;
self->injectBitFlips(machine, self->percentBitFlips);
}
}
}
// Resend the chaos event to previously chosen workers, in case some workers got restarted and lost their
// chaos config
ACTOR static Future<Void> reSendChaos(DiskFailureInjectionWorkload* self) {
state int throttledWorkers = 0;
state int corruptedWorkers = 0;
state std::map<NetworkAddress, WorkerInterface> workersMap;
state std::vector<WorkerDetails> workers = wait(getWorkers(self->dbInfo));
for (auto worker : workers) {
workersMap[worker.interf.address()] = worker.interf;
}
for (auto& workerAddress : self->chosenWorkers) {
auto itr = workersMap.find(workerAddress);
if (itr != workersMap.end()) {
if (self->throttleDisk && (throttledWorkers++ < self->workersToThrottle))
self->injectDiskDelays(itr->second, self->stallInterval, self->stallPeriod, self->throttlePeriod);
if (self->corruptFile && (corruptedWorkers++ < self->workersToCorrupt)) {
if (&g_simulator == g_network)
g_simulator.corruptWorkerMap[workerAddress] = true;
self->injectBitFlips(itr->second, self->percentBitFlips);
}
}
}
return Void();
}
// Fetches chaosMetrics and verifies that chaos events are happening for enabled workers
ACTOR static Future<int> chaosGetStatus(DiskFailureInjectionWorkload* self) {
state int foundChaosMetrics = 0;
state std::vector<WorkerDetails> workers = wait(getWorkers(self->dbInfo));
Future<Optional<std::pair<WorkerEvents, std::set<std::string>>>> latestEventsFuture;
latestEventsFuture = latestEventOnWorkers(workers, "ChaosMetrics");
state Optional<std::pair<WorkerEvents, std::set<std::string>>> workerEvents = wait(latestEventsFuture);
state WorkerEvents cMetrics = workerEvents.present() ? workerEvents.get().first : WorkerEvents();
// Check if any of the chosen workers for chaos events have non-zero chaosMetrics
try {
for (auto& workerAddress : self->chosenWorkers) {
auto chaosMetrics = cMetrics.find(workerAddress);
if (chaosMetrics != cMetrics.end()) {
// We expect diskDelays to be non-zero on chosenWorkers when the throttleDisk event is enabled
if (self->throttleDisk) {
int diskDelays = chaosMetrics->second.getInt("DiskDelays");
if (diskDelays > 0) {
foundChaosMetrics += diskDelays;
}
}
// We expect bitFlips to be non-zero on chosenWorkers when the corruptFile event is enabled
if (self->corruptFile) {
int bitFlips = chaosMetrics->second.getInt("BitFlips");
if (bitFlips > 0) {
foundChaosMetrics += bitFlips;
}
}
}
}
} catch (Error& e) {
// It's possible to get an empty event; it's okay to ignore it
if (e.code() != error_code_attribute_not_found) {
TraceEvent(SevError, "ChaosGetStatus").error(e);
throw e;
}
}
return foundChaosMetrics;
}
// Periodically re-send the chaos event in case of a process restart
ACTOR static Future<Void> periodicEventBroadcast(DiskFailureInjectionWorkload* self) {
wait(::delay(self->startDelay));
state double start = now();
state double elapsed = 0.0;
loop {
wait(delayUntil(start + elapsed));
wait(reSendChaos(self));
elapsed += self->periodicBroadcastInterval;
wait(delayUntil(start + elapsed));
int foundChaosMetrics = wait(chaosGetStatus(self));
if (foundChaosMetrics > 0) {
TraceEvent("FoundChaos")
.detail("ChaosMetricCount", foundChaosMetrics)
.detail("ClientID", self->clientId);
return Void();
}
}
}
};
WorkloadFactory<DiskFailureInjectionWorkload> DiskFailureInjectionWorkloadFactory("DiskFailureInjection");

View File

@ -56,7 +56,7 @@ struct MakoWorkload : TestWorkload {
commits("Commits"), totalOps("Operations") {
// init parameters from test file
// Number of rows populated
rowCount = getOption(options, LiteralStringRef("rows"), 10000);
rowCount = getOption(options, LiteralStringRef("rows"), (uint64_t)10000);
// Test duration in seconds
testDuration = getOption(options, LiteralStringRef("testDuration"), 30.0);
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);

View File

@ -32,19 +32,24 @@
struct TargetedKillWorkload : TestWorkload {
std::string machineToKill;
bool enabled, killAllMachineProcesses;
int numKillStorages;
double killAt;
bool reboot;
double suspendDuration;
TargetedKillWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
enabled = !clientId; // only do this on the "first" client
killAt = getOption(options, LiteralStringRef("killAt"), 5.0);
reboot = getOption(options, LiteralStringRef("reboot"), false);
suspendDuration = getOption(options, LiteralStringRef("suspendDuration"), 1.0);
machineToKill = getOption(options, LiteralStringRef("machineToKill"), LiteralStringRef("master")).toString();
killAllMachineProcesses = getOption(options, LiteralStringRef("killWholeMachine"), false);
numKillStorages = getOption(options, LiteralStringRef("numKillStorages"), 1);
}
std::string description() const override { return "TargetedKillWorkload"; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override {
TraceEvent("StartTargetedKill").detail("Enabled", enabled);
if (enabled)
return assassin(cx, this);
return Void();
@ -52,22 +57,30 @@ struct TargetedKillWorkload : TestWorkload {
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(std::vector<PerfMetric>& m) override {}
ACTOR Future<Void> killEndpoint(NetworkAddress address, Database cx, TargetedKillWorkload* self) {
Future<Void> killEndpoint(std::vector<WorkerDetails> workers,
NetworkAddress address,
Database cx,
TargetedKillWorkload* self) {
if (&g_simulator == g_network) {
g_simulator.killInterface(address, ISimulator::KillInstantly);
return Void();
}
state std::vector<WorkerDetails> workers = wait(getWorkers(self->dbInfo));
int killed = 0;
RebootRequest rbReq;
if (self->reboot) {
rbReq.waitForDuration = self->suspendDuration;
} else {
rbReq.waitForDuration = std::numeric_limits<uint32_t>::max();
}
for (int i = 0; i < workers.size(); i++) {
if (workers[i].interf.master.getEndpoint().getPrimaryAddress() == address ||
(self->killAllMachineProcesses &&
workers[i].interf.master.getEndpoint().getPrimaryAddress().ip == address.ip &&
workers[i].processClass != ProcessClass::TesterClass)) {
TraceEvent("WorkerKill").detail("TargetedMachine", address).detail("Worker", workers[i].interf.id());
workers[i].interf.clientInterface.reboot.send(RebootRequest());
workers[i].interf.clientInterface.reboot.send(rbReq);
killed++;
}
}
@ -82,8 +95,13 @@ struct TargetedKillWorkload : TestWorkload {
ACTOR Future<Void> assassin(Database cx, TargetedKillWorkload* self) {
wait(delay(self->killAt));
state std::vector<StorageServerInterface> storageServers = wait(getStorageServers(cx));
state std::vector<WorkerDetails> workers = wait(getWorkers(self->dbInfo));
NetworkAddress machine;
state NetworkAddress machine;
state NetworkAddress ccAddr;
state int killed = 0;
state int s = 0;
state int j = 0;
if (self->machineToKill == "master") {
machine = self->dbInfo->get().master.address();
} else if (self->machineToKill == "commitproxy") {
@ -118,13 +136,19 @@ struct TargetedKillWorkload : TestWorkload {
}
} else if (self->machineToKill == "storage" || self->machineToKill == "ss" ||
self->machineToKill == "storageserver") {
int o = deterministicRandom()->randomInt(0, storageServers.size());
for (int i = 0; i < storageServers.size(); i++) {
StorageServerInterface ssi = storageServers[o];
s = deterministicRandom()->randomInt(0, storageServers.size());
ccAddr = self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress();
for (j = 0; j < storageServers.size(); j++) {
StorageServerInterface ssi = storageServers[s];
machine = ssi.address();
if (machine != self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress())
break;
o = ++o % storageServers.size();
if (machine != ccAddr) {
TraceEvent("IsolatedMark").detail("TargetedMachine", machine).detail("Role", self->machineToKill);
wait(self->killEndpoint(workers, machine, cx, self));
killed++;
if (killed == self->numKillStorages)
return Void();
}
s = ++s % storageServers.size();
}
} else if (self->machineToKill == "clustercontroller" || self->machineToKill == "cc") {
machine = self->dbInfo->get().clusterInterface.getWorkers.getEndpoint().getPrimaryAddress();
@ -132,7 +156,7 @@ struct TargetedKillWorkload : TestWorkload {
TraceEvent("IsolatedMark").detail("TargetedMachine", machine).detail("Role", self->machineToKill);
wait(self->killEndpoint(machine, cx, self));
wait(self->killEndpoint(workers, machine, cx, self));
return Void();
}

View File

@ -67,6 +67,11 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( HUGE_ARENA_LOGGING_BYTES, 100e6 );
init( HUGE_ARENA_LOGGING_INTERVAL, 5.0 );
// Chaos testing - enabled for simulation by default
init( ENABLE_CHAOS_FEATURES, isSimulated );
init( CHAOS_LOGGING_INTERVAL, 5.0 );
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;
init( TRACING_SAMPLE_RATE, 0.0 ); // Fraction of distributed traces (not spans) to sample (0 means ignore all traces)
init( TRACING_UDP_LISTENER_PORT, 8889 ); // Only applicable if TracerType is set to a network option

View File

@ -116,6 +116,10 @@ public:
double HUGE_ARENA_LOGGING_BYTES;
double HUGE_ARENA_LOGGING_INTERVAL;
// Chaos testing
bool ENABLE_CHAOS_FEATURES;
double CHAOS_LOGGING_INTERVAL;
bool WRITE_TRACING_ENABLED;
double TRACING_SAMPLE_RATE;
int TRACING_UDP_LISTENER_PORT;

View File

@ -237,6 +237,7 @@ public:
TaskPriority currentTaskID;
uint64_t tasksIssued;
TDMetricCollection tdmetrics;
ChaosMetrics chaosMetrics;
// we read now() from a different thread. On Intel, reading a double is atomic anyways, but on other platforms it's
// not. For portability this should be atomic
std::atomic<double> currentTime;
@ -1202,6 +1203,9 @@ Net2::Net2(const TLSConfig& tlsConfig, bool useThreadPool, bool useMetrics)
if (useMetrics) {
setGlobal(INetwork::enTDMetrics, (flowGlobalType)&tdmetrics);
}
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
setGlobal(INetwork::enChaosMetrics, (flowGlobalType)&chaosMetrics);
}
setGlobal(INetwork::enNetworkConnections, (flowGlobalType)network);
setGlobal(INetwork::enASIOService, (flowGlobalType)&reactor.ios);
setGlobal(INetwork::enBlobCredentialFiles, &blobCredentialFiles);

View File

@ -537,7 +537,10 @@ public:
enNetworkAddressesFunc = 11,
enClientFailureMonitor = 12,
enSQLiteInjectedError = 13,
enGlobalConfig = 14
enGlobalConfig = 14,
enChaosMetrics = 15,
enDiskFailureInjector = 16,
enBitFlipper = 17
};
virtual void longTaskCheck(const char* name) {}
@ -712,4 +715,120 @@ public:
// Returns the interface that should be used to make and accept socket connections
};
// Chaos metrics - we periodically log chaosMetrics to make sure that chaos events are happening.
// For now this only includes DiskDelays (which covers all types of delays) and BitFlips.
// Expand as needed.
struct ChaosMetrics {
ChaosMetrics() { clear(); }
void clear() {
memset(this, 0, sizeof(ChaosMetrics));
startTime = g_network ? g_network->now() : 0;
}
unsigned int diskDelays;
unsigned int bitFlips;
double startTime;
void getFields(TraceEvent* e) {
std::pair<const char*, unsigned int> metrics[] = { { "DiskDelays", diskDelays }, { "BitFlips", bitFlips } };
if (e != nullptr) {
for (auto& m : metrics) {
char c = m.first[0];
if (c != 0) {
e->detail(m.first, m.second);
}
}
}
}
};
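
A minimal sketch of how a component records a chaos event into these metrics, mirroring AsyncFileChaos::getDelay() above:

// Sketch: fetch the per-network ChaosMetrics global (set when ENABLE_CHAOS_FEATURES is on)
// and bump the relevant counter; chaosMetricsLogger() in worker.actor.cpp traces and clears it.
if (auto res = g_network->global(INetwork::enChaosMetrics)) {
    ChaosMetrics* chaosMetrics = static_cast<ChaosMetrics*>(res);
    chaosMetrics->diskDelays++;
}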
// This class supports injecting two types of disk failures:
// 1. Stalls: Every interval seconds, the disk will stall and no I/O will complete for x seconds, where x is a
// randomly chosen duration
// 2. Slowdown: A random slowdown is injected into each disk operation for a specified period of time
struct DiskFailureInjector {
static DiskFailureInjector* injector() {
auto res = g_network->global(INetwork::enDiskFailureInjector);
if (!res) {
res = new DiskFailureInjector();
g_network->setGlobal(INetwork::enDiskFailureInjector, res);
}
return static_cast<DiskFailureInjector*>(res);
}
void setDiskFailure(double interval, double stallFor, double throttleFor) {
stallInterval = interval;
stallPeriod = stallFor;
stallUntil = std::max(stallUntil, g_network->now() + stallFor);
// random stall duration in ms (chosen once)
// TODO: make this delay configurable
stallDuration = 0.001 * deterministicRandom()->randomInt(1, 5);
throttlePeriod = throttleFor;
throttleUntil = std::max(throttleUntil, g_network->now() + throttleFor);
TraceEvent("SetDiskFailure")
.detail("Now", g_network->now())
.detail("StallInterval", interval)
.detail("StallPeriod", stallFor)
.detail("StallUntil", stallUntil)
.detail("ThrottlePeriod", throttleFor)
.detail("ThrottleUntil", throttleUntil);
}
double getStallDelay() {
// If we are in a stall period and a stallInterval was specified, determine the
// delay to be inserted
if (((stallUntil - g_network->now()) > 0.0) && stallInterval) {
auto timeElapsed = fmod(g_network->now(), stallInterval);
return std::max(0.0, stallDuration - timeElapsed);
}
return 0.0;
}
double getThrottleDelay() {
// If we are in the throttle period, insert a random delay (in ms)
// TODO: make this delay configurable
if ((throttleUntil - g_network->now()) > 0.0)
return (0.001 * deterministicRandom()->randomInt(1, 3));
return 0.0;
}
double getDiskDelay() { return getStallDelay() + getThrottleDelay(); }
private: // members
double stallInterval = 0.0; // How often the disk should be stalled (0 means stall once; 10 means every 10 seconds)
double stallPeriod; // Period of time disk stalls will be injected for
double stallUntil; // End of disk stall period
double stallDuration; // Duration of each stall
double throttlePeriod; // Period of time the disk will be slowed down for
double throttleUntil; // End of disk slowdown period
private: // construction
DiskFailureInjector() = default;
DiskFailureInjector(DiskFailureInjector const&) = delete;
};
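
A minimal usage sketch, assuming a running network (AsyncFileChaos::getDelay() is the real consumer of getDiskDelay()):

// Sketch: schedule 60 seconds of disk chaos on this process.
DiskFailureInjector* injector = DiskFailureInjector::injector();
injector->setDiskFailure(/*interval*/ 5.0, /*stallFor*/ 60.0, /*throttleFor*/ 60.0);
// Until the stall/throttle windows expire, I/Os issued just after a 5-second boundary
// are delayed by stallDuration (1-4 ms), and every I/O also gets a 1-2 ms throttle delay.
double delayFor = injector->getDiskDelay();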
struct BitFlipper {
static BitFlipper* flipper() {
auto res = g_network->global(INetwork::enBitFlipper);
if (!res) {
res = new BitFlipper();
g_network->setGlobal(INetwork::enBitFlipper, res);
}
return static_cast<BitFlipper*>(res);
}
double getBitFlipPercentage() { return bitFlipPercentage; }
void setBitFlipPercentage(double percentage) { bitFlipPercentage = percentage; }
private: // members
double bitFlipPercentage = 0.0;
private: // construction
BitFlipper() = default;
BitFlipper(BitFlipper const&) = delete;
};
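
A minimal sketch of how the percentage is set and interpreted (AsyncFileChaos::write() above is the consumer):

// Sketch: a value of 10.0 means each chaos-enabled write has a 10% chance of having
// exactly one randomly chosen bit flipped in its buffer.
BitFlipper::flipper()->setBitFlipPercentage(10.0);
double bitFlipProb = BitFlipper::flipper()->getBitFlipPercentage() / 100; // 0.10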
#endif

View File

@ -273,6 +273,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/DDBalanceAndRemove.toml)
add_fdb_test(TEST_FILES slow/DDBalanceAndRemoveStatus.toml)
add_fdb_test(TEST_FILES slow/DifferentClustersSameRV.toml)
add_fdb_test(TEST_FILES slow/DiskFailureCycle.toml)
add_fdb_test(TEST_FILES slow/FastTriggeredWatches.toml)
add_fdb_test(TEST_FILES slow/LowLatencyWithFailures.toml)
add_fdb_test(TEST_FILES slow/MoveKeysClean.toml)

View File

@ -0,0 +1,26 @@
[configuration]
buggify = false
minimumReplication = 3
minimumRegions = 3
logAntiQuorum = 0
[[test]]
testTitle = 'DiskFailureCycle'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
[[test.workload]]
testName = 'DiskFailureInjection'
testDuration = 120.0
verificationMode = true
startDelay = 3.0
throttleDisk = true
stallInterval = 5.0
stallPeriod = 5.0
throttlePeriod = 30.0
corruptFile = true
percentBitFlips = 10