Extract ChaosMetrics out from network.h

This commit is contained in:
Xiaoge Su 2022-12-27 16:04:09 -08:00
parent 3f03a6b12d
commit c11c48fa3f
12 changed files with 166 additions and 165 deletions

View File

@ -25,6 +25,7 @@
#include "flow/IAsyncFile.h"
#include "flow/network.h"
#include "flow/ActorCollection.h"
#include "flow/ChaosMetrics.h"
#include "fdbrpc/simulator.h"
// template <class AsyncFileType>

View File

@ -30,6 +30,7 @@
#include "flow/flow.h"
#include "flow/Histogram.h"
#include "flow/ChaosMetrics.h"
#include "flow/ProtocolVersion.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Locality.h"

View File

@ -47,6 +47,7 @@
#include <rapidjson/writer.h>
#include <boost/algorithm/string.hpp>
#include <cstring>
#include <stack>
#include <memory>
#include <queue>
#include <sstream>

View File

@ -70,6 +70,7 @@
#include "flow/genericactors.actor.h"
#include "flow/network.h"
#include "flow/serialize.h"
#include "flow/ChaosMetrics.h"
#ifdef __linux__
#include <fcntl.h>
@ -692,42 +693,18 @@ ACTOR Future<Void> registrationClient(
TraceEvent(SevWarn, "WorkerRegisterTimeout").detail("WaitTime", now() - startTime);
}
}
when(wait(ccInterface->onChange())) {
break;
}
when(wait(ddInterf->onChange())) {
break;
}
when(wait(rkInterf->onChange())) {
break;
}
when(wait(csInterf->onChange())) {
break;
}
when(wait(bmInterf->onChange())) {
break;
}
when(wait(blobMigratorInterf->onChange())) {
break;
}
when(wait(ekpInterf->onChange())) {
break;
}
when(wait(degraded->onChange())) {
break;
}
when(wait(FlowTransport::transport().onIncompatibleChanged())) {
break;
}
when(wait(issues->onChange())) {
break;
}
when(wait(recovered)) {
break;
}
when(wait(clusterId->onChange())) {
break;
}
when(wait(ccInterface->onChange())) { break; }
when(wait(ddInterf->onChange())) { break; }
when(wait(rkInterf->onChange())) { break; }
when(wait(csInterf->onChange())) { break; }
when(wait(bmInterf->onChange())) { break; }
when(wait(blobMigratorInterf->onChange())) { break; }
when(wait(ekpInterf->onChange())) { break; }
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
when(wait(recovered)) { break; }
when(wait(clusterId->onChange())) { break; }
}
}
}

View File

@ -24,7 +24,7 @@
#include "flow/IThreadPool.h"
#include <pthread.h>
#include <ostream>
#include <iostream>
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -44,6 +44,7 @@
#include "flow/ActorCollection.h"
#include "flow/TaskQueue.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/ChaosMetrics.h"
#include "flow/TDMetric.actor.h"
#include "flow/AsioReactor.h"
#include "flow/Profiler.h"

View File

@ -27,6 +27,7 @@
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <unordered_set>
#include <variant>

View File

@ -0,0 +1,59 @@
#ifndef FLOW_CHAOSMETRICS_H
#define FLOW_CHAOSMETRICS_H
struct TraceEvent;
// Chaos Metrics - We periodically log chaosMetrics to make sure that chaos events are happening
// Only includes DiskDelays which encapsulates all type delays and BitFlips for now
// Expand as per need
struct ChaosMetrics {
ChaosMetrics();
void clear();
unsigned int diskDelays;
unsigned int bitFlips;
double startTime;
void getFields(TraceEvent* e);
};
// This class supports injecting two type of disk failures
// 1. Stalls: Every interval seconds, the disk will stall and no IO will complete for x seconds, where x is a randomly
// chosen interval
// 2. Slowdown: Random slowdown is injected to each disk operation for specified period of time
struct DiskFailureInjector {
static DiskFailureInjector* injector();
void setDiskFailure(double interval, double stallFor, double throttleFor);
double getStallDelay() const;
double getThrottleDelay() const;
double getDiskDelay() const;
private: // members
double stallInterval = 0.0; // how often should the disk be stalled (0 meaning once, 10 meaning every 10 secs)
double stallPeriod; // Period of time disk stalls will be injected for
double stallUntil; // End of disk stall period
double stallDuration; // Duration of each stall
double throttlePeriod; // Period of time the disk will be slowed down for
double throttleUntil; // End of disk slowdown period
private: // construction
DiskFailureInjector() = default;
DiskFailureInjector(DiskFailureInjector const&) = delete;
};
struct BitFlipper {
static BitFlipper* flipper();
double getBitFlipPercentage() { return bitFlipPercentage; }
void setBitFlipPercentage(double percentage) { bitFlipPercentage = percentage; }
private: // members
double bitFlipPercentage = 0.0;
private: // construction
BitFlipper() = default;
BitFlipper(BitFlipper const&) = delete;
};
#endif // FLOW_CHAOSMETRICS_H

View File

@ -69,4 +69,9 @@ private:
std::variant<uint32_t, IPAddressStore> addr;
};
template <>
struct Traceable<IPAddress> : std::true_type {
static std::string toString(const IPAddress& value) { return value.toString(); }
};
#endif // FLOW_IPADDRESS_H

View File

@ -33,13 +33,7 @@
#include <vector>
#include <queue>
#include <stack>
#include <map>
#include <unordered_map>
#include <set>
#include <functional>
#include <iostream>
#include <string>
#include <string_view>
#include <utility>
#include <algorithm>

View File

@ -37,12 +37,6 @@
class Void;
template <>
struct Traceable<IPAddress> : std::true_type {
static std::string toString(const IPAddress& value) { return value.toString(); }
};
FDB_DECLARE_BOOLEAN_PARAM(NetworkAddressFromHostname);
struct NetworkAddress {
@ -590,120 +584,4 @@ public:
// Returns the interface that should be used to make and accept socket connections
};
// Chaos Metrics - We periodically log chaosMetrics to make sure that chaos events are happening
// Only includes DiskDelays which encapsulates all type delays and BitFlips for now
// Expand as per need
struct ChaosMetrics {
ChaosMetrics() { clear(); }
void clear() {
memset(this, 0, sizeof(ChaosMetrics));
startTime = g_network ? g_network->now() : 0;
}
unsigned int diskDelays;
unsigned int bitFlips;
double startTime;
void getFields(TraceEvent* e) {
std::pair<const char*, unsigned int> metrics[] = { { "DiskDelays", diskDelays }, { "BitFlips", bitFlips } };
if (e != nullptr) {
for (auto& m : metrics) {
char c = m.first[0];
if (c != 0) {
e->detail(m.first, m.second);
}
}
}
}
};
// This class supports injecting two type of disk failures
// 1. Stalls: Every interval seconds, the disk will stall and no IO will complete for x seconds, where x is a randomly
// chosen interval
// 2. Slowdown: Random slowdown is injected to each disk operation for specified period of time
struct DiskFailureInjector {
static DiskFailureInjector* injector() {
auto res = g_network->global(INetwork::enDiskFailureInjector);
if (!res) {
res = new DiskFailureInjector();
g_network->setGlobal(INetwork::enDiskFailureInjector, res);
}
return static_cast<DiskFailureInjector*>(res);
}
void setDiskFailure(double interval, double stallFor, double throttleFor) {
stallInterval = interval;
stallPeriod = stallFor;
stallUntil = std::max(stallUntil, g_network->now() + stallFor);
// random stall duration in ms (chosen once)
// TODO: make this delay configurable
stallDuration = 0.001 * deterministicRandom()->randomInt(1, 5);
throttlePeriod = throttleFor;
throttleUntil = std::max(throttleUntil, g_network->now() + throttleFor);
TraceEvent("SetDiskFailure")
.detail("Now", g_network->now())
.detail("StallInterval", interval)
.detail("StallPeriod", stallFor)
.detail("StallUntil", stallUntil)
.detail("ThrottlePeriod", throttleFor)
.detail("ThrottleUntil", throttleUntil);
}
double getStallDelay() {
// If we are in a stall period and a stallInterval was specified, determine the
// delay to be inserted
if (((stallUntil - g_network->now()) > 0.0) && stallInterval) {
auto timeElapsed = fmod(g_network->now(), stallInterval);
return std::max(0.0, stallDuration - timeElapsed);
}
return 0.0;
}
double getThrottleDelay() {
// If we are in the throttle period, insert a random delay (in ms)
// TODO: make this delay configurable
if ((throttleUntil - g_network->now()) > 0.0)
return (0.001 * deterministicRandom()->randomInt(1, 3));
return 0.0;
}
double getDiskDelay() { return getStallDelay() + getThrottleDelay(); }
private: // members
double stallInterval = 0.0; // how often should the disk be stalled (0 meaning once, 10 meaning every 10 secs)
double stallPeriod; // Period of time disk stalls will be injected for
double stallUntil; // End of disk stall period
double stallDuration; // Duration of each stall
double throttlePeriod; // Period of time the disk will be slowed down for
double throttleUntil; // End of disk slowdown period
private: // construction
DiskFailureInjector() = default;
DiskFailureInjector(DiskFailureInjector const&) = delete;
};
struct BitFlipper {
static BitFlipper* flipper() {
auto res = g_network->global(INetwork::enBitFlipper);
if (!res) {
res = new BitFlipper();
g_network->setGlobal(INetwork::enBitFlipper, res);
}
return static_cast<BitFlipper*>(res);
}
double getBitFlipPercentage() { return bitFlipPercentage; }
void setBitFlipPercentage(double percentage) { bitFlipPercentage = percentage; }
private: // members
double bitFlipPercentage = 0.0;
private: // construction
BitFlipper() = default;
BitFlipper(BitFlipper const&) = delete;
};
#endif

View File

@ -18,14 +18,97 @@
* limitations under the License.
*/
#include <memory>
#include <boost/asio.hpp>
#include "flow/Arena.h"
#include "flow/network.h"
#include "flow/IUDPSocket.h"
#include "flow/flow.h"
#include "flow/ChaosMetrics.h"
#include "flow/UnitTest.h"
ChaosMetrics::ChaosMetrics() {
clear();
}
void ChaosMetrics::clear() {
std::memset(this, 0, sizeof(ChaosMetrics));
startTime = g_network ? g_network->now() : 0;
}
void ChaosMetrics::getFields(TraceEvent* e) {
std::pair<const char*, unsigned int> metrics[] = { { "DiskDelays", diskDelays }, { "BitFlips", bitFlips } };
if (e != nullptr) {
for (auto& m : metrics) {
char c = m.first[0];
if (c != 0) {
e->detail(m.first, m.second);
}
}
}
}
DiskFailureInjector* DiskFailureInjector::injector() {
auto res = g_network->global(INetwork::enDiskFailureInjector);
if (!res) {
res = new DiskFailureInjector();
g_network->setGlobal(INetwork::enDiskFailureInjector, res);
}
return static_cast<DiskFailureInjector*>(res);
}
void DiskFailureInjector::setDiskFailure(double interval, double stallFor, double throttleFor) {
stallInterval = interval;
stallPeriod = stallFor;
stallUntil = std::max(stallUntil, g_network->now() + stallFor);
// random stall duration in ms (chosen once)
// TODO: make this delay configurable
stallDuration = 0.001 * deterministicRandom()->randomInt(1, 5);
throttlePeriod = throttleFor;
throttleUntil = std::max(throttleUntil, g_network->now() + throttleFor);
TraceEvent("SetDiskFailure")
.detail("Now", g_network->now())
.detail("StallInterval", interval)
.detail("StallPeriod", stallFor)
.detail("StallUntil", stallUntil)
.detail("ThrottlePeriod", throttleFor)
.detail("ThrottleUntil", throttleUntil);
}
double DiskFailureInjector::getStallDelay() const {
// If we are in a stall period and a stallInterval was specified, determine the
// delay to be inserted
if (((stallUntil - g_network->now()) > 0.0) && stallInterval) {
auto timeElapsed = fmod(g_network->now(), stallInterval);
return std::max(0.0, stallDuration - timeElapsed);
}
return 0.0;
}
double DiskFailureInjector::getThrottleDelay() const {
// If we are in the throttle period, insert a random delay (in ms)
// TODO: make this delay configurable
if ((throttleUntil - g_network->now()) > 0.0)
return (0.001 * deterministicRandom()->randomInt(1, 3));
return 0.0;
}
double DiskFailureInjector::getDiskDelay() const {
return getStallDelay() + getThrottleDelay();
}
BitFlipper* BitFlipper::flipper() {
auto res = g_network->global(INetwork::enBitFlipper);
if (!res) {
res = new BitFlipper();
g_network->setGlobal(INetwork::enBitFlipper, res);
}
return static_cast<BitFlipper*>(res);
}
bool IPAddress::operator==(const IPAddress& rhs) const {
return addr == rhs.addr;
}