1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-28 10:52:03 +08:00

Merge pull request from nblintao/dd-bandwidth-in-ss

Metrics to compare the bandwidth used by data distributions and updates
This commit is contained in:
Meng Xu 2021-06-10 10:29:33 -07:00 committed by GitHub
commit a6fd2242dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -95,13 +95,25 @@ struct AddingShard : NonCopyable {
Promise<Void> fetchComplete;
Promise<Void> readWrite;
std::deque<Standalone<VerUpdateRef>>
updates; // during the Fetching phase, mutations with key in keys and version>=(fetchClient's) fetchVersion;
// During the Fetching phase, it saves newer mutations whose version is greater or equal to fetchClient's
// fetchVersion, while the shard is still busy catching up with fetchClient. It applies these updates after fetching
// completes.
std::deque<Standalone<VerUpdateRef>> updates;
struct StorageServer* server;
Version transferredVersion;
enum Phase { WaitPrevious, Fetching, Waiting };
// To learn more details of the phase transitions, see function fetchKeys(). The phases below are sorted in
// chronological order and do not go back.
enum Phase {
WaitPrevious,
// During Fetching phase, it fetches data before fetchVersion and write it to storage, then let updater know it
// is ready to update the deferred updates` (see the comment of member variable `updates` above).
Fetching,
// During Waiting phase, it sends updater the deferred updates, and wait until they are durable.
Waiting
// The shard's state is changed from adding to readWrite then.
};
Phase phase;
@ -128,6 +140,7 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
: adding(std::move(adding)), readWrite(readWrite), keys(keys) {}
public:
// A shard has 3 mutual exclusive states: adding, readWrite and notAssigned.
std::unique_ptr<AddingShard> adding;
struct StorageServer* readWrite;
KeyRange keys;
@ -284,6 +297,7 @@ const int VERSION_OVERHEAD =
sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); // versioned map [ x2 for
// createNewVersion(version+1) ], 64b
// overhead for map
// For both the mutation log and the versioned map.
static int mvccStorageBytes(MutationRef const& m) {
return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 +
(MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2;
@ -690,9 +704,24 @@ public:
CounterCollection cc;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, lowPriorityQueries,
rowsQueried, bytesQueried, watchQueries, emptyQueries;
Counter bytesInput, bytesDurable, bytesFetched,
mutationBytes; // Like bytesInput but without MVCC accounting
// Bytes of the mutations that have been added to the memory of the storage server. When the data is durable
// and cleared from the memory, we do not subtract it but add it to bytesDurable.
Counter bytesInput;
// Bytes of the mutations that have been removed from memory because they durable. The counting is same as
// bytesInput, instead of the actual bytes taken in the storages, so that (bytesInput - bytesDurable) can
// reflect the current memory footprint of MVCC.
Counter bytesDurable;
// Bytes fetched by fetchKeys() for data movements. The size is counted as a collection of KeyValueRef.
Counter bytesFetched;
// Like bytesInput but without MVCC accounting. The size is counted as how much it takes when serialized. It
// is basically the size of both parameters of the mutation and a 12 bytes overhead that keeps mutation type
// and the lengths of both parameters.
Counter mutationBytes;
Counter sampledBytesCleared;
// The number of key-value pairs fetched by fetchKeys()
Counter kvFetched;
Counter mutations, setMutations, clearRangeMutations, atomicMutations;
Counter updateBatches, updateVersions;
Counter loops;
@ -712,7 +741,7 @@ public:
bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), emptyQueries("EmptyQueries", cc),
bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc),
mutationBytes("MutationBytes", cc), sampledBytesCleared("SampledBytesCleared", cc),
mutations("Mutations", cc), setMutations("SetMutations", cc),
kvFetched("KVFetched", cc), mutations("Mutations", cc), setMutations("SetMutations", cc),
clearRangeMutations("ClearRangeMutations", cc), atomicMutations("AtomicMutations", cc),
updateBatches("UpdateBatches", cc), updateVersions("UpdateVersions", cc), loops("Loops", cc),
fetchWaitingMS("FetchWaitingMS", cc), fetchWaitingCount("FetchWaitingCount", cc),
@ -2209,6 +2238,8 @@ Optional<MutationRef> clipMutation(MutationRef const& m, KeyRangeRef range) {
return Optional<MutationRef>();
}
// Return true if the mutation need to be applied, otherwise (it's a CompareAndClear mutation and failed the comparison)
// false.
bool expandMutation(MutationRef& m,
StorageServer::VersionedData const& data,
UpdateEagerReadInfo* eager,
@ -2312,6 +2343,8 @@ void applyMutation(StorageServer* self, MutationRef const& m, Arena& arena, Stor
self->metrics.notify(m.param1, metrics);
if (m.type == MutationRef::SetValue) {
// VersionedMap (data) is bookkeeping all empty ranges. If the key to be set is new, it is supposed to be in a
// range what was empty. Break the empty range into halves.
auto prev = data.atLatest().lastLessOrEqual(m.param1);
if (prev && prev->isClearTo() && prev->getEndKey() > m.param1) {
ASSERT(prev.key() <= m.param1);
@ -2542,19 +2575,28 @@ class FetchKeysMetricReporter {
int fetchedBytes;
StorageServer::FetchKeysHistograms& histograms;
StorageServer::CurrentRunningFetchKeys& currentRunning;
Counter& bytesFetchedCounter;
Counter& kvFetchedCounter;
public:
FetchKeysMetricReporter(const UID& uid_,
const double startTime_,
const KeyRange& keyRange,
StorageServer::FetchKeysHistograms& histograms_,
StorageServer::CurrentRunningFetchKeys& currentRunning_)
: uid(uid_), startTime(startTime_), fetchedBytes(0), histograms(histograms_), currentRunning(currentRunning_) {
StorageServer::CurrentRunningFetchKeys& currentRunning_,
Counter& bytesFetchedCounter,
Counter& kvFetchedCounter)
: uid(uid_), startTime(startTime_), fetchedBytes(0), histograms(histograms_), currentRunning(currentRunning_),
bytesFetchedCounter(bytesFetchedCounter), kvFetchedCounter(kvFetchedCounter) {
currentRunning.recordStart(uid, keyRange);
}
void addFetchedBytes(const int bytes) { fetchedBytes += bytes; }
void addFetchedBytes(const int bytes, const int kvCount) {
fetchedBytes += bytes;
bytesFetchedCounter += bytes;
kvFetchedCounter += kvCount;
}
~FetchKeysMetricReporter() {
double latency = now() - startTime;
@ -2580,8 +2622,13 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state Future<Void> warningLogger = logFetchKeysWarning(shard);
state const double startTime = now();
state int fetchBlockBytes = BUGGIFY ? SERVER_KNOBS->BUGGIFY_BLOCK_BYTES : SERVER_KNOBS->FETCH_BLOCK_BYTES;
state FetchKeysMetricReporter metricReporter(
fetchKeysID, startTime, keys, data->fetchKeysHistograms, data->currentRunningFetchKeys);
state FetchKeysMetricReporter metricReporter(fetchKeysID,
startTime,
keys,
data->fetchKeysHistograms,
data->currentRunningFetchKeys,
data->counters.bytesFetched,
data->counters.kvFetched);
// delay(0) to force a return to the run loop before the work of fetchKeys is started.
// This allows adding->start() to be called inline with CSK.
@ -2673,8 +2720,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
for (auto k = this_block.begin(); k != this_block.end(); ++k)
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
metricReporter.addFetchedBytes(expectedSize);
data->counters.bytesFetched += expectedSize;
metricReporter.addFetchedBytes(expectedSize, this_block.size());
if (fetchBlockBytes > expectedSize) {
holdingFKPL.release(fetchBlockBytes - expectedSize);
}
@ -2683,7 +2730,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// wait( data->fetchKeysStorageWriteLock.take() );
// state FlowLock::Releaser holdingFKSWL( data->fetchKeysStorageWriteLock );
// Write this_block to storage
// Write this_block directly to storage, bypassing update() which write to MVCC in memory.
state KeyValueRef* kvItr = this_block.begin();
for (; kvItr != this_block.end(); ++kvItr) {
data->storage.writeKeyValue(*kvItr);
@ -2805,6 +2852,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
Promise<FetchInjectionInfo*> p;
data->readyFetchKeys.push_back(p);
// After we add to the promise readyFetchKeys, update() would provide a pointer to FetchInjectionInfo that we
// can put mutation in.
FetchInjectionInfo* batch = wait(p.getFuture());
TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID);
@ -2859,6 +2908,9 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
keys,
true); // keys will be available when getLatestVersion()==transferredVersion is durable
// Note that since it receives a pointer to FetchInjectionInfo, the thread does not leave this actor until this
// point.
// Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
wait(data->durableVersion.whenAtLeast(shard->transferredVersion));
@ -2922,6 +2974,9 @@ void AddingShard::addMutation(Version version, MutationRef const& mutation) {
if (phase == WaitPrevious) {
// Updates can be discarded
} else if (phase == Fetching) {
// Save incoming mutations (See the comments of member variable `updates`).
// Create a new VerUpdateRef in updates queue if it is a new version.
if (!updates.size() || version > updates.end()[-1].version) {
VerUpdateRef v;
v.version = version;
@ -2930,6 +2985,7 @@ void AddingShard::addMutation(Version version, MutationRef const& mutation) {
} else {
ASSERT(version == updates.end()[-1].version);
}
// Add the mutation to the version.
updates.back().mutations.push_back_deep(updates.back().arena(), mutation);
} else if (phase == Waiting) {
server->addMutation(version, mutation, keys, server->updateEagerReads);
@ -3434,6 +3490,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
auto fk = data->readyFetchKeys.back();
data->readyFetchKeys.pop_back();
fk.send(&fii);
// fetchKeys() would put the data it fetched into the fii. The thread will not return back to this actor
// until it was completed.
}
for (auto& c : fii.changes)
@ -3472,6 +3530,8 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
for (; mutationNum < pUpdate->mutations.size(); mutationNum++) {
updater.applyMutation(data, pUpdate->mutations[mutationNum], pUpdate->version);
mutationBytes += pUpdate->mutations[mutationNum].totalSize();
// data->counters.mutationBytes or data->counters.mutations should not be updated because they should
// have counted when the mutations arrive from cursor initially.
injectedChanges = true;
if (mutationBytes > SERVER_KNOBS->DESIRED_UPDATE_BYTES) {
mutationBytes = 0;
@ -4331,6 +4391,7 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
wait(self->byteSampleRecovery);
// Logs all counters in `counters.cc` and reset the interval.
self->actors.add(traceCounters("StorageMetrics",
self->thisServerID,
SERVER_KNOBS->STORAGE_LOGGING_DELAY,