Reenable GlobalTagThrottler_GotClientRate trace events

This commit is contained in:
sfc-gh-tclinkenbeard 2022-07-19 09:25:08 -07:00
parent f197314cb5
commit 8a18fa23cf

View File

@ -405,63 +405,6 @@ class GlobalTagThrottlerImpl {
}
}
Optional<double> getTargetTps(TransactionTag tag, bool& isReadBusy, bool& isWriteBusy) const {
auto const readLimitingTps = getLimitingTps(tag, OpType::READ);
auto const writeLimitingTps = getLimitingTps(tag, OpType::WRITE);
Optional<double> limitingTps;
if (readLimitingTps.present() && writeLimitingTps.present()) {
limitingTps = std::min(readLimitingTps.get(), writeLimitingTps.get());
} else if (readLimitingTps.present()) {
limitingTps = readLimitingTps;
} else {
limitingTps = writeLimitingTps;
}
auto const averageTransactionReadCost = getAverageTransactionCost(tag, OpType::READ);
auto const averageTransactionWriteCost = getAverageTransactionCost(tag, OpType::WRITE);
auto const readDesiredTps = getDesiredTps(tag, OpType::READ, averageTransactionReadCost);
auto const writeDesiredTps = getDesiredTps(tag, OpType::WRITE, averageTransactionWriteCost);
Optional<double> desiredTps;
if (readDesiredTps.present() && writeDesiredTps.present()) {
desiredTps = std::min(readDesiredTps.get(), writeDesiredTps.get());
} else if (readDesiredTps.present()) {
desiredTps = readDesiredTps;
} else {
desiredTps = writeDesiredTps;
}
if (!desiredTps.present()) {
return {};
}
if (readLimitingTps.present() && readLimitingTps.get() < readDesiredTps.orDefault(0)) {
isReadBusy = true;
}
if (writeLimitingTps.present() && writeLimitingTps.get() < writeDesiredTps.orDefault(0)) {
isWriteBusy = true;
}
auto const readReservedTps = getReservedTps(tag, OpType::READ, averageTransactionReadCost);
auto const writeReservedTps = getReservedTps(tag, OpType::WRITE, averageTransactionWriteCost);
Optional<double> reservedTps;
if (readReservedTps.present() && writeReservedTps.present()) {
reservedTps = std::max(readReservedTps.get(), writeReservedTps.get());
} else if (readReservedTps.present()) {
reservedTps = readReservedTps;
} else {
reservedTps = writeReservedTps;
}
auto targetTps = desiredTps.get();
if (limitingTps.present()) {
targetTps = std::min(targetTps, limitingTps.get());
}
if (reservedTps.present()) {
targetTps = std::max(targetTps, reservedTps.get());
}
return targetTps;
}
public:
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) {}
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
@ -472,22 +415,63 @@ public:
lastBusyReadTagCount = lastBusyWriteTagCount = 0;
for (auto& [tag, stats] : tagStatistics) {
// Currently there is no differentiation between batch priority and default priority transactions
auto isReadBusy = false;
auto isWriteBusy = false;
auto const targetTps = getTargetTps(tag, isReadBusy, isWriteBusy);
if (isReadBusy) {
++lastBusyReadTagCount;
auto const readLimitingTps = getLimitingTps(tag, OpType::READ);
auto const writeLimitingTps = getLimitingTps(tag, OpType::WRITE);
Optional<double> limitingTps;
if (readLimitingTps.present() && writeLimitingTps.present()) {
limitingTps = std::min(readLimitingTps.get(), writeLimitingTps.get());
} else if (readLimitingTps.present()) {
limitingTps = readLimitingTps;
} else {
limitingTps = writeLimitingTps;
}
if (isWriteBusy) {
++lastBusyWriteTagCount;
auto const averageTransactionReadCost = getAverageTransactionCost(tag, OpType::READ);
auto const averageTransactionWriteCost = getAverageTransactionCost(tag, OpType::WRITE);
auto const readDesiredTps = getDesiredTps(tag, OpType::READ, averageTransactionReadCost);
auto const writeDesiredTps = getDesiredTps(tag, OpType::WRITE, averageTransactionWriteCost);
Optional<double> desiredTps;
if (readDesiredTps.present() && writeDesiredTps.present()) {
desiredTps = std::min(readDesiredTps.get(), writeDesiredTps.get());
} else if (readDesiredTps.present()) {
desiredTps = readDesiredTps;
} else {
desiredTps = writeDesiredTps;
}
if (!targetTps.present()) {
if (!desiredTps.present()) {
continue;
}
auto const clientRate = stats.updateAndGetPerClientLimit(targetTps.get());
if (readLimitingTps.present() && readLimitingTps.get() < readDesiredTps.orDefault(0)) {
++lastBusyReadTagCount;
}
if (writeLimitingTps.present() && writeLimitingTps.get() < writeDesiredTps.orDefault(0)) {
++lastBusyWriteTagCount;
}
auto const readReservedTps = getReservedTps(tag, OpType::READ, averageTransactionReadCost);
auto const writeReservedTps = getReservedTps(tag, OpType::WRITE, averageTransactionWriteCost);
Optional<double> reservedTps;
if (readReservedTps.present() && writeReservedTps.present()) {
reservedTps = std::max(readReservedTps.get(), writeReservedTps.get());
} else if (readReservedTps.present()) {
reservedTps = readReservedTps;
} else {
reservedTps = writeReservedTps;
}
auto targetTps = desiredTps.get();
if (limitingTps.present()) {
targetTps = std::min(targetTps, limitingTps.get());
}
if (reservedTps.present()) {
targetTps = std::max(targetTps, reservedTps.get());
}
auto const clientRate = stats.updateAndGetPerClientLimit(targetTps);
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = clientRate;
/*
TraceEvent("GlobalTagThrottler_GotClientRate", id)
.detail("Tag", printable(tag))
.detail("TargetTps", targetTps)
@ -498,10 +482,10 @@ public:
.detail("ReservedTps", reservedTps)
.detail("DesiredTps", desiredTps)
.detail("NumStorageServers", throughput.size());
*/
}
return result;
}
int64_t autoThrottleCount() const {
int64_t result{ 0 };
for (const auto& [tag, stats] : tagStatistics) {