Mirror of https://github.com/apple/foundationdb.git (synced 2025-05-14 18:02:31 +08:00)

Merge pull request #4409 from sfc-gh-etschannen/master

Merge Release 6.3 into Master

Commit a7d8b833ee
@@ -95,6 +95,7 @@ void fdb_flow_test() {
g_network->run();
}

// FDB object used by bindings
namespace FDB {
class DatabaseImpl : public Database, NonCopyable {
public:
@@ -132,7 +132,7 @@ If you suspect that a client process's workload may be saturating the network th
Multi-threaded Client
=====================

FoundationDB client library can start multiple worker threads for each version of client that is loaded. Every single cluster will be serviced by a one client thread. If the client is connected to only one cluster, exactly one thread would be active and the rest will remain idle. Hence, using this feature is useful when the client is actively using more than one cluster.
FoundationDB client library can start multiple worker threads for each version of client that is loaded. Every single cluster will be serviced by one of the client threads. If the client is connected to only one cluster, exactly one thread would be active and the rest will remain idle. Hence, using this feature is useful when the client is actively using more than one cluster.

Clients can be configured to use worker-threads by setting the ``FDBNetworkOptions::CLIENT_THREADS_PER_VERSION`` option.
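As an illustration (not part of this commit), a minimal sketch of enabling the option from an application through the C API; it assumes the generated constant name ``FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION`` for option code 65 and API version 700, and that integer network options are passed as 8 little-endian bytes:

.. code-block:: cpp

    #define FDB_API_VERSION 700 // assumed API version for this sketch
    #include <foundationdb/fdb_c.h>
    #include <cstdint>
    #include <cstring>

    // Hypothetical helper: request `n` client threads per loaded client version.
    // Must run after fdb_select_api_version() and before fdb_setup_network().
    void enableClientThreadsPerVersion(int64_t n) {
        uint8_t value[8];
        std::memcpy(value, &n, sizeof(value)); // little-endian byte order assumed
        fdb_error_t err = fdb_network_set_option(
            FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION, value, sizeof(value));
        if (err) {
            // fdb_get_error(err) describes the failure, e.g. option set too late.
        }
    }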
@@ -2,6 +2,10 @@
Release Notes
#############

6.2.32
======
* Fix an issue where symbolic links in cmake-built RPMs are broken if you unpack the RPM to a custom directory. `(PR #4380) <https://github.com/apple/foundationdb/pull/4380>`_

6.2.31
======
* Fix a rare invalid memory access on data distributor when snapshotting large clusters. This is a follow up to `PR #4076 <https://github.com/apple/foundationdb/pull/4076>`_. `(PR #4317) <https://github.com/apple/foundationdb/pull/4317>`_
@@ -19,34 +19,18 @@ target_link_libraries(fdbdecode PRIVATE fdbclient)
if(NOT OPEN_FOR_IDE)
if(GENERATE_DEBUG_PACKAGES)
fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients)
fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent)
fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION bin COMPONENT clients RENAME fdbrestore)
fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION bin COMPONENT clients RENAME dr_agent)
fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION bin COMPONENT clients RENAME fdbdr)
else()
add_custom_target(prepare_fdbbackup_install ALL DEPENDS strip_only_fdbbackup)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients RENAME fdbrestore)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients RENAME dr_agent)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients RENAME fdbdr)
endif()
install_symlink(
COMPONENT clients
FILE_DIR bin
LINK_DIR fdbmonitor
FILE_NAME fdbbackup
LINK_NAME backup_agent/backup_agent)
install_symlink(
COMPONENT clients
FILE_DIR bin
LINK_DIR bin
FILE_NAME fdbbackup
LINK_NAME fdbrestore)
install_symlink(
COMPONENT clients
FILE_DIR bin
LINK_DIR bin
FILE_NAME fdbbackup
LINK_NAME dr_agent)
install_symlink(
COMPONENT clients
FILE_DIR bin
LINK_DIR bin
FILE_NAME fdbbackup
LINK_NAME fdbdr)
symlink_files(
LOCATION packages/bin
SOURCE fdbbackup
@@ -148,7 +148,7 @@ public:
if(transactionItr != transactionOptions.legalOptions.end())
setTransactionOption(tr, transactionItr->second, enabled, arg, intrans);
else {
printf("ERROR: invalid option '%s'. Try `help options' for a list of available options.\n", optionStr.toString().c_str());
fprintf(stderr, "ERROR: invalid option '%s'. Try `help options' for a list of available options.\n", optionStr.toString().c_str());
throw invalid_option();
}
}
@@ -182,7 +182,7 @@ private:
//Sets a transaction option. If intrans == true, then this option is also applied to the passed in transaction.
void setTransactionOption(Reference<ReadYourWritesTransaction> tr, FDBTransactionOptions::Option option, bool enabled, Optional<StringRef> arg, bool intrans) {
if(enabled && arg.present() != FDBTransactionOptions::optionInfo.getMustExist(option).hasParameter) {
printf("ERROR: option %s a parameter\n", arg.present() ? "did not expect" : "expected");
fprintf(stderr, "ERROR: option %s a parameter\n", arg.present() ? "did not expect" : "expected");
throw invalid_option_value();
}

@@ -663,7 +663,7 @@ void printUsage(StringRef command) {
if (i != helpMap.end())
printf("Usage: %s\n", i->second.usage.c_str());
else
printf("ERROR: Unknown command `%s'\n", command.toString().c_str());
fprintf(stderr, "ERROR: Unknown command `%s'\n", command.toString().c_str());
}

std::string getCoordinatorsInfoString(StatusObjectReader statusObj) {
@@ -804,7 +804,7 @@ std::pair<int, int> getNumOfNonExcludedProcessAndZones(StatusObjectReader status

void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level, bool displayDatabaseAvailable = true, bool hideErrorMessages = false) {
if (FlowTransport::transport().incompatibleOutgoingConnectionsPresent()) {
printf("WARNING: One or more of the processes in the cluster is incompatible with this version of fdbcli.\n\n");
fprintf(stderr, "WARNING: One or more of the processes in the cluster is incompatible with this version of fdbcli.\n\n");
}

try {
@@ -1730,7 +1730,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,

bool upToDate;
if (!statusObjClient.get("cluster_file.up_to_date", upToDate) || !upToDate){
printf("WARNING: The cluster file is not up to date. Type 'status' for more information.\n");
fprintf(stderr, "WARNING: The cluster file is not up to date. Type 'status' for more information.\n");
}
}
catch (std::runtime_error& ){
@@ -1933,11 +1933,11 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
printf("ERROR: These changes would make the configuration invalid\n");
fprintf(stderr, "ERROR: These changes would make the configuration invalid\n");
ret=true;
break;
case ConfigurationResult::DATABASE_ALREADY_CREATED:
printf("ERROR: Database already exists! To change configuration, don't say `new'\n");
fprintf(stderr, "ERROR: Database already exists! To change configuration, don't say `new'\n");
ret=true;
break;
case ConfigurationResult::DATABASE_CREATED:
@@ -1945,43 +1945,43 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
ret=false;
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
printf("ERROR: The database is unavailable\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: The database is unavailable\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID:
printf("ERROR: All storage servers must be in one of the known regions\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: All storage servers must be in one of the known regions\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions > 1, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: When usable_regions > 1, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: When changing usable_regions, only one region can have priority >= 0\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::REGIONS_CHANGED:
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::NOT_ENOUGH_WORKERS:
printf("ERROR: Not enough processes exist to support the specified configuration\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: Not enough processes exist to support the specified configuration\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::REGION_REPLICATION_MISMATCH:
printf("ERROR: `three_datacenter' replication is incompatible with region configuration\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: `three_datacenter' replication is incompatible with region configuration\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::DCID_MISSING:
printf("ERROR: `No storage servers in one of the specified regions\n");
printf("Type `configure FORCE <TOKEN...>' to configure without this check\n");
fprintf(stderr, "ERROR: `No storage servers in one of the specified regions\n");
fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::SUCCESS:
@@ -1989,7 +1989,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
ret=false;
break;
case ConfigurationResult::LOCKED_NOT_NEW:
printf("ERROR: `only new databases can be configured as locked`\n");
fprintf(stderr, "ERROR: `only new databases can be configured as locked`\n");
ret = true;
break;
default:
@@ -2003,11 +2003,11 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
std::string contents(readFileBytes(filePath, 100000));
json_spirit::mValue config;
if(!json_spirit::read_string( contents, config )) {
printf("ERROR: Invalid JSON\n");
fprintf(stderr, "ERROR: Invalid JSON\n");
return true;
}
if(config.type() != json_spirit::obj_type) {
printf("ERROR: Configuration file must contain a JSON object\n");
fprintf(stderr, "ERROR: Configuration file must contain a JSON object\n");
return true;
}
StatusObject configJSON = config.get_obj();
@@ -2051,27 +2051,27 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
bool ret;
switch(result) {
case ConfigurationResult::NO_OPTIONS_PROVIDED:
printf("ERROR: No options provided\n");
fprintf(stderr, "ERROR: No options provided\n");
ret=true;
break;
case ConfigurationResult::CONFLICTING_OPTIONS:
printf("ERROR: Conflicting options\n");
fprintf(stderr, "ERROR: Conflicting options\n");
ret=true;
break;
case ConfigurationResult::UNKNOWN_OPTION:
printf("ERROR: Unknown option\n"); //This should not be possible because of schema match
fprintf(stderr, "ERROR: Unknown option\n"); //This should not be possible because of schema match
ret=true;
break;
case ConfigurationResult::INCOMPLETE_CONFIGURATION:
printf("ERROR: Must specify both a replication level and a storage engine when creating a new database\n");
fprintf(stderr, "ERROR: Must specify both a replication level and a storage engine when creating a new database\n");
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
printf("ERROR: These changes would make the configuration invalid\n");
fprintf(stderr, "ERROR: These changes would make the configuration invalid\n");
ret=true;
break;
case ConfigurationResult::DATABASE_ALREADY_CREATED:
printf("ERROR: Database already exists! To change configuration, don't say `new'\n");
fprintf(stderr, "ERROR: Database already exists! To change configuration, don't say `new'\n");
ret=true;
break;
case ConfigurationResult::DATABASE_CREATED:
@@ -2079,42 +2079,42 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
ret=false;
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
printf("ERROR: The database is unavailable\n");
fprintf(stderr, "ERROR: The database is unavailable\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID:
printf("ERROR: All storage servers must be in one of the known regions\n");
fprintf(stderr, "ERROR: All storage servers must be in one of the known regions\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions > 1, All regions with priority >= 0 must be fully replicated before changing the configuration\n");
fprintf(stderr, "ERROR: When usable_regions > 1, All regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
fprintf(stderr, "ERROR: When changing usable_regions, only one region can have priority >= 0\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::REGIONS_CHANGED:
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
fprintf(stderr, "ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::NOT_ENOUGH_WORKERS:
printf("ERROR: Not enough processes exist to support the specified configuration\n");
fprintf(stderr, "ERROR: Not enough processes exist to support the specified configuration\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::REGION_REPLICATION_MISMATCH:
printf("ERROR: `three_datacenter' replication is incompatible with region configuration\n");
fprintf(stderr, "ERROR: `three_datacenter' replication is incompatible with region configuration\n");
printf("Type `fileconfigure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
case ConfigurationResult::DCID_MISSING:
printf("ERROR: `No storage servers in one of the specified regions\n");
fprintf(stderr, "ERROR: `No storage servers in one of the specified regions\n");
printf("Type `fileconfigure FORCE <TOKEN...>' to configure without this check\n");
ret=true;
break;
@@ -2158,13 +2158,13 @@ ACTOR Future<bool> coordinators( Database db, std::vector<StringRef> tokens, boo
// SOMEDAY: Check for keywords
auto const& addr = NetworkAddress::parse( t->toString() );
if (addresses.count(addr)){
printf("ERROR: passed redundant coordinators: `%s'\n", addr.toString().c_str());
fprintf(stderr, "ERROR: passed redundant coordinators: `%s'\n", addr.toString().c_str());
return true;
}
addresses.insert(addr);
} catch (Error& e) {
if (e.code() == error_code_connection_string_invalid) {
printf("ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
return true;
}
throw;
@@ -2183,30 +2183,30 @@ ACTOR Future<bool> coordinators( Database db, std::vector<StringRef> tokens, boo
bool err = true;
switch(r) {
case CoordinatorsResult::INVALID_NETWORK_ADDRESSES:
printf("ERROR: The specified network addresses are invalid\n");
fprintf(stderr, "ERROR: The specified network addresses are invalid\n");
break;
case CoordinatorsResult::SAME_NETWORK_ADDRESSES:
printf("No change (existing configuration satisfies request)\n");
err = false;
break;
case CoordinatorsResult::NOT_COORDINATORS:
printf("ERROR: Coordination servers are not running on the specified network addresses\n");
fprintf(stderr, "ERROR: Coordination servers are not running on the specified network addresses\n");
break;
case CoordinatorsResult::DATABASE_UNREACHABLE:
printf("ERROR: Database unreachable\n");
fprintf(stderr, "ERROR: Database unreachable\n");
break;
case CoordinatorsResult::BAD_DATABASE_STATE:
printf("ERROR: The database is in an unexpected state from which changing coordinators might be unsafe\n");
fprintf(stderr, "ERROR: The database is in an unexpected state from which changing coordinators might be unsafe\n");
break;
case CoordinatorsResult::COORDINATOR_UNREACHABLE:
printf("ERROR: One of the specified coordinators is unreachable\n");
fprintf(stderr, "ERROR: One of the specified coordinators is unreachable\n");
break;
case CoordinatorsResult::SUCCESS:
printf("Coordination state changed\n");
err=false;
break;
case CoordinatorsResult::NOT_ENOUGH_MACHINES:
printf("ERROR: Too few fdbserver machines to provide coordination at the current redundancy level\n");
fprintf(stderr, "ERROR: Too few fdbserver machines to provide coordination at the current redundancy level\n");
break;
default:
ASSERT(false);
@@ -2226,7 +2226,7 @@ ACTOR Future<bool> include( Database db, std::vector<StringRef> tokens ) {
} else {
auto a = AddressExclusion::parse( *t );
if (!a.isValid()) {
printf("ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
if( t->toString().find(":tls") != std::string::npos )
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
@@ -2277,7 +2277,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
} else {
auto a = AddressExclusion::parse( *t );
if (!a.isValid()) {
printf("ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
if( t->toString().find(":tls") != std::string::npos )
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
@@ -2319,13 +2319,13 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc

StatusObjectReader statusObjCluster;
if (!statusObj.get("cluster", statusObjCluster)) {
printf("%s", errorString.c_str());
fprintf(stderr, "%s", errorString.c_str());
return true;
}

StatusObjectReader processesMap;
if (!statusObjCluster.get("processes", processesMap)) {
printf("%s", errorString.c_str());
fprintf(stderr, "%s", errorString.c_str());
return true;
}

@@ -2349,7 +2349,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
StatusObjectReader process(proc.second);
std::string addrStr;
if (!process.get("address", addrStr)) {
printf("%s", errorString.c_str());
fprintf(stderr, "%s", errorString.c_str());
return true;
}
NetworkAddress addr = NetworkAddress::parse(addrStr);
@@ -2362,19 +2362,19 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
if(!excluded) {
StatusObjectReader disk;
if (!process.get("disk", disk)) {
printf("%s", errorString.c_str());
fprintf(stderr, "%s", errorString.c_str());
return true;
}

int64_t total_bytes;
if (!disk.get("total_bytes", total_bytes)) {
printf("%s", errorString.c_str());
fprintf(stderr, "%s", errorString.c_str());
return true;
}

int64_t free_bytes;
if (!disk.get("free_bytes", free_bytes)) {
printf("%s", errorString.c_str());
fprintf(stderr, "%s", errorString.c_str());
return true;
}

@@ -2384,12 +2384,12 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
}
catch (...) // std::exception
{
printf("%s", errorString.c_str());
fprintf(stderr, "%s", errorString.c_str());
return true;
}

if( ssExcludedCount==ssTotalCount || (1-worstFreeSpaceRatio)*ssTotalCount/(ssTotalCount-ssExcludedCount) > 0.9 ) {
printf("ERROR: This exclude may cause the total free space in the cluster to drop below 10%%.\n"
fprintf(stderr, "ERROR: This exclude may cause the total free space in the cluster to drop below 10%%.\n"
"Type `exclude FORCE <ADDRESS...>' to exclude without checking free space.\n");
return true;
}
@@ -2425,22 +2425,22 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
for (const auto& exclusion : exclusionVector) {
if (absentExclusions.find(exclusion) != absentExclusions.end()) {
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the "
fprintf(stderr, " %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the "
"correct machines before removing them from the cluster!\n",
exclusion.ip.toString().c_str());
} else {
printf(" %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes "
fprintf(stderr, " %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes "
"before removing them from the cluster!\n",
exclusion.toString().c_str());
}
} else if (std::any_of(notExcludedServers.begin(), notExcludedServers.end(),
[&](const NetworkAddress& a) { return addressExcluded({ exclusion }, a); })) {
if (exclusion.port == 0) {
printf(" %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this "
fprintf(stderr, " %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this "
"machine from the cluster\n",
exclusion.ip.toString().c_str());
} else {
printf(" %s ---- WARNING: Exclusion in progress! It is not safe to remove this process from the "
fprintf(stderr, " %s ---- WARNING: Exclusion in progress! It is not safe to remove this process from the "
"cluster\n",
exclusion.toString().c_str());
}
@@ -2462,7 +2462,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
for (const auto& c : ccs.coordinators()) {
if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) ||
std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) {
printf("WARNING: %s is a coordinator!\n", c.toString().c_str());
fprintf(stderr, "WARNING: %s is a coordinator!\n", c.toString().c_str());
foundCoordinator = true;
}
}
@@ -2514,7 +2514,7 @@ ACTOR Future<bool> setClass( Database db, std::vector<StringRef> tokens ) {

AddressExclusion addr = AddressExclusion::parse( tokens[1] );
if (!addr.isValid()) {
printf("ERROR: '%s' is not a valid network endpoint address\n", tokens[1].toString().c_str());
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", tokens[1].toString().c_str());
if( tokens[1].toString().find(":tls") != std::string::npos )
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
@@ -2522,7 +2522,7 @@ ACTOR Future<bool> setClass( Database db, std::vector<StringRef> tokens ) {

ProcessClass processClass(tokens[2].toString(), ProcessClass::DBSource);
if(processClass.classType() == ProcessClass::InvalidClass && tokens[2] != LiteralStringRef("default")) {
printf("ERROR: '%s' is not a valid process class\n", tokens[2].toString().c_str());
fprintf(stderr, "ERROR: '%s' is not a valid process class\n", tokens[2].toString().c_str());
return true;
}

@@ -3049,7 +3049,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
}
catch (Error& e) {
printf("ERROR: %s (%d)\n", e.what(), e.code());
fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
printf("Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str());
return 1;
}
@@ -3143,9 +3143,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;

if (tokencmp(tokens[0], "parse_error")) {
printf("ERROR: Command failed to completely parse.\n");
fprintf(stderr, "ERROR: Command failed to completely parse.\n");
if (tokens.size() > 1) {
printf("ERROR: Not running partial or malformed command:");
fprintf(stderr, "ERROR: Not running partial or malformed command:");
for (auto t = tokens.begin() + 1; t != tokens.end(); ++t)
printf(" %s", formatStringRef(*t, true).c_str());
printf("\n");
@@ -3162,7 +3162,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}

if (!helpMap.count(tokens[0].toString()) && !hiddenCommands.count(tokens[0].toString())) {
printf("ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
fprintf(stderr, "ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
is_error = true;
continue;
}
@@ -3364,7 +3364,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
throw e;
}
} else {
printf("ERROR: Incorrect passphrase entered.\n");
fprintf(stderr, "ERROR: Incorrect passphrase entered.\n");
is_error = true;
}
}
@@ -3387,7 +3387,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printUsage(tokens[0]);
is_error = true;
} else if (intrans) {
printf("ERROR: Already in transaction\n");
fprintf(stderr, "ERROR: Already in transaction\n");
is_error = true;
} else {
activeOptions = FdbOptions(globalOptions);
@@ -3404,7 +3404,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printUsage(tokens[0]);
is_error = true;
} else if (!intrans) {
printf("ERROR: No active transaction\n");
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
wait( commitTransaction( tr ) );
@@ -3420,7 +3420,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printUsage(tokens[0]);
is_error = true;
} else if (!intrans) {
printf("ERROR: No active transaction\n");
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
tr->reset();
@@ -3437,7 +3437,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printUsage(tokens[0]);
is_error = true;
} else if (!intrans) {
printf("ERROR: No active transaction\n");
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
intrans = false;
@@ -3527,14 +3527,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to kill. You must run the `kill’ command before running `kill all’.\n");
fprintf(stderr, "ERROR: no processes to kill. You must run the `kill’ command before running `kill all’.\n");
} else {
printf("Attempted to kill %zu processes\n", address_interface.size());
}
} else {
for(int i = 1; i < tokens.size(); i++) {
if(!address_interface.count(tokens[i])) {
printf("ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str());
fprintf(stderr, "ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str());
is_error = true;
break;
}
@@ -3587,7 +3587,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
} else {
for(int i = 2; i < tokens.size(); i++) {
if(!address_interface.count(tokens[i])) {
printf("ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str());
fprintf(stderr, "ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str());
is_error = true;
break;
}
@@ -3682,7 +3682,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {

if (tokencmp(tokens[0], "profile")) {
if (tokens.size() == 1) {
printf("ERROR: Usage: profile <client|list|flow|heap>\n");
fprintf(stderr, "ERROR: Usage: profile <client|list|flow|heap>\n");
is_error = true;
continue;
}
@@ -3690,13 +3690,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
getTransaction(db, tr, options, intrans);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (tokens.size() == 2) {
printf("ERROR: Usage: profile client <get|set>\n");
fprintf(stderr, "ERROR: Usage: profile client <get|set>\n");
is_error = true;
continue;
}
if (tokencmp(tokens[2], "get")) {
if (tokens.size() != 3) {
printf("ERROR: Addtional arguments to `get` are not supported.\n");
fprintf(stderr, "ERROR: Addtional arguments to `get` are not supported.\n");
is_error = true;
continue;
}
@@ -3721,7 +3721,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[2], "set")) {
if (tokens.size() != 5) {
printf("ERROR: Usage: profile client set <RATE|default> <SIZE|default>\n");
fprintf(stderr, "ERROR: Usage: profile client set <RATE|default> <SIZE|default>\n");
is_error = true;
continue;
}
@@ -3732,7 +3732,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
char* end;
sampleRate = std::strtod((const char*)tokens[3].begin(), &end);
if (!std::isspace(*end)) {
printf("ERROR: %s failed to parse.\n", printable(tokens[3]).c_str());
fprintf(stderr, "ERROR: %s failed to parse.\n", printable(tokens[3]).c_str());
is_error = true;
continue;
}
@@ -3745,7 +3745,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if (parsed.present()) {
sizeLimit = parsed.get();
} else {
printf("ERROR: `%s` failed to parse.\n", printable(tokens[4]).c_str());
fprintf(stderr, "ERROR: `%s` failed to parse.\n", printable(tokens[4]).c_str());
is_error = true;
continue;
}
@@ -3757,13 +3757,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
continue;
}
printf("ERROR: Unknown action: %s\n", printable(tokens[2]).c_str());
fprintf(stderr, "ERROR: Unknown action: %s\n", printable(tokens[2]).c_str());
is_error = true;
continue;
}
if (tokencmp(tokens[1], "list")) {
if (tokens.size() != 2) {
printf("ERROR: Usage: profile list\n");
fprintf(stderr, "ERROR: Usage: profile list\n");
is_error = true;
continue;
}
@@ -3784,13 +3784,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[1], "flow")) {
if (tokens.size() == 2) {
printf("ERROR: Usage: profile flow <run>\n");
fprintf(stderr, "ERROR: Usage: profile flow <run>\n");
is_error = true;
continue;
}
if (tokencmp(tokens[2], "run")) {
if (tokens.size() < 6) {
printf("ERROR: Usage: profile flow run <DURATION_IN_SECONDS> <FILENAME> <PROCESS...>\n");
fprintf(stderr, "ERROR: Usage: profile flow run <DURATION_IN_SECONDS> <FILENAME> <PROCESS...>\n");
is_error = true;
continue;
}
@@ -3803,7 +3803,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
char *duration_end;
int duration = std::strtol((const char*)tokens[3].begin(), &duration_end, 10);
if (!std::isspace(*duration_end)) {
printf("ERROR: Failed to parse %s as an integer.", printable(tokens[3]).c_str());
fprintf(stderr, "ERROR: Failed to parse %s as an integer.", printable(tokens[3]).c_str());
is_error = true;
continue;
}
@@ -3828,7 +3828,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
for (int tokenidx = 5; tokenidx < tokens.size(); tokenidx++) {
auto element = interfaces.find(tokens[tokenidx]);
if (element == interfaces.end()) {
printf("ERROR: process '%s' not recognized.\n", printable(tokens[tokenidx]).c_str());
fprintf(stderr, "ERROR: process '%s' not recognized.\n", printable(tokens[tokenidx]).c_str());
is_error = true;
}
}
@@ -3846,7 +3846,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
for (int i = 0; i < all_profiler_responses.size(); i++) {
const ErrorOr<Void>& err = all_profiler_responses[i].get();
if (err.isError()) {
printf("ERROR: %s: %s: %s\n", printable(all_profiler_addresses[i]).c_str(), err.getError().name(), err.getError().what());
fprintf(stderr, "ERROR: %s: %s: %s\n", printable(all_profiler_addresses[i]).c_str(), err.getError().name(), err.getError().what());
}
}
}
@@ -3857,7 +3857,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[1], "heap")) {
if (tokens.size() != 3) {
printf("ERROR: Usage: profile heap <PROCESS>\n");
fprintf(stderr, "ERROR: Usage: profile heap <PROCESS>\n");
is_error = true;
continue;
}
@@ -3877,7 +3877,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
state Key ip_port = tokens[2];
if (interfaces.find(ip_port) == interfaces.end()) {
printf("ERROR: host %s not found\n", printable(ip_port).c_str());
fprintf(stderr, "ERROR: host %s not found\n", printable(ip_port).c_str());
is_error = true;
continue;
}
@@ -3885,11 +3885,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
profileRequest.outputFile = LiteralStringRef("heapz");
ErrorOr<Void> response = wait(interfaces[ip_port].profiler.tryGetReply(profileRequest));
if (response.isError()) {
printf("ERROR: %s: %s: %s\n", printable(ip_port).c_str(), response.getError().name(), response.getError().what());
fprintf(stderr, "ERROR: %s: %s: %s\n", printable(ip_port).c_str(), response.getError().name(), response.getError().what());
}
continue;
}
printf("ERROR: Unknown type: %s\n", printable(tokens[1]).c_str());
fprintf(stderr, "ERROR: Unknown type: %s\n", printable(tokens[1]).c_str());
is_error = true;
continue;
}
@@ -3930,14 +3930,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to check. You must run the `expensive_data_check’ command before running `expensive_data_check all’.\n");
fprintf(stderr, "ERROR: no processes to check. You must run the `expensive_data_check’ command before running `expensive_data_check all’.\n");
} else {
printf("Attempted to kill and check %zu processes\n", address_interface.size());
}
} else {
for(int i = 1; i < tokens.size(); i++) {
if(!address_interface.count(tokens[i])) {
printf("ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str());
fprintf(stderr, "ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str());
is_error = true;
break;
}
@@ -3973,7 +3973,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
// limit at the (already absurd)
// nearly-a-billion
if (tokens[3].size() > 9) {
printf("ERROR: bad limit\n");
fprintf(stderr, "ERROR: bad limit\n");
is_error = true;
continue;
}
@@ -3989,7 +3989,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
place *= 10;
}
if (!valid) {
printf("ERROR: bad limit\n");
fprintf(stderr, "ERROR: bad limit\n");
is_error = true;
continue;
}
@@ -4049,7 +4049,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {

if (tokencmp(tokens[0], "set")) {
if(!writeMode) {
printf("ERROR: writemode must be enabled to set or clear keys in the database.\n");
fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
is_error = true;
continue;
}
@@ -4070,7 +4070,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {

if (tokencmp(tokens[0], "clear")) {
if(!writeMode) {
printf("ERROR: writemode must be enabled to set or clear keys in the database.\n");
fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
is_error = true;
continue;
}
@@ -4091,7 +4091,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {

if (tokencmp(tokens[0], "clearrange")) {
if(!writeMode) {
printf("ERROR: writemode must be enabled to set or clear keys in the database.\n");
fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
is_error = true;
continue;
}
@@ -4167,7 +4167,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("\n");
}
else
printf("There are no options enabled\n");
fprintf(stderr, "There are no options enabled\n");

continue;
}
@@ -4177,12 +4177,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
else if(tokencmp(tokens[1], "off")) {
if(intrans) {
printf("ERROR: Cannot turn option off when using a transaction created with `begin'\n");
fprintf(stderr, "ERROR: Cannot turn option off when using a transaction created with `begin'\n");
is_error = true;
continue;
}
if(tokens.size() > 3) {
printf("ERROR: Cannot specify option argument when turning option off\n");
fprintf(stderr, "ERROR: Cannot specify option argument when turning option off\n");
is_error = true;
continue;
}
@@ -4190,7 +4190,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
isOn = false;
}
else {
printf("ERROR: Invalid option state `%s': option must be turned `on' or `off'\n", formatStringRef(tokens[1]).c_str());
fprintf(stderr, "ERROR: Invalid option state `%s': option must be turned `on' or `off'\n", formatStringRef(tokens[1]).c_str());
is_error = true;
continue;
}
@@ -4248,7 +4248,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
char *end;
throttleListLimit = std::strtol((const char*)tokens[3].begin(), &end, 10);
if ((tokens.size() > 4 && !std::isspace(*end)) || (tokens.size() == 4 && *end != '\0')) {
printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[3]).c_str());
fprintf(stderr, "ERROR: failed to parse limit `%s'.\n", printable(tokens[3]).c_str());
is_error = true;
continue;
}
@@ -4324,12 +4324,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
char *end;
tpsRate = std::strtod((const char*)tokens[4].begin(), &end);
if((tokens.size() > 5 && !std::isspace(*end)) || (tokens.size() == 5 && *end != '\0')) {
printf("ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str());
fprintf(stderr, "ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str());
is_error = true;
continue;
}
if(tpsRate < 0) {
printf("ERROR: rate cannot be negative `%f'\n", tpsRate);
fprintf(stderr, "ERROR: rate cannot be negative `%f'\n", tpsRate);
is_error = true;
continue;
}
@@ -4337,14 +4337,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(tokens.size() == 6) {
Optional<uint64_t> parsedDuration = parseDuration(tokens[5].toString());
if(!parsedDuration.present()) {
printf("ERROR: failed to parse duration `%s'.\n", printable(tokens[5]).c_str());
fprintf(stderr, "ERROR: failed to parse duration `%s'.\n", printable(tokens[5]).c_str());
is_error = true;
continue;
}
duration = parsedDuration.get();

if(duration == 0) {
printf("ERROR: throttle duration cannot be 0\n");
fprintf(stderr, "ERROR: throttle duration cannot be 0\n");
is_error = true;
continue;
}
@@ -4360,7 +4360,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
priority = TransactionPriority::BATCH;
}
else {
printf("ERROR: unrecognized priority `%s'. Must be one of `default',\n  `immediate', or `batch'.\n", tokens[6].toString().c_str());
fprintf(stderr, "ERROR: unrecognized priority `%s'. Must be one of `default',\n  `immediate', or `batch'.\n", tokens[6].toString().c_str());
is_error = true;
continue;
}
@@ -4517,7 +4517,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}


printf("ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
fprintf(stderr, "ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str());
is_error = true;
}

@@ -4525,7 +4525,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {

} catch (Error& e) {
if(e.code() != error_code_actor_cancelled)
printf("ERROR: %s (%d)\n", e.what(), e.code());
fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
is_error = true;
if (intrans) {
printf("Rolling back current transaction\n");
@@ -4721,7 +4721,7 @@ int main(int argc, char **argv) {
printf("\n");
loaded.print(stdout);
} catch (Error& e) {
printf("ERROR: %s (%d)\n", e.what(), e.code());
fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
printf("Use --log and look at the trace logs for more detailed information on the failure.\n");
return 1;
}
@@ -4745,7 +4745,7 @@ int main(int argc, char **argv) {
return 1;
}
} catch (Error& e) {
printf("ERROR: %s (%d)\n", e.what(), e.code());
fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
return 1;
}
}
@@ -346,9 +346,7 @@ void DLApi::init() {
if (unlinkOnLoad) {
int err = unlink(fdbCPath.c_str());
if (err) {
TraceEvent(SevError, "ErrorUnlinkingTempClientLibraryFile")
.detail("errno", errno)
.detail("LibraryPath", fdbCPath);
TraceEvent(SevError, "ErrorUnlinkingTempClientLibraryFile").GetLastError().detail("LibraryPath", fdbCPath);
throw platform_error();
}
}
@@ -1112,14 +1110,20 @@ void MultiVersionApi::setCallbacksOnExternalThreads() {
}
void MultiVersionApi::addExternalLibrary(std::string path) {
std::string filename = basename(path);
// we need at least one external library thread to run this library.
threadCount = std::max(threadCount, 1);

if (filename.empty() || !fileExists(path)) {
TraceEvent("ExternalClientNotFound").detail("LibraryPath", filename);
throw file_not_found();
}

MutexHolder holder(lock);
if (networkStartSetup) {
throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup
}

// external libraries always run on their own thread; ensure we allocate at least one thread to run this library.
threadCount = std::max(threadCount, 1);

if (externalClientDescriptions.count(filename) == 0) {
TraceEvent("AddingExternalClient").detail("LibraryPath", filename);
externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(path, true)));
@@ -1129,7 +1133,13 @@ void MultiVersionApi::addExternalLibrary(std::string path) {
void MultiVersionApi::addExternalLibraryDirectory(std::string path) {
TraceEvent("AddingExternalClientDirectory").detail("Directory", path);
std::vector<std::string> files = platform::listFiles(path, DYNAMIC_LIB_EXT);
// we need at least one external library thread to run these libraries.

MutexHolder holder(lock);
if (networkStartSetup) {
throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup
}

// external libraries always run on their own thread; ensure we allocate at least one thread to run this library.
threadCount = std::max(threadCount, 1);

for(auto filename : files) {
@@ -1171,13 +1181,15 @@ std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPe
break;
}
if (readCount == -1) {
throw platform_error;
TraceEvent(SevError, "ExternalClientCopyFailedReadError").GetLastError().detail("LibraryPath", path);
throw platform_error();
}
ssize_t written = 0;
while (written != readCount) {
ssize_t writeCount = write(tempFd, buf + written, readCount - written);
if (writeCount == -1) {
throw platform_error;
TraceEvent(SevError, "ExternalClientCopyFailedWriteError").GetLastError().detail("LibraryPath", path);
throw platform_error();
}
written += writeCount;
}
@@ -1194,7 +1206,8 @@ std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPe
#else
std::vector<std::pair< std::string, bool> > MultiVersionApi::copyExternalLibraryPerThread(std::string path) {
if (threadCount > 1) {
throw platform_error(); // not supported
TraceEvent(SevError, "MultipleClientThreadsUnsupportedOnWindows");
throw unsupported_operation();
}
std::vector<std::pair<std::string, bool>> paths;
paths.push_back({ path , false });
@@ -1453,7 +1466,7 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath
}
std::string clusterFile(clusterFilePath);

if (threadCount > 1) {
if (threadCount > 1 || localClientDisabled) {
ASSERT(localClientDisabled);
ASSERT(!bypassMultiClientApi);
@@ -140,7 +140,7 @@ struct StorageInfo : NonCopyable, public ReferenceCounted<StorageInfo> {
};

struct ServerCacheInfo {
std::vector<Tag> tags;
std::vector<Tag> tags; // all tags in both primary and remote DC for the key-range
std::vector<Reference<StorageInfo>> src_info;
std::vector<Reference<StorageInfo>> dest_info;
@@ -110,7 +110,7 @@ description is not currently required but encouraged.
<Option name="disable_local_client" code="64"
description="Prevents connections through the local client, allowing only connections through externally loaded client libraries." />
<Option name="client_threads_per_version" code="65"
paramType="Int" paramDescription="Number of client threads to be spawned. Each server will be serviced by a single client thread."
paramType="Int" paramDescription="Number of client threads to be spawned. Each cluster will be serviced by a single client thread."
description="Spawns multiple worker threads for each version of the client that is loaded. Setting this to a number greater than one implies disable_local_client." />
<Option name="disable_client_statistics_logging" code="70"
description="Disables logging of client statistics, such as sampled transaction activity." />
@@ -169,7 +169,9 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis

// Keep trying to get a reply from any of servers until success or cancellation; tries to take into account
// failMon's information for load balancing and avoiding failed servers
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the
// list of servers. When model is set, load balance among alternatives in the same DC, aiming to balance request queue
// length on these interfaces. If too many interfaces in the same DC are bad, try remote interfaces.
ACTOR template <class Interface, class Request, class Multi>
Future< REPLY_TYPE(Request) > loadBalance(
Reference<MultiInterface<Multi>> alternatives,
@@ -199,17 +201,22 @@ Future< REPLY_TYPE(Request) > loadBalance(
nextAlt++;

if(model) {
double bestMetric = 1e9;
double bestMetric = 1e9; // Storage server with the least outstanding requests.
double nextMetric = 1e9;
double bestTime = 1e9;
double bestTime = 1e9; // The latency to the server with the least outstanding requests.
double nextTime = 1e9;
int badServers = 0;

for(int i=0; i<alternatives->size(); i++) {
// countBest(): the number of alternatives in the same locality (i.e., DC by default) as alternatives[0].
// if the if-statement is correct, it won't try to send requests to the remote ones.
if(badServers < std::min(i, FLOW_KNOBS->LOAD_BALANCE_MAX_BAD_OPTIONS + 1) && i == alternatives->countBest()) {
// When we have at least one healthy local server, and the bad
// server count is within "LOAD_BALANCE_MAX_BAD_OPTIONS". We
// do not need to consider any remote servers.
break;
}


RequestStream<Request> const* thisStream = &alternatives->get( i, channel );
if (!IFailureMonitor::failureMonitor().getState( thisStream->getEndpoint() ).failed) {
auto& qd = model->getMeasurement(thisStream->getEndpoint().token.first());
@@ -217,9 +224,12 @@ Future< REPLY_TYPE(Request) > loadBalance(
double thisMetric = qd.smoothOutstanding.smoothTotal();
double thisTime = qd.latency;
if(FLOW_KNOBS->LOAD_BALANCE_PENALTY_IS_BAD && qd.penalty > 1.001) {
// When a server wants to penalize itself (the default
// penalty value is 1.0), consider this server as bad.
// penalty is sent from server.
++badServers;
}


if(thisMetric < bestMetric) {
if(i != bestAlt) {
nextAlt = bestAlt;
@@ -242,6 +252,9 @@ Future< REPLY_TYPE(Request) > loadBalance(
}
}
if( nextMetric > 1e8 ) {
// If we still don't have a second best choice to issue request to,
// go through all the remote servers again, since we may have
// skipped it.
for(int i=alternatives->countBest(); i<alternatives->size(); i++) {
RequestStream<Request> const* thisStream = &alternatives->get( i, channel );
if (!IFailureMonitor::failureMonitor().getState( thisStream->getEndpoint() ).failed) {
@@ -249,7 +262,7 @@ Future< REPLY_TYPE(Request) > loadBalance(
if(now() > qd.failedUntil) {
double thisMetric = qd.smoothOutstanding.smoothTotal();
double thisTime = qd.latency;


if( thisMetric < nextMetric ) {
nextAlt = i;
nextMetric = thisMetric;
@@ -261,6 +274,7 @@ Future< REPLY_TYPE(Request) > loadBalance(
}

if(nextTime < 1e9) {
// Decide when to send the request to the second best choice.
if(bestTime > FLOW_KNOBS->INSTANT_SECOND_REQUEST_MULTIPLIER*(model->secondMultiplier*(nextTime) + FLOW_KNOBS->BASE_SECOND_REQUEST_TIME)) {
secondDelay = Void();
} else {
@@ -278,6 +292,7 @@ Future< REPLY_TYPE(Request) > loadBalance(
state int numAttempts = 0;
state double backoff = 0;
state bool triedAllOptions = false;
// Issue requests to selected servers.
loop {
if(now() - startTime > (g_network->isSimulated() ? 30.0 : 600.0)) {
TraceEvent ev(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LoadBalanceTooLong");
@@ -295,7 +310,9 @@ Future< REPLY_TYPE(Request) > loadBalance(
}
}

// Find an alternative, if any, that is not failed, starting with nextAlt
// Find an alternative, if any, that is not failed, starting with
// nextAlt. This logic matters only if model == NULL. Otherwise, the
// bestAlt and nextAlt have been decided.
state RequestStream<Request> const* stream = nullptr;
for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) {
int useAlt = nextAlt;
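As a rough, self-contained sketch of the selection policy described in the comments above (a simplification, not the loadBalance implementation itself): prefer the healthy local interface with the smallest smoothed outstanding-request count, treat failed or self-penalizing servers as bad, and fall back to remote interfaces only when too many local ones are bad. The Candidate type and pickAlternative function below are hypothetical stand-ins.

#include <vector>
#include <limits>
#include <cstddef>

// Simplified view of one candidate server as the client sees it.
struct Candidate {
    double smoothOutstanding; // smoothed count of in-flight requests
    double penalty;           // > 1.0 means the server asked clients to back off
    bool failed;              // failure monitor says the endpoint is down
    bool local;               // true if in the same DC as the client
};

// Candidates are assumed ordered local-first (mirroring countBest()).
// maxBadOptions plays the role of LOAD_BALANCE_MAX_BAD_OPTIONS.
int pickAlternative(const std::vector<Candidate>& servers, int maxBadOptions) {
    int best = -1;
    double bestMetric = std::numeric_limits<double>::infinity();
    int badServers = 0;
    for (size_t i = 0; i < servers.size(); ++i) {
        const Candidate& c = servers[i];
        if (!c.local && badServers <= maxBadOptions)
            break; // enough usable local choices; do not consider remote ones
        if (c.failed || c.penalty > 1.001) {
            ++badServers; // skip bad servers but remember how many we saw
            continue;
        }
        if (c.smoothOutstanding < bestMetric) {
            bestMetric = c.smoothOutstanding;
            best = static_cast<int>(i);
        }
    }
    return best; // -1 means nothing usable; the caller would back off and retry
}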
@@ -48,7 +48,7 @@ std::string describe( KVPair<K,V> const& p ) { return format("%d ", p.k) + descr
template <class T>
struct ReferencedInterface : public ReferenceCounted<ReferencedInterface<T>> {
T interf;
int8_t distance;
int8_t distance; // one of enum values in struct LBDistance
std::string toString() const {
return interf.toString();
}
@@ -239,7 +239,8 @@ public:
}
private:
std::vector<Reference<ReferencedInterface<T>>> alternatives;
int16_t bestCount;
int16_t bestCount; // The number of interfaces in the same location as alternatives[0]. The same location means
// DC by default and machine if more than one alternatives are on the same machine).
};

template <class Ar, class T> void load(Ar& ar, Reference<MultiInterface<T>>&) { ASSERT(false); } //< required for Future<T>
@ -23,6 +23,8 @@

void QueueModel::endRequest( uint64_t id, double latency, double penalty, double delta, bool clean, bool futureVersion ) {
auto& d = data[id];

// Remove the penalty added when starting the request.
d.smoothOutstanding.addDelta(-delta);

if(clean) {
@ -48,7 +50,7 @@ void QueueModel::endRequest( uint64_t id, double latency, double penalty, double
}

QueueData& QueueModel::getMeasurement( uint64_t id ) {
return data[id];
return data[id]; // return smoothed penalty
}

double QueueModel::addRequest( uint64_t id ) {
@ -27,37 +27,76 @@
#include "flow/Knobs.h"
#include "flow/ActorCollection.h"

// The data structure used for the client-side load balancing algorithm to
// decide which storage server to read data from. Conceptually, it tracks the
// number of outstanding requests the current client sent to each storage
// server. One "QueueData" represents one storage server.
struct QueueData {
// The current outstanding requests sent by the local client to this storage
// server. The number is smoothed out over a continuous timeline.
Smoother smoothOutstanding;

// The last client-perceived latency to this storage server.
double latency;

// Represents the "cost" of each storage request. By default, the penalty is
// 1, indicating that each outstanding request counts as 1 outstanding
// request. However, the storage server can also increase the penalty if it
// decides to ask the client to slow down sending requests to it. The penalty
// is updated after each LoadBalancedReply.
double penalty;

// Do not consider this storage server if the current time hasn't reached this
// time. This field is computed after each request so that we do not repeatedly
// try the same storage server that is likely not going to return a valid result.
double failedUntil;

// If the storage server returns a "future version" error, increase the above
// `failedUntil` by this amount to increase the backoff time.
double futureVersionBackoff;

// If the current time has reached this time and this storage server still
// hasn't returned a valid result, increase the above `futureVersionBackoff`
// to increase the future backoff amount.
double increaseBackoffTime;
QueueData() : latency(0.001), penalty(1.0), smoothOutstanding(FLOW_KNOBS->QUEUE_MODEL_SMOOTHING_AMOUNT), failedUntil(0), futureVersionBackoff(FLOW_KNOBS->FUTURE_VERSION_INITIAL_BACKOFF), increaseBackoffTime(0) {}
QueueData()
: latency(0.001), penalty(1.0), smoothOutstanding(FLOW_KNOBS->QUEUE_MODEL_SMOOTHING_AMOUNT), failedUntil(0),
futureVersionBackoff(FLOW_KNOBS->FUTURE_VERSION_INITIAL_BACKOFF), increaseBackoffTime(0) {}
};

typedef double TimeEstimate;

class QueueModel {
public:
void endRequest( uint64_t id, double latency, double penalty, double delta, bool clean, bool futureVersion );
QueueData& getMeasurement( uint64_t id );
double addRequest( uint64_t id );
// Finishes the request sent to the storage server with `id`.
// - latency: the measured client-side latency of the request.
// - penalty: the server-side penalty sent along with the response from
//   the storage server. Requires >= 1.
// - delta: update server `id`'s queue model by subtracting this amount.
//   This value should be the value returned by `addRequest` below.
// - clean: indicates whether there was an error or not.
// - futureVersion: indicates whether there was a "future version" error or
//   not.
void endRequest(uint64_t id, double latency, double penalty, double delta, bool clean, bool futureVersion);
QueueData& getMeasurement(uint64_t id);

// Starts a new request to the storage server with `id`. If the storage
// server carries a penalty, add it to the queue size and return the
// penalty. The returned penalty should be passed as `delta` to `endRequest`
// so that `smoothOutstanding` reflects the real storage queue size.
double addRequest(uint64_t id);
double secondMultiplier;
double secondBudget;
PromiseStream< Future<Void> > addActor;
PromiseStream<Future<Void>> addActor;
Future<Void> laggingRequests; // requests for which a different recipient already answered
int laggingRequestCount;

QueueModel() : secondMultiplier(1.0), secondBudget(0), laggingRequestCount(0) {
laggingRequests = actorCollection( addActor.getFuture(), &laggingRequestCount );
laggingRequests = actorCollection(addActor.getFuture(), &laggingRequestCount);
}

~QueueModel() {
laggingRequests.cancel();
}
~QueueModel() { laggingRequests.cancel(); }

private:
std::unordered_map<uint64_t, QueueData> data;
};
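As a usage sketch of the queue model described above, the intended call pattern is: addRequest() before sending a request, then endRequest() with the returned delta once the reply (or error) arrives. The following is a simplified standalone analogue, not the Flow implementation; the Smoother is replaced by a plain counter and all names are illustrative.

// Simplified, standalone analogue of the client-side queue model; illustrative only.
#include <cstdint>
#include <cstdio>
#include <unordered_map>

struct SimpleQueueData {
    double outstanding = 0; // stands in for the smoothed outstanding count
    double penalty = 1.0;   // last penalty reported by the server (>= 1.0)
    double latency = 0.001; // last observed latency
};

class SimpleQueueModel {
public:
    // Called before sending a request; returns the delta to pass back to endRequest().
    double addRequest(uint64_t id) {
        auto& d = data[id];
        d.outstanding += d.penalty; // a penalized server looks "busier" to the balancer
        return d.penalty;
    }

    // Called when the reply (or error) arrives.
    void endRequest(uint64_t id, double latency, double penalty, double delta, bool clean) {
        auto& d = data[id];
        d.outstanding -= delta; // remove the contribution added by addRequest()
        if (clean) {
            d.latency = latency;
            d.penalty = penalty; // server-reported cost of future requests
        }
    }

    const SimpleQueueData& get(uint64_t id) { return data[id]; }

private:
    std::unordered_map<uint64_t, SimpleQueueData> data;
};

int main() {
    SimpleQueueModel model;
    double delta = model.addRequest(42);
    // ... send the request, measure latency, read the penalty from the reply ...
    model.endRequest(42, /*latency*/ 0.002, /*penalty*/ 2.0, delta, /*clean*/ true);
    printf("outstanding=%.1f penalty=%.1f\n", model.get(42).outstanding, model.get(42).penalty);
}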
@ -65,20 +104,20 @@ private:
/* old queue model
class QueueModel {
public:
QueueModel() : new_index(0) {
total_time[0] = 0;
total_time[1] = 0;
}
void addMeasurement( uint64_t id, QueueDetails qd );
TimeEstimate getTimeEstimate( uint64_t id );
TimeEstimate getAverageTimeEstimate();
QueueDetails getMeasurement( uint64_t id );
void expire();
QueueModel() : new_index(0) {
total_time[0] = 0;
total_time[1] = 0;
}
void addMeasurement( uint64_t id, QueueDetails qd );
TimeEstimate getTimeEstimate( uint64_t id );
TimeEstimate getAverageTimeEstimate();
QueueDetails getMeasurement( uint64_t id );
void expire();

private:
std::map<uint64_t, QueueDetails> data[2];
double total_time[2];
int new_index; // data[new_index] is the new data
std::map<uint64_t, QueueDetails> data[2];
double total_time[2];
int new_index; // data[new_index] is the new data
};
*/
@ -1011,6 +1011,7 @@ public:
}
}

// Check if txn system is recruited successfully in each region
void checkRegions(const std::vector<RegionInfo>& regions) {
if(desiredDcIds.get().present() && desiredDcIds.get().get().size() == 2 && desiredDcIds.get().get()[0].get() == regions[0].dcId && desiredDcIds.get().get()[1].get() == regions[1].dcId) {
return;
@ -5165,9 +5165,8 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
.detail("SnapPayload", snapReq.snapPayload)
.detail("SnapUID", snapReq.snapUID)
.error(e, true /*includeCancelled */);
if (e.code() == error_code_snap_storage_failed
|| e.code() == error_code_snap_tlog_failed
|| e.code() == error_code_operation_cancelled) {
if (e.code() == error_code_snap_storage_failed || e.code() == error_code_snap_tlog_failed ||
e.code() == error_code_operation_cancelled || e.code() == error_code_snap_disable_tlog_pop_failed) {
// enable tlog pop on local tlog nodes
std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs(false);
try {
@ -78,6 +78,8 @@ public:
commit()
read()
*/
// `init()` MUST be idempotent as it will be called more than once on a KeyValueStore in case
// of a rollback.
virtual Future<Void> init() {
return Void();
}
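One way to read the idempotency requirement above: init() must remain safe to call again after the store has already been opened. A minimal standalone sketch of that contract (not the actual IKeyValueStore interface; the class and members here are hypothetical) is to remember whether the open has happened and turn later calls into no-ops:

// Hypothetical store illustrating an idempotent init(); not FDB's IKeyValueStore.
#include <cstdio>
#include <string>

class DemoStore {
public:
    explicit DemoStore(std::string path) : path(std::move(path)) {}

    // Safe to call more than once: only the first call performs the open.
    void init() {
        if (opened) {
            printf("init(): already open, skipping\n");
            return; // no-op on repeated calls, e.g. when a rollback retries init()
        }
        printf("init(): opening %s\n", path.c_str());
        // ... open files / column families here ...
        opened = true;
    }

private:
    std::string path;
    bool opened = false;
};

int main() {
    DemoStore store("demo-dir");
    store.init();
    store.init(); // second call is a no-op, mirroring the contract described above
}

The RocksDB hunk that follows applies the same idea: OpenAction becomes a no-op when the database handle is already set, and the new Reopen test calls init() twice to confirm it.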
@ -121,6 +121,16 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
void action(OpenAction& a) {
// If the DB has already been initialized, this should be a no-op.
if (db != nullptr) {
TraceEvent(SevInfo, "RocksDB")
.detail("Path", a.path)
.detail("Method", "Open")
.detail("Skipping", "Already Open");
a.done.send(Void());
return;
}

std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
"default", getCFOptions() } };
std::vector<rocksdb::ColumnFamilyHandle*> handle;
@ -474,3 +484,45 @@ IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValu
return nullptr;
#endif // SSD_ROCKSDB_EXPERIMENTAL
}

#ifdef SSD_ROCKSDB_EXPERIMENTAL
#include "flow/UnitTest.h"

namespace {

TEST_CASE("fdbserver/KeyValueStoreRocksDB/Reopen") {
state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db";
platform::eraseDirectoryRecursive(rocksDBTestDir);

state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());

kvStore->set({ LiteralStringRef("foo"), LiteralStringRef("bar") });
wait(kvStore->commit(false));

Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);

Future<Void> closed = kvStore->onClosed();
kvStore->close();
wait(closed);

kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
wait(kvStore->init());
// Confirm that `init()` is idempotent.
wait(kvStore->init());

Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo")));
ASSERT(Optional<Value>(LiteralStringRef("bar")) == val);

Future<Void> closed = kvStore->onClosed();
kvStore->close();
wait(closed);

platform::eraseDirectoryRecursive(rocksDBTestDir);
return Void();
}

} // namespace

#endif // SSD_ROCKSDB_EXPERIMENTAL
@ -269,7 +269,8 @@ ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) {
return Void();
}

// Log router pulls data from satellite tLog
// The log router (LR) asynchronously pulls data from satellite tLogs (preferred) or primary tLogs at tag (self->routerTag)
// for the version range from the LR's current version (exclusive) to its epoch's end version or recovery version.
ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
state Future<Void> dbInfoChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
@ -63,7 +63,7 @@ public:
Reference<LocalitySet> logServerSet;
std::vector<int> logIndexArray;
std::vector<LocalityEntry> logEntryArray;
bool isLocal;
bool isLocal; // true if the LogSet is in the primary DC or the primary DC's satellite
int8_t locality;
Version startVersion;
std::vector<Future<TLogLockResult>> replies;
fdbserver/MasterProxyServer.actor.cpp (normal file, 2388 lines): file diff suppressed because it is too large.
@ -207,6 +207,10 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
if(g_network->isSimulated() && e.code() != error_code_io_timeout && (bool)g_network->global(INetwork::enASIOTimedOut))
TraceEvent(SevError, "IOTimeoutErrorSuppressed").detail("ErrorCode", e.code()).detail("RandomId", randomId).backtrace();

if (e.code() == error_code_io_timeout && !onShutdown.isReady()) {
onShutdown = ISimulator::RebootProcess;
}

if (onShutdown.isReady() && onShutdown.isError()) throw onShutdown.getError();
if(e.code() != error_code_actor_cancelled)
printf("SimulatedFDBDTerminated: %s\n", e.what());
@ -619,7 +619,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
specialCounter(cc, "QueueDiskBytesTotal", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().total; });
specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); });
specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); });
specialCounter(cc, "Geneartion", [this]() { return this->recoveryCount; });
specialCounter(cc, "Generation", [this]() { return this->recoveryCount; });
}

~LogData() {
@ -1096,18 +1096,13 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData
}
}
wait(waitForAll(ignoredPops));
TraceEvent("ResetIgnorePopRequest")
.detail("IgnorePopRequest", self->ignorePopRequest)
.detail("IgnorePopDeadline", self->ignorePopDeadline);
return Void();
}

ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) {
// timeout check for ignorePopRequest
if (self->ignorePopRequest && (g_network->now() > self->ignorePopDeadline)) {
TraceEvent("EnableTLogPlayAllIgnoredPops").detail("IgnoredPopDeadline", self->ignorePopDeadline);
wait(processPopRequests(self, logData));
TraceEvent("ResetIgnorePopRequest")
.detail("IgnorePopRequest", self->ignorePopRequest)
.detail("IgnorePopDeadline", self->ignorePopDeadline);
}
if (self->ignorePopRequest) {
TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline);

@ -2278,6 +2273,11 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
when( TLogSnapRequest snapReq = waitNext( tli.snapRequest.getFuture() ) ) {
logData->addActor.send( tLogSnapCreate( snapReq, self, logData) );
}
when(wait(self->ignorePopRequest ? delayUntil(self->ignorePopDeadline) : Never())) {
TEST(true); // Hit ignorePopDeadline
TraceEvent("EnableTLogPlayAllIgnoredPops").detail("IgnoredPopDeadline", self->ignorePopDeadline);
logData->addActor.send(processPopRequests(self, logData));
}
}
}
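The new when() clause above arms the deadline only while pops are being ignored (delayUntil(...) versus Never()). A standalone C++ sketch of the same pattern outside of Flow (all names here are hypothetical), using a condition variable whose wait deadline is armed only when the flag is set:

// Illustration of "arm a deadline only while a flag is set"; names are hypothetical, not Flow.
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

int main() {
    std::mutex m;
    std::condition_variable cv;
    bool ignorePops = true;  // while true, pop requests are queued instead of applied
    bool stop = false;
    auto ignoreDeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(200);

    std::thread server([&] {
        std::unique_lock<std::mutex> lk(m);
        while (!stop) {
            if (ignorePops) {
                // Only wait with a deadline while pops are being ignored.
                if (cv.wait_until(lk, ignoreDeadline) == std::cv_status::timeout) {
                    printf("deadline hit: replaying ignored pops\n");
                    ignorePops = false; // analogous to processPopRequests()
                }
            } else {
                cv.wait(lk); // no deadline armed; analogous to Never()
            }
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(300));
    {
        std::lock_guard<std::mutex> lk(m);
        stop = true;
    }
    cv.notify_all();
    server.join();
}

The benefit mirrored here is that the replay now happens from the serve loop when the deadline passes, rather than only when the next pop request happens to arrive.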
@ -351,6 +351,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return logSystem;
}

// Convert the TagPartitionedLogSystem to DBCoreState, overwriting the input newState as the return value
void toCoreState(DBCoreState& newState) final {
if( recoveryComplete.isValid() && recoveryComplete.isError() )
throw recoveryComplete.getError();
@ -1003,6 +1004,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}

// LogRouter or BackupWorker uses this function to obtain a cursor for peeking tlogs of a generation (i.e., epoch).
// Specifically, the epoch is determined by looking up "dbgid" in the tlog sets of generations.
// The returned cursor can peek data at the "tag" from the given "begin" version to that epoch's end version or
// the recovery version for the latest old epoch. For the current epoch, the cursor has no end version.
Reference<IPeekCursor> peekLogRouter(UID dbgid, Version begin, Tag tag) final {
bool found = false;
for (const auto& log : tLogs) {
@ -1095,7 +1100,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
bestSet = bestSatelliteSet;
}

TraceEvent("TLogPeekLogRouterOldSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("OldEpoch", old.epochEnd).detail("RecoveredAt", recoveredAt.present() ? recoveredAt.get() : -1).detail("FirstOld", firstOld);
TraceEvent("TLogPeekLogRouterOldSets", dbgid)
.detail("Tag", tag.toString())
.detail("Begin", begin)
.detail("OldEpoch", old.epochEnd)
.detail("RecoveredAt", recoveredAt.present() ? recoveredAt.get() : -1)
.detail("FirstOld", firstOld);
//FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN
return makeReference<ILogSystem::SetPeekCursor>(
localSets, bestSet, localSets[bestSet]->bestLocationFor(tag), tag, begin,
@ -216,8 +216,10 @@ struct RecruitFromConfigurationReply {
std::vector<WorkerInterface> grvProxies;
std::vector<WorkerInterface> resolvers;
std::vector<WorkerInterface> storageServers;
std::vector<WorkerInterface> oldLogRouters;
Optional<Key> dcId;
std::vector<WorkerInterface> oldLogRouters; // During recovery, log routers for older generations will be recruited.
Optional<Key> dcId; // dcId is where the master is recruited. It prefers to be in configuration.primaryDcId, but
// it can be recruited from configuration.secondaryDc: in that case the dcId will be the secondaryDcId and
// this generation's primaryDC in memory differs from configuration.primaryDcId.
bool satelliteFallback;

RecruitFromConfigurationReply() : satelliteFallback(false) {}
@ -1537,7 +1537,9 @@ private:
maxLogsSize = maxLogs * rollsize;
}
}
machineId = getSharedMemoryMachineId().toString();
if (!zoneId.present() && !(localities.isPresent(LocalityData::keyZoneId) && localities.isPresent(LocalityData::keyMachineId))) {
machineId = getSharedMemoryMachineId().toString();
}
if (!localities.isPresent(LocalityData::keyZoneId))
localities.set(LocalityData::keyZoneId, zoneId.present() ? zoneId : machineId);
@ -650,6 +650,9 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything( Refere
.detail("GrvProxies", recruits.grvProxies.size())
.detail("TLogs", recruits.tLogs.size())
.detail("Resolvers", recruits.resolvers.size())
.detail("SatelliteTLogs", recruits.satelliteTLogs.size())
.detail("OldLogRouters", recruits.oldLogRouters.size())
.detail("StorageServers", recruits.storageServers.size())
.detail("BackupWorkers", self->backupWorkers.size())
.trackLatest("MasterRecoveryState");
@ -744,6 +744,7 @@ public:
return counters.bytesInput.getValue() - counters.bytesDurable.getValue();
}

// penalty used by loadBalance() to balance requests among storage servers; we prefer servers with a smaller write queue.
double getPenalty() {
return std::max(std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER -
2.0 * SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) /
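The hunk above is cut off mid-expression, so the full formula is not reproduced here; the general shape, though, is a penalty that stays at 1.0 until the write queue exceeds its target band and then grows with the excess. A hedged standalone sketch with made-up target/spring values (not the actual SERVER_KNOBS values and not the exact FDB formula):

// Illustrative only: shows the shape of a write-queue-based penalty, not FDB's exact formula.
#include <algorithm>
#include <cstdio>

// Penalty >= 1.0; it rises once queueBytes exceeds (targetBytes - 2*springBytes),
// so loadBalance() steers reads away from storage servers with large write queues.
double writeQueuePenalty(double queueBytes, double targetBytes, double springBytes) {
    double slack = targetBytes - 2.0 * springBytes; // start penalizing above this point
    return std::max(1.0, (queueBytes - slack) / springBytes);
}

int main() {
    const double target = 1000e6, spring = 100e6; // hypothetical values, not SERVER_KNOBS
    printf("%.2f\n", writeQueuePenalty(500e6, target, spring));  // 1.00: well under target
    printf("%.2f\n", writeQueuePenalty(900e6, target, spring));  // 1.00: at the boundary
    printf("%.2f\n", writeQueuePenalty(1100e6, target, spring)); // 3.00: penalized
}

The returned penalty travels back to clients in LoadBalancedReply and feeds the QueueData::penalty field described earlier, closing the loop between server-side queue pressure and client-side request routing.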
@ -223,6 +223,7 @@ public: // workload functions
snapFailed = true;
break;
}
wait(delay(5.0));
}
}
CSimpleIni ini;