Merge branch 'apple:main' into dbcorever

This commit is contained in:
Bharadwaj V.R 2022-04-21 09:37:29 -07:00 committed by GitHub
commit c20fb6ef6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 214 additions and 344 deletions

View File

@ -257,7 +257,6 @@ endif()
add_test(NAME fdb_c_upgrade_single_threaded_630api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.2.0"
--process-number 1
@ -266,7 +265,6 @@ endif()
add_test(NAME fdb_c_upgrade_single_threaded_700api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "7.0.0" "7.2.0"
--process-number 1
@ -275,7 +273,6 @@ endif()
add_test(NAME fdb_c_upgrade_multi_threaded_630api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.2.0"
--process-number 3
@ -284,7 +281,6 @@ endif()
add_test(NAME fdb_c_upgrade_multi_threaded_700api
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--disable-log-dump
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.0.0" "7.2.0"
--process-number 3
@ -334,6 +330,7 @@ fdb_install(
FILES foundationdb/fdb_c.h
${CMAKE_CURRENT_BINARY_DIR}/foundationdb/fdb_c_options.g.h
${CMAKE_SOURCE_DIR}/fdbclient/vexillographer/fdb.options
${CMAKE_SOURCE_DIR}/bindings/c/foundationdb/fdb_c_types.h
DESTINATION include
DESTINATION_SUFFIX /foundationdb
COMPONENT clients)

View File

@ -2635,7 +2635,11 @@ void print_report(mako_args_t* args,
fseek(f, 0, 0);
index = 0;
while (index < numPoints) {
fread(&dataPoints[op][k++], sizeof(uint64_t), 1, f);
size_t nread = fread(&dataPoints[op][k++], sizeof(uint64_t), 1, f);
if (nread != 1) {
fprintf(stderr, "ERROR: read failed\n");
exit(1);
}
++index;
}
fclose(f);
@ -2759,7 +2763,11 @@ void print_report(mako_args_t* args,
char command_remove[NAME_MAX] = { '\0' };
sprintf(command_remove, "rm -rf %s%d", TEMP_DATA_STORE, *pid_main);
system(command_remove);
int ret = system(command_remove);
if (ret != 0) {
fprintf(stderr, "ERROR: system() call failed\n");
exit(ret);
}
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_TRANSACTION) {

View File

@ -3,6 +3,40 @@
.. code-block:: javascript
"cluster":{
"storage_wiggler": {
"wiggle_server_ids":["0ccb4e0feddb55"],
"wiggle_server_addresses": ["127.0.0.1"],
"primary": { // primary DC storage wiggler stats
// One StorageServer wiggle round is considered 'complete', when all StorageServers with creationTime < T are wiggled
"last_round_start_datetime": "2022-04-02 00:05:05.123 +0000",
"last_round_start_timestamp": 1648857905.123, // when did the latest round start
"last_round_finish_datetime": "1970-01-01 00:00:00.000 +0000",
"last_round_finish_timestamp": 0, // when did the latest finished round finish
"smoothed_round_seconds": 1, // moving average duration of a wiggle round
"finished_round": 1,
// 1 wiggle step as 1 storage server is wiggled in the current round
"last_wiggle_start_datetime": "2022-04-02 00:05:05.123 +0000",
"last_wiggle_start_timestamp": 1648857905.123, // when did the latest wiggle step start
"last_wiggle_finish_datetime": "1970-01-01 00:00:00.000 +0000",
"last_wiggle_finish_timestamp": 0,
"smoothed_wiggle_seconds": 1,
"finished_wiggle": 1
},
"remote": { // remote DC storage wiggler stats
"last_round_start_datetime": "2022-04-02 00:05:05.123 +0000",
"last_round_start_timestamp": 1648857905.123,
"last_round_finish_datetime": "1970-01-01 00:00:00.000 +0000",
"last_round_finish_timestamp": 0,
"smoothed_round_seconds": 1,
"finished_round": 1,
"last_wiggle_start_datetime": "2022-04-02 00:05:05.123 +0000",
"last_wiggle_start_timestamp": 1648857905.123,
"last_wiggle_finish_datetime": "1970-01-01 00:00:00.000 +0000",
"last_wiggle_finish_timestamp": 0,
"smoothed_wiggle_seconds": 1,
"finished_wiggle": 1
}
},
"layers":{
"_valid":true,
"_error":"some error description"

View File

@ -14,7 +14,7 @@ Summary
============
Perpetual storage wiggle is a feature that forces the data distributor to constantly build new storage teams when the cluster is healthy. On a high-level note, the process is like this:
Order storage servers by process id. For each storage server n:
Order storage servers by their created time, from oldest to newest. For each storage server n:
a. Exclude storage server n.
@ -22,7 +22,7 @@ b. Wait until all data has been moved off the storage server.
c. Include storage n
Goto a to wiggle the next storage process with different process id.
Goto step a to wiggle the next storage server.
With a perpetual wiggle, storage migrations will be much less impactful. The wiggler will detect the healthy status based on healthy teams, available disk space and the number of unhealthy relocations. It will pause the wiggle until the cluster is healthy again.
@ -47,7 +47,8 @@ Disable perpetual storage wiggle locality matching filter, which wiggles all the
Monitor
=======
The ``status`` command in the FDB :ref:`command line interface <command-line-interface>` will show the current perpetual_storage_wiggle value.
* The ``status`` command will report the IP address of the Storage Server under wiggling.
* The ``status json`` command in the FDB :ref:`command line interface <command-line-interface>` will show the current ``perpetual_storage_wiggle`` value. Plus, the ``cluster.storage_wiggler`` field reports storage wiggle details.
Trace Events
----------------------

View File

@ -23,6 +23,7 @@ Fixes
Status
------
* Added ``cluster.storage_wiggler`` field report storage wiggle stats `(PR #6219) <https://github.com/apple/foundationdb/pull/6219>`_
Bindings
--------
@ -33,6 +34,7 @@ Other Changes
-------------
* OpenTracing support is now deprecated in favor of OpenTelemetry tracing, which will be enabled in a future release. `(PR #6478) <https://github.com/apple/foundationdb/pull/6478/files>`_
* Changed ``memory`` option to limit resident memory instead of virtual memory. Added a new ``memory_vsize`` option if limiting virtual memory is desired. `(PR #6719) <https://github.com/apple/foundationdb/pull/6719>`_
* Change ``perpetual storage wiggle`` to wiggle the storage servers based on their created time. `(PR #6219) <https://github.com/apple/foundationdb/pull/6219>`_
Earlier release notes
---------------------

View File

@ -34,12 +34,16 @@ void ConfigTransactionInterface::setupWellKnownEndpoints() {
}
ConfigTransactionInterface::ConfigTransactionInterface(NetworkAddress const& remote)
: getGeneration(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETGENERATION)),
: _id(deterministicRandom()->randomUniqueID()),
getGeneration(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETGENERATION)),
get(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GET)),
getClasses(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETCLASSES)),
getKnobs(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_GETKNOBS)),
commit(Endpoint::wellKnown({ remote }, WLTOKEN_CONFIGTXN_COMMIT)) {}
ConfigTransactionInterface::ConfigTransactionInterface(Hostname const& remote)
: _id(deterministicRandom()->randomUniqueID()), hostname(remote) {}
bool ConfigTransactionInterface::operator==(ConfigTransactionInterface const& rhs) const {
return _id == rhs._id;
}

View File

@ -200,9 +200,12 @@ public:
class RequestStream<ConfigTransactionGetKnobsRequest> getKnobs;
class RequestStream<ConfigTransactionCommitRequest> commit;
Optional<Hostname> hostname;
ConfigTransactionInterface();
void setupWellKnownEndpoints();
ConfigTransactionInterface(NetworkAddress const& remote);
ConfigTransactionInterface(Hostname const& remote);
bool operator==(ConfigTransactionInterface const& rhs) const;
bool operator!=(ConfigTransactionInterface const& rhs) const;
@ -210,6 +213,6 @@ public:
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, getGeneration, get, getClasses, getKnobs, commit);
serializer(ar, getGeneration, get, getClasses, getKnobs, commit, hostname);
}
};

View File

@ -104,6 +104,10 @@ public:
ConnectionStringStatus status = RESOLVED;
AsyncTrigger resolveFinish;
// This function tries to resolve all hostnames once, and return them with coords.
// Best effort, does not guarantee that the resolves succeed.
Future<std::vector<NetworkAddress>> tryResolveHostnames();
std::vector<NetworkAddress> coords;
std::vector<Hostname> hostnames;
std::unordered_map<NetworkAddress, Hostname> networkAddressToHostname;

View File

@ -364,6 +364,29 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") {
return Void();
}
ACTOR Future<std::vector<NetworkAddress>> tryResolveHostnamesImpl(ClusterConnectionString* self) {
state std::set<NetworkAddress> allCoordinatorsSet;
std::vector<Future<Void>> fs;
for (auto& hostname : self->hostnames) {
fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void {
if (addr.present()) {
allCoordinatorsSet.insert(addr.get());
}
return Void();
}));
}
wait(waitForAll(fs));
for (const auto& coord : self->coords) {
allCoordinatorsSet.insert(coord);
}
std::vector<NetworkAddress> allCoordinators(allCoordinatorsSet.begin(), allCoordinatorsSet.end());
return allCoordinators;
}
Future<std::vector<NetworkAddress>> ClusterConnectionString::tryResolveHostnames() {
return tryResolveHostnamesImpl(this);
}
TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") {
std::string connectionString = "TestCluster:0@host.name:1234,host-name:5678";
std::string hn = "host-name", port = "5678";
@ -373,19 +396,9 @@ TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") {
INetworkConnections::net()->addMockTCPEndpoint(hn, port, { address });
state ClusterConnectionString cs(connectionString);
state std::unordered_set<NetworkAddress> coordinatorAddresses;
std::vector<Future<Void>> fs;
for (auto& hostname : cs.hostnames) {
fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void {
if (addr.present()) {
coordinatorAddresses.insert(addr.get());
}
return Void();
}));
}
wait(waitForAll(fs));
ASSERT(coordinatorAddresses.size() == 1 && coordinatorAddresses.count(address) == 1);
state std::vector<NetworkAddress> allCoordinators = wait(cs.tryResolveHostnames());
ASSERT(allCoordinators.size() == 1 &&
std::find(allCoordinators.begin(), allCoordinators.end(), address) != allCoordinators.end());
return Void();
}
@ -585,7 +598,7 @@ ACTOR Future<Void> monitorNominee(Key key,
.detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString());
if (rep.getError().code() == error_code_request_maybe_delivered) {
// Delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY));
wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL));
throw coordinators_changed();
} else {
throw rep.getError();

View File

@ -406,21 +406,21 @@ TransportData::TransportData(uint64_t transportId, int maxWellKnownEndpoints, IP
struct ConnectPacket {
// The value does not include the size of `connectPacketLength` itself,
// but only the other fields of this structure.
uint32_t connectPacketLength;
uint32_t connectPacketLength = 0;
ProtocolVersion protocolVersion; // Expect currentProtocolVersion
uint16_t canonicalRemotePort; // Port number to reconnect to the originating process
uint64_t connectionId; // Multi-version clients will use the same Id for both connections, other connections will
// set this to zero. Added at protocol Version 0x0FDB00A444020001.
uint16_t canonicalRemotePort = 0; // Port number to reconnect to the originating process
uint64_t connectionId = 0; // Multi-version clients will use the same Id for both connections, other connections
// will set this to zero. Added at protocol Version 0x0FDB00A444020001.
// IP Address to reconnect to the originating process. Only one of these must be populated.
uint32_t canonicalRemoteIp4;
uint32_t canonicalRemoteIp4 = 0;
enum ConnectPacketFlags { FLAG_IPV6 = 1 };
uint16_t flags;
uint8_t canonicalRemoteIp6[16];
uint16_t flags = 0;
uint8_t canonicalRemoteIp6[16] = { 0 };
ConnectPacket() { memset(this, 0, sizeof(*this)); }
ConnectPacket() = default;
IPAddress canonicalRemoteIp() const {
if (isIPv6()) {

View File

@ -73,10 +73,7 @@ Future<REPLY_TYPE(Req)> retryBrokenPromise(RequestStream<Req, P> to, Req request
}
ACTOR template <class Req>
Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Hostname hostname,
WellKnownEndpoints token) {
Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token) {
// A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname.
// If resolving fails, return lookup_failed().
// Otherwise, return tryGetReply(request).
@ -84,8 +81,8 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
if (!address.present()) {
return ErrorOr<REPLY_TYPE(Req)>(lookup_failed());
}
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request));
RequestStream<Req> to(Endpoint::wellKnown({ address.get() }, token));
state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
@ -98,8 +95,7 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
}
ACTOR template <class Req>
Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(Req request,
Hostname hostname,
WellKnownEndpoints token,
TaskPriority taskID) {
@ -110,8 +106,8 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
if (!address.present()) {
return ErrorOr<REPLY_TYPE(Req)>(lookup_failed());
}
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID));
RequestStream<Req> to(Endpoint::wellKnown({ address.get() }, token));
state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request, taskID));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
@ -124,21 +120,21 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
}
ACTOR template <class Req>
Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Hostname hostname,
WellKnownEndpoints token) {
Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(Req request, Hostname hostname, WellKnownEndpoints token) {
// Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname.
// Suitable for use with hostname, where RequestStream is NOT initialized yet.
// Not normally useful for endpoints initialized with NetworkAddress.
state double reconnetInterval = FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL;
loop {
NetworkAddress address = wait(hostname.resolveWithRetry());
*to = RequestStream<Req>(Endpoint::wellKnown({ address }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request));
RequestStream<Req> to(Endpoint::wellKnown({ address }, token));
state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
wait(delay(reconnetInterval));
reconnetInterval = std::min(2 * reconnetInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL);
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
} else {
@ -151,22 +147,24 @@ Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to,
}
ACTOR template <class Req>
Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to,
Req request,
Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(Req request,
Hostname hostname,
WellKnownEndpoints token,
TaskPriority taskID) {
// Like tryGetReplyFromHostname, except that request_maybe_delivered results in re-resolving the hostname.
// Suitable for use with hostname, where RequestStream is NOT initialized yet.
// Not normally useful for endpoints initialized with NetworkAddress.
state double reconnetInterval = FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL;
loop {
NetworkAddress address = wait(hostname.resolveWithRetry());
*to = RequestStream<Req>(Endpoint::wellKnown({ address }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID));
RequestStream<Req> to(Endpoint::wellKnown({ address }, token));
state ErrorOr<REPLY_TYPE(Req)> reply = wait(to.tryGetReply(request, taskID));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
wait(delay(reconnetInterval));
reconnetInterval = std::min(2 * reconnetInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL);
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
} else {

View File

@ -1097,18 +1097,24 @@ void haltRegisteringOrCurrentSingleton(ClusterControllerData* self,
}
}
void registerWorker(RegisterWorkerRequest req,
ClusterControllerData* self,
std::unordered_set<NetworkAddress> coordinatorAddresses,
ConfigBroadcaster* configBroadcaster) {
ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
ClusterControllerData* self,
ClusterConnectionString cs,
ConfigBroadcaster* configBroadcaster) {
std::vector<NetworkAddress> coordinatorAddresses = wait(cs.tryResolveHostnames());
const WorkerInterface& w = req.wi;
ProcessClass newProcessClass = req.processClass;
auto info = self->id_worker.find(w.locality.processId());
ClusterControllerPriorityInfo newPriorityInfo = req.priorityInfo;
newPriorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController);
bool isCoordinator =
(coordinatorAddresses.count(req.wi.address()) > 0) ||
(req.wi.secondaryAddress().present() && coordinatorAddresses.count(req.wi.secondaryAddress().get()) > 0);
(std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.address()) !=
coordinatorAddresses.end()) ||
(req.wi.secondaryAddress().present() &&
std::find(coordinatorAddresses.begin(), coordinatorAddresses.end(), req.wi.secondaryAddress().get()) !=
coordinatorAddresses.end());
for (auto it : req.incompatiblePeers) {
self->db.incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
@ -1271,6 +1277,8 @@ void registerWorker(RegisterWorkerRequest req,
if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) {
req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo));
}
return Void();
}
#define TIME_KEEPER_VERSION LiteralStringRef("1")
@ -2543,30 +2551,12 @@ ACTOR Future<Void> clusterControllerCore(Reference<IClusterConnectionRecord> con
when(RecruitBlobWorkerRequest req = waitNext(interf.recruitBlobWorker.getFuture())) {
clusterRecruitBlobWorker(&self, req);
}
when(state RegisterWorkerRequest req = waitNext(interf.registerWorker.getFuture())) {
when(RegisterWorkerRequest req = waitNext(interf.registerWorker.getFuture())) {
++self.registerWorkerRequests;
state ClusterConnectionString ccs = coordinators.ccr->getConnectionString();
state std::unordered_set<NetworkAddress> coordinatorAddresses;
std::vector<Future<Void>> fs;
for (auto& hostname : ccs.hostnames) {
fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void {
if (addr.present()) {
coordinatorAddresses.insert(addr.get());
}
return Void();
}));
}
wait(waitForAll(fs));
for (const auto& coord : ccs.coordinators()) {
coordinatorAddresses.insert(coord);
}
registerWorker(req,
&self,
coordinatorAddresses,
(configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster);
self.addActor.send(registerWorker(req,
&self,
coordinators.ccr->getConnectionString(),
(configDBType == ConfigDBType::DISABLED) ? nullptr : &configBroadcaster));
}
when(GetWorkersRequest req = waitNext(interf.getWorkers.getFuture())) {
++self.getWorkersRequests;

View File

@ -75,7 +75,7 @@ struct GenerationRegVal {
}
};
GenerationRegInterface::GenerationRegInterface(NetworkAddress remote)
GenerationRegInterface::GenerationRegInterface(NetworkAddress const& remote)
: read(Endpoint::wellKnown({ remote }, WLTOKEN_GENERATIONREG_READ)),
write(Endpoint::wellKnown({ remote }, WLTOKEN_GENERATIONREG_WRITE)) {}
@ -84,7 +84,7 @@ GenerationRegInterface::GenerationRegInterface(INetwork* local) {
write.makeWellKnownEndpoint(WLTOKEN_GENERATIONREG_WRITE, TaskPriority::Coordination);
}
LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress remote)
LeaderElectionRegInterface::LeaderElectionRegInterface(NetworkAddress const& remote)
: ClientLeaderRegInterface(remote), candidacy(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_CANDIDACY)),
electionResult(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_ELECTIONRESULT)),
leaderHeartbeat(Endpoint::wellKnown({ remote }, WLTOKEN_LEADERELECTIONREG_LEADERHEARTBEAT)),

View File

@ -53,9 +53,9 @@ struct GenerationRegInterface {
// the v2 of the previous generation is the v1 of the next.
GenerationRegInterface() {}
GenerationRegInterface(NetworkAddress remote);
GenerationRegInterface(NetworkAddress const& remote);
GenerationRegInterface(INetwork* local);
GenerationRegInterface(Hostname hostname) : hostname(hostname){};
GenerationRegInterface(Hostname const& hostname) : hostname(hostname){};
};
struct UniqueGeneration {
@ -128,9 +128,9 @@ struct LeaderElectionRegInterface : ClientLeaderRegInterface {
RequestStream<struct ForwardRequest> forward;
LeaderElectionRegInterface() {}
LeaderElectionRegInterface(NetworkAddress remote);
LeaderElectionRegInterface(NetworkAddress const& remote);
LeaderElectionRegInterface(INetwork* local);
LeaderElectionRegInterface(Hostname hostname) : ClientLeaderRegInterface(hostname) {}
LeaderElectionRegInterface(Hostname const& hostname) : ClientLeaderRegInterface(hostname) {}
};
struct CandidacyRequest {
@ -220,7 +220,7 @@ class ConfigNode;
class ServerCoordinators : public ClientCoordinators {
public:
explicit ServerCoordinators(Reference<IClusterConnectionRecord>);
explicit ServerCoordinators(Reference<IClusterConnectionRecord> ccr);
std::vector<LeaderElectionRegInterface> leaderElectionServers;
std::vector<GenerationRegInterface> stateServers;

View File

@ -242,7 +242,9 @@ ACTOR Future<int> spawnProcess(std::string binPath,
static auto fork_child(const std::string& path, std::vector<char*>& paramList) {
int pipefd[2];
pipe(pipefd);
if (pipe(pipefd) != 0) {
return std::make_pair(-1, Optional<int>{});
}
auto readFD = pipefd[0];
auto writeFD = pipefd[1];
pid_t pid = fork();

View File

@ -51,7 +51,7 @@ ACTOR Future<Void> submitCandidacy(Key key,
.detail("OldAddr", coord.candidacy.getEndpoint().getPrimaryAddress().toString());
if (rep.getError().code() == error_code_request_maybe_delivered) {
// Delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY));
wait(delay(FLOW_KNOBS->HOSTNAME_RECONNECT_INIT_INTERVAL));
throw coordinators_changed();
} else {
throw rep.getError();

View File

@ -663,7 +663,7 @@ static void printUsage(const char* name, bool devhelp) {
" collector_endpoint -- IP:PORT of the fluentd server\n"
" collector_protocol -- UDP or TCP (default is UDP)");
#ifndef TLS_DISABLED
printf(TLS_HELP);
printf("%s", TLS_HELP);
#endif
printOptionUsage("-v, --version", "Print version information and exit.");
printOptionUsage("-h, -?, --help", "Display this help and exit.");

View File

@ -121,7 +121,6 @@ struct EncryptionOpsWorkload : TestWorkload {
EncryptCipherDomainId maxDomainId;
EncryptCipherBaseKeyId minBaseCipherId;
EncryptCipherBaseKeyId headerBaseCipherId;
EncryptCipherRandomSalt headerRandomSalt;
EncryptionOpsWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
mode = getOption(options, LiteralStringRef("fixedSize"), 1);
@ -135,7 +134,6 @@ struct EncryptionOpsWorkload : TestWorkload {
maxDomainId = deterministicRandom()->randomInt(minDomainId, minDomainId + 10) + 5;
minBaseCipherId = 100;
headerBaseCipherId = wcx.clientId * 100 + 1;
headerRandomSalt = wcx.clientId * 100 + 1;
metrics = std::make_unique<WorkloadMetrics>();
@ -185,8 +183,7 @@ struct EncryptionOpsWorkload : TestWorkload {
// insert the Encrypt Header cipherKey
generateRandomBaseCipher(AES_256_KEY_LENGTH, &buff[0], &cipherLen);
cipherKeyCache->insertCipherKey(
ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen, headerRandomSalt);
cipherKeyCache->insertCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, buff, cipherLen);
TraceEvent("SetupCipherEssentials_Done").detail("MinDomainId", minDomainId).detail("MaxDomainId", maxDomainId);
}
@ -212,29 +209,6 @@ struct EncryptionOpsWorkload : TestWorkload {
TraceEvent("UpdateBaseCipher").detail("DomainId", encryptDomainId).detail("BaseCipherId", *nextBaseCipherId);
}
Reference<BlobCipherKey> getEncryptionKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt) {
const bool simCacheMiss = deterministicRandom()->randomInt(1, 100) < 15;
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
if (simCacheMiss) {
TraceEvent("SimKeyCacheMiss").detail("EncyrptDomainId", domainId).detail("BaseCipherId", baseCipherId);
// simulate KeyCache miss that may happen during decryption; insert a CipherKey with known 'salt'
cipherKeyCache->insertCipherKey(domainId,
baseCipherId,
cipherKey->rawBaseCipher(),
cipherKey->getBaseCipherLen(),
cipherKey->getSalt());
// Ensure the update was a NOP
Reference<BlobCipherKey> cKey = cipherKeyCache->getCipherKey(domainId, baseCipherId, salt);
ASSERT(cKey->isEqual(cipherKey));
}
return cipherKey;
}
Reference<EncryptBuf> doEncryption(Reference<BlobCipherKey> textCipherKey,
Reference<BlobCipherKey> headerCipherKey,
uint8_t* payload,
@ -266,12 +240,11 @@ struct EncryptionOpsWorkload : TestWorkload {
ASSERT_EQ(header.flags.headerVersion, EncryptBlobCipherAes265Ctr::ENCRYPT_HEADER_VERSION);
ASSERT_EQ(header.flags.encryptMode, ENCRYPT_CIPHER_MODE_AES_256_CTR);
Reference<BlobCipherKey> cipherKey = getEncryptionKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
Reference<BlobCipherKey> headerCipherKey = getEncryptionKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
Reference<BlobCipherKeyCache> cipherKeyCache = BlobCipherKeyCache::getInstance();
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> headerCipherKey = cipherKeyCache->getCipherKey(
header.cipherHeaderDetails.encryptDomainId, header.cipherHeaderDetails.baseCipherId);
ASSERT(cipherKey.isValid());
ASSERT(cipherKey->isEqual(orgCipherKey));
@ -324,7 +297,7 @@ struct EncryptionOpsWorkload : TestWorkload {
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getLatestCipherKey(encryptDomainId);
// Each client working with their own version of encryptHeaderCipherKey, avoid using getLatest()
Reference<BlobCipherKey> headerCipherKey =
cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId, headerRandomSalt);
cipherKeyCache->getCipherKey(ENCRYPT_HEADER_DOMAIN_ID, headerBaseCipherId);
auto end = std::chrono::high_resolution_clock::now();
metrics->updateKeyDerivationTime(std::chrono::duration<double, std::nano>(end - start).count());

View File

@ -19,7 +19,6 @@
*/
#include "flow/BlobCipher.h"
#include "flow/EncryptUtils.h"
#include "flow/Knobs.h"
#include "flow/Error.h"
@ -33,7 +32,6 @@
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#if ENCRYPTION_ENABLED
@ -56,14 +54,12 @@ BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
salt = nondeterministicRandom()->randomUInt64();
}
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
}
BlobCipherKey::BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt) {
initKey(domainId, baseCiph, baseCiphLen, baseCiphId, salt);
/*TraceEvent("BlobCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
.detail("RandomSalt", randomSalt)
.detail("CreationTime", creationTime);*/
}
void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
@ -86,13 +82,6 @@ void BlobCipherKey::initKey(const EncryptCipherDomainId& domainId,
applyHmacSha256Derivation();
// update the key creation time
creationTime = now();
TraceEvent("BlobCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherId)
.detail("BaseCipherLen", baseCipherLen)
.detail("RandomSalt", randomSalt)
.detail("CreationTime", creationTime);
}
void BlobCipherKey::applyHmacSha256Derivation() {
@ -123,77 +112,25 @@ BlobCipherKeyIdCache::BlobCipherKeyIdCache(EncryptCipherDomainId dId)
TraceEvent("Init_BlobCipherKeyIdCache").detail("DomainId", domainId);
}
BlobCipherKeyIdCacheKey BlobCipherKeyIdCache::getCacheKey(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt) {
return std::make_pair(baseCipherKeyId, salt);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getLatestCipherKey() {
return getCipherByBaseCipherId(latestBaseCipherKeyId, latestRandomSalt);
return getCipherByBaseCipherId(latestBaseCipherKeyId);
}
Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt) {
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(getCacheKey(baseCipherKeyId, salt));
Reference<BlobCipherKey> BlobCipherKeyIdCache::getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId) {
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherKeyId);
if (itr == keyIdCache.end()) {
TraceEvent("CipherByBaseCipherId_KeyMissing")
.detail("DomainId", domainId)
.detail("BaseCipherId", baseCipherKeyId)
.detail("Salt", salt);
throw encrypt_key_not_found();
}
return itr->second;
}
void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
void BlobCipherKeyIdCache::insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
// BaseCipherKeys are immutable, given the routine invocation updates 'latestCipher',
// ensure no key-tampering is done
try {
Reference<BlobCipherKey> cipherKey = getLatestCipherKey();
if (cipherKey->getBaseCipherId() == baseCipherId) {
if (memcmp(cipherKey->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
// Key is already present; nothing more to do.
return;
} else {
TraceEvent("InsertBaseCipherKey_UpdateCipher")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId);
throw encrypt_update_cipher();
}
}
} catch (Error& e) {
if (e.code() != error_code_encrypt_key_not_found) {
throw e;
}
}
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(cipherKey->getBaseCipherId(), cipherKey->getSalt());
keyIdCache.emplace(cacheKey, cipherKey);
// Update the latest BaseCipherKeyId for the given encryption domain
latestBaseCipherKeyId = baseCipherId;
latestRandomSalt = cipherKey->getSalt();
}
void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
ASSERT_GT(baseCipherId, ENCRYPT_INVALID_CIPHER_KEY_ID);
BlobCipherKeyIdCacheKey cacheKey = getCacheKey(baseCipherId, salt);
// BaseCipherKeys are immutable, ensure that cached value doesn't get updated.
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(cacheKey);
BlobCipherKeyIdCacheMapCItr itr = keyIdCache.find(baseCipherId);
if (itr != keyIdCache.end()) {
if (memcmp(itr->second->rawBaseCipher(), baseCipher, baseCipherLen) == 0) {
TraceEvent("InsertBaseCipherKey_AlreadyPresent")
@ -209,9 +146,9 @@ void BlobCipherKeyIdCache::insertBaseCipherKey(const EncryptCipherBaseKeyId& bas
}
}
Reference<BlobCipherKey> cipherKey =
makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen, salt);
keyIdCache.emplace(cacheKey, cipherKey);
keyIdCache.emplace(baseCipherId, makeReference<BlobCipherKey>(domainId, baseCipherId, baseCipher, baseCipherLen));
// Update the latest BaseCipherKeyId for the given encryption domain
latestBaseCipherKeyId = baseCipherId;
}
void BlobCipherKeyIdCache::cleanup() {
@ -260,41 +197,6 @@ void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
}
}
void BlobCipherKeyCache::insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt) {
if (domainId == ENCRYPT_INVALID_DOMAIN_ID || baseCipherId == ENCRYPT_INVALID_CIPHER_KEY_ID) {
throw encrypt_invalid_id();
}
try {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
// Add mapping to track new encryption domain
Reference<BlobCipherKeyIdCache> keyIdCache = makeReference<BlobCipherKeyIdCache>(domainId);
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
domainCacheMap.emplace(domainId, keyIdCache);
} else {
// Track new baseCipher keys
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
keyIdCache->insertBaseCipherKey(baseCipherId, baseCipher, baseCipherLen, salt);
}
TraceEvent("InsertCipherKey")
.detail("DomainId", domainId)
.detail("BaseCipherKeyId", baseCipherId)
.detail("Salt", salt);
} catch (Error& e) {
TraceEvent("InsertCipherKey_Failed")
.detail("BaseCipherKeyId", baseCipherId)
.detail("DomainId", domainId)
.detail("Salt", salt);
throw;
}
}
Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCipherDomainId& domainId) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
@ -315,19 +217,17 @@ Reference<BlobCipherKey> BlobCipherKeyCache::getLatestCipherKey(const EncryptCip
}
Reference<BlobCipherKey> BlobCipherKeyCache::getCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt) {
const EncryptCipherBaseKeyId& baseCipherId) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
TraceEvent("GetCipherKey_MissingDomainId").detail("DomainId", domainId);
throw encrypt_key_not_found();
}
Reference<BlobCipherKeyIdCache> keyIdCache = domainItr->second;
return keyIdCache->getCipherByBaseCipherId(baseCipherId, salt);
return keyIdCache->getCipherByBaseCipherId(baseCipherId);
}
void BlobCipherKeyCache::resetEncryptDomainId(const EncryptCipherDomainId domainId) {
void BlobCipherKeyCache::resetEncyrptDomainId(const EncryptCipherDomainId domainId) {
auto domainItr = domainCacheMap.find(domainId);
if (domainItr == domainCacheMap.end()) {
throw encrypt_key_not_found();
@ -391,8 +291,8 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
memset(reinterpret_cast<uint8_t*>(header), 0, sizeof(BlobCipherEncryptHeader));
// Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs
// to be generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost.
// Alloc buffer computation accounts for 'header authentication' generation scheme. If single-auth-token needs to be
// generated, allocate buffer sufficient to append header to the cipherText optimizing memcpy cost.
const int allocSize = authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE
? plaintextLen + AES_BLOCK_SIZE + sizeof(BlobCipherEncryptHeader)
@ -440,7 +340,6 @@ Reference<EncryptBuf> EncryptBlobCipherAes265Ctr::encrypt(const uint8_t* plainte
// Populate header encryption-key details
header->cipherHeaderDetails.encryptDomainId = headerCipherKey->getDomainId();
header->cipherHeaderDetails.baseCipherId = headerCipherKey->getBaseCipherId();
header->cipherHeaderDetails.salt = headerCipherKey->getSalt();
// Populate header authToken details
if (header->flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE) {
@ -725,8 +624,8 @@ void forceLinkBlobCipherTests() {}
// 3. Inserting of 'identical' cipherKey (already cached) more than once works as desired.
// 4. Inserting of 'non-identical' cipherKey (already cached) more than once works as desired.
// 5. Validation encryption ops (correctness):
// 5.1. Encrypt a buffer followed by decryption of the buffer, validate the contents.
// 5.2. Simulate anomalies such as: EncryptionHeader corruption, authToken mismatch / encryptionMode mismatch etc.
// 5.1. Encyrpt a buffer followed by decryption of the buffer, validate the contents.
// 5.2. Simulate anomalies such as: EncyrptionHeader corruption, authToken mismatch / encryptionMode mismatch etc.
// 6. Cache cleanup
// 6.1 cleanup cipherKeys by given encryptDomainId
// 6.2. Cleanup all cached cipherKeys
@ -740,7 +639,6 @@ TEST_CASE("flow/BlobCipher") {
int len;
EncryptCipherBaseKeyId keyId;
std::unique_ptr<uint8_t[]> key;
EncryptCipherRandomSalt generatedSalt;
BaseCipher(const EncryptCipherDomainId& dId, const EncryptCipherBaseKeyId& kId)
: domainId(dId), len(deterministicRandom()->randomInt(AES_256_KEY_LENGTH / 2, AES_256_KEY_LENGTH + 1)),
@ -773,8 +671,6 @@ TEST_CASE("flow/BlobCipher") {
cipherKeyCache->insertCipherKey(
baseCipher->domainId, baseCipher->keyId, baseCipher->key.get(), baseCipher->len);
Reference<BlobCipherKey> fetchedKey = cipherKeyCache->getLatestCipherKey(baseCipher->domainId);
baseCipher->generatedSalt = fetchedKey->getSalt();
}
}
// insert EncryptHeader BlobCipher key
@ -788,8 +684,7 @@ TEST_CASE("flow/BlobCipher") {
for (auto& domainItr : domainKeyMap) {
for (auto& baseKeyItr : domainItr.second) {
Reference<BaseCipher> baseCipher = baseKeyItr.second;
Reference<BlobCipherKey> cipherKey =
cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId, baseCipher->generatedSalt);
Reference<BlobCipherKey> cipherKey = cipherKeyCache->getCipherKey(baseCipher->domainId, baseCipher->keyId);
ASSERT(cipherKey.isValid());
// validate common cipher properties - domainId, baseCipherId, baseCipherLen, rawBaseCipher
ASSERT_EQ(cipherKey->getBaseCipherId(), baseCipher->keyId);
@ -864,8 +759,7 @@ TEST_CASE("flow/BlobCipher") {
.detail("BaseCipherId", header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
header.cipherTextDetails.baseCipherId);
ASSERT(tCipherKeyKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(
tCipherKeyKey, Reference<BlobCipherKey>(), &header.cipherTextDetails.iv[0]);
@ -952,11 +846,9 @@ TEST_CASE("flow/BlobCipher") {
StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString());
Reference<BlobCipherKey> tCipherKeyKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
header.cipherHeaderDetails.baseCipherId);
ASSERT(tCipherKeyKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(tCipherKeyKey, hCipherKey, &header.cipherTextDetails.iv[0]);
Reference<EncryptBuf> decrypted = decryptor.decrypt(encrypted->begin(), bufLen, header, arena);
@ -1057,11 +949,9 @@ TEST_CASE("flow/BlobCipher") {
StringRef(arena, &header.singleAuthToken.authToken[0], AUTH_TOKEN_SIZE).toString());
Reference<BlobCipherKey> tCipherKey = cipherKeyCache->getCipherKey(header.cipherTextDetails.encryptDomainId,
header.cipherTextDetails.baseCipherId,
header.cipherTextDetails.salt);
header.cipherTextDetails.baseCipherId);
Reference<BlobCipherKey> hCipherKey = cipherKeyCache->getCipherKey(header.cipherHeaderDetails.encryptDomainId,
header.cipherHeaderDetails.baseCipherId,
header.cipherHeaderDetails.salt);
header.cipherHeaderDetails.baseCipherId);
ASSERT(tCipherKey->isEqual(cipherKey));
DecryptBlobCipherAes256Ctr decryptor(tCipherKey, hCipherKey, &header.cipherTextDetails.iv[0]);
@ -1157,7 +1047,7 @@ TEST_CASE("flow/BlobCipher") {
// Validate dropping encyrptDomainId cached keys
const EncryptCipherDomainId candidate = deterministicRandom()->randomInt(minDomainId, maxDomainId);
cipherKeyCache->resetEncryptDomainId(candidate);
cipherKeyCache->resetEncyrptDomainId(candidate);
std::vector<Reference<BlobCipherKey>> cachedKeys = cipherKeyCache->getAllCiphers(candidate);
ASSERT(cachedKeys.empty());

View File

@ -82,11 +82,11 @@ private:
// This header is persisted along with encrypted buffer, it contains information necessary
// to assist decrypting the buffers to serve read requests.
//
// The total space overhead is 104 bytes.
// The total space overhead is 96 bytes.
#pragma pack(push, 1) // exact fit - no padding
typedef struct BlobCipherEncryptHeader {
static constexpr int headerSize = 104;
static constexpr int headerSize = 96;
union {
struct {
uint8_t size; // reading first byte is sufficient to determine header
@ -101,7 +101,7 @@ typedef struct BlobCipherEncryptHeader {
// Cipher text encryption information
struct {
// Encryption domain boundary identifier.
// Encyrption domain boundary identifier.
EncryptCipherDomainId encryptDomainId{};
// BaseCipher encryption key identifier
EncryptCipherBaseKeyId baseCipherId{};
@ -116,8 +116,6 @@ typedef struct BlobCipherEncryptHeader {
EncryptCipherDomainId encryptDomainId{};
// BaseCipher encryption key identifier.
EncryptCipherBaseKeyId baseCipherId{};
// Random salt
EncryptCipherRandomSalt salt{};
} cipherHeaderDetails;
// Encryption header is stored as plaintext on a persistent storage to assist reconstruction of cipher-key(s) for
@ -166,11 +164,6 @@ public:
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen);
BlobCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCiphId,
const uint8_t* baseCiph,
int baseCiphLen,
const EncryptCipherRandomSalt& salt);
uint8_t* data() const { return cipher.get(); }
uint64_t getCreationTime() const { return creationTime; }
@ -213,7 +206,7 @@ private:
// This interface allows FDB processes participating in encryption to store and
// index recently used encyption cipher keys. FDB encryption has two dimensions:
// 1. Mapping on cipher encryption keys per "encryption domains"
// 2. Per encryption domain, the cipher keys are index using {baseCipherKeyId, salt} tuple.
// 2. Per encryption domain, the cipher keys are index using "baseCipherKeyId".
//
// The design supports NIST recommendation of limiting lifetime of an encryption
// key. For details refer to:
@ -221,10 +214,10 @@ private:
//
// Below gives a pictoral representation of in-memory datastructure implemented
// to index encryption keys:
// { encryptionDomain -> { {baseCipherId, salt} -> cipherKey } }
// { encryptionDomain -> { baseCipherId -> cipherKey } }
//
// Supported cache lookups schemes:
// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId, salt } triplet.
// 1. Lookup cipher based on { encryptionDomainId, baseCipherKeyId } tuple.
// 2. Lookup latest cipher key for a given encryptionDomainId.
//
// Client is responsible to handle cache-miss usecase, the corrective operation
@ -233,29 +226,15 @@ private:
// required encryption key, however, CPs/SSs cache-miss would result in RPC to
// EncryptKeyServer to refresh the desired encryption key.
struct pair_hash {
template <class T1, class T2>
std::size_t operator()(const std::pair<T1, T2>& pair) const {
auto hash1 = std::hash<T1>{}(pair.first);
auto hash2 = std::hash<T2>{}(pair.second);
// Equal hashes XOR would be ZERO.
return hash1 == hash2 ? hash1 : hash1 ^ hash2;
}
};
using BlobCipherKeyIdCacheKey = std::pair<EncryptCipherBaseKeyId, EncryptCipherRandomSalt>;
using BlobCipherKeyIdCacheMap = std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>;
using BlobCipherKeyIdCacheMap = std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>;
using BlobCipherKeyIdCacheMapCItr =
std::unordered_map<BlobCipherKeyIdCacheKey, Reference<BlobCipherKey>, pair_hash>::const_iterator;
std::unordered_map<EncryptCipherBaseKeyId, Reference<BlobCipherKey>>::const_iterator;
struct BlobCipherKeyIdCache : ReferenceCounted<BlobCipherKeyIdCache> {
public:
BlobCipherKeyIdCache();
explicit BlobCipherKeyIdCache(EncryptCipherDomainId dId);
BlobCipherKeyIdCacheKey getCacheKey(const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt);
// API returns the last inserted cipherKey.
// If none exists, 'encrypt_key_not_found' is thrown.
@ -264,33 +243,14 @@ public:
// API returns cipherKey corresponding to input 'baseCipherKeyId'.
// If none exists, 'encrypt_key_not_found' is thrown.
Reference<BlobCipherKey> getCipherByBaseCipherId(const EncryptCipherBaseKeyId& baseCipherKeyId,
const EncryptCipherRandomSalt& salt);
Reference<BlobCipherKey> getCipherByBaseCipherId(EncryptCipherBaseKeyId baseCipherKeyId);
// API enables inserting base encryption cipher details to the BlobCipherKeyIdCache.
// Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey
// is treated as a NOP (success), however, an attempt to update cipherKey would throw
// 'encrypt_update_cipher' exception.
//
// API NOTE: Recommended usecase is to update encryption cipher-key is updated the external
// keyManagementSolution to limit an encryption key lifetime
void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId, const uint8_t* baseCipher, int baseCipherLen);
// API enables inserting base encryption cipher details to the BlobCipherKeyIdCache
// Given cipherKeys are immutable, attempting to re-insert same 'identical' cipherKey
// is treated as a NOP (success), however, an attempt to update cipherKey would throw
// 'encrypt_update_cipher' exception.
//
// API NOTE: Recommended usecase is to update encryption cipher-key regeneration while performing
// decryption. The encryptionheader would contain relevant details including: 'encryptDomainId',
// 'baseCipherId' & 'salt'. The caller needs to fetch 'baseCipherKey' detail and re-populate KeyCache.
// Also, the invocation will NOT update the latest cipher-key details.
void insertBaseCipherKey(const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
void insertBaseCipherKey(EncryptCipherBaseKeyId baseCipherId, const uint8_t* baseCipher, int baseCipherLen);
// API cleanup the cache by dropping all cached cipherKeys
void cleanup();
@ -302,7 +262,6 @@ private:
EncryptCipherDomainId domainId;
BlobCipherKeyIdCacheMap keyIdCache;
EncryptCipherBaseKeyId latestBaseCipherKeyId;
EncryptCipherRandomSalt latestRandomSalt;
};
using BlobCipherDomainCacheMap = std::unordered_map<EncryptCipherDomainId, Reference<BlobCipherKeyIdCache>>;
@ -318,32 +277,12 @@ public:
// The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable,
// attempting to re-insert same 'identical' cipherKey is treated as a NOP (success),
// however, an attempt to update cipherKey would throw 'encrypt_update_cipher' exception.
//
// API NOTE: Recommended usecase is to update encryption cipher-key is updated the external
// keyManagementSolution to limit an encryption key lifetime
void insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen);
// Enable clients to insert base encryption cipher details to the BlobCipherKeyCache.
// The cipherKeys are indexed using 'baseCipherId', given cipherKeys are immutable,
// attempting to re-insert same 'identical' cipherKey is treated as a NOP (success),
// however, an attempt to update cipherKey would throw 'encrypt_update_cipher' exception.
//
// API NOTE: Recommended usecase is to update encryption cipher-key regeneration while performing
// decryption. The encryptionheader would contain relevant details including: 'encryptDomainId',
// 'baseCipherId' & 'salt'. The caller needs to fetch 'baseCipherKey' detail and re-populate KeyCache.
// Also, the invocation will NOT update the latest cipher-key details.
void insertCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const uint8_t* baseCipher,
int baseCipherLen,
const EncryptCipherRandomSalt& salt);
// API returns the last insert cipherKey for a given encryption domain Id.
// API returns the last insert cipherKey for a given encyryption domain Id.
// If none exists, it would throw 'encrypt_key_not_found' exception.
Reference<BlobCipherKey> getLatestCipherKey(const EncryptCipherDomainId& domainId);
@ -352,16 +291,14 @@ public:
// If none exists, it would throw 'encrypt_key_not_found' exception.
Reference<BlobCipherKey> getCipherKey(const EncryptCipherDomainId& domainId,
const EncryptCipherBaseKeyId& baseCipherId,
const EncryptCipherRandomSalt& salt);
const EncryptCipherBaseKeyId& baseCipherId);
// API returns point in time list of all 'cached' cipherKeys for a given encryption domainId.
std::vector<Reference<BlobCipherKey>> getAllCiphers(const EncryptCipherDomainId& domainId);
// API enables dropping all 'cached' cipherKeys for a given encryption domain Id.
// Useful to cleanup cache if an encryption domain gets removed/destroyed etc.
void resetEncryptDomainId(const EncryptCipherDomainId domainId);
void resetEncyrptDomainId(const EncryptCipherDomainId domainId);
static Reference<BlobCipherKeyCache> getInstance() {
if (g_network->isSimulated()) {
@ -427,7 +364,7 @@ public:
const BlobCipherEncryptHeader& header,
Arena&);
// Enable caller to validate encryption header auth-token (if available) without needing to read the full encrypted
// Enable caller to validate encryption header auth-token (if available) without needing to read the full encyrpted
// payload. The call is NOP unless header.flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_MULTI.
void verifyHeaderAuthToken(const BlobCipherEncryptHeader& header, Arena& arena);

View File

@ -84,13 +84,15 @@ ACTOR Future<Optional<NetworkAddress>> resolveImpl(Hostname* self) {
}
ACTOR Future<NetworkAddress> resolveWithRetryImpl(Hostname* self) {
state double resolveInterval = FLOW_KNOBS->HOSTNAME_RESOLVE_INIT_INTERVAL;
loop {
try {
Optional<NetworkAddress> address = wait(resolveImpl(self));
if (address.present()) {
return address.get();
}
wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY));
wait(delay(resolveInterval));
resolveInterval = std::min(2 * resolveInterval, FLOW_KNOBS->HOSTNAME_RESOLVE_MAX_INTERVAL);
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);
throw;

View File

@ -74,6 +74,7 @@ struct Hostname {
Optional<NetworkAddress> resolvedAddress;
enum HostnameStatus { UNRESOLVED, RESOLVING, RESOLVED };
// The resolve functions below use DNS cache.
Future<Optional<NetworkAddress>> resolve();
Future<NetworkAddress> resolveWithRetry();
Optional<NetworkAddress> resolveBlocking(); // This one should only be used when resolving asynchronously is
@ -81,6 +82,11 @@ struct Hostname {
void resetToUnresolved();
HostnameStatus status = UNRESOLVED;
AsyncTrigger resolveFinish;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, host, service, isTLS, resolvedAddress, status);
}
};
#endif

View File

@ -40,7 +40,10 @@ FlowKnobs const* FLOW_KNOBS = &bootstrapGlobalFlowKnobs;
void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( AUTOMATIC_TRACE_DUMP, 1 );
init( PREVENT_FAST_SPIN_DELAY, .01 );
init( HOSTNAME_RESOLVE_DELAY, .05 );
init( HOSTNAME_RESOLVE_INIT_INTERVAL, .05 );
init( HOSTNAME_RESOLVE_MAX_INTERVAL, 1.0 );
init( HOSTNAME_RECONNECT_INIT_INTERVAL, .05 );
init( HOSTNAME_RECONNECT_MAX_INTERVAL, 1.0 );
init( CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED, 1.0 );
init( DELAY_JITTER_OFFSET, 0.9 );

View File

@ -113,7 +113,10 @@ class FlowKnobs : public KnobsImpl<FlowKnobs> {
public:
int AUTOMATIC_TRACE_DUMP;
double PREVENT_FAST_SPIN_DELAY;
double HOSTNAME_RESOLVE_DELAY;
double HOSTNAME_RESOLVE_INIT_INTERVAL;
double HOSTNAME_RESOLVE_MAX_INTERVAL;
double HOSTNAME_RECONNECT_INIT_INTERVAL;
double HOSTNAME_RECONNECT_MAX_INTERVAL;
double CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED;
double DELAY_JITTER_OFFSET;