From cb568bbd55f15ef842eb3c43e7674b4b47bf641f Mon Sep 17 00:00:00 2001 From: Suraj Gupta Date: Wed, 8 Dec 2021 00:43:58 -0500 Subject: [PATCH] Add watch on config key. --- fdbcli/fdbcli.actor.cpp | 1 + fdbclient/ManagementAPI.actor.cpp | 10 +++++++ fdbserver/BlobManager.actor.cpp | 22 ++++++++++++++ fdbserver/BlobManagerInterface.h | 21 +++++++++++-- fdbserver/BlobWorker.actor.cpp | 2 ++ fdbserver/ClusterController.actor.cpp | 43 ++++++++++++++++++++++++--- 6 files changed, 93 insertions(+), 6 deletions(-) diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 0bd6a0bdb1..8ce87824fe 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -1180,6 +1180,7 @@ void configureGenerator(const char* text, const char* line, std::vector blobWorkerRecruiter( } } +ACTOR Future haltBlobGranules(BlobManagerData* bmData) { + std::vector blobWorkers = wait(getBlobWorkers(bmData->db)); + std::vector> deregisterBlobWorkers; + for (auto& worker : blobWorkers) { + printf("BM: sending halt to BW %s\n", worker.myId.toString().c_str()); + // TODO: send a special req to blob workers so they clean up granules/CFs + bmData->addActor.send( + brokenPromiseToNever(worker.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id)))); + deregisterBlobWorkers.emplace_back(deregisterBlobWorker(bmData, worker)); + } + waitForAll(deregisterBlobWorkers); + + return Void(); +} + ACTOR Future blobManager(BlobManagerInterface bmInterf, Reference const> dbInfo, int64_t epoch) { @@ -1420,6 +1435,13 @@ ACTOR Future blobManager(BlobManagerInterface bmInterf, TraceEvent("BlobManagerHalted", bmInterf.id()).detail("ReqID", req.requesterID); break; } + when(state HaltBlobGranulesRequest req = waitNext(bmInterf.haltBlobGranules.getFuture())) { + printf("BM: got haltBlobGranules\n"); + wait(haltBlobGranules(&self)); + req.reply.send(Void()); + TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("ReqID", req.requesterID); + break; + } when(wait(collection)) { TraceEvent("BlobManagerActorCollectionError"); ASSERT(false); diff --git a/fdbserver/BlobManagerInterface.h b/fdbserver/BlobManagerInterface.h index 4f6efcd8c5..1de8174b7b 100644 --- a/fdbserver/BlobManagerInterface.h +++ b/fdbserver/BlobManagerInterface.h @@ -30,6 +30,7 @@ struct BlobManagerInterface { constexpr static FileIdentifier file_identifier = 369169; RequestStream> waitFailure; RequestStream haltBlobManager; + RequestStream haltBlobGranules; struct LocalityData locality; UID myId; @@ -44,7 +45,7 @@ struct BlobManagerInterface { template void serialize(Archive& ar) { - serializer(ar, waitFailure, haltBlobManager, locality, myId); + serializer(ar, waitFailure, haltBlobManager, haltBlobGranules, locality, myId); } }; @@ -52,9 +53,25 @@ struct HaltBlobManagerRequest { constexpr static FileIdentifier file_identifier = 4149140; UID requesterID; ReplyPromise reply; + bool haltBlobGranules; HaltBlobManagerRequest() {} - explicit HaltBlobManagerRequest(UID uid) : requesterID(uid) {} + explicit HaltBlobManagerRequest(UID uid, bool haltBlobGranules = false) + : requesterID(uid), haltBlobGranules(haltBlobGranules) {} + + template + void serialize(Ar& ar) { + serializer(ar, requesterID, reply, haltBlobGranules); + } +}; + +struct HaltBlobGranulesRequest { + constexpr static FileIdentifier file_identifier = 904267; + UID requesterID; + ReplyPromise reply; + + HaltBlobGranulesRequest() {} + explicit HaltBlobGranulesRequest(UID uid) : requesterID(uid) {} template void serialize(Ar& ar) { diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index d24993db11..d9519b983d 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -2890,6 +2890,8 @@ ACTOR Future blobWorker(BlobWorkerInterface bwInterf, self->addActor.send(handleRangeAssign(self, granuleToReassign, true)); } when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) { + printf("BW %s got halt request\n", self->id.toString().c_str()); + req.reply.send(Void()); if (self->managerEpochOk(req.managerEpoch)) { TraceEvent("BlobWorkerHalted", self->id) diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index d684ba824d..642f08e2f9 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -24,6 +24,7 @@ #include #include +#include "fdbclient/SystemData.h" #include "fdbrpc/FailureMonitor.h" #include "flow/ActorCollection.h" #include "flow/SystemMonitor.h" @@ -3352,6 +3353,13 @@ struct BlobManagerSingleton : Singleton { cc->lastRecruitTime = now(); cc->recruitBlobManager.set(true); } + void haltBlobGranules(ClusterControllerData* cc, Optional> pid) const { + printf("CC: about to send haltBlobGranules\n"); + if (interface.present()) { + cc->id_worker[pid].haltBlobManager = + brokenPromiseToNever(interface.get().haltBlobGranules.getReply(HaltBlobGranulesRequest(cc->id))); + } + } }; ACTOR Future clusterWatchDatabase(ClusterControllerData* cluster, ClusterControllerData::DBInfo* db) { @@ -5349,12 +5357,31 @@ ACTOR Future startBlobManager(ClusterControllerData* self) { } } +ACTOR Future watchBlobGranulesConfigKey(ClusterControllerData* self) { + state Reference tr = makeReference(self->cx); + + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + Key blobGranuleConfigKey = configKeysPrefix.withSuffix(StringRef("blob_granules_enabled")); + state Future watch = tr->watch(blobGranuleConfigKey); + wait(tr->commit()); + wait(watch); + return Void(); + } catch (Error& e) { + wait(tr->onError(e)); + } + } +} + ACTOR Future monitorBlobManager(ClusterControllerData* self) { while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { wait(self->db.serverInfo->onChange()); } loop { + state Future watchConfigChange = watchBlobGranulesConfigKey(self); if (self->db.serverInfo->get().blobManager.present() && !self->recruitBlobManager.get()) { choose { when(wait(waitFailureClient(self->db.serverInfo->get().blobManager.get().waitFailure, @@ -5364,9 +5391,19 @@ ACTOR Future monitorBlobManager(ClusterControllerData* self) { self->db.clearInterf(ProcessClass::BlobManagerClass); } when(wait(self->recruitBlobManager.onChange())) {} + when(wait(watchConfigChange)) { + if (!self->db.config.blobGranulesEnabled) { + const auto& blobManager = self->db.serverInfo->get().blobManager; + BlobManagerSingleton(blobManager) + .haltBlobGranules(self, blobManager.get().locality.processId()); + } + } } } else { - wait(startBlobManager(self)); + wait(watchConfigChange); + if (self->db.config.blobGranulesEnabled) { + wait(startBlobManager(self)); + } } } } @@ -5518,9 +5555,7 @@ ACTOR Future clusterControllerCore(ClusterControllerFullInterface interf, self.addActor.send(handleForcedRecoveries(&self, interf)); self.addActor.send(monitorDataDistributor(&self)); self.addActor.send(monitorRatekeeper(&self)); - if (self.db.config.blobGranulesEnabled) { - self.addActor.send(monitorBlobManager(&self)); - } + self.addActor.send(monitorBlobManager(&self)); // self.addActor.send(monitorTSSMapping(&self)); self.addActor.send(dbInfoUpdater(&self)); self.addActor.send(traceCounters("ClusterControllerMetrics",