diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index d2592721c9..140690898e 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -97,6 +97,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "ratekeeper", "blob_manager", "blob_worker", + "encrypt_key_proxy", "storage_cache", "router", "coordinator" @@ -497,6 +498,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "unreachable_dataDistributor_worker", "unreachable_ratekeeper_worker", "unreachable_blobManager_worker", + "unreachable_encryptKeyProxy_worker", "unreadable_configuration", "full_replication_timeout", "client_issues", diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index bc0f3edd46..8759f493f4 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -479,6 +479,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 1.0 ); init( WAIT_FOR_RATEKEEPER_JOIN_DELAY, 1.0 ); init( WAIT_FOR_BLOB_MANAGER_JOIN_DELAY, 1.0 ); + init( WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY, 1.0 ); init( WORKER_FAILURE_TIME, 1.0 ); if( randomize && BUGGIFY ) WORKER_FAILURE_TIME = 10.0; init( CHECK_OUTSTANDING_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) CHECK_OUTSTANDING_INTERVAL = 0.001; init( VERSION_LAG_METRIC_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) VERSION_LAG_METRIC_INTERVAL = 10.0; @@ -496,6 +497,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( CC_WORKER_HEALTH_CHECKING_INTERVAL, 60.0 ); init( CC_DEGRADED_LINK_EXPIRATION_INTERVAL, 300.0 ); init( CC_MIN_DEGRADATION_INTERVAL, 120.0 ); + init( ENCRYPT_KEY_PROXY_FAILURE_TIME, 0.1 ); init( CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE, 3 ); init( CC_MAX_EXCLUSION_DUE_TO_HEALTH, 2 ); init( CC_HEALTH_TRIGGER_RECOVERY, false ); @@ -792,6 +794,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi // Cluster recovery init ( CLUSTER_RECOVERY_EVENT_NAME_PREFIX, "Master"); + // encrypt key proxy + init( ENABLE_ENCRYPT_KEY_PROXY, false ); + // Blob granlues init( BG_URL, "" ); // TODO: store in system key space, eventually init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); if( randomize && BUGGIFY ) { deterministicRandom()->random01() < 0.1 ? BG_SNAPSHOT_FILE_TARGET_BYTES /= 100 : BG_SNAPSHOT_FILE_TARGET_BYTES /= 10; } diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 68ef823468..13c6d93d54 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -400,6 +400,7 @@ public: double WAIT_FOR_DISTRIBUTOR_JOIN_DELAY; double WAIT_FOR_RATEKEEPER_JOIN_DELAY; double WAIT_FOR_BLOB_MANAGER_JOIN_DELAY; + double WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY; double WORKER_FAILURE_TIME; double CHECK_OUTSTANDING_INTERVAL; double INCOMPATIBLE_PEERS_LOGGING_INTERVAL; @@ -422,6 +423,7 @@ public: // degraded server is considered healthy. double CC_MIN_DEGRADATION_INTERVAL; // The minimum interval that a server is reported as degraded to be considered // as degraded by Cluster Controller. + double ENCRYPT_KEY_PROXY_FAILURE_TIME; int CC_DEGRADED_PEER_DEGREE_TO_EXCLUDE; // The maximum number of degraded peers when excluding a server. When the // number of degraded peers is more than this value, we will not exclude // this server since it may because of server overload. @@ -743,6 +745,9 @@ public: // Cluster recovery std::string CLUSTER_RECOVERY_EVENT_NAME_PREFIX; + // encrypt key proxy + bool ENABLE_ENCRYPT_KEY_PROXY; + // blob granule stuff // FIXME: configure url with database configuration instead of knob eventually std::string BG_URL; diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp index b136b1bb71..4deec7f517 100644 --- a/fdbrpc/Locality.cpp +++ b/fdbrpc/Locality.cpp @@ -272,6 +272,24 @@ ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const default: return ProcessClass::NeverAssign; } + case ProcessClass::EncryptKeyProxy: + switch (_class) { + case ProcessClass::EncryptKeyProxyClass: + return ProcessClass::BestFit; + case ProcessClass::StatelessClass: + return ProcessClass::GoodFit; + case ProcessClass::UnsetClass: + return ProcessClass::UnsetFit; + case ProcessClass::MasterClass: + return ProcessClass::OkayFit; + case ProcessClass::CoordinatorClass: + case ProcessClass::TesterClass: + case ProcessClass::StorageCacheClass: + case ProcessClass::BlobWorkerClass: + return ProcessClass::NeverAssign; + default: + return ProcessClass::WorstFit; + } default: return ProcessClass::NeverAssign; } diff --git a/fdbrpc/Locality.h b/fdbrpc/Locality.h index e868be2f64..25c3eeb22a 100644 --- a/fdbrpc/Locality.h +++ b/fdbrpc/Locality.h @@ -48,6 +48,7 @@ struct ProcessClass { GrvProxyClass, BlobManagerClass, BlobWorkerClass, + EncryptKeyProxyClass, InvalidClass = -1 }; @@ -75,6 +76,7 @@ struct ProcessClass { BlobWorker, StorageCache, Backup, + EncryptKeyProxy, Worker, // used for actor lineage tracking NoRole }; @@ -112,6 +114,7 @@ public: else if (s=="blob_worker") _class = BlobWorkerClass; else if (s=="storage_cache") _class = StorageCacheClass; else if (s=="backup") _class = BackupClass; + else if (s=="encrypt_key_proxy") _class = EncryptKeyProxyClass; else _class = InvalidClass; } @@ -141,6 +144,7 @@ public: else if (classStr=="blob_worker") _class = BlobWorkerClass; else if (classStr=="storage_cache") _class = StorageCacheClass; else if (classStr=="backup") _class = BackupClass; + else if (classStr=="encrypt_key_proxy") _class = EncryptKeyProxyClass; else _class = InvalidClass; if (sourceStr=="command_line") _source = CommandLineSource; @@ -180,6 +184,7 @@ public: case BlobWorkerClass: return "blob_worker"; case StorageCacheClass: return "storage_cache"; case BackupClass: return "backup"; + case EncryptKeyProxyClass: return "encrypt_key_proxy"; default: return "invalid"; } } diff --git a/fdbrpc/simulator.h b/fdbrpc/simulator.h index 19f9d1037d..e03f16cfef 100644 --- a/fdbrpc/simulator.h +++ b/fdbrpc/simulator.h @@ -161,6 +161,8 @@ public: return false; case ProcessClass::BackupClass: return false; + case ProcessClass::EncryptKeyProxyClass: + return false; default: return false; } diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 111ef074cb..39338739a2 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -32,6 +32,8 @@ set(FDBSERVER_SRCS DataDistributorInterface.h DBCoreState.h DiskQueue.actor.cpp + EncryptKeyProxyInterface.h + EncryptKeyProxy.actor.cpp fdbserver.actor.cpp FDBExecHelper.actor.cpp FDBExecHelper.actor.h diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 7b8dd4bbf4..40456dba38 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -26,6 +26,7 @@ #include #include "fdbrpc/FailureMonitor.h" +#include "fdbserver/EncryptKeyProxyInterface.h" #include "flow/ActorCollection.h" #include "fdbclient/ClusterConnectionMemoryRecord.h" #include "fdbclient/NativeAPI.actor.h" @@ -58,6 +59,7 @@ #include "fdbrpc/ReplicationUtils.h" #include "fdbrpc/sim_validation.h" #include "fdbclient/KeyBackedTypes.h" +#include "flow/Trace.h" #include "flow/Util.h" #include "flow/actorcompiler.h" // This must be the last #include. @@ -93,6 +95,7 @@ struct RatekeeperSingleton : Singleton { void setInterfaceToDbInfo(ClusterControllerData* cc) const { if (interface.present()) { + TraceEvent("CCRK_SetInf", cc->id).detail("Id", interface.get().id()); cc->db.setRatekeeper(interface.get()); } } @@ -114,6 +117,7 @@ struct DataDistributorSingleton : Singleton { void setInterfaceToDbInfo(ClusterControllerData* cc) const { if (interface.present()) { + TraceEvent("CCDD_SetInf", cc->id).detail("Id", interface.get().id()); cc->db.setDistributor(interface.get()); } } @@ -135,6 +139,7 @@ struct BlobManagerSingleton : Singleton { void setInterfaceToDbInfo(ClusterControllerData* cc) const { if (interface.present()) { + TraceEvent("CCBM_SetInf", cc->id).detail("Id", interface.get().id()); cc->db.setBlobManager(interface.get()); } } @@ -147,6 +152,28 @@ struct BlobManagerSingleton : Singleton { void recruit(ClusterControllerData* cc) const { cc->recruitBlobManager.set(true); } }; +struct EncryptKeyProxySingleton : Singleton { + + EncryptKeyProxySingleton(const Optional& interface) : Singleton(interface) {} + + Role getRole() const { return Role::ENCRYPT_KEY_PROXY; } + ProcessClass::ClusterRole getClusterRole() const { return ProcessClass::EncryptKeyProxy; } + + void setInterfaceToDbInfo(ClusterControllerData* cc) const { + if (interface.present()) { + TraceEvent("CCEKP_SetInf", cc->id).detail("Id", interface.get().id()); + cc->db.setEncryptKeyProxy(interface.get()); + } + } + void halt(ClusterControllerData* cc, Optional> pid) const { + if (interface.present()) { + cc->id_worker[pid].haltEncryptKeyProxy = + brokenPromiseToNever(interface.get().haltEncryptKeyProxy.getReply(HaltEncryptKeyProxyRequest(cc->id))); + } + } + void recruit(ClusterControllerData* cc) const { cc->recruitEncryptKeyProxy.set(true); } +}; + ACTOR Future handleLeaderReplacement(Reference self, Future leaderFail) { loop choose { when(wait(leaderFail)) { @@ -195,6 +222,7 @@ ACTOR Future clusterWatchDatabase(ClusterControllerData* cluster, dbInfo.distributor = db->serverInfo->get().distributor; dbInfo.ratekeeper = db->serverInfo->get().ratekeeper; dbInfo.blobManager = db->serverInfo->get().blobManager; + dbInfo.encryptKeyProxy = db->serverInfo->get().encryptKeyProxy; dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig; dbInfo.myLocality = db->serverInfo->get().myLocality; dbInfo.client = ClientDBInfo(); @@ -569,6 +597,11 @@ void checkBetterSingletons(ClusterControllerData* self) { newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used); } + WorkerDetails newEKPWorker; + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) { + newEKPWorker = findNewProcessForSingleton(self, ProcessClass::EncryptKeyProxy, id_used); + } + // Find best possible fitnesses for each singleton. auto bestFitnessForRK = findBestFitnessForSingleton(self, newRKWorker, ProcessClass::Ratekeeper); auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor); @@ -578,10 +611,16 @@ void checkBetterSingletons(ClusterControllerData* self) { bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager); } + ProcessClass::Fitness bestFitnessForEKP; + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) { + bestFitnessForEKP = findBestFitnessForSingleton(self, newEKPWorker, ProcessClass::EncryptKeyProxy); + } + auto& db = self->db.serverInfo->get(); auto rkSingleton = RatekeeperSingleton(db.ratekeeper); auto ddSingleton = DataDistributorSingleton(db.distributor); BlobManagerSingleton bmSingleton(db.blobManager); + EncryptKeyProxySingleton ekpSingleton(db.encryptKeyProxy); // Check if the singletons are healthy. // side effect: try to rerecruit the singletons to more optimal processes @@ -597,9 +636,14 @@ void checkBetterSingletons(ClusterControllerData* self) { self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID); } + bool ekpHealthy = true; + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) { + ekpHealthy = isHealthySingleton( + self, newEKPWorker, ekpSingleton, bestFitnessForEKP, self->recruitingEncryptKeyProxyID); + } // if any of the singletons are unhealthy (rerecruited or not stable), then do not // consider any further re-recruitments - if (!(rkHealthy && ddHealthy && bmHealthy)) { + if (!(rkHealthy && ddHealthy && bmHealthy && ekpHealthy)) { return; } @@ -616,6 +660,12 @@ void checkBetterSingletons(ClusterControllerData* self) { newBMProcessId = newBMWorker.interf.locality.processId(); } + Optional> currEKPProcessId, newEKPProcessId; + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) { + currEKPProcessId = ekpSingleton.interface.get().locality.processId(); + newEKPProcessId = newEKPWorker.interf.locality.processId(); + } + std::vector>> currPids = { currRKProcessId, currDDProcessId }; std::vector>> newPids = { newRKProcessId, newDDProcessId }; if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { @@ -623,6 +673,11 @@ void checkBetterSingletons(ClusterControllerData* self) { newPids.emplace_back(newBMProcessId); } + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) { + currPids.emplace_back(currEKPProcessId); + newPids.emplace_back(newEKPProcessId); + } + auto currColocMap = getColocCounts(currPids); auto newColocMap = getColocCounts(newPids); @@ -632,10 +687,17 @@ void checkBetterSingletons(ClusterControllerData* self) { ASSERT(newColocMap[newBMProcessId] == 0); } + // if the knob is disabled, the EKP coloc counts should have no affect on the coloc counts check below + if (!SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) { + ASSERT(currColocMap[currEKPProcessId] == 0); + ASSERT(newColocMap[newEKPProcessId] == 0); + } + // if the new coloc counts are collectively better (i.e. each singleton's coloc count has not increased) if (newColocMap[newRKProcessId] <= currColocMap[currRKProcessId] && newColocMap[newDDProcessId] <= currColocMap[currDDProcessId] && - newColocMap[newBMProcessId] <= currColocMap[currBMProcessId]) { + newColocMap[newBMProcessId] <= currColocMap[currBMProcessId] && + newColocMap[newEKPProcessId] <= currColocMap[currEKPProcessId]) { // rerecruit the singleton for which we have found a better process, if any if (newColocMap[newRKProcessId] < currColocMap[currRKProcessId]) { rkSingleton.recruit(self); @@ -643,6 +705,9 @@ void checkBetterSingletons(ClusterControllerData* self) { ddSingleton.recruit(self); } else if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) { bmSingleton.recruit(self); + } else if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY && + newColocMap[newEKPProcessId] < currColocMap[currEKPProcessId]) { + ekpSingleton.recruit(self); } } } @@ -1158,6 +1223,13 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf self, w, currSingleton, registeringSingleton, self->recruitingBlobManagerID); } + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY && req.encryptKeyProxyInterf.present()) { + auto currSingleton = EncryptKeyProxySingleton(self->db.serverInfo->get().encryptKeyProxy); + auto registeringSingleton = EncryptKeyProxySingleton(req.encryptKeyProxyInterf); + haltRegisteringOrCurrentSingleton( + self, w, currSingleton, registeringSingleton, self->recruitingEncryptKeyProxyID); + } + // Notify the worker to register again with new process class/exclusive property if (!req.reply.isSet() && newPriorityInfo != req.priorityInfo) { req.reply.send(RegisterWorkerReply(newProcessClass, newPriorityInfo)); @@ -2058,6 +2130,96 @@ ACTOR Future getNextBMEpoch(ClusterControllerData* self) { } } +ACTOR Future startEncryptKeyProxy(ClusterControllerData* self) { + wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID. + + TraceEvent("CCEKP_Start", self->id).log(); + loop { + try { + // EncryptKeyServer interface is critical in recovering tlog encrypted transactions, + // hence, the process only waits for the master recruitment and not the full cluster recovery. + state bool noEncryptKeyServer = !self->db.serverInfo->get().encryptKeyProxy.present(); + while (!self->masterProcessId.present() || + self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || + self->db.serverInfo->get().recoveryState < RecoveryState::LOCKING_CSTATE) { + wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); + } + if (noEncryptKeyServer && self->db.serverInfo->get().encryptKeyProxy.present()) { + // Existing encryptKeyServer registers while waiting, so skip. + return Void(); + } + + // Recruit EncryptKeyProxy in the same datacenter as the ClusterController. + // This should always be possible, given EncryptKeyProxy is stateless, we can recruit EncryptKeyProxy + // on the same process as the CluserController. + state std::map>, int> id_used; + self->updateKnownIds(&id_used); + state WorkerFitnessInfo ekpWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, + ProcessClass::EncryptKeyProxy, + ProcessClass::NeverAssign, + self->db.config, + id_used); + + InitializeEncryptKeyProxyRequest req(deterministicRandom()->randomUniqueID()); + state WorkerDetails worker = ekpWorker.worker; + if (self->onMasterIsBetter(worker, ProcessClass::EncryptKeyProxy)) { + worker = self->id_worker[self->masterProcessId.get()].details; + } + + self->recruitingEncryptKeyProxyID = req.reqId; + TraceEvent("CCEKP_Recruit", self->id).detail("Addr", worker.interf.address()).detail("Id", req.reqId); + + ErrorOr interf = wait(worker.interf.encryptKeyProxy.getReplyUnlessFailedFor( + req, SERVER_KNOBS->WAIT_FOR_ENCRYPT_KEY_PROXY_JOIN_DELAY, 0)); + if (interf.present()) { + self->recruitEncryptKeyProxy.set(false); + self->recruitingEncryptKeyProxyID = interf.get().id(); + const auto& encryptKeyProxy = self->db.serverInfo->get().encryptKeyProxy; + TraceEvent("CCEKP_Recruited", self->id) + .detail("Addr", worker.interf.address()) + .detail("Id", interf.get().id()) + .detail("ProcessId", interf.get().locality.processId()); + if (encryptKeyProxy.present() && encryptKeyProxy.get().id() != interf.get().id() && + self->id_worker.count(encryptKeyProxy.get().locality.processId())) { + TraceEvent("CCEKP_HaltAfterRecruit", self->id) + .detail("Id", encryptKeyProxy.get().id()) + .detail("DcId", printable(self->clusterControllerDcId)); + EncryptKeyProxySingleton(encryptKeyProxy).halt(self, encryptKeyProxy.get().locality.processId()); + } + if (!encryptKeyProxy.present() || encryptKeyProxy.get().id() != interf.get().id()) { + self->db.setEncryptKeyProxy(interf.get()); + TraceEvent("CCEKP_UpdateInf", self->id) + .detail("Id", self->db.serverInfo->get().encryptKeyProxy.get().id()); + } + return Void(); + } + } catch (Error& e) { + TraceEvent("CCEKP_RecruitError", self->id).error(e); + if (e.code() != error_code_no_more_servers) { + throw; + } + } + wait(lowPriorityDelay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY)); + } +} + +ACTOR Future monitorEncryptKeyProxy(ClusterControllerData* self) { + loop { + if (self->db.serverInfo->get().encryptKeyProxy.present() && !self->recruitEncryptKeyProxy.get()) { + choose { + when(wait(waitFailureClient(self->db.serverInfo->get().encryptKeyProxy.get().waitFailure, + SERVER_KNOBS->ENCRYPT_KEY_PROXY_FAILURE_TIME))) { + TraceEvent("CCEKP_Died", self->id); + self->db.clearInterf(ProcessClass::EncryptKeyProxyClass); + } + when(wait(self->recruitEncryptKeyProxy.onChange())) {} + } + } else { + wait(startEncryptKeyProxy(self)); + } + } +} + ACTOR Future startBlobManager(ClusterControllerData* self) { wait(delay(0.0)); // If master fails at the same time, give it a chance to clear master PID. @@ -2278,6 +2440,10 @@ ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, state uint64_t step = 0; state Future> error = errorOr(actorCollection(self.addActor.getFuture())); + // EncryptKeyProxy is necessary for TLog recovery, recruit it as the first process + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY) { + self.addActor.send(monitorEncryptKeyProxy(&self)); + } self.addActor.send(clusterWatchDatabase(&self, &self.db, coordinators, leaderFail)); // Start the master database self.addActor.send(self.updateWorkerList.init(self.db.db)); self.addActor.send(statusServer(interf.clientInterface.databaseStatus.getFuture(), diff --git a/fdbserver/ClusterController.actor.h b/fdbserver/ClusterController.actor.h index 21159d2ded..4911d4b6ed 100644 --- a/fdbserver/ClusterController.actor.h +++ b/fdbserver/ClusterController.actor.h @@ -50,6 +50,7 @@ struct WorkerInfo : NonCopyable { Future haltRatekeeper; Future haltDistributor; Future haltBlobManager; + Future haltEncryptKeyProxy; Standalone> issues; WorkerInfo() @@ -71,7 +72,7 @@ struct WorkerInfo : NonCopyable { : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), reboots(r.reboots), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)), haltRatekeeper(r.haltRatekeeper), haltDistributor(r.haltDistributor), haltBlobManager(r.haltBlobManager), - issues(r.issues) {} + haltEncryptKeyProxy(r.haltEncryptKeyProxy), issues(r.issues) {} void operator=(WorkerInfo&& r) noexcept { watcher = std::move(r.watcher); reply = std::move(r.reply); @@ -83,6 +84,7 @@ struct WorkerInfo : NonCopyable { haltRatekeeper = r.haltRatekeeper; haltDistributor = r.haltDistributor; haltBlobManager = r.haltBlobManager; + haltEncryptKeyProxy = r.haltEncryptKeyProxy; issues = r.issues; } }; @@ -173,6 +175,14 @@ public: serverInfo->set(newInfo); } + void setEncryptKeyProxy(const EncryptKeyProxyInterface& interf) { + auto newInfo = serverInfo->get(); + newInfo.id = deterministicRandom()->randomUniqueID(); + newInfo.infoGeneration = ++dbInfoCount; + newInfo.encryptKeyProxy = interf; + serverInfo->set(newInfo); + } + void clearInterf(ProcessClass::ClassType t) { auto newInfo = serverInfo->get(); newInfo.id = deterministicRandom()->randomUniqueID(); @@ -183,6 +193,8 @@ public: newInfo.ratekeeper = Optional(); } else if (t == ProcessClass::BlobManagerClass) { newInfo.blobManager = Optional(); + } else if (t == ProcessClass::EncryptKeyProxyClass) { + newInfo.encryptKeyProxy = Optional(); } serverInfo->set(newInfo); } @@ -275,7 +287,9 @@ public: (db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == processId) || (db.serverInfo->get().blobManager.present() && - db.serverInfo->get().blobManager.get().locality.processId() == processId); + db.serverInfo->get().blobManager.get().locality.processId() == processId) || + (db.serverInfo->get().encryptKeyProxy.present() && + db.serverInfo->get().encryptKeyProxy.get().locality.processId() == processId); } WorkerDetails getStorageWorker(RecruitStorageRequest const& req) { @@ -2845,7 +2859,7 @@ public: ASSERT(masterProcessId.present()); const auto& pid = worker.interf.locality.processId(); if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper && - role != ProcessClass::BlobManager) || + role != ProcessClass::BlobManager && role != ProcessClass::EncryptKeyProxy) || pid == masterProcessId.get()) { return false; } @@ -2865,6 +2879,7 @@ public: } } } + for (const CommitProxyInterface& interf : dbInfo.client.commitProxies) { ASSERT(interf.processId.present()); idUsed[interf.processId]++; @@ -3216,6 +3231,8 @@ public: Optional recruitingRatekeeperID; AsyncVar recruitBlobManager; Optional recruitingBlobManagerID; + AsyncVar recruitEncryptKeyProxy; + Optional recruitingEncryptKeyProxyID; // Stores the health information from a particular worker's perspective. struct WorkerHealth { @@ -3254,7 +3271,7 @@ public: ac(false), outstandingRequestChecker(Void()), outstandingRemoteRequestChecker(Void()), startTime(now()), goodRecruitmentTime(Never()), goodRemoteRecruitmentTime(Never()), datacenterVersionDifference(0), versionDifferenceUpdated(false), remoteDCMonitorStarted(false), remoteTransactionSystemDegraded(false), - recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), + recruitDistributor(false), recruitRatekeeper(false), recruitBlobManager(false), recruitEncryptKeyProxy(false), clusterControllerMetrics("ClusterController", id.toString()), openDatabaseRequests("OpenDatabaseRequests", clusterControllerMetrics), registerWorkerRequests("RegisterWorkerRequests", clusterControllerMetrics), diff --git a/fdbserver/EncryptKeyProxy.actor.cpp b/fdbserver/EncryptKeyProxy.actor.cpp new file mode 100644 index 0000000000..be602f0865 --- /dev/null +++ b/fdbserver/EncryptKeyProxy.actor.cpp @@ -0,0 +1,65 @@ +/* + * EncryptKeyProxy.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/EncryptKeyProxyInterface.h" +#include "fdbserver/WorkerInterface.actor.h" +#include "fdbserver/ServerDBInfo.h" +#include "flow/Arena.h" +#include "flow/Error.h" +#include "flow/EventTypes.actor.h" +#include "flow/FastRef.h" +#include "flow/Trace.h" +#include "flow/genericactors.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +struct EncryptKeyProxyData : NonCopyable, ReferenceCounted { + UID myId; + PromiseStream> addActor; + + explicit EncryptKeyProxyData(UID id) : myId(id) {} +}; + +ACTOR Future encryptKeyProxyServer(EncryptKeyProxyInterface ekpInterface, Reference> db) { + state Reference self(new EncryptKeyProxyData(ekpInterface.id())); + state PromiseStream> addActor; + state Future collection = actorCollection(self->addActor.getFuture()); + self->addActor.send(traceRole(Role::ENCRYPT_KEY_PROXY, ekpInterface.id())); + + TraceEvent("EKP_Start", self->myId).log(); + + // TODO(ahusain): skeleton implementation, more to come + try { + loop choose { + when(HaltEncryptKeyProxyRequest req = waitNext(ekpInterface.haltEncryptKeyProxy.getFuture())) { + req.reply.send(Void()); + TraceEvent("EKP_Halted", ekpInterface.id()).detail("ReqID", req.requesterID); + break; + } + when(wait(collection)) { + ASSERT(false); + throw internal_error(); + } + } + } catch (Error& e) { + TraceEvent("EKP_Terminated", ekpInterface.id()).error(e, true); + } + + return Void(); +} \ No newline at end of file diff --git a/fdbserver/EncryptKeyProxyInterface.h b/fdbserver/EncryptKeyProxyInterface.h new file mode 100644 index 0000000000..0123bc4edd --- /dev/null +++ b/fdbserver/EncryptKeyProxyInterface.h @@ -0,0 +1,66 @@ +/* + * EncryptKeyProxyInterface.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBSERVER_ENCRYPTKEYPROXYINTERFACE_H +#define FDBSERVER_ENCRYPTKEYPROXYINTERFACE_H +#include "flow/network.h" +#pragma once + +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/fdbrpc.h" +#include "fdbrpc/Locality.h" + +struct EncryptKeyProxyInterface { + constexpr static FileIdentifier file_identifier = 1303419; + struct LocalityData locality; + RequestStream> waitFailure; + RequestStream haltEncryptKeyProxy; + UID myId; + + EncryptKeyProxyInterface() {} + explicit EncryptKeyProxyInterface(const struct LocalityData& loc, UID id) : locality(loc), myId(id) {} + + void initEndpoints() {} + UID id() const { return myId; } + NetworkAddress address() const { return waitFailure.getEndpoint().getPrimaryAddress(); } + bool operator==(const EncryptKeyProxyInterface& toCompare) const { return myId == toCompare.myId; } + bool operator!=(const EncryptKeyProxyInterface& toCompare) const { return !(*this == toCompare); } + + template + void serialize(Archive& ar) { + serializer(ar, waitFailure, locality, myId); + } +}; + +struct HaltEncryptKeyProxyRequest { + constexpr static FileIdentifier file_identifier = 2378138; + UID requesterID; + ReplyPromise reply; + + HaltEncryptKeyProxyRequest() {} + explicit HaltEncryptKeyProxyRequest(UID uid) : requesterID(uid) {} + + template + void serialize(Ar& ar) { + serializer(ar, requesterID, reply); + } +}; + +#endif \ No newline at end of file diff --git a/fdbserver/ServerDBInfo.actor.h b/fdbserver/ServerDBInfo.actor.h index ea02401fde..df9fcca967 100644 --- a/fdbserver/ServerDBInfo.actor.h +++ b/fdbserver/ServerDBInfo.actor.h @@ -49,6 +49,7 @@ struct ServerDBInfo { MasterInterface master; // The best guess as to the most recent master, which might still be recovering Optional ratekeeper; Optional blobManager; + Optional encryptKeyProxy; std::vector resolvers; DBRecoveryCount recoveryCount; // A recovery count from DBCoreState. A successful cluster recovery increments it twice; @@ -82,6 +83,7 @@ struct ServerDBInfo { master, ratekeeper, blobManager, + encryptKeyProxy, resolvers, recoveryCount, recoveryState, diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index a6045ea8f0..71c4d83c87 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -803,6 +803,10 @@ ACTOR static Future processStatusFetcher( roles.addRole("blob_manager", db->get().blobManager.get()); } + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY && db->get().encryptKeyProxy.present()) { + roles.addRole("encrypt_key_proxy", db->get().encryptKeyProxy.get()); + } + for (auto& tLogSet : db->get().logSystemConfig.tLogs) { for (auto& it : tLogSet.logRouters) { if (it.present()) { diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index bcaa8f950b..371e2e3d6e 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -27,6 +27,7 @@ #include "fdbserver/BackupInterface.h" #include "fdbserver/DataDistributorInterface.h" +#include "fdbserver/EncryptKeyProxyInterface.h" #include "fdbserver/MasterInterface.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/RatekeeperInterface.h" @@ -60,6 +61,7 @@ struct WorkerInterface { RequestStream storage; RequestStream logRouter; RequestStream backup; + RequestStream encryptKeyProxy; RequestStream debugPing; RequestStream coordinationPing; @@ -125,6 +127,7 @@ struct WorkerInterface { execReq, workerSnapReq, backup, + encryptKeyProxy, updateServerDBInfo, configBroadcastInterface); } @@ -425,6 +428,7 @@ struct RegisterWorkerRequest { Optional distributorInterf; Optional ratekeeperInterf; Optional blobManagerInterf; + Optional encryptKeyProxyInterf; Standalone> issues; std::vector incompatiblePeers; ReplyPromise reply; @@ -443,13 +447,14 @@ struct RegisterWorkerRequest { Optional ddInterf, Optional rkInterf, Optional bmInterf, + Optional ekpInterf, bool degraded, Version lastSeenKnobVersion, ConfigClassSet knobConfigClassSet) : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf), - degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), knobConfigClassSet(knobConfigClassSet), - requestDbInfo(false) {} + encryptKeyProxyInterf(ekpInterf), degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion), + knobConfigClassSet(knobConfigClassSet), requestDbInfo(false) {} template void serialize(Ar& ar) { @@ -462,6 +467,7 @@ struct RegisterWorkerRequest { distributorInterf, ratekeeperInterf, blobManagerInterf, + encryptKeyProxyInterf, issues, incompatiblePeers, reply, @@ -791,6 +797,21 @@ struct InitializeBlobWorkerRequest { } }; +struct InitializeEncryptKeyProxyRequest { + constexpr static FileIdentifier file_identifier = 4180191; + UID reqId; + UID interfaceId; + ReplyPromise reply; + + InitializeEncryptKeyProxyRequest() {} + explicit InitializeEncryptKeyProxyRequest(UID uid) : reqId(uid) {} + + template + void serialize(Ar& ar) { + serializer(ar, reqId, interfaceId, reply); + } +}; + struct TraceBatchDumpRequest { constexpr static FileIdentifier file_identifier = 8184121; ReplyPromise reply; @@ -956,6 +977,7 @@ struct Role { static const Role STORAGE_CACHE; static const Role COORDINATOR; static const Role BACKUP; + static const Role ENCRYPT_KEY_PROXY; std::string roleName; std::string abbreviation; @@ -991,6 +1013,8 @@ struct Role { return STORAGE_CACHE; case ProcessClass::Backup: return BACKUP; + case ProcessClass::EncryptKeyProxy: + return ENCRYPT_KEY_PROXY; case ProcessClass::Worker: return WORKER; case ProcessClass::NoRole: @@ -1052,6 +1076,7 @@ ACTOR Future clusterController(Reference ccr, ACTOR Future blobWorker(BlobWorkerInterface bwi, ReplyPromise blobWorkerReady, Reference const> dbInfo); +ACTOR Future encryptKeyProxyServer(EncryptKeyProxyInterface ei, Reference> db); // These servers are started by workerServer class IKeyValueStore; diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index cb506604a3..d5b71a9c30 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -34,6 +34,7 @@ #include "fdbclient/NativeAPI.actor.h" #include "fdbserver/MetricLogger.actor.h" #include "fdbserver/BackupInterface.h" +#include "fdbserver/EncryptKeyProxyInterface.h" #include "fdbserver/RoleLineage.actor.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/IKeyValueStore.h" @@ -519,6 +520,7 @@ ACTOR Future registrationClient(Reference> const> ddInterf, Reference> const> rkInterf, Reference> const> bmInterf, + Reference> const> ekpInterf, Reference const> degraded, Reference connRecord, Reference> const> issues, @@ -554,6 +556,7 @@ ACTOR Future registrationClient(Referenceget(), rkInterf->get(), bmInterf->get(), + ekpInterf->get(), degraded->get(), localConfig->lastSeenVersion(), localConfig->configClassSet()); @@ -616,6 +619,7 @@ ACTOR Future registrationClient(ReferenceonChange())) { break; } when(wait(rkInterf->onChange())) { break; } when(wait(bmInterf->onChange())) { break; } + when(wait(ekpInterf->onChange())) { break; } when(wait(degraded->onChange())) { break; } when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; } when(wait(issues->onChange())) { break; } @@ -643,6 +647,10 @@ bool addressInDbAndPrimaryDc(const NetworkAddress& address, Reference workerServer(Reference connRecord, new AsyncVar>()); state Reference>> rkInterf(new AsyncVar>()); state Reference>> bmInterf(new AsyncVar>()); + state Reference>> ekpInterf( + new AsyncVar>()); state Future handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last state ActorCollection errorForwarders(false); state Future loggingTrigger = Void(); @@ -1646,6 +1656,7 @@ ACTOR Future workerServer(Reference connRecord, ddInterf, rkInterf, bmInterf, + ekpInterf, degraded, connRecord, issues, @@ -1681,13 +1692,17 @@ ACTOR Future workerServer(Reference connRecord, dbInfo->get().clusterInterface != ccInterface->get().get()) { TraceEvent("GotServerDBInfoChange") .detail("ChangeID", localInfo.id) + .detail("InfoGeneration", localInfo.infoGeneration) .detail("MasterID", localInfo.master.id()) .detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID()) .detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID()) .detail("BlobManagerID", - localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID()); + localInfo.blobManager.present() ? localInfo.blobManager.get().id() : UID()) + .detail("EncryptKeyProxyID", + localInfo.encryptKeyProxy.present() ? localInfo.encryptKeyProxy.get().id() : UID()); + dbInfo->set(localInfo); } errorForwarders.add( @@ -1882,6 +1897,30 @@ ACTOR Future workerServer(Reference connRecord, forwardPromise(req.reply, backupWorkerCache.get(req.reqId)); } } + when(InitializeEncryptKeyProxyRequest req = waitNext(interf.encryptKeyProxy.getFuture())) { + LocalLineage _; + getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::EncryptKeyProxy; + EncryptKeyProxyInterface recruited(locality, req.reqId); + recruited.initEndpoints(); + + if (ekpInterf->get().present()) { + recruited = ekpInterf->get().get(); + TEST(true); // Recruited while already a encryptKeyProxy server. + } else { + startRole(Role::ENCRYPT_KEY_PROXY, recruited.id(), interf.id()); + DUMPTOKEN(recruited.waitFailure); + + Future encryptKeyProxyProcess = encryptKeyProxyServer(recruited, dbInfo); + errorForwarders.add(forwardError( + errors, + Role::ENCRYPT_KEY_PROXY, + recruited.id(), + setWhenDoneOrError(encryptKeyProxyProcess, ekpInterf, Optional()))); + ekpInterf->set(Optional(recruited)); + } + TraceEvent("EncryptKeyProxyReceived", req.reqId).detail("EncryptKeyProxyId", recruited.id()); + req.reply.send(recruited); + } when(InitializeTLogRequest req = waitNext(interf.tLog.getFuture())) { // For now, there's a one-to-one mapping of spill type to TLogVersion. // With future work, a particular version of the TLog can support multiple @@ -2705,3 +2744,4 @@ const Role Role::BLOB_WORKER("BlobWorker", "BW"); const Role Role::STORAGE_CACHE("StorageCache", "SC"); const Role Role::COORDINATOR("Coordinator", "CD"); const Role Role::BACKUP("Backup", "BK"); +const Role Role::ENCRYPT_KEY_PROXY("EncryptKeyProxy", "EP"); diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index a9961f5582..2ec76d4d62 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -2369,6 +2369,21 @@ struct ConsistencyCheckWorkload : TestWorkload { return false; } + // Check EncryptKeyProxy + if (SERVER_KNOBS->ENABLE_ENCRYPT_KEY_PROXY && db.encryptKeyProxy.present() && + (!nonExcludedWorkerProcessMap.count(db.encryptKeyProxy.get().address()) || + nonExcludedWorkerProcessMap[db.encryptKeyProxy.get().address()].processClass.machineClassFitness( + ProcessClass::EncryptKeyProxy) > fitnessLowerBound)) { + TraceEvent("ConsistencyCheck_EncyrptKeyProxyNotBest") + .detail("BestEncryptKeyProxyFitness", fitnessLowerBound) + .detail("ExistingEncyrptKeyProxyFitness", + nonExcludedWorkerProcessMap.count(db.encryptKeyProxy.get().address()) + ? nonExcludedWorkerProcessMap[db.encryptKeyProxy.get().address()] + .processClass.machineClassFitness(ProcessClass::EncryptKeyProxy) + : -1); + return false; + } + // TODO: Check Tlog return true;