Refactor profile command, remove profile heap|flow commands

This commit is contained in:
Chaoguang Lin 2021-07-12 20:08:35 +00:00
parent a08f0e9aa6
commit 1aa12faf01
5 changed files with 158 additions and 241 deletions

View File

@ -11,6 +11,7 @@ set(FDBCLI_SRCS
ForceRecoveryWithDataLossCommand.actor.cpp
KillCommand.actor.cpp
MaintenanceCommand.actor.cpp
ProfileCommand.actor.cpp
SetClassCommand.actor.cpp
SnapshotCommand.actor.cpp
SuspendCommand.actor.cpp

View File

@ -32,7 +32,9 @@ namespace fdb_cli {
const KeyRef consistencyCheckSpecialKey = LiteralStringRef("\xff\xff/management/consistency_check_suspended");
ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr, std::vector<StringRef> tokens) {
ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr,
std::vector<StringRef> tokens,
bool intrans) {
// Here we do not proceed in a try-catch loop since the transaction is always supposed to succeed.
// If not, the outer loop catch block(fdbcli.actor.cpp) will handle the error and print out the error message
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
@ -41,10 +43,12 @@ ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr, std:
printf("ConsistencyCheck is %s\n", suspended.present() ? "off" : "on");
} else if (tokens.size() == 2 && tokencmp(tokens[1], "off")) {
tr->set(consistencyCheckSpecialKey, Value());
wait(safeThreadFutureToFuture(tr->commit()));
if (!intrans)
wait(safeThreadFutureToFuture(tr->commit()));
} else if (tokens.size() == 2 && tokencmp(tokens[1], "on")) {
tr->clear(consistencyCheckSpecialKey);
wait(safeThreadFutureToFuture(tr->commit()));
if (!intrans)
wait(safeThreadFutureToFuture(tr->commit()));
} else {
printUsage(tokens[0]);
return false;

View File

@ -0,0 +1,141 @@
/*
* ProfileCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "boost/lexical_cast.hpp"
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/Tuple.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
ACTOR Future<bool> profileCommandActor(Reference<ITransaction> tr, std::vector<StringRef> tokens, bool intrans) {
state bool result = true;
if (tokens.size() == 1) {
fprintf(stderr, "ERROR: Usage: profile <client|list|flow|heap>\n");
result = false;
} else if (tokencmp(tokens[1], "client")) {
if (tokens.size() == 2) {
fprintf(stderr, "ERROR: Usage: profile client <get|set>\n");
return false;
}
if (tokencmp(tokens[2], "get")) {
if (tokens.size() != 3) {
fprintf(stderr, "ERROR: Addtional arguments to `get` are not supported.\n");
return false;
}
state std::string sampleRateStr = "default";
state std::string sizeLimitStr = "default";
Optional<Value> sampleRateValue =
wait(safeThreadFutureToFuture(tr->get(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate))));
if (sampleRateValue.present() &&
!std::isinf(boost::lexical_cast<double>(sampleRateValue.get().toString()))) {
sampleRateStr = sampleRateValue.get().toString();
}
Optional<Value> sizeLimitValue =
wait(safeThreadFutureToFuture(tr->get(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit))));
if (sizeLimitValue.present() && boost::lexical_cast<int64_t>(sizeLimitValue.get().toString()) != -1) {
sizeLimitStr = sizeLimitValue.get().toString();
}
printf("Client profiling rate is set to %s and size limit is set to %s.\n",
sampleRateStr.c_str(),
sizeLimitStr.c_str());
} else if (tokencmp(tokens[2], "set")) {
if (tokens.size() != 5) {
fprintf(stderr, "ERROR: Usage: profile client set <RATE|default> <SIZE|default>\n");
return false;
}
double sampleRate;
if (tokencmp(tokens[3], "default")) {
sampleRate = std::numeric_limits<double>::infinity();
} else {
char* end;
sampleRate = std::strtod((const char*)tokens[3].begin(), &end);
if (!std::isspace(*end)) {
fprintf(stderr, "ERROR: %s failed to parse.\n", printable(tokens[3]).c_str());
return false;
}
}
int64_t sizeLimit;
if (tokencmp(tokens[4], "default")) {
sizeLimit = -1;
} else {
Optional<uint64_t> parsed = parse_with_suffix(tokens[4].toString());
if (parsed.present()) {
sizeLimit = parsed.get();
} else {
fprintf(stderr, "ERROR: `%s` failed to parse.\n", printable(tokens[4]).c_str());
return false;
}
}
Tuple rate = Tuple().appendDouble(sampleRate);
Tuple size = Tuple().append(sizeLimit);
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
if (!intrans) {
wait(safeThreadFutureToFuture(tr->commit()));
}
} else {
fprintf(stderr, "ERROR: Unknown action: %s\n", printable(tokens[2]).c_str());
result = false;
}
} else if (tokencmp(tokens[1], "list")) {
if (tokens.size() != 2) {
fprintf(stderr, "ERROR: Usage: profile list\n");
return false;
}
// Hold the reference to the standalone's memory
state ThreadFuture<RangeResult> kvsFuture =
tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY);
RangeResult kvs = wait(safeThreadFutureToFuture(kvsFuture));
ASSERT(!kvs.more);
for (const auto& pair : kvs) {
auto ip_port =
(pair.key.endsWith(LiteralStringRef(":tls")) ? pair.key.removeSuffix(LiteralStringRef(":tls"))
: pair.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
printf("%s\n", printable(ip_port).c_str());
}
} else {
fprintf(stderr, "ERROR: Unknown type: %s\n", printable(tokens[1]).c_str());
result = false;
}
return result;
}
CommandFactory profileFactory("profile",
CommandHelp("profile <client|list> <action> <ARGS>",
"namespace for all the profiling-related commands.",
"Different types support different actions. Run `profile` to get a list of "
"types, and iteratively explore the help.\n"));
} // namespace fdb_cli

View File

@ -623,10 +623,6 @@ void initHelp() {
helpMap["writemode"] = CommandHelp("writemode <on|off>",
"enables or disables sets and clears",
"Setting or clearing keys from the CLI is not recommended.");
helpMap["profile"] = CommandHelp("profile <client|list|flow|heap> <action> <ARGS>",
"namespace for all the profiling-related commands.",
"Different types support different actions. Run `profile` to get a list of "
"types, and iteratively explore the help.\n");
helpMap["lock"] = CommandHelp(
"lock",
"lock the database with a randomly generated lockUID",
@ -3742,246 +3738,17 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if (tokencmp(tokens[0], "consistencycheck")) {
getTransaction(db, tr, tr2, options, intrans);
bool _result = wait(makeInterruptable(consistencyCheckCommandActor(tr2, tokens)));
bool _result = wait(makeInterruptable(consistencyCheckCommandActor(tr2, tokens, intrans)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "profile")) {
if (tokens.size() == 1) {
fprintf(stderr, "ERROR: Usage: profile <client|list|flow|heap>\n");
getTransaction(db, tr, tr2, options, intrans);
bool _result = wait(makeInterruptable(profileCommandActor(tr2, tokens, intrans)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[1], "client")) {
getTransaction(db, tr, options, intrans);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (tokens.size() == 2) {
fprintf(stderr, "ERROR: Usage: profile client <get|set>\n");
is_error = true;
continue;
}
wait(makeInterruptable(GlobalConfig::globalConfig().onInitialized()));
if (tokencmp(tokens[2], "get")) {
if (tokens.size() != 3) {
fprintf(stderr, "ERROR: Addtional arguments to `get` are not supported.\n");
is_error = true;
continue;
}
const double sampleRateDbl = GlobalConfig::globalConfig().get<double>(
fdbClientInfoTxnSampleRate, std::numeric_limits<double>::infinity());
const int64_t sizeLimit =
GlobalConfig::globalConfig().get<int64_t>(fdbClientInfoTxnSizeLimit, -1);
std::string sampleRateStr = "default", sizeLimitStr = "default";
if (!std::isinf(sampleRateDbl)) {
sampleRateStr = boost::lexical_cast<std::string>(sampleRateDbl);
}
if (sizeLimit != -1) {
sizeLimitStr = boost::lexical_cast<std::string>(sizeLimit);
}
printf("Client profiling rate is set to %s and size limit is set to %s.\n",
sampleRateStr.c_str(),
sizeLimitStr.c_str());
continue;
}
if (tokencmp(tokens[2], "set")) {
if (tokens.size() != 5) {
fprintf(stderr, "ERROR: Usage: profile client set <RATE|default> <SIZE|default>\n");
is_error = true;
continue;
}
double sampleRate;
if (tokencmp(tokens[3], "default")) {
sampleRate = std::numeric_limits<double>::infinity();
} else {
char* end;
sampleRate = std::strtod((const char*)tokens[3].begin(), &end);
if (!std::isspace(*end)) {
fprintf(stderr, "ERROR: %s failed to parse.\n", printable(tokens[3]).c_str());
is_error = true;
continue;
}
}
int64_t sizeLimit;
if (tokencmp(tokens[4], "default")) {
sizeLimit = -1;
} else {
Optional<uint64_t> parsed = parse_with_suffix(tokens[4].toString());
if (parsed.present()) {
sizeLimit = parsed.get();
} else {
fprintf(stderr, "ERROR: `%s` failed to parse.\n", printable(tokens[4]).c_str());
is_error = true;
continue;
}
}
Tuple rate = Tuple().appendDouble(sampleRate);
Tuple size = Tuple().append(sizeLimit);
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSampleRate), rate.pack());
tr->set(GlobalConfig::prefixedKey(fdbClientInfoTxnSizeLimit), size.pack());
if (!intrans) {
wait(commitTransaction(tr));
}
continue;
}
fprintf(stderr, "ERROR: Unknown action: %s\n", printable(tokens[2]).c_str());
is_error = true;
continue;
}
if (tokencmp(tokens[1], "list")) {
if (tokens.size() != 2) {
fprintf(stderr, "ERROR: Usage: profile list\n");
is_error = true;
continue;
}
getTransaction(db, tr, options, intrans);
RangeResult kvs = wait(
makeInterruptable(tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
for (const auto& pair : kvs) {
auto ip_port = (pair.key.endsWith(LiteralStringRef(":tls"))
? pair.key.removeSuffix(LiteralStringRef(":tls"))
: pair.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
printf("%s\n", printable(ip_port).c_str());
}
continue;
}
if (tokencmp(tokens[1], "flow")) {
if (tokens.size() == 2) {
fprintf(stderr, "ERROR: Usage: profile flow <run>\n");
is_error = true;
continue;
}
if (tokencmp(tokens[2], "run")) {
if (tokens.size() < 6) {
fprintf(
stderr,
"ERROR: Usage: profile flow run <DURATION_IN_SECONDS> <FILENAME> <PROCESS...>\n");
is_error = true;
continue;
}
getTransaction(db, tr, options, intrans);
RangeResult kvs = wait(makeInterruptable(
tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
char* duration_end;
int duration = std::strtol((const char*)tokens[3].begin(), &duration_end, 10);
if (!std::isspace(*duration_end)) {
fprintf(
stderr, "ERROR: Failed to parse %s as an integer.", printable(tokens[3]).c_str());
is_error = true;
continue;
}
std::map<Key, ClientWorkerInterface> interfaces;
state std::vector<Key> all_profiler_addresses;
state std::vector<Future<ErrorOr<Void>>> all_profiler_responses;
for (const auto& pair : kvs) {
auto ip_port = (pair.key.endsWith(LiteralStringRef(":tls"))
? pair.key.removeSuffix(LiteralStringRef(":tls"))
: pair.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
interfaces.emplace(
ip_port,
BinaryReader::fromStringRef<ClientWorkerInterface>(pair.value, IncludeVersion()));
}
if (tokens.size() == 6 && tokencmp(tokens[5], "all")) {
for (const auto& pair : interfaces) {
ProfilerRequest profileRequest(
ProfilerRequest::Type::FLOW, ProfilerRequest::Action::RUN, duration);
profileRequest.outputFile = tokens[4];
all_profiler_addresses.push_back(pair.first);
all_profiler_responses.push_back(pair.second.profiler.tryGetReply(profileRequest));
}
} else {
for (int tokenidx = 5; tokenidx < tokens.size(); tokenidx++) {
auto element = interfaces.find(tokens[tokenidx]);
if (element == interfaces.end()) {
fprintf(stderr,
"ERROR: process '%s' not recognized.\n",
printable(tokens[tokenidx]).c_str());
is_error = true;
}
}
if (!is_error) {
for (int tokenidx = 5; tokenidx < tokens.size(); tokenidx++) {
ProfilerRequest profileRequest(
ProfilerRequest::Type::FLOW, ProfilerRequest::Action::RUN, duration);
profileRequest.outputFile = tokens[4];
all_profiler_addresses.push_back(tokens[tokenidx]);
all_profiler_responses.push_back(
interfaces[tokens[tokenidx]].profiler.tryGetReply(profileRequest));
}
}
}
if (!is_error) {
wait(waitForAll(all_profiler_responses));
for (int i = 0; i < all_profiler_responses.size(); i++) {
const ErrorOr<Void>& err = all_profiler_responses[i].get();
if (err.isError()) {
fprintf(stderr,
"ERROR: %s: %s: %s\n",
printable(all_profiler_addresses[i]).c_str(),
err.getError().name(),
err.getError().what());
}
}
}
all_profiler_addresses.clear();
all_profiler_responses.clear();
continue;
}
}
if (tokencmp(tokens[1], "heap")) {
if (tokens.size() != 3) {
fprintf(stderr, "ERROR: Usage: profile heap <PROCESS>\n");
is_error = true;
continue;
}
getTransaction(db, tr, options, intrans);
RangeResult kvs = wait(
makeInterruptable(tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
std::map<Key, ClientWorkerInterface> interfaces;
for (const auto& pair : kvs) {
auto ip_port = (pair.key.endsWith(LiteralStringRef(":tls"))
? pair.key.removeSuffix(LiteralStringRef(":tls"))
: pair.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
interfaces.emplace(
ip_port,
BinaryReader::fromStringRef<ClientWorkerInterface>(pair.value, IncludeVersion()));
}
state Key ip_port = tokens[2];
if (interfaces.find(ip_port) == interfaces.end()) {
fprintf(stderr, "ERROR: host %s not found\n", printable(ip_port).c_str());
is_error = true;
continue;
}
ProfilerRequest profileRequest(
ProfilerRequest::Type::GPROF_HEAP, ProfilerRequest::Action::RUN, 0);
profileRequest.outputFile = LiteralStringRef("heapz");
ErrorOr<Void> response = wait(interfaces[ip_port].profiler.tryGetReply(profileRequest));
if (response.isError()) {
fprintf(stderr,
"ERROR: %s: %s: %s\n",
printable(ip_port).c_str(),
response.getError().name(),
response.getError().what());
}
continue;
}
fprintf(stderr, "ERROR: Unknown type: %s\n", printable(tokens[1]).c_str());
is_error = true;
continue;
}

View File

@ -100,7 +100,9 @@ ACTOR Future<bool> advanceVersionCommandActor(Reference<IDatabase> db, std::vect
// cache_range command
ACTOR Future<bool> cacheRangeCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// consistency command
ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr, std::vector<StringRef> tokens);
ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr,
std::vector<StringRef> tokens,
bool intrans);
// datadistribution command
ACTOR Future<bool> dataDistributionCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// expensive_data_check command
@ -122,6 +124,8 @@ ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,
bool printWarning = false,
bool clearSSFailureZoneString = false);
ACTOR Future<bool> maintenanceCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// profile command
ACTOR Future<bool> profileCommandActor(Reference<ITransaction> tr, std::vector<StringRef> tokens, bool intrans);
// setclass command
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// snapshot command