diff --git a/fdbserver/ConfigBroadcaster.actor.cpp b/fdbserver/ConfigBroadcaster.actor.cpp index 4d55bf9d53..451b4429a5 100644 --- a/fdbserver/ConfigBroadcaster.actor.cpp +++ b/fdbserver/ConfigBroadcaster.actor.cpp @@ -321,6 +321,22 @@ public: return registerNode(&self, this, w, lastSeenVersion, configClassSet, watcher, broadcastInterface); } + // Updates the broadcasters knowledge of which replicas are fully up to + // date, based on data gathered by the consumer. + void updateKnownReplicas(std::vector const& readReplicas) { + if (!newConfigNodesAllowed.canBeSet()) { + return; + } + + for (const auto& cfi : readReplicas) { + this->activeConfigNodes.insert(cfi.address()); + } + if (activeConfigNodes.size() >= coordinators / 2 + 1) { + disallowUnregistered = true; + newConfigNodesAllowed.send(Void()); + } + } + void applyChanges(Standalone> const& changes, Version mostRecentVersion, Standalone> const& annotations, @@ -334,15 +350,7 @@ public: addChanges(changes, mostRecentVersion, annotations); } - if (newConfigNodesAllowed.canBeSet()) { - for (const auto& cfi : readReplicas) { - this->activeConfigNodes.insert(cfi.address()); - } - if (activeConfigNodes.size() >= coordinators / 2 + 1) { - disallowUnregistered = true; - newConfigNodesAllowed.send(Void()); - } - } + updateKnownReplicas(readReplicas); } template @@ -361,13 +369,7 @@ public: .detail("ActiveReplicas", readReplicas.size()); actors.add(pushSnapshotAndChanges(this, snapshot, snapshotVersion, changes, changesVersion, annotations)); - for (const auto& cfi : readReplicas) { - this->activeConfigNodes.insert(cfi.address()); - } - if (activeConfigNodes.size() >= coordinators / 2 + 1 && newConfigNodesAllowed.canBeSet()) { - disallowUnregistered = true; - newConfigNodesAllowed.send(Void()); - } + updateKnownReplicas(readReplicas); } ConfigBroadcasterImpl(ConfigFollowerInterface const& cfi) : ConfigBroadcasterImpl() {