From 73ad702d145c09b71dbf332b5d9951c128187fe1 Mon Sep 17 00:00:00 2001 From: Evan Tschannen <ejt@apple.com> Date: Wed, 22 Jan 2020 15:41:22 -0800 Subject: [PATCH] Clients which fetch status should not disconnect from the coordinators and cluster controller between each retrieval --- fdbcli/fdbcli.actor.cpp | 14 ++++----- fdbclient/DatabaseBackupAgent.actor.cpp | 4 +-- fdbclient/DatabaseContext.h | 4 +++ fdbclient/Knobs.cpp | 1 + fdbclient/Knobs.h | 1 + fdbclient/ReadYourWrites.actor.cpp | 6 ++-- fdbclient/StatusClient.actor.cpp | 31 +++++++++++++++---- fdbclient/StatusClient.h | 3 +- .../workloads/DDMetricsExclude.actor.cpp | 2 +- fdbserver/workloads/StatusWorkload.actor.cpp | 6 ++-- 10 files changed, 49 insertions(+), 23 deletions(-) diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 6ae53ca79d..0224b7bcea 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -1600,9 +1600,9 @@ ACTOR Future<Void> timeWarning( double when, const char* msg ) { return Void(); } -ACTOR Future<Void> checkStatus(Future<Void> f, Reference<ClusterConnectionFile> clusterFile, bool displayDatabaseAvailable = true) { +ACTOR Future<Void> checkStatus(Future<Void> f, Database db, bool displayDatabaseAvailable = true) { wait(f); - StatusObject s = wait(StatusClient::statusFetcher(clusterFile)); + StatusObject s = wait(StatusClient::statusFetcher(db)); printf("\n"); printStatus(s, StatusClient::MINIMAL, displayDatabaseAvailable); printf("\n"); @@ -1644,7 +1644,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere state Optional<ConfigureAutoResult> conf; if( tokens[startToken] == LiteralStringRef("auto") ) { - StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( ccf )) ); + StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( db )) ); if(warn.isValid()) warn.cancel(); @@ -2057,7 +2057,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc } if(!force) { - StatusObject status = wait( makeInterruptable( StatusClient::statusFetcher( ccf ) ) ); + StatusObject status = wait( makeInterruptable( StatusClient::statusFetcher( db ) ) ); state std::string errorString = "ERROR: Could not calculate the impact of this exclude on the total free space in the cluster.\n" "Please try the exclude again in 30 seconds.\n" @@ -2632,7 +2632,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { if (!opt.exec.present()) { if(opt.initialStatusCheck) { - Future<Void> checkStatusF = checkStatus(Void(), db->getConnectionFile()); + Future<Void> checkStatusF = checkStatus(Void(), db); wait(makeInterruptable(success(checkStatusF))); } else { @@ -2670,7 +2670,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { linenoise.historyAdd(line); } - warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db->getConnectionFile()); + warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db); try { state UID randomID = deterministicRandom()->randomUniqueID(); @@ -2815,7 +2815,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { continue; } - StatusObject s = wait(makeInterruptable(StatusClient::statusFetcher(db->getConnectionFile()))); + StatusObject s = wait(makeInterruptable(StatusClient::statusFetcher(db))); if (!opt.exec.present()) printf("\n"); printStatus(s, level); diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index d9e05da706..c3fb295622 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -1959,8 +1959,8 @@ public: } if (!g_network->isSimulated() && !forceAction) { - state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src->getConnectionFile())); - StatusObject destStatus = wait(StatusClient::statusFetcher(dest->getConnectionFile())); + state StatusObject srcStatus = wait(StatusClient::statusFetcher(backupAgent->taskBucket->src)); + StatusObject destStatus = wait(StatusClient::statusFetcher(dest)); checkAtomicSwitchOverConfig(srcStatus, destStatus, tagName); } diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 7b4991f4a4..cad13bc059 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -191,6 +191,10 @@ public: Future<Void> clientInfoMonitor; Future<Void> connected; + Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface; + Future<Void> statusLeaderMon; + double lastStatusFetch; + int apiVersion; int mvCacheInsertLocation; diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp index 921d6878fb..0ebc528bca 100644 --- a/fdbclient/Knobs.cpp +++ b/fdbclient/Knobs.cpp @@ -46,6 +46,7 @@ ClientKnobs::ClientKnobs(bool randomize) { init( CLIENT_EXAMPLE_AMOUNT, 20 ); init( MAX_CLIENT_STATUS_AGE, 1.0 ); init( MAX_PROXY_CONNECTIONS, 5 ); if( randomize && BUGGIFY ) MAX_PROXY_CONNECTIONS = 1; + init( STATUS_IDLE_TIMEOUT, 120.0 ); // wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin diff --git a/fdbclient/Knobs.h b/fdbclient/Knobs.h index 25b26b0d12..dc98d159c7 100644 --- a/fdbclient/Knobs.h +++ b/fdbclient/Knobs.h @@ -45,6 +45,7 @@ public: int CLIENT_EXAMPLE_AMOUNT; double MAX_CLIENT_STATUS_AGE; int MAX_PROXY_CONNECTIONS; + double STATUS_IDLE_TIMEOUT; // wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test) diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 1d9efbef2d..b1d08dc8ee 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1165,8 +1165,8 @@ Optional<Value> getValueFromJSON(StatusObject statusObj) { } } -ACTOR Future<Optional<Value>> getJSON(Reference<ClusterConnectionFile> clusterFile) { - StatusObject statusObj = wait(StatusClient::statusFetcher(clusterFile)); +ACTOR Future<Optional<Value>> getJSON(Database db) { + StatusObject statusObj = wait(StatusClient::statusFetcher(db)); return getValueFromJSON(statusObj); } @@ -1194,7 +1194,7 @@ Future< Optional<Value> > ReadYourWritesTransaction::get( const Key& key, bool s if (key == LiteralStringRef("\xff\xff/status/json")){ if (tr.getDatabase().getPtr() && tr.getDatabase()->getConnectionFile()) { - return getJSON(tr.getDatabase()->getConnectionFile()); + return getJSON(tr.getDatabase()); } else { return Optional<Value>(); diff --git a/fdbclient/StatusClient.actor.cpp b/fdbclient/StatusClient.actor.cpp index 4798b217f4..e27954f568 100644 --- a/fdbclient/StatusClient.actor.cpp +++ b/fdbclient/StatusClient.actor.cpp @@ -452,7 +452,7 @@ StatusObject getClientDatabaseStatus(StatusObjectReader client, StatusObjectRead return databaseStatus; } -ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f ) { +ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f, Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface) { if (!g_network) throw network_not_setup(); state StatusObject statusObj; @@ -462,13 +462,15 @@ ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f // This could be read from the JSON but doing so safely is ugly so using a real var. state bool quorum_reachable = false; state int coordinatorsFaultTolerance = 0; - state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>); + + if(!clusterInterface) { + clusterInterface = Reference<AsyncVar<Optional<ClusterInterface>>>(new AsyncVar<Optional<ClusterInterface>>); + state Future<Void> leaderMon = monitorLeader<ClusterInterface>(f, clusterInterface); + } try { state int64_t clientTime = time(0); - state Future<Void> leaderMon = monitorLeader<ClusterInterface>(f, clusterInterface); - StatusObject _statusObjClient = wait(clientStatusFetcher(f, &clientMessages, &quorum_reachable, &coordinatorsFaultTolerance)); statusObjClient = _statusObjClient; @@ -548,6 +550,23 @@ ACTOR Future<StatusObject> statusFetcherImpl( Reference<ClusterConnectionFile> f return statusObj; } -Future<StatusObject> StatusClient::statusFetcher( Reference<ClusterConnectionFile> clusterFile ) { - return statusFetcherImpl(clusterFile); +ACTOR Future<Void> timeoutMonitorLeader(Database db) { + state Future<Void> leadMon = monitorLeader<ClusterInterface>(db->getConnectionFile(), db->statusClusterInterface); + loop { + wait(delay(CLIENT_KNOBS->STATUS_IDLE_TIMEOUT + 0.00001 + db->lastStatusFetch - now())); + if(now() - db->lastStatusFetch > CLIENT_KNOBS->STATUS_IDLE_TIMEOUT) { + db->statusClusterInterface = Reference<AsyncVar<Optional<ClusterInterface>>>(); + return Void(); + } + } +} + +Future<StatusObject> StatusClient::statusFetcher( Database db ) { + db->lastStatusFetch = now(); + if(!db->statusClusterInterface) { + db->statusClusterInterface = Reference<AsyncVar<Optional<ClusterInterface>>>(new AsyncVar<Optional<ClusterInterface>>); + db->statusLeaderMon = timeoutMonitorLeader(db); + } + + return statusFetcherImpl(db->getConnectionFile(), db->statusClusterInterface); } diff --git a/fdbclient/StatusClient.h b/fdbclient/StatusClient.h index 5a78b9b20f..6b780163a4 100755 --- a/fdbclient/StatusClient.h +++ b/fdbclient/StatusClient.h @@ -23,11 +23,12 @@ #include "flow/flow.h" #include "fdbclient/Status.h" +#include "fdbclient/DatabaseContext.h" class StatusClient { public: enum StatusLevel { MINIMAL = 0, NORMAL = 1, DETAILED = 2, JSON = 3 }; - static Future<StatusObject> statusFetcher(Reference<ClusterConnectionFile> clusterFile); + static Future<StatusObject> statusFetcher(Database db); }; #endif \ No newline at end of file diff --git a/fdbserver/workloads/DDMetricsExclude.actor.cpp b/fdbserver/workloads/DDMetricsExclude.actor.cpp index 8c72c1637f..e616eeb8b1 100644 --- a/fdbserver/workloads/DDMetricsExclude.actor.cpp +++ b/fdbserver/workloads/DDMetricsExclude.actor.cpp @@ -46,7 +46,7 @@ struct DDMetricsExcludeWorkload : TestWorkload { ACTOR static Future<double> getMovingDataAmount(Database cx, DDMetricsExcludeWorkload* self) { try { - StatusObject statusObj = wait(StatusClient::statusFetcher(cx->getConnectionFile())); + StatusObject statusObj = wait(StatusClient::statusFetcher(cx)); StatusObjectReader statusObjCluster; ((StatusObjectReader)statusObj).get("cluster", statusObjCluster); StatusObjectReader statusObjData; diff --git a/fdbserver/workloads/StatusWorkload.actor.cpp b/fdbserver/workloads/StatusWorkload.actor.cpp index ee67fe4115..e9bfa5ab88 100644 --- a/fdbserver/workloads/StatusWorkload.actor.cpp +++ b/fdbserver/workloads/StatusWorkload.actor.cpp @@ -69,7 +69,7 @@ struct StatusWorkload : TestWorkload { if (clientId != 0) return Void(); - return success(timeout(fetcher(cx->getConnectionFile(), this), testDuration)); + return success(timeout(fetcher(cx, this), testDuration)); } virtual Future<bool> check(Database const& cx) { return errors.getValue() == 0; @@ -161,7 +161,7 @@ struct StatusWorkload : TestWorkload { } } - ACTOR Future<Void> fetcher(Reference<ClusterConnectionFile> connFile, StatusWorkload *self) { + ACTOR Future<Void> fetcher(Database cx, StatusWorkload *self) { state double lastTime = now(); loop{ @@ -170,7 +170,7 @@ struct StatusWorkload : TestWorkload { // Since we count the requests that start, we could potentially never really hear back? ++self->requests; state double issued = now(); - StatusObject result = wait(StatusClient::statusFetcher(connFile)); + StatusObject result = wait(StatusClient::statusFetcher(cx)); ++self->replies; BinaryWriter br(AssumeVersion(currentProtocolVersion)); save(br, result);