diff --git a/fdbrpc/include/fdbrpc/AsyncFileChaos.h b/fdbrpc/include/fdbrpc/AsyncFileChaos.h index 6cb0e0fca5..825f9e07f1 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileChaos.h +++ b/fdbrpc/include/fdbrpc/AsyncFileChaos.h @@ -25,6 +25,7 @@ #include "flow/IAsyncFile.h" #include "flow/network.h" #include "flow/ActorCollection.h" +#include "flow/ChaosMetrics.h" #include "fdbrpc/simulator.h" // template diff --git a/fdbrpc/include/fdbrpc/simulator.h b/fdbrpc/include/fdbrpc/simulator.h index 104f45765c..95a1456c10 100644 --- a/fdbrpc/include/fdbrpc/simulator.h +++ b/fdbrpc/include/fdbrpc/simulator.h @@ -30,6 +30,7 @@ #include "flow/flow.h" #include "flow/Histogram.h" +#include "flow/ChaosMetrics.h" #include "flow/ProtocolVersion.h" #include "fdbrpc/FailureMonitor.h" #include "fdbrpc/Locality.h" diff --git a/fdbserver/RESTKmsConnector.actor.cpp b/fdbserver/RESTKmsConnector.actor.cpp index 44720305fa..93124cb4cf 100644 --- a/fdbserver/RESTKmsConnector.actor.cpp +++ b/fdbserver/RESTKmsConnector.actor.cpp @@ -47,6 +47,7 @@ #include #include #include +#include #include #include #include diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 007aaedce0..01a61425f2 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -70,6 +70,7 @@ #include "flow/genericactors.actor.h" #include "flow/network.h" #include "flow/serialize.h" +#include "flow/ChaosMetrics.h" #ifdef __linux__ #include @@ -692,42 +693,18 @@ ACTOR Future registrationClient( TraceEvent(SevWarn, "WorkerRegisterTimeout").detail("WaitTime", now() - startTime); } } - when(wait(ccInterface->onChange())) { - break; - } - when(wait(ddInterf->onChange())) { - break; - } - when(wait(rkInterf->onChange())) { - break; - } - when(wait(csInterf->onChange())) { - break; - } - when(wait(bmInterf->onChange())) { - break; - } - when(wait(blobMigratorInterf->onChange())) { - break; - } - when(wait(ekpInterf->onChange())) { - break; - } - when(wait(degraded->onChange())) { - break; - } - when(wait(FlowTransport::transport().onIncompatibleChanged())) { - break; - } - when(wait(issues->onChange())) { - break; - } - when(wait(recovered)) { - break; - } - when(wait(clusterId->onChange())) { - break; - } + when(wait(ccInterface->onChange())) { break; } + when(wait(ddInterf->onChange())) { break; } + when(wait(rkInterf->onChange())) { break; } + when(wait(csInterf->onChange())) { break; } + when(wait(bmInterf->onChange())) { break; } + when(wait(blobMigratorInterf->onChange())) { break; } + when(wait(ekpInterf->onChange())) { break; } + when(wait(degraded->onChange())) { break; } + when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; } + when(wait(issues->onChange())) { break; } + when(wait(recovered)) { break; } + when(wait(clusterId->onChange())) { break; } } } } diff --git a/flow/IThreadPoolTest.actor.cpp b/flow/IThreadPoolTest.actor.cpp index ed18c17b6f..023a4aaeb5 100644 --- a/flow/IThreadPoolTest.actor.cpp +++ b/flow/IThreadPoolTest.actor.cpp @@ -24,7 +24,7 @@ #include "flow/IThreadPool.h" #include -#include +#include #include "flow/UnitTest.h" #include "flow/actorcompiler.h" // has to be last include diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index d5b2f223dc..44615f5897 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -44,6 +44,7 @@ #include "flow/ActorCollection.h" #include "flow/TaskQueue.h" #include "flow/ThreadHelper.actor.h" +#include "flow/ChaosMetrics.h" #include "flow/TDMetric.actor.h" #include "flow/AsioReactor.h" #include "flow/Profiler.h" diff --git a/flow/flat_buffers.cpp b/flow/flat_buffers.cpp index bf7948e024..d91be1f74e 100644 --- a/flow/flat_buffers.cpp +++ b/flow/flat_buffers.cpp @@ -27,6 +27,7 @@ #include #include +#include #include #include diff --git a/flow/include/flow/ChaosMetrics.h b/flow/include/flow/ChaosMetrics.h new file mode 100644 index 0000000000..cea6362553 --- /dev/null +++ b/flow/include/flow/ChaosMetrics.h @@ -0,0 +1,59 @@ +#ifndef FLOW_CHAOSMETRICS_H +#define FLOW_CHAOSMETRICS_H + +struct TraceEvent; + +// Chaos Metrics - We periodically log chaosMetrics to make sure that chaos events are happening +// Only includes DiskDelays which encapsulates all type delays and BitFlips for now +// Expand as per need +struct ChaosMetrics { + + ChaosMetrics(); + + void clear(); + unsigned int diskDelays; + unsigned int bitFlips; + double startTime; + + void getFields(TraceEvent* e); +}; + +// This class supports injecting two type of disk failures +// 1. Stalls: Every interval seconds, the disk will stall and no IO will complete for x seconds, where x is a randomly +// chosen interval +// 2. Slowdown: Random slowdown is injected to each disk operation for specified period of time +struct DiskFailureInjector { + static DiskFailureInjector* injector(); + void setDiskFailure(double interval, double stallFor, double throttleFor); + double getStallDelay() const; + double getThrottleDelay() const; + double getDiskDelay() const; + +private: // members + double stallInterval = 0.0; // how often should the disk be stalled (0 meaning once, 10 meaning every 10 secs) + double stallPeriod; // Period of time disk stalls will be injected for + double stallUntil; // End of disk stall period + double stallDuration; // Duration of each stall + double throttlePeriod; // Period of time the disk will be slowed down for + double throttleUntil; // End of disk slowdown period + +private: // construction + DiskFailureInjector() = default; + DiskFailureInjector(DiskFailureInjector const&) = delete; +}; + +struct BitFlipper { + static BitFlipper* flipper(); + double getBitFlipPercentage() { return bitFlipPercentage; } + + void setBitFlipPercentage(double percentage) { bitFlipPercentage = percentage; } + +private: // members + double bitFlipPercentage = 0.0; + +private: // construction + BitFlipper() = default; + BitFlipper(BitFlipper const&) = delete; +}; + +#endif // FLOW_CHAOSMETRICS_H \ No newline at end of file diff --git a/flow/include/flow/IPAddress.h b/flow/include/flow/IPAddress.h index 52723e7267..686062483c 100644 --- a/flow/include/flow/IPAddress.h +++ b/flow/include/flow/IPAddress.h @@ -69,4 +69,9 @@ private: std::variant addr; }; +template <> +struct Traceable : std::true_type { + static std::string toString(const IPAddress& value) { return value.toString(); } +}; + #endif // FLOW_IPADDRESS_H \ No newline at end of file diff --git a/flow/include/flow/flow.h b/flow/include/flow/flow.h index a1bcd99f91..532d91e54d 100644 --- a/flow/include/flow/flow.h +++ b/flow/include/flow/flow.h @@ -33,13 +33,7 @@ #include #include -#include -#include -#include -#include #include -#include -#include #include #include #include diff --git a/flow/include/flow/network.h b/flow/include/flow/network.h index d5862486c2..6be39ead27 100644 --- a/flow/include/flow/network.h +++ b/flow/include/flow/network.h @@ -37,12 +37,6 @@ class Void; - -template <> -struct Traceable : std::true_type { - static std::string toString(const IPAddress& value) { return value.toString(); } -}; - FDB_DECLARE_BOOLEAN_PARAM(NetworkAddressFromHostname); struct NetworkAddress { @@ -590,120 +584,4 @@ public: // Returns the interface that should be used to make and accept socket connections }; -// Chaos Metrics - We periodically log chaosMetrics to make sure that chaos events are happening -// Only includes DiskDelays which encapsulates all type delays and BitFlips for now -// Expand as per need -struct ChaosMetrics { - - ChaosMetrics() { clear(); } - - void clear() { - memset(this, 0, sizeof(ChaosMetrics)); - startTime = g_network ? g_network->now() : 0; - } - - unsigned int diskDelays; - unsigned int bitFlips; - double startTime; - - void getFields(TraceEvent* e) { - std::pair metrics[] = { { "DiskDelays", diskDelays }, { "BitFlips", bitFlips } }; - if (e != nullptr) { - for (auto& m : metrics) { - char c = m.first[0]; - if (c != 0) { - e->detail(m.first, m.second); - } - } - } - } -}; - -// This class supports injecting two type of disk failures -// 1. Stalls: Every interval seconds, the disk will stall and no IO will complete for x seconds, where x is a randomly -// chosen interval -// 2. Slowdown: Random slowdown is injected to each disk operation for specified period of time -struct DiskFailureInjector { - static DiskFailureInjector* injector() { - auto res = g_network->global(INetwork::enDiskFailureInjector); - if (!res) { - res = new DiskFailureInjector(); - g_network->setGlobal(INetwork::enDiskFailureInjector, res); - } - return static_cast(res); - } - - void setDiskFailure(double interval, double stallFor, double throttleFor) { - stallInterval = interval; - stallPeriod = stallFor; - stallUntil = std::max(stallUntil, g_network->now() + stallFor); - // random stall duration in ms (chosen once) - // TODO: make this delay configurable - stallDuration = 0.001 * deterministicRandom()->randomInt(1, 5); - throttlePeriod = throttleFor; - throttleUntil = std::max(throttleUntil, g_network->now() + throttleFor); - TraceEvent("SetDiskFailure") - .detail("Now", g_network->now()) - .detail("StallInterval", interval) - .detail("StallPeriod", stallFor) - .detail("StallUntil", stallUntil) - .detail("ThrottlePeriod", throttleFor) - .detail("ThrottleUntil", throttleUntil); - } - - double getStallDelay() { - // If we are in a stall period and a stallInterval was specified, determine the - // delay to be inserted - if (((stallUntil - g_network->now()) > 0.0) && stallInterval) { - auto timeElapsed = fmod(g_network->now(), stallInterval); - return std::max(0.0, stallDuration - timeElapsed); - } - return 0.0; - } - - double getThrottleDelay() { - // If we are in the throttle period, insert a random delay (in ms) - // TODO: make this delay configurable - if ((throttleUntil - g_network->now()) > 0.0) - return (0.001 * deterministicRandom()->randomInt(1, 3)); - - return 0.0; - } - - double getDiskDelay() { return getStallDelay() + getThrottleDelay(); } - -private: // members - double stallInterval = 0.0; // how often should the disk be stalled (0 meaning once, 10 meaning every 10 secs) - double stallPeriod; // Period of time disk stalls will be injected for - double stallUntil; // End of disk stall period - double stallDuration; // Duration of each stall - double throttlePeriod; // Period of time the disk will be slowed down for - double throttleUntil; // End of disk slowdown period - -private: // construction - DiskFailureInjector() = default; - DiskFailureInjector(DiskFailureInjector const&) = delete; -}; - -struct BitFlipper { - static BitFlipper* flipper() { - auto res = g_network->global(INetwork::enBitFlipper); - if (!res) { - res = new BitFlipper(); - g_network->setGlobal(INetwork::enBitFlipper, res); - } - return static_cast(res); - } - - double getBitFlipPercentage() { return bitFlipPercentage; } - - void setBitFlipPercentage(double percentage) { bitFlipPercentage = percentage; } - -private: // members - double bitFlipPercentage = 0.0; - -private: // construction - BitFlipper() = default; - BitFlipper(BitFlipper const&) = delete; -}; #endif diff --git a/flow/network.cpp b/flow/network.cpp index e195f69bf1..f4cd1be9ab 100644 --- a/flow/network.cpp +++ b/flow/network.cpp @@ -18,14 +18,97 @@ * limitations under the License. */ +#include + #include #include "flow/Arena.h" #include "flow/network.h" #include "flow/IUDPSocket.h" #include "flow/flow.h" +#include "flow/ChaosMetrics.h" #include "flow/UnitTest.h" +ChaosMetrics::ChaosMetrics() { + clear(); +} + +void ChaosMetrics::clear() { + std::memset(this, 0, sizeof(ChaosMetrics)); + startTime = g_network ? g_network->now() : 0; +} + +void ChaosMetrics::getFields(TraceEvent* e) { + std::pair metrics[] = { { "DiskDelays", diskDelays }, { "BitFlips", bitFlips } }; + if (e != nullptr) { + for (auto& m : metrics) { + char c = m.first[0]; + if (c != 0) { + e->detail(m.first, m.second); + } + } + } +} + +DiskFailureInjector* DiskFailureInjector::injector() { + auto res = g_network->global(INetwork::enDiskFailureInjector); + if (!res) { + res = new DiskFailureInjector(); + g_network->setGlobal(INetwork::enDiskFailureInjector, res); + } + return static_cast(res); +} + +void DiskFailureInjector::setDiskFailure(double interval, double stallFor, double throttleFor) { + stallInterval = interval; + stallPeriod = stallFor; + stallUntil = std::max(stallUntil, g_network->now() + stallFor); + // random stall duration in ms (chosen once) + // TODO: make this delay configurable + stallDuration = 0.001 * deterministicRandom()->randomInt(1, 5); + throttlePeriod = throttleFor; + throttleUntil = std::max(throttleUntil, g_network->now() + throttleFor); + TraceEvent("SetDiskFailure") + .detail("Now", g_network->now()) + .detail("StallInterval", interval) + .detail("StallPeriod", stallFor) + .detail("StallUntil", stallUntil) + .detail("ThrottlePeriod", throttleFor) + .detail("ThrottleUntil", throttleUntil); +} + +double DiskFailureInjector::getStallDelay() const { + // If we are in a stall period and a stallInterval was specified, determine the + // delay to be inserted + if (((stallUntil - g_network->now()) > 0.0) && stallInterval) { + auto timeElapsed = fmod(g_network->now(), stallInterval); + return std::max(0.0, stallDuration - timeElapsed); + } + return 0.0; +} + +double DiskFailureInjector::getThrottleDelay() const { + // If we are in the throttle period, insert a random delay (in ms) + // TODO: make this delay configurable + if ((throttleUntil - g_network->now()) > 0.0) + return (0.001 * deterministicRandom()->randomInt(1, 3)); + + return 0.0; +} + +double DiskFailureInjector::getDiskDelay() const { + return getStallDelay() + getThrottleDelay(); +} + +BitFlipper* BitFlipper::flipper() { + auto res = g_network->global(INetwork::enBitFlipper); + if (!res) { + res = new BitFlipper(); + g_network->setGlobal(INetwork::enBitFlipper, res); + } + return static_cast(res); +} + bool IPAddress::operator==(const IPAddress& rhs) const { return addr == rhs.addr; }