Add DDSketch to mako (#7167)

* Add logic for DDSketch in mako

* Return double from percentile() and fix crash in deserialize()

* make sure to serialize and print result from mergeSketchReport()

* clean up comments

* move ddsketch into its own file

* remove LatencySampleBin and add DDSketch to ThreadStatistics

* Update DDSketch implementation

* remove assertions that cause circular references

* add DDSketchMako as a subclass of DDSketch

* Merge branch 'ddsketch_mako' of github.com:sfc-gh-khoxha/foundationdb into ddsketch_mako

* Revert "Merge branch 'ddsketch_mako' of github.com:sfc-gh-khoxha/foundationdb into ddsketch_mako"

This reverts commit cc29a68aefd1b385b563bfbaa09a32e399c0d233.

* add ddsketch mako class and rename export flag

* remove redundant decimal roundings

* print max/min/avg from ddsketch

* remove latency sample bin completely

* Make ThreadStatistics dump latency to a file and read from file in printReport()

* make sure to add latency data from file to final stats

* change mergeSketchReport to use new ThreadStatistics serialization (1)

* use C-style string arrays in Arguments instead of std::string

* remove unused header

* only serialize non-empty sketches

* fix CentOS build error

* Update report file count properly

* avoid deserializing empty sketches

* fix segmentation fault when getting file name for export_sketch_path

* make sure to properly add file to report_files list

* fix printing bugs when running in report mode

* fix incorrect insertion of report files

* don't use range based loop for char array

* don't reset args.num_report_files

* Update the usage info for new options

* switch to using std::vector for sketches instead of std::array

* make sure to use true/false instead of 1/0 for booleans

* remove op_name if not being used

* remove fp code in dumpThreadSamples

* replace lambda with function in printReport

* merge and print stats in separate functions

* make sure to exit after printing report

* address review feedback

* make defaultMin, defaultMax static and move setBucketSize to protected

* switch to reverse iterators when moving backwards along bucket array
Kevin Hoxha, 2022-06-06 18:19:31 -07:00, committed by GitHub
parent 5f1a061e3a, commit e579038018
6 changed files with 773 additions and 371 deletions


@@ -70,8 +70,6 @@ void ResumableStateForPopulate::runOneTick() {
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
@@ -190,7 +188,6 @@ void ResumableStateForRunWorkload::updateStepStats() {
const auto step_latency = watch_step.diff();
if (do_sample) {
stats.addLatency(OP_COMMIT, step_latency);
sample_bins[OP_COMMIT].put(step_latency);
}
tx.reset();
stats.incrOpCount(OP_COMMIT);
@@ -204,7 +201,6 @@ void ResumableStateForRunWorkload::updateStepStats() {
if (do_sample) {
const auto op_latency = watch_op.diff();
stats.addLatency(iter.op, op_latency);
sample_bins[iter.op].put(op_latency);
}
stats.incrOpCount(iter.op);
}
@@ -248,8 +244,6 @@ void ResumableStateForRunWorkload::onTransactionSuccess() {
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
@@ -270,7 +264,6 @@ void ResumableStateForRunWorkload::onTransactionSuccess() {
if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) {
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incrOpCount(OP_TRANSACTION);
watch_tx.startFromStop();


@@ -42,7 +42,6 @@ struct ResumableStateForPopulate : std::enable_shared_from_this<ResumableStateFo
Arguments const& args;
ThreadStatistics& stats;
std::atomic<int>& stopcount;
LatencySampleBinArray sample_bins;
int key_begin;
int key_end;
int key_checkpoint;
@@ -84,7 +83,6 @@ struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStat
std::atomic<int> const& signal;
int max_iters;
OpIterator iter;
LatencySampleBinArray sample_bins;
fdb::ByteString key1;
fdb::ByteString key2;
fdb::ByteString val;


@@ -0,0 +1,275 @@
/*
* DDSketch.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DDSKETCH_H
#define DDSKETCH_H
#include <iterator>
#include <limits>
#include <type_traits>
#pragma once
#include <algorithm>
#include <cassert>
#include <cmath>
#include <vector>
// A namespace for fast log() computation.
namespace fastLogger {
// Basically, the goal is to compute log(x)/log(r).
// For double, it is represented as 2^e*(1+s) (0<=s<1), so our goal becomes
// e*log(2)/log(r)*log(1+s), and we approximate log(1+s) with a cubic function.
// See more details in Datadog's paper, or CubicallyInterpolatedMapping.java in
// https://github.com/DataDog/sketches-java/
inline const double correctingFactor = 1.00988652862227438516; // = 7 / (10 * log(2));
constexpr inline const double A = 6.0 / 35.0, B = -3.0 / 5.0, C = 10.0 / 7.0;
inline double fastlog(double value) {
int e;
double s = frexp(value, &e);
s = s * 2 - 1;
return ((A * s + B) * s + C) * s + e - 1;
}
inline double reverseLog(double index) {
long exponent = floor(index);
// Derived from Cardano's formula
double d0 = B * B - 3 * A * C;
double d1 = 2 * B * B * B - 9 * A * B * C - 27 * A * A * (index - exponent);
double p = cbrt((d1 - sqrt(d1 * d1 - 4 * d0 * d0 * d0)) / 2);
double significandPlusOne = -(B + p + d0 / p) / (3 * A) + 1;
return ldexp(significandPlusOne / 2, exponent + 1);
}
} // namespace fastLogger
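// Editor's note (illustrative, not part of the commit): because the cubic is
// solved in closed form, reverseLog() inverts fastlog() up to floating-point
// rounding, e.g. fastLogger::reverseLog(fastLogger::fastlog(1234.5)) ~= 1234.5.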
// DDSketch for non-negative numbers. Values below EPS = 10^-18 are treated
// as 0, and huge values (> 1/EPS) fail an assert. This is the base class
// without a concrete log() implementation.
template <class Impl, class T>
class DDSketchBase {
static constexpr T defaultMin() { return std::numeric_limits<T>::max(); }
static constexpr T defaultMax() {
if constexpr (std::is_floating_point_v<T>) {
return -std::numeric_limits<T>::max();
} else {
return std::numeric_limits<T>::min();
}
}
public:
explicit DDSketchBase(double errorGuarantee)
: errorGuarantee(errorGuarantee), populationSize(0), zeroPopulationSize(0), minValue(defaultMin()),
maxValue(defaultMax()), sum(T()) {}
DDSketchBase<Impl, T>& addSample(T sample) {
// Still named addSample, although by now every data point is recorded rather than sampled
if (!populationSize)
minValue = maxValue = sample;
if (sample <= EPS) {
zeroPopulationSize++;
} else {
int index = static_cast<Impl*>(this)->getIndex(sample);
assert(index >= 0 && index < int(buckets.size()));
buckets[index]++;
}
populationSize++;
sum += sample;
maxValue = std::max(maxValue, sample);
minValue = std::min(minValue, sample);
return *this;
}
double mean() const {
if (populationSize == 0)
return 0;
return (double)sum / populationSize;
}
T median() { return percentile(0.5); }
T percentile(double percentile) {
assert(percentile >= 0 && percentile <= 1);
if (populationSize == 0)
return T();
uint64_t targetPercentilePopulation = percentile * (populationSize - 1);
// Now find the tPP-th (0-indexed) element
if (targetPercentilePopulation < zeroPopulationSize)
return T(0);
int index = -1;
bool found = false;
if (percentile <= 0.5) { // count up
uint64_t count = zeroPopulationSize;
for (size_t i = 0; i < buckets.size(); i++) {
if (targetPercentilePopulation < count + buckets[i]) {
// count + buckets[i] = # of values from the leftmost bucket up to and
// including this one, so the target lies in this bucket exactly when
// tPP < count + buckets[i]
found = true;
index = i;
break;
}
count += buckets[i];
}
} else { // and count down
uint64_t count = 0;
for (auto rit = buckets.rbegin(); rit != buckets.rend(); rit++) {
if (targetPercentilePopulation + count + *rit >= populationSize) {
// count + *rit = # of values in this bucket and to its right. If the
// target were strictly to the left of this bucket, then (tPP being
// 0-indexed) tPP + count + *rit < populationSize would hold, so the
// target is in this bucket exactly when that inequality is violated.
found = true;
index = std::distance(rit, buckets.rend()) - 1;
break;
}
count += *rit;
}
}
assert(found);
return static_cast<Impl*>(this)->getValue(index);
}
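// Editor's example (illustrative): with zeroPopulationSize = 0, buckets =
// {2, 3, 5} and populationSize = 10, percentile(0.5) targets element 4
// (0-indexed); counting up gives 2, then 2 + 3 = 5 > 4, so the result is
// getValue(1), i.e. the representative value of bucket 1.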
T min() const { return minValue; }
T max() const { return maxValue; }
void clear() {
std::fill(buckets.begin(), buckets.end(), 0);
populationSize = zeroPopulationSize = 0;
sum = 0;
minValue = defaultMin();
maxValue = defaultMax();
}
uint64_t getPopulationSize() const { return populationSize; }
double getErrorGuarantee() const { return errorGuarantee; }
DDSketchBase<Impl, T>& mergeWith(const DDSketchBase<Impl, T>& anotherSketch) {
// Must have the same guarantee
assert(fabs(errorGuarantee - anotherSketch.errorGuarantee) < EPS &&
anotherSketch.buckets.size() == buckets.size());
for (size_t i = 0; i < anotherSketch.buckets.size(); i++) {
buckets[i] += anotherSketch.buckets[i];
}
populationSize += anotherSketch.populationSize;
zeroPopulationSize += anotherSketch.zeroPopulationSize;
minValue = std::min(minValue, anotherSketch.minValue);
maxValue = std::max(maxValue, anotherSketch.maxValue);
sum += anotherSketch.sum;
return *this;
}
constexpr static double EPS = 1e-18; // smaller numbers are considered as 0
protected:
double errorGuarantee; // As defined in the paper
uint64_t populationSize, zeroPopulationSize; // we need to separately count 0s
std::vector<uint64_t> buckets;
T minValue, maxValue, sum;
void setBucketSize(int capacity) { buckets.resize(capacity, 0); }
};
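// Editor's note: DDSketchBase uses CRTP; a concrete Impl only has to map
// values to bucket indices and back, e.g. (sketch, not part of the commit):
//   class MySketch : public DDSketchBase<MySketch, double> {
//   public:
//       int getIndex(double sample); // value -> bucket index
//       double getValue(int index); // bucket index -> representative value
//   };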
// DDSketch with fast log implementation for float numbers
template <class T>
class DDSketch : public DDSketchBase<DDSketch<T>, T> {
public:
explicit DDSketch(double errorGuarantee = 0.005)
: DDSketchBase<DDSketch<T>, T>(errorGuarantee), gamma((1.0 + errorGuarantee) / (1.0 - errorGuarantee)),
multiplier(fastLogger::correctingFactor * log(2) / log(gamma)) {
offset = getIndex(1.0 / DDSketchBase<DDSketch<T>, T>::EPS);
this->setBucketSize(2 * offset);
}
int getIndex(T sample) {
static_assert(__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__, "Do not support non-little-endian systems");
return ceil(fastLogger::fastlog(sample) * multiplier) + offset;
}
T getValue(int index) { return fastLogger::reverseLog((index - offset) / multiplier) * 2.0 / (1 + gamma); }
private:
double gamma, multiplier;
int offset = 0;
};
// DDSketch with <cmath> log. Slow; use it only when the others don't work.
template <class T>
class DDSketchSlow : public DDSketchBase<DDSketchSlow<T>, T> {
public:
DDSketchSlow(double errorGuarantee = 0.1)
: DDSketchBase<DDSketchSlow<T>, T>(errorGuarantee), gamma((1.0 + errorGuarantee) / (1.0 - errorGuarantee)),
logGamma(log(gamma)) {
offset = getIndex(1.0 / DDSketchBase<DDSketchSlow<T>, T>::EPS) + 5;
this->setBucketSize(2 * offset);
}
int getIndex(T sample) { return ceil(log(sample) / logGamma) + offset; }
T getValue(int index) { return (T)(2.0 * pow(gamma, (index - offset)) / (1 + gamma)); }
private:
double gamma, logGamma;
int offset = 0;
};
// DDSketch for unsigned int. Faster than the float version. Fixed accuracy.
class DDSketchFastUnsigned : public DDSketchBase<DDSketchFastUnsigned, unsigned> {
public:
DDSketchFastUnsigned() : DDSketchBase<DDSketchFastUnsigned, unsigned>(errorGuarantee) { this->setBucketSize(129); }
int getIndex(unsigned sample) {
__uint128_t v = sample;
v *= v;
v *= v; // sample^4
uint64_t low = (uint64_t)v, high = (uint64_t)(v >> 64);
return 128 - (high == 0 ? ((low == 0 ? 64 : __builtin_clzll(low)) + 64) : __builtin_clzll(high));
}
unsigned getValue(int index) {
double r = 1, g = gamma;
while (index) { // quick power method for power(gamma, index)
if (index & 1)
r *= g;
g *= g;
index >>= 1;
}
// 2.0 * pow(gamma, index) / (1 + gamma) is what we need
return (unsigned)(2.0 * r / (1 + gamma) + 0.5); // round to nearest int
}
private:
constexpr static double errorGuarantee = 0.08642723372;
// getIndex basically computes floor(log_2(x^4)) + 1,
// which is almost ceil(log_2(x^4)) as it only matters when x is a power of 2,
// and it does not change the error bound. Original sketch asks for
// ceil(log_r(x)), so we know r = pow(2, 1/4) = 1.189207115. And r = (1 + eG)
// / (1 - eG) so eG = 0.08642723372.
constexpr static double gamma = 1.189207115;
};
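// Editor's example (illustrative): for sample = 2, sample^4 = 16 and getIndex
// returns 128 - (64 + clzll(16)) = 128 - 123 = 5 = floor(log2(16)) + 1;
// getValue(5) = (unsigned)(2 * gamma^5 / (1 + gamma) + 0.5) = 2.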
#endif
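For orientation, here is a minimal usage sketch of the header above (editor's illustration, not part of the commit; it assumes the header is saved as ddsketch.hpp):

#include <cstdio>
#include "ddsketch.hpp"

int main() {
    DDSketch<double> sketch(0.01); // 1% relative-error guarantee
    for (int i = 1; i <= 100000; i++)
        sketch.addSample(i * 0.001); // e.g. latencies in milliseconds
    std::printf("p50=%.3f p99=%.3f mean=%.3f max=%.3f\n",
                sketch.percentile(0.5),
                sketch.percentile(0.99),
                sketch.mean(),
                sketch.max());
}

Two sketches built with the same errorGuarantee can be merged with mergeWith(), which is how mako aggregates per-thread and per-process latencies in the files that follow.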


@@ -18,12 +18,14 @@
* limitations under the License.
*/
#include <array>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <map>
#include <new>
#include <string>
@@ -43,6 +45,7 @@
#include <fmt/format.h>
#include <fmt/printf.h>
#include <fdb_api.hpp>
#include <unordered_map>
#include "fdbclient/zipf.h"
#include "async.hpp"
@@ -63,7 +66,6 @@ struct alignas(64) ThreadArgs {
int worker_id;
int thread_id;
pid_t parent_id;
LatencySampleBinArray sample_bins;
Arguments const* args;
shared_memory::Access shm;
fdb::Database database; // database to work with
@@ -121,8 +123,7 @@ int populate(Transaction tx,
int worker_id,
int thread_id,
int thread_tps,
ThreadStatistics& stats,
LatencySampleBinArray& sample_bins) {
ThreadStatistics& stats) {
const auto key_begin = insertBegin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
const auto key_end = insertEnd(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
auto xacts = 0;
@@ -197,8 +198,6 @@ int populate(Transaction tx,
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
@@ -219,7 +218,6 @@ int populate(Transaction tx,
int runOneTransaction(Transaction& tx,
Arguments const& args,
ThreadStatistics& stats,
LatencySampleBinArray& sample_bins,
ByteString& key1,
ByteString& key2,
ByteString& val) {
@@ -271,7 +269,6 @@ transaction_begin:
if (do_sample) {
const auto step_latency = watch_step.diff();
stats.addLatency(OP_COMMIT, step_latency);
sample_bins[OP_COMMIT].put(step_latency);
}
tx.reset();
stats.incrOpCount(OP_COMMIT);
@@ -286,7 +283,6 @@ transaction_begin:
if (do_sample) {
const auto op_latency = watch_op.diff();
stats.addLatency(op, op_latency);
sample_bins[op].put(op_latency);
}
stats.incrOpCount(op);
}
@@ -304,7 +300,6 @@ transaction_begin:
if (do_sample) {
const auto commit_latency = watch_commit.diff();
stats.addLatency(OP_COMMIT, commit_latency);
sample_bins[OP_COMMIT].put(commit_latency);
}
stats.incrOpCount(OP_COMMIT);
} else {
@@ -323,7 +318,6 @@ transaction_begin:
// one transaction has completed successfully
if (do_sample) {
const auto tx_duration = watch_tx.stop().diff();
sample_bins[OP_TRANSACTION].put(tx_duration);
stats.addLatency(OP_TRANSACTION, tx_duration);
}
stats.incrOpCount(OP_TRANSACTION);
@@ -339,7 +333,6 @@ int runWorkload(Transaction tx,
int const thread_iters,
std::atomic<int> const& signal,
ThreadStatistics& stats,
LatencySampleBinArray& sample_bins,
int const dotrace,
int const dotagging) {
auto traceid = std::string{};
@@ -421,7 +414,7 @@ int runWorkload(Transaction tx,
}
}
rc = runOneTransaction(tx, args, stats, sample_bins, key1, key2, val);
rc = runOneTransaction(tx, args, stats, key1, key2, val);
if (rc) {
logr.warn("runOneTransaction failed ({})", rc);
}
@@ -446,11 +439,15 @@ std::string getStatsFilename(std::string_view dirname, int worker_id, int thread
return fmt::format("{}/{}_{}_{}", dirname, worker_id + 1, thread_id + 1, opTable[op].name());
}
std::string getStatsFilename(std::string_view dirname, int worker_id, int thread_id) {
return fmt::format("{}/{}_{}", dirname, worker_id + 1, thread_id + 1);
}
void dumpThreadSamples(Arguments const& args,
pid_t parent_id,
int worker_id,
int thread_id,
const LatencySampleBinArray& sample_bins,
const ThreadStatistics& stats,
bool overwrite = true) {
const auto dirname = fmt::format("{}{}", TEMP_DATA_STORE, parent_id);
const auto rc = mkdir(dirname.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
@@ -460,14 +457,7 @@ void dumpThreadSamples(Arguments const& args,
}
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto filename = getStatsFilename(dirname, worker_id, thread_id, op);
auto fp = fopen(filename.c_str(), overwrite ? "w" : "a");
if (!fp) {
logr.error("fopen({}): {}", filename, strerror(errno));
continue;
}
auto fclose_guard = ExitGuard([fp]() { fclose(fp); });
sample_bins[op].forEachBlock([fp](auto ptr, auto count) { fwrite(ptr, sizeof(*ptr) * count, 1, fp); });
stats.writeToFile(getStatsFilename(dirname, worker_id, thread_id, op), op);
}
}
}
@@ -481,7 +471,7 @@ void runAsyncWorkload(Arguments const& args,
auto dump_samples = [&args, pid_main, worker_id](auto&& states) {
auto overwrite = true; /* overwrite or append */
for (const auto& state : states) {
dumpThreadSamples(args, pid_main, worker_id, 0 /*thread_id*/, state->sample_bins, overwrite);
dumpThreadSamples(args, pid_main, worker_id, 0 /*thread_id*/, state->stats, overwrite);
overwrite = false;
}
};
@@ -585,28 +575,25 @@ void workerThread(ThreadArgs& thread_args) {
usleep(10000); /* 10ms */
}
auto& sample_bins = thread_args.sample_bins;
if (args.mode == MODE_CLEAN) {
auto rc = cleanup(tx, args);
if (rc < 0) {
logr.error("cleanup failed");
}
} else if (args.mode == MODE_BUILD) {
auto rc = populate(tx, args, worker_id, thread_id, thread_tps, stats, sample_bins);
auto rc = populate(tx, args, worker_id, thread_id, thread_tps, stats);
if (rc < 0) {
logr.error("populate failed");
}
} else if (args.mode == MODE_RUN) {
auto rc = runWorkload(
tx, args, thread_tps, throttle_factor, thread_iters, signal, stats, sample_bins, dotrace, dotagging);
auto rc = runWorkload(tx, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace, dotagging);
if (rc < 0) {
logr.error("runWorkload failed");
}
}
if (args.mode == MODE_BUILD || args.mode == MODE_RUN) {
dumpThreadSamples(args, parent_id, worker_id, thread_id, sample_bins);
dumpThreadSamples(args, parent_id, worker_id, thread_id, stats);
}
}
@@ -743,13 +730,6 @@ int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Acces
this_args.args = &args;
this_args.shm = shm;
this_args.database = databases[i % args.num_databases];
/* for ops to run, pre-allocate one latency sample block */
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
this_args.sample_bins[op].reserveOneBlock();
}
}
worker_threads[i] = std::thread(workerThread, std::ref(this_args));
}
/* wait for everyone to finish */
@@ -835,6 +815,7 @@ int initArguments(Arguments& args) {
args.client_threads_per_version = 0;
args.disable_ryw = 0;
args.json_output_path[0] = '\0';
args.stats_export_path[0] = '\0';
args.bg_materialize_files = false;
args.bg_file_path[0] = '\0';
args.distributed_tracer_client = 0;
@@ -994,7 +975,7 @@ void usage() {
printf("%-24s %s\n", " --tpsinterval=SEC", "Specify the TPS change interval (Default: 10 seconds)");
printf("%-24s %s\n", " --tpschange=<sin|square|pulse>", "Specify the TPS change type (Default: sin)");
printf("%-24s %s\n", " --sampling=RATE", "Specify the sampling rate for latency stats");
printf("%-24s %s\n", "-m, --mode=MODE", "Specify the mode (build, run, clean)");
printf("%-24s %s\n", "-m, --mode=MODE", "Specify the mode (build, run, clean, report)");
printf("%-24s %s\n", "-z, --zipf", "Use zipfian distribution instead of uniform distribution");
printf("%-24s %s\n", " --commitget", "Commit GETs");
printf("%-24s %s\n", " --loggroup=LOGGROUP", "Set client logr group");
@@ -1016,6 +997,9 @@ void usage() {
printf("%-24s %s\n",
" --bg_file_path=PATH",
"Read blob granule files from the local filesystem at PATH and materialize the results.");
printf("%-24s %s\n",
" --stats_export_path=PATH",
"Write the serialized DDSketch data to file at PATH. Can be used in either run or build mode.");
printf(
"%-24s %s\n", " --distributed_tracer_client=CLIENT", "Specify client (disabled, network_lossy, log_file)");
}
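As a hypothetical end-to-end example of the two new options (file names invented for illustration): each benchmark run can export its merged sketch with mako --mode=run ... --stats_export_path=run1.json, and the exported files can later be combined and printed offline with mako --mode=report run1.json run2.json.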
@@ -1069,6 +1053,7 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
{ "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW },
{ "json_report", optional_argument, NULL, ARG_JSON_REPORT },
{ "bg_file_path", required_argument, NULL, ARG_BG_FILE_PATH },
{ "stats_export_path", optional_argument, NULL, ARG_EXPORT_PATH },
{ "distributed_tracer_client", required_argument, NULL, ARG_DISTRIBUTED_TRACER_CLIENT },
{ NULL, 0, NULL, 0 }
};
@@ -1131,6 +1116,19 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
args.mode = MODE_BUILD;
} else if (strcmp(optarg, "run") == 0) {
args.mode = MODE_RUN;
} else if (strcmp(optarg, "report") == 0) {
args.mode = MODE_REPORT;
int i = optind;
for (; i < argc; i++) {
if (argv[i][0] != '-') {
const std::string report_file = argv[i];
strncpy(args.report_files[args.num_report_files], report_file.c_str(), report_file.size() + 1);
args.num_report_files++;
} else {
optind = i - 1;
break;
}
}
}
break;
case ARG_ASYNC:
@@ -1257,6 +1255,16 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
case ARG_BG_FILE_PATH:
args.bg_materialize_files = true;
strncpy(args.bg_file_path, optarg, std::min(sizeof(args.bg_file_path), strlen(optarg) + 1));
break;
case ARG_EXPORT_PATH:
if (optarg == NULL && (argv[optind] == NULL || (argv[optind] != NULL && argv[optind][0] == '-'))) {
char default_file[] = "sketch_data.json";
strncpy(args.stats_export_path, default_file, sizeof(default_file));
} else {
strncpy(args.stats_export_path,
argv[optind],
std::min(sizeof(args.stats_export_path), strlen(argv[optind]) + 1));
}
break;
case ARG_DISTRIBUTED_TRACER_CLIENT:
if (strcmp(optarg, "disabled") == 0) {
args.distributed_tracer_client = 0;
@@ -1335,6 +1343,20 @@ int validateArguments(Arguments const& args) {
return -1;
}
}
// ensure that all of the files provided to mako are valid and exist
if (args.mode == MODE_REPORT) {
if (!args.num_report_files) {
logr.error("No files to merge");
}
for (int i = 0; i < args.num_report_files; i++) {
struct stat buffer;
if (stat(args.report_files[i], &buffer) != 0) {
logr.error("Couldn't open file {}", args.report_files[i]);
return -1;
}
}
}
if (args.distributed_tracer_client < 0) {
logr.error("--disibuted_tracer_client must specify either (disabled, network_lossy, log_file)");
return -1;
@@ -1447,6 +1469,248 @@ void printStatsHeader(Arguments const& args, bool show_commit, bool is_first_hea
fmt::print("\n");
}
void printThreadStats(ThreadStatistics& final_stats, Arguments args, FILE* fp, bool is_report = false) {
if (is_report) {
for (auto op = 0; op < MAX_OP; op++) {
if (final_stats.getLatencySampleCount(op) > 0 && op != OP_COMMIT && op != OP_TRANSACTION) {
args.txnspec.ops[op][OP_COUNT] = 1;
}
}
}
fmt::print("Latency (us)");
printStatsHeader(args, true, false, true);
/* Total Samples */
putTitle("Samples");
bool first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
auto sample_size = final_stats.getLatencySampleCount(op);
if (sample_size > 0) {
putField(sample_size);
} else {
putField("N/A");
}
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), sample_size);
}
}
}
fmt::print("\n");
/* Min Latency */
if (fp) {
fmt::fprintf(fp, "}, \"minLatency\": {");
}
putTitle("Min");
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto lat_min = final_stats.getLatencyUsMin(op);
if (lat_min == -1) {
putField("N/A");
} else {
putField(lat_min);
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), lat_min);
}
}
}
}
fmt::print("\n");
/* Avg Latency */
if (fp) {
fmt::fprintf(fp, "}, \"avgLatency\": {");
}
putTitle("Avg");
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (final_stats.getLatencySampleCount(op) > 0) {
putField(final_stats.mean(op));
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_stats.mean(op));
}
} else {
putField("N/A");
}
}
}
fmt::printf("\n");
/* Max Latency */
if (fp) {
fmt::fprintf(fp, "}, \"maxLatency\": {");
}
putTitle("Max");
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto lat_max = final_stats.getLatencyUsMax(op);
if (lat_max == 0) {
putField("N/A");
} else {
putField(lat_max);
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_stats.getLatencyUsMax(op));
}
}
}
}
fmt::print("\n");
/* Median Latency */
if (fp) {
fmt::fprintf(fp, "}, \"medianLatency\": {");
}
putTitle("Median");
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto lat_total = final_stats.getLatencyUsTotal(op);
const auto lat_samples = final_stats.getLatencySampleCount(op);
if (lat_total && lat_samples) {
auto median = final_stats.percentile(op, 0.5);
putField(median);
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), median);
}
} else {
putField("N/A");
}
}
}
fmt::print("\n");
/* 95%ile Latency */
if (fp) {
fmt::fprintf(fp, "}, \"p95Latency\": {");
}
putTitle("95.0 pctile");
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (!final_stats.getLatencySampleCount(op) || !final_stats.getLatencyUsTotal(op)) {
putField("N/A");
continue;
}
const auto point_95pct = final_stats.percentile(op, 0.95);
putField(point_95pct);
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), point_95pct);
}
}
}
fmt::printf("\n");
/* 99%ile Latency */
if (fp) {
fmt::fprintf(fp, "}, \"p99Latency\": {");
}
putTitle("99.0 pctile");
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (!final_stats.getLatencySampleCount(op) || !final_stats.getLatencyUsTotal(op)) {
putField("N/A");
continue;
}
const auto point_99pct = final_stats.percentile(op, 0.99);
putField(point_99pct);
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), point_99pct);
}
}
}
fmt::print("\n");
/* 99.9%ile Latency */
if (fp) {
fmt::fprintf(fp, "}, \"p99.9Latency\": {");
}
putTitle("99.9 pctile");
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (!final_stats.getLatencySampleCount(op) || !final_stats.getLatencyUsTotal(op)) {
putField("N/A");
continue;
}
const auto point_99_9pct = final_stats.percentile(op, 0.999);
putField(point_99_9pct);
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), point_99_9pct);
}
}
}
fmt::print("\n");
if (fp) {
fmt::fprintf(fp, "}}");
}
}
void loadSample(int pid_main, int op, std::vector<DDSketchMako>& data_points, int process_id, int thread_id) {
const auto dirname = fmt::format("{}{}", TEMP_DATA_STORE, pid_main);
const auto filename = getStatsFilename(dirname, process_id, thread_id, op);
std::ifstream fp{ filename };
std::ostringstream sstr;
sstr << fp.rdbuf();
DDSketchMako sketch;
rapidjson::Document doc;
doc.Parse(sstr.str().c_str());
if (!doc.HasParseError()) {
sketch.deserialize(doc);
if (data_points[op].getPopulationSize() > 0) {
data_points[op].mergeWith(sketch);
} else {
data_points[op] = sketch;
}
}
}
void printReport(Arguments const& args,
ThreadStatistics const* stats,
double const duration_sec,
@@ -1520,7 +1784,7 @@ void printReport(Arguments const& args,
putField(final_stats.getOpCount(op));
if (fp) {
if (first_op) {
first_op = 0;
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
@@ -1544,13 +1808,13 @@
/* Errors */
putTitle("Errors");
first_op = 1;
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TRANSACTION) {
putField(final_stats.getErrorCount(op));
if (fp) {
if (first_op) {
first_op = 0;
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
@@ -1563,262 +1827,29 @@
}
fmt::print("\n\n");
fmt::print("Latency (us)");
printStatsHeader(args, true, false, true);
/* Total Samples */
putTitle("Samples");
first_op = 1;
// Get the sketches stored in file and merge them together
std::vector<DDSketchMako> data_points(MAX_OP);
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (final_stats.getLatencyUsTotal(op)) {
putField(final_stats.getLatencySampleCount(op));
} else {
putField("N/A");
}
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_stats.getLatencySampleCount(op));
}
}
}
fmt::print("\n");
for (auto i = 0; i < args.num_processes; i++) {
/* Min Latency */
if (fp) {
fmt::fprintf(fp, "}, \"minLatency\": {");
}
putTitle("Min");
first_op = 1;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto lat_min = final_stats.getLatencyUsMin(op);
if (lat_min == -1) {
putField("N/A");
} else {
putField(lat_min);
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), lat_min);
}
}
}
}
fmt::print("\n");
/* Avg Latency */
if (fp) {
fmt::fprintf(fp, "}, \"avgLatency\": {");
}
putTitle("Avg");
first_op = 1;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto lat_total = final_stats.getLatencyUsTotal(op);
const auto lat_samples = final_stats.getLatencySampleCount(op);
if (lat_total) {
putField(lat_total / lat_samples);
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), lat_total / lat_samples);
if (args.async_xacts == 0) {
for (auto j = 0; j < args.num_threads; j++) {
loadSample(pid_main, op, data_points, i, j);
}
} else {
putField("N/A");
// async mode uses only one file per process
loadSample(pid_main, op, data_points, i, 0);
}
}
}
fmt::printf("\n");
final_stats.updateLatencies(data_points);
/* Max Latency */
if (fp) {
fmt::fprintf(fp, "}, \"maxLatency\": {");
}
putTitle("Max");
first_op = 1;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto lat_max = final_stats.getLatencyUsMax(op);
if (lat_max == 0) {
putField("N/A");
} else {
putField(lat_max);
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_stats.getLatencyUsMax(op));
}
}
}
}
fmt::print("\n");
printThreadStats(final_stats, args, fp);
auto data_points = std::array<std::vector<uint64_t>, MAX_OP>{};
/* Median Latency */
if (fp) {
fmt::fprintf(fp, "}, \"medianLatency\": {");
}
putTitle("Median");
first_op = 1;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
const auto lat_total = final_stats.getLatencyUsTotal(op);
const auto lat_samples = final_stats.getLatencySampleCount(op);
data_points[op].reserve(lat_samples);
if (lat_total && lat_samples) {
for (auto i = 0; i < args.num_processes; i++) {
auto load_sample = [pid_main, op, &data_points](int process_id, int thread_id) {
const auto dirname = fmt::format("{}{}", TEMP_DATA_STORE, pid_main);
const auto filename = getStatsFilename(dirname, process_id, thread_id, op);
auto fp = fopen(filename.c_str(), "r");
if (!fp) {
logr.error("fopen({}): {}", filename, strerror(errno));
return;
}
auto fclose_guard = ExitGuard([fp]() { fclose(fp); });
fseek(fp, 0, SEEK_END);
const auto num_points = ftell(fp) / sizeof(uint64_t);
fseek(fp, 0, 0);
for (auto index = 0u; index < num_points; index++) {
auto value = uint64_t{};
auto nread = fread(&value, sizeof(uint64_t), 1, fp);
if (nread != 1) {
logr.error("Read sample returned {}", nread);
break;
}
data_points[op].push_back(value);
}
};
if (args.async_xacts == 0) {
for (auto j = 0; j < args.num_threads; j++) {
load_sample(i, j);
}
} else {
// async mode uses only one file per process
load_sample(i, 0);
}
}
std::sort(data_points[op].begin(), data_points[op].end());
const auto num_points = data_points[op].size();
auto median = uint64_t{};
if (num_points & 1) {
median = data_points[op][num_points / 2];
} else {
median = (data_points[op][num_points / 2] + data_points[op][num_points / 2 - 1]) >> 1;
}
putField(median);
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), median);
}
} else {
putField("N/A");
}
}
}
fmt::print("\n");
/* 95%ile Latency */
if (fp) {
fmt::fprintf(fp, "}, \"p95Latency\": {");
}
putTitle("95.0 pctile");
first_op = 1;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (data_points[op].empty() || !final_stats.getLatencyUsTotal(op)) {
putField("N/A");
continue;
}
const auto num_points = data_points[op].size();
const auto point_95pct = static_cast<size_t>(std::max(0., (num_points * 0.95) - 1));
putField(data_points[op][point_95pct]);
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), data_points[op][point_95pct]);
}
}
}
fmt::printf("\n");
/* 99%ile Latency */
if (fp) {
fmt::fprintf(fp, "}, \"p99Latency\": {");
}
putTitle("99.0 pctile");
first_op = 1;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (data_points[op].empty() || !final_stats.getLatencyUsTotal(op)) {
putField("N/A");
continue;
}
const auto num_points = data_points[op].size();
const auto point_99pct = static_cast<size_t>(std::max(0., (num_points * 0.99) - 1));
putField(data_points[op][point_99pct]);
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), data_points[op][point_99pct]);
}
}
}
fmt::print("\n");
/* 99.9%ile Latency */
if (fp) {
fmt::fprintf(fp, "}, \"p99.9Latency\": {");
}
putTitle("99.9 pctile");
first_op = 1;
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
if (data_points[op].empty() || !final_stats.getLatencyUsTotal(op)) {
putField("N/A");
continue;
}
const auto num_points = data_points[op].size();
const auto point_99_9pct = static_cast<size_t>(std::max(0., (num_points * 0.999) - 1));
putField(data_points[op][point_99_9pct]);
if (fp) {
if (first_op) {
first_op = 0;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), data_points[op][point_99_9pct]);
}
}
}
fmt::print("\n");
if (fp) {
fmt::fprintf(fp, "}}");
// export the ddsketch if the flag was set
if (args.stats_export_path[0] != 0) {
std::ofstream f(args.stats_export_path);
f << final_stats;
}
const auto command_remove = fmt::format("rm -rf {}{}", TEMP_DATA_STORE, pid_main);
@@ -1960,6 +1991,18 @@ int statsProcessMain(Arguments const& args,
return 0;
}
ThreadStatistics mergeSketchReport(Arguments& args) {
ThreadStatistics stats;
for (int i = 0; i < args.num_report_files; i++) {
std::ifstream f{ args.report_files[i] };
ThreadStatistics tmp;
f >> tmp;
stats.combine(tmp);
}
return stats;
}
int main(int argc, char* argv[]) {
setlinebuf(stdout);
@@ -1993,6 +2036,12 @@
}
}
if (args.mode == MODE_REPORT) {
ThreadStatistics stats = mergeSketchReport(args);
printThreadStats(stats, args, NULL, true);
return 0;
}
const auto pid_main = getpid();
/* create the shared memory for stats */
const auto shmpath = fmt::format("mako{}", pid_main);


@@ -44,6 +44,7 @@ constexpr const int MODE_INVALID = -1;
constexpr const int MODE_CLEAN = 0;
constexpr const int MODE_BUILD = 1;
constexpr const int MODE_RUN = 2;
constexpr const int MODE_REPORT = 3;
/* for long arguments */
enum ArgKind {
@@ -73,6 +74,7 @@ enum ArgKind {
ARG_CLIENT_THREADS_PER_VERSION,
ARG_JSON_REPORT,
ARG_BG_FILE_PATH, // if blob granule files are stored locally, mako will read and materialize them if this is set
ARG_EXPORT_PATH,
ARG_DISTRIBUTED_TRACER_CLIENT
};
@@ -119,6 +121,7 @@ constexpr const int NUM_CLUSTERS_MAX = 3;
constexpr const int NUM_DATABASES_MAX = 10;
constexpr const std::string_view KEY_PREFIX{ "mako" };
constexpr const std::string_view TEMP_DATA_STORE{ "/tmp/makoTemp" };
constexpr const int MAX_REPORT_FILES = 200;
/* benchmark parameters */
struct Arguments {
@@ -162,6 +165,9 @@ struct Arguments {
char json_output_path[PATH_MAX];
bool bg_materialize_files;
char bg_file_path[PATH_MAX];
char stats_export_path[PATH_MAX];
char report_files[MAX_REPORT_FILES][PATH_MAX];
int num_report_files;
int distributed_tracer_client;
};


@@ -24,76 +24,66 @@
#include <array>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <istream>
#include <limits>
#include <list>
#include <new>
#include <ostream>
#include <utility>
#include "mako/mako.hpp"
#include "operations.hpp"
#include "time.hpp"
#include "ddsketch.hpp"
#include "contrib/rapidjson/rapidjson/document.h"
#include "contrib/rapidjson/rapidjson/rapidjson.h"
#include "contrib/rapidjson/rapidjson/stringbuffer.h"
#include "contrib/rapidjson/rapidjson/writer.h"
#include <iostream>
#include <sstream>
#include <vector>
namespace mako {
/* rough cap on the number of samples to avoid OOM hindering benchmark */
constexpr const size_t SAMPLE_CAP = 2000000;
/* size of each block to get detailed latency for each operation */
constexpr const size_t LAT_BLOCK_SIZE = 4093;
/* hard cap on the number of sample blocks = 488 */
constexpr const size_t MAX_LAT_BLOCKS = SAMPLE_CAP / LAT_BLOCK_SIZE;
/* memory block allocated to each operation when collecting detailed latency */
class LatencySampleBlock {
uint64_t samples[LAT_BLOCK_SIZE]{
0,
};
uint64_t index{ 0 };
class DDSketchMako : public DDSketch<uint64_t> {
public:
LatencySampleBlock() noexcept = default;
bool full() const noexcept { return index >= LAT_BLOCK_SIZE; }
void put(timediff_t td) {
assert(!full());
samples[index++] = toIntegerMicroseconds(td);
}
// return {data block, number of samples}
std::pair<uint64_t const*, size_t> data() const noexcept { return { samples, index }; }
};
void serialize(rapidjson::Writer<rapidjson::StringBuffer>& writer) const {
writer.StartObject();
writer.String("errorGuarantee");
writer.Double(errorGuarantee);
writer.String("minValue");
writer.Uint64(minValue);
writer.String("maxValue");
writer.Uint64(maxValue);
writer.String("populationSize");
writer.Uint64(populationSize);
writer.String("zeroPopulationSize");
writer.Uint64(zeroPopulationSize);
writer.String("sum");
writer.Uint64(sum);
/* collect sampled latencies until OOM is hit */
class LatencySampleBin {
std::list<LatencySampleBlock> blocks;
bool noMoreAlloc{ false };
bool tryAlloc() {
try {
blocks.emplace_back();
} catch (const std::bad_alloc&) {
noMoreAlloc = true;
return false;
writer.String("buckets");
writer.StartArray();
for (auto b : buckets) {
writer.Uint64(b);
}
return true;
}
writer.EndArray();
public:
void reserveOneBlock() {
if (blocks.empty())
tryAlloc();
writer.EndObject();
}
void deserialize(const rapidjson::Value& obj) {
errorGuarantee = obj["errorGuarantee"].GetDouble();
minValue = obj["minValue"].GetUint64();
maxValue = obj["maxValue"].GetUint64();
populationSize = obj["populationSize"].GetUint64();
zeroPopulationSize = obj["zeroPopulationSize"].GetUint64();
sum = obj["sum"].GetUint64();
void put(timediff_t td) {
if (blocks.empty() || blocks.back().full()) {
if (blocks.size() >= MAX_LAT_BLOCKS || noMoreAlloc || !tryAlloc())
return;
}
blocks.back().put(td);
}
// iterate & apply for each block user function void(uint64_t const*, size_t)
template <typename Func>
void forEachBlock(Func&& fn) const {
for (const auto& block : blocks) {
auto [ptr, cnt] = block.data();
fn(ptr, cnt);
auto jsonBuckets = obj["buckets"].GetArray();
uint64_t idx = 0;
for (auto it = jsonBuckets.Begin(); it != jsonBuckets.End(); it++) {
buckets[idx] = it->GetUint64();
idx++;
}
}
};
@@ -101,21 +91,20 @@ public:
class alignas(64) ThreadStatistics {
uint64_t conflicts;
uint64_t total_errors;
uint64_t ops[MAX_OP];
uint64_t errors[MAX_OP];
uint64_t latency_samples[MAX_OP];
uint64_t latency_us_total[MAX_OP];
uint64_t latency_us_min[MAX_OP];
uint64_t latency_us_max[MAX_OP];
std::array<uint64_t, MAX_OP> ops;
std::array<uint64_t, MAX_OP> errors;
std::array<uint64_t, MAX_OP> latency_samples;
std::array<uint64_t, MAX_OP> latency_us_total;
std::vector<DDSketchMako> sketches;
public:
ThreadStatistics() noexcept {
memset(this, 0, sizeof(ThreadStatistics));
memset(latency_us_min, 0xff, sizeof(latency_us_min));
sketches.resize(MAX_OP);
}
ThreadStatistics(const ThreadStatistics& other) noexcept = default;
ThreadStatistics& operator=(const ThreadStatistics& other) noexcept = default;
ThreadStatistics(const ThreadStatistics& other) = default;
ThreadStatistics& operator=(const ThreadStatistics& other) = default;
uint64_t getConflictCount() const noexcept { return conflicts; }
@@ -129,23 +118,24 @@ public:
uint64_t getLatencyUsTotal(int op) const noexcept { return latency_us_total[op]; }
uint64_t getLatencyUsMin(int op) const noexcept { return latency_us_min[op]; }
uint64_t getLatencyUsMin(int op) const noexcept { return sketches[op].min(); }
uint64_t getLatencyUsMax(int op) const noexcept { return latency_us_max[op]; }
uint64_t getLatencyUsMax(int op) const noexcept { return sketches[op].max(); }
uint64_t percentile(int op, double quantile) { return sketches[op].percentile(quantile); }
uint64_t mean(int op) const noexcept { return sketches[op].mean(); }
// with 'this' as final aggregation, factor in 'other'
void combine(const ThreadStatistics& other) {
conflicts += other.conflicts;
for (auto op = 0; op < MAX_OP; op++) {
sketches[op].mergeWith(other.sketches[op]);
ops[op] += other.ops[op];
errors[op] += other.errors[op];
total_errors += other.errors[op];
latency_samples[op] += other.latency_samples[op];
latency_us_total[op] += other.latency_us_total[op];
if (latency_us_min[op] > other.latency_us_min[op])
latency_us_min[op] = other.latency_us_min[op];
if (latency_us_max[op] < other.latency_us_max[op])
latency_us_max[op] = other.latency_us_max[op];
}
}
@@ -162,15 +152,106 @@ public:
void addLatency(int op, timediff_t diff) noexcept {
const auto latency_us = toIntegerMicroseconds(diff);
latency_samples[op]++;
sketches[op].addSample(latency_us);
latency_us_total[op] += latency_us;
if (latency_us_min[op] > latency_us)
latency_us_min[op] = latency_us;
if (latency_us_max[op] < latency_us)
latency_us_max[op] = latency_us;
}
void writeToFile(const std::string& filename, int op) const {
rapidjson::StringBuffer ss;
rapidjson::Writer<rapidjson::StringBuffer> writer(ss);
sketches[op].serialize(writer);
std::ofstream f(filename);
f << ss.GetString();
}
void updateLatencies(const std::vector<DDSketchMako>& other_sketches) { sketches = other_sketches; }
friend std::ofstream& operator<<(std::ofstream& os, ThreadStatistics& stats);
friend std::ifstream& operator>>(std::ifstream& is, ThreadStatistics& stats);
};
using LatencySampleBinArray = std::array<LatencySampleBin, MAX_OP>;
inline std::ofstream& operator<<(std::ofstream& os, ThreadStatistics& stats) {
rapidjson::StringBuffer ss;
rapidjson::Writer<rapidjson::StringBuffer> writer(ss);
writer.StartObject();
writer.String("conflicts");
writer.Uint64(stats.conflicts);
writer.String("total_errors");
writer.Uint64(stats.total_errors);
writer.String("ops");
writer.StartArray();
for (auto op = 0; op < MAX_OP; op++) {
writer.Uint64(stats.ops[op]);
}
writer.EndArray();
writer.String("errors");
writer.StartArray();
for (auto op = 0; op < MAX_OP; op++) {
writer.Uint64(stats.errors[op]);
}
writer.EndArray();
writer.String("latency_samples");
writer.StartArray();
for (auto op = 0; op < MAX_OP; op++) {
writer.Uint64(stats.latency_samples[op]);
}
writer.EndArray();
writer.String("latency_us_total");
writer.StartArray();
for (auto op = 0; op < MAX_OP; op++) {
writer.Uint64(stats.latency_us_total[op]);
}
writer.EndArray();
for (auto op = 0; op < MAX_OP; op++) {
if (stats.sketches[op].getPopulationSize() > 0) {
std::string op_name = getOpName(op);
writer.String(op_name.c_str());
stats.sketches[op].serialize(writer);
}
}
writer.EndObject();
os << ss.GetString();
return os;
}
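// Editor's note on the emitted shape (values illustrative; a per-op object,
// e.g. "GET", appears only when that op's sketch is non-empty):
// {"conflicts":0, "total_errors":0, "ops":[...], "errors":[...],
//  "latency_samples":[...], "latency_us_total":[...],
//  "GET":{"errorGuarantee":0.005, "minValue":..., "maxValue":...,
//         "populationSize":..., "zeroPopulationSize":..., "sum":..., "buckets":[...]}}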
inline void populateArray(std::array<uint64_t, MAX_OP>& arr,
rapidjson::GenericArray<false, rapidjson::GenericValue<rapidjson::UTF8<>>>& json) {
uint64_t idx = 0;
for (auto it = json.Begin(); it != json.End(); it++) {
arr[idx] = it->GetUint64();
idx++;
}
}
inline std::ifstream& operator>>(std::ifstream& is, ThreadStatistics& stats) {
std::stringstream buffer;
buffer << is.rdbuf();
rapidjson::Document doc;
doc.Parse(buffer.str().c_str());
stats.conflicts = doc["conflicts"].GetUint64();
stats.total_errors = doc["total_errors"].GetUint64();
auto jsonOps = doc["ops"].GetArray();
auto jsonErrors = doc["errors"].GetArray();
auto jsonLatencySamples = doc["latency_samples"].GetArray();
auto jsonLatencyUsTotal = doc["latency_us_total"].GetArray();
populateArray(stats.ops, jsonOps);
populateArray(stats.errors, jsonErrors);
populateArray(stats.latency_samples, jsonLatencySamples);
populateArray(stats.latency_us_total, jsonLatencyUsTotal);
for (int op = 0; op < MAX_OP; op++) {
const std::string op_name = getOpName(op);
// sketches are serialized only when non-empty (see operator<<), so guard the lookup
if (doc.HasMember(op_name.c_str())) {
stats.sketches[op].deserialize(doc[op_name.c_str()]);
}
}
return is;
}
} // namespace mako