From 249ff2b2fdcbf4fcbc84a6043d62a039e7cde154 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Fri, 29 Jul 2022 17:28:34 -0700 Subject: [PATCH] Fix configuration database unit tests --- fdbclient/ManagementAPI.actor.cpp | 3 ++ fdbclient/PaxosConfigTransaction.actor.cpp | 16 +++--- fdbclient/SimpleConfigTransaction.actor.cpp | 19 ++++--- .../fdbclient/ConfigTransactionInterface.h | 15 ++++-- fdbserver/ClusterController.actor.cpp | 18 +++---- fdbserver/ClusterRecovery.actor.cpp | 1 - fdbserver/ConfigBroadcaster.actor.cpp | 33 ++++++------ fdbserver/ConfigDatabaseUnitTests.actor.cpp | 24 +++------ fdbserver/ConfigNode.actor.cpp | 51 ++++++++++++------- .../fdbserver/ConfigBroadcastInterface.h | 1 + .../include/fdbserver/ConfigBroadcaster.h | 3 +- fdbserver/include/fdbserver/ConfigNode.h | 1 + .../include/fdbserver/WorkerInterface.actor.h | 8 +-- fdbserver/worker.actor.cpp | 38 +++++++------- 14 files changed, 125 insertions(+), 106 deletions(-) diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 2a4b3c9acd..d49993301d 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -956,6 +956,9 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, std::sort(old.coords.begin(), old.coords.end()); if (conn->hostnames == old.hostnames && conn->coords == old.coords && old.clusterKeyName() == newName) { connectionStrings.clear(); + if (g_network->isSimulated() && g_simulator.configDBType == ConfigDBType::DISABLED) { + disableConfigDB = true; + } if (!disableConfigDB) { wait(verifyConfigurationDatabaseAlive(tr->getDatabase())); } diff --git a/fdbclient/PaxosConfigTransaction.actor.cpp b/fdbclient/PaxosConfigTransaction.actor.cpp index 0202d11d28..d86ce29ac2 100644 --- a/fdbclient/PaxosConfigTransaction.actor.cpp +++ b/fdbclient/PaxosConfigTransaction.actor.cpp @@ -327,10 +327,10 @@ class PaxosConfigTransactionImpl { } wait(waitForAll(fs)); state Reference configNodes(new ConfigTransactionInfo(readReplicas)); - ConfigTransactionGetConfigClassesReply reply = - wait(basicLoadBalance(configNodes, - &ConfigTransactionInterface::getClasses, - ConfigTransactionGetConfigClassesRequest{ generation })); + ConfigTransactionGetConfigClassesReply reply = wait( + basicLoadBalance(configNodes, + &ConfigTransactionInterface::getClasses, + ConfigTransactionGetConfigClassesRequest{ self->coordinatorsHash, generation })); RangeResult result; result.reserve(result.arena(), reply.configClasses.size()); for (const auto& configClass : reply.configClasses) { @@ -361,10 +361,10 @@ class PaxosConfigTransactionImpl { } wait(waitForAll(fs)); state Reference configNodes(new ConfigTransactionInfo(readReplicas)); - ConfigTransactionGetKnobsReply reply = - wait(basicLoadBalance(configNodes, - &ConfigTransactionInterface::getKnobs, - ConfigTransactionGetKnobsRequest{ generation, configClass })); + ConfigTransactionGetKnobsReply reply = wait(basicLoadBalance( + configNodes, + &ConfigTransactionInterface::getKnobs, + ConfigTransactionGetKnobsRequest{ self->coordinatorsHash, generation, configClass })); RangeResult result; result.reserve(result.arena(), reply.knobNames.size()); for (const auto& knobName : reply.knobNames) { diff --git a/fdbclient/SimpleConfigTransaction.actor.cpp b/fdbclient/SimpleConfigTransaction.actor.cpp index 20084df3ce..dba5d327b7 100644 --- a/fdbclient/SimpleConfigTransaction.actor.cpp +++ b/fdbclient/SimpleConfigTransaction.actor.cpp @@ -43,11 +43,13 @@ class SimpleConfigTransactionImpl { state ConfigTransactionGetGenerationReply reply; if (self->cti.hostname.present()) { wait(store(reply, - retryGetReplyFromHostname(ConfigTransactionGetGenerationRequest{}, + retryGetReplyFromHostname(ConfigTransactionGetGenerationRequest{ 0, Optional() }, self->cti.hostname.get(), WLTOKEN_CONFIGTXN_GETGENERATION))); } else { - wait(store(reply, retryBrokenPromise(self->cti.getGeneration, ConfigTransactionGetGenerationRequest{}))); + wait(store(reply, + retryBrokenPromise(self->cti.getGeneration, + ConfigTransactionGetGenerationRequest{ 0, Optional() }))); } if (self->dID.present()) { TraceEvent("SimpleConfigTransactionGotReadVersion", self->dID.get()) @@ -96,13 +98,13 @@ class SimpleConfigTransactionImpl { state ConfigTransactionGetConfigClassesReply reply; if (self->cti.hostname.present()) { wait(store(reply, - retryGetReplyFromHostname(ConfigTransactionGetConfigClassesRequest{ generation }, + retryGetReplyFromHostname(ConfigTransactionGetConfigClassesRequest{ 0, generation }, self->cti.hostname.get(), WLTOKEN_CONFIGTXN_GETCLASSES))); } else { wait(store( reply, - retryBrokenPromise(self->cti.getClasses, ConfigTransactionGetConfigClassesRequest{ generation }))); + retryBrokenPromise(self->cti.getClasses, ConfigTransactionGetConfigClassesRequest{ 0, generation }))); } RangeResult result; for (const auto& configClass : reply.configClasses) { @@ -119,13 +121,13 @@ class SimpleConfigTransactionImpl { state ConfigTransactionGetKnobsReply reply; if (self->cti.hostname.present()) { wait(store(reply, - retryGetReplyFromHostname(ConfigTransactionGetKnobsRequest{ generation, configClass }, + retryGetReplyFromHostname(ConfigTransactionGetKnobsRequest{ 0, generation, configClass }, self->cti.hostname.get(), WLTOKEN_CONFIGTXN_GETKNOBS))); } else { - wait(store( - reply, - retryBrokenPromise(self->cti.getKnobs, ConfigTransactionGetKnobsRequest{ generation, configClass }))); + wait(store(reply, + retryBrokenPromise(self->cti.getKnobs, + ConfigTransactionGetKnobsRequest{ 0, generation, configClass }))); } RangeResult result; for (const auto& knobName : reply.knobNames) { @@ -138,6 +140,7 @@ class SimpleConfigTransactionImpl { if (!self->getGenerationFuture.isValid()) { self->getGenerationFuture = getGeneration(self); } + self->toCommit.coordinatorsHash = 0; wait(store(self->toCommit.generation, self->getGenerationFuture)); self->toCommit.annotation.timestamp = now(); if (self->cti.hostname.present()) { diff --git a/fdbclient/include/fdbclient/ConfigTransactionInterface.h b/fdbclient/include/fdbclient/ConfigTransactionInterface.h index 7d00a49dbe..44cc6d21e9 100644 --- a/fdbclient/include/fdbclient/ConfigTransactionInterface.h +++ b/fdbclient/include/fdbclient/ConfigTransactionInterface.h @@ -150,15 +150,17 @@ struct ConfigTransactionGetConfigClassesReply { struct ConfigTransactionGetConfigClassesRequest { static constexpr FileIdentifier file_identifier = 7163400; + size_t coordinatorsHash; ConfigGeneration generation; ReplyPromise reply; ConfigTransactionGetConfigClassesRequest() = default; - explicit ConfigTransactionGetConfigClassesRequest(ConfigGeneration generation) : generation(generation) {} + explicit ConfigTransactionGetConfigClassesRequest(size_t coordinatorsHash, ConfigGeneration generation) + : coordinatorsHash(coordinatorsHash), generation(generation) {} template void serialize(Ar& ar) { - serializer(ar, generation); + serializer(ar, coordinatorsHash, generation); } }; @@ -177,17 +179,20 @@ struct ConfigTransactionGetKnobsReply { struct ConfigTransactionGetKnobsRequest { static constexpr FileIdentifier file_identifier = 987410; + size_t coordinatorsHash; ConfigGeneration generation; Optional configClass; ReplyPromise reply; ConfigTransactionGetKnobsRequest() = default; - explicit ConfigTransactionGetKnobsRequest(ConfigGeneration generation, Optional configClass) - : generation(generation), configClass(configClass) {} + explicit ConfigTransactionGetKnobsRequest(size_t coordinatorsHash, + ConfigGeneration generation, + Optional configClass) + : coordinatorsHash(coordinatorsHash), generation(generation), configClass(configClass) {} template void serialize(Ar& ar) { - serializer(ar, generation, configClass, reply); + serializer(ar, coordinatorsHash, generation, configClass, reply); } }; diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 773b0477a6..dc4c9d5405 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1223,13 +1223,12 @@ ACTOR Future registerWorker(RegisterWorkerRequest req, w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) { self->masterProcessId = w.locality.processId(); } - if (configBroadcaster != nullptr) { + if (configBroadcaster != nullptr && req.lastSeenKnobVersion.present() && req.knobConfigClassSet.present()) { self->addActor.send(configBroadcaster->registerNode( - w, - req.lastSeenKnobVersion, - req.knobConfigClassSet, - self->id_worker[w.locality.processId()].watcher, self->id_worker[w.locality.processId()].details.interf.configBroadcastInterface, + req.lastSeenKnobVersion.get(), + req.knobConfigClassSet.get(), + self->id_worker[w.locality.processId()].watcher, isCoordinator)); } self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint()); @@ -1261,12 +1260,11 @@ ACTOR Future registerWorker(RegisterWorkerRequest req, self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint()); self->updateDBInfo.trigger(); } - if (configBroadcaster != nullptr) { - self->addActor.send(configBroadcaster->registerNode(w, - req.lastSeenKnobVersion, - req.knobConfigClassSet, + if (configBroadcaster != nullptr && req.lastSeenKnobVersion.present() && req.knobConfigClassSet.present()) { + self->addActor.send(configBroadcaster->registerNode(info->second.details.interf.configBroadcastInterface, + req.lastSeenKnobVersion.get(), + req.knobConfigClassSet.get(), info->second.watcher, - info->second.details.interf.configBroadcastInterface, isCoordinator)); } checkOutstandingRequests(self); diff --git a/fdbserver/ClusterRecovery.actor.cpp b/fdbserver/ClusterRecovery.actor.cpp index 26d2e63277..b3cfd9e05c 100644 --- a/fdbserver/ClusterRecovery.actor.cpp +++ b/fdbserver/ClusterRecovery.actor.cpp @@ -521,7 +521,6 @@ ACTOR Future changeCoordinators(Reference self) { loop { ChangeCoordinatorsRequest req = waitNext(self->clusterController.changeCoordinators.getFuture()); TraceEvent("ChangeCoordinators", self->dbgid).log(); - ++self->changeCoordinatorsRequests; state ChangeCoordinatorsRequest changeCoordinatorsRequest = req; if (self->masterInterface.id() != changeCoordinatorsRequest.masterId) { diff --git a/fdbserver/ConfigBroadcaster.actor.cpp b/fdbserver/ConfigBroadcaster.actor.cpp index 43b42cb5d5..6796f5925b 100644 --- a/fdbserver/ConfigBroadcaster.actor.cpp +++ b/fdbserver/ConfigBroadcaster.actor.cpp @@ -245,17 +245,18 @@ class ConfigBroadcasterImpl { // date. ACTOR static Future registerNodeInternal(ConfigBroadcaster* broadcaster, ConfigBroadcasterImpl* self, - WorkerInterface w) { + ConfigBroadcastInterface configBroadcastInterface) { if (self->configDBType == ConfigDBType::SIMPLE) { - wait(success( - brokenPromiseToNever(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{})))); + self->consumerFuture = self->consumer->consume(*broadcaster); + wait(success(brokenPromiseToNever( + configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{ 0, {}, -1, -1 })))); return Void(); } - state NetworkAddress address = w.address(); + state NetworkAddress address = configBroadcastInterface.address(); // Ask the registering ConfigNode whether it has registered in the past. state ConfigBroadcastRegisteredReply reply = wait( - brokenPromiseToNever(w.configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{}))); + brokenPromiseToNever(configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{}))); self->maxLastSeenVersion = std::max(self->maxLastSeenVersion, reply.lastSeenVersion); state bool registered = reply.registered; TraceEvent("ConfigBroadcasterRegisterNodeReceivedRegistrationReply", self->id) @@ -373,10 +374,10 @@ class ConfigBroadcasterImpl { .detail("LargestLiveVersion", self->largestLiveVersion); if (sendSnapshot) { Version liveVersion = std::max(self->largestLiveVersion, self->mostRecentVersion); - wait(success(brokenPromiseToNever(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{ + wait(success(brokenPromiseToNever(configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{ self->coordinatorsHash, self->snapshot, self->mostRecentVersion, liveVersion })))); } else { - wait(success(brokenPromiseToNever(w.configBroadcastInterface.ready.getReply( + wait(success(brokenPromiseToNever(configBroadcastInterface.ready.getReply( ConfigBroadcastReadyRequest{ self->coordinatorsHash, {}, -1, -1 })))); } @@ -392,11 +393,10 @@ class ConfigBroadcasterImpl { ACTOR static Future registerNode(ConfigBroadcaster* self, ConfigBroadcasterImpl* impl, - WorkerInterface w, + ConfigBroadcastInterface broadcastInterface, Version lastSeenVersion, ConfigClassSet configClassSet, Future watcher, - ConfigBroadcastInterface broadcastInterface, bool isCoordinator) { state BroadcastClientDetails client( watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface)); @@ -412,27 +412,25 @@ class ConfigBroadcasterImpl { .detail("IsCoordinator", isCoordinator); if (isCoordinator) { - impl->actors.add(registerNodeInternal(self, impl, w)); + impl->actors.add(registerNodeInternal(self, impl, broadcastInterface)); } // Push full snapshot to worker if it isn't up to date. wait(impl->pushSnapshot(impl->mostRecentVersion, client)); impl->clients[broadcastInterface.id()] = client; impl->clientFailures[broadcastInterface.id()] = - waitForFailure(impl, watcher, broadcastInterface.id(), w.address(), isCoordinator); + waitForFailure(impl, watcher, broadcastInterface.id(), broadcastInterface.address(), isCoordinator); return Void(); } public: Future registerNode(ConfigBroadcaster& self, - WorkerInterface const& w, + ConfigBroadcastInterface const& broadcastInterface, Version lastSeenVersion, ConfigClassSet configClassSet, Future watcher, - ConfigBroadcastInterface const& broadcastInterface, bool isCoordinator) { - return registerNode( - &self, this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface, isCoordinator); + return registerNode(&self, this, broadcastInterface, lastSeenVersion, configClassSet, watcher, isCoordinator); } // Updates the broadcasters knowledge of which replicas are fully up to @@ -621,13 +619,12 @@ ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default; ConfigBroadcaster::~ConfigBroadcaster() = default; -Future ConfigBroadcaster::registerNode(WorkerInterface const& w, +Future ConfigBroadcaster::registerNode(ConfigBroadcastInterface const& broadcastInterface, Version lastSeenVersion, ConfigClassSet const& configClassSet, Future watcher, - ConfigBroadcastInterface const& broadcastInterface, bool isCoordinator) { - return impl->registerNode(*this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface, isCoordinator); + return impl->registerNode(*this, broadcastInterface, lastSeenVersion, configClassSet, watcher, isCoordinator); } void ConfigBroadcaster::applyChanges(Standalone> const& changes, diff --git a/fdbserver/ConfigDatabaseUnitTests.actor.cpp b/fdbserver/ConfigDatabaseUnitTests.actor.cpp index 1a59eed57e..b50c1cc1e5 100644 --- a/fdbserver/ConfigDatabaseUnitTests.actor.cpp +++ b/fdbserver/ConfigDatabaseUnitTests.actor.cpp @@ -269,8 +269,8 @@ class BroadcasterToLocalConfigEnvironment { wait(self->readFrom.setup()); self->cbi = makeReference>(); self->readFrom.connectToBroadcaster(self->cbi); - self->broadcastServer = self->broadcaster.registerNode( - WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get(), true); + self->broadcastServer = + self->broadcaster.registerNode(self->cbi->get(), 0, configClassSet, self->workerFailure.getFuture(), true); return Void(); } @@ -305,12 +305,8 @@ public: broadcastServer.cancel(); cbi->set(ConfigBroadcastInterface{}); readFrom.connectToBroadcaster(cbi); - broadcastServer = broadcaster.registerNode(WorkerInterface(), - readFrom.lastSeenVersion(), - readFrom.configClassSet(), - workerFailure.getFuture(), - cbi->get(), - true); + broadcastServer = broadcaster.registerNode( + cbi->get(), readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), true); } Future restartLocalConfig(std::string const& newConfigPath) { @@ -442,8 +438,8 @@ class TransactionToLocalConfigEnvironment { wait(self->readFrom.setup()); self->cbi = makeReference>(); self->readFrom.connectToBroadcaster(self->cbi); - self->broadcastServer = self->broadcaster.registerNode( - WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get(), true); + self->broadcastServer = + self->broadcaster.registerNode(self->cbi->get(), 0, configClassSet, self->workerFailure.getFuture(), true); return Void(); } @@ -462,12 +458,8 @@ public: broadcastServer.cancel(); cbi->set(ConfigBroadcastInterface{}); readFrom.connectToBroadcaster(cbi); - broadcastServer = broadcaster.registerNode(WorkerInterface(), - readFrom.lastSeenVersion(), - readFrom.configClassSet(), - workerFailure.getFuture(), - cbi->get(), - true); + broadcastServer = broadcaster.registerNode( + cbi->get(), readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), true); } Future restartLocalConfig(std::string const& newConfigPath) { diff --git a/fdbserver/ConfigNode.actor.cpp b/fdbserver/ConfigNode.actor.cpp index a1e52e2d0d..1a6b9d0c9f 100644 --- a/fdbserver/ConfigNode.actor.cpp +++ b/fdbserver/ConfigNode.actor.cpp @@ -135,18 +135,23 @@ class ConfigNodeImpl { Counter getGenerationRequests; Future logger; - ACTOR static Future> getCoordinatorsHash(ConfigNodeImpl* self) { + ACTOR static Future getCoordinatorsHash(ConfigNodeImpl* self) { + state size_t coordinatorsHash = 0; Optional value = wait(self->kvStore->readValue(coordinatorsHashKey)); - if (!value.present()) { - return Optional(); + if (value.present()) { + coordinatorsHash = BinaryReader::fromStringRef(value.get(), IncludeVersion()); + } else { + self->kvStore->set( + KeyValueRef(coordinatorsHashKey, BinaryWriter::toValue(coordinatorsHash, IncludeVersion()))); + wait(self->kvStore->commit()); } - return BinaryReader::fromStringRef(value.get(), IncludeVersion()); + return coordinatorsHash; } ACTOR static Future> getLocked(ConfigNodeImpl* self) { Optional value = wait(self->kvStore->readValue(lockedKey)); if (!value.present()) { - return false; + return Optional(); } return BinaryReader::fromStringRef>(value.get(), IncludeVersion()); } @@ -247,9 +252,8 @@ class ConfigNodeImpl { // New transactions increment the database's current live version. This effectively serves as a lock, providing // serializability ACTOR static Future getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) { - state Optional coordinatorsHash = wait(getCoordinatorsHash(self)); - ASSERT(coordinatorsHash.present()); - if (req.coordinatorsHash != coordinatorsHash.get()) { + state size_t coordinatorsHash = wait(getCoordinatorsHash(self)); + if (req.coordinatorsHash != coordinatorsHash) { req.reply.sendError(coordinators_changed()); return Void(); } @@ -273,9 +277,8 @@ class ConfigNodeImpl { req.reply.sendError(coordinators_changed()); return Void(); } - state Optional coordinatorsHash = wait(getCoordinatorsHash(self)); - ASSERT(coordinatorsHash.present()); - if (req.coordinatorsHash != coordinatorsHash.get()) { + state size_t coordinatorsHash = wait(getCoordinatorsHash(self)); + if (req.coordinatorsHash != coordinatorsHash) { req.reply.sendError(coordinators_changed()); return Void(); } @@ -317,7 +320,11 @@ class ConfigNodeImpl { req.reply.sendError(coordinators_changed()); return Void(); } - + state size_t coordinatorsHash = wait(getCoordinatorsHash(self)); + if (req.coordinatorsHash != coordinatorsHash) { + req.reply.sendError(coordinators_changed()); + return Void(); + } ConfigGeneration currentGeneration = wait(getGeneration(self)); if (req.generation != currentGeneration) { req.reply.sendError(transaction_too_old()); @@ -357,6 +364,11 @@ class ConfigNodeImpl { req.reply.sendError(coordinators_changed()); return Void(); } + state size_t coordinatorsHash = wait(getCoordinatorsHash(self)); + if (req.coordinatorsHash != coordinatorsHash) { + req.reply.sendError(coordinators_changed()); + return Void(); + } ConfigGeneration currentGeneration = wait(getGeneration(self)); if (req.generation != currentGeneration) { @@ -441,9 +453,8 @@ class ConfigNodeImpl { req.reply.sendError(coordinators_changed()); return Void(); } - state Optional coordinatorsHash = wait(getCoordinatorsHash(self)); - ASSERT(coordinatorsHash.present()); - if (req.coordinatorsHash != coordinatorsHash.get()) { + state size_t coordinatorsHash = wait(getCoordinatorsHash(self)); + if (req.coordinatorsHash != coordinatorsHash) { req.reply.sendError(coordinators_changed()); return Void(); } @@ -747,8 +758,8 @@ class ConfigNodeImpl { } when(state ConfigFollowerLockRequest req = waitNext(cfi->lock.getFuture())) { ++self->lockRequests; - Optional coordinatorsHash = wait(getCoordinatorsHash(self)); - if (!coordinatorsHash.present() || coordinatorsHash.get() == req.coordinatorsHash) { + size_t coordinatorsHash = wait(getCoordinatorsHash(self)); + if (coordinatorsHash == 0 || coordinatorsHash == req.coordinatorsHash) { TraceEvent("ConfigNodeLocking", self->id).log(); self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(false, IncludeVersion()))); self->kvStore->set(KeyValueRef( @@ -803,6 +814,8 @@ public: return serve(this, &cbi, &cti, &cfi); } + Future serve(ConfigBroadcastInterface const& cbi) { return serve(this, &cbi, true); } + Future serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); } Future serve(ConfigFollowerInterface const& cfi) { @@ -824,6 +837,10 @@ Future ConfigNode::serve(ConfigBroadcastInterface const& cbi, return impl->serve(cbi, cti, cfi); } +Future ConfigNode::serve(ConfigBroadcastInterface const& cbi) { + return impl->serve(cbi); +} + Future ConfigNode::serve(ConfigTransactionInterface const& cti) { return impl->serve(cti); } diff --git a/fdbserver/include/fdbserver/ConfigBroadcastInterface.h b/fdbserver/include/fdbserver/ConfigBroadcastInterface.h index 882fbc075c..34aa32d605 100644 --- a/fdbserver/include/fdbserver/ConfigBroadcastInterface.h +++ b/fdbserver/include/fdbserver/ConfigBroadcastInterface.h @@ -192,6 +192,7 @@ public: bool operator==(ConfigBroadcastInterface const& rhs) const { return (_id == rhs._id); } bool operator!=(ConfigBroadcastInterface const& rhs) const { return !(*this == rhs); } UID id() const { return _id; } + NetworkAddress address() const { return snapshot.getEndpoint().getPrimaryAddress(); } template void serialize(Ar& ar) { diff --git a/fdbserver/include/fdbserver/ConfigBroadcaster.h b/fdbserver/include/fdbserver/ConfigBroadcaster.h index 861964b220..47808e9861 100644 --- a/fdbserver/include/fdbserver/ConfigBroadcaster.h +++ b/fdbserver/include/fdbserver/ConfigBroadcaster.h @@ -44,11 +44,10 @@ public: ConfigBroadcaster(ConfigBroadcaster&&); ConfigBroadcaster& operator=(ConfigBroadcaster&&); ~ConfigBroadcaster(); - Future registerNode(WorkerInterface const& w, + Future registerNode(ConfigBroadcastInterface const& broadcastInterface, Version lastSeenVersion, ConfigClassSet const& configClassSet, Future watcher, - ConfigBroadcastInterface const& worker, bool isCoordinator); void applyChanges(Standalone> const& changes, Version mostRecentVersion, diff --git a/fdbserver/include/fdbserver/ConfigNode.h b/fdbserver/include/fdbserver/ConfigNode.h index 333652893b..5454ac63da 100644 --- a/fdbserver/include/fdbserver/ConfigNode.h +++ b/fdbserver/include/fdbserver/ConfigNode.h @@ -38,6 +38,7 @@ public: ConfigFollowerInterface const&); public: // Testing + Future serve(ConfigBroadcastInterface const&); Future serve(ConfigTransactionInterface const&); Future serve(ConfigFollowerInterface const&); void close(); diff --git a/fdbserver/include/fdbserver/WorkerInterface.actor.h b/fdbserver/include/fdbserver/WorkerInterface.actor.h index ca168fe96d..b60b77b64c 100644 --- a/fdbserver/include/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/include/fdbserver/WorkerInterface.actor.h @@ -434,8 +434,8 @@ struct RegisterWorkerRequest { std::vector incompatiblePeers; ReplyPromise reply; bool degraded; - Version lastSeenKnobVersion; - ConfigClassSet knobConfigClassSet; + Optional lastSeenKnobVersion; + Optional knobConfigClassSet; bool requestDbInfo; bool recoveredDiskFiles; @@ -451,8 +451,8 @@ struct RegisterWorkerRequest { Optional bmInterf, Optional ekpInterf, bool degraded, - Version lastSeenKnobVersion, - ConfigClassSet knobConfigClassSet, + Optional lastSeenKnobVersion, + Optional knobConfigClassSet, bool recoveredDiskFiles) : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf), diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index c42971e766..59b094a1e6 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -589,20 +589,20 @@ ACTOR Future registrationClient( incorrectTime = Optional(); } - RegisterWorkerRequest request(interf, - initialClass, - processClass, - asyncPriorityInfo->get(), - requestGeneration++, - ddInterf->get(), - rkInterf->get(), - bmInterf->get().present() ? bmInterf->get().get().second - : Optional(), - ekpInterf->get(), - degraded->get(), - localConfig->lastSeenVersion(), - localConfig->configClassSet(), - recoveredDiskFiles.isSet()); + RegisterWorkerRequest request( + interf, + initialClass, + processClass, + asyncPriorityInfo->get(), + requestGeneration++, + ddInterf->get(), + rkInterf->get(), + bmInterf->get().present() ? bmInterf->get().get().second : Optional(), + ekpInterf->get(), + degraded->get(), + localConfig.isValid() ? localConfig->lastSeenVersion() : Optional(), + localConfig.isValid() ? localConfig->configClassSet() : Optional(), + recoveredDiskFiles.isSet()); for (auto const& i : issues->get()) { request.issues.push_back_deep(request.issues.arena(), i); @@ -3319,8 +3319,11 @@ ACTOR Future fdbd(Reference connRecord, state std::vector> actors; state Promise recoveredDiskFiles; state Reference configNode; - state Reference localConfig = makeReference( - dataFolder, configPath, manualKnobOverrides, g_network->isSimulated() ? IsTest::True : IsTest::False); + state Reference localConfig; + if (configDBType != ConfigDBType::DISABLED) { + localConfig = makeReference( + dataFolder, configPath, manualKnobOverrides, g_network->isSimulated() ? IsTest::True : IsTest::False); + } // setupStackSignal(); getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::Worker; @@ -3341,7 +3344,8 @@ ACTOR Future fdbd(Reference connRecord, .detail("MachineId", localities.machineId()) .detail("DiskPath", dataFolder) .detail("CoordPath", coordFolder) - .detail("WhiteListBinPath", whitelistBinPaths); + .detail("WhiteListBinPath", whitelistBinPaths) + .detail("ConfigDBType", configDBType); state ConfigBroadcastInterface configBroadcastInterface; // SOMEDAY: start the services on the machine in a staggered fashion in simulation?