Add a watch on the blob_granules_enabled config key.

This commit is contained in:
Suraj Gupta 2021-12-08 00:43:58 -05:00 committed by Josh Slocum
parent 9fbccb21d2
commit cb568bbd55
6 changed files with 93 additions and 6 deletions

View File

@ -1180,6 +1180,7 @@ void configureGenerator(const char* text, const char* line, std::vector<std::str
"perpetual_storage_wiggle=",
"perpetual_storage_wiggle_locality=",
"storage_migration_type=",
"blob_granules_enabled=",
nullptr };
arrayGenerator(text, line, opts, lc);
}

View File

@ -174,6 +174,16 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
}
out[p + key] = format("%d", type);
}
if (key == "blob_granules_enabled") {
int enabled = std::stoi(value);
if (enabled != 0 && enabled != 1) {
printf("Error: Only 0 or 1 are valid values for blob_granules_enabled. "
"1 enables blob granules and 0 disables them.\n");
return out;
}
out[p + key] = value;
}
return out;
}

View File

@ -1357,6 +1357,21 @@ ACTOR Future<Void> blobWorkerRecruiter(
}
}
// Halts and deregisters every blob worker returned by getBlobWorkers(bmData->db).
//
// For each worker this:
//   - sends a HaltBlobWorkerRequest carrying the manager's epoch and id; the reply future is handed to
//     bmData->addActor and is not awaited here, and
//   - starts deregisterBlobWorker() for that worker.
// The returned future becomes ready only after all deregisterBlobWorker() futures have completed.
ACTOR Future<Void> haltBlobGranules(BlobManagerData* bmData) {
	std::vector<BlobWorkerInterface> blobWorkers = wait(getBlobWorkers(bmData->db));

	// Declared as actor state so the deregistration futures remain owned by this actor across the wait below.
	state std::vector<Future<Void>> deregisterBlobWorkers;
	deregisterBlobWorkers.reserve(blobWorkers.size());

	for (auto& worker : blobWorkers) {
		printf("BM: sending halt to BW %s\n", worker.myId.toString().c_str());
		// TODO: send a special req to blob workers so they clean up granules/CFs
		bmData->addActor.send(
		    brokenPromiseToNever(worker.haltBlobWorker.getReply(HaltBlobWorkerRequest(bmData->epoch, bmData->id))));
		deregisterBlobWorkers.emplace_back(deregisterBlobWorker(bmData, worker));
	}

	// The future returned by waitForAll() has to be waited on. Previously it was discarded, so this actor
	// returned immediately and dropped the only references to the deregistration futures before they finished.
	wait(waitForAll(deregisterBlobWorkers));

	return Void();
}
ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
int64_t epoch) {
@ -1420,6 +1435,13 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
TraceEvent("BlobManagerHalted", bmInterf.id()).detail("ReqID", req.requesterID);
break;
}
when(state HaltBlobGranulesRequest req = waitNext(bmInterf.haltBlobGranules.getFuture())) {
printf("BM: got haltBlobGranules\n");
wait(haltBlobGranules(&self));
req.reply.send(Void());
TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("ReqID", req.requesterID);
break;
}
when(wait(collection)) {
TraceEvent("BlobManagerActorCollectionError");
ASSERT(false);

View File

@ -30,6 +30,7 @@ struct BlobManagerInterface {
constexpr static FileIdentifier file_identifier = 369169;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltBlobManagerRequest> haltBlobManager;
RequestStream<struct HaltBlobGranulesRequest> haltBlobGranules;
struct LocalityData locality;
UID myId;
@ -44,7 +45,7 @@ struct BlobManagerInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, haltBlobManager, locality, myId);
serializer(ar, waitFailure, haltBlobManager, haltBlobGranules, locality, myId);
}
};
@ -52,9 +53,25 @@ struct HaltBlobManagerRequest {
constexpr static FileIdentifier file_identifier = 4149140;
UID requesterID;
ReplyPromise<Void> reply;
bool haltBlobGranules;
HaltBlobManagerRequest() {}
explicit HaltBlobManagerRequest(UID uid) : requesterID(uid) {}
explicit HaltBlobManagerRequest(UID uid, bool haltBlobGranules = false)
: requesterID(uid), haltBlobGranules(haltBlobGranules) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, reply, haltBlobGranules);
}
};
struct HaltBlobGranulesRequest {
constexpr static FileIdentifier file_identifier = 904267;
UID requesterID;
ReplyPromise<Void> reply;
HaltBlobGranulesRequest() {}
explicit HaltBlobGranulesRequest(UID uid) : requesterID(uid) {}
template <class Ar>
void serialize(Ar& ar) {

View File

@ -2890,6 +2890,8 @@ ACTOR Future<Void> blobWorker(BlobWorkerInterface bwInterf,
self->addActor.send(handleRangeAssign(self, granuleToReassign, true));
}
when(HaltBlobWorkerRequest req = waitNext(bwInterf.haltBlobWorker.getFuture())) {
printf("BW %s got halt request\n", self->id.toString().c_str());
req.reply.send(Void());
if (self->managerEpochOk(req.managerEpoch)) {
TraceEvent("BlobWorkerHalted", self->id)

View File

@ -24,6 +24,7 @@
#include <set>
#include <vector>
#include "fdbclient/SystemData.h"
#include "fdbrpc/FailureMonitor.h"
#include "flow/ActorCollection.h"
#include "flow/SystemMonitor.h"
@ -3352,6 +3353,13 @@ struct BlobManagerSingleton : Singleton<BlobManagerInterface> {
cc->lastRecruitTime = now();
cc->recruitBlobManager.set(true);
}
// Sends a HaltBlobGranulesRequest (tagged with the cluster controller's id) to the blob manager held by this
// singleton, if there is one. The pending reply is stored in the haltBlobManager field of the id_worker
// entry for `pid`; a broken promise on that reply is converted to a never-ready future.
void haltBlobGranules(ClusterControllerData* cc, Optional<Standalone<StringRef>> pid) const {
	printf("CC: about to send haltBlobGranules\n");

	// Nothing to notify when no blob manager interface is known.
	if (!interface.present()) {
		return;
	}

	const auto& managerInterface = interface.get();
	cc->id_worker[pid].haltBlobManager =
	    brokenPromiseToNever(managerInterface.haltBlobGranules.getReply(HaltBlobGranulesRequest(cc->id)));
}
};
ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster, ClusterControllerData::DBInfo* db) {
@ -5349,12 +5357,31 @@ ACTOR Future<Void> startBlobManager(ClusterControllerData* self) {
}
}
// Completes once a watch registered on the "blob_granules_enabled" key (under configKeysPrefix) becomes ready.
//
// Each attempt sets the system-key and system-immediate-priority options, registers the watch, commits the
// transaction, and then waits on the watch future. Any error is passed to tr->onError() and the attempt is
// repeated.
// NOTE(review): presumably the watch becomes ready when the key's value changes — confirm against the
// ReadYourWritesTransaction::watch contract.
ACTOR Future<Void> watchBlobGranulesConfigKey(ClusterControllerData* self) {
	state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
	// The watched key never changes between attempts, so build it once.
	state Key watchedKey = configKeysPrefix.withSuffix(StringRef("blob_granules_enabled"));
	state Future<Void> keyChanged;

	loop {
		try {
			// Options are re-applied on every attempt.
			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);

			keyChanged = tr->watch(watchedKey);
			wait(tr->commit());
			wait(keyChanged);
			return Void();
		} catch (Error& e) {
			wait(tr->onError(e));
		}
	}
}
ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
while (self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange());
}
loop {
state Future<Void> watchConfigChange = watchBlobGranulesConfigKey(self);
if (self->db.serverInfo->get().blobManager.present() && !self->recruitBlobManager.get()) {
choose {
when(wait(waitFailureClient(self->db.serverInfo->get().blobManager.get().waitFailure,
@ -5364,9 +5391,19 @@ ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
self->db.clearInterf(ProcessClass::BlobManagerClass);
}
when(wait(self->recruitBlobManager.onChange())) {}
when(wait(watchConfigChange)) {
if (!self->db.config.blobGranulesEnabled) {
const auto& blobManager = self->db.serverInfo->get().blobManager;
BlobManagerSingleton(blobManager)
.haltBlobGranules(self, blobManager.get().locality.processId());
}
}
}
} else {
wait(startBlobManager(self));
wait(watchConfigChange);
if (self->db.config.blobGranulesEnabled) {
wait(startBlobManager(self));
}
}
}
}
@ -5518,9 +5555,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(handleForcedRecoveries(&self, interf));
self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self));
if (self.db.config.blobGranulesEnabled) {
self.addActor.send(monitorBlobManager(&self));
}
self.addActor.send(monitorBlobManager(&self));
// self.addActor.send(monitorTSSMapping(&self));
self.addActor.send(dbInfoUpdater(&self));
self.addActor.send(traceCounters("ClusterControllerMetrics",