From c7938c1be386fa13b17e0db1e2c0c83caa746d51 Mon Sep 17 00:00:00 2001 From: Markus Pilman <markus.pilman@snowflake.com> Date: Fri, 13 Nov 2020 11:00:38 -0700 Subject: [PATCH 01/28] don't generate machine id if it is set by the user --- fdbserver/fdbserver.actor.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index 1b938cffa5..69b5e0cc5b 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -1504,7 +1504,9 @@ private: maxLogsSize = maxLogs * rollsize; } } - machineId = getSharedMemoryMachineId().toString(); + if (!zoneId.present() && !(localities.isPresent(LocalityData::keyZoneId) && localities.isPresent(LocalityData::keyMachineId))) { + machineId = getSharedMemoryMachineId().toString(); + } if (!localities.isPresent(LocalityData::keyZoneId)) localities.set(LocalityData::keyZoneId, zoneId.present() ? zoneId : machineId); From 2e5e906be3f2ac0b34746b181879b2107d8d1b38 Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Tue, 9 Feb 2021 00:38:14 +0000 Subject: [PATCH 02/28] Fix comparison of iterators from different containers --- fdbserver/MoveKeys.actor.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 2e2b03f88c..95fef31fd6 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -238,7 +238,6 @@ ACTOR Future<vector<vector<UID>>> additionalSources(Standalone<RangeResultRef> s std::map<UID, StorageServerInterface> ssiMap; for(int s=0; s<serverListValues.size(); s++) { - auto si = decodeServerListValue(serverListValues[s].get()); StorageServerInterface ssi = decodeServerListValue(serverListValues[s].get()); ssiMap[ssi.id()] = ssi; } @@ -258,7 +257,7 @@ ACTOR Future<vector<vector<UID>>> additionalSources(Standalone<RangeResultRef> s } for(int s=0; s<dest.size(); s++) { - if( std::find(src.begin(), src.end(), dest[s]) == dest.end() ) { + if (std::find(src.begin(), src.end(), dest[s]) == src.end()) { destInterfs.push_back( ssiMap[dest[s]] ); } } From cb196daefe49df21330f8ff0aaa26d0549879a27 Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Thu, 4 Feb 2021 02:37:51 +0000 Subject: [PATCH 03/28] Fix bugs turned up by _GLIBCXX_DEBUG Compiling with -D_GLIBCXX_DEBUG enables libstc++ "debug mode", where additional debug information is tracked with iterators and reported if iterators are misused. This turned up two bugs. I threw in removing dead code and avoiding an unnecessary map lookup while I was in the neighborhood. 
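For illustration (not part of the patch set), a minimal standalone C++ sketch of the bug class that libstdc++ debug mode catches. Built with -D_GLIBCXX_DEBUG, a mixed-container iterator comparison like the one fixed in MoveKeys.actor.cpp is reported at runtime instead of silently yielding a meaningless result:

    // Minimal sketch, assuming only a stock libstdc++ toolchain.
    // Build with: g++ -std=c++17 -D_GLIBCXX_DEBUG repro.cpp
    #include <algorithm>
    #include <vector>

    int main() {
        std::vector<int> src = {1, 2, 3};
        std::vector<int> dest = {2, 9};
        auto it = std::find(src.begin(), src.end(), dest[0]);
        // Correct: compare against end() of the container that was searched.
        // Comparing `it` against dest.end(), as the old MoveKeys code did, is
        // the mistake that debug mode reports at runtime.
        return (it != src.end()) ? 0 : 1;
    }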
--- fdbrpc/HealthMonitor.actor.cpp | 10 ++++++---- fdbserver/MoveKeys.actor.cpp | 3 +-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/fdbrpc/HealthMonitor.actor.cpp b/fdbrpc/HealthMonitor.actor.cpp index bf03370fd2..fa549a50f2 100644 --- a/fdbrpc/HealthMonitor.actor.cpp +++ b/fdbrpc/HealthMonitor.actor.cpp @@ -29,10 +29,12 @@ void HealthMonitor::reportPeerClosed(const NetworkAddress& peerAddress) { } void HealthMonitor::purgeOutdatedHistory() { - for (auto it : peerClosedHistory) { - if (it.first < now() - FLOW_KNOBS->HEALTH_MONITOR_CLIENT_REQUEST_INTERVAL_SECS) { - peerClosedNum[it.second] -= 1; - ASSERT(peerClosedNum[it.second] >= 0); + for (auto it = peerClosedHistory.begin(); it != peerClosedHistory.end();) { + if (it->first < now() - FLOW_KNOBS->HEALTH_MONITOR_CLIENT_REQUEST_INTERVAL_SECS) { + auto& count = peerClosedNum[it->second]; + --count; + ASSERT(count >= 0); + ++it; // Increment before pop_front to avoid iterator invalidation peerClosedHistory.pop_front(); } else { break; diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index eea20328cd..32a044f613 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -240,7 +240,6 @@ ACTOR Future<vector<vector<UID>>> additionalSources(Standalone<RangeResultRef> s std::map<UID, StorageServerInterface> ssiMap; for(int s=0; s<serverListValues.size(); s++) { - auto si = decodeServerListValue(serverListValues[s].get()); StorageServerInterface ssi = decodeServerListValue(serverListValues[s].get()); ssiMap[ssi.id()] = ssi; } @@ -260,7 +259,7 @@ ACTOR Future<vector<vector<UID>>> additionalSources(Standalone<RangeResultRef> s } for(int s=0; s<dest.size(); s++) { - if( std::find(src.begin(), src.end(), dest[s]) == dest.end() ) { + if (std::find(src.begin(), src.end(), dest[s]) == src.end()) { destInterfs.push_back( ssiMap[dest[s]] ); } } From 9122be4d8142a7ac103d2581923fcf10b4148257 Mon Sep 17 00:00:00 2001 From: Meng Xu <meng_xu@apple.com> Date: Wed, 10 Feb 2021 13:45:06 -0800 Subject: [PATCH 04/28] Add comments to HA code and loadBalance code --- bindings/flow/fdb_flow.actor.cpp | 1 + fdbclient/StorageServerInterface.h | 2 +- fdbrpc/LoadBalance.actor.h | 16 ++++++++++++---- fdbrpc/MultiInterface.h | 5 +++-- fdbserver/ClusterController.actor.cpp | 1 + fdbserver/MasterProxyServer.actor.cpp | 2 +- fdbserver/TagPartitionedLogSystem.actor.cpp | 7 ++++++- fdbserver/WorkerInterface.actor.h | 7 +++++-- fdbserver/storageserver.actor.cpp | 1 + 9 files changed, 31 insertions(+), 11 deletions(-) diff --git a/bindings/flow/fdb_flow.actor.cpp b/bindings/flow/fdb_flow.actor.cpp index 3ed3d93700..6f90be62f9 100644 --- a/bindings/flow/fdb_flow.actor.cpp +++ b/bindings/flow/fdb_flow.actor.cpp @@ -94,6 +94,7 @@ void fdb_flow_test() { g_network->run(); } +// FDB obj used by bindings namespace FDB { class DatabaseImpl : public Database, NonCopyable { public: diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index eb6fbf1d38..08fd31ef3d 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -137,7 +137,7 @@ struct StorageInfo : NonCopyable, public ReferenceCounted<StorageInfo> { }; struct ServerCacheInfo { - std::vector<Tag> tags; + std::vector<Tag> tags; // all tags in both primary and remote DC for the key-range std::vector<Reference<StorageInfo>> src_info; std::vector<Reference<StorageInfo>> dest_info; diff --git a/fdbrpc/LoadBalance.actor.h b/fdbrpc/LoadBalance.actor.h index 71c89ea406..41987e4a8c 100644 --- 
a/fdbrpc/LoadBalance.actor.h +++ b/fdbrpc/LoadBalance.actor.h @@ -169,7 +169,9 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis // Keep trying to get a reply from any of servers until success or cancellation; tries to take into account // failMon's information for load balancing and avoiding failed servers -// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers +// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the +// list of servers When model is set, load balance among alternatives in the same DC, aiming to balance request queue +// length on these interfaces. If too many interfaces in the same DC are bad, try remote interfaces. ACTOR template <class Interface, class Request, class Multi> Future< REPLY_TYPE(Request) > loadBalance( Reference<MultiInterface<Multi>> alternatives, @@ -206,10 +208,15 @@ Future< REPLY_TYPE(Request) > loadBalance( int badServers = 0; for(int i=0; i<alternatives->size(); i++) { + // countBest(): the number of alternatives in the same locality (i.e., DC by default) as alternatives[0]. + // if the if-statement is correct, it won't try to send requests to the remote ones. if(badServers < std::min(i, FLOW_KNOBS->LOAD_BALANCE_MAX_BAD_OPTIONS + 1) && i == alternatives->countBest()) { + // If there are not enough local servers that are bad, + // we won't even try to send requests to remote servers. + // An interface is bad if its endpoint fails or if reply from the interface has a high penalty. break; } - + RequestStream<Request> const* thisStream = &alternatives->get( i, channel ); if (!IFailureMonitor::failureMonitor().getState( thisStream->getEndpoint() ).failed) { auto& qd = model->getMeasurement(thisStream->getEndpoint().token.first()); @@ -217,9 +224,10 @@ Future< REPLY_TYPE(Request) > loadBalance( double thisMetric = qd.smoothOutstanding.smoothTotal(); double thisTime = qd.latency; if(FLOW_KNOBS->LOAD_BALANCE_PENALTY_IS_BAD && qd.penalty > 1.001) { + // penalty is sent from server. ++badServers; } - + if(thisMetric < bestMetric) { if(i != bestAlt) { nextAlt = bestAlt; @@ -249,7 +257,7 @@ Future< REPLY_TYPE(Request) > loadBalance( if(now() > qd.failedUntil) { double thisMetric = qd.smoothOutstanding.smoothTotal(); double thisTime = qd.latency; - + if( thisMetric < nextMetric ) { nextAlt = i; nextMetric = thisMetric; diff --git a/fdbrpc/MultiInterface.h b/fdbrpc/MultiInterface.h index 5ae2c7b2e1..c9ae560946 100644 --- a/fdbrpc/MultiInterface.h +++ b/fdbrpc/MultiInterface.h @@ -43,7 +43,7 @@ std::string describe( KVPair<K,V> const& p ) { return format("%d ", p.k) + descr template <class T> struct ReferencedInterface : public ReferenceCounted<ReferencedInterface<T>> { T interf; - int8_t distance; + int8_t distance; // choose one enum value in LBDistance type std::string toString() const { return interf.toString(); } @@ -222,7 +222,8 @@ public: } private: vector<Reference<ReferencedInterface<T>>> alternatives; - int16_t bestCount; + int16_t bestCount; // The number of interfaces in the same location as alternatives[0]. The same location means + // DC by default and machine if more than one alternatives are on the same machine). 
}; template <class Ar, class T> void load(Ar& ar, Reference<MultiInterface<T>>&) { ASSERT(false); } //< required for Future<T> diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 8cd2b87d7f..9f3d85f190 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -945,6 +945,7 @@ public: } } + // Check if txn system is recruited successfully in each region void checkRegions(const std::vector<RegionInfo>& regions) { if(desiredDcIds.get().present() && desiredDcIds.get().get().size() == 2 && desiredDcIds.get().get()[0].get() == regions[0].dcId && desiredDcIds.get().get()[1].get() == regions[1].dcId) { return; diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index d6858f3918..2397e562a7 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -420,7 +420,7 @@ struct ProxyCommitData { uint64_t commitVersionRequestNumber; uint64_t mostRecentProcessedRequestNumber; KeyRangeMap<Deque<std::pair<Version,int>>> keyResolvers; - KeyRangeMap<ServerCacheInfo> keyInfo; + KeyRangeMap<ServerCacheInfo> keyInfo; // keyrange -> all storage servers in all DCs for the keyrange KeyRangeMap<bool> cacheInfo; std::map<Key, applyMutationsData> uid_applyMutationsData; bool firstProxy; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index ab79ab7170..2b1288ffbe 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -1038,7 +1038,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS bestSet = bestSatelliteSet; } - TraceEvent("TLogPeekLogRouterOldSets", dbgid).detail("Tag", tag.toString()).detail("Begin", begin).detail("OldEpoch", old.epochEnd).detail("RecoveredAt", recoveredAt.present() ? recoveredAt.get() : -1).detail("FirstOld", firstOld); + TraceEvent("TLogPeekLogRouterOldSets", dbgid) + .detail("Tag", tag.toString()) + .detail("Begin", begin) + .detail("OldEpoch", old.epochEnd) + .detail("RecoveredAt", recoveredAt.present() ? recoveredAt.get() : -1) + .detail("FirstOld", firstOld); //FIXME: do this merge on one of the logs in the other data center to avoid sending multiple copies across the WAN return Reference<ILogSystem::SetPeekCursor>( new ILogSystem::SetPeekCursor( localSets, bestSet, localSets[bestSet]->bestLocationFor( tag ), tag, begin, firstOld && recoveredAt.present() ? recoveredAt.get() + 1 : old.epochEnd, true ) ); } diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 9beab6c9d8..7a8b169bfe 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -210,8 +210,11 @@ struct RecruitFromConfigurationReply { std::vector<WorkerInterface> proxies; std::vector<WorkerInterface> resolvers; std::vector<WorkerInterface> storageServers; - std::vector<WorkerInterface> oldLogRouters; - Optional<Key> dcId; + std::vector<WorkerInterface> oldLogRouters; // why need oldLogRouters? + Optional<Key> dcId; // dcId is where master is recruited. It prefers to be in configuration.primaryDcId, but + // it can be recruited from configuration.secondaryDc: The dcId will be the secondaryDcId and + // this generation's primaryDC in memory is different from configuration.primaryDcId. + // when is dcId set? 
bool satelliteFallback; RecruitFromConfigurationReply() : satelliteFallback(false) {} diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 59653cd2b3..eb2f386b70 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -691,6 +691,7 @@ public: return counters.bytesInput.getValue() - counters.bytesDurable.getValue(); } + // penalty used by loadBalance() to balance requests among SSes. We prefer SS with less write queue size. double getPenalty() { return std::max(std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0 * SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / From bb321982671fa4860d7ec9a8238043508beb5134 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard <trevor.clinkenbeard@snowflake.com> Date: Wed, 17 Feb 2021 00:47:32 -0800 Subject: [PATCH 05/28] Reboot simulated process on io_timeout error --- fdbserver/SimulatedCluster.actor.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 0ace285664..b72ffe72bc 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -202,6 +202,10 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec if(g_network->isSimulated() && e.code() != error_code_io_timeout && (bool)g_network->global(INetwork::enASIOTimedOut)) TraceEvent(SevError, "IOTimeoutErrorSuppressed").detail("ErrorCode", e.code()).detail("RandomId", randomId).backtrace(); + if (e.code() == error_code_io_timeout && !onShutdown.isReady()) { + onShutdown = ISimulator::RebootProcess; + } + if (onShutdown.isReady() && onShutdown.isError()) throw onShutdown.getError(); if(e.code() != error_code_actor_cancelled) printf("SimulatedFDBDTerminated: %s\n", e.what()); From 5324f127c97f7478a9bbd421a183679c6fe554ca Mon Sep 17 00:00:00 2001 From: Russell Sears <russell_sears@apple.com> Date: Wed, 17 Feb 2021 21:09:57 +0000 Subject: [PATCH 06/28] Address skipped PR comments from multithreaded client PR --- documentation/sphinx/source/api-general.rst | 2 +- fdbclient/MultiVersionTransaction.actor.cpp | 33 ++++++++++++++------- fdbclient/vexillographer/fdb.options | 2 +- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/documentation/sphinx/source/api-general.rst b/documentation/sphinx/source/api-general.rst index 528728696d..55ecb5c25e 100644 --- a/documentation/sphinx/source/api-general.rst +++ b/documentation/sphinx/source/api-general.rst @@ -132,7 +132,7 @@ If you suspect that a client process's workload may be saturating the network th Multi-threaded Client ===================== -FoundationDB client library can start multiple worker threads for each version of client that is loaded. Every single cluster will be serviced by a one client thread. If the client is connected to only one cluster, exactly one thread would be active and the rest will remain idle. Hence, using this feature is useful when the client is actively using more than one cluster. +FoundationDB client library can start multiple worker threads for each version of client that is loaded. Every single cluster will be serviced by one of the client threads. If the client is connected to only one cluster, exactly one thread would be active and the rest will remain idle. Hence, using this feature is useful when the client is actively using more than one cluster. Clients can be configured to use worker-threads by setting the ``FDBNetworkOptions::CLIENT_THREADS_PER_VERSION`` option. 
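For illustration (not part of the patch set), a hedged sketch of how a client program might enable this feature through the C API. The enum name FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION is assumed here to be the identifier vexillographer generates for the client_threads_per_version option added later in this series:

    // Hedged sketch; the option must be set before the network is started.
    #define FDB_API_VERSION 620
    #include <foundationdb/fdb_c.h>
    #include <cstdint>

    int main() {
        fdb_select_api_version(FDB_API_VERSION);
        int64_t threads = 4; // spawn 4 worker threads per loaded client version;
                             // each cluster is serviced by one of them
        fdb_network_set_option(FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION,
                               reinterpret_cast<const uint8_t*>(&threads),
                               sizeof(threads));
        // ... then fdb_setup_network() and fdb_run_network() on a dedicated thread
        return 0;
    }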
diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 9379bb6eea..5c2bb34f93 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -293,9 +293,7 @@ void DLApi::init() { if (unlinkOnLoad) { int err = unlink(fdbCPath.c_str()); if (err) { - TraceEvent(SevError, "ErrorUnlinkingTempClientLibraryFile") - .detail("errno", errno) - .detail("LibraryPath", fdbCPath); + TraceEvent(SevError, "ErrorUnlinkingTempClientLibraryFile").GetLastError().detail("LibraryPath", fdbCPath); throw platform_error(); } } @@ -1015,14 +1013,20 @@ void MultiVersionApi::setCallbacksOnExternalThreads() { } void MultiVersionApi::addExternalLibrary(std::string path) { std::string filename = basename(path); - // we need at least one external library thread to run this library. - threadCount = std::max(threadCount, 1); if (filename.empty() || !fileExists(path)) { TraceEvent("ExternalClientNotFound").detail("LibraryPath", filename); throw file_not_found(); } + MutexHolder holder(lock); + if (networkStartSetup) { + throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup + } + + // external libraries always run on their own thread; ensure we allocate at least one thread to run this library. + threadCount = std::max(threadCount, 1); + if (externalClientDescriptions.count(filename) == 0) { TraceEvent("AddingExternalClient").detail("LibraryPath", filename); externalClientDescriptions.emplace(std::make_pair(filename, ClientDesc(path, true))); @@ -1032,7 +1036,13 @@ void MultiVersionApi::addExternalLibrary(std::string path) { void MultiVersionApi::addExternalLibraryDirectory(std::string path) { TraceEvent("AddingExternalClientDirectory").detail("Directory", path); std::vector<std::string> files = platform::listFiles(path, DYNAMIC_LIB_EXT); - // we need at least one external library thread to run these libraries. + + MutexHolder holder(lock); + if (networkStartSetup) { + throw invalid_option(); // SOMEDAY: it might be good to allow clients to be added after the network is setup + } + + // external libraries always run on their own thread; ensure we allocate at least one thread to run this library. 
threadCount = std::max(threadCount, 1); for(auto filename : files) { @@ -1074,13 +1084,15 @@ std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPe break; } if (readCount == -1) { - throw platform_error; + TraceEvent("ExternalClientCopyFailedReadError").GetLastError().detail("LibraryPath", path); + throw platform_error(); } ssize_t written = 0; while (written != readCount) { ssize_t writeCount = write(tempFd, buf + written, readCount - written); if (writeCount == -1) { - throw platform_error; + TraceEvent("ExternalClientCopyFailedWriteError").GetLastError().detail("LibraryPath", path); + throw platform_error(); } written += writeCount; } @@ -1097,7 +1109,8 @@ std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPe #else std::vector<std::pair< std::string, bool> > MultiVersionApi::copyExternalLibraryPerThread(std::string path) { if (threadCount > 1) { - throw platform_error(); // not supported + TraceEvent(SevError, "MultipleClientThreadsUnsupportedOnWindows"); + throw unsupported_operation(); } std::vector<std::pair<std::string, bool>> paths; paths.push_back({ path , false }); @@ -1356,7 +1369,7 @@ Reference<IDatabase> MultiVersionApi::createDatabase(const char *clusterFilePath } std::string clusterFile(clusterFilePath); - if (threadCount > 1) { + if (threadCount > 1 || localClientDisabled) { ASSERT(localClientDisabled); ASSERT(!bypassMultiClientApi); diff --git a/fdbclient/vexillographer/fdb.options b/fdbclient/vexillographer/fdb.options index c6cf25df27..837a2f27c8 100644 --- a/fdbclient/vexillographer/fdb.options +++ b/fdbclient/vexillographer/fdb.options @@ -110,7 +110,7 @@ description is not currently required but encouraged. <Option name="disable_local_client" code="64" description="Prevents connections through the local client, allowing only connections through externally loaded client libraries." /> <Option name="client_threads_per_version" code="65" - paramType="Int" paramDescription="Number of client threads to be spawned. Each server will be serviced by a single client thread." + paramType="Int" paramDescription="Number of client threads to be spawned. Each cluster will be serviced by a single client thread." description="Spawns multiple worker threads for each version of the client that is loaded. Setting this to a number greater than one implies disable_local_client." /> <Option name="disable_client_statistics_logging" code="70" description="Disables logging of client statistics, such as sampled transaction activity." 
/> From 471a3489fb3fae6fa218821ee5ba874ec70a67b3 Mon Sep 17 00:00:00 2001 From: Meng Xu <meng_xu@apple.com> Date: Wed, 17 Feb 2021 14:43:31 -0800 Subject: [PATCH 07/28] Resolve review comments and add trace fields to MasterRecoveryState --- bindings/flow/fdb_flow.actor.cpp | 2 +- fdbrpc/LoadBalance.actor.h | 4 ++-- fdbrpc/MultiInterface.h | 2 +- fdbrpc/QueueModel.cpp | 2 +- fdbserver/TagPartitionedLogSystem.actor.cpp | 1 + fdbserver/WorkerInterface.actor.h | 2 +- fdbserver/masterserver.actor.cpp | 17 ++++++++++------- 7 files changed, 17 insertions(+), 13 deletions(-) diff --git a/bindings/flow/fdb_flow.actor.cpp b/bindings/flow/fdb_flow.actor.cpp index 6f90be62f9..13e371cc01 100644 --- a/bindings/flow/fdb_flow.actor.cpp +++ b/bindings/flow/fdb_flow.actor.cpp @@ -94,7 +94,7 @@ void fdb_flow_test() { g_network->run(); } -// FDB obj used by bindings +// FDB object used by bindings namespace FDB { class DatabaseImpl : public Database, NonCopyable { public: diff --git a/fdbrpc/LoadBalance.actor.h b/fdbrpc/LoadBalance.actor.h index 41987e4a8c..9884d88033 100644 --- a/fdbrpc/LoadBalance.actor.h +++ b/fdbrpc/LoadBalance.actor.h @@ -170,7 +170,7 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis // Keep trying to get a reply from any of servers until success or cancellation; tries to take into account // failMon's information for load balancing and avoiding failed servers // If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the -// list of servers When model is set, load balance among alternatives in the same DC, aiming to balance request queue +// list of servers. When model is set, load balance among alternatives in the same DC, aiming to balance request queue // length on these interfaces. If too many interfaces in the same DC are bad, try remote interfaces. ACTOR template <class Interface, class Request, class Multi> Future< REPLY_TYPE(Request) > loadBalance( @@ -211,7 +211,7 @@ Future< REPLY_TYPE(Request) > loadBalance( // countBest(): the number of alternatives in the same locality (i.e., DC by default) as alternatives[0]. // if the if-statement is correct, it won't try to send requests to the remote ones. if(badServers < std::min(i, FLOW_KNOBS->LOAD_BALANCE_MAX_BAD_OPTIONS + 1) && i == alternatives->countBest()) { - // If there are not enough local servers that are bad, + // If the number of local bad servers is limited, // we won't even try to send requests to remote servers. // An interface is bad if its endpoint fails or if reply from the interface has a high penalty. 
break; diff --git a/fdbrpc/MultiInterface.h b/fdbrpc/MultiInterface.h index c9ae560946..43dcacc6c7 100644 --- a/fdbrpc/MultiInterface.h +++ b/fdbrpc/MultiInterface.h @@ -43,7 +43,7 @@ std::string describe( KVPair<K,V> const& p ) { return format("%d ", p.k) + descr template <class T> struct ReferencedInterface : public ReferenceCounted<ReferencedInterface<T>> { T interf; - int8_t distance; // choose one enum value in LBDistance type + int8_t distance; // one of enum values in struct LBDistance std::string toString() const { return interf.toString(); } diff --git a/fdbrpc/QueueModel.cpp b/fdbrpc/QueueModel.cpp index 360b10b581..22ca09607d 100644 --- a/fdbrpc/QueueModel.cpp +++ b/fdbrpc/QueueModel.cpp @@ -48,7 +48,7 @@ void QueueModel::endRequest( uint64_t id, double latency, double penalty, double } QueueData& QueueModel::getMeasurement( uint64_t id ) { - return data[id]; + return data[id]; // return smoothed penalty } double QueueModel::addRequest( uint64_t id ) { diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 2b1288ffbe..8d0db8de63 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -340,6 +340,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS return logSystem; } + // Convert TagPartitionedLogSystem to DBCoreState and override input newState as return value void toCoreState(DBCoreState& newState) final { if( recoveryComplete.isValid() && recoveryComplete.isError() ) throw recoveryComplete.getError(); diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 7a8b169bfe..098a5e566b 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -210,7 +210,7 @@ struct RecruitFromConfigurationReply { std::vector<WorkerInterface> proxies; std::vector<WorkerInterface> resolvers; std::vector<WorkerInterface> storageServers; - std::vector<WorkerInterface> oldLogRouters; // why need oldLogRouters? + std::vector<WorkerInterface> oldLogRouters; // During recovery, log routers for older generations will be recruited. Optional<Key> dcId; // dcId is where master is recruited. It prefers to be in configuration.primaryDcId, but // it can be recruited from configuration.secondaryDc: The dcId will be the secondaryDcId and // this generation's primaryDC in memory is different from configuration.primaryDcId. 
diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 595e5cb177..8bd8da7812 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -610,13 +610,16 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything( Refere self->backupWorkers.swap(recruits.backupWorkers); TraceEvent("MasterRecoveryState", self->dbgid) - .detail("StatusCode", RecoveryStatus::initializing_transaction_servers) - .detail("Status", RecoveryStatus::names[RecoveryStatus::initializing_transaction_servers]) - .detail("Proxies", recruits.proxies.size()) - .detail("TLogs", recruits.tLogs.size()) - .detail("Resolvers", recruits.resolvers.size()) - .detail("BackupWorkers", self->backupWorkers.size()) - .trackLatest("MasterRecoveryState"); + .detail("StatusCode", RecoveryStatus::initializing_transaction_servers) + .detail("Status", RecoveryStatus::names[RecoveryStatus::initializing_transaction_servers]) + .detail("Proxies", recruits.proxies.size()) + .detail("TLogs", recruits.tLogs.size()) + .detail("Resolvers", recruits.resolvers.size()) + .detail("SatelliteTLogs", recruits.satelliteTLogs.size()) + .detail("OldLogRouters", recruits.oldLogRouters.size()) + .detail("StorageServers", recruits.storageServers.size()) + .detail("BackupWorkers", self->backupWorkers.size()) + .trackLatest("MasterRecoveryState"); // Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand new database we are sort of lying that we are // past the recruitment phase. In a perfect world we would split that up so that the recruitment part happens above (in parallel with recruiting the transaction servers?). From 33eb1de00e43acbe2cc078815e4ef91656698778 Mon Sep 17 00:00:00 2001 From: Meng Xu <meng_xu@apple.com> Date: Fri, 19 Feb 2021 21:44:07 -0800 Subject: [PATCH 08/28] Add some comment to log system and resolve review comment by deleting my questions. --- fdbserver/ClusterController.actor.cpp | 1 + fdbserver/LogRouter.actor.cpp | 2 ++ fdbserver/LogSystem.h | 2 +- fdbserver/TagPartitionedLogSystem.actor.cpp | 2 ++ fdbserver/WorkerInterface.actor.h | 1 - 5 files changed, 6 insertions(+), 2 deletions(-) diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 9f3d85f190..e1130b44a2 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -2091,6 +2091,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { } } } + // Recruit storage cache role. The storage cache project is paused. Its code path is unlikely used. Optional<uint16_t> newStorageCache = req.storageCacheInterf.present() ? req.storageCacheInterf.get().first : Optional<uint16_t>(); auto& cacheInfo = self->id_worker[w.locality.processId()].storageCacheInfo; if (req.storageCacheInterf.present()) { diff --git a/fdbserver/LogRouter.actor.cpp b/fdbserver/LogRouter.actor.cpp index 3f12860cf1..c5ab4993b1 100644 --- a/fdbserver/LogRouter.actor.cpp +++ b/fdbserver/LogRouter.actor.cpp @@ -235,6 +235,8 @@ ACTOR Future<Void> waitForVersion( LogRouterData *self, Version ver ) { return Void(); } +// Log router (LR) asynchronously pull data from satellite tLogs (preferred) or primary tLogs at tag (self->routerTag) +// for the version range from the LR's current version (exclusive) to its epoch's end version or recovery version. 
ACTOR Future<Void> pullAsyncData( LogRouterData *self ) { state Future<Void> dbInfoChange = Void(); state Reference<ILogSystem::IPeekCursor> r; diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index 32a7caa27c..feec353fe3 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -60,7 +60,7 @@ public: Reference<LocalitySet> logServerSet; std::vector<int> logIndexArray; std::vector<LocalityEntry> logEntryArray; - bool isLocal; + bool isLocal; // true if the LogSet is in primary DC or primary DC's satellite int8_t locality; Version startVersion; std::vector<Future<TLogLockResult>> replies; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 8d0db8de63..538dee6ec1 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -949,6 +949,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS } } + // Return a peek cursor that peeks data at tag from the begin version to either the recovery version if peeking at + // the latest epoch or the peek cursor's epoch's end version if peeking at an old epoch. Reference<IPeekCursor> peekLogRouter(UID dbgid, Version begin, Tag tag) final { bool found = false; for (const auto& log : tLogs) { diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 098a5e566b..95d0eee13b 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -214,7 +214,6 @@ struct RecruitFromConfigurationReply { Optional<Key> dcId; // dcId is where master is recruited. It prefers to be in configuration.primaryDcId, but // it can be recruited from configuration.secondaryDc: The dcId will be the secondaryDcId and // this generation's primaryDC in memory is different from configuration.primaryDcId. - // when is dcId set? 
bool satelliteFallback; RecruitFromConfigurationReply() : satelliteFallback(false) {} From 12380176cecbec987f09e198112cd9c578e67bb5 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard <trevor.clinkenbeard@snowflake.com> Date: Sat, 20 Feb 2021 14:45:31 -0800 Subject: [PATCH 09/28] Add 5 second delay between SnapTest retries --- fdbserver/workloads/SnapTest.actor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/fdbserver/workloads/SnapTest.actor.cpp b/fdbserver/workloads/SnapTest.actor.cpp index 85c5fbbd09..98f1800b11 100644 --- a/fdbserver/workloads/SnapTest.actor.cpp +++ b/fdbserver/workloads/SnapTest.actor.cpp @@ -219,6 +219,7 @@ public: // workload functions snapFailed = true; break; } + wait(delay(5.0)); } } CSimpleIni ini; From 32486a27855bb6d8a71b9cc62f32566bda46c53d Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard <trevor.clinkenbeard@snowflake.com> Date: Sat, 20 Feb 2021 18:24:21 -0800 Subject: [PATCH 10/28] Reenable tlog pops in ddSnapCreateCore even if some disable requests fail If some tlogs successfully disable pops but others fail to, we do not want to wait TLOG_IGNORE_POP_AUTO_ENABLE_DELAY seconds before reenabling pops --- fdbserver/DataDistribution.actor.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 0482b3d142..c4a4c8059e 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -5100,9 +5100,8 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As .detail("SnapPayload", snapReq.snapPayload) .detail("SnapUID", snapReq.snapUID) .error(e, true /*includeCancelled */); - if (e.code() == error_code_snap_storage_failed - || e.code() == error_code_snap_tlog_failed - || e.code() == error_code_operation_cancelled) { + if (e.code() == error_code_snap_storage_failed || e.code() == error_code_snap_tlog_failed || + e.code() == error_code_operation_cancelled || e.code() == error_code_snap_disable_tlog_pop_failed) { // enable tlog pop on local tlog nodes std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs(false); try { From 8873eb155799bb9dab7934a10e764b2661280e54 Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard <trevor.clinkenbeard@snowflake.com> Date: Sun, 21 Feb 2021 17:38:29 -0800 Subject: [PATCH 11/28] Enforce ignorePopDeadline even when no pop requests are sent --- fdbserver/TLogServer.actor.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 494628a73f..a2cbcabd1b 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -1083,18 +1083,13 @@ ACTOR Future<Void> processPopRequests(TLogData* self, Reference<LogData> logData } } wait(waitForAll(ignoredPops)); + TraceEvent("ResetIgnorePopRequest") + .detail("IgnorePopRequest", self->ignorePopRequest) + .detail("IgnorePopDeadline", self->ignorePopDeadline); return Void(); } ACTOR Future<Void> tLogPop( TLogData* self, TLogPopRequest req, Reference<LogData> logData ) { - // timeout check for ignorePopRequest - if (self->ignorePopRequest && (g_network->now() > self->ignorePopDeadline)) { - TraceEvent("EnableTLogPlayAllIgnoredPops").detail("IgnoredPopDeadline", self->ignorePopDeadline); - wait(processPopRequests(self, logData)); - TraceEvent("ResetIgnorePopRequest") - .detail("IgnorePopRequest", self->ignorePopRequest) - .detail("IgnorePopDeadline", self->ignorePopDeadline); - } if (self->ignorePopRequest) 
{ TraceEvent(SevDebug, "IgnoringPopRequest").detail("IgnorePopDeadline", self->ignorePopDeadline); @@ -2268,6 +2263,11 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere when( TLogSnapRequest snapReq = waitNext( tli.snapRequest.getFuture() ) ) { logData->addActor.send( tLogSnapCreate( snapReq, self, logData) ); } + when(wait(self->ignorePopRequest ? delayUntil(self->ignorePopDeadline) : Never())) { + TEST(true); // Hit ignorePopDeadline + TraceEvent("EnableTLogPlayAllIgnoredPops").detail("IgnoredPopDeadline", self->ignorePopDeadline); + logData->addActor.send(processPopRequests(self, logData)); + } } } From 6ddec2c32f1166665fcd13c797a6ce83d94ad8dd Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard <trevor.clinkenbeard@snowflake.com> Date: Mon, 22 Feb 2021 10:18:43 -0800 Subject: [PATCH 12/28] Fix typo --- fdbserver/TLogServer.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index e1d0722b8e..6520332811 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -606,7 +606,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> { specialCounter(cc, "QueueDiskBytesTotal", [tLogData](){ return tLogData->rawPersistentQueue->getStorageBytes().total; }); specialCounter(cc, "PeekMemoryReserved", [tLogData]() { return tLogData->peekMemoryLimiter.activePermits(); }); specialCounter(cc, "PeekMemoryRequestsStalled", [tLogData]() { return tLogData->peekMemoryLimiter.waiters(); }); - specialCounter(cc, "Geneartion", [this]() { return this->recoveryCount; }); + specialCounter(cc, "Generation", [this]() { return this->recoveryCount; }); } ~LogData() { From 56f191290001816b7a09b8b3ac6526782448e3da Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Mon, 22 Feb 2021 22:56:30 +0000 Subject: [PATCH 13/28] Avoid using relative symlinks to work around cpack bug https://discourse.cmake.org/t/installing-a-symlink-with-a-relative-path-for-rpm/895 closes #2681 --- cmake/InstallLayout.cmake | 101 +++++++------------------------------- fdbbackup/CMakeLists.txt | 32 +++--------- 2 files changed, 26 insertions(+), 107 deletions(-) diff --git a/cmake/InstallLayout.cmake b/cmake/InstallLayout.cmake index cab747cae0..dd4821c86d 100644 --- a/cmake/InstallLayout.cmake +++ b/cmake/InstallLayout.cmake @@ -2,83 +2,6 @@ # Helper Functions ################################################################################ -function(install_symlink_impl) - if (NOT WIN32) - set(options "") - set(one_value_options TO DESTINATION) - set(multi_value_options COMPONENTS) - cmake_parse_arguments(SYM "${options}" "${one_value_options}" "${multi_value_options}" "${ARGN}") - - file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/symlinks) - get_filename_component(fname ${SYM_DESTINATION} NAME) - get_filename_component(dest_dir ${SYM_DESTINATION} DIRECTORY) - set(sl ${CMAKE_CURRENT_BINARY_DIR}/symlinks/${fname}) - execute_process(COMMAND ${CMAKE_COMMAND} -E create_symlink ${SYM_TO} ${sl}) - foreach(component IN LISTS SYM_COMPONENTS) - install(FILES ${sl} DESTINATION ${dest_dir} COMPONENT ${component}) - endforeach() - endif() -endfunction() - -function(install_symlink) - if(NOT WIN32 AND NOT OPEN_FOR_IDE) - set(options "") - set(one_value_options COMPONENT LINK_DIR FILE_DIR LINK_NAME FILE_NAME) - set(multi_value_options "") - cmake_parse_arguments(IN "${options}" "${one_value_options}" "${multi_value_options}" "${ARGN}") - - set(rel_path "") - string(REGEX 
MATCHALL "\\/" slashes "${IN_LINK_NAME}") - foreach(ignored IN LISTS slashes) - set(rel_path "../${rel_path}") - endforeach() - if("${IN_FILE_DIR}" MATCHES "bin") - if("${IN_LINK_DIR}" MATCHES "lib") - install_symlink_impl( - TO "../${rel_path}bin/${IN_FILE_NAME}" - DESTINATION "lib/${IN_LINK_NAME}" - COMPONENTS "${IN_COMPONENT}-tgz") - install_symlink_impl( - TO "../${rel_path}bin/${IN_FILE_NAME}" - DESTINATION "usr/lib64/${IN_LINK_NAME}" - COMPONENTS "${IN_COMPONENT}-el6" - "${IN_COMPONENT}-el7" - "${IN_COMPONENT}-deb") - install_symlink_impl( - TO "../${rel_path}bin/${IN_FILE_NAME}" - DESTINATION "usr/lib64/${IN_LINK_NAME}" - COMPONENTS "${IN_COMPONENT}-deb") - elseif("${IN_LINK_DIR}" MATCHES "bin") - install_symlink_impl( - TO "../${rel_path}bin/${IN_FILE_NAME}" - DESTINATION "bin/${IN_LINK_NAME}" - COMPONENTS "${IN_COMPONENT}-tgz") - install_symlink_impl( - TO "../${rel_path}bin/${IN_FILE_NAME}" - DESTINATION "usr/bin/${IN_LINK_NAME}" - COMPONENTS "${IN_COMPONENT}-el6" - "${IN_COMPONENT}-el7" - "${IN_COMPONENT}-deb") - elseif("${IN_LINK_DIR}" MATCHES "fdbmonitor") - install_symlink_impl( - TO "../../${rel_path}bin/${IN_FILE_NAME}" - DESTINATION "lib/foundationdb/${IN_LINK_NAME}" - COMPONENTS "${IN_COMPONENT}-tgz") - install_symlink_impl( - TO "../../${rel_path}bin/${IN_FILE_NAME}" - DESTINATION "usr/lib/foundationdb/${IN_LINK_NAME}" - COMPONENTS "${IN_COMPONENT}-el6" - "${IN_COMPONENT}-el7" - "${IN_COMPONENT}-deb") - else() - message(FATAL_ERROR "Unknown LINK_DIR ${IN_LINK_DIR}") - endif() - else() - message(FATAL_ERROR "Unknown FILE_DIR ${IN_FILE_DIR}") - endif() - endif() -endfunction() - function(symlink_files) if (NOT WIN32) set(options "") @@ -159,7 +82,7 @@ endfunction() function(fdb_install) if(NOT WIN32 AND NOT OPEN_FOR_IDE) - set(one_value_options COMPONENT DESTINATION EXPORT DESTINATION_SUFFIX) + set(one_value_options COMPONENT DESTINATION EXPORT DESTINATION_SUFFIX RENAME) set(multi_value_options TARGETS FILES PROGRAMS DIRECTORY) cmake_parse_arguments(IN "${options}" "${one_value_options}" "${multi_value_options}" "${ARGN}") @@ -180,6 +103,9 @@ function(fdb_install) foreach(package tgz deb el6 el7 pm) set(install_path "${install_destination_for_${IN_DESTINATION}_${package}}") if(install_export) + if(IN_RENAME) + message(FATAL_ERROR "RENAME for EXPORT target not implemented") + endif() install( EXPORT "${IN_EXPORT}-${package}" DESTINATION "${install_path}${IN_DESTINATION_SUFFIX}" @@ -191,11 +117,20 @@ function(fdb_install) set(export_args EXPORT "${IN_EXPORT}-${package}") endif() if(NOT ${install_path} STREQUAL "") - install( - ${args} - ${export_args} - DESTINATION "${install_path}${IN_DESTINATION_SUFFIX}" - COMPONENT "${IN_COMPONENT}-${package}") + if(IN_RENAME) + install( + ${args} + ${export_args} + DESTINATION "${install_path}${IN_DESTINATION_SUFFIX}" + COMPONENT "${IN_COMPONENT}-${package}" + RENAME ${IN_RENAME}) + else() + install( + ${args} + ${export_args} + DESTINATION "${install_path}${IN_DESTINATION_SUFFIX}" + COMPONENT "${IN_COMPONENT}-${package}") + endif() endif() endif() endforeach() diff --git a/fdbbackup/CMakeLists.txt b/fdbbackup/CMakeLists.txt index e6e12e9e8a..04b363de7c 100644 --- a/fdbbackup/CMakeLists.txt +++ b/fdbbackup/CMakeLists.txt @@ -7,34 +7,18 @@ target_link_libraries(fdbbackup PRIVATE fdbclient) if(NOT OPEN_FOR_IDE) if(GENERATE_DEBUG_PACKAGES) fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients) + fdb_install(TARGETS fdbbackup DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent) + fdb_install(TARGETS fdbbackup 
DESTINATION bin COMPONENT clients RENAME fdbrestore) + fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients RENAME dr_agent) + fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients RENAME fdbdr) else() add_custom_target(prepare_fdbbackup_install ALL DEPENDS strip_only_fdbbackup) fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients) + fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent) + fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients RENAME fdbrestore) + fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients RENAME dr_agent) + fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients RENAME fdbdr) endif() - install_symlink( - COMPONENT clients - FILE_DIR bin - LINK_DIR fdbmonitor - FILE_NAME fdbbackup - LINK_NAME backup_agent/backup_agent) - install_symlink( - COMPONENT clients - FILE_DIR bin - LINK_DIR bin - FILE_NAME fdbbackup - LINK_NAME fdbrestore) - install_symlink( - COMPONENT clients - FILE_DIR bin - LINK_DIR bin - FILE_NAME fdbbackup - LINK_NAME dr_agent) - install_symlink( - COMPONENT clients - FILE_DIR bin - LINK_DIR bin - FILE_NAME fdbbackup - LINK_NAME fdbdr) symlink_files( LOCATION packages/bin SOURCE fdbbackup From 309c6fddc9e911003a1de1b1393f9f1e3dd6e5e3 Mon Sep 17 00:00:00 2001 From: Zhe Wu <zhewu@apple.com> Date: Mon, 22 Feb 2021 15:29:18 -0800 Subject: [PATCH 14/28] Add documentation to client side load balancing algorithm --- fdbrpc/LoadBalance.actor.h | 18 +++++++++++++++--- fdbrpc/QueueModel.cpp | 2 ++ fdbrpc/QueueModel.h | 38 +++++++++++++++++++++++++++++++++++++- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/fdbrpc/LoadBalance.actor.h b/fdbrpc/LoadBalance.actor.h index 74ebe1d003..b23e0b7b9a 100644 --- a/fdbrpc/LoadBalance.actor.h +++ b/fdbrpc/LoadBalance.actor.h @@ -196,14 +196,17 @@ Future<REPLY_TYPE(Request)> loadBalance( nextAlt++; if(model) { - double bestMetric = 1e9; + double bestMetric = 1e9; // Server with the shortest queue size. double nextMetric = 1e9; - double bestTime = 1e9; + double bestTime = 1e9; // The latency to the server with shortest queue size. double nextTime = 1e9; int badServers = 0; for(int i=0; i<alternatives->size(); i++) { if(badServers < std::min(i, FLOW_KNOBS->LOAD_BALANCE_MAX_BAD_OPTIONS + 1) && i == alternatives->countBest()) { + // When we have at least one healthy local server, and the bad + // server count is within "LOAD_BALANCE_MAX_BAD_OPTIONS". We + // do not need to consider any remote servers. break; } @@ -214,6 +217,8 @@ Future<REPLY_TYPE(Request)> loadBalance( double thisMetric = qd.smoothOutstanding.smoothTotal(); double thisTime = qd.latency; if(FLOW_KNOBS->LOAD_BALANCE_PENALTY_IS_BAD && qd.penalty > 1.001) { + // When a server wants to penalize itself (the default + // penalty value is 1.0), consider this server as bad. ++badServers; } @@ -239,6 +244,9 @@ Future<REPLY_TYPE(Request)> loadBalance( } } if( nextMetric > 1e8 ) { + // If we still don't have a second best choice to issue request to, + // go through all the remote servers again, since we may have + // skipped it. 
for(int i=alternatives->countBest(); i<alternatives->size(); i++) { RequestStream<Request> const* thisStream = &alternatives->get( i, channel ); if (!IFailureMonitor::failureMonitor().getState( thisStream->getEndpoint() ).failed) { @@ -258,6 +266,7 @@ Future<REPLY_TYPE(Request)> loadBalance( } if(nextTime < 1e9) { + // Decide when to send the request to the second best choice. if(bestTime > FLOW_KNOBS->INSTANT_SECOND_REQUEST_MULTIPLIER*(model->secondMultiplier*(nextTime) + FLOW_KNOBS->BASE_SECOND_REQUEST_TIME)) { secondDelay = Void(); } else { @@ -275,6 +284,7 @@ Future<REPLY_TYPE(Request)> loadBalance( state int numAttempts = 0; state double backoff = 0; state bool triedAllOptions = false; + // Issue requests to selected servers. loop { if(now() - startTime > (g_network->isSimulated() ? 30.0 : 600.0)) { TraceEvent ev(g_network->isSimulated() ? SevWarn : SevWarnAlways, "LoadBalanceTooLong"); @@ -292,7 +302,9 @@ Future<REPLY_TYPE(Request)> loadBalance( } } - // Find an alternative, if any, that is not failed, starting with nextAlt + // Find an alternative, if any, that is not failed, starting with + // nextAlt. This logic matters only if model == NULL. Otherwise, the + // bestAlt and nextAlt have been decided. state RequestStream<Request> const* stream = NULL; for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) { int useAlt = nextAlt; diff --git a/fdbrpc/QueueModel.cpp b/fdbrpc/QueueModel.cpp index 360b10b581..74bb2229fc 100644 --- a/fdbrpc/QueueModel.cpp +++ b/fdbrpc/QueueModel.cpp @@ -23,6 +23,8 @@ void QueueModel::endRequest( uint64_t id, double latency, double penalty, double delta, bool clean, bool futureVersion ) { auto& d = data[id]; + + // Remove the penalty added when starting the request. d.smoothOutstanding.addDelta(-delta); if(clean) { diff --git a/fdbrpc/QueueModel.h b/fdbrpc/QueueModel.h index c19652f63c..cc716bc4d2 100644 --- a/fdbrpc/QueueModel.h +++ b/fdbrpc/QueueModel.h @@ -27,13 +27,35 @@ #include "flow/Knobs.h" #include "flow/ActorCollection.h" - +/* +The data structure used for the client-side load balancer to decide which +storage server to read data from. Conceptually, it represents the size of +storage server read request queue. One QueueData represents one storage +server. +*/ struct QueueData { + // The latest queue size in this storage server. Smoother smoothOutstanding; double latency; + + // The additional queue size used in the next request to this storage + // server. This penalty is sent from the storage server from the last + // request, which is the server side mechanism to ask the client to slow + // down request. double penalty; + + // Do not consider this storage server if the current time hasn't reach this + // time. This field is computed after each request to not repeatedly try the + // same storage server that is likely not going to return a valid result. double failedUntil; + + // If the storage server returns a "future version" error, increase above + // `failedUntil` by this amount to increase the backoff time. double futureVersionBackoff; + + // If the current time has reached this time, and this storage server still + // hasn't returned a valid result, increase above `futureVersionBackoff` + // to increase the future backoff amount. 
double increaseBackoffTime; QueueData() : latency(0.001), penalty(1.0), smoothOutstanding(FLOW_KNOBS->QUEUE_MODEL_SMOOTHING_AMOUNT), failedUntil(0), futureVersionBackoff(FLOW_KNOBS->FUTURE_VERSION_INITIAL_BACKOFF), increaseBackoffTime(0) {} }; @@ -42,8 +64,22 @@ typedef double TimeEstimate; class QueueModel { public: + // Finishes the request sent to storage server with `id`. + // - latency: the measured client-side latency of the request. + // - penalty: the server side penalty sent along with the response from + // the storage server. + // - delta: Update server `id`'s queue model by substract this amount. + // This value should be the value returned by `addRequest` below. + // - clean: indicates whether the there was an error or not. + // - futureVersion: indicates whether there was "future version" error or + // not. void endRequest( uint64_t id, double latency, double penalty, double delta, bool clean, bool futureVersion ); QueueData& getMeasurement( uint64_t id ); + + // Starts a new request to storage server with `id`. If the storage + // server contains a penalty, add it to the queue size, and return the + // penalty. The returned penalty should be passed as `delta` to `endRequest` + // to make `smoothOutstanding` to reflect the real storage queue size. double addRequest( uint64_t id ); double secondMultiplier; double secondBudget; From e6caa9463ac351887de6576edb5fe9161caa30fd Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Mon, 22 Feb 2021 23:48:01 +0000 Subject: [PATCH 15/28] Add release note --- .../sphinx/source/release-notes/release-notes-620.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/documentation/sphinx/source/release-notes/release-notes-620.rst b/documentation/sphinx/source/release-notes/release-notes-620.rst index 96950c726a..35094d3497 100644 --- a/documentation/sphinx/source/release-notes/release-notes-620.rst +++ b/documentation/sphinx/source/release-notes/release-notes-620.rst @@ -4,6 +4,10 @@ Release Notes ############# +6.2.32 +====== +* Fix an issue where symbolic links in cmake-built RPMs are broken if you unpack the RPM to a custom directory. `(PR #4380) <https://github.com/apple/foundationdb/pull/4380>`_ + 6.2.31 ====== * Fix a rare invalid memory access on data distributor when snapshotting large clusters. This is a follow up to `PR #4076 <https://github.com/apple/foundationdb/pull/4076>`_. `(PR #4317) <https://github.com/apple/foundationdb/pull/4317>`_ From 97d4a630fb61369be147cf5d258368353dcb91c5 Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Mon, 22 Feb 2021 23:51:37 +0000 Subject: [PATCH 16/28] Update version to 6.2.32 --- CMakeLists.txt | 2 +- versions.target | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a915a9642..683b6a75c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ # limitations under the License. cmake_minimum_required(VERSION 3.12) project(foundationdb - VERSION 6.2.31 + VERSION 6.2.32 DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions." 
HOMEPAGE_URL "http://www.foundationdb.org/" LANGUAGES C CXX ASM) diff --git a/versions.target b/versions.target index ee2b5ae8e7..827857cb64 100644 --- a/versions.target +++ b/versions.target @@ -1,7 +1,7 @@ <?xml version="1.0"?> <Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <PropertyGroup> - <Version>6.2.31</Version> + <Version>6.2.32</Version> <PackageName>6.2</PackageName> </PropertyGroup> </Project> From 0caab02e5333a084d98b31f2b119fab6160ca87f Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Tue, 23 Feb 2021 00:05:22 +0000 Subject: [PATCH 17/28] update installer WIX GUID following release --- packaging/msi/FDBInstaller.wxs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/msi/FDBInstaller.wxs b/packaging/msi/FDBInstaller.wxs index 5a0856e20d..2485c6002a 100644 --- a/packaging/msi/FDBInstaller.wxs +++ b/packaging/msi/FDBInstaller.wxs @@ -32,7 +32,7 @@ <Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'> <Product Name='$(var.Title)' - Id='{EEDA0074-D8F7-40D8-BB98-FFE6169A267F}' + Id='{E85C2EC9-9DEC-454D-B26B-DBFE8EE29F89}' UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}' Version='$(var.Version)' Manufacturer='$(var.Manufacturer)' From 0c28e1d640802dd4c5dea0b03a9ec404a96f2337 Mon Sep 17 00:00:00 2001 From: Meng Xu <meng_xu@apple.com> Date: Mon, 22 Feb 2021 16:20:31 -0800 Subject: [PATCH 18/28] Add comment to peekLogRouter in TagPartitionedLogSystem --- fdbserver/TagPartitionedLogSystem.actor.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 0e068e6fca..fe8cd80b14 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -959,8 +959,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS } } - // Return a peek cursor that peeks data at tag from the begin version to either the recovery version if peeking at - // the latest epoch or the peek cursor's epoch's end version if peeking at an old epoch. + // LogRouter or BackupWorker use this function to obtain a cursor for peeking tlogs of a generation (i.e., epoch). + // Specifically, the epoch is determined by looking up "dbgid" in tlog sets of generations. + // The returned cursor can peek data at the "tag" from the given "begin" version to that epoch's end version or + // the recovery version for the latest old epoch. For the current epoch, the cursor has no end version. 
Reference<IPeekCursor> peekLogRouter(UID dbgid, Version begin, Tag tag) final { bool found = false; for (const auto& log : tLogs) { From 0167a449f004ce84e2c0a7795f636cff2d26ed49 Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Tue, 23 Feb 2021 03:05:29 +0000 Subject: [PATCH 19/28] Fix fdbrestore,dr_agent,fdbdr,backup_agent for FDB_RELEASE --- fdbbackup/CMakeLists.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fdbbackup/CMakeLists.txt b/fdbbackup/CMakeLists.txt index 04b363de7c..88616749ea 100644 --- a/fdbbackup/CMakeLists.txt +++ b/fdbbackup/CMakeLists.txt @@ -7,10 +7,10 @@ target_link_libraries(fdbbackup PRIVATE fdbclient) if(NOT OPEN_FOR_IDE) if(GENERATE_DEBUG_PACKAGES) fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients) - fdb_install(TARGETS fdbbackup DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent) - fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients RENAME fdbrestore) - fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients RENAME dr_agent) - fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients RENAME fdbdr) + fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent) + fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION bin COMPONENT clients RENAME fdbrestore) + fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION bin COMPONENT clients RENAME dr_agent) + fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION bin COMPONENT clients RENAME fdbdr) else() add_custom_target(prepare_fdbbackup_install ALL DEPENDS strip_only_fdbbackup) fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients) From 1ee9c3e45d12b581aa0c1748c7cf8cb39d62228f Mon Sep 17 00:00:00 2001 From: Vishesh Yadav <vishesh3y@gmail.com> Date: Mon, 22 Feb 2021 20:18:01 -0700 Subject: [PATCH 20/28] Apply suggestions from code review Co-authored-by: A.J. 
Beamon <aj.beamon@snowflake.com> --- fdbclient/MultiVersionTransaction.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 5c2bb34f93..6a931e9cd2 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -1084,14 +1084,14 @@ std::vector<std::pair<std::string, bool>> MultiVersionApi::copyExternalLibraryPe break; } if (readCount == -1) { - TraceEvent("ExternalClientCopyFailedReadError").GetLastError().detail("LibraryPath", path); + TraceEvent(SevError, "ExternalClientCopyFailedReadError").GetLastError().detail("LibraryPath", path); throw platform_error(); } ssize_t written = 0; while (written != readCount) { ssize_t writeCount = write(tempFd, buf + written, readCount - written); if (writeCount == -1) { - TraceEvent("ExternalClientCopyFailedWriteError").GetLastError().detail("LibraryPath", path); + TraceEvent(SevError, "ExternalClientCopyFailedWriteError").GetLastError().detail("LibraryPath", path); throw platform_error(); } written += writeCount; From 8b308ed2da91dd0707f2af2246b270caad9ba34c Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Tue, 23 Feb 2021 04:50:39 +0000 Subject: [PATCH 21/28] Fix syntax --- fdbbackup/CMakeLists.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/fdbbackup/CMakeLists.txt b/fdbbackup/CMakeLists.txt index 88616749ea..753181116a 100644 --- a/fdbbackup/CMakeLists.txt +++ b/fdbbackup/CMakeLists.txt @@ -7,10 +7,10 @@ target_link_libraries(fdbbackup PRIVATE fdbclient) if(NOT OPEN_FOR_IDE) if(GENERATE_DEBUG_PACKAGES) fdb_install(TARGETS fdbbackup DESTINATION bin COMPONENT clients) - fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent) - fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION bin COMPONENT clients RENAME fdbrestore) - fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION bin COMPONENT clients RENAME dr_agent) - fdb_install(PROGRAMS $<fdbbackup:TARGET_FILE> DESTINATION bin COMPONENT clients RENAME fdbdr) + fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION fdbmonitor COMPONENT clients RENAME backup_agent/backup_agent) + fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION bin COMPONENT clients RENAME fdbrestore) + fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION bin COMPONENT clients RENAME dr_agent) + fdb_install(PROGRAMS $<TARGET_FILE:fdbbackup> DESTINATION bin COMPONENT clients RENAME fdbdr) else() add_custom_target(prepare_fdbbackup_install ALL DEPENDS strip_only_fdbbackup) fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbbackup DESTINATION bin COMPONENT clients) From 8ee7ee9e4545529eca1dd18aa77cec0f2ee546ca Mon Sep 17 00:00:00 2001 From: Zhe Wu <zhewu@apple.com> Date: Tue, 23 Feb 2021 10:21:35 -0800 Subject: [PATCH 22/28] Apply clang-format to fdbrpc/QueueModel.h --- fdbrpc/QueueModel.h | 57 +++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/fdbrpc/QueueModel.h b/fdbrpc/QueueModel.h index cc716bc4d2..0e7ddefd95 100644 --- a/fdbrpc/QueueModel.h +++ b/fdbrpc/QueueModel.h @@ -43,7 +43,7 @@ struct QueueData { // request, which is the server side mechanism to ask the client to slow // down request. double penalty; - + // Do not consider this storage server if the current time hasn't reach this // time. 
This field is computed after each request to not repeatedly try the // same storage server that is likely not going to return a valid result. @@ -57,7 +57,9 @@ struct QueueData { // hasn't returned a valid result, increase above `futureVersionBackoff` // to increase the future backoff amount. double increaseBackoffTime; - QueueData() : latency(0.001), penalty(1.0), smoothOutstanding(FLOW_KNOBS->QUEUE_MODEL_SMOOTHING_AMOUNT), failedUntil(0), futureVersionBackoff(FLOW_KNOBS->FUTURE_VERSION_INITIAL_BACKOFF), increaseBackoffTime(0) {} + QueueData() + : latency(0.001), penalty(1.0), smoothOutstanding(FLOW_KNOBS->QUEUE_MODEL_SMOOTHING_AMOUNT), failedUntil(0), + futureVersionBackoff(FLOW_KNOBS->FUTURE_VERSION_INITIAL_BACKOFF), increaseBackoffTime(0) {} }; typedef double TimeEstimate; @@ -67,33 +69,32 @@ public: // Finishes the request sent to storage server with `id`. // - latency: the measured client-side latency of the request. // - penalty: the server side penalty sent along with the response from - // the storage server. + // the storage server. // - delta: Update server `id`'s queue model by substract this amount. // This value should be the value returned by `addRequest` below. - // - clean: indicates whether the there was an error or not. + // - clean: indicates whether the there was an error or not. // - futureVersion: indicates whether there was "future version" error or // not. - void endRequest( uint64_t id, double latency, double penalty, double delta, bool clean, bool futureVersion ); - QueueData& getMeasurement( uint64_t id ); + void endRequest(uint64_t id, double latency, double penalty, double delta, bool clean, bool futureVersion); + QueueData& getMeasurement(uint64_t id); - // Starts a new request to storage server with `id`. If the storage - // server contains a penalty, add it to the queue size, and return the - // penalty. The returned penalty should be passed as `delta` to `endRequest` - // to make `smoothOutstanding` to reflect the real storage queue size. - double addRequest( uint64_t id ); + // Starts a new request to storage server with `id`. If the storage + // server contains a penalty, add it to the queue size, and return the + // penalty. The returned penalty should be passed as `delta` to `endRequest` + // to make `smoothOutstanding` to reflect the real storage queue size. 
+ double addRequest(uint64_t id); double secondMultiplier; double secondBudget; - PromiseStream< Future<Void> > addActor; + PromiseStream<Future<Void>> addActor; Future<Void> laggingRequests; // requests for which a different recipient already answered int laggingRequestCount; QueueModel() : secondMultiplier(1.0), secondBudget(0), laggingRequestCount(0) { - laggingRequests = actorCollection( addActor.getFuture(), &laggingRequestCount ); + laggingRequests = actorCollection(addActor.getFuture(), &laggingRequestCount); } - ~QueueModel() { - laggingRequests.cancel(); - } + ~QueueModel() { laggingRequests.cancel(); } + private: std::unordered_map<uint64_t, QueueData> data; }; @@ -101,20 +102,20 @@ private: /* old queue model class QueueModel { public: - QueueModel() : new_index(0) { - total_time[0] = 0; - total_time[1] = 0; - } - void addMeasurement( uint64_t id, QueueDetails qd ); - TimeEstimate getTimeEstimate( uint64_t id ); - TimeEstimate getAverageTimeEstimate(); - QueueDetails getMeasurement( uint64_t id ); - void expire(); + QueueModel() : new_index(0) { + total_time[0] = 0; + total_time[1] = 0; + } + void addMeasurement( uint64_t id, QueueDetails qd ); + TimeEstimate getTimeEstimate( uint64_t id ); + TimeEstimate getAverageTimeEstimate(); + QueueDetails getMeasurement( uint64_t id ); + void expire(); private: - std::map<uint64_t, QueueDetails> data[2]; - double total_time[2]; - int new_index; // data[new_index] is the new data + std::map<uint64_t, QueueDetails> data[2]; + double total_time[2]; + int new_index; // data[new_index] is the new data }; */ From 25a7f87433fde9ffefb09e5fc1af973598c200ed Mon Sep 17 00:00:00 2001 From: Zhe Wu <zhewu@apple.com> Date: Tue, 23 Feb 2021 12:20:05 -0800 Subject: [PATCH 23/28] Revise documentation in QueueModel to clarify the meaning of 'outstanding' and 'penalty' --- fdbrpc/QueueModel.h | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/fdbrpc/QueueModel.h b/fdbrpc/QueueModel.h index 0e7ddefd95..7bf3d312f4 100644 --- a/fdbrpc/QueueModel.h +++ b/fdbrpc/QueueModel.h @@ -27,21 +27,23 @@ #include "flow/Knobs.h" #include "flow/ActorCollection.h" -/* -The data structure used for the client-side load balancer to decide which -storage server to read data from. Conceptually, it represents the size of -storage server read request queue. One QueueData represents one storage -server. -*/ +// The data structure used for the client-side load balancing algorithm to +// decide which storage server to read data from. Conceptually, it tracks the +// number of outstanding requests the current client sent to each storage +// server. One "QueueData" represents one storage server. struct QueueData { - // The latest queue size in this storage server. + // The current outstanding requests sent by the local client to this storage + // server. The number is smoothed out over a continuous timeline. Smoother smoothOutstanding; + + // The last client perceived latency to this storage server. double latency; - // The additional queue size used in the next request to this storage - // server. This penalty is sent from the storage server from the last - // request, which is the server side mechanism to ask the client to slow - // down request. + // Represents the "cost" of each storage request. By default, the penalty is + // 1 indicates that each outstanding request corresponds 1 outstanding + // request. However, storage server can also increase the penalty if it + // decides to ask the client to slow down sending requests to it. 
Penalty + // is updated after each LoadBalancedReply. double penalty; // Do not consider this storage server if the current time hasn't reach this @@ -69,7 +71,7 @@ public: // Finishes the request sent to storage server with `id`. // - latency: the measured client-side latency of the request. // - penalty: the server side penalty sent along with the response from - // the storage server. + // the storage server. Requires >= 1. // - delta: Update server `id`'s queue model by substract this amount. // This value should be the value returned by `addRequest` below. // - clean: indicates whether the there was an error or not. From a17b17f89678b34cd00b8f5a67bdb8229dab8429 Mon Sep 17 00:00:00 2001 From: Zhe Wu <zhewu@apple.com> Date: Tue, 23 Feb 2021 12:23:10 -0800 Subject: [PATCH 24/28] add more clarification --- fdbrpc/LoadBalance.actor.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbrpc/LoadBalance.actor.h b/fdbrpc/LoadBalance.actor.h index b23e0b7b9a..2b441da89b 100644 --- a/fdbrpc/LoadBalance.actor.h +++ b/fdbrpc/LoadBalance.actor.h @@ -196,9 +196,9 @@ Future<REPLY_TYPE(Request)> loadBalance( nextAlt++; if(model) { - double bestMetric = 1e9; // Server with the shortest queue size. + double bestMetric = 1e9; // Storage server with the least outstanding requests. double nextMetric = 1e9; - double bestTime = 1e9; // The latency to the server with shortest queue size. + double bestTime = 1e9; // The latency to the server with the least outstanding requests. double nextTime = 1e9; int badServers = 0; From 519457cb7c3303123656ee4c46c12efe26fd1fa4 Mon Sep 17 00:00:00 2001 From: Andrew Noyes <andrew.noyes@snowflake.com> Date: Wed, 24 Feb 2021 16:31:57 +0000 Subject: [PATCH 25/28] Update version following release --- CMakeLists.txt | 2 +- packaging/msi/FDBInstaller.wxs | 2 +- versions.target | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 683b6a75c3..7027471c70 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ # limitations under the License. cmake_minimum_required(VERSION 3.12) project(foundationdb - VERSION 6.2.32 + VERSION 6.2.33 DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions." 
HOMEPAGE_URL "http://www.foundationdb.org/" LANGUAGES C CXX ASM) diff --git a/packaging/msi/FDBInstaller.wxs b/packaging/msi/FDBInstaller.wxs index 2485c6002a..77ec3c5cb2 100644 --- a/packaging/msi/FDBInstaller.wxs +++ b/packaging/msi/FDBInstaller.wxs @@ -32,7 +32,7 @@ <Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'> <Product Name='$(var.Title)' - Id='{E85C2EC9-9DEC-454D-B26B-DBFE8EE29F89}' + Id='{96B6A022-0F52-4E53-B552-21B417508DE0}' UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}' Version='$(var.Version)' Manufacturer='$(var.Manufacturer)' diff --git a/versions.target b/versions.target index 827857cb64..1cbfdd37b6 100644 --- a/versions.target +++ b/versions.target @@ -1,7 +1,7 @@ <?xml version="1.0"?> <Project xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <PropertyGroup> - <Version>6.2.32</Version> + <Version>6.2.33</Version> <PackageName>6.2</PackageName> </PropertyGroup> </Project> From 4f2ed263cc048411f0561057b71e0acf6367ff9a Mon Sep 17 00:00:00 2001 From: Zhe Wu <zhewu@apple.com> Date: Wed, 24 Feb 2021 15:15:56 -0800 Subject: [PATCH 26/28] Update comment --- fdbrpc/QueueModel.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbrpc/QueueModel.h b/fdbrpc/QueueModel.h index 7bf3d312f4..74f57f6647 100644 --- a/fdbrpc/QueueModel.h +++ b/fdbrpc/QueueModel.h @@ -40,7 +40,7 @@ struct QueueData { double latency; // Represents the "cost" of each storage request. By default, the penalty is - // 1 indicates that each outstanding request corresponds 1 outstanding + // 1 indicating that each outstanding request corresponds 1 outstanding // request. However, storage server can also increase the penalty if it // decides to ask the client to slow down sending requests to it. Penalty // is updated after each LoadBalancedReply. From 229b4ad2c3f90572f1a4debb94467cb09eb58b67 Mon Sep 17 00:00:00 2001 From: "Johannes M. Scheuermann" <jscheuermann@apple.com> Date: Thu, 25 Feb 2021 11:09:02 +0000 Subject: [PATCH 27/28] fdbcli: Output errors and warnings to stderr --- fdbcli/fdbcli.actor.cpp | 230 ++++++++++++++++++++-------------------- 1 file changed, 115 insertions(+), 115 deletions(-) diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index f4a57b2b03..a568ba0f6f 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -145,7 +145,7 @@ public: if(transactionItr != transactionOptions.legalOptions.end()) setTransactionOption(tr, transactionItr->second, enabled, arg, intrans); else { - printf("ERROR: invalid option '%s'. Try `help options' for a list of available options.\n", optionStr.toString().c_str()); + fprintf(stderr, "ERROR: invalid option '%s'. Try `help options' for a list of available options.\n", optionStr.toString().c_str()); throw invalid_option(); } } @@ -184,7 +184,7 @@ private: //Sets a transaction option. If intrans == true, then this option is also applied to the passed in transaction. void setTransactionOption(Reference<ReadYourWritesTransaction> tr, FDBTransactionOptions::Option option, bool enabled, Optional<StringRef> arg, bool intrans) { if(enabled && arg.present() != FDBTransactionOptions::optionInfo.getMustExist(option).hasParameter) { - printf("ERROR: option %s a parameter\n", arg.present() ? "did not expect" : "expected"); + fprintf(stderr, "ERROR: option %s a parameter\n", arg.present() ? 
"did not expect" : "expected"); throw invalid_option_value(); } @@ -635,7 +635,7 @@ void printUsage(StringRef command) { if (i != helpMap.end()) printf("Usage: %s\n", i->second.usage.c_str()); else - printf("ERROR: Unknown command `%s'\n", command.toString().c_str()); + fprintf(stderr, "ERROR: Unknown command `%s'\n", command.toString().c_str()); } std::string getCoordinatorsInfoString(StatusObjectReader statusObj) { @@ -776,7 +776,7 @@ std::pair<int, int> getNumOfNonExcludedProcessAndZones(StatusObjectReader status void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level, bool displayDatabaseAvailable = true, bool hideErrorMessages = false) { if (FlowTransport::transport().incompatibleOutgoingConnectionsPresent()) { - printf("WARNING: One or more of the processes in the cluster is incompatible with this version of fdbcli.\n\n"); + fprintf(stderr, "WARNING: One or more of the processes in the cluster is incompatible with this version of fdbcli.\n\n"); } try { @@ -1694,7 +1694,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level, bool upToDate; if (!statusObjClient.get("cluster_file.up_to_date", upToDate) || !upToDate){ - printf("WARNING: The cluster file is not up to date. Type 'status' for more information.\n"); + fprintf(stderr, "WARNING: The cluster file is not up to date. Type 'status' for more information.\n"); } } catch (std::runtime_error& ){ @@ -1887,11 +1887,11 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere ret=true; break; case ConfigurationResult::INVALID_CONFIGURATION: - printf("ERROR: These changes would make the configuration invalid\n"); + fprintf(stderr, "ERROR: These changes would make the configuration invalid\n"); ret=true; break; case ConfigurationResult::DATABASE_ALREADY_CREATED: - printf("ERROR: Database already exists! To change configuration, don't say `new'\n"); + fprintf(stderr, "ERROR: Database already exists! 
To change configuration, don't say `new'\n"); ret=true; break; case ConfigurationResult::DATABASE_CREATED: @@ -1899,43 +1899,43 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere ret=false; break; case ConfigurationResult::DATABASE_UNAVAILABLE: - printf("ERROR: The database is unavailable\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: The database is unavailable\n"); + fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID: - printf("ERROR: All storage servers must be in one of the known regions\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: All storage servers must be in one of the known regions\n"); + fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::REGION_NOT_FULLY_REPLICATED: - printf("ERROR: When usable_regions > 1, all regions with priority >= 0 must be fully replicated before changing the configuration\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: When usable_regions > 1, all regions with priority >= 0 must be fully replicated before changing the configuration\n"); + fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS: - printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: When changing usable_regions, only one region can have priority >= 0\n"); + fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::REGIONS_CHANGED: - printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n"); + fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::NOT_ENOUGH_WORKERS: - printf("ERROR: Not enough processes exist to support the specified configuration\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: Not enough processes exist to support the specified configuration\n"); + fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::REGION_REPLICATION_MISMATCH: - printf("ERROR: `three_datacenter' replication is incompatible with region configuration\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: `three_datacenter' replication is incompatible with region configuration\n"); + fprintf(stderr, "Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::DCID_MISSING: - printf("ERROR: `No storage servers in one of the specified regions\n"); - printf("Type `configure FORCE <TOKEN...>' to configure without this check\n"); + fprintf(stderr, "ERROR: `No storage servers in one of the specified regions\n"); + fprintf(stderr, 
"Type `configure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::SUCCESS: @@ -1943,7 +1943,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere ret=false; break; case ConfigurationResult::LOCKED_NOT_NEW: - printf("ERROR: `only new databases can be configured as locked`\n"); + fprintf(stderr, "ERROR: `only new databases can be configured as locked`\n"); ret = true; break; default: @@ -1957,11 +1957,11 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa std::string contents(readFileBytes(filePath, 100000)); json_spirit::mValue config; if(!json_spirit::read_string( contents, config )) { - printf("ERROR: Invalid JSON\n"); + fprintf(stderr, "ERROR: Invalid JSON\n"); return true; } if(config.type() != json_spirit::obj_type) { - printf("ERROR: Configuration file must contain a JSON object\n"); + fprintf(stderr, "ERROR: Configuration file must contain a JSON object\n"); return true; } StatusObject configJSON = config.get_obj(); @@ -2003,27 +2003,27 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa bool ret; switch(result) { case ConfigurationResult::NO_OPTIONS_PROVIDED: - printf("ERROR: No options provided\n"); + fprintf(stderr, "ERROR: No options provided\n"); ret=true; break; case ConfigurationResult::CONFLICTING_OPTIONS: - printf("ERROR: Conflicting options\n"); + fprintf(stderr, "ERROR: Conflicting options\n"); ret=true; break; case ConfigurationResult::UNKNOWN_OPTION: - printf("ERROR: Unknown option\n"); //This should not be possible because of schema match + fprintf(stderr, "ERROR: Unknown option\n"); //This should not be possible because of schema match ret=true; break; case ConfigurationResult::INCOMPLETE_CONFIGURATION: - printf("ERROR: Must specify both a replication level and a storage engine when creating a new database\n"); + fprintf(stderr, "ERROR: Must specify both a replication level and a storage engine when creating a new database\n"); ret=true; break; case ConfigurationResult::INVALID_CONFIGURATION: - printf("ERROR: These changes would make the configuration invalid\n"); + fprintf(stderr, "ERROR: These changes would make the configuration invalid\n"); ret=true; break; case ConfigurationResult::DATABASE_ALREADY_CREATED: - printf("ERROR: Database already exists! To change configuration, don't say `new'\n"); + fprintf(stderr, "ERROR: Database already exists! 
To change configuration, don't say `new'\n"); ret=true; break; case ConfigurationResult::DATABASE_CREATED: @@ -2031,42 +2031,42 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa ret=false; break; case ConfigurationResult::DATABASE_UNAVAILABLE: - printf("ERROR: The database is unavailable\n"); + fprintf(stderr, "ERROR: The database is unavailable\n"); printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n"); ret=true; break; case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID: - printf("ERROR: All storage servers must be in one of the known regions\n"); + fprintf(stderr, "ERROR: All storage servers must be in one of the known regions\n"); printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n"); ret=true; break; case ConfigurationResult::REGION_NOT_FULLY_REPLICATED: - printf("ERROR: When usable_regions > 1, All regions with priority >= 0 must be fully replicated before changing the configuration\n"); + fprintf(stderr, "ERROR: When usable_regions > 1, All regions with priority >= 0 must be fully replicated before changing the configuration\n"); printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n"); ret=true; break; case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS: - printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n"); + fprintf(stderr, "ERROR: When changing usable_regions, only one region can have priority >= 0\n"); printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n"); ret=true; break; case ConfigurationResult::REGIONS_CHANGED: - printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n"); + fprintf(stderr, "ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n"); printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n"); ret=true; break; case ConfigurationResult::NOT_ENOUGH_WORKERS: - printf("ERROR: Not enough processes exist to support the specified configuration\n"); + fprintf(stderr, "ERROR: Not enough processes exist to support the specified configuration\n"); printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n"); ret=true; break; case ConfigurationResult::REGION_REPLICATION_MISMATCH: - printf("ERROR: `three_datacenter' replication is incompatible with region configuration\n"); + fprintf(stderr, "ERROR: `three_datacenter' replication is incompatible with region configuration\n"); printf("Type `fileconfigure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; case ConfigurationResult::DCID_MISSING: - printf("ERROR: `No storage servers in one of the specified regions\n"); + fprintf(stderr, "ERROR: `No storage servers in one of the specified regions\n"); printf("Type `fileconfigure FORCE <TOKEN...>' to configure without this check\n"); ret=true; break; @@ -2110,13 +2110,13 @@ ACTOR Future<bool> coordinators( Database db, std::vector<StringRef> tokens, boo // SOMEDAY: Check for keywords auto const& addr = NetworkAddress::parse( t->toString() ); if (addresses.count(addr)){ - printf("ERROR: passed redundant coordinators: `%s'\n", addr.toString().c_str()); + fprintf(stderr, "ERROR: passed redundant coordinators: `%s'\n", addr.toString().c_str()); return true; } addresses.insert(addr); } catch (Error& e) { if (e.code() == error_code_connection_string_invalid) { - printf("ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); + 
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); return true; } throw; @@ -2135,30 +2135,30 @@ ACTOR Future<bool> coordinators( Database db, std::vector<StringRef> tokens, boo bool err = true; switch(r) { case CoordinatorsResult::INVALID_NETWORK_ADDRESSES: - printf("ERROR: The specified network addresses are invalid\n"); + fprintf(stderr, "ERROR: The specified network addresses are invalid\n"); break; case CoordinatorsResult::SAME_NETWORK_ADDRESSES: printf("No change (existing configuration satisfies request)\n"); err = false; break; case CoordinatorsResult::NOT_COORDINATORS: - printf("ERROR: Coordination servers are not running on the specified network addresses\n"); + fprintf(stderr, "ERROR: Coordination servers are not running on the specified network addresses\n"); break; case CoordinatorsResult::DATABASE_UNREACHABLE: - printf("ERROR: Database unreachable\n"); + fprintf(stderr, "ERROR: Database unreachable\n"); break; case CoordinatorsResult::BAD_DATABASE_STATE: - printf("ERROR: The database is in an unexpected state from which changing coordinators might be unsafe\n"); + fprintf(stderr, "ERROR: The database is in an unexpected state from which changing coordinators might be unsafe\n"); break; case CoordinatorsResult::COORDINATOR_UNREACHABLE: - printf("ERROR: One of the specified coordinators is unreachable\n"); + fprintf(stderr, "ERROR: One of the specified coordinators is unreachable\n"); break; case CoordinatorsResult::SUCCESS: printf("Coordination state changed\n"); err=false; break; case CoordinatorsResult::NOT_ENOUGH_MACHINES: - printf("ERROR: Too few fdbserver machines to provide coordination at the current redundancy level\n"); + fprintf(stderr, "ERROR: Too few fdbserver machines to provide coordination at the current redundancy level\n"); break; default: ASSERT(false); @@ -2178,7 +2178,7 @@ ACTOR Future<bool> include( Database db, std::vector<StringRef> tokens ) { } else { auto a = AddressExclusion::parse( *t ); if (!a.isValid()) { - printf("ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); + fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); if( t->toString().find(":tls") != std::string::npos ) printf(" Do not include the `:tls' suffix when naming a process\n"); return true; @@ -2230,7 +2230,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc } else { auto a = AddressExclusion::parse( *t ); if (!a.isValid()) { - printf("ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); + fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); if( t->toString().find(":tls") != std::string::npos ) printf(" Do not include the `:tls' suffix when naming a process\n"); return true; @@ -2271,13 +2271,13 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc StatusObjectReader statusObjCluster; if (!statusObj.get("cluster", statusObjCluster)) { - printf("%s", errorString.c_str()); + fprintf(stderr, "%s", errorString.c_str()); return true; } StatusObjectReader processesMap; if (!statusObjCluster.get("processes", processesMap)) { - printf("%s", errorString.c_str()); + fprintf(stderr, "%s", errorString.c_str()); return true; } @@ -2301,7 +2301,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc StatusObjectReader process(proc.second); std::string addrStr; if (!process.get("address", addrStr)) { - printf("%s", 
errorString.c_str()); + fprintf(stderr, "%s", errorString.c_str()); return true; } NetworkAddress addr = NetworkAddress::parse(addrStr); @@ -2314,19 +2314,19 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc if(!excluded) { StatusObjectReader disk; if (!process.get("disk", disk)) { - printf("%s", errorString.c_str()); + fprintf(stderr, "%s", errorString.c_str()); return true; } int64_t total_bytes; if (!disk.get("total_bytes", total_bytes)) { - printf("%s", errorString.c_str()); + fprintf(stderr, "%s", errorString.c_str()); return true; } int64_t free_bytes; if (!disk.get("free_bytes", free_bytes)) { - printf("%s", errorString.c_str()); + fprintf(stderr, "%s", errorString.c_str()); return true; } @@ -2336,12 +2336,12 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc } catch (...) // std::exception { - printf("%s", errorString.c_str()); + fprintf(stderr, "%s", errorString.c_str()); return true; } if( ssExcludedCount==ssTotalCount || (1-worstFreeSpaceRatio)*ssTotalCount/(ssTotalCount-ssExcludedCount) > 0.9 ) { - printf("ERROR: This exclude may cause the total free space in the cluster to drop below 10%%.\n" + fprintf(stderr, "ERROR: This exclude may cause the total free space in the cluster to drop below 10%%.\n" "Type `exclude FORCE <ADDRESS...>' to exclude without checking free space.\n"); return true; } @@ -2377,22 +2377,22 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc for (const auto& exclusion : exclusionVector) { if (absentExclusions.find(exclusion) != absentExclusions.end()) { if (exclusion.port == 0) { - printf(" %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the " + fprintf(stderr, " %s(Whole machine) ---- WARNING: Missing from cluster!Be sure that you excluded the " "correct machines before removing them from the cluster!\n", exclusion.ip.toString().c_str()); } else { - printf(" %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes " + fprintf(stderr, " %s ---- WARNING: Missing from cluster! Be sure that you excluded the correct processes " "before removing them from the cluster!\n", exclusion.toString().c_str()); } } else if (std::any_of(notExcludedServers.begin(), notExcludedServers.end(), [&](const NetworkAddress& a) { return addressExcluded({ exclusion }, a); })) { if (exclusion.port == 0) { - printf(" %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this " + fprintf(stderr, " %s(Whole machine) ---- WARNING: Exclusion in progress! It is not safe to remove this " "machine from the cluster\n", exclusion.ip.toString().c_str()); } else { - printf(" %s ---- WARNING: Exclusion in progress! It is not safe to remove this process from the " + fprintf(stderr, " %s ---- WARNING: Exclusion in progress! 
It is not safe to remove this process from the " "cluster\n", exclusion.toString().c_str()); } @@ -2414,7 +2414,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc for( auto& c : ccs.coordinators()) { if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) || std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) { - printf("WARNING: %s is a coordinator!\n", c.toString().c_str()); + fprintf(stderr, "WARNING: %s is a coordinator!\n", c.toString().c_str()); foundCoordinator = true; } } @@ -2466,7 +2466,7 @@ ACTOR Future<bool> setClass( Database db, std::vector<StringRef> tokens ) { AddressExclusion addr = AddressExclusion::parse( tokens[1] ); if (!addr.isValid()) { - printf("ERROR: '%s' is not a valid network endpoint address\n", tokens[1].toString().c_str()); + fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", tokens[1].toString().c_str()); if( tokens[1].toString().find(":tls") != std::string::npos ) printf(" Do not include the `:tls' suffix when naming a process\n"); return true; @@ -2474,7 +2474,7 @@ ACTOR Future<bool> setClass( Database db, std::vector<StringRef> tokens ) { ProcessClass processClass(tokens[2].toString(), ProcessClass::DBSource); if(processClass.classType() == ProcessClass::InvalidClass && tokens[2] != LiteralStringRef("default")) { - printf("ERROR: '%s' is not a valid process class\n", tokens[2].toString().c_str()); + fprintf(stderr, "ERROR: '%s' is not a valid process class\n", tokens[2].toString().c_str()); return true; } @@ -2968,7 +2968,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } } catch (Error& e) { - printf("ERROR: %s (%d)\n", e.what(), e.code()); + fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code()); printf("Unable to connect to cluster from `%s'\n", ccf->getFilename().c_str()); return 1; } @@ -3062,9 +3062,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { continue; if (tokencmp(tokens[0], "parse_error")) { - printf("ERROR: Command failed to completely parse.\n"); + fprintf(stderr, "ERROR: Command failed to completely parse.\n"); if (tokens.size() > 1) { - printf("ERROR: Not running partial or malformed command:"); + fprintf(stderr, "ERROR: Not running partial or malformed command:"); for (auto t = tokens.begin() + 1; t != tokens.end(); ++t) printf(" %s", formatStringRef(*t, true).c_str()); printf("\n"); @@ -3081,7 +3081,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } if (!helpMap.count(tokens[0].toString()) && !hiddenCommands.count(tokens[0].toString())) { - printf("ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str()); + fprintf(stderr, "ERROR: Unknown command `%s'. 
Try `help'?\n", formatStringRef(tokens[0]).c_str()); is_error = true; continue; } @@ -3283,7 +3283,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { throw e; } } else { - printf("ERROR: Incorrect passphrase entered.\n"); + fprintf(stderr, "ERROR: Incorrect passphrase entered.\n"); is_error = true; } } @@ -3306,7 +3306,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { printUsage(tokens[0]); is_error = true; } else if (intrans) { - printf("ERROR: Already in transaction\n"); + fprintf(stderr, "ERROR: Already in transaction\n"); is_error = true; } else { activeOptions = FdbOptions(globalOptions); @@ -3323,7 +3323,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { printUsage(tokens[0]); is_error = true; } else if (!intrans) { - printf("ERROR: No active transaction\n"); + fprintf(stderr, "ERROR: No active transaction\n"); is_error = true; } else { wait( commitTransaction( tr ) ); @@ -3339,7 +3339,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { printUsage(tokens[0]); is_error = true; } else if (!intrans) { - printf("ERROR: No active transaction\n"); + fprintf(stderr, "ERROR: No active transaction\n"); is_error = true; } else { tr->reset(); @@ -3356,7 +3356,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { printUsage(tokens[0]); is_error = true; } else if (!intrans) { - printf("ERROR: No active transaction\n"); + fprintf(stderr, "ERROR: No active transaction\n"); is_error = true; } else { intrans = false; @@ -3442,14 +3442,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second.first); } if (address_interface.size() == 0) { - printf("ERROR: no processes to kill. You must run the `kill’ command before running `kill all’.\n"); + fprintf(stderr, "ERROR: no processes to kill. 
You must run the `kill’ command before running `kill all’.\n"); } else { printf("Attempted to kill %zu processes\n", address_interface.size()); } } else { for(int i = 1; i < tokens.size(); i++) { if(!address_interface.count(tokens[i])) { - printf("ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str()); + fprintf(stderr, "ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str()); is_error = true; break; } @@ -3492,7 +3492,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } else { for(int i = 2; i < tokens.size(); i++) { if(!address_interface.count(tokens[i])) { - printf("ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str()); + fprintf(stderr, "ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str()); is_error = true; break; } @@ -3581,7 +3581,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { if (tokencmp(tokens[0], "profile")) { if (tokens.size() == 1) { - printf("ERROR: Usage: profile <client|list|flow|heap>\n"); + fprintf(stderr, "ERROR: Usage: profile <client|list|flow|heap>\n"); is_error = true; continue; } @@ -3589,13 +3589,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { getTransaction(db, tr, options, intrans); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); if (tokens.size() == 2) { - printf("ERROR: Usage: profile client <get|set>\n"); + fprintf(stderr, "ERROR: Usage: profile client <get|set>\n"); is_error = true; continue; } if (tokencmp(tokens[2], "get")) { if (tokens.size() != 3) { - printf("ERROR: Addtional arguments to `get` are not supported.\n"); + fprintf(stderr, "ERROR: Addtional arguments to `get` are not supported.\n"); is_error = true; continue; } @@ -3620,7 +3620,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } if (tokencmp(tokens[2], "set")) { if (tokens.size() != 5) { - printf("ERROR: Usage: profile client set <RATE|default> <SIZE|default>\n"); + fprintf(stderr, "ERROR: Usage: profile client set <RATE|default> <SIZE|default>\n"); is_error = true; continue; } @@ -3631,7 +3631,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { char* end; sampleRate = std::strtod((const char*)tokens[3].begin(), &end); if (!std::isspace(*end)) { - printf("ERROR: %s failed to parse.\n", printable(tokens[3]).c_str()); + fprintf(stderr, "ERROR: %s failed to parse.\n", printable(tokens[3]).c_str()); is_error = true; continue; } @@ -3644,7 +3644,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { if (parsed.present()) { sizeLimit = parsed.get(); } else { - printf("ERROR: `%s` failed to parse.\n", printable(tokens[4]).c_str()); + fprintf(stderr, "ERROR: `%s` failed to parse.\n", printable(tokens[4]).c_str()); is_error = true; continue; } @@ -3656,13 +3656,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } continue; } - printf("ERROR: Unknown action: %s\n", printable(tokens[2]).c_str()); + fprintf(stderr, "ERROR: Unknown action: %s\n", printable(tokens[2]).c_str()); is_error = true; continue; } if (tokencmp(tokens[1], "list")) { if (tokens.size() != 2) { - printf("ERROR: Usage: profile list\n"); + fprintf(stderr, "ERROR: Usage: profile list\n"); is_error = true; continue; } @@ -3683,13 +3683,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } if (tokencmp(tokens[1], "flow")) { if (tokens.size() == 2) { - printf("ERROR: Usage: profile flow <run>\n"); + fprintf(stderr, "ERROR: Usage: profile flow <run>\n"); is_error = true; continue; } if (tokencmp(tokens[2], "run")) { if (tokens.size() 
< 6) { - printf("ERROR: Usage: profile flow run <DURATION_IN_SECONDS> <FILENAME> <PROCESS...>\n"); + fprintf(stderr, "ERROR: Usage: profile flow run <DURATION_IN_SECONDS> <FILENAME> <PROCESS...>\n"); is_error = true; continue; } @@ -3702,7 +3702,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { char *duration_end; int duration = std::strtol((const char*)tokens[3].begin(), &duration_end, 10); if (!std::isspace(*duration_end)) { - printf("ERROR: Failed to parse %s as an integer.", printable(tokens[3]).c_str()); + fprintf(stderr, "ERROR: Failed to parse %s as an integer.", printable(tokens[3]).c_str()); is_error = true; continue; } @@ -3727,7 +3727,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { for (int tokenidx = 5; tokenidx < tokens.size(); tokenidx++) { auto element = interfaces.find(tokens[tokenidx]); if (element == interfaces.end()) { - printf("ERROR: process '%s' not recognized.\n", printable(tokens[tokenidx]).c_str()); + fprintf(stderr, "ERROR: process '%s' not recognized.\n", printable(tokens[tokenidx]).c_str()); is_error = true; } } @@ -3745,7 +3745,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { for (int i = 0; i < all_profiler_responses.size(); i++) { const ErrorOr<Void>& err = all_profiler_responses[i].get(); if (err.isError()) { - printf("ERROR: %s: %s: %s\n", printable(all_profiler_addresses[i]).c_str(), err.getError().name(), err.getError().what()); + fprintf(stderr, "ERROR: %s: %s: %s\n", printable(all_profiler_addresses[i]).c_str(), err.getError().name(), err.getError().what()); } } } @@ -3756,7 +3756,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } if (tokencmp(tokens[1], "heap")) { if (tokens.size() != 3) { - printf("ERROR: Usage: profile heap <PROCESS>\n"); + fprintf(stderr, "ERROR: Usage: profile heap <PROCESS>\n"); is_error = true; continue; } @@ -3772,7 +3772,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } state Key ip_port = tokens[2]; if (interfaces.find(ip_port) == interfaces.end()) { - printf("ERROR: host %s not found\n", printable(ip_port).c_str()); + fprintf(stderr, "ERROR: host %s not found\n", printable(ip_port).c_str()); is_error = true; continue; } @@ -3780,11 +3780,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { profileRequest.outputFile = LiteralStringRef("heapz"); ErrorOr<Void> response = wait(interfaces[ip_port].profiler.tryGetReply(profileRequest)); if (response.isError()) { - printf("ERROR: %s: %s: %s\n", printable(ip_port).c_str(), response.getError().name(), response.getError().what()); + fprintf(stderr, "ERROR: %s: %s: %s\n", printable(ip_port).c_str(), response.getError().name(), response.getError().what()); } continue; } - printf("ERROR: Unknown type: %s\n", printable(tokens[1]).c_str()); + fprintf(stderr, "ERROR: Unknown type: %s\n", printable(tokens[1]).c_str()); is_error = true; continue; } @@ -3817,14 +3817,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second.first); } if (address_interface.size() == 0) { - printf("ERROR: no processes to check. You must run the `expensive_data_check’ command before running `expensive_data_check all’.\n"); + fprintf(stderr, "ERROR: no processes to check. 
You must run the `expensive_data_check’ command before running `expensive_data_check all’.\n"); } else { printf("Attempted to kill and check %zu processes\n", address_interface.size()); } } else { for(int i = 1; i < tokens.size(); i++) { if(!address_interface.count(tokens[i])) { - printf("ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str()); + fprintf(stderr, "ERROR: process `%s' not recognized.\n", printable(tokens[i]).c_str()); is_error = true; break; } @@ -3854,7 +3854,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { // limit at the (already absurd) // nearly-a-billion if (tokens[3].size() > 9) { - printf("ERROR: bad limit\n"); + fprintf(stderr, "ERROR: bad limit\n"); is_error = true; continue; } @@ -3870,7 +3870,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { place *= 10; } if (!valid) { - printf("ERROR: bad limit\n"); + fprintf(stderr, "ERROR: bad limit\n"); is_error = true; continue; } @@ -3930,7 +3930,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { if (tokencmp(tokens[0], "set")) { if(!writeMode) { - printf("ERROR: writemode must be enabled to set or clear keys in the database.\n"); + fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n"); is_error = true; continue; } @@ -3951,7 +3951,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { if (tokencmp(tokens[0], "clear")) { if(!writeMode) { - printf("ERROR: writemode must be enabled to set or clear keys in the database.\n"); + fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n"); is_error = true; continue; } @@ -3972,7 +3972,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { if (tokencmp(tokens[0], "clearrange")) { if(!writeMode) { - printf("ERROR: writemode must be enabled to set or clear keys in the database.\n"); + fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n"); is_error = true; continue; } @@ -4048,7 +4048,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { printf("\n"); } else - printf("There are no options enabled\n"); + fprintf(stderr, "There are no options enabled\n"); continue; } @@ -4058,12 +4058,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } else if(tokencmp(tokens[1], "off")) { if(intrans) { - printf("ERROR: Cannot turn option off when using a transaction created with `begin'\n"); + fprintf(stderr, "ERROR: Cannot turn option off when using a transaction created with `begin'\n"); is_error = true; continue; } if(tokens.size() > 3) { - printf("ERROR: Cannot specify option argument when turning option off\n"); + fprintf(stderr, "ERROR: Cannot specify option argument when turning option off\n"); is_error = true; continue; } @@ -4071,7 +4071,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { isOn = false; } else { - printf("ERROR: Invalid option state `%s': option must be turned `on' or `off'\n", formatStringRef(tokens[1]).c_str()); + fprintf(stderr, "ERROR: Invalid option state `%s': option must be turned `on' or `off'\n", formatStringRef(tokens[1]).c_str()); is_error = true; continue; } @@ -4113,7 +4113,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { char *end; throttleListLimit = std::strtol((const char*)tokens[2].begin(), &end, 10); if ((tokens.size() > 3 && !std::isspace(*end)) || (tokens.size() == 3 && *end != '\0')) { - printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[2]).c_str()); + fprintf(stderr, "ERROR: 
failed to parse limit `%s'.\n", printable(tokens[2]).c_str()); is_error = true; continue; } @@ -4169,12 +4169,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { char *end; tpsRate = std::strtod((const char*)tokens[4].begin(), &end); if((tokens.size() > 5 && !std::isspace(*end)) || (tokens.size() == 5 && *end != '\0')) { - printf("ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str()); + fprintf(stderr, "ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str()); is_error = true; continue; } if(tpsRate < 0) { - printf("ERROR: rate cannot be negative `%f'\n", tpsRate); + fprintf(stderr, "ERROR: rate cannot be negative `%f'\n", tpsRate); is_error = true; continue; } @@ -4182,14 +4182,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { if(tokens.size() == 6) { Optional<uint64_t> parsedDuration = parseDuration(tokens[5].toString()); if(!parsedDuration.present()) { - printf("ERROR: failed to parse duration `%s'.\n", printable(tokens[5]).c_str()); + fprintf(stderr, "ERROR: failed to parse duration `%s'.\n", printable(tokens[5]).c_str()); is_error = true; continue; } duration = parsedDuration.get(); if(duration == 0) { - printf("ERROR: throttle duration cannot be 0\n"); + fprintf(stderr, "ERROR: throttle duration cannot be 0\n"); is_error = true; continue; } @@ -4205,7 +4205,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { priority = TransactionPriority::BATCH; } else { - printf("ERROR: unrecognized priority `%s'. Must be one of `default',\n `immediate', or `batch'.\n", tokens[6].toString().c_str()); + fprintf(stderr, "ERROR: unrecognized priority `%s'. Must be one of `default',\n `immediate', or `batch'.\n", tokens[6].toString().c_str()); is_error = true; continue; } @@ -4344,7 +4344,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { continue; } - printf("ERROR: Unknown command `%s'. Try `help'?\n", formatStringRef(tokens[0]).c_str()); + fprintf(stderr, "ERROR: Unknown command `%s'. 
Try `help'?\n", formatStringRef(tokens[0]).c_str()); is_error = true; } @@ -4352,7 +4352,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) { } catch (Error& e) { if(e.code() != error_code_actor_cancelled) - printf("ERROR: %s (%d)\n", e.what(), e.code()); + fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code()); is_error = true; if (intrans) { printf("Rolling back current transaction\n"); @@ -4548,7 +4548,7 @@ int main(int argc, char **argv) { printf("\n"); loaded.print(stdout); } catch (Error& e) { - printf("ERROR: %s (%d)\n", e.what(), e.code()); + fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code()); printf("Use --log and look at the trace logs for more detailed information on the failure.\n"); return 1; } @@ -4572,7 +4572,7 @@ int main(int argc, char **argv) { return 1; } } catch (Error& e) { - printf("ERROR: %s (%d)\n", e.what(), e.code()); + fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code()); return 1; } } From 0e9e53a1631b0e91d35bf8684ce5bdec235a8dca Mon Sep 17 00:00:00 2001 From: Daniel Smith <daniel.smith@datadoghq.com> Date: Mon, 1 Mar 2021 17:20:41 +0000 Subject: [PATCH 28/28] Make the RocksDB init method idempotent --- fdbserver/IKeyValueStore.h | 2 + fdbserver/KeyValueStoreRocksDB.actor.cpp | 52 ++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/fdbserver/IKeyValueStore.h b/fdbserver/IKeyValueStore.h index 3c82a0b7a8..3e3bfd8d8a 100644 --- a/fdbserver/IKeyValueStore.h +++ b/fdbserver/IKeyValueStore.h @@ -78,6 +78,8 @@ public: commit() read() */ + // `init()` MUST be idempotent as it will be called more than once on a KeyValueStore in case + // of a rollback. virtual Future<Void> init() { return Void(); } diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 6a8edf1f7f..061cbf2e96 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -123,6 +123,16 @@ struct RocksDBKeyValueStore : IKeyValueStore { } }; void action(OpenAction& a) { + // If the DB has already been initialized, this should be a no-op. + if (db != nullptr) { + TraceEvent(SevInfo, "RocksDB") + .detail("Path", a.path) + .detail("Method", "Open") + .detail("Skipping", "Already Open"); + a.done.send(Void()); + return; + } + std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{ "default", getCFOptions() } }; std::vector<rocksdb::ColumnFamilyHandle*> handle; @@ -478,3 +488,45 @@ IKeyValueStore* keyValueStoreRocksDB(std::string const& path, UID logID, KeyValu return nullptr; #endif // SSD_ROCKSDB_EXPERIMENTAL } + +#ifdef SSD_ROCKSDB_EXPERIMENTAL +#include "flow/UnitTest.h" + +namespace { + +TEST_CASE("fdbserver/KeyValueStoreRocksDB/Reopen") { + state const std::string rocksDBTestDir = "rocksdb-kvstore-reopen-test-db"; + platform::eraseDirectoryRecursive(rocksDBTestDir); + + state IKeyValueStore* kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID()); + wait(kvStore->init()); + + kvStore->set({ LiteralStringRef("foo"), LiteralStringRef("bar") }); + wait(kvStore->commit(false)); + + Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo"))); + ASSERT(Optional<Value>(LiteralStringRef("bar")) == val); + + Future<Void> closed = kvStore->onClosed(); + kvStore->close(); + wait(closed); + + kvStore = new RocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID()); + wait(kvStore->init()); + // Confirm that `init()` is idempotent. 
+ wait(kvStore->init()); + + Optional<Value> val = wait(kvStore->readValue(LiteralStringRef("foo"))); + ASSERT(Optional<Value>(LiteralStringRef("bar")) == val); + + Future<Void> closed = kvStore->onClosed(); + kvStore->close(); + wait(closed); + + platform::eraseDirectoryRecursive(rocksDBTestDir); + return Void(); +} + +} // namespace + +#endif // SSD_ROCKSDB_EXPERIMENTAL
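
---

Editor's note (not part of the patch series): the QueueModel comment revisions in patches 22, 23, 24, and 26 above describe an accounting scheme where `addRequest` charges the server's current penalty to a smoothed outstanding-request count and returns it, and `endRequest` later subtracts exactly that amount and records the penalty carried on the reply. The following is a minimal standalone sketch of that bookkeeping, assuming a plain running count instead of FDB's `Smoother`, and with made-up names (`SimpleQueueModel`, `SimpleQueueData`); it is an illustration of the idea in the comments, not the actual `fdbrpc/QueueModel.h` implementation.

```cpp
// Standalone sketch (hypothetical names, not FDB code). It mirrors the
// addRequest/endRequest contract described in the QueueModel comments:
// the delta returned by addRequest must be passed back to endRequest.
#include <cassert>
#include <cstdint>
#include <unordered_map>

struct SimpleQueueData {
    double smoothOutstanding = 0.0; // estimated requests in flight to this server
    double penalty = 1.0;           // per-request "cost" last reported by the server (>= 1)
    double latency = 0.001;         // last client-observed latency
};

class SimpleQueueModel {
public:
    // Starting a request: charge the server's current penalty to its
    // outstanding estimate and return it as the delta for endRequest.
    double addRequest(uint64_t id) {
        auto& d = data[id];
        d.smoothOutstanding += d.penalty;
        return d.penalty;
    }

    // Finishing a request: remove exactly the delta that addRequest charged,
    // record the measured latency, and adopt the penalty the server attached
    // to its reply (its way of asking the client to slow down).
    void endRequest(uint64_t id, double latency, double penalty, double delta) {
        auto& d = data[id];
        d.smoothOutstanding -= delta;
        assert(d.smoothOutstanding >= -1e-9);
        d.latency = latency;
        if (penalty >= 1.0) {
            d.penalty = penalty;
        }
    }

    const SimpleQueueData& get(uint64_t id) { return data[id]; }

private:
    std::unordered_map<uint64_t, SimpleQueueData> data;
};

int main() {
    SimpleQueueModel model;
    double d1 = model.addRequest(1); // server 1 currently has penalty 1.0
    double d2 = model.addRequest(2); // server 2 currently has penalty 1.0
    // A load balancer would prefer the server with the smaller
    // smoothOutstanding value, all else being equal.
    model.endRequest(1, /*latency*/ 0.004, /*penalty*/ 2.0, d1); // server 1 asks us to back off
    model.endRequest(2, /*latency*/ 0.002, /*penalty*/ 1.0, d2);
    return 0;
}
```

Under this reading, a penalty above 1 simply makes each in-flight request to that server count for more when the client compares outstanding load across replicas, which is why the revised comments stress that the value returned by `addRequest` must be handed back unchanged as `delta` to `endRequest`.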