Report latency/bandwidth/size in each fetchKEys call

This commit is contained in:
Xiaoge Su 2020-11-07 20:38:02 -08:00
parent 15d10d68cd
commit e6aedfbf57
2 changed files with 96 additions and 1 deletions

View File

@ -27,6 +27,11 @@
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // This must be the last #include.
const StringRef STORAGESERVER_HISTOGRAM_GROUP = LiteralStringRef("StorageServer");
const StringRef FETCH_KEYS_LATENCY_HISTOGRAM = LiteralStringRef("FetchKeysLatency");
const StringRef FETCH_KEYS_BYTES_HISTOGRAM = LiteralStringRef("FetchKeysSize");
const StringRef FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM = LiteralStringRef("FetchKeysBandwidth");
struct StorageMetricSample {
IndexedSet<Key, int64_t> sample;
int64_t metricUnitsPerSample;

View File

@ -298,6 +298,55 @@ private:
std::map<Version, Standalone<VersionUpdateRef>> mutationLog; // versions (durableVersion, version]
public:
public:
// Histograms
struct FetchKeysHistograms {
const Reference<Histogram> latency;
const Reference<Histogram> bytes;
const Reference<Histogram> bandwidth;
FetchKeysHistograms(const Reference<Histogram> latency_, const Reference<Histogram> bytes_, const Reference<Histogram> bandwidth_)
: latency(latency_), bytes(bytes_), bandwidth(bandwidth_) {}
} fetchKeysHistograms;
class CurrentRunningFetchKeys {
std::unordered_map<UID, double> startTimeMap;
std::unordered_map<UID, KeyRangeRef> keyRangeMap;
static const StringRef emptyString;
static const KeyRangeRef emptyKeyRange;
public:
void recordStart(const UID id, const KeyRange keyRange) {
startTimeMap[id] = now();
keyRangeMap[id] = keyRange;
}
void recordFinish(const UID id) {
startTimeMap.erase(id);
keyRangeMap.erase(id);
}
std::pair<double, KeyRangeRef> longestTime() const {
if (numRunning() == 0) {
return {-1, emptyKeyRange};
}
const double now_= now();
double longest = 0;
UID UIDofLongest;
for (const auto kv: startTimeMap) {
const double currentRunningTime = now_ - kv.second;
if (longest < currentRunningTime) {
longest = currentRunningTime;
UIDofLongest = kv.first;
}
}
return {longest, keyRangeMap.at(UIDofLongest)};
}
int numRunning() const { return startTimeMap.size(); }
} currentRunningFetchKeys;
Tag tag;
vector<std::pair<Version,Tag>> history;
vector<std::pair<Version,Tag>> allHistory;
@ -538,7 +587,12 @@ public:
} counters;
StorageServer(IKeyValueStore* storage, Reference<AsyncVar<ServerDBInfo>> const& db, StorageServerInterface const& ssi)
: instanceID(deterministicRandom()->randomUniqueID().first()),
: fetchKeysHistograms(
Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_LATENCY_HISTOGRAM, Histogram::Unit::microseconds),
Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_HISTOGRAM, Histogram::Unit::bytes),
Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, FETCH_KEYS_BYTES_PER_SECOND_HISTOGRAM, Histogram::Unit::bytes)
),
instanceID(deterministicRandom()->randomUniqueID().first()),
storage(this, storage), db(db),
lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
rebootAfterDurableVersion(std::numeric_limits<Version>::max()),
@ -664,6 +718,9 @@ public:
}
};
const StringRef StorageServer::CurrentRunningFetchKeys::emptyString= LiteralStringRef("");
const KeyRangeRef StorageServer::CurrentRunningFetchKeys::emptyKeyRange = KeyRangeRef(StorageServer::CurrentRunningFetchKeys::emptyString, StorageServer::CurrentRunningFetchKeys::emptyString);
// If and only if key:=value is in (storage+versionedData), // NOT ACTUALLY: and key < allKeys.end,
// and H(key) < |key+value|/bytesPerSample,
// let sampledSize = max(|key+value|,bytesPerSample)
@ -1948,12 +2005,44 @@ ACTOR Future<Void> logFetchKeysWarning(AddingShard* shard) {
}
}
class FetchKeysMetricReporter {
const UID uid;
const double startTime;
int fetchedBytes;
StorageServer::FetchKeysHistograms& histograms;
StorageServer::CurrentRunningFetchKeys& currentRunning;
public:
FetchKeysMetricReporter(const UID& uid_, const double startTime_, const KeyRange& keyRange, StorageServer::FetchKeysHistograms& histograms_, StorageServer::CurrentRunningFetchKeys& currentRunning_)
: uid(uid_), startTime(startTime_), fetchedBytes(0), histograms(histograms_), currentRunning(currentRunning_) {
currentRunning.recordStart(uid, keyRange);
}
void addFetchedBytes(const int bytes) { fetchedBytes += bytes; }
~FetchKeysMetricReporter() {
const double latency = now() - startTime;
if (latency == 0) return;
const uint32_t bandwidth = fetchedBytes / latency;
histograms.latency->sampleSeconds(latency);
histograms.bytes->sample(fetchedBytes);
histograms.bandwidth->sample(bandwidth);
currentRunning.recordFinish(uid);
}
};
ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
state const UID fetchKeysID = deterministicRandom()->randomUniqueID();
state TraceInterval interval("FetchKeys");
state KeyRange keys = shard->keys;
state Future<Void> warningLogger = logFetchKeysWarning(shard);
state const double startTime = now();
state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;
state FetchKeysMetricReporter metricReporter(fetchKeysID, startTime, keys, data->fetchKeysHistograms, data->currentRunningFetchKeys);
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
// This allows adding->start() to be called inline with CSK.
@ -2031,6 +2120,7 @@ ACTOR Future<Void> fetchKeys( StorageServer *data, AddingShard* shard ) {
debugKeyRange("fetchRange", fetchVersion, keys);
for(auto k = this_block.begin(); k != this_block.end(); ++k) debugMutation("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
metricReporter.addFetchedBytes(expectedSize);
data->counters.bytesFetched += expectedSize;
if( fetchBlockBytes > expectedSize ) {
holdingFKPL.release( fetchBlockBytes - expectedSize );