1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-31 18:19:35 +08:00

Merge pull request from sfc-gh-ahusain/ahusain-encryptServerRole

Add new FDB EncryptKeyProxy role
This commit is contained in:
Evan Tschannen 2022-01-26 14:13:56 -08:00 committed by GitHub
commit bb082344e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 448 additions and 9 deletions

@ -97,6 +97,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"ratekeeper",
"blob_manager",
"blob_worker",
"encrypt_key_proxy",
"storage_cache",
"router",
"coordinator"
@ -497,6 +498,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"unreachable_dataDistributor_worker",
"unreachable_ratekeeper_worker",
"unreachable_blobManager_worker",
"unreachable_encryptKeyProxy_worker",
"unreadable_configuration",
"full_replication_timeout",
"client_issues",

@ -479,6 +479,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 1.0 );
init( WAIT_FOR_RATEKEEPER_JOIN_DELAY, 1.0 );
init( WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 1.0 );
init( WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY, 1.0 );
init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0;
init( CHECK_OUTSTANDING_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) CHECK_OUTSTANDING_INTERVAL = 0.001;
init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0;
@ -496,6 +497,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CC_WORKER_HEALTH_CHECKING_INTERVAL, 60.0 );
init( CC_DEGRADED_LINK_EXPIRATION_INTERVAL, 300.0 );
init( CC_MIN_DEGRADATION_INTERVAL, 120.0 );
init( ENCRYPT_KEY_PROXY_FAILURE_TIME, 0.1 );
init( CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE, 3 );
init( CC_MAX_EXCLUSION_DUE_TO_HEALTH, 2 );
init( CC_HEALTH_TRIGGER_RECOVERY, false );
@ -792,6 +794,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// Cluster recovery
init ( CLUSTER_RECOVERY_EVENT_NAME_PREFIX, "Master");
// encrypt key proxy
init( ENABLE_ENCRYPT_KEY_PROXY, false );
// Blob granlues
init( BG_URL, "" ); // TODO: store in system key space, eventually
init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( randomize && BUGGIFY ) { deterministicRandom()->random01() < 0.1 ? BG_SNAPSHOT_FILE_TARGET_BYTES /= 100 : BG_SNAPSHOT_FILE_TARGET_BYTES /= 10; }

@ -400,6 +400,7 @@ public:
double WAIT_FOR_DISTRIBUTOR_JOIN_DELAY;
double WAIT_FOR_RATEKEEPER_JOIN_DELAY;
double WAIT_FOR_BLOB_MANAGER_JOIN_DELAY;
double WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY;
double WORKER_FAILURE_TIME;
double CHECK_OUTSTANDING_INTERVAL;
double INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
@ -422,6 +423,7 @@ public:
// degraded server is considered healthy.
double CC_MIN_DEGRADATION_INTERVAL; // The minimum interval that a server is reported as degraded to be considered
// as degraded by Cluster Controller.
double ENCRYPT_KEY_PROXY_FAILURE_TIME;
int CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE; // The maximum number of degraded peers when excluding a server. When the
// number of degraded peers is more than this value, we will not exclude
// this server since it may because of server overload.
@ -743,6 +745,9 @@ public:
// Cluster recovery
std::string CLUSTER_RECOVERY_EVENT_NAME_PREFIX;
// encrypt key proxy
bool ENABLE_ENCRYPT_KEY_PROXY;
// blob granule stuff
// FIXME: configure url with database configuration instead of knob eventually
std::string BG_URL;

@ -272,6 +272,24 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const
default:
return ProcessClass::NeverAssign;
}
case ProcessClass::EncryptKeyProxy:
switch (_class) {
case ProcessClass::EncryptKeyProxyClass:
return ProcessClass::BestFit;
case ProcessClass::StatelessClass:
return ProcessClass::GoodFit;
case ProcessClass::UnsetClass:
return ProcessClass::UnsetFit;
case ProcessClass::MasterClass:
return ProcessClass::OkayFit;
case ProcessClass::CoordinatorClass:
case ProcessClass::TesterClass:
case ProcessClass::StorageCacheClass:
case ProcessClass::BlobWorkerClass:
return ProcessClass::NeverAssign;
default:
return ProcessClass::WorstFit;
}
default:
return ProcessClass::NeverAssign;
}

@ -48,6 +48,7 @@ struct ProcessClass {
GrvProxyClass,
BlobManagerClass,
BlobWorkerClass,
EncryptKeyProxyClass,
InvalidClass = -1
};
@ -75,6 +76,7 @@ struct ProcessClass {
BlobWorker,
StorageCache,
Backup,
EncryptKeyProxy,
Worker, // used for actor lineage tracking
NoRole
};
@ -112,6 +114,7 @@ public:
else if (s=="blob_worker") _class = BlobWorkerClass;
else if (s=="storage_cache") _class = StorageCacheClass;
else if (s=="backup") _class = BackupClass;
else if (s=="encrypt_key_proxy") _class = EncryptKeyProxyClass;
else _class = InvalidClass;
}
@ -141,6 +144,7 @@ public:
else if (classStr=="blob_worker") _class = BlobWorkerClass;
else if (classStr=="storage_cache") _class = StorageCacheClass;
else if (classStr=="backup") _class = BackupClass;
else if (classStr=="encrypt_key_proxy") _class = EncryptKeyProxyClass;
else _class = InvalidClass;
if (sourceStr=="command_line") _source = CommandLineSource;
@ -180,6 +184,7 @@ public:
case BlobWorkerClass: return "blob_worker";
case StorageCacheClass: return "storage_cache";
case BackupClass: return "backup";
case EncryptKeyProxyClass: return "encrypt_key_proxy";
default: return "invalid";
}
}

@ -161,6 +161,8 @@ public:
return false;
case ProcessClass::BackupClass:
return false;
case ProcessClass::EncryptKeyProxyClass:
return false;
default:
return false;
}

@ -32,6 +32,8 @@ set(FDBSERVER_SRCS
DataDistributorInterface.h
DBCoreState.h
DiskQueue.actor.cpp
EncryptKeyProxyInterface.h
EncryptKeyProxy.actor.cpp
fdbserver.actor.cpp
FDBExecHelper.actor.cpp
FDBExecHelper.actor.h

@ -26,6 +26,7 @@
#include <vector>
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "flow/ActorCollection.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/NativeAPI.actor.h"
@ -58,6 +59,7 @@
#include "fdbrpc/ReplicationUtils.h"
#include "fdbrpc/sim_validation.h"
#include "fdbclient/KeyBackedTypes.h"
#include "flow/Trace.h"
#include "flow/Util.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -93,6 +95,7 @@ struct RatekeeperSingleton : Singleton<RatekeeperInterface> {
void setInterfaceToDbInfo(ClusterControllerData* cc) const {
if (interface.present()) {
TraceEvent("CCRK_SetInf", cc->id).detail("Id", interface.get().id());
cc->db.setRatekeeper(interface.get());
}
}
@ -114,6 +117,7 @@ struct DataDistributorSingleton : Singleton<DataDistributorInterface> {
void setInterfaceToDbInfo(ClusterControllerData* cc) const {
if (interface.present()) {
TraceEvent("CCDD_SetInf", cc->id).detail("Id", interface.get().id());
cc->db.setDistributor(interface.get());
}
}
@ -135,6 +139,7 @@ struct BlobManagerSingleton : Singleton<BlobManagerInterface> {
void setInterfaceToDbInfo(ClusterControllerData* cc) const {
if (interface.present()) {
TraceEvent("CCBM_SetInf", cc->id).detail("Id", interface.get().id());
cc->db.setBlobManager(interface.get());
}
}
@ -147,6 +152,28 @@ struct BlobManagerSingleton : Singleton<BlobManagerInterface> {
void recruit(ClusterControllerData* cc) const { cc->recruitBlobManager.set(true); }
};
struct EncryptKeyProxySingleton : Singleton<EncryptKeyProxyInterface> {
EncryptKeyProxySingleton(const Optional<EncryptKeyProxyInterface>& interface) : Singleton(interface) {}
Role getRole() const { return Role::ENCRYPT_KEY_PROXY; }
ProcessClass::ClusterRole getClusterRole() const { return ProcessClass::EncryptKeyProxy; }
void setInterfaceToDbInfo(ClusterControllerData* cc) const {
if (interface.present()) {
TraceEvent("CCEKP_SetInf", cc->id).detail("Id", interface.get().id());
cc->db.setEncryptKeyProxy(interface.get());
}
}
void halt(ClusterControllerData* cc, Optional<Standalone<StringRef>> pid) const {
if (interface.present()) {
cc->id_worker[pid].haltEncryptKeyProxy =
brokenPromiseToNever(interface.get().haltEncryptKeyProxy.getReply(HaltEncryptKeyProxyRequest(cc->id)));
}
}
void recruit(ClusterControllerData* cc) const { cc->recruitEncryptKeyProxy.set(true); }
};
ACTOR Future<Void> handleLeaderReplacement(Reference<ClusterRecoveryData> self, Future<Void> leaderFail) {
loop choose {
when(wait(leaderFail)) {
@ -195,6 +222,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
dbInfo.distributor = db->serverInfo->get().distributor;
dbInfo.ratekeeper = db->serverInfo->get().ratekeeper;
dbInfo.blobManager = db->serverInfo->get().blobManager;
dbInfo.encryptKeyProxy = db->serverInfo->get().encryptKeyProxy;
dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig;
dbInfo.myLocality = db->serverInfo->get().myLocality;
dbInfo.client = ClientDBInfo();
@ -569,6 +597,11 @@ void checkBetterSingletons(ClusterControllerData* self) {
newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used);
}
WorkerDetails newEKPWorker;
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) {
newEKPWorker = findNewProcessForSingleton(self, ProcessClass::EncryptKeyProxy, id_used);
}
// Find best possible fitnesses for each singleton.
auto bestFitnessForRK = findBestFitnessForSingleton(self, newRKWorker, ProcessClass::Ratekeeper);
auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor);
@ -578,10 +611,16 @@ void checkBetterSingletons(ClusterControllerData* self) {
bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager);
}
ProcessClass::Fitness bestFitnessForEKP;
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) {
bestFitnessForEKP = findBestFitnessForSingleton(self, newEKPWorker, ProcessClass::EncryptKeyProxy);
}
auto& db = self->db.serverInfo->get();
auto rkSingleton = RatekeeperSingleton(db.ratekeeper);
auto ddSingleton = DataDistributorSingleton(db.distributor);
BlobManagerSingleton bmSingleton(db.blobManager);
EncryptKeyProxySingleton ekpSingleton(db.encryptKeyProxy);
// Check if the singletons are healthy.
// side effect: try to rerecruit the singletons to more optimal processes
@ -597,9 +636,14 @@ void checkBetterSingletons(ClusterControllerData* self) {
self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID);
}
bool ekpHealthy = true;
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) {
ekpHealthy = isHealthySingleton<EncryptKeyProxyInterface>(
self, newEKPWorker, ekpSingleton, bestFitnessForEKP, self->recruitingEncryptKeyProxyID);
}
// if any of the singletons are unhealthy (rerecruited or not stable), then do not
// consider any further re-recruitments
if (!(rkHealthy && ddHealthy && bmHealthy)) {
if (!(rkHealthy && ddHealthy && bmHealthy && ekpHealthy)) {
return;
}
@ -616,6 +660,12 @@ void checkBetterSingletons(ClusterControllerData* self) {
newBMProcessId = newBMWorker.interf.locality.processId();
}
Optional<Standalone<StringRef>> currEKPProcessId, newEKPProcessId;
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) {
currEKPProcessId = ekpSingleton.interface.get().locality.processId();
newEKPProcessId = newEKPWorker.interf.locality.processId();
}
std::vector<Optional<Standalone<StringRef>>> currPids = { currRKProcessId, currDDProcessId };
std::vector<Optional<Standalone<StringRef>>> newPids = { newRKProcessId, newDDProcessId };
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) {
@ -623,6 +673,11 @@ void checkBetterSingletons(ClusterControllerData* self) {
newPids.emplace_back(newBMProcessId);
}
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) {
currPids.emplace_back(currEKPProcessId);
newPids.emplace_back(newEKPProcessId);
}
auto currColocMap = getColocCounts(currPids);
auto newColocMap = getColocCounts(newPids);
@ -632,10 +687,17 @@ void checkBetterSingletons(ClusterControllerData* self) {
ASSERT(newColocMap[newBMProcessId] == 0);
}
// if the knob is disabled, the EKP coloc counts should have no affect on the coloc counts check below
if (!SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) {
ASSERT(currColocMap[currEKPProcessId] == 0);
ASSERT(newColocMap[newEKPProcessId] == 0);
}
// if the new coloc counts are collectively better (i.e. each singleton's coloc count has not increased)
if (newColocMap[newRKProcessId] <= currColocMap[currRKProcessId] &&
newColocMap[newDDProcessId] <= currColocMap[currDDProcessId] &&
newColocMap[newBMProcessId] <= currColocMap[currBMProcessId]) {
newColocMap[newBMProcessId] <= currColocMap[currBMProcessId] &&
newColocMap[newEKPProcessId] <= currColocMap[currEKPProcessId]) {
// rerecruit the singleton for which we have found a better process, if any
if (newColocMap[newRKProcessId] < currColocMap[currRKProcessId]) {
rkSingleton.recruit(self);
@ -643,6 +705,9 @@ void checkBetterSingletons(ClusterControllerData* self) {
ddSingleton.recruit(self);
} else if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) {
bmSingleton.recruit(self);
} else if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY &&
newColocMap[newEKPProcessId] < currColocMap[currEKPProcessId]) {
ekpSingleton.recruit(self);
}
}
}
@ -1158,6 +1223,13 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
self, w, currSingleton, registeringSingleton, self->recruitingBlobManagerID);
}
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY && req.encryptKeyProxyInterf.present()) {
auto currSingleton = EncryptKeyProxySingleton(self->db.serverInfo->get().encryptKeyProxy);
auto registeringSingleton = EncryptKeyProxySingleton(req.encryptKeyProxyInterf);
haltRegisteringOrCurrentSingleton<EncryptKeyProxyInterface>(
self, w, currSingleton, registeringSingleton, self->recruitingEncryptKeyProxyID);
}
// Notify the worker to register again with new process class/exclusive property
if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) {
req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo));
@ -2058,6 +2130,96 @@ ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
}
}
ACTOR Future<Void> startEncryptKeyProxy(ClusterControllerData* self) {
wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID.
TraceEvent("CCEKP_Start", self->id).log();
loop {
try {
// EncryptKeyServer interface is critical in recovering tlog encrypted transactions,
// hence, the process only waits for the master recruitment and not the full cluster recovery.
state bool noEncryptKeyServer = !self->db.serverInfo->get().encryptKeyProxy.present();
while (!self->masterProcessId.present() ||
self->masterProcessId != self->db.serverInfo->get().master.locality.processId() ||
self->db.serverInfo->get().recoveryState < RecoveryState::LOCKING_CSTATE) {
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
}
if (noEncryptKeyServer && self->db.serverInfo->get().encryptKeyProxy.present()) {
// Existing encryptKeyServer registers while waiting, so skip.
return Void();
}
// Recruit EncryptKeyProxy in the same datacenter as the ClusterController.
// This should always be possible, given EncryptKeyProxy is stateless, we can recruit EncryptKeyProxy
// on the same process as the CluserController.
state std::map<Optional<Standalone<StringRef>>, int> id_used;
self->updateKnownIds(&id_used);
state WorkerFitnessInfo ekpWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId,
ProcessClass::EncryptKeyProxy,
ProcessClass::NeverAssign,
self->db.config,
id_used);
InitializeEncryptKeyProxyRequest req(deterministicRandom()->randomUniqueID());
state WorkerDetails worker = ekpWorker.worker;
if (self->onMasterIsBetter(worker, ProcessClass::EncryptKeyProxy)) {
worker = self->id_worker[self->masterProcessId.get()].details;
}
self->recruitingEncryptKeyProxyID = req.reqId;
TraceEvent("CCEKP_Recruit", self->id).detail("Addr", worker.interf.address()).detail("Id", req.reqId);
ErrorOr<EncryptKeyProxyInterface> interf = wait(worker.interf.encryptKeyProxy.getReplyUnlessFailedFor(
req, SERVER_KNOBS->WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY, 0));
if (interf.present()) {
self->recruitEncryptKeyProxy.set(false);
self->recruitingEncryptKeyProxyID = interf.get().id();
const auto& encryptKeyProxy = self->db.serverInfo->get().encryptKeyProxy;
TraceEvent("CCEKP_Recruited", self->id)
.detail("Addr", worker.interf.address())
.detail("Id", interf.get().id())
.detail("ProcessId", interf.get().locality.processId());
if (encryptKeyProxy.present() && encryptKeyProxy.get().id() != interf.get().id() &&
self->id_worker.count(encryptKeyProxy.get().locality.processId())) {
TraceEvent("CCEKP_HaltAfterRecruit", self->id)
.detail("Id", encryptKeyProxy.get().id())
.detail("DcId", printable(self->clusterControllerDcId));
EncryptKeyProxySingleton(encryptKeyProxy).halt(self, encryptKeyProxy.get().locality.processId());
}
if (!encryptKeyProxy.present() || encryptKeyProxy.get().id() != interf.get().id()) {
self->db.setEncryptKeyProxy(interf.get());
TraceEvent("CCEKP_UpdateInf", self->id)
.detail("Id", self->db.serverInfo->get().encryptKeyProxy.get().id());
}
return Void();
}
} catch (Error& e) {
TraceEvent("CCEKP_RecruitError", self->id).error(e);
if (e.code() != error_code_no_more_servers) {
throw;
}
}
wait(lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY));
}
}
ACTOR Future<Void> monitorEncryptKeyProxy(ClusterControllerData* self) {
loop {
if (self->db.serverInfo->get().encryptKeyProxy.present() && !self->recruitEncryptKeyProxy.get()) {
choose {
when(wait(waitFailureClient(self->db.serverInfo->get().encryptKeyProxy.get().waitFailure,
SERVER_KNOBS->ENCRYPT_KEY_PROXY_FAILURE_TIME))) {
TraceEvent("CCEKP_Died", self->id);
self->db.clearInterf(ProcessClass::EncryptKeyProxyClass);
}
when(wait(self->recruitEncryptKeyProxy.onChange())) {}
}
} else {
wait(startEncryptKeyProxy(self));
}
}
}
ACTOR Future<Void> startBlobManager(ClusterControllerData* self) {
wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID.
@ -2278,6 +2440,10 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
state uint64_t step = 0;
state Future<ErrorOr<Void>> error = errorOr(actorCollection(self.addActor.getFuture()));
// EncryptKeyProxy is necessary for TLog recovery, recruit it as the first process
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) {
self.addActor.send(monitorEncryptKeyProxy(&self));
}
self.addActor.send(clusterWatchDatabase(&self, &self.db, coordinators, leaderFail)); // Start the master database
self.addActor.send(self.updateWorkerList.init(self.db.db));
self.addActor.send(statusServer(interf.clientInterface.databaseStatus.getFuture(),

@ -50,6 +50,7 @@ struct WorkerInfo : NonCopyable {
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
Future<Void> haltBlobManager;
Future<Void> haltEncryptKeyProxy;
Standalone<VectorRef<StringRef>> issues;
WorkerInfo()
@ -71,7 +72,7 @@ struct WorkerInfo : NonCopyable {
: watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), reboots(r.reboots),
initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)),
haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), haltBlobManager(r.haltBlobManager),
issues(r.issues) {}
haltEncryptKeyProxy(r.haltEncryptKeyProxy), issues(r.issues) {}
void operator=(WorkerInfo&& r) noexcept {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
@ -83,6 +84,7 @@ struct WorkerInfo : NonCopyable {
haltRatekeeper = r.haltRatekeeper;
haltDistributor = r.haltDistributor;
haltBlobManager = r.haltBlobManager;
haltEncryptKeyProxy = r.haltEncryptKeyProxy;
issues = r.issues;
}
};
@ -173,6 +175,14 @@ public:
serverInfo->set(newInfo);
}
void setEncryptKeyProxy(const EncryptKeyProxyInterface& interf) {
auto newInfo = serverInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.infoGeneration = ++dbInfoCount;
newInfo.encryptKeyProxy = interf;
serverInfo->set(newInfo);
}
void clearInterf(ProcessClass::ClassType t) {
auto newInfo = serverInfo->get();
newInfo.id = deterministicRandom()->randomUniqueID();
@ -183,6 +193,8 @@ public:
newInfo.ratekeeper = Optional<RatekeeperInterface>();
} else if (t == ProcessClass::BlobManagerClass) {
newInfo.blobManager = Optional<BlobManagerInterface>();
} else if (t == ProcessClass::EncryptKeyProxyClass) {
newInfo.encryptKeyProxy = Optional<EncryptKeyProxyInterface>();
}
serverInfo->set(newInfo);
}
@ -275,7 +287,9 @@ public:
(db.serverInfo->get().ratekeeper.present() &&
db.serverInfo->get().ratekeeper.get().locality.processId() == processId) ||
(db.serverInfo->get().blobManager.present() &&
db.serverInfo->get().blobManager.get().locality.processId() == processId);
db.serverInfo->get().blobManager.get().locality.processId() == processId) ||
(db.serverInfo->get().encryptKeyProxy.present() &&
db.serverInfo->get().encryptKeyProxy.get().locality.processId() == processId);
}
WorkerDetails getStorageWorker(RecruitStorageRequest const& req) {
@ -2845,7 +2859,7 @@ public:
ASSERT(masterProcessId.present());
const auto& pid = worker.interf.locality.processId();
if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper &&
role != ProcessClass::BlobManager) ||
role != ProcessClass::BlobManager && role != ProcessClass::EncryptKeyProxy) ||
pid == masterProcessId.get()) {
return false;
}
@ -2865,6 +2879,7 @@ public:
}
}
}
for (const CommitProxyInterface& interf : dbInfo.client.commitProxies) {
ASSERT(interf.processId.present());
idUsed[interf.processId]++;
@ -3216,6 +3231,8 @@ public:
Optional<UID> recruitingRatekeeperID;
AsyncVar<bool> recruitBlobManager;
Optional<UID> recruitingBlobManagerID;
AsyncVar<bool> recruitEncryptKeyProxy;
Optional<UID> recruitingEncryptKeyProxyID;
// Stores the health information from a particular worker's perspective.
struct WorkerHealth {
@ -3254,7 +3271,7 @@ public:
ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), startTime(now()),
goodRecruitmentTime(Never()), goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0),
versionDifferenceUpdated(false), remoteDCMonitorStarted(false), remoteTransactionSystemDegraded(false),
recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false),
recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), recruitEncryptKeyProxy(false),
clusterControllerMetrics("ClusterController", id.toString()),
openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics),
registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics),

@ -0,0 +1,65 @@
/*
* EncryptKeyProxy.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/EventTypes.actor.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"
#include "flow/genericactors.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct EncryptKeyProxyData : NonCopyable, ReferenceCounted<EncryptKeyProxyData> {
UID myId;
PromiseStream<Future<Void>> addActor;
explicit EncryptKeyProxyData(UID id) : myId(id) {}
};
ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface, Reference<AsyncVar<ServerDBInfo>> db) {
state Reference<EncryptKeyProxyData> self(new EncryptKeyProxyData(ekpInterface.id()));
state PromiseStream<Future<Void>> addActor;
state Future<Void> collection = actorCollection(self->addActor.getFuture());
self->addActor.send(traceRole(Role::ENCRYPT_KEY_PROXY, ekpInterface.id()));
TraceEvent("EKP_Start", self->myId).log();
// TODO(ahusain): skeleton implementation, more to come
try {
loop choose {
when(HaltEncryptKeyProxyRequest req = waitNext(ekpInterface.haltEncryptKeyProxy.getFuture())) {
req.reply.send(Void());
TraceEvent("EKP_Halted", ekpInterface.id()).detail("ReqID", req.requesterID);
break;
}
when(wait(collection)) {
ASSERT(false);
throw internal_error();
}
}
} catch (Error& e) {
TraceEvent("EKP_Terminated", ekpInterface.id()).error(e, true);
}
return Void();
}

@ -0,0 +1,66 @@
/*
* EncryptKeyProxyInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBSERVER_ENCRYPTKEYPROXYINTERFACE_H
#define FDBSERVER_ENCRYPTKEYPROXYINTERFACE_H
#include "flow/network.h"
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
struct EncryptKeyProxyInterface {
constexpr static FileIdentifier file_identifier = 1303419;
struct LocalityData locality;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltEncryptKeyProxyRequest> haltEncryptKeyProxy;
UID myId;
EncryptKeyProxyInterface() {}
explicit EncryptKeyProxyInterface(const struct LocalityData& loc, UID id) : locality(loc), myId(id) {}
void initEndpoints() {}
UID id() const { return myId; }
NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); }
bool operator==(const EncryptKeyProxyInterface& toCompare) const { return myId == toCompare.myId; }
bool operator!=(const EncryptKeyProxyInterface& toCompare) const { return !(*this == toCompare); }
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, locality, myId);
}
};
struct HaltEncryptKeyProxyRequest {
constexpr static FileIdentifier file_identifier = 2378138;
UID requesterID;
ReplyPromise<Void> reply;
HaltEncryptKeyProxyRequest() {}
explicit HaltEncryptKeyProxyRequest(UID uid) : requesterID(uid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, reply);
}
};
#endif

@ -49,6 +49,7 @@ struct ServerDBInfo {
MasterInterface master; // The best guess as to the most recent master, which might still be recovering
Optional<RatekeeperInterface> ratekeeper;
Optional<BlobManagerInterface> blobManager;
Optional<EncryptKeyProxyInterface> encryptKeyProxy;
std::vector<ResolverInterface> resolvers;
DBRecoveryCount
recoveryCount; // A recovery count from DBCoreState. A successful cluster recovery increments it twice;
@ -82,6 +83,7 @@ struct ServerDBInfo {
master,
ratekeeper,
blobManager,
encryptKeyProxy,
resolvers,
recoveryCount,
recoveryState,

@ -803,6 +803,10 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
roles.addRole("blob_manager", db->get().blobManager.get());
}
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY && db->get().encryptKeyProxy.present()) {
roles.addRole("encrypt_key_proxy", db->get().encryptKeyProxy.get());
}
for (auto& tLogSet : db->get().logSystemConfig.tLogs) {
for (auto& it : tLogSet.logRouters) {
if (it.present()) {

@ -27,6 +27,7 @@
#include "fdbserver/BackupInterface.h"
#include "fdbserver/DataDistributorInterface.h"
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/RatekeeperInterface.h"
@ -60,6 +61,7 @@ struct WorkerInterface {
RequestStream<struct InitializeStorageRequest> storage;
RequestStream<struct InitializeLogRouterRequest> logRouter;
RequestStream<struct InitializeBackupRequest> backup;
RequestStream<struct InitializeEncryptKeyProxyRequest> encryptKeyProxy;
RequestStream<struct LoadedPingRequest> debugPing;
RequestStream<struct CoordinationPingMessage> coordinationPing;
@ -125,6 +127,7 @@ struct WorkerInterface {
execReq,
workerSnapReq,
backup,
encryptKeyProxy,
updateServerDBInfo,
configBroadcastInterface);
}
@ -425,6 +428,7 @@ struct RegisterWorkerRequest {
Optional<DataDistributorInterface> distributorInterf;
Optional<RatekeeperInterface> ratekeeperInterf;
Optional<BlobManagerInterface> blobManagerInterf;
Optional<EncryptKeyProxyInterface> encryptKeyProxyInterf;
Standalone<VectorRef<StringRef>> issues;
std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise<RegisterWorkerReply> reply;
@ -443,13 +447,14 @@ struct RegisterWorkerRequest {
Optional<DataDistributorInterface> ddInterf,
Optional<RatekeeperInterface> rkInterf,
Optional<BlobManagerInterface> bmInterf,
Optional<EncryptKeyProxyInterface> ekpInterf,
bool degraded,
Version lastSeenKnobVersion,
ConfigClassSet knobConfigClassSet)
: wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),
degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet),
requestDbInfo(false) {}
encryptKeyProxyInterf(ekpInterf), degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion),
knobConfigClassSet(knobConfigClassSet), requestDbInfo(false) {}
template <class Ar>
void serialize(Ar& ar) {
@ -462,6 +467,7 @@ struct RegisterWorkerRequest {
distributorInterf,
ratekeeperInterf,
blobManagerInterf,
encryptKeyProxyInterf,
issues,
incompatiblePeers,
reply,
@ -791,6 +797,21 @@ struct InitializeBlobWorkerRequest {
}
};
struct InitializeEncryptKeyProxyRequest {
constexpr static FileIdentifier file_identifier = 4180191;
UID reqId;
UID interfaceId;
ReplyPromise<EncryptKeyProxyInterface> reply;
InitializeEncryptKeyProxyRequest() {}
explicit InitializeEncryptKeyProxyRequest(UID uid) : reqId(uid) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reqId, interfaceId, reply);
}
};
struct TraceBatchDumpRequest {
constexpr static FileIdentifier file_identifier = 8184121;
ReplyPromise<Void> reply;
@ -956,6 +977,7 @@ struct Role {
static const Role STORAGE_CACHE;
static const Role COORDINATOR;
static const Role BACKUP;
static const Role ENCRYPT_KEY_PROXY;
std::string roleName;
std::string abbreviation;
@ -991,6 +1013,8 @@ struct Role {
return STORAGE_CACHE;
case ProcessClass::Backup:
return BACKUP;
case ProcessClass::EncryptKeyProxy:
return ENCRYPT_KEY_PROXY;
case ProcessClass::Worker:
return WORKER;
case ProcessClass::NoRole:
@ -1052,6 +1076,7 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> ccr,
ACTOR Future<Void> blobWorker(BlobWorkerInterface bwi,
ReplyPromise<InitializeBlobWorkerReply> blobWorkerReady,
Reference<AsyncVar<ServerDBInfo> const> dbInfo);
ACTOR Future<Void> encryptKeyProxyServer(EncryptKeyProxyInterface ei, Reference<AsyncVar<ServerDBInfo>> db);
// These servers are started by workerServer
class IKeyValueStore;

@ -34,6 +34,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/MetricLogger.actor.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/EncryptKeyProxyInterface.h"
#include "fdbserver/RoleLineage.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/IKeyValueStore.h"
@ -519,6 +520,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<Optional<BlobManagerInterface>> const> bmInterf,
Reference<AsyncVar<Optional<EncryptKeyProxyInterface>> const> ekpInterf,
Reference<AsyncVar<bool> const> degraded,
Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<std::set<std::string>> const> issues,
@ -554,6 +556,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
ddInterf->get(),
rkInterf->get(),
bmInterf->get(),
ekpInterf->get(),
degraded->get(),
localConfig->lastSeenVersion(),
localConfig->configClassSet());
@ -616,6 +619,7 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControl
when(wait(ddInterf->onChange())) { break; }
when(wait(rkInterf->onChange())) { break; }
when(wait(bmInterf->onChange())) { break; }
when(wait(ekpInterf->onChange())) { break; }
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
@ -643,6 +647,10 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference<AsyncVar<S
return true;
}
if (dbi.encryptKeyProxy.present() && dbi.encryptKeyProxy.get().address() == address) {
return true;
}
for (const auto& resolver : dbi.resolvers) {
if (resolver.address() == address) {
return true;
@ -1376,6 +1384,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
new AsyncVar<Optional<DataDistributorInterface>>());
state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
state Reference<AsyncVar<Optional<BlobManagerInterface>>> bmInterf(new AsyncVar<Optional<BlobManagerInterface>>());
state Reference<AsyncVar<Optional<EncryptKeyProxyInterface>>> ekpInterf(
new AsyncVar<Optional<EncryptKeyProxyInterface>>());
state Future<Void> handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last
state ActorCollection errorForwarders(false);
state Future<Void> loggingTrigger = Void();
@ -1646,6 +1656,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
ddInterf,
rkInterf,
bmInterf,
ekpInterf,
degraded,
connRecord,
issues,
@ -1681,13 +1692,17 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
dbInfo->get().clusterInterface != ccInterface->get().get()) {
TraceEvent("GotServerDBInfoChange")
.detail("ChangeID", localInfo.id)
.detail("InfoGeneration", localInfo.infoGeneration)
.detail("MasterID", localInfo.master.id())
.detail("RatekeeperID",
localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID",
localInfo.distributor.present() ? localInfo.distributor.get().id() : UID())
.detail("BlobManagerID",
localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID());
localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID())
.detail("EncryptKeyProxyID",
localInfo.encryptKeyProxy.present() ? localInfo.encryptKeyProxy.get().id() : UID());
dbInfo->set(localInfo);
}
errorForwarders.add(
@ -1882,6 +1897,30 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
forwardPromise(req.reply, backupWorkerCache.get(req.reqId));
}
}
when(InitializeEncryptKeyProxyRequest req = waitNext(interf.encryptKeyProxy.getFuture())) {
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::EncryptKeyProxy;
EncryptKeyProxyInterface recruited(locality, req.reqId);
recruited.initEndpoints();
if (ekpInterf->get().present()) {
recruited = ekpInterf->get().get();
TEST(true); // Recruited while already a encryptKeyProxy server.
} else {
startRole(Role::ENCRYPT_KEY_PROXY, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
Future<Void> encryptKeyProxyProcess = encryptKeyProxyServer(recruited, dbInfo);
errorForwarders.add(forwardError(
errors,
Role::ENCRYPT_KEY_PROXY,
recruited.id(),
setWhenDoneOrError(encryptKeyProxyProcess, ekpInterf, Optional<EncryptKeyProxyInterface>())));
ekpInterf->set(Optional<EncryptKeyProxyInterface>(recruited));
}
TraceEvent("EncryptKeyProxyReceived", req.reqId).detail("EncryptKeyProxyId", recruited.id());
req.reply.send(recruited);
}
when(InitializeTLogRequest req = waitNext(interf.tLog.getFuture())) {
// For now, there's a one-to-one mapping of spill type to TLogVersion.
// With future work, a particular version of the TLog can support multiple
@ -2705,3 +2744,4 @@ const Role Role::BLOB_WORKER("BlobWorker", "BW");
const Role Role::STORAGE_CACHE("StorageCache", "SC");
const Role Role::COORDINATOR("Coordinator", "CD");
const Role Role::BACKUP("Backup", "BK");
const Role Role::ENCRYPT_KEY_PROXY("EncryptKeyProxy", "EP");

@ -2369,6 +2369,21 @@ struct ConsistencyCheckWorkload : TestWorkload {
return false;
}
// Check EncryptKeyProxy
if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY && db.encryptKeyProxy.present() &&
(!nonExcludedWorkerProcessMap.count(db.encryptKeyProxy.get().address()) ||
nonExcludedWorkerProcessMap[db.encryptKeyProxy.get().address()].processClass.machineClassFitness(
ProcessClass::EncryptKeyProxy) > fitnessLowerBound)) {
TraceEvent("ConsistencyCheck_EncyrptKeyProxyNotBest")
.detail("BestEncryptKeyProxyFitness", fitnessLowerBound)
.detail("ExistingEncyrptKeyProxyFitness",
nonExcludedWorkerProcessMap.count(db.encryptKeyProxy.get().address())
? nonExcludedWorkerProcessMap[db.encryptKeyProxy.get().address()]
.processClass.machineClassFitness(ProcessClass::EncryptKeyProxy)
: -1);
return false;
}
// TODO: Check Tlog
return true;