foundationdb/fdbserver/GlobalTagThrottler.actor.cpp
Markus Pilman 1de37afd52
Make TEST macros C++ only (#7558)
* proof of concept

* use code-probe instead of test

* code probe working on gcc

* code probe implemented

* renamed TestProbe to CodeProbe

* fixed refactoring typo

* support filtered output

* print probes at end of simulation

* fix missed probes print

* fix deduplication

* Fix refactoring issues

* revert bad refactor

* make sure file paths are relative

* fix more wrong refactor changes
2022-07-19 13:15:51 -07:00

534 lines
21 KiB
C++

/*
* GlobalTagThrottler.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbrpc/Smoother.h"
#include "fdbserver/TagThrottler.h"
#include <limits>
#include "flow/actorcompiler.h" // must be last include
class GlobalTagThrottlerImpl {
class QuotaAndCounters {
Optional<ThrottleApi::TagQuotaValue> quota;
std::unordered_map<UID, double> ssToReadCostRate;
std::unordered_map<UID, double> ssToWriteCostRate;
Smoother totalReadCostRate;
Smoother totalWriteCostRate;
Smoother transactionCounter;
Smoother perClientRate;
Optional<double> getReadTPSLimit() const {
if (totalReadCostRate.smoothTotal() > 0) {
return quota.get().totalReadQuota * transactionCounter.smoothRate() / totalReadCostRate.smoothTotal();
} else {
return {};
}
}
Optional<double> getWriteTPSLimit() const {
if (totalWriteCostRate.smoothTotal() > 0) {
return quota.get().totalWriteQuota * transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal();
} else {
return {};
}
}
public:
QuotaAndCounters()
: totalReadCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
totalWriteCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
void setQuota(ThrottleApi::TagQuotaValue const& quota) { this->quota = quota; }
void updateReadCostRate(UID ssId, double newReadCostRate) {
auto& currentReadCostRate = ssToReadCostRate[ssId];
auto diff = newReadCostRate - currentReadCostRate;
currentReadCostRate += diff;
totalReadCostRate.addDelta(diff);
}
void updateWriteCostRate(UID ssId, double newWriteCostRate) {
auto& currentWriteCostRate = ssToWriteCostRate[ssId];
auto diff = newWriteCostRate - currentWriteCostRate;
currentWriteCostRate += diff;
totalWriteCostRate.addDelta(diff);
}
void addTransactions(int count) { transactionCounter.addDelta(count); }
Optional<double> getTargetTotalTPSLimit() const {
if (!quota.present())
return {};
auto readLimit = getReadTPSLimit();
auto writeLimit = getWriteTPSLimit();
// TODO: Implement expiration logic
if (!readLimit.present() && !writeLimit.present()) {
return {};
} else {
if (!readLimit.present()) {
return writeLimit.get();
} else if (!writeLimit.present()) {
return readLimit.get();
} else {
return std::min(readLimit.get(), writeLimit.get());
}
}
}
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit() {
auto targetRate = getTargetTotalTPSLimit();
if (targetRate.present() && transactionCounter.smoothRate() > 0) {
auto newPerClientRate = std::max(
SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE,
std::min(targetRate.get(),
(targetRate.get() / transactionCounter.smoothRate()) * perClientRate.smoothTotal()));
perClientRate.setTotal(newPerClientRate);
return ClientTagThrottleLimits(perClientRate.getTotal(), ClientTagThrottleLimits::NO_EXPIRATION);
} else {
return {};
}
}
void processTraceEvent(TraceEvent& te) const {
if (quota.present()) {
te.detail("ProvidedReadTPSLimit", getReadTPSLimit())
.detail("ProvidedWriteTPSLimit", getWriteTPSLimit())
.detail("ReadCostRate", totalReadCostRate.smoothTotal())
.detail("WriteCostRate", totalWriteCostRate.smoothTotal())
.detail("TotalReadQuota", quota.get().totalReadQuota)
.detail("ReservedReadQuota", quota.get().reservedReadQuota)
.detail("TotalWriteQuota", quota.get().totalWriteQuota)
.detail("ReservedWriteQuota", quota.get().reservedWriteQuota);
}
}
};
Database db;
UID id;
std::map<TransactionTag, QuotaAndCounters> trackedTags;
uint64_t throttledTagChangeId{ 0 };
Future<Void> traceActor;
ACTOR static Future<Void> tracer(GlobalTagThrottlerImpl const* self) {
loop {
for (const auto& [tag, quotaAndCounters] : self->trackedTags) {
TraceEvent te("GlobalTagThrottling");
te.detail("Tag", tag);
quotaAndCounters.processTraceEvent(te);
}
wait(delay(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TRACE_INTERVAL));
}
}
ACTOR static Future<Void> monitorThrottlingChanges(GlobalTagThrottlerImpl* self) {
loop {
state ReadYourWritesTransaction tr(self->db);
loop {
// TODO: Clean up quotas that have been removed
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state RangeResult currentQuotas = wait(tr.getRange(tagQuotaKeys, CLIENT_KNOBS->TOO_MANY));
TraceEvent("GlobalTagThrottler_ReadCurrentQuotas").detail("Size", currentQuotas.size());
for (auto const kv : currentQuotas) {
auto const tag = kv.key.removePrefix(tagQuotaPrefix);
auto const quota = ThrottleApi::TagQuotaValue::fromValue(kv.value);
self->trackedTags[tag].setQuota(quota);
}
++self->throttledTagChangeId;
// FIXME: Should wait on watch instead
// wait(tr.watch(tagThrottleSignalKey));
wait(delay(5.0));
TraceEvent("GlobalTagThrottler_ChangeSignaled");
CODE_PROBE(true, "Global tag throttler detected quota changes");
break;
} catch (Error& e) {
TraceEvent("GlobalTagThrottlerMonitoringChangesError", self->id).error(e);
wait(tr.onError(e));
}
}
}
}
public:
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) { traceActor = tracer(this); }
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
void addRequests(TransactionTag tag, int count) { trackedTags[tag].addTransactions(count); }
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
// TODO: For now, only enforce total throttling rates.
// We should use reserved quotas as well.
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
for (auto& [tag, quotaAndCounters] : trackedTags) {
// Currently there is no differentiation between batch priority and default priority transactions
auto const limit = quotaAndCounters.updateAndGetPerClientLimit();
if (limit.present()) {
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get();
}
}
return result;
}
int64_t autoThrottleCount() const { return trackedTags.size(); }
uint32_t busyReadTagCount() const {
// TODO: Implement
return 0;
}
uint32_t busyWriteTagCount() const {
// TODO: Implement
return 0;
}
int64_t manualThrottleCount() const { return trackedTags.size(); }
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
for (const auto& busyReadTag : ss.busiestReadTags) {
trackedTags[busyReadTag.tag].updateReadCostRate(ss.id, busyReadTag.rate);
}
for (const auto& busyWriteTag : ss.busiestWriteTags) {
trackedTags[busyWriteTag.tag].updateWriteCostRate(ss.id, busyWriteTag.rate);
}
// TODO: Call ThrottleApi::throttleTags
return Void();
}
void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
trackedTags[tag].setQuota(tagQuotaValue);
}
};
GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl<GlobalTagThrottlerImpl>::create(db, id)) {}
GlobalTagThrottler::~GlobalTagThrottler() = default;
Future<Void> GlobalTagThrottler::monitorThrottlingChanges() {
return impl->monitorThrottlingChanges();
}
void GlobalTagThrottler::addRequests(TransactionTag tag, int count) {
return impl->addRequests(tag, count);
}
uint64_t GlobalTagThrottler::getThrottledTagChangeId() const {
return impl->getThrottledTagChangeId();
}
PrioritizedTransactionTagMap<ClientTagThrottleLimits> GlobalTagThrottler::getClientRates() {
return impl->getClientRates();
}
int64_t GlobalTagThrottler::autoThrottleCount() const {
return impl->autoThrottleCount();
}
uint32_t GlobalTagThrottler::busyReadTagCount() const {
return impl->busyReadTagCount();
}
uint32_t GlobalTagThrottler::busyWriteTagCount() const {
return impl->busyWriteTagCount();
}
int64_t GlobalTagThrottler::manualThrottleCount() const {
return impl->manualThrottleCount();
}
bool GlobalTagThrottler::isAutoThrottlingEnabled() const {
return true;
}
Future<Void> GlobalTagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
return impl->tryUpdateAutoThrottling(ss);
}
void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
return impl->setQuota(tag, tagQuotaValue);
}
namespace GlobalTagThrottlerTesting {
Optional<double> getTPSLimit(GlobalTagThrottler& globalTagThrottler, TransactionTag tag) {
auto clientRates = globalTagThrottler.getClientRates();
auto it1 = clientRates.find(TransactionPriority::DEFAULT);
if (it1 != clientRates.end()) {
auto it2 = it1->second.find(tag);
if (it2 != it1->second.end()) {
return it2->second.tpsRate;
}
}
return {};
}
class StorageServerCollection {
class Cost {
Smoother smoother;
public:
Cost() : smoother(5.0) {}
Cost& operator+=(double delta) {
smoother.addDelta(delta);
return *this;
}
double smoothRate() const { return smoother.smoothRate(); }
};
std::vector<std::map<TransactionTag, Cost>> readCosts;
std::vector<std::map<TransactionTag, Cost>> writeCosts;
public:
StorageServerCollection(size_t size) : readCosts(size), writeCosts(size) { ASSERT_GT(size, 0); }
void addReadCost(TransactionTag tag, double cost) {
auto const costPerSS = cost / readCosts.size();
for (auto& readCost : readCosts) {
readCost[tag] += costPerSS;
}
}
void addWriteCost(TransactionTag tag, double cost) {
auto const costPerSS = cost / writeCosts.size();
for (auto& writeCost : writeCosts) {
writeCost[tag] += costPerSS;
}
}
std::vector<StorageQueueInfo> getStorageQueueInfos() const {
std::vector<StorageQueueInfo> result;
result.reserve(readCosts.size());
for (int i = 0; i < readCosts.size(); ++i) {
StorageQueueInfo sqInfo(UID(i, i), LocalityData{});
for (const auto& [tag, readCost] : readCosts[i]) {
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
sqInfo.busiestReadTags.emplace_back(tag, readCost.smoothRate(), fractionalBusyness);
}
for (const auto& [tag, writeCost] : writeCosts[i]) {
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
sqInfo.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness);
}
result.push_back(sqInfo);
}
return result;
}
};
ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
StorageServerCollection* storageServers,
TransactionTag tag,
double desiredTpsRate,
double costPerTransaction,
bool write) {
loop {
auto tpsLimit = getTPSLimit(*globalTagThrottler, tag);
state double tpsRate = tpsLimit.present() ? std::min<double>(desiredTpsRate, tpsLimit.get()) : desiredTpsRate;
wait(delay(1 / tpsRate));
if (write) {
storageServers->addWriteCost(tag, costPerTransaction);
} else {
storageServers->addReadCost(tag, costPerTransaction);
}
globalTagThrottler->addRequests(tag, 1);
}
}
ACTOR static Future<Void> monitorClientRates(GlobalTagThrottler* globalTagThrottler,
TransactionTag tag,
double desiredTPSLimit) {
state int successes = 0;
loop {
wait(delay(1.0));
auto currentTPSLimit = getTPSLimit(*globalTagThrottler, tag);
if (currentTPSLimit.present()) {
TraceEvent("GlobalTagThrottling_RateMonitor")
.detail("Tag", tag)
.detail("CurrentTPSRate", currentTPSLimit.get())
.detail("DesiredTPSRate", desiredTPSLimit);
if (abs(currentTPSLimit.get() - desiredTPSLimit) < 1.0) {
if (++successes == 3) {
return Void();
}
} else {
successes = 0;
}
} else {
successes = 0;
}
}
}
ACTOR static Future<Void> updateGlobalTagThrottler(GlobalTagThrottler* globalTagThrottler,
StorageServerCollection const* storageServers) {
loop {
wait(delay(1.0));
auto const storageQueueInfos = storageServers->getStorageQueueInfos();
for (const auto& sq : storageQueueInfos) {
globalTagThrottler->tryUpdateAutoThrottling(sq);
}
}
}
} // namespace GlobalTagThrottlerTesting
TEST_CASE("/GlobalTagThrottler/Simple") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/WriteThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalWriteQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, true);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/MultiTagThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag1 = "sampleTag1"_sr;
TransactionTag testTag2 = "sampleTag2"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag1, tagQuotaValue);
globalTagThrottler.setQuota(testTag2, tagQuotaValue);
state std::vector<Future<Void>> futures;
state std::vector<Future<Void>> monitorFutures;
futures.push_back(
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag1, 5.0, 6.0, false));
futures.push_back(
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag2, 5.0, 6.0, false));
futures.push_back(GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers));
monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag1, 100.0 / 6.0));
monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag2, 100.0 / 6.0));
wait(timeoutError(waitForAny(futures) || waitForAll(monitorFutures), 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 10.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/MultiClientThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> client2 =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/MultiClientActiveThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
state Future<Void> client2 =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 5.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
// Global transaction rate should be 20.0, with a distribution of (5, 15) between the 2 clients
TEST_CASE("/GlobalTagThrottler/SkewedMultiClientActiveThrottling") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
ThrottleApi::TagQuotaValue tagQuotaValue;
TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 5.0, false);
state Future<Void> client2 =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 25.0, 5.0, false);
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 15.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}
// Test that the tag throttler can reach equilibrium, then adjust to a new equilibrium once the quota is changed
TEST_CASE("/GlobalTagThrottler/UpdateQuota") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
state ThrottleApi::TagQuotaValue tagQuotaValue;
state TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
tagQuotaValue.totalReadQuota = 50.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 50.0 / 6.0);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}