mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 09:58:50 +08:00
Remove global_tag_throttler status section
This commit is contained in:
parent
ac6889286c
commit
9df990e375
@ -125,20 +125,3 @@ In each test, the `GlobalTagThrottlerTesting::monitor` function is used to perio
|
||||
On the ratekeeper, every `SERVER_KNOBS->TAG_THROTTLE_PUSH_INTERVAL` seconds, the ratekeeper will call `GlobalTagThrottler::getClientRates`. At the end of the rate calculation for each tag, a trace event of type `GlobalTagThrottler_GotClientRate` is produced. This trace event reports the relevant inputs that went in to the rate calculation, and can be used for debugging.
|
||||
|
||||
On storage servers, every `SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL` seconds, there are `BusyReadTag` events for every tag that has sufficient read cost to be reported to the ratekeeper. Both cost and fractional busyness are reported.
|
||||
|
||||
### Status
|
||||
For each storage server, the busiest read tag is reported in the full status output, along with its cost and fractional busyness.
|
||||
|
||||
At path `.cluster.qos.global_tag_throttler`, throttling limitations for each tag are reported:
|
||||
|
||||
```
|
||||
{
|
||||
"<tagName>": {
|
||||
"desired_tps": <number>,
|
||||
"reserved_tps": <number>,
|
||||
"limiting_tps": [<number>|"unset"],
|
||||
"target_tps": <number>
|
||||
},
|
||||
...
|
||||
}
|
||||
```
|
||||
|
@ -168,7 +168,6 @@ class GlobalTagThrottlerImpl {
|
||||
std::unordered_map<UID, Optional<double>> throttlingRatios;
|
||||
std::unordered_map<TransactionTag, PerTagStatistics> tagStatistics;
|
||||
std::unordered_map<UID, std::unordered_map<TransactionTag, ThroughputCounters>> throughput;
|
||||
GlobalTagThrottlerStatusReply statusReply;
|
||||
|
||||
// Returns the cost rate for the given tag on the given storage server
|
||||
Optional<double> getCurrentCost(UID storageServerId, TransactionTag tag, OpType opType) const {
|
||||
@ -422,12 +421,6 @@ class GlobalTagThrottlerImpl {
|
||||
.detail("DesiredTps", desiredTps)
|
||||
.detail("NumStorageServers", throughput.size());
|
||||
|
||||
auto& tagStats = statusReply.status[tag];
|
||||
tagStats.desiredTps = desiredTps.get();
|
||||
tagStats.limitingTps = limitingTps;
|
||||
tagStats.targetTps = targetTps.get();
|
||||
tagStats.reservedTps = reservedTps.get();
|
||||
|
||||
return targetTps;
|
||||
}
|
||||
|
||||
@ -440,7 +433,6 @@ public:
|
||||
PrioritizedTransactionTagMap<double> getProxyRates(int numProxies) {
|
||||
PrioritizedTransactionTagMap<double> result;
|
||||
lastBusyReadTagCount = lastBusyWriteTagCount = 0;
|
||||
statusReply = {};
|
||||
|
||||
for (auto& [tag, stats] : tagStatistics) {
|
||||
// Currently there is no differentiation between batch priority and default priority transactions
|
||||
@ -468,7 +460,6 @@ public:
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
|
||||
lastBusyReadTagCount = lastBusyWriteTagCount = 0;
|
||||
statusReply = {};
|
||||
|
||||
for (auto& [tag, stats] : tagStatistics) {
|
||||
// Currently there is no differentiation between batch priority and default priority transactions
|
||||
@ -524,8 +515,6 @@ public:
|
||||
}
|
||||
|
||||
void removeQuota(TransactionTagRef tag) { tagStatistics[tag].clearQuota(); }
|
||||
|
||||
GlobalTagThrottlerStatusReply getStatus() const { return statusReply; }
|
||||
};
|
||||
|
||||
GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl<GlobalTagThrottlerImpl>::create(db, id)) {}
|
||||
@ -574,10 +563,6 @@ void GlobalTagThrottler::removeQuota(TransactionTagRef tag) {
|
||||
return impl->removeQuota(tag);
|
||||
}
|
||||
|
||||
GlobalTagThrottlerStatusReply GlobalTagThrottler::getGlobalTagThrottlerStatusReply() const {
|
||||
return impl->getStatus();
|
||||
}
|
||||
|
||||
namespace GlobalTagThrottlerTesting {
|
||||
|
||||
enum class LimitType { RESERVED, TOTAL };
|
||||
@ -727,11 +712,6 @@ bool isNear(Optional<double> a, Optional<double> b) {
|
||||
}
|
||||
}
|
||||
|
||||
bool isNear(GlobalTagThrottlerStatusReply::TagStats const& a, GlobalTagThrottlerStatusReply::TagStats const& b) {
|
||||
return isNear(a.desiredTps, b.desiredTps) && isNear(a.targetTps, b.targetTps) &&
|
||||
isNear(a.reservedTps, b.reservedTps) && isNear(a.limitingTps, b.limitingTps);
|
||||
}
|
||||
|
||||
bool targetRateIsNear(GlobalTagThrottler& globalTagThrottler, TransactionTag tag, Optional<double> expected) {
|
||||
Optional<double> rate;
|
||||
auto targetRates = globalTagThrottler.getProxyRates(1);
|
||||
@ -766,22 +746,6 @@ bool clientRateIsNear(GlobalTagThrottler& globalTagThrottler, TransactionTag tag
|
||||
return isNear(rate, expected);
|
||||
}
|
||||
|
||||
bool statusIsNear(GlobalTagThrottler const& globalTagThrottler,
|
||||
TransactionTag tag,
|
||||
GlobalTagThrottlerStatusReply::TagStats expectedStats) {
|
||||
auto const stats = globalTagThrottler.getGlobalTagThrottlerStatusReply().status[tag];
|
||||
TraceEvent("GlobalTagThrottling_StatusMonitor")
|
||||
.detail("DesiredTps", stats.desiredTps)
|
||||
.detail("ExpectedDesiredTps", expectedStats.desiredTps)
|
||||
.detail("LimitingTps", stats.limitingTps)
|
||||
.detail("ExpectedLimitingTps", expectedStats.limitingTps)
|
||||
.detail("TargetTps", stats.targetTps)
|
||||
.detail("ExpectedTargetTps", expectedStats.targetTps)
|
||||
.detail("ReservedTps", stats.reservedTps)
|
||||
.detail("ExpectedReservedTps", expectedStats.reservedTps);
|
||||
return isNear(stats, expectedStats);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> updateGlobalTagThrottler(GlobalTagThrottler* globalTagThrottler,
|
||||
StorageServerCollection const* storageServers) {
|
||||
loop {
|
||||
@ -1120,27 +1084,3 @@ TEST_CASE("/GlobalTagThrottler/ReservedWriteQuota") {
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/Status") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 100);
|
||||
GlobalTagThrottlerStatusReply::TagStats expectedStats;
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = tagQuotaValue.totalWriteQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
expectedStats.desiredTps = 100.0 / 6.0;
|
||||
expectedStats.limitingTps = {};
|
||||
expectedStats.targetTps = 100.0 / 6.0;
|
||||
expectedStats.reservedTps = 0.0;
|
||||
state Future<Void> client = GlobalTagThrottlerTesting::runClient(
|
||||
&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, GlobalTagThrottlerTesting::OpType::READ);
|
||||
state Future<Void> monitor =
|
||||
GlobalTagThrottlerTesting::monitor(&globalTagThrottler, [testTag, expectedStats](auto& gtt) {
|
||||
return GlobalTagThrottlerTesting::statusIsNear(gtt, testTag, expectedStats);
|
||||
});
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
@ -520,10 +520,6 @@ public:
|
||||
TraceEvent("RatekeeperHalted", rkInterf.id()).detail("ReqID", req.requesterID);
|
||||
break;
|
||||
}
|
||||
when(GlobalTagThrottlerStatusRequest req = waitNext(rkInterf.getGlobalTagThrottlerStatus.getFuture())) {
|
||||
req.reply.send(self.tagThrottler->getGlobalTagThrottlerStatusReply());
|
||||
break;
|
||||
}
|
||||
when(ReportCommitCostEstimationRequest req =
|
||||
waitNext(rkInterf.reportCommitCostEstimation.getFuture())) {
|
||||
self.updateCommitCostEstimation(req.ssTrTagCommitCost);
|
||||
|
@ -2159,16 +2159,9 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(
|
||||
timeoutError(rkWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("RkUpdate"))), 1.0);
|
||||
state Future<TraceEventFields> f2 = timeoutError(
|
||||
rkWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("RkUpdateBatch"))), 1.0);
|
||||
state Future<GlobalTagThrottlerStatusReply> f3 =
|
||||
SERVER_KNOBS->GLOBAL_TAG_THROTTLING
|
||||
? timeoutError(db->get().ratekeeper.get().getGlobalTagThrottlerStatus.getReply(
|
||||
GlobalTagThrottlerStatusRequest{}),
|
||||
1.0)
|
||||
: Future<GlobalTagThrottlerStatusReply>(GlobalTagThrottlerStatusReply{});
|
||||
wait(success(f1) && success(f2) && success(f3));
|
||||
wait(success(f1) && success(f2));
|
||||
TraceEventFields ratekeeper = f1.get();
|
||||
TraceEventFields batchRatekeeper = f2.get();
|
||||
auto const globalTagThrottlerStatus = f3.get();
|
||||
|
||||
bool autoThrottlingEnabled = ratekeeper.getInt("AutoThrottlingEnabled");
|
||||
double tpsLimit = ratekeeper.getDouble("TPSLimit");
|
||||
@ -2230,23 +2223,6 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(
|
||||
|
||||
(*qos)["throttled_tags"] = throttledTagsObj;
|
||||
|
||||
if (SERVER_KNOBS->GLOBAL_TAG_THROTTLING) {
|
||||
JsonBuilderObject globalTagThrottlerObj;
|
||||
for (const auto& [tag, tagStats] : globalTagThrottlerStatus.status) {
|
||||
JsonBuilderObject tagStatsObj;
|
||||
tagStatsObj["desired_tps"] = tagStats.desiredTps;
|
||||
tagStatsObj["reserved_tps"] = tagStats.reservedTps;
|
||||
if (tagStats.limitingTps.present()) {
|
||||
tagStatsObj["limiting_tps"] = tagStats.limitingTps.get();
|
||||
} else {
|
||||
tagStatsObj["limiting_tps"] = "<unset>"_sr;
|
||||
}
|
||||
tagStatsObj["target_tps"] = tagStats.targetTps;
|
||||
globalTagThrottlerObj[printable(tag)] = tagStatsObj;
|
||||
}
|
||||
(*qos)["global_tag_throttler"] = globalTagThrottlerObj;
|
||||
}
|
||||
|
||||
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
|
||||
if (!perfLimit.empty()) {
|
||||
(*qos)["performance_limited_by"] = perfLimit;
|
||||
|
@ -34,7 +34,6 @@ struct RatekeeperInterface {
|
||||
RequestStream<struct ReportCommitCostEstimationRequest> reportCommitCostEstimation;
|
||||
struct LocalityData locality;
|
||||
UID myId;
|
||||
RequestStream<struct GlobalTagThrottlerStatusRequest> getGlobalTagThrottlerStatus;
|
||||
|
||||
RatekeeperInterface() {}
|
||||
explicit RatekeeperInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
|
||||
@ -47,14 +46,7 @@ struct RatekeeperInterface {
|
||||
|
||||
template <class Archive>
|
||||
void serialize(Archive& ar) {
|
||||
serializer(ar,
|
||||
waitFailure,
|
||||
getRateInfo,
|
||||
haltRatekeeper,
|
||||
reportCommitCostEstimation,
|
||||
locality,
|
||||
myId,
|
||||
getGlobalTagThrottlerStatus);
|
||||
serializer(ar, waitFailure, getRateInfo, haltRatekeeper, reportCommitCostEstimation, locality, myId);
|
||||
}
|
||||
};
|
||||
|
||||
@ -167,39 +159,4 @@ struct ReportCommitCostEstimationRequest {
|
||||
}
|
||||
};
|
||||
|
||||
struct GlobalTagThrottlerStatusReply {
|
||||
constexpr static FileIdentifier file_identifier = 9510482;
|
||||
|
||||
struct TagStats {
|
||||
constexpr static FileIdentifier file_identifier = 6018293;
|
||||
double desiredTps;
|
||||
Optional<double> limitingTps;
|
||||
double targetTps;
|
||||
double reservedTps;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, desiredTps, limitingTps, targetTps, reservedTps);
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_map<TransactionTag, TagStats> status;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, status);
|
||||
}
|
||||
};
|
||||
|
||||
struct GlobalTagThrottlerStatusRequest {
|
||||
constexpr static FileIdentifier file_identifier = 5620934;
|
||||
|
||||
ReplyPromise<struct GlobalTagThrottlerStatusReply> reply;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, reply);
|
||||
}
|
||||
};
|
||||
|
||||
#endif // FDBSERVER_RATEKEEPERINTERFACE_H
|
||||
|
@ -50,8 +50,6 @@ public:
|
||||
virtual int64_t manualThrottleCount() const = 0;
|
||||
virtual bool isAutoThrottlingEnabled() const = 0;
|
||||
|
||||
virtual GlobalTagThrottlerStatusReply getGlobalTagThrottlerStatusReply() const = 0;
|
||||
|
||||
// Based on the busiest read and write tags in the provided storage queue info, update
|
||||
// tag throttling limits.
|
||||
virtual Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) = 0;
|
||||
@ -75,7 +73,6 @@ public:
|
||||
int64_t manualThrottleCount() const override;
|
||||
bool isAutoThrottlingEnabled() const override;
|
||||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
|
||||
GlobalTagThrottlerStatusReply getGlobalTagThrottlerStatusReply() const override { return {}; }
|
||||
};
|
||||
|
||||
class GlobalTagThrottler : public ITagThrottler {
|
||||
@ -99,8 +96,6 @@ public:
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
|
||||
PrioritizedTransactionTagMap<double> getProxyRates(int numProxies) override;
|
||||
|
||||
GlobalTagThrottlerStatusReply getGlobalTagThrottlerStatusReply() const override;
|
||||
|
||||
// Testing only:
|
||||
public:
|
||||
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
|
||||
|
Loading…
x
Reference in New Issue
Block a user