Add fdbcli quota command

This commit is contained in:
sfc-gh-tclinkenbeard 2022-06-16 14:07:16 -07:00
parent fabc751b0d
commit 99d243197e
6 changed files with 201 additions and 1 deletions

View File

@ -20,6 +20,7 @@ set(FDBCLI_SRCS
ChangeFeedCommand.actor.cpp
MaintenanceCommand.actor.cpp
ProfileCommand.actor.cpp
QuotaCommand.actor.cpp
SetClassCommand.actor.cpp
SnapshotCommand.actor.cpp
StatusCommand.actor.cpp

View File

@ -0,0 +1,177 @@
/*
* QuotaCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "flow/actorcompiler.h" // This must be the last include
namespace {
enum class LimitType { RESERVED, TOTAL };
enum class OpType { READ, WRITE };
Optional<TransactionTag> parseTag(StringRef token) {
if (token.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) {
return {};
} else {
return token;
}
}
Optional<LimitType> parseLimitType(StringRef token) {
if (token == "reserved"_sr) {
return LimitType::RESERVED;
} else if (token == "total"_sr) {
return LimitType::TOTAL;
} else {
return {};
}
}
Optional<OpType> parseOpType(StringRef token) {
if (token == "read"_sr) {
return OpType::READ;
} else if (token == "write"_sr) {
return OpType::WRITE;
} else {
return {};
}
}
Optional<double> parseLimitValue(StringRef token) {
try {
return std::stod(token.toString());
} catch (...) {
return {};
}
}
ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, OpType opType) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
try {
state ThreadFuture<Optional<Value>> resultFuture = tr->get(tag.withPrefix(tagQuotaPrefix));
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
if (!v.present()) {
fmt::print("<empty>\n");
} else {
auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
if (limitType == LimitType::TOTAL && opType == OpType::READ) {
fmt::print("{}\n", quota.totalReadQuota);
} else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) {
fmt::print("{}\n", quota.totalWriteQuota);
} else if (limitType == LimitType::RESERVED && opType == OpType::READ) {
fmt::print("{}\n", quota.reservedReadQuota);
} else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) {
fmt::print("{}\n", quota.reservedWriteQuota);
}
}
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR Future<Void> setQuota(Reference<IDatabase> db,
TransactionTag tag,
LimitType limitType,
OpType opType,
double value) {
state Reference<ITransaction> tr = db->createTransaction();
state Key key = tag.withPrefix(tagQuotaPrefix);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
state ThreadFuture<Optional<Value>> resultFuture = tr->get(key);
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
ThrottleApi::TagQuotaValue quota;
if (v.present()) {
quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
}
if (limitType == LimitType::TOTAL && opType == OpType::READ) {
quota.totalReadQuota = value;
} else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) {
quota.totalWriteQuota = value;
} else if (limitType == LimitType::RESERVED && opType == OpType::READ) {
quota.reservedReadQuota = value;
} else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) {
quota.reservedWriteQuota = value;
}
ThrottleApi::setTagQuota(tr,
tag,
quota.reservedReadQuota,
quota.totalReadQuota,
quota.reservedWriteQuota,
quota.totalWriteQuota);
wait(safeThreadFutureToFuture(tr->commit()));
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
constexpr auto usage = "Usage: quota [get|set] <tag> [reserved|total] [read|write] <value>\n";
} // namespace
namespace fdb_cli {
ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
state bool result = true;
if (tokens.size() != 5 && tokens.size() != 6) {
printf(usage);
return false;
} else {
auto tag = parseTag(tokens[2]);
auto limitType = parseLimitType(tokens[3]);
auto opType = parseOpType(tokens[4]);
if (!tag.present() || !limitType.present() || !opType.present()) {
printf(usage);
return false;
}
if (tokens[1] == "get"_sr) {
if (tokens.size() != 5) {
printf(usage);
return false;
}
wait(getQuota(db, tag.get(), limitType.get(), opType.get()));
return true;
} else if (tokens[1] == "set"_sr) {
if (tokens.size() != 6) {
printf(usage);
return false;
}
auto const limitValue = parseLimitValue(tokens[5]);
if (!limitValue.present()) {
printf(usage);
return false;
}
wait(setQuota(db, tag.get(), limitType.get(), opType.get(), limitValue.get()));
return true;
} else {
printf(usage);
return false;
}
}
}
} // namespace fdb_cli

View File

@ -508,6 +508,9 @@ void initHelp() {
CommandHelp("getversion",
"Fetch the current read version",
"Displays the current read version of the database or currently running transaction.");
helpMap["quota"] = CommandHelp("quota",
"quota [get|set] <tag> [reserved|total] [read|write] <value>",
"Get or modify the throughput quota for the specified tag.");
helpMap["reset"] =
CommandHelp("reset",
"reset the current transaction",
@ -1467,6 +1470,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
if (tokencmp(tokens[0], "quota")) {
bool _result = wait(makeInterruptable(quotaCommandActor(db, tokens)));
if (!_result) {
is_error = true;
}
continue;
}
if (tokencmp(tokens[0], "reset")) {
if (tokens.size() != 1) {
printUsage(tokens[0]);

View File

@ -218,6 +218,8 @@ ACTOR Future<bool> profileCommandActor(Database db,
Reference<ITransaction> tr,
std::vector<StringRef> tokens,
bool intrans);
// quota command
ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// setclass command
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// snapshot command

View File

@ -135,6 +135,11 @@ Key ThrottleApi::getTagQuotaKey(TransactionTagRef tag) {
return tag.withPrefix(tagQuotaPrefix);
}
bool ThrottleApi::TagQuotaValue::isValid() const {
return reservedReadQuota <= totalReadQuota && reservedWriteQuota <= totalWriteQuota && reservedReadQuota >= 0 &&
reservedWriteQuota >= 0;
}
Value ThrottleApi::TagQuotaValue::toValue() const {
Tuple tuple;
tuple.appendDouble(reservedReadQuota);
@ -159,7 +164,7 @@ ThrottleApi::TagQuotaValue ThrottleApi::TagQuotaValue::fromValue(ValueRef value)
TraceEvent(SevWarnAlways, "TagQuotaValueFailedToDeserialize").error(e);
throw invalid_throttle_quota_value();
}
if (result.reservedReadQuota > result.totalReadQuota || result.reservedWriteQuota > result.totalWriteQuota) {
if (!result.isValid()) {
TraceEvent(SevWarnAlways, "TagQuotaValueInvalidQuotas")
.detail("ReservedReadQuota", result.reservedReadQuota)
.detail("TotalReadQuota", result.totalReadQuota)

View File

@ -603,6 +603,7 @@ public:
double totalReadQuota{ 0.0 };
double reservedWriteQuota{ 0.0 };
double totalWriteQuota{ 0.0 };
bool isValid() const;
Value toValue() const;
static TagQuotaValue fromValue(ValueRef);
};
@ -621,6 +622,9 @@ void setTagQuota(Reference<Tr> tr,
tagQuotaValue.totalReadQuota = totalReadQuota;
tagQuotaValue.reservedWriteQuota = reservedWriteQuota;
tagQuotaValue.totalWriteQuota = totalWriteQuota;
if (!tagQuotaValue.isValid()) {
throw invalid_throttle_quota_value();
}
tr->set(getTagQuotaKey(tag), tagQuotaValue.toValue());
signalThrottleChange(tr);
}