Fix configuration database unit tests

This commit is contained in:
Lukas Joswiak 2022-07-29 17:28:34 -07:00
parent 1a33515934
commit 249ff2b2fd
14 changed files with 125 additions and 106 deletions

View File

@ -956,6 +956,9 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
std::sort(old.coords.begin(), old.coords.end()); std::sort(old.coords.begin(), old.coords.end());
if (conn->hostnames == old.hostnames && conn->coords == old.coords && old.clusterKeyName() == newName) { if (conn->hostnames == old.hostnames && conn->coords == old.coords && old.clusterKeyName() == newName) {
connectionStrings.clear(); connectionStrings.clear();
if (g_network->isSimulated() && g_simulator.configDBType == ConfigDBType::DISABLED) {
disableConfigDB = true;
}
if (!disableConfigDB) { if (!disableConfigDB) {
wait(verifyConfigurationDatabaseAlive(tr->getDatabase())); wait(verifyConfigurationDatabaseAlive(tr->getDatabase()));
} }

View File

@ -327,10 +327,10 @@ class PaxosConfigTransactionImpl {
} }
wait(waitForAll(fs)); wait(waitForAll(fs));
state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas)); state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas));
ConfigTransactionGetConfigClassesReply reply = ConfigTransactionGetConfigClassesReply reply = wait(
wait(basicLoadBalance(configNodes, basicLoadBalance(configNodes,
&ConfigTransactionInterface::getClasses, &ConfigTransactionInterface::getClasses,
ConfigTransactionGetConfigClassesRequest{ generation })); ConfigTransactionGetConfigClassesRequest{ self->coordinatorsHash, generation }));
RangeResult result; RangeResult result;
result.reserve(result.arena(), reply.configClasses.size()); result.reserve(result.arena(), reply.configClasses.size());
for (const auto& configClass : reply.configClasses) { for (const auto& configClass : reply.configClasses) {
@ -361,10 +361,10 @@ class PaxosConfigTransactionImpl {
} }
wait(waitForAll(fs)); wait(waitForAll(fs));
state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas)); state Reference<ConfigTransactionInfo> configNodes(new ConfigTransactionInfo(readReplicas));
ConfigTransactionGetKnobsReply reply = ConfigTransactionGetKnobsReply reply = wait(basicLoadBalance(
wait(basicLoadBalance(configNodes, configNodes,
&ConfigTransactionInterface::getKnobs, &ConfigTransactionInterface::getKnobs,
ConfigTransactionGetKnobsRequest{ generation, configClass })); ConfigTransactionGetKnobsRequest{ self->coordinatorsHash, generation, configClass }));
RangeResult result; RangeResult result;
result.reserve(result.arena(), reply.knobNames.size()); result.reserve(result.arena(), reply.knobNames.size());
for (const auto& knobName : reply.knobNames) { for (const auto& knobName : reply.knobNames) {

View File

@ -43,11 +43,13 @@ class SimpleConfigTransactionImpl {
state ConfigTransactionGetGenerationReply reply; state ConfigTransactionGetGenerationReply reply;
if (self->cti.hostname.present()) { if (self->cti.hostname.present()) {
wait(store(reply, wait(store(reply,
retryGetReplyFromHostname(ConfigTransactionGetGenerationRequest{}, retryGetReplyFromHostname(ConfigTransactionGetGenerationRequest{ 0, Optional<Version>() },
self->cti.hostname.get(), self->cti.hostname.get(),
WLTOKEN_CONFIGTXN_GETGENERATION))); WLTOKEN_CONFIGTXN_GETGENERATION)));
} else { } else {
wait(store(reply, retryBrokenPromise(self->cti.getGeneration, ConfigTransactionGetGenerationRequest{}))); wait(store(reply,
retryBrokenPromise(self->cti.getGeneration,
ConfigTransactionGetGenerationRequest{ 0, Optional<Version>() })));
} }
if (self->dID.present()) { if (self->dID.present()) {
TraceEvent("SimpleConfigTransactionGotReadVersion", self->dID.get()) TraceEvent("SimpleConfigTransactionGotReadVersion", self->dID.get())
@ -96,13 +98,13 @@ class SimpleConfigTransactionImpl {
state ConfigTransactionGetConfigClassesReply reply; state ConfigTransactionGetConfigClassesReply reply;
if (self->cti.hostname.present()) { if (self->cti.hostname.present()) {
wait(store(reply, wait(store(reply,
retryGetReplyFromHostname(ConfigTransactionGetConfigClassesRequest{ generation }, retryGetReplyFromHostname(ConfigTransactionGetConfigClassesRequest{ 0, generation },
self->cti.hostname.get(), self->cti.hostname.get(),
WLTOKEN_CONFIGTXN_GETCLASSES))); WLTOKEN_CONFIGTXN_GETCLASSES)));
} else { } else {
wait(store( wait(store(
reply, reply,
retryBrokenPromise(self->cti.getClasses, ConfigTransactionGetConfigClassesRequest{ generation }))); retryBrokenPromise(self->cti.getClasses, ConfigTransactionGetConfigClassesRequest{ 0, generation })));
} }
RangeResult result; RangeResult result;
for (const auto& configClass : reply.configClasses) { for (const auto& configClass : reply.configClasses) {
@ -119,13 +121,13 @@ class SimpleConfigTransactionImpl {
state ConfigTransactionGetKnobsReply reply; state ConfigTransactionGetKnobsReply reply;
if (self->cti.hostname.present()) { if (self->cti.hostname.present()) {
wait(store(reply, wait(store(reply,
retryGetReplyFromHostname(ConfigTransactionGetKnobsRequest{ generation, configClass }, retryGetReplyFromHostname(ConfigTransactionGetKnobsRequest{ 0, generation, configClass },
self->cti.hostname.get(), self->cti.hostname.get(),
WLTOKEN_CONFIGTXN_GETKNOBS))); WLTOKEN_CONFIGTXN_GETKNOBS)));
} else { } else {
wait(store( wait(store(reply,
reply, retryBrokenPromise(self->cti.getKnobs,
retryBrokenPromise(self->cti.getKnobs, ConfigTransactionGetKnobsRequest{ generation, configClass }))); ConfigTransactionGetKnobsRequest{ 0, generation, configClass })));
} }
RangeResult result; RangeResult result;
for (const auto& knobName : reply.knobNames) { for (const auto& knobName : reply.knobNames) {
@ -138,6 +140,7 @@ class SimpleConfigTransactionImpl {
if (!self->getGenerationFuture.isValid()) { if (!self->getGenerationFuture.isValid()) {
self->getGenerationFuture = getGeneration(self); self->getGenerationFuture = getGeneration(self);
} }
self->toCommit.coordinatorsHash = 0;
wait(store(self->toCommit.generation, self->getGenerationFuture)); wait(store(self->toCommit.generation, self->getGenerationFuture));
self->toCommit.annotation.timestamp = now(); self->toCommit.annotation.timestamp = now();
if (self->cti.hostname.present()) { if (self->cti.hostname.present()) {

View File

@ -150,15 +150,17 @@ struct ConfigTransactionGetConfigClassesReply {
struct ConfigTransactionGetConfigClassesRequest { struct ConfigTransactionGetConfigClassesRequest {
static constexpr FileIdentifier file_identifier = 7163400; static constexpr FileIdentifier file_identifier = 7163400;
size_t coordinatorsHash;
ConfigGeneration generation; ConfigGeneration generation;
ReplyPromise<ConfigTransactionGetConfigClassesReply> reply; ReplyPromise<ConfigTransactionGetConfigClassesReply> reply;
ConfigTransactionGetConfigClassesRequest() = default; ConfigTransactionGetConfigClassesRequest() = default;
explicit ConfigTransactionGetConfigClassesRequest(ConfigGeneration generation) : generation(generation) {} explicit ConfigTransactionGetConfigClassesRequest(size_t coordinatorsHash, ConfigGeneration generation)
: coordinatorsHash(coordinatorsHash), generation(generation) {}
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, generation); serializer(ar, coordinatorsHash, generation);
} }
}; };
@ -177,17 +179,20 @@ struct ConfigTransactionGetKnobsReply {
struct ConfigTransactionGetKnobsRequest { struct ConfigTransactionGetKnobsRequest {
static constexpr FileIdentifier file_identifier = 987410; static constexpr FileIdentifier file_identifier = 987410;
size_t coordinatorsHash;
ConfigGeneration generation; ConfigGeneration generation;
Optional<Key> configClass; Optional<Key> configClass;
ReplyPromise<ConfigTransactionGetKnobsReply> reply; ReplyPromise<ConfigTransactionGetKnobsReply> reply;
ConfigTransactionGetKnobsRequest() = default; ConfigTransactionGetKnobsRequest() = default;
explicit ConfigTransactionGetKnobsRequest(ConfigGeneration generation, Optional<Key> configClass) explicit ConfigTransactionGetKnobsRequest(size_t coordinatorsHash,
: generation(generation), configClass(configClass) {} ConfigGeneration generation,
Optional<Key> configClass)
: coordinatorsHash(coordinatorsHash), generation(generation), configClass(configClass) {}
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
serializer(ar, generation, configClass, reply); serializer(ar, coordinatorsHash, generation, configClass, reply);
} }
}; };

View File

@ -1223,13 +1223,12 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) { w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
self->masterProcessId = w.locality.processId(); self->masterProcessId = w.locality.processId();
} }
if (configBroadcaster != nullptr) { if (configBroadcaster != nullptr && req.lastSeenKnobVersion.present() && req.knobConfigClassSet.present()) {
self->addActor.send(configBroadcaster->registerNode( self->addActor.send(configBroadcaster->registerNode(
w,
req.lastSeenKnobVersion,
req.knobConfigClassSet,
self->id_worker[w.locality.processId()].watcher,
self->id_worker[w.locality.processId()].details.interf.configBroadcastInterface, self->id_worker[w.locality.processId()].details.interf.configBroadcastInterface,
req.lastSeenKnobVersion.get(),
req.knobConfigClassSet.get(),
self->id_worker[w.locality.processId()].watcher,
isCoordinator)); isCoordinator));
} }
self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint()); self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint());
@ -1261,12 +1260,11 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint()); self->updateDBInfoEndpoints.insert(w.updateServerDBInfo.getEndpoint());
self->updateDBInfo.trigger(); self->updateDBInfo.trigger();
} }
if (configBroadcaster != nullptr) { if (configBroadcaster != nullptr && req.lastSeenKnobVersion.present() && req.knobConfigClassSet.present()) {
self->addActor.send(configBroadcaster->registerNode(w, self->addActor.send(configBroadcaster->registerNode(info->second.details.interf.configBroadcastInterface,
req.lastSeenKnobVersion, req.lastSeenKnobVersion.get(),
req.knobConfigClassSet, req.knobConfigClassSet.get(),
info->second.watcher, info->second.watcher,
info->second.details.interf.configBroadcastInterface,
isCoordinator)); isCoordinator));
} }
checkOutstandingRequests(self); checkOutstandingRequests(self);

View File

@ -521,7 +521,6 @@ ACTOR Future<Void> changeCoordinators(Reference<ClusterRecoveryData> self) {
loop { loop {
ChangeCoordinatorsRequest req = waitNext(self->clusterController.changeCoordinators.getFuture()); ChangeCoordinatorsRequest req = waitNext(self->clusterController.changeCoordinators.getFuture());
TraceEvent("ChangeCoordinators", self->dbgid).log(); TraceEvent("ChangeCoordinators", self->dbgid).log();
++self->changeCoordinatorsRequests; ++self->changeCoordinatorsRequests;
state ChangeCoordinatorsRequest changeCoordinatorsRequest = req; state ChangeCoordinatorsRequest changeCoordinatorsRequest = req;
if (self->masterInterface.id() != changeCoordinatorsRequest.masterId) { if (self->masterInterface.id() != changeCoordinatorsRequest.masterId) {

View File

@ -245,17 +245,18 @@ class ConfigBroadcasterImpl {
// date. // date.
ACTOR static Future<Void> registerNodeInternal(ConfigBroadcaster* broadcaster, ACTOR static Future<Void> registerNodeInternal(ConfigBroadcaster* broadcaster,
ConfigBroadcasterImpl* self, ConfigBroadcasterImpl* self,
WorkerInterface w) { ConfigBroadcastInterface configBroadcastInterface) {
if (self->configDBType == ConfigDBType::SIMPLE) { if (self->configDBType == ConfigDBType::SIMPLE) {
wait(success( self->consumerFuture = self->consumer->consume(*broadcaster);
brokenPromiseToNever(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{})))); wait(success(brokenPromiseToNever(
configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{ 0, {}, -1, -1 }))));
return Void(); return Void();
} }
state NetworkAddress address = w.address(); state NetworkAddress address = configBroadcastInterface.address();
// Ask the registering ConfigNode whether it has registered in the past. // Ask the registering ConfigNode whether it has registered in the past.
state ConfigBroadcastRegisteredReply reply = wait( state ConfigBroadcastRegisteredReply reply = wait(
brokenPromiseToNever(w.configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{}))); brokenPromiseToNever(configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{})));
self->maxLastSeenVersion = std::max(self->maxLastSeenVersion, reply.lastSeenVersion); self->maxLastSeenVersion = std::max(self->maxLastSeenVersion, reply.lastSeenVersion);
state bool registered = reply.registered; state bool registered = reply.registered;
TraceEvent("ConfigBroadcasterRegisterNodeReceivedRegistrationReply", self->id) TraceEvent("ConfigBroadcasterRegisterNodeReceivedRegistrationReply", self->id)
@ -373,10 +374,10 @@ class ConfigBroadcasterImpl {
.detail("LargestLiveVersion", self->largestLiveVersion); .detail("LargestLiveVersion", self->largestLiveVersion);
if (sendSnapshot) { if (sendSnapshot) {
Version liveVersion = std::max(self->largestLiveVersion, self->mostRecentVersion); Version liveVersion = std::max(self->largestLiveVersion, self->mostRecentVersion);
wait(success(brokenPromiseToNever(w.configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{ wait(success(brokenPromiseToNever(configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{
self->coordinatorsHash, self->snapshot, self->mostRecentVersion, liveVersion })))); self->coordinatorsHash, self->snapshot, self->mostRecentVersion, liveVersion }))));
} else { } else {
wait(success(brokenPromiseToNever(w.configBroadcastInterface.ready.getReply( wait(success(brokenPromiseToNever(configBroadcastInterface.ready.getReply(
ConfigBroadcastReadyRequest{ self->coordinatorsHash, {}, -1, -1 })))); ConfigBroadcastReadyRequest{ self->coordinatorsHash, {}, -1, -1 }))));
} }
@ -392,11 +393,10 @@ class ConfigBroadcasterImpl {
ACTOR static Future<Void> registerNode(ConfigBroadcaster* self, ACTOR static Future<Void> registerNode(ConfigBroadcaster* self,
ConfigBroadcasterImpl* impl, ConfigBroadcasterImpl* impl,
WorkerInterface w, ConfigBroadcastInterface broadcastInterface,
Version lastSeenVersion, Version lastSeenVersion,
ConfigClassSet configClassSet, ConfigClassSet configClassSet,
Future<Void> watcher, Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface,
bool isCoordinator) { bool isCoordinator) {
state BroadcastClientDetails client( state BroadcastClientDetails client(
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface)); watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface));
@ -412,27 +412,25 @@ class ConfigBroadcasterImpl {
.detail("IsCoordinator", isCoordinator); .detail("IsCoordinator", isCoordinator);
if (isCoordinator) { if (isCoordinator) {
impl->actors.add(registerNodeInternal(self, impl, w)); impl->actors.add(registerNodeInternal(self, impl, broadcastInterface));
} }
// Push full snapshot to worker if it isn't up to date. // Push full snapshot to worker if it isn't up to date.
wait(impl->pushSnapshot(impl->mostRecentVersion, client)); wait(impl->pushSnapshot(impl->mostRecentVersion, client));
impl->clients[broadcastInterface.id()] = client; impl->clients[broadcastInterface.id()] = client;
impl->clientFailures[broadcastInterface.id()] = impl->clientFailures[broadcastInterface.id()] =
waitForFailure(impl, watcher, broadcastInterface.id(), w.address(), isCoordinator); waitForFailure(impl, watcher, broadcastInterface.id(), broadcastInterface.address(), isCoordinator);
return Void(); return Void();
} }
public: public:
Future<Void> registerNode(ConfigBroadcaster& self, Future<Void> registerNode(ConfigBroadcaster& self,
WorkerInterface const& w, ConfigBroadcastInterface const& broadcastInterface,
Version lastSeenVersion, Version lastSeenVersion,
ConfigClassSet configClassSet, ConfigClassSet configClassSet,
Future<Void> watcher, Future<Void> watcher,
ConfigBroadcastInterface const& broadcastInterface,
bool isCoordinator) { bool isCoordinator) {
return registerNode( return registerNode(&self, this, broadcastInterface, lastSeenVersion, configClassSet, watcher, isCoordinator);
&self, this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface, isCoordinator);
} }
// Updates the broadcasters knowledge of which replicas are fully up to // Updates the broadcasters knowledge of which replicas are fully up to
@ -621,13 +619,12 @@ ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default;
ConfigBroadcaster::~ConfigBroadcaster() = default; ConfigBroadcaster::~ConfigBroadcaster() = default;
Future<Void> ConfigBroadcaster::registerNode(WorkerInterface const& w, Future<Void> ConfigBroadcaster::registerNode(ConfigBroadcastInterface const& broadcastInterface,
Version lastSeenVersion, Version lastSeenVersion,
ConfigClassSet const& configClassSet, ConfigClassSet const& configClassSet,
Future<Void> watcher, Future<Void> watcher,
ConfigBroadcastInterface const& broadcastInterface,
bool isCoordinator) { bool isCoordinator) {
return impl->registerNode(*this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface, isCoordinator); return impl->registerNode(*this, broadcastInterface, lastSeenVersion, configClassSet, watcher, isCoordinator);
} }
void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes, void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,

View File

@ -269,8 +269,8 @@ class BroadcasterToLocalConfigEnvironment {
wait(self->readFrom.setup()); wait(self->readFrom.setup());
self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>(); self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>();
self->readFrom.connectToBroadcaster(self->cbi); self->readFrom.connectToBroadcaster(self->cbi);
self->broadcastServer = self->broadcaster.registerNode( self->broadcastServer =
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get(), true); self->broadcaster.registerNode(self->cbi->get(), 0, configClassSet, self->workerFailure.getFuture(), true);
return Void(); return Void();
} }
@ -305,12 +305,8 @@ public:
broadcastServer.cancel(); broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{}); cbi->set(ConfigBroadcastInterface{});
readFrom.connectToBroadcaster(cbi); readFrom.connectToBroadcaster(cbi);
broadcastServer = broadcaster.registerNode(WorkerInterface(), broadcastServer = broadcaster.registerNode(
readFrom.lastSeenVersion(), cbi->get(), readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), true);
readFrom.configClassSet(),
workerFailure.getFuture(),
cbi->get(),
true);
} }
Future<Void> restartLocalConfig(std::string const& newConfigPath) { Future<Void> restartLocalConfig(std::string const& newConfigPath) {
@ -442,8 +438,8 @@ class TransactionToLocalConfigEnvironment {
wait(self->readFrom.setup()); wait(self->readFrom.setup());
self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>(); self->cbi = makeReference<AsyncVar<ConfigBroadcastInterface>>();
self->readFrom.connectToBroadcaster(self->cbi); self->readFrom.connectToBroadcaster(self->cbi);
self->broadcastServer = self->broadcaster.registerNode( self->broadcastServer =
WorkerInterface(), 0, configClassSet, self->workerFailure.getFuture(), self->cbi->get(), true); self->broadcaster.registerNode(self->cbi->get(), 0, configClassSet, self->workerFailure.getFuture(), true);
return Void(); return Void();
} }
@ -462,12 +458,8 @@ public:
broadcastServer.cancel(); broadcastServer.cancel();
cbi->set(ConfigBroadcastInterface{}); cbi->set(ConfigBroadcastInterface{});
readFrom.connectToBroadcaster(cbi); readFrom.connectToBroadcaster(cbi);
broadcastServer = broadcaster.registerNode(WorkerInterface(), broadcastServer = broadcaster.registerNode(
readFrom.lastSeenVersion(), cbi->get(), readFrom.lastSeenVersion(), readFrom.configClassSet(), workerFailure.getFuture(), true);
readFrom.configClassSet(),
workerFailure.getFuture(),
cbi->get(),
true);
} }
Future<Void> restartLocalConfig(std::string const& newConfigPath) { Future<Void> restartLocalConfig(std::string const& newConfigPath) {

View File

@ -135,18 +135,23 @@ class ConfigNodeImpl {
Counter getGenerationRequests; Counter getGenerationRequests;
Future<Void> logger; Future<Void> logger;
ACTOR static Future<Optional<size_t>> getCoordinatorsHash(ConfigNodeImpl* self) { ACTOR static Future<size_t> getCoordinatorsHash(ConfigNodeImpl* self) {
state size_t coordinatorsHash = 0;
Optional<Value> value = wait(self->kvStore->readValue(coordinatorsHashKey)); Optional<Value> value = wait(self->kvStore->readValue(coordinatorsHashKey));
if (!value.present()) { if (value.present()) {
return Optional<size_t>(); coordinatorsHash = BinaryReader::fromStringRef<size_t>(value.get(), IncludeVersion());
} else {
self->kvStore->set(
KeyValueRef(coordinatorsHashKey, BinaryWriter::toValue(coordinatorsHash, IncludeVersion())));
wait(self->kvStore->commit());
} }
return BinaryReader::fromStringRef<size_t>(value.get(), IncludeVersion()); return coordinatorsHash;
} }
ACTOR static Future<Optional<size_t>> getLocked(ConfigNodeImpl* self) { ACTOR static Future<Optional<size_t>> getLocked(ConfigNodeImpl* self) {
Optional<Value> value = wait(self->kvStore->readValue(lockedKey)); Optional<Value> value = wait(self->kvStore->readValue(lockedKey));
if (!value.present()) { if (!value.present()) {
return false; return Optional<size_t>();
} }
return BinaryReader::fromStringRef<Optional<size_t>>(value.get(), IncludeVersion()); return BinaryReader::fromStringRef<Optional<size_t>>(value.get(), IncludeVersion());
} }
@ -247,9 +252,8 @@ class ConfigNodeImpl {
// New transactions increment the database's current live version. This effectively serves as a lock, providing // New transactions increment the database's current live version. This effectively serves as a lock, providing
// serializability // serializability
ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) { ACTOR static Future<Void> getNewGeneration(ConfigNodeImpl* self, ConfigTransactionGetGenerationRequest req) {
state Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self)); state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
ASSERT(coordinatorsHash.present()); if (req.coordinatorsHash != coordinatorsHash) {
if (req.coordinatorsHash != coordinatorsHash.get()) {
req.reply.sendError(coordinators_changed()); req.reply.sendError(coordinators_changed());
return Void(); return Void();
} }
@ -273,9 +277,8 @@ class ConfigNodeImpl {
req.reply.sendError(coordinators_changed()); req.reply.sendError(coordinators_changed());
return Void(); return Void();
} }
state Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self)); state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
ASSERT(coordinatorsHash.present()); if (req.coordinatorsHash != coordinatorsHash) {
if (req.coordinatorsHash != coordinatorsHash.get()) {
req.reply.sendError(coordinators_changed()); req.reply.sendError(coordinators_changed());
return Void(); return Void();
} }
@ -317,7 +320,11 @@ class ConfigNodeImpl {
req.reply.sendError(coordinators_changed()); req.reply.sendError(coordinators_changed());
return Void(); return Void();
} }
state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
if (req.coordinatorsHash != coordinatorsHash) {
req.reply.sendError(coordinators_changed());
return Void();
}
ConfigGeneration currentGeneration = wait(getGeneration(self)); ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) { if (req.generation != currentGeneration) {
req.reply.sendError(transaction_too_old()); req.reply.sendError(transaction_too_old());
@ -357,6 +364,11 @@ class ConfigNodeImpl {
req.reply.sendError(coordinators_changed()); req.reply.sendError(coordinators_changed());
return Void(); return Void();
} }
state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
if (req.coordinatorsHash != coordinatorsHash) {
req.reply.sendError(coordinators_changed());
return Void();
}
ConfigGeneration currentGeneration = wait(getGeneration(self)); ConfigGeneration currentGeneration = wait(getGeneration(self));
if (req.generation != currentGeneration) { if (req.generation != currentGeneration) {
@ -441,9 +453,8 @@ class ConfigNodeImpl {
req.reply.sendError(coordinators_changed()); req.reply.sendError(coordinators_changed());
return Void(); return Void();
} }
state Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self)); state size_t coordinatorsHash = wait(getCoordinatorsHash(self));
ASSERT(coordinatorsHash.present()); if (req.coordinatorsHash != coordinatorsHash) {
if (req.coordinatorsHash != coordinatorsHash.get()) {
req.reply.sendError(coordinators_changed()); req.reply.sendError(coordinators_changed());
return Void(); return Void();
} }
@ -747,8 +758,8 @@ class ConfigNodeImpl {
} }
when(state ConfigFollowerLockRequest req = waitNext(cfi->lock.getFuture())) { when(state ConfigFollowerLockRequest req = waitNext(cfi->lock.getFuture())) {
++self->lockRequests; ++self->lockRequests;
Optional<size_t> coordinatorsHash = wait(getCoordinatorsHash(self)); size_t coordinatorsHash = wait(getCoordinatorsHash(self));
if (!coordinatorsHash.present() || coordinatorsHash.get() == req.coordinatorsHash) { if (coordinatorsHash == 0 || coordinatorsHash == req.coordinatorsHash) {
TraceEvent("ConfigNodeLocking", self->id).log(); TraceEvent("ConfigNodeLocking", self->id).log();
self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(false, IncludeVersion()))); self->kvStore->set(KeyValueRef(registeredKey, BinaryWriter::toValue(false, IncludeVersion())));
self->kvStore->set(KeyValueRef( self->kvStore->set(KeyValueRef(
@ -803,6 +814,8 @@ public:
return serve(this, &cbi, &cti, &cfi); return serve(this, &cbi, &cti, &cfi);
} }
Future<Void> serve(ConfigBroadcastInterface const& cbi) { return serve(this, &cbi, true); }
Future<Void> serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); } Future<Void> serve(ConfigTransactionInterface const& cti) { return serve(this, &cti); }
Future<Void> serve(ConfigFollowerInterface const& cfi) { Future<Void> serve(ConfigFollowerInterface const& cfi) {
@ -824,6 +837,10 @@ Future<Void> ConfigNode::serve(ConfigBroadcastInterface const& cbi,
return impl->serve(cbi, cti, cfi); return impl->serve(cbi, cti, cfi);
} }
Future<Void> ConfigNode::serve(ConfigBroadcastInterface const& cbi) {
return impl->serve(cbi);
}
Future<Void> ConfigNode::serve(ConfigTransactionInterface const& cti) { Future<Void> ConfigNode::serve(ConfigTransactionInterface const& cti) {
return impl->serve(cti); return impl->serve(cti);
} }

View File

@ -192,6 +192,7 @@ public:
bool operator==(ConfigBroadcastInterface const& rhs) const { return (_id == rhs._id); } bool operator==(ConfigBroadcastInterface const& rhs) const { return (_id == rhs._id); }
bool operator!=(ConfigBroadcastInterface const& rhs) const { return !(*this == rhs); } bool operator!=(ConfigBroadcastInterface const& rhs) const { return !(*this == rhs); }
UID id() const { return _id; } UID id() const { return _id; }
NetworkAddress address() const { return snapshot.getEndpoint().getPrimaryAddress(); }
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {

View File

@ -44,11 +44,10 @@ public:
ConfigBroadcaster(ConfigBroadcaster&&); ConfigBroadcaster(ConfigBroadcaster&&);
ConfigBroadcaster& operator=(ConfigBroadcaster&&); ConfigBroadcaster& operator=(ConfigBroadcaster&&);
~ConfigBroadcaster(); ~ConfigBroadcaster();
Future<Void> registerNode(WorkerInterface const& w, Future<Void> registerNode(ConfigBroadcastInterface const& broadcastInterface,
Version lastSeenVersion, Version lastSeenVersion,
ConfigClassSet const& configClassSet, ConfigClassSet const& configClassSet,
Future<Void> watcher, Future<Void> watcher,
ConfigBroadcastInterface const& worker,
bool isCoordinator); bool isCoordinator);
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes, void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion, Version mostRecentVersion,

View File

@ -38,6 +38,7 @@ public:
ConfigFollowerInterface const&); ConfigFollowerInterface const&);
public: // Testing public: // Testing
Future<Void> serve(ConfigBroadcastInterface const&);
Future<Void> serve(ConfigTransactionInterface const&); Future<Void> serve(ConfigTransactionInterface const&);
Future<Void> serve(ConfigFollowerInterface const&); Future<Void> serve(ConfigFollowerInterface const&);
void close(); void close();

View File

@ -434,8 +434,8 @@ struct RegisterWorkerRequest {
std::vector<NetworkAddress> incompatiblePeers; std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise<RegisterWorkerReply> reply; ReplyPromise<RegisterWorkerReply> reply;
bool degraded; bool degraded;
Version lastSeenKnobVersion; Optional<Version> lastSeenKnobVersion;
ConfigClassSet knobConfigClassSet; Optional<ConfigClassSet> knobConfigClassSet;
bool requestDbInfo; bool requestDbInfo;
bool recoveredDiskFiles; bool recoveredDiskFiles;
@ -451,8 +451,8 @@ struct RegisterWorkerRequest {
Optional<BlobManagerInterface> bmInterf, Optional<BlobManagerInterface> bmInterf,
Optional<EncryptKeyProxyInterface> ekpInterf, Optional<EncryptKeyProxyInterface> ekpInterf,
bool degraded, bool degraded,
Version lastSeenKnobVersion, Optional<Version> lastSeenKnobVersion,
ConfigClassSet knobConfigClassSet, Optional<ConfigClassSet> knobConfigClassSet,
bool recoveredDiskFiles) bool recoveredDiskFiles)
: wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo), : wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf), generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),

View File

@ -589,20 +589,20 @@ ACTOR Future<Void> registrationClient(
incorrectTime = Optional<double>(); incorrectTime = Optional<double>();
} }
RegisterWorkerRequest request(interf, RegisterWorkerRequest request(
initialClass, interf,
processClass, initialClass,
asyncPriorityInfo->get(), processClass,
requestGeneration++, asyncPriorityInfo->get(),
ddInterf->get(), requestGeneration++,
rkInterf->get(), ddInterf->get(),
bmInterf->get().present() ? bmInterf->get().get().second rkInterf->get(),
: Optional<BlobManagerInterface>(), bmInterf->get().present() ? bmInterf->get().get().second : Optional<BlobManagerInterface>(),
ekpInterf->get(), ekpInterf->get(),
degraded->get(), degraded->get(),
localConfig->lastSeenVersion(), localConfig.isValid() ? localConfig->lastSeenVersion() : Optional<Version>(),
localConfig->configClassSet(), localConfig.isValid() ? localConfig->configClassSet() : Optional<ConfigClassSet>(),
recoveredDiskFiles.isSet()); recoveredDiskFiles.isSet());
for (auto const& i : issues->get()) { for (auto const& i : issues->get()) {
request.issues.push_back_deep(request.issues.arena(), i); request.issues.push_back_deep(request.issues.arena(), i);
@ -3319,8 +3319,11 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
state std::vector<Future<Void>> actors; state std::vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles; state Promise<Void> recoveredDiskFiles;
state Reference<ConfigNode> configNode; state Reference<ConfigNode> configNode;
state Reference<LocalConfiguration> localConfig = makeReference<LocalConfiguration>( state Reference<LocalConfiguration> localConfig;
dataFolder, configPath, manualKnobOverrides, g_network->isSimulated() ? IsTest::True : IsTest::False); if (configDBType != ConfigDBType::DISABLED) {
localConfig = makeReference<LocalConfiguration>(
dataFolder, configPath, manualKnobOverrides, g_network->isSimulated() ? IsTest::True : IsTest::False);
}
// setupStackSignal(); // setupStackSignal();
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::Worker; getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::Worker;
@ -3341,7 +3344,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
.detail("MachineId", localities.machineId()) .detail("MachineId", localities.machineId())
.detail("DiskPath", dataFolder) .detail("DiskPath", dataFolder)
.detail("CoordPath", coordFolder) .detail("CoordPath", coordFolder)
.detail("WhiteListBinPath", whitelistBinPaths); .detail("WhiteListBinPath", whitelistBinPaths)
.detail("ConfigDBType", configDBType);
state ConfigBroadcastInterface configBroadcastInterface; state ConfigBroadcastInterface configBroadcastInterface;
// SOMEDAY: start the services on the machine in a staggered fashion in simulation? // SOMEDAY: start the services on the machine in a staggered fashion in simulation?