Move client knob to database config for blob granules.

This commit is contained in:
Suraj Gupta 2021-12-06 15:10:20 -05:00 committed by Josh Slocum
parent d31cb07647
commit fc3376fe8f
11 changed files with 52 additions and 28 deletions

View File

@ -264,7 +264,8 @@ CommandFactory configureFactory(
"<single|double|triple|three_data_hall|three_datacenter|ssd|memory|memory-radixtree-beta|proxies=<PROXIES>|" "<single|double|triple|three_data_hall|three_datacenter|ssd|memory|memory-radixtree-beta|proxies=<PROXIES>|"
"commit_proxies=<COMMIT_PROXIES>|grv_proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*|" "commit_proxies=<COMMIT_PROXIES>|grv_proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*|"
"count=<TSS_COUNT>|perpetual_storage_wiggle=<WIGGLE_SPEED>|perpetual_storage_wiggle_locality=" "count=<TSS_COUNT>|perpetual_storage_wiggle=<WIGGLE_SPEED>|perpetual_storage_wiggle_locality="
"<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>|storage_migration_type={disabled|gradual|aggressive}", "<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>|storage_migration_type={disabled|gradual|aggressive}|"
"blob_granules_enabled={0|1}",
"change the database configuration", "change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing " "The `new' option, if present, initializes a new database with the given configuration rather than changing "
"the configuration of an existing one. When used, both a redundancy mode and a storage engine must be " "the configuration of an existing one. When used, both a redundancy mode and a storage engine must be "

View File

@ -259,9 +259,6 @@ void ClientKnobs::initialize(Randomize randomize) {
init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 ); init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 );
init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 ); init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 );
// blob granules
init( ENABLE_BLOB_GRANULES, true );
// clang-format on // clang-format on
} }

View File

@ -250,9 +250,6 @@ public:
int MVC_CLIENTLIB_CHUNK_SIZE; int MVC_CLIENTLIB_CHUNK_SIZE;
int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION; int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION;
// blob granules
bool ENABLE_BLOB_GRANULES;
ClientKnobs(Randomize randomize); ClientKnobs(Randomize randomize);
void initialize(Randomize randomize); void initialize(Randomize randomize);
}; };

View File

@ -50,6 +50,7 @@ void DatabaseConfiguration::resetInternal() {
perpetualStorageWiggleSpeed = 0; perpetualStorageWiggleSpeed = 0;
perpetualStorageWiggleLocality = "0"; perpetualStorageWiggleLocality = "0";
storageMigrationType = StorageMigrationType::DEFAULT; storageMigrationType = StorageMigrationType::DEFAULT;
blobGranulesEnabled = true;
} }
int toInt(ValueRef const& v) { int toInt(ValueRef const& v) {
@ -402,6 +403,7 @@ StatusObject DatabaseConfiguration::toJSON(bool noPolicies) const {
result["perpetual_storage_wiggle"] = perpetualStorageWiggleSpeed; result["perpetual_storage_wiggle"] = perpetualStorageWiggleSpeed;
result["perpetual_storage_wiggle_locality"] = perpetualStorageWiggleLocality; result["perpetual_storage_wiggle_locality"] = perpetualStorageWiggleLocality;
result["storage_migration_type"] = storageMigrationType.toString(); result["storage_migration_type"] = storageMigrationType.toString();
result["blob_granules_enabled"] = (int32_t)blobGranulesEnabled;
return result; return result;
} }
@ -627,6 +629,24 @@ bool DatabaseConfiguration::setInternal(KeyRef key, ValueRef value) {
storageMigrationType = (StorageMigrationType::MigrationType)type; storageMigrationType = (StorageMigrationType::MigrationType)type;
} else if (ck == LiteralStringRef("proxies")) { } else if (ck == LiteralStringRef("proxies")) {
overwriteProxiesCount(); overwriteProxiesCount();
int proxiesCount;
parse(&proxiesCount, value);
if (proxiesCount > 1) {
int derivedGrvProxyCount =
std::max(1,
std::min(CLIENT_KNOBS->DEFAULT_MAX_GRV_PROXIES,
proxiesCount / (CLIENT_KNOBS->DEFAULT_COMMIT_GRV_PROXIES_RATIO + 1)));
int derivedCommitProxyCount = proxiesCount - derivedGrvProxyCount;
if (grvProxyCount == -1) {
grvProxyCount = derivedGrvProxyCount;
}
if (commitProxyCount == -1) {
commitProxyCount = derivedCommitProxyCount;
}
}
} else if (ck == LiteralStringRef("blob_granules_enabled")) {
parse((&type), value);
blobGranulesEnabled = (type != 0);
} else { } else {
return false; return false;
} }

View File

@ -250,6 +250,9 @@ struct DatabaseConfiguration {
// Storage Migration Type // Storage Migration Type
StorageMigrationType storageMigrationType; StorageMigrationType storageMigrationType;
// Blob Granules
bool blobGranulesEnabled;
// Excluded servers (no state should be here) // Excluded servers (no state should be here)
bool isExcludedServer(NetworkAddressList) const; bool isExcludedServer(NetworkAddressList) const;
bool isExcludedLocality(const LocalityData& locality) const; bool isExcludedLocality(const LocalityData& locality) const;

View File

@ -770,7 +770,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"disabled", "disabled",
"aggressive", "aggressive",
"gradual" "gradual"
]} ]},
"blob_granules_enabled":1,
}, },
"data":{ "data":{
"least_operating_space_bytes_log_server":0, "least_operating_space_bytes_log_server":0,

View File

@ -3720,7 +3720,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
WorkerDetails newDDWorker = findNewProcessForSingleton(self, ProcessClass::DataDistributor, id_used); WorkerDetails newDDWorker = findNewProcessForSingleton(self, ProcessClass::DataDistributor, id_used);
WorkerDetails newBMWorker; WorkerDetails newBMWorker;
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (self->db.config.blobGranulesEnabled) {
newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used); newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used);
} }
@ -3729,7 +3729,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor); auto bestFitnessForDD = findBestFitnessForSingleton(self, newDDWorker, ProcessClass::DataDistributor);
ProcessClass::Fitness bestFitnessForBM; ProcessClass::Fitness bestFitnessForBM;
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (self->db.config.blobGranulesEnabled) {
bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager); bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager);
} }
@ -3747,7 +3747,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
self, newDDWorker, ddSingleton, bestFitnessForDD, self->recruitingDistributorID); self, newDDWorker, ddSingleton, bestFitnessForDD, self->recruitingDistributorID);
bool bmHealthy = true; bool bmHealthy = true;
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (self->db.config.blobGranulesEnabled) {
bmHealthy = isHealthySingleton<BlobManagerInterface>( bmHealthy = isHealthySingleton<BlobManagerInterface>(
self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID); self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID);
} }
@ -3766,14 +3766,15 @@ void checkBetterSingletons(ClusterControllerData* self) {
Optional<Standalone<StringRef>> newDDProcessId = newDDWorker.interf.locality.processId(); Optional<Standalone<StringRef>> newDDProcessId = newDDWorker.interf.locality.processId();
Optional<Standalone<StringRef>> currBMProcessId, newBMProcessId; Optional<Standalone<StringRef>> currBMProcessId, newBMProcessId;
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (self->db.config.blobGranulesEnabled) {
currBMProcessId = bmSingleton.interface.get().locality.processId(); currBMProcessId = bmSingleton.interface.get().locality.processId();
newBMProcessId = newBMWorker.interf.locality.processId(); newBMProcessId = newBMWorker.interf.locality.processId();
} }
std::vector<Optional<Standalone<StringRef>>> currPids = { currRKProcessId, currDDProcessId }; std::vector<Optional<Standalone<StringRef>>> currPids = { currRKProcessId, currDDProcessId };
std::vector<Optional<Standalone<StringRef>>> newPids = { newRKProcessId, newDDProcessId }; std::vector<Optional<Standalone<StringRef>>> newPids = { newRKProcessId, newDDProcessId };
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (self->db.config.blobGranulesEnabled) {
currPids.emplace_back(currBMProcessId); currPids.emplace_back(currBMProcessId);
newPids.emplace_back(newBMProcessId); newPids.emplace_back(newBMProcessId);
} }
@ -3782,7 +3783,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
auto newColocMap = getColocCounts(newPids); auto newColocMap = getColocCounts(newPids);
// if the knob is disabled, the BM coloc counts should have no affect on the coloc counts check below // if the knob is disabled, the BM coloc counts should have no affect on the coloc counts check below
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (!self->db.config.blobGranulesEnabled) {
ASSERT(currColocMap[currBMProcessId] == 0); ASSERT(currColocMap[currBMProcessId] == 0);
ASSERT(newColocMap[newBMProcessId] == 0); ASSERT(newColocMap[newBMProcessId] == 0);
} }
@ -3796,7 +3797,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
rkSingleton.recruit(self); rkSingleton.recruit(self);
} else if (newColocMap[newDDProcessId] < currColocMap[currDDProcessId]) { } else if (newColocMap[newDDProcessId] < currColocMap[currDDProcessId]) {
ddSingleton.recruit(self); ddSingleton.recruit(self);
} else if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) { } else if (self->db.config.blobGranulesEnabled && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) {
bmSingleton.recruit(self); bmSingleton.recruit(self);
} }
} }
@ -3818,7 +3819,7 @@ ACTOR Future<Void> doCheckOutstandingRequests(ClusterControllerData* self) {
checkOutstandingRecruitmentRequests(self); checkOutstandingRecruitmentRequests(self);
checkOutstandingStorageRequests(self); checkOutstandingStorageRequests(self);
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (self->db.config.blobGranulesEnabled) {
checkOutstandingBlobWorkerRequests(self); checkOutstandingBlobWorkerRequests(self);
} }
checkBetterSingletons(self); checkBetterSingletons(self);
@ -4368,7 +4369,7 @@ void registerWorker(RegisterWorkerRequest req, ClusterControllerData* self, Conf
self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID); self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID);
} }
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES && req.blobManagerInterf.present()) { if (self->db.config.blobGranulesEnabled && req.blobManagerInterf.present()) {
auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager); auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager);
auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf); auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf);
haltRegisteringOrCurrentSingleton<BlobManagerInterface>( haltRegisteringOrCurrentSingleton<BlobManagerInterface>(
@ -5517,7 +5518,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
self.addActor.send(handleForcedRecoveries(&self, interf)); self.addActor.send(handleForcedRecoveries(&self, interf));
self.addActor.send(monitorDataDistributor(&self)); self.addActor.send(monitorDataDistributor(&self));
self.addActor.send(monitorRatekeeper(&self)); self.addActor.send(monitorRatekeeper(&self));
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (self.db.config.blobGranulesEnabled) {
self.addActor.send(monitorBlobManager(&self)); self.addActor.send(monitorBlobManager(&self));
} }
// self.addActor.send(monitorTSSMapping(&self)); // self.addActor.send(monitorTSSMapping(&self));

View File

@ -2021,7 +2021,7 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
// TODO: caching disabled for this merge // TODO: caching disabled for this merge
int storageCacheMachines = dc == 0 ? 1 : 0; int storageCacheMachines = dc == 0 ? 1 : 0;
int blobWorkerMachines = 0; int blobWorkerMachines = 0;
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (simconfig.db.blobGranulesEnabled) {
int blobWorkerProcesses = 1 + deterministicRandom()->randomInt(0, NUM_EXTRA_BW_MACHINES + 1); int blobWorkerProcesses = 1 + deterministicRandom()->randomInt(0, NUM_EXTRA_BW_MACHINES + 1);
blobWorkerMachines = std::max(1, blobWorkerProcesses / processesPerMachine); blobWorkerMachines = std::max(1, blobWorkerProcesses / processesPerMachine);
} }

View File

@ -797,7 +797,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
roles.addRole("ratekeeper", db->get().ratekeeper.get()); roles.addRole("ratekeeper", db->get().ratekeeper.get());
} }
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES && db->get().blobManager.present()) { if (configuration.present() && configuration.get().blobGranulesEnabled && db->get().blobManager.present()) {
roles.addRole("blob_manager", db->get().blobManager.get()); roles.addRole("blob_manager", db->get().blobManager.get());
} }
@ -864,7 +864,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
wait(yield()); wait(yield());
} }
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (configuration.present() && configuration.get().blobGranulesEnabled) {
for (auto blobWorker : blobWorkers) { for (auto blobWorker : blobWorkers) {
roles.addRole("blob_worker", blobWorker); roles.addRole("blob_worker", blobWorker);
wait(yield()); wait(yield());
@ -2890,7 +2890,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
errorOr(getGrvProxiesAndMetrics(db, address_workers)); errorOr(getGrvProxiesAndMetrics(db, address_workers));
state Future<ErrorOr<std::vector<BlobWorkerInterface>>> blobWorkersFuture; state Future<ErrorOr<std::vector<BlobWorkerInterface>>> blobWorkersFuture;
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (configuration.present() && configuration.get().blobGranulesEnabled) {
blobWorkersFuture = errorOr(timeoutError(getBlobWorkers(cx, true), 5.0)); blobWorkersFuture = errorOr(timeoutError(getBlobWorkers(cx, true), 5.0));
} }
@ -3011,7 +3011,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
} }
// ...also blob workers // ...also blob workers
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (configuration.present() && configuration.get().blobGranulesEnabled) {
ErrorOr<std::vector<BlobWorkerInterface>> _blobWorkers = wait(blobWorkersFuture); ErrorOr<std::vector<BlobWorkerInterface>> _blobWorkers = wait(blobWorkersFuture);
if (_blobWorkers.present()) { if (_blobWorkers.present()) {
blobWorkers = _blobWorkers.get(); blobWorkers = _blobWorkers.get();

View File

@ -24,6 +24,7 @@
#include "contrib/fmt-8.0.1/include/fmt/format.h" #include "contrib/fmt-8.0.1/include/fmt/format.h"
#include "fdbclient/BlobGranuleReader.actor.h" #include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h" #include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h" #include "fdbclient/ReadYourWrites.h"
#include "fdbserver/Knobs.h" #include "fdbserver/Knobs.h"
@ -60,6 +61,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
int64_t bytesRead = 0; int64_t bytesRead = 0;
std::vector<Future<Void>> clients; std::vector<Future<Void>> clients;
DatabaseConfiguration config;
Reference<BackupContainerFileSystem> bstore; Reference<BackupContainerFileSystem> bstore;
AsyncVar<Standalone<VectorRef<KeyRangeRef>>> granuleRanges; AsyncVar<Standalone<VectorRef<KeyRangeRef>>> granuleRanges;
@ -127,7 +130,8 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
std::string description() const override { return "BlobGranuleVerifier"; } std::string description() const override { return "BlobGranuleVerifier"; }
Future<Void> setup(Database const& cx) override { Future<Void> setup(Database const& cx) override {
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { config = wait(getDatabaseConfiguration(cx));
if (!config.blobGranulesEnabled) {
return Void(); return Void();
} }
@ -377,7 +381,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
} }
Future<Void> start(Database const& cx) override { Future<Void> start(Database const& cx) override {
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (!config.blobGranulesEnabled) {
return Void(); return Void();
} }
@ -457,7 +461,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
} }
Future<bool> check(Database const& cx) override { Future<bool> check(Database const& cx) override {
if (!CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (!config.blobGranulesEnabled) {
return true; return true;
} }

View File

@ -296,7 +296,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
wait(::success(self->checkForExtraDataStores(cx, self))); wait(::success(self->checkForExtraDataStores(cx, self)));
// Check blob workers are operating as expected // Check blob workers are operating as expected
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES) { if (configuration.blobGranulesEnabled) {
bool blobWorkersCorrect = wait(self->checkBlobWorkers(cx, configuration, self)); bool blobWorkersCorrect = wait(self->checkBlobWorkers(cx, configuration, self));
if (!blobWorkersCorrect) if (!blobWorkersCorrect)
self->testFailure("Blob workers incorrect"); self->testFailure("Blob workers incorrect");
@ -2352,7 +2352,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
} }
// Check BlobManager // Check BlobManager
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULES && db.blobManager.present() && if (configuration.&& db.blobManager.present() &&
(!nonExcludedWorkerProcessMap.count(db.blobManager.get().address()) || (!nonExcludedWorkerProcessMap.count(db.blobManager.get().address()) ||
nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness( nonExcludedWorkerProcessMap[db.blobManager.get().address()].processClass.machineClassFitness(
ProcessClass::BlobManager) > fitnessLowerBound)) { ProcessClass::BlobManager) > fitnessLowerBound)) {