diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index 5dc079da48..dce7604936 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -257,7 +257,6 @@ endif() add_test(NAME fdb_c_upgrade_single_threaded_630api COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py --build-dir ${CMAKE_BINARY_DIR} - --disable-log-dump --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml --upgrade-path "6.3.23" "7.0.0" "7.2.0" --process-number 1 @@ -266,7 +265,6 @@ endif() add_test(NAME fdb_c_upgrade_single_threaded_700api COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py --build-dir ${CMAKE_BINARY_DIR} - --disable-log-dump --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml --upgrade-path "7.0.0" "7.2.0" --process-number 1 @@ -275,7 +273,6 @@ endif() add_test(NAME fdb_c_upgrade_multi_threaded_630api COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py --build-dir ${CMAKE_BINARY_DIR} - --disable-log-dump --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml --upgrade-path "6.3.23" "7.0.0" "7.2.0" --process-number 3 @@ -284,7 +281,6 @@ endif() add_test(NAME fdb_c_upgrade_multi_threaded_700api COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py --build-dir ${CMAKE_BINARY_DIR} - --disable-log-dump --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml --upgrade-path "7.0.0" "7.2.0" --process-number 3 @@ -334,6 +330,7 @@ fdb_install( FILES foundationdb/fdb_c.h ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/fdb_c_options.g.h ${CMAKE_SOURCE_DIR}/fdbclient/vexillographer/fdb.options + ${CMAKE_SOURCE_DIR}/bindings/c/foundationdb/fdb_c_types.h DESTINATION include DESTINATION_SUFFIX /foundationdb COMPONENT clients) diff --git a/bindings/c/test/mako/mako.c b/bindings/c/test/mako/mako.c index 46a75ba387..f51ebf6cb9 100644 --- a/bindings/c/test/mako/mako.c +++ b/bindings/c/test/mako/mako.c @@ -2635,7 +2635,11 @@ void print_report(mako_args_t* args, fseek(f, 0, 0); index = 0; while (index < numPoints) { - fread(&dataPoints[op][k++], sizeof(uint64_t), 1, f); + size_t nread = fread(&dataPoints[op][k++], sizeof(uint64_t), 1, f); + if (nread != 1) { + fprintf(stderr, "ERROR: read failed\n"); + exit(1); + } ++index; } fclose(f); @@ -2759,7 +2763,11 @@ void print_report(mako_args_t* args, char command_remove[NAME_MAX] = { '\0' }; sprintf(command_remove, "rm -rf %s%d", TEMP_DATA_STORE, *pid_main); - system(command_remove); + int ret = system(command_remove); + if (ret != 0) { + fprintf(stderr, "ERROR: system() call failed\n"); + exit(ret); + } for (op = 0; op < MAX_OP; op++) { if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_TRANSACTION) { diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index be1bbebb29..d68f4d82b1 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -3,6 +3,40 @@ .. code-block:: javascript "cluster":{ + "storage_wiggler": { + "wiggle_server_ids":["0ccb4e0feddb55"], + "wiggle_server_addresses": ["127.0.0.1"], + "primary": { // primary DC storage wiggler stats + // One StorageServer wiggle round is considered 'complete', when all StorageServers with creationTime < T are wiggled + "last_round_start_datetime": "2022-04-02 00:05:05.123 +0000", + "last_round_start_timestamp": 1648857905.123, // when did the latest round start + "last_round_finish_datetime": "1970-01-01 00:00:00.000 +0000", + "last_round_finish_timestamp": 0, // when did the latest finished round finish + "smoothed_round_seconds": 1, // moving average duration of a wiggle round + "finished_round": 1, + // 1 wiggle step as 1 storage server is wiggled in the current round + "last_wiggle_start_datetime": "2022-04-02 00:05:05.123 +0000", + "last_wiggle_start_timestamp": 1648857905.123, // when did the latest wiggle step start + "last_wiggle_finish_datetime": "1970-01-01 00:00:00.000 +0000", + "last_wiggle_finish_timestamp": 0, + "smoothed_wiggle_seconds": 1, + "finished_wiggle": 1 + }, + "remote": { // remote DC storage wiggler stats + "last_round_start_datetime": "2022-04-02 00:05:05.123 +0000", + "last_round_start_timestamp": 1648857905.123, + "last_round_finish_datetime": "1970-01-01 00:00:00.000 +0000", + "last_round_finish_timestamp": 0, + "smoothed_round_seconds": 1, + "finished_round": 1, + "last_wiggle_start_datetime": "2022-04-02 00:05:05.123 +0000", + "last_wiggle_start_timestamp": 1648857905.123, + "last_wiggle_finish_datetime": "1970-01-01 00:00:00.000 +0000", + "last_wiggle_finish_timestamp": 0, + "smoothed_wiggle_seconds": 1, + "finished_wiggle": 1 + } + }, "layers":{ "_valid":true, "_error":"some error description" diff --git a/documentation/sphinx/source/perpetual-storage-wiggle.rst b/documentation/sphinx/source/perpetual-storage-wiggle.rst index 43c4065362..7b1670c971 100644 --- a/documentation/sphinx/source/perpetual-storage-wiggle.rst +++ b/documentation/sphinx/source/perpetual-storage-wiggle.rst @@ -14,7 +14,7 @@ Summary ============ Perpetual storage wiggle is a feature that forces the data distributor to constantly build new storage teams when the cluster is healthy. On a high-level note, the process is like this: -Order storage servers by process id. For each storage server n: +Order storage servers by their created time, from oldest to newest. For each storage server n: a. Exclude storage server n. @@ -22,7 +22,7 @@ b. Wait until all data has been moved off the storage server. c. Include storage n -Goto a to wiggle the next storage process with different process id. +Goto step a to wiggle the next storage server. With a perpetual wiggle, storage migrations will be much less impactful. The wiggler will detect the healthy status based on healthy teams, available disk space and the number of unhealthy relocations. It will pause the wiggle until the cluster is healthy again. @@ -47,7 +47,8 @@ Disable perpetual storage wiggle locality matching filter, which wiggles all the Monitor ======= -The ``status`` command in the FDB :ref:`command line interface <command-line-interface>` will show the current perpetual_storage_wiggle value. +* The ``status`` command will report the IP address of the Storage Server under wiggling. +* The ``status json`` command in the FDB :ref:`command line interface <command-line-interface>` will show the current ``perpetual_storage_wiggle`` value. Plus, the ``cluster.storage_wiggler`` field reports storage wiggle details. Trace Events ---------------------- diff --git a/documentation/sphinx/source/release-notes/release-notes-710.rst b/documentation/sphinx/source/release-notes/release-notes-710.rst index 78412c15bb..84a3405b70 100644 --- a/documentation/sphinx/source/release-notes/release-notes-710.rst +++ b/documentation/sphinx/source/release-notes/release-notes-710.rst @@ -23,6 +23,7 @@ Fixes Status ------ +* Added ``cluster.storage_wiggler`` field report storage wiggle stats `(PR #6219) <https://github.com/apple/foundationdb/pull/6219>`_ Bindings -------- @@ -33,6 +34,7 @@ Other Changes ------------- * OpenTracing support is now deprecated in favor of OpenTelemetry tracing, which will be enabled in a future release. `(PR #6478) <https://github.com/apple/foundationdb/pull/6478/files>`_ * Changed ``memory`` option to limit resident memory instead of virtual memory. Added a new ``memory_vsize`` option if limiting virtual memory is desired. `(PR #6719) <https://github.com/apple/foundationdb/pull/6719>`_ +* Change ``perpetual storage wiggle`` to wiggle the storage servers based on their created time. `(PR #6219) <https://github.com/apple/foundationdb/pull/6219>`_ Earlier release notes --------------------- diff --git a/fdbclient/ConfigTransactionInterface.cpp b/fdbclient/ConfigTransactionInterface.cpp index fab26f3498..1f053a0052 100644 --- a/fdbclient/ConfigTransactionInterface.cpp +++ b/fdbclient/ConfigTransactionInterface.cpp @@ -34,12 +34,16 @@ void ConfigTransactionInterface::setupWellKnownEndpoints() { } ConfigTransactionInterface::ConfigTransactionInterface(NetworkAddress const& remote) - : getGeneration(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETGENERATION)), + : _id(deterministicRandom()->randomUniqueID()), + getGeneration(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETGENERATION)), get(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GET)), getClasses(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETCLASSES)), getKnobs(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETKNOBS)), commit(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_COMMIT)) {} +ConfigTransactionInterface::ConfigTransactionInterface(Hostname const& remote) + : _id(deterministicRandom()->randomUniqueID()), hostname(remote) {} + bool ConfigTransactionInterface::operator==(ConfigTransactionInterface const& rhs) const { return _id == rhs._id; } diff --git a/fdbclient/ConfigTransactionInterface.h b/fdbclient/ConfigTransactionInterface.h index 7489594ad0..98b65e4c4b 100644 --- a/fdbclient/ConfigTransactionInterface.h +++ b/fdbclient/ConfigTransactionInterface.h @@ -200,9 +200,12 @@ public: class RequestStream<ConfigTransactionGetKnobsRequest> getKnobs; class RequestStream<ConfigTransactionCommitRequest> commit; + Optional<Hostname> hostname; + ConfigTransactionInterface(); void setupWellKnownEndpoints(); ConfigTransactionInterface(NetworkAddress const& remote); + ConfigTransactionInterface(Hostname const& remote); bool operator==(ConfigTransactionInterface const& rhs) const; bool operator!=(ConfigTransactionInterface const& rhs) const; @@ -210,6 +213,6 @@ public: template <class Ar> void serialize(Ar& ar) { - serializer(ar, getGeneration, get, getClasses, getKnobs, commit); + serializer(ar, getGeneration, get, getClasses, getKnobs, commit, hostname); } }; diff --git a/fdbclient/CoordinationInterface.h b/fdbclient/CoordinationInterface.h index c8f252088c..3f2f8bd385 100644 --- a/fdbclient/CoordinationInterface.h +++ b/fdbclient/CoordinationInterface.h @@ -104,6 +104,10 @@ public: ConnectionStringStatus status = RESOLVED; AsyncTrigger resolveFinish; + // This function tries to resolve all hostnames once, and return them with coords. + // Best effort, does not guarantee that the resolves succeed. + Future<std::vector<NetworkAddress>> tryResolveHostnames(); + std::vector<NetworkAddress> coords; std::vector<Hostname> hostnames; std::unordered_map<NetworkAddress, Hostname> networkAddressToHostname; diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 678c47eba0..0b80e5076c 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -364,6 +364,29 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") { return Void(); } +ACTOR Future<std::vector<NetworkAddress>> tryResolveHostnamesImpl(ClusterConnectionString* self) { + state std::set<NetworkAddress> allCoordinatorsSet; + std::vector<Future<Void>> fs; + for (auto& hostname : self->hostnames) { + fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void { + if (addr.present()) { + allCoordinatorsSet.insert(addr.get()); + } + return Void(); + })); + } + wait(waitForAll(fs)); + for (const auto& coord : self->coords) { + allCoordinatorsSet.insert(coord); + } + std::vector<NetworkAddress> allCoordinators(allCoordinatorsSet.begin(), allCoordinatorsSet.end()); + return allCoordinators; +} + +Future<std::vector<NetworkAddress>> ClusterConnectionString::tryResolveHostnames() { + return tryResolveHostnamesImpl(this); +} + TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") { std::string connectionString = "TestCluster:0@host.name:1234,host-name:5678"; std::string hn = "host-name", port = "5678"; @@ -373,19 +396,9 @@ TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") { INetworkConnections::net()->addMockTCPEndpoint(hn, port, { address }); state ClusterConnectionString cs(connectionString); - - state std::unordered_set<NetworkAddress> coordinatorAddresses; - std::vector<Future<Void>> fs; - for (auto& hostname : cs.hostnames) { - fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void { - if (addr.present()) { - coordinatorAddresses.insert(addr.get()); - } - return Void(); - })); - } - wait(waitForAll(fs)); - ASSERT(coordinatorAddresses.size() == 1 && coordinatorAddresses.count(address) == 1); + state std::vector<NetworkAddress> allCoordinators = wait(cs.tryResolveHostnames()); + ASSERT(allCoordinators.size() == 1 && + std::find(allCoordinators.begin(), allCoordinators.end(), address) != allCoordinators.end()); return Void(); } @@ -585,7 +598,7 @@ ACTOR Future<Void> monitorNominee(Key key, .detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString()); if (rep.getError().code() == error_code_request_maybe_delivered) { // Delay to prevent tight resolving loop due to outdated DNS cache - wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL)); throw coordinators_changed(); } else { throw rep.getError(); diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index d5dac42abe..898f4f9204 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -406,21 +406,21 @@ TransportData::TransportData(uint64_t transportId, int maxWellKnownEndpoints, IP struct ConnectPacket { // The value does not include the size of `connectPacketLength` itself, // but only the other fields of this structure. - uint32_t connectPacketLength; + uint32_t connectPacketLength = 0; ProtocolVersion protocolVersion; // Expect currentProtocolVersion - uint16_t canonicalRemotePort; // Port number to reconnect to the originating process - uint64_t connectionId; // Multi-version clients will use the same Id for both connections, other connections will - // set this to zero. Added at protocol Version 0x0FDB00A444020001. + uint16_t canonicalRemotePort = 0; // Port number to reconnect to the originating process + uint64_t connectionId = 0; // Multi-version clients will use the same Id for both connections, other connections + // will set this to zero. Added at protocol Version 0x0FDB00A444020001. // IP Address to reconnect to the originating process. Only one of these must be populated. - uint32_t canonicalRemoteIp4; + uint32_t canonicalRemoteIp4 = 0; enum ConnectPacketFlags { FLAG_IPV6 = 1 }; - uint16_t flags; - uint8_t canonicalRemoteIp6[16]; + uint16_t flags = 0; + uint8_t canonicalRemoteIp6[16] = { 0 }; - ConnectPacket() { memset(this, 0, sizeof(*this)); } + ConnectPacket() = default; IPAddress canonicalRemoteIp() const { if (isIPv6()) { diff --git a/fdbrpc/genericactors.actor.h b/fdbrpc/genericactors.actor.h index 6226676ae1..955d3f913e 100644 --- a/fdbrpc/genericactors.actor.h +++ b/fdbrpc/genericactors.actor.h @@ -73,10 +73,7 @@ Future<REPLY_TYPE(Req)> retryBrokenPromise(RequestStream<Req, P> to, Req request } ACTOR template <class Req> -Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to, - Req request, - Hostname hostname, - WellKnownEndpoints token) { +Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token) { // A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname. // If resolving fails, return lookup_failed(). // Otherwise, return tryGetReply(request). @@ -84,8 +81,8 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to, if (!address.present()) { return ErrorOr<REPLY_TYPE(Req)>(lookup_failed()); } - *to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token)); - ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request)); + RequestStream<Req> to(Endpoint::wellKnown({ address.get() }, token)); + state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { @@ -98,8 +95,7 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to, } ACTOR template <class Req> -Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to, - Req request, +Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token, TaskPriority taskID) { @@ -110,8 +106,8 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to, if (!address.present()) { return ErrorOr<REPLY_TYPE(Req)>(lookup_failed()); } - *to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token)); - ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID)); + RequestStream<Req> to(Endpoint::wellKnown({ address.get() }, token)); + state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request, taskID)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { @@ -124,21 +120,21 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to, } ACTOR template <class Req> -Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to, - Req request, - Hostname hostname, - WellKnownEndpoints token) { +Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token) { // Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname. // Suitable for use with hostname, where RequestStream is NOT initialized yet. // Not normally useful for endpoints initialized with NetworkAddress. + state double reconnetInterval = FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL; loop { NetworkAddress address = wait(hostname.resolveWithRetry()); - *to = RequestStream<Req>(Endpoint::wellKnown({ address }, token)); - ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request)); + RequestStream<Req> to(Endpoint::wellKnown({ address }, token)); + state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { // Connection failure. + wait(delay(reconnetInterval)); + reconnetInterval = std::min(2 * reconnetInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL); hostname.resetToUnresolved(); INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service); } else { @@ -151,22 +147,24 @@ Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to, } ACTOR template <class Req> -Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to, - Req request, +Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token, TaskPriority taskID) { // Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname. // Suitable for use with hostname, where RequestStream is NOT initialized yet. // Not normally useful for endpoints initialized with NetworkAddress. + state double reconnetInterval = FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL; loop { NetworkAddress address = wait(hostname.resolveWithRetry()); - *to = RequestStream<Req>(Endpoint::wellKnown({ address }, token)); - ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID)); + RequestStream<Req> to(Endpoint::wellKnown({ address }, token)); + state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request, taskID)); if (reply.isError()) { resetReply(request); if (reply.getError().code() == error_code_request_maybe_delivered) { // Connection failure. + wait(delay(reconnetInterval)); + reconnetInterval = std::min(2 * reconnetInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL); hostname.resetToUnresolved(); INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service); } else { diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 21e42a5f5c..a8160f82a5 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1097,18 +1097,24 @@ void haltRegisteringOrCurrentSingleton(ClusterControllerData* self, } } -void registerWorker(RegisterWorkerRequest req, - ClusterControllerData* self, - std::unordered_set<NetworkAddress> coordinatorAddresses, - ConfigBroadcaster* configBroadcaster) { +ACTOR Future<Void> registerWorker(RegisterWorkerRequest req, + ClusterControllerData* self, + ClusterConnectionString cs, + ConfigBroadcaster* configBroadcaster) { + std::vector<NetworkAddress> coordinatorAddresses = wait(cs.tryResolveHostnames()); + const WorkerInterface& w = req.wi; ProcessClass newProcessClass = req.processClass; auto info = self->id_worker.find(w.locality.processId()); ClusterControllerPriorityInfo newPriorityInfo = req.priorityInfo; newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController); + bool isCoordinator = - (coordinatorAddresses.count(req.wi.address()) > 0) || - (req.wi.secondaryAddress().present() && coordinatorAddresses.count(req.wi.secondaryAddress().get()) > 0); + (std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.address()) != + coordinatorAddresses.end()) || + (req.wi.secondaryAddress().present() && + std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.secondaryAddress().get()) != + coordinatorAddresses.end()); for (auto it : req.incompatiblePeers) { self->db.incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL; @@ -1271,6 +1277,8 @@ void registerWorker(RegisterWorkerRequest req, if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) { req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo)); } + + return Void(); } #define TIME_KEEPER_VERSION LiteralStringRef("1") @@ -2543,30 +2551,12 @@ ACTOR Future<Void> clusterControllerCore(Reference<IClusterConnectionRecord> con when(RecruitBlobWorkerRequest req = waitNext(interf.recruitBlobWorker.getFuture())) { clusterRecruitBlobWorker(&self, req); } - when(state RegisterWorkerRequest req = waitNext(interf.registerWorker.getFuture())) { + when(RegisterWorkerRequest req = waitNext(interf.registerWorker.getFuture())) { ++self.registerWorkerRequests; - state ClusterConnectionString ccs = coordinators.ccr->getConnectionString(); - - state std::unordered_set<NetworkAddress> coordinatorAddresses; - std::vector<Future<Void>> fs; - for (auto& hostname : ccs.hostnames) { - fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void { - if (addr.present()) { - coordinatorAddresses.insert(addr.get()); - } - return Void(); - })); - } - wait(waitForAll(fs)); - - for (const auto& coord : ccs.coordinators()) { - coordinatorAddresses.insert(coord); - } - - registerWorker(req, - &self, - coordinatorAddresses, - (configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster); + self.addActor.send(registerWorker(req, + &self, + coordinators.ccr->getConnectionString(), + (configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster)); } when(GetWorkersRequest req = waitNext(interf.getWorkers.getFuture())) { ++self.getWorkersRequests; diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index 851007fef7..53086e5516 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -75,7 +75,7 @@ struct GenerationRegVal { } }; -GenerationRegInterface::GenerationRegInterface(NetworkAddress remote) +GenerationRegInterface::GenerationRegInterface(NetworkAddress const& remote) : read(Endpoint::wellKnown({ remote }, WLTOKEN_GENERATIONREG_READ)), write(Endpoint::wellKnown({ remote }, WLTOKEN_GENERATIONREG_WRITE)) {} @@ -84,7 +84,7 @@ GenerationRegInterface::GenerationRegInterface(INetwork* local) { write.makeWellKnownEndpoint(WLTOKEN_GENERATIONREG_WRITE, TaskPriority::Coordination); } -LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress remote) +LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress const& remote) : ClientLeaderRegInterface(remote), candidacy(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_CANDIDACY)), electionResult(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT)), leaderHeartbeat(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT)), diff --git a/fdbserver/CoordinationInterface.h b/fdbserver/CoordinationInterface.h index 461904787c..b966678cc6 100644 --- a/fdbserver/CoordinationInterface.h +++ b/fdbserver/CoordinationInterface.h @@ -53,9 +53,9 @@ struct GenerationRegInterface { // the v2 of the previous generation is the v1 of the next. GenerationRegInterface() {} - GenerationRegInterface(NetworkAddress remote); + GenerationRegInterface(NetworkAddress const& remote); GenerationRegInterface(INetwork* local); - GenerationRegInterface(Hostname hostname) : hostname(hostname){}; + GenerationRegInterface(Hostname const& hostname) : hostname(hostname){}; }; struct UniqueGeneration { @@ -128,9 +128,9 @@ struct LeaderElectionRegInterface : ClientLeaderRegInterface { RequestStream<struct ForwardRequest> forward; LeaderElectionRegInterface() {} - LeaderElectionRegInterface(NetworkAddress remote); + LeaderElectionRegInterface(NetworkAddress const& remote); LeaderElectionRegInterface(INetwork* local); - LeaderElectionRegInterface(Hostname hostname) : ClientLeaderRegInterface(hostname) {} + LeaderElectionRegInterface(Hostname const& hostname) : ClientLeaderRegInterface(hostname) {} }; struct CandidacyRequest { @@ -220,7 +220,7 @@ class ConfigNode; class ServerCoordinators : public ClientCoordinators { public: - explicit ServerCoordinators(Reference<IClusterConnectionRecord>); + explicit ServerCoordinators(Reference<IClusterConnectionRecord> ccr); std::vector<LeaderElectionRegInterface> leaderElectionServers; std::vector<GenerationRegInterface> stateServers; diff --git a/fdbserver/FDBExecHelper.actor.cpp b/fdbserver/FDBExecHelper.actor.cpp index 3e34fc8e25..fa93d91b00 100644 --- a/fdbserver/FDBExecHelper.actor.cpp +++ b/fdbserver/FDBExecHelper.actor.cpp @@ -242,7 +242,9 @@ ACTOR Future<int> spawnProcess(std::string binPath, static auto fork_child(const std::string& path, std::vector<char*>& paramList) { int pipefd[2]; - pipe(pipefd); + if (pipe(pipefd) != 0) { + return std::make_pair(-1, Optional<int>{}); + } auto readFD = pipefd[0]; auto writeFD = pipefd[1]; pid_t pid = fork(); diff --git a/fdbserver/LeaderElection.actor.cpp b/fdbserver/LeaderElection.actor.cpp index 04f7923a1a..272e07c1ec 100644 --- a/fdbserver/LeaderElection.actor.cpp +++ b/fdbserver/LeaderElection.actor.cpp @@ -51,7 +51,7 @@ ACTOR Future<Void> submitCandidacy(Key key, .detail("OldAddr", coord.candidacy.getEndpoint().getPrimaryAddress().toString()); if (rep.getError().code() == error_code_request_maybe_delivered) { // Delay to prevent tight resolving loop due to outdated DNS cache - wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL)); throw coordinators_changed(); } else { throw rep.getError(); diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index d27d65d97e..3a2cc88f0a 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -663,7 +663,7 @@ static void printUsage(const char* name, bool devhelp) { " collector_endpoint -- IP:PORT of the fluentd server\n" " collector_protocol -- UDP or TCP (default is UDP)"); #ifndef TLS_DISABLED - printf(TLS_HELP); + printf("%s", TLS_HELP); #endif printOptionUsage("-v, --version", "Print version information and exit."); printOptionUsage("-h, -?, --help", "Display this help and exit."); diff --git a/fdbserver/workloads/EncryptionOps.actor.cpp b/fdbserver/workloads/EncryptionOps.actor.cpp index c0fd7e43a8..11959aaacc 100644 --- a/fdbserver/workloads/EncryptionOps.actor.cpp +++ b/fdbserver/workloads/EncryptionOps.actor.cpp @@ -121,7 +121,6 @@ struct EncryptionOpsWorkload : TestWorkload { EncryptCipherDomainId maxDomainId; EncryptCipherBaseKeyId minBaseCipherId; EncryptCipherBaseKeyId headerBaseCipherId; - EncryptCipherRandomSalt headerRandomSalt; EncryptionOpsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { mode = getOption(options, LiteralStringRef("fixedSize"), 1); @@ -135,7 +134,6 @@ struct EncryptionOpsWorkload : TestWorkload { maxDomainId = deterministicRandom()->randomInt(minDomainId, minDomainId + 10) + 5; minBaseCipherId = 100; headerBaseCipherId = wcx.clientId * 100 + 1; - headerRandomSalt = wcx.clientId * 100 + 1; metrics = std::make_unique<WorkloadMetrics>(); @@ -185,8 +183,7 @@ struct EncryptionOpsWorkload : TestWorkload { // insert the Encrypt Header cipherKey generateRandomBaseCipher(AES_256_KEY_LENGTH, &buff[0], &cipherLen); - cipherKeyCache->insertCipherKey( - ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen, headerRandomSalt); + cipherKeyCache->insertCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen); TraceEvent("SetupCipherEssentials_Done").detail("MinDomainId", minDomainId).detail("MaxDomainId", maxDomainId); } @@ -212,29 +209,6 @@ struct EncryptionOpsWorkload : TestWorkload { TraceEvent("UpdateBaseCipher").detail("DomainId", encryptDomainId).detail("BaseCipherId", *nextBaseCipherId); } - Reference<BlobCipherKey> getEncryptionKey(const EncryptCipherDomainId& domainId, - const EncryptCipherBaseKeyId& baseCipherId, - const EncryptCipherRandomSalt& salt) { - const bool simCacheMiss = deterministicRandom()->randomInt(1, 100) < 15; - - Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance(); - Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt); - - if (simCacheMiss) { - TraceEvent("SimKeyCacheMiss").detail("EncyrptDomainId", domainId).detail("BaseCipherId", baseCipherId); - // simulate KeyCache miss that may happen during decryption; insert a CipherKey with known 'salt' - cipherKeyCache->insertCipherKey(domainId, - baseCipherId, - cipherKey->rawBaseCipher(), - cipherKey->getBaseCipherLen(), - cipherKey->getSalt()); - // Ensure the update was a NOP - Reference<BlobCipherKey> cKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt); - ASSERT(cKey->isEqual(cipherKey)); - } - return cipherKey; - } - Reference<EncryptBuf> doEncryption(Reference<BlobCipherKey> textCipherKey, Reference<BlobCipherKey> headerCipherKey, uint8_t* payload, @@ -266,12 +240,11 @@ struct EncryptionOpsWorkload : TestWorkload { ASSERT_EQ(header.flags.headerVersion, EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION); ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR); - Reference<BlobCipherKey> cipherKey = getEncryptionKey(header.cipherTextDetails.encryptDomainId, - header.cipherTextDetails.baseCipherId, - header.cipherTextDetails.salt); - Reference<BlobCipherKey> headerCipherKey = getEncryptionKey(header.cipherHeaderDetails.encryptDomainId, - header.cipherHeaderDetails.baseCipherId, - header.cipherHeaderDetails.salt); + Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance(); + Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId, + header.cipherTextDetails.baseCipherId); + Reference<BlobCipherKey> headerCipherKey = cipherKeyCache->getCipherKey( + header.cipherHeaderDetails.encryptDomainId, header.cipherHeaderDetails.baseCipherId); ASSERT(cipherKey.isValid()); ASSERT(cipherKey->isEqual(orgCipherKey)); @@ -324,7 +297,7 @@ struct EncryptionOpsWorkload : TestWorkload { Reference<BlobCipherKey> cipherKey = cipherKeyCache->getLatestCipherKey(encryptDomainId); // Each client working with their own version of encryptHeaderCipherKey, avoid using getLatest() Reference<BlobCipherKey> headerCipherKey = - cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, headerRandomSalt); + cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId); auto end = std::chrono::high_resolution_clock::now(); metrics->updateKeyDerivationTime(std::chrono::duration<double, std::nano>(end - start).count()); diff --git a/flow/BlobCipher.cpp b/flow/BlobCipher.cpp index b6a2562078..25fa277ab7 100644 --- a/flow/BlobCipher.cpp +++ b/flow/BlobCipher.cpp @@ -19,7 +19,6 @@ */ #include "flow/BlobCipher.h" - #include "flow/EncryptUtils.h" #include "flow/Knobs.h" #include "flow/Error.h" @@ -33,7 +32,6 @@ #include <cstring> #include <memory> #include <string> -#include <utility> #if ENCRYPTION_ENABLED @@ -56,14 +54,12 @@ BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId, salt = nondeterministicRandom()->randomUInt64(); } initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt); -} - -BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId, - const EncryptCipherBaseKeyId& baseCiphId, - const uint8_t* baseCiph, - int baseCiphLen, - const EncryptCipherRandomSalt& salt) { - initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt); + /*TraceEvent("BlobCipherKey") + .detail("DomainId", domainId) + .detail("BaseCipherId", baseCipherId) + .detail("BaseCipherLen", baseCipherLen) + .detail("RandomSalt", randomSalt) + .detail("CreationTime", creationTime);*/ } void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId, @@ -86,13 +82,6 @@ void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId, applyHmacSha256Derivation(); // update the key creation time creationTime = now(); - - TraceEvent("BlobCipherKey") - .detail("DomainId", domainId) - .detail("BaseCipherId", baseCipherId) - .detail("BaseCipherLen", baseCipherLen) - .detail("RandomSalt", randomSalt) - .detail("CreationTime", creationTime); } void BlobCipherKey::applyHmacSha256Derivation() { @@ -123,77 +112,25 @@ BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId) TraceEvent("Init_BlobCipherKeyIdCache").detail("DomainId", domainId); } -BlobCipherKeyIdCacheKey BlobCipherKeyIdCache::getCacheKey(const EncryptCipherBaseKeyId& baseCipherKeyId, - const EncryptCipherRandomSalt& salt) { - return std::make_pair(baseCipherKeyId, salt); -} - Reference<BlobCipherKey> BlobCipherKeyIdCache::getLatestCipherKey() { - return getCipherByBaseCipherId(latestBaseCipherKeyId, latestRandomSalt); + return getCipherByBaseCipherId(latestBaseCipherKeyId); } -Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId, - const EncryptCipherRandomSalt& salt) { - BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(getCacheKey(baseCipherKeyId, salt)); +Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId) { + BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherKeyId); if (itr == keyIdCache.end()) { - TraceEvent("CipherByBaseCipherId_KeyMissing") - .detail("DomainId", domainId) - .detail("BaseCipherId", baseCipherKeyId) - .detail("Salt", salt); throw encrypt_key_not_found(); } return itr->second; } -void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId, +void BlobCipherKeyIdCache::insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId, const uint8_t* baseCipher, int baseCipherLen) { ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID); - // BaseCipherKeys are immutable, given the routine invocation updates 'latestCipher', - // ensure no key-tampering is done - try { - Reference<BlobCipherKey> cipherKey = getLatestCipherKey(); - if (cipherKey->getBaseCipherId() == baseCipherId) { - if (memcmp(cipherKey->rawBaseCipher(), baseCipher, baseCipherLen) == 0) { - TraceEvent("InsertBaseCipherKey_AlreadyPresent") - .detail("BaseCipherKeyId", baseCipherId) - .detail("DomainId", domainId); - // Key is already present; nothing more to do. - return; - } else { - TraceEvent("InsertBaseCipherKey_UpdateCipher") - .detail("BaseCipherKeyId", baseCipherId) - .detail("DomainId", domainId); - throw encrypt_update_cipher(); - } - } - } catch (Error& e) { - if (e.code() != error_code_encrypt_key_not_found) { - throw e; - } - } - - Reference<BlobCipherKey> cipherKey = - makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen); - BlobCipherKeyIdCacheKey cacheKey = getCacheKey(cipherKey->getBaseCipherId(), cipherKey->getSalt()); - keyIdCache.emplace(cacheKey, cipherKey); - - // Update the latest BaseCipherKeyId for the given encryption domain - latestBaseCipherKeyId = baseCipherId; - latestRandomSalt = cipherKey->getSalt(); -} - -void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId, - const uint8_t* baseCipher, - int baseCipherLen, - const EncryptCipherRandomSalt& salt) { - ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID); - - BlobCipherKeyIdCacheKey cacheKey = getCacheKey(baseCipherId, salt); - // BaseCipherKeys are immutable, ensure that cached value doesn't get updated. - BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(cacheKey); + BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherId); if (itr != keyIdCache.end()) { if (memcmp(itr->second->rawBaseCipher(), baseCipher, baseCipherLen) == 0) { TraceEvent("InsertBaseCipherKey_AlreadyPresent") @@ -209,9 +146,9 @@ void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& bas } } - Reference<BlobCipherKey> cipherKey = - makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, salt); - keyIdCache.emplace(cacheKey, cipherKey); + keyIdCache.emplace(baseCipherId, makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen)); + // Update the latest BaseCipherKeyId for the given encryption domain + latestBaseCipherKeyId = baseCipherId; } void BlobCipherKeyIdCache::cleanup() { @@ -260,41 +197,6 @@ void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId, } } -void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId, - const EncryptCipherBaseKeyId& baseCipherId, - const uint8_t* baseCipher, - int baseCipherLen, - const EncryptCipherRandomSalt& salt) { - if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID) { - throw encrypt_invalid_id(); - } - - try { - auto domainItr = domainCacheMap.find(domainId); - if (domainItr == domainCacheMap.end()) { - // Add mapping to track new encryption domain - Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId); - keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt); - domainCacheMap.emplace(domainId, keyIdCache); - } else { - // Track new baseCipher keys - Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second; - keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt); - } - - TraceEvent("InsertCipherKey") - .detail("DomainId", domainId) - .detail("BaseCipherKeyId", baseCipherId) - .detail("Salt", salt); - } catch (Error& e) { - TraceEvent("InsertCipherKey_Failed") - .detail("BaseCipherKeyId", baseCipherId) - .detail("DomainId", domainId) - .detail("Salt", salt); - throw; - } -} - Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCipherDomainId& domainId) { auto domainItr = domainCacheMap.find(domainId); if (domainItr == domainCacheMap.end()) { @@ -315,19 +217,17 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCip } Reference<BlobCipherKey> BlobCipherKeyCache::getCipherKey(const EncryptCipherDomainId& domainId, - const EncryptCipherBaseKeyId& baseCipherId, - const EncryptCipherRandomSalt& salt) { + const EncryptCipherBaseKeyId& baseCipherId) { auto domainItr = domainCacheMap.find(domainId); if (domainItr == domainCacheMap.end()) { - TraceEvent("GetCipherKey_MissingDomainId").detail("DomainId", domainId); throw encrypt_key_not_found(); } Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second; - return keyIdCache->getCipherByBaseCipherId(baseCipherId, salt); + return keyIdCache->getCipherByBaseCipherId(baseCipherId); } -void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domainId) { +void BlobCipherKeyCache::resetEncyrptDomainId(const EncryptCipherDomainId domainId) { auto domainItr = domainCacheMap.find(domainId); if (domainItr == domainCacheMap.end()) { throw encrypt_key_not_found(); @@ -391,8 +291,8 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte memset(reinterpret_cast<uint8_t*>(header), 0, sizeof(BlobCipherEncryptHeader)); - // Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs - // to be generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost. + // Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs to be + // generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost. const int allocSize = authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE ? plaintextLen + AES_BLOCK_SIZE + sizeof(BlobCipherEncryptHeader) @@ -440,7 +340,6 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte // Populate header encryption-key details header->cipherHeaderDetails.encryptDomainId = headerCipherKey->getDomainId(); header->cipherHeaderDetails.baseCipherId = headerCipherKey->getBaseCipherId(); - header->cipherHeaderDetails.salt = headerCipherKey->getSalt(); // Populate header authToken details if (header->flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE) { @@ -725,8 +624,8 @@ void forceLinkBlobCipherTests() {} // 3. Inserting of 'identical' cipherKey (already cached) more than once works as desired. // 4. Inserting of 'non-identical' cipherKey (already cached) more than once works as desired. // 5. Validation encryption ops (correctness): -// 5.1. Encrypt a buffer followed by decryption of the buffer, validate the contents. -// 5.2. Simulate anomalies such as: EncryptionHeader corruption, authToken mismatch / encryptionMode mismatch etc. +// 5.1. Encyrpt a buffer followed by decryption of the buffer, validate the contents. +// 5.2. Simulate anomalies such as: EncyrptionHeader corruption, authToken mismatch / encryptionMode mismatch etc. // 6. Cache cleanup // 6.1 cleanup cipherKeys by given encryptDomainId // 6.2. Cleanup all cached cipherKeys @@ -740,7 +639,6 @@ TEST_CASE("flow/BlobCipher") { int len; EncryptCipherBaseKeyId keyId; std::unique_ptr<uint8_t[]> key; - EncryptCipherRandomSalt generatedSalt; BaseCipher(const EncryptCipherDomainId& dId, const EncryptCipherBaseKeyId& kId) : domainId(dId), len(deterministicRandom()->randomInt(AES_256_KEY_LENGTH / 2, AES_256_KEY_LENGTH + 1)), @@ -773,8 +671,6 @@ TEST_CASE("flow/BlobCipher") { cipherKeyCache->insertCipherKey( baseCipher->domainId, baseCipher->keyId, baseCipher->key.get(), baseCipher->len); - Reference<BlobCipherKey> fetchedKey = cipherKeyCache->getLatestCipherKey(baseCipher->domainId); - baseCipher->generatedSalt = fetchedKey->getSalt(); } } // insert EncryptHeader BlobCipher key @@ -788,8 +684,7 @@ TEST_CASE("flow/BlobCipher") { for (auto& domainItr : domainKeyMap) { for (auto& baseKeyItr : domainItr.second) { Reference<BaseCipher> baseCipher = baseKeyItr.second; - Reference<BlobCipherKey> cipherKey = - cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId, baseCipher->generatedSalt); + Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId); ASSERT(cipherKey.isValid()); // validate common cipher properties - domainId, baseCipherId, baseCipherLen, rawBaseCipher ASSERT_EQ(cipherKey->getBaseCipherId(), baseCipher->keyId); @@ -864,8 +759,7 @@ TEST_CASE("flow/BlobCipher") { .detail("BaseCipherId", header.cipherTextDetails.baseCipherId); Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId, - header.cipherTextDetails.baseCipherId, - header.cipherTextDetails.salt); + header.cipherTextDetails.baseCipherId); ASSERT(tCipherKeyKey->isEqual(cipherKey)); DecryptBlobCipherAes256Ctr decryptor( tCipherKeyKey, Reference<BlobCipherKey>(), &header.cipherTextDetails.iv[0]); @@ -952,11 +846,9 @@ TEST_CASE("flow/BlobCipher") { StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString()); Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId, - header.cipherTextDetails.baseCipherId, - header.cipherTextDetails.salt); + header.cipherTextDetails.baseCipherId); Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId, - header.cipherHeaderDetails.baseCipherId, - header.cipherHeaderDetails.salt); + header.cipherHeaderDetails.baseCipherId); ASSERT(tCipherKeyKey->isEqual(cipherKey)); DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, &header.cipherTextDetails.iv[0]); Reference<EncryptBuf> decrypted = decryptor.decrypt(encrypted->begin(), bufLen, header, arena); @@ -1057,11 +949,9 @@ TEST_CASE("flow/BlobCipher") { StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString()); Reference<BlobCipherKey> tCipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId, - header.cipherTextDetails.baseCipherId, - header.cipherTextDetails.salt); + header.cipherTextDetails.baseCipherId); Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId, - header.cipherHeaderDetails.baseCipherId, - header.cipherHeaderDetails.salt); + header.cipherHeaderDetails.baseCipherId); ASSERT(tCipherKey->isEqual(cipherKey)); DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, &header.cipherTextDetails.iv[0]); @@ -1157,7 +1047,7 @@ TEST_CASE("flow/BlobCipher") { // Validate dropping encyrptDomainId cached keys const EncryptCipherDomainId candidate = deterministicRandom()->randomInt(minDomainId, maxDomainId); - cipherKeyCache->resetEncryptDomainId(candidate); + cipherKeyCache->resetEncyrptDomainId(candidate); std::vector<Reference<BlobCipherKey>> cachedKeys = cipherKeyCache->getAllCiphers(candidate); ASSERT(cachedKeys.empty()); diff --git a/flow/BlobCipher.h b/flow/BlobCipher.h index 3c2e88a54e..19e34ac389 100644 --- a/flow/BlobCipher.h +++ b/flow/BlobCipher.h @@ -82,11 +82,11 @@ private: // This header is persisted along with encrypted buffer, it contains information necessary // to assist decrypting the buffers to serve read requests. // -// The total space overhead is 104 bytes. +// The total space overhead is 96 bytes. #pragma pack(push, 1) // exact fit - no padding typedef struct BlobCipherEncryptHeader { - static constexpr int headerSize = 104; + static constexpr int headerSize = 96; union { struct { uint8_t size; // reading first byte is sufficient to determine header @@ -101,7 +101,7 @@ typedef struct BlobCipherEncryptHeader { // Cipher text encryption information struct { - // Encryption domain boundary identifier. + // Encyrption domain boundary identifier. EncryptCipherDomainId encryptDomainId{}; // BaseCipher encryption key identifier EncryptCipherBaseKeyId baseCipherId{}; @@ -116,8 +116,6 @@ typedef struct BlobCipherEncryptHeader { EncryptCipherDomainId encryptDomainId{}; // BaseCipher encryption key identifier. EncryptCipherBaseKeyId baseCipherId{}; - // Random salt - EncryptCipherRandomSalt salt{}; } cipherHeaderDetails; // Encryption header is stored as plaintext on a persistent storage to assist reconstruction of cipher-key(s) for @@ -166,11 +164,6 @@ public: const EncryptCipherBaseKeyId& baseCiphId, const uint8_t* baseCiph, int baseCiphLen); - BlobCipherKey(const EncryptCipherDomainId& domainId, - const EncryptCipherBaseKeyId& baseCiphId, - const uint8_t* baseCiph, - int baseCiphLen, - const EncryptCipherRandomSalt& salt); uint8_t* data() const { return cipher.get(); } uint64_t getCreationTime() const { return creationTime; } @@ -213,7 +206,7 @@ private: // This interface allows FDB processes participating in encryption to store and // index recently used encyption cipher keys. FDB encryption has two dimensions: // 1. Mapping on cipher encryption keys per "encryption domains" -// 2. Per encryption domain, the cipher keys are index using {baseCipherKeyId, salt} tuple. +// 2. Per encryption domain, the cipher keys are index using "baseCipherKeyId". // // The design supports NIST recommendation of limiting lifetime of an encryption // key. For details refer to: @@ -221,10 +214,10 @@ private: // // Below gives a pictoral representation of in-memory datastructure implemented // to index encryption keys: -// { encryptionDomain -> { {baseCipherId, salt} -> cipherKey } } +// { encryptionDomain -> { baseCipherId -> cipherKey } } // // Supported cache lookups schemes: -// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId, salt } triplet. +// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId } tuple. // 2. Lookup latest cipher key for a given encryptionDomainId. // // Client is responsible to handle cache-miss usecase, the corrective operation @@ -233,29 +226,15 @@ private: // required encryption key, however, CPs/SSs cache-miss would result in RPC to // EncryptKeyServer to refresh the desired encryption key. -struct pair_hash { - template <class T1, class T2> - std::size_t operator()(const std::pair<T1, T2>& pair) const { - auto hash1 = std::hash<T1>{}(pair.first); - auto hash2 = std::hash<T2>{}(pair.second); - - // Equal hashes XOR would be ZERO. - return hash1 == hash2 ? hash1 : hash1 ^ hash2; - } -}; -using BlobCipherKeyIdCacheKey = std::pair<EncryptCipherBaseKeyId, EncryptCipherRandomSalt>; -using BlobCipherKeyIdCacheMap = std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>; +using BlobCipherKeyIdCacheMap = std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>; using BlobCipherKeyIdCacheMapCItr = - std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>::const_iterator; + std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>::const_iterator; struct BlobCipherKeyIdCache : ReferenceCounted<BlobCipherKeyIdCache> { public: BlobCipherKeyIdCache(); explicit BlobCipherKeyIdCache(EncryptCipherDomainId dId); - BlobCipherKeyIdCacheKey getCacheKey(const EncryptCipherBaseKeyId& baseCipherId, - const EncryptCipherRandomSalt& salt); - // API returns the last inserted cipherKey. // If none exists, 'encrypt_key_not_found' is thrown. @@ -264,33 +243,14 @@ public: // API returns cipherKey corresponding to input 'baseCipherKeyId'. // If none exists, 'encrypt_key_not_found' is thrown. - Reference<BlobCipherKey> getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId, - const EncryptCipherRandomSalt& salt); + Reference<BlobCipherKey> getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId); // API enables inserting base encryption cipher details to the BlobCipherKeyIdCache. // Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey // is treated as a NOP (success), however, an attempt to update cipherKey would throw // 'encrypt_update_cipher' exception. - // - // API NOTE: Recommended usecase is to update encryption cipher-key is updated the external - // keyManagementSolution to limit an encryption key lifetime - void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId, const uint8_t* baseCipher, int baseCipherLen); - - // API enables inserting base encryption cipher details to the BlobCipherKeyIdCache - // Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey - // is treated as a NOP (success), however, an attempt to update cipherKey would throw - // 'encrypt_update_cipher' exception. - // - // API NOTE: Recommended usecase is to update encryption cipher-key regeneration while performing - // decryption. The encryptionheader would contain relevant details including: 'encryptDomainId', - // 'baseCipherId' & 'salt'. The caller needs to fetch 'baseCipherKey' detail and re-populate KeyCache. - // Also, the invocation will NOT update the latest cipher-key details. - - void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId, - const uint8_t* baseCipher, - int baseCipherLen, - const EncryptCipherRandomSalt& salt); + void insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId, const uint8_t* baseCipher, int baseCipherLen); // API cleanup the cache by dropping all cached cipherKeys void cleanup(); @@ -302,7 +262,6 @@ private: EncryptCipherDomainId domainId; BlobCipherKeyIdCacheMap keyIdCache; EncryptCipherBaseKeyId latestBaseCipherKeyId; - EncryptCipherRandomSalt latestRandomSalt; }; using BlobCipherDomainCacheMap = std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKeyIdCache>>; @@ -318,32 +277,12 @@ public: // The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable, // attempting to re-insert same 'identical' cipherKey is treated as a NOP (success), // however, an attempt to update cipherKey would throw 'encrypt_update_cipher' exception. - // - // API NOTE: Recommended usecase is to update encryption cipher-key is updated the external - // keyManagementSolution to limit an encryption key lifetime void insertCipherKey(const EncryptCipherDomainId& domainId, const EncryptCipherBaseKeyId& baseCipherId, const uint8_t* baseCipher, int baseCipherLen); - - // Enable clients to insert base encryption cipher details to the BlobCipherKeyCache. - // The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable, - // attempting to re-insert same 'identical' cipherKey is treated as a NOP (success), - // however, an attempt to update cipherKey would throw 'encrypt_update_cipher' exception. - // - // API NOTE: Recommended usecase is to update encryption cipher-key regeneration while performing - // decryption. The encryptionheader would contain relevant details including: 'encryptDomainId', - // 'baseCipherId' & 'salt'. The caller needs to fetch 'baseCipherKey' detail and re-populate KeyCache. - // Also, the invocation will NOT update the latest cipher-key details. - - void insertCipherKey(const EncryptCipherDomainId& domainId, - const EncryptCipherBaseKeyId& baseCipherId, - const uint8_t* baseCipher, - int baseCipherLen, - const EncryptCipherRandomSalt& salt); - - // API returns the last insert cipherKey for a given encryption domain Id. + // API returns the last insert cipherKey for a given encyryption domain Id. // If none exists, it would throw 'encrypt_key_not_found' exception. Reference<BlobCipherKey> getLatestCipherKey(const EncryptCipherDomainId& domainId); @@ -352,16 +291,14 @@ public: // If none exists, it would throw 'encrypt_key_not_found' exception. Reference<BlobCipherKey> getCipherKey(const EncryptCipherDomainId& domainId, - const EncryptCipherBaseKeyId& baseCipherId, - const EncryptCipherRandomSalt& salt); - + const EncryptCipherBaseKeyId& baseCipherId); // API returns point in time list of all 'cached' cipherKeys for a given encryption domainId. std::vector<Reference<BlobCipherKey>> getAllCiphers(const EncryptCipherDomainId& domainId); // API enables dropping all 'cached' cipherKeys for a given encryption domain Id. // Useful to cleanup cache if an encryption domain gets removed/destroyed etc. - void resetEncryptDomainId(const EncryptCipherDomainId domainId); + void resetEncyrptDomainId(const EncryptCipherDomainId domainId); static Reference<BlobCipherKeyCache> getInstance() { if (g_network->isSimulated()) { @@ -427,7 +364,7 @@ public: const BlobCipherEncryptHeader& header, Arena&); - // Enable caller to validate encryption header auth-token (if available) without needing to read the full encrypted + // Enable caller to validate encryption header auth-token (if available) without needing to read the full encyrpted // payload. The call is NOP unless header.flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI. void verifyHeaderAuthToken(const BlobCipherEncryptHeader& header, Arena& arena); diff --git a/flow/Hostname.actor.cpp b/flow/Hostname.actor.cpp index 6562f2d484..ab89280a44 100644 --- a/flow/Hostname.actor.cpp +++ b/flow/Hostname.actor.cpp @@ -84,13 +84,15 @@ ACTOR Future<Optional<NetworkAddress>> resolveImpl(Hostname* self) { } ACTOR Future<NetworkAddress> resolveWithRetryImpl(Hostname* self) { + state double resolveInterval = FLOW_KNOBS->HOSTNAME_RESOLVE_INIT_INTERVAL; loop { try { Optional<NetworkAddress> address = wait(resolveImpl(self)); if (address.present()) { return address.get(); } - wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY)); + wait(delay(resolveInterval)); + resolveInterval = std::min(2 * resolveInterval, FLOW_KNOBS->HOSTNAME_RESOLVE_MAX_INTERVAL); } catch (Error& e) { ASSERT(e.code() == error_code_actor_cancelled); throw; diff --git a/flow/Hostname.h b/flow/Hostname.h index 0faa4c53df..2492a17370 100644 --- a/flow/Hostname.h +++ b/flow/Hostname.h @@ -74,6 +74,7 @@ struct Hostname { Optional<NetworkAddress> resolvedAddress; enum HostnameStatus { UNRESOLVED, RESOLVING, RESOLVED }; + // The resolve functions below use DNS cache. Future<Optional<NetworkAddress>> resolve(); Future<NetworkAddress> resolveWithRetry(); Optional<NetworkAddress> resolveBlocking(); // This one should only be used when resolving asynchronously is @@ -81,6 +82,11 @@ struct Hostname { void resetToUnresolved(); HostnameStatus status = UNRESOLVED; AsyncTrigger resolveFinish; + + template <class Ar> + void serialize(Ar& ar) { + serializer(ar, host, service, isTLS, resolvedAddress, status); + } }; #endif diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index 555112c16f..652ff709d2 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -40,7 +40,10 @@ FlowKnobs const* FLOW_KNOBS = &bootstrapGlobalFlowKnobs; void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) { init( AUTOMATIC_TRACE_DUMP, 1 ); init( PREVENT_FAST_SPIN_DELAY, .01 ); - init( HOSTNAME_RESOLVE_DELAY, .05 ); + init( HOSTNAME_RESOLVE_INIT_INTERVAL, .05 ); + init( HOSTNAME_RESOLVE_MAX_INTERVAL, 1.0 ); + init( HOSTNAME_RECONNECT_INIT_INTERVAL, .05 ); + init( HOSTNAME_RECONNECT_MAX_INTERVAL, 1.0 ); init( CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED, 1.0 ); init( DELAY_JITTER_OFFSET, 0.9 ); diff --git a/flow/Knobs.h b/flow/Knobs.h index d2faff94af..4234010c37 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -113,7 +113,10 @@ class FlowKnobs : public KnobsImpl<FlowKnobs> { public: int AUTOMATIC_TRACE_DUMP; double PREVENT_FAST_SPIN_DELAY; - double HOSTNAME_RESOLVE_DELAY; + double HOSTNAME_RESOLVE_INIT_INTERVAL; + double HOSTNAME_RESOLVE_MAX_INTERVAL; + double HOSTNAME_RECONNECT_INIT_INTERVAL; + double HOSTNAME_RECONNECT_MAX_INTERVAL; double CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED; double DELAY_JITTER_OFFSET;