diff --git a/fdbserver/GlobalTagThrottler.actor.cpp b/fdbserver/GlobalTagThrottler.actor.cpp index 79c7661795..b615f035c5 100644 --- a/fdbserver/GlobalTagThrottler.actor.cpp +++ b/fdbserver/GlobalTagThrottler.actor.cpp @@ -405,63 +405,6 @@ class GlobalTagThrottlerImpl { } } - Optional<double> getTargetTps(TransactionTag tag, bool& isReadBusy, bool& isWriteBusy) const { - auto const readLimitingTps = getLimitingTps(tag, OpType::READ); - auto const writeLimitingTps = getLimitingTps(tag, OpType::WRITE); - Optional<double> limitingTps; - if (readLimitingTps.present() && writeLimitingTps.present()) { - limitingTps = std::min(readLimitingTps.get(), writeLimitingTps.get()); - } else if (readLimitingTps.present()) { - limitingTps = readLimitingTps; - } else { - limitingTps = writeLimitingTps; - } - - auto const averageTransactionReadCost = getAverageTransactionCost(tag, OpType::READ); - auto const averageTransactionWriteCost = getAverageTransactionCost(tag, OpType::WRITE); - auto const readDesiredTps = getDesiredTps(tag, OpType::READ, averageTransactionReadCost); - auto const writeDesiredTps = getDesiredTps(tag, OpType::WRITE, averageTransactionWriteCost); - Optional<double> desiredTps; - if (readDesiredTps.present() && writeDesiredTps.present()) { - desiredTps = std::min(readDesiredTps.get(), writeDesiredTps.get()); - } else if (readDesiredTps.present()) { - desiredTps = readDesiredTps; - } else { - desiredTps = writeDesiredTps; - } - - if (!desiredTps.present()) { - return {}; - } - - if (readLimitingTps.present() && readLimitingTps.get() < readDesiredTps.orDefault(0)) { - isReadBusy = true; - } - if (writeLimitingTps.present() && writeLimitingTps.get() < writeDesiredTps.orDefault(0)) { - isWriteBusy = true; - } - - auto const readReservedTps = getReservedTps(tag, OpType::READ, averageTransactionReadCost); - auto const writeReservedTps = getReservedTps(tag, OpType::WRITE, averageTransactionWriteCost); - Optional<double> reservedTps; - if (readReservedTps.present() && writeReservedTps.present()) { - reservedTps = std::max(readReservedTps.get(), writeReservedTps.get()); - } else if (readReservedTps.present()) { - reservedTps = readReservedTps; - } else { - reservedTps = writeReservedTps; - } - - auto targetTps = desiredTps.get(); - if (limitingTps.present()) { - targetTps = std::min(targetTps, limitingTps.get()); - } - if (reservedTps.present()) { - targetTps = std::max(targetTps, reservedTps.get()); - } - return targetTps; - } - public: GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) {} Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); } @@ -472,22 +415,63 @@ public: lastBusyReadTagCount = lastBusyWriteTagCount = 0; for (auto& [tag, stats] : tagStatistics) { // Currently there is no differentiation between batch priority and default priority transactions - auto isReadBusy = false; - auto isWriteBusy = false; - auto const targetTps = getTargetTps(tag, isReadBusy, isWriteBusy); - if (isReadBusy) { - ++lastBusyReadTagCount; + auto const readLimitingTps = getLimitingTps(tag, OpType::READ); + auto const writeLimitingTps = getLimitingTps(tag, OpType::WRITE); + Optional<double> limitingTps; + if (readLimitingTps.present() && writeLimitingTps.present()) { + limitingTps = std::min(readLimitingTps.get(), writeLimitingTps.get()); + } else if (readLimitingTps.present()) { + limitingTps = readLimitingTps; + } else { + limitingTps = writeLimitingTps; } - if (isWriteBusy) { - ++lastBusyWriteTagCount; + + auto const averageTransactionReadCost = getAverageTransactionCost(tag, OpType::READ); + auto const averageTransactionWriteCost = getAverageTransactionCost(tag, OpType::WRITE); + auto const readDesiredTps = getDesiredTps(tag, OpType::READ, averageTransactionReadCost); + auto const writeDesiredTps = getDesiredTps(tag, OpType::WRITE, averageTransactionWriteCost); + Optional<double> desiredTps; + if (readDesiredTps.present() && writeDesiredTps.present()) { + desiredTps = std::min(readDesiredTps.get(), writeDesiredTps.get()); + } else if (readDesiredTps.present()) { + desiredTps = readDesiredTps; + } else { + desiredTps = writeDesiredTps; } - if (!targetTps.present()) { + + if (!desiredTps.present()) { continue; } - auto const clientRate = stats.updateAndGetPerClientLimit(targetTps.get()); + + if (readLimitingTps.present() && readLimitingTps.get() < readDesiredTps.orDefault(0)) { + ++lastBusyReadTagCount; + } + if (writeLimitingTps.present() && writeLimitingTps.get() < writeDesiredTps.orDefault(0)) { + ++lastBusyWriteTagCount; + } + + auto const readReservedTps = getReservedTps(tag, OpType::READ, averageTransactionReadCost); + auto const writeReservedTps = getReservedTps(tag, OpType::WRITE, averageTransactionWriteCost); + Optional<double> reservedTps; + if (readReservedTps.present() && writeReservedTps.present()) { + reservedTps = std::max(readReservedTps.get(), writeReservedTps.get()); + } else if (readReservedTps.present()) { + reservedTps = readReservedTps; + } else { + reservedTps = writeReservedTps; + } + + auto targetTps = desiredTps.get(); + if (limitingTps.present()) { + targetTps = std::min(targetTps, limitingTps.get()); + } + if (reservedTps.present()) { + targetTps = std::max(targetTps, reservedTps.get()); + } + + auto const clientRate = stats.updateAndGetPerClientLimit(targetTps); result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = clientRate; - /* TraceEvent("GlobalTagThrottler_GotClientRate", id) .detail("Tag", printable(tag)) .detail("TargetTps", targetTps) @@ -498,10 +482,10 @@ public: .detail("ReservedTps", reservedTps) .detail("DesiredTps", desiredTps) .detail("NumStorageServers", throughput.size()); - */ } return result; } + int64_t autoThrottleCount() const { int64_t result{ 0 }; for (const auto& [tag, stats] : tagStatistics) {