foundationdb/fdbserver/ConfigBroadcaster.actor.cpp
Lukas Joswiak 795b666e23 Fix a rare configuration database data loss bug
See the comment contained in this commit. This bug could only manifest
under a specific set of circumstances:

1. A coordinator change is started.
2. The coordinator change succeeds, but its action of clearing
   `previousCoordinatorsKey` is delayed.
3. A minority of `ConfigNode`s have an old state of the configuration
   database, compared to the majority.
4. A `ConfigNode` in the majority dies and permanently loses data.
5. A long delay occurs on the `PaxosConfigConsumer` when it tries to
   read the latest changes from the `ConfigNode`s.

In the above circumstances, the `ConfigBroadcaster` could incorrectly
send a snapshot of an old state of the configuration database to a
majority of `ConfigNode`s. This would cause new, durable, and
acknowledged commit data to be overwritten.
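For example, with three new coordinators B, C, and D (so a quorum is two
nodes): if commits land only on C and D, D then dies and loses its data, and
the broadcaster still holds the pre-commit snapshot, re-sending that stale
snapshot to B and D would leave a majority reporting the old version and roll
back C's newer commits. The comment in `registerNodeInternal` below walks
through this scenario version by version.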

Note that this bug only affects the configuration database (used for
knob storage). It does not affect the normal keyspace.
2022-11-22 11:20:04 -08:00

/*
* ConfigBroadcaster.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbserver/ConfigBroadcaster.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/IConfigConsumer.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // must be last include
namespace {
bool matchesConfigClass(ConfigClassSet const& configClassSet, Optional<KeyRef> configClass) {
return !configClass.present() || configClassSet.contains(configClass.get());
}
// Helper functions for STL containers, with flow-friendly error handling
template <class MapContainer, class K>
auto get(MapContainer& m, K const& k) -> decltype(m.at(k)) {
auto it = m.find(k);
ASSERT(it != m.end());
return it->second;
}
template <class Container, class K>
void remove(Container& container, K const& k) {
auto it = container.find(k);
ASSERT(it != container.end());
container.erase(it);
}
} // namespace
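// The ConfigBroadcaster runs on the cluster controller. It consumes
// configuration database (knob) state from the ConfigNodes and pushes
// snapshots and incremental changes out to registered workers.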
class ConfigBroadcasterImpl {
// Holds information about each client connected to the broadcaster.
struct BroadcastClientDetails {
// Triggered when the worker dies.
Future<Void> watcher;
ConfigClassSet configClassSet;
Version lastSeenVersion;
ConfigBroadcastInterface broadcastInterface;
bool operator==(BroadcastClientDetails const& rhs) const {
return configClassSet == rhs.configClassSet && lastSeenVersion == rhs.lastSeenVersion &&
broadcastInterface == rhs.broadcastInterface;
}
bool operator!=(BroadcastClientDetails const& rhs) const { return !(*this == rhs); }
BroadcastClientDetails() = default;
BroadcastClientDetails(Future<Void> watcher,
ConfigClassSet const& configClassSet,
Version lastSeenVersion,
ConfigBroadcastInterface broadcastInterface)
: watcher(watcher), configClassSet(configClassSet), lastSeenVersion(lastSeenVersion),
broadcastInterface(broadcastInterface) {}
};
ConfigDBType configDBType;
std::map<ConfigKey, KnobValue> snapshot;
std::deque<VersionedConfigMutation> mutationHistory;
std::deque<VersionedConfigCommitAnnotation> annotationHistory;
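// lastCompactedVersion: history at or below this version has been discarded.
// largestLiveVersion: largest live version reported by the consumer.
// mostRecentVersion: most recent configuration database version known to the
// broadcaster; the in-memory snapshot reflects all changes through it.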
Version lastCompactedVersion;
Version largestLiveVersion;
Version mostRecentVersion;
CoordinatorsHash coordinatorsHash;
std::unique_ptr<IConfigConsumer> consumer;
Future<Void> consumerFuture;
ActorCollection actors{ false };
std::unordered_map<NetworkAddress, Future<Void>> registrationActors;
std::map<UID, BroadcastClientDetails> clients;
std::map<UID, Future<Void>> clientFailures;
// State related to changing coordinators
// Used to read a snapshot from the previous coordinators after a change
// coordinators command.
Future<Optional<Value>> previousCoordinatorsFuture;
std::unique_ptr<IConfigConsumer> previousCoordinatorsConsumer;
Future<Void> previousCoordinatorsSnapshotFuture;
Version largestConfigNodeVersion{ ::invalidVersion };
UID id;
CounterCollection cc;
Counter compactRequest;
Counter successfulChangeRequest;
Counter failedChangeRequest;
Counter snapshotRequest;
Future<Void> logger;
int coordinators = 0;
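// Registration bookkeeping for ConfigNodes:
// registeredConfigNodes: nodes that reported having registered previously.
// activeConfigNodes: nodes currently allowed to serve configuration database
// requests.
// registrationResponses: nodes that have responded to the broadcaster's
// registration query.
// registrationResponsesUnregistered: unregistered nodes that have nevertheless
// been admitted as active.
// disallowUnregistered: restricts whether newly responding unregistered nodes
// may be admitted as active.
// newConfigNodesAllowed: fulfilled once enough nodes are active (or enough
// unregistered nodes have responded), unblocking unregistered nodes waiting
// to complete registration.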
std::unordered_set<NetworkAddress> registeredConfigNodes;
std::unordered_set<NetworkAddress> activeConfigNodes;
std::unordered_set<NetworkAddress> registrationResponses;
std::unordered_set<NetworkAddress> registrationResponsesUnregistered;
bool disallowUnregistered = false;
Promise<Void> newConfigNodesAllowed;
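// Sends a full snapshot, restricted to the client's config classes, to the
// given client if it has not yet seen snapshotVersion.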
Future<Void> pushSnapshot(Version snapshotVersion, BroadcastClientDetails const& client) {
if (client.lastSeenVersion >= snapshotVersion) {
return Void();
}
++snapshotRequest;
ConfigBroadcastSnapshotRequest request;
for (const auto& [key, value] : snapshot) {
if (matchesConfigClass(client.configClassSet, key.configClass)) {
request.snapshot[key] = value;
}
}
request.version = snapshotVersion;
// TODO: Don't need a delay if there are no atomic changes.
// Delay restarting the cluster controller to allow messages to be sent to workers.
request.restartDelay = client.broadcastInterface.address() == g_network->getLocalAddress()
? SERVER_KNOBS->BROADCASTER_SELF_UPDATE_DELAY
: 0.0;
TraceEvent(SevDebug, "ConfigBroadcasterSnapshotRequest", id)
.detail("Size", request.snapshot.size())
.detail("Version", request.version);
return success(brokenPromiseToNever(client.broadcastInterface.snapshot.getReply(request)));
}
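// Sends the given changes that are newer than the client's lastSeenVersion and
// that match its config classes, then advances the client's lastSeenVersion.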
template <class Changes>
Future<Void> pushChanges(BroadcastClientDetails& client, Changes const& changes) {
// Skip if client has already seen the latest version.
if (client.lastSeenVersion >= mostRecentVersion) {
return Void();
}
ConfigBroadcastChangesRequest req;
for (const auto& versionedMutation : changes) {
if (versionedMutation.version > client.lastSeenVersion &&
matchesConfigClass(client.configClassSet, versionedMutation.mutation.getConfigClass())) {
TraceEvent te(SevDebug, "ConfigBroadcasterSendingChangeMutation", id);
te.detail("Version", versionedMutation.version)
.detail("ReqLastSeenVersion", client.lastSeenVersion)
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
.detail("KnobName", versionedMutation.mutation.getKnobName())
.detail("ClientID", client.broadcastInterface.id());
if (versionedMutation.mutation.isSet()) {
te.detail("Op", "Set").detail("KnobValue", versionedMutation.mutation.getValue().toString());
} else {
te.detail("Op", "Clear");
}
req.changes.push_back_deep(req.changes.arena(), versionedMutation);
}
}
if (req.changes.size() == 0) {
return Void();
}
client.lastSeenVersion = mostRecentVersion;
req.mostRecentVersion = mostRecentVersion;
// TODO: Don't need a delay if there are no atomic changes.
// Delay restarting the cluster controller to allow messages to be sent to workers.
req.restartDelay = client.broadcastInterface.address() == g_network->getLocalAddress()
? SERVER_KNOBS->BROADCASTER_SELF_UPDATE_DELAY
: 0.0;
++successfulChangeRequest;
return success(client.broadcastInterface.changes.getReply(req));
}
ConfigBroadcasterImpl()
: lastCompactedVersion(0), largestLiveVersion(0), mostRecentVersion(0),
id(deterministicRandom()->randomUniqueID()), cc("ConfigBroadcaster"), compactRequest("CompactRequest", cc),
successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc),
snapshotRequest("SnapshotRequest", cc) {
logger = cc.traceCounters(
"ConfigBroadcasterMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, "ConfigBroadcasterMetrics");
}
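// Applies a batch of versioned mutations to the in-memory snapshot, appends
// them and their commit annotations to the history, and pushes the changes to
// all registered clients.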
void addChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations) {
this->mostRecentVersion = mostRecentVersion;
mutationHistory.insert(mutationHistory.end(), changes.begin(), changes.end());
annotationHistory.insert(annotationHistory.end(), annotations.begin(), annotations.end());
for (const auto& change : changes) {
const auto& mutation = change.mutation;
if (mutation.isSet()) {
snapshot[mutation.getKey()] = mutation.getValue();
} else {
snapshot.erase(mutation.getKey());
}
}
for (auto& [id, client] : clients) {
actors.add(brokenPromiseToNever(pushChanges(client, changes)));
}
}
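// Pushes the snapshot at snapshotVersion to every registered client that has
// not yet seen it.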
ACTOR static Future<Void> pushSnapshotAndChanges(ConfigBroadcasterImpl* self, Version snapshotVersion) {
std::vector<Future<Void>> futures;
for (const auto& [id, client] : self->clients) {
futures.push_back(brokenPromiseToNever(self->pushSnapshot(snapshotVersion, client)));
}
wait(waitForAll(futures));
return Void();
}
ACTOR static Future<Void> waitForFailure(ConfigBroadcasterImpl* self,
Future<Void> watcher,
UID clientUID,
NetworkAddress clientAddress,
bool isCoordinator) {
wait(watcher);
TraceEvent(SevDebug, "ConfigBroadcastClientDied", self->id)
.detail("ClientID", clientUID)
.detail("Address", clientAddress)
.detail("IsUnregistered",
self->registrationResponsesUnregistered.find(clientAddress) !=
self->registrationResponsesUnregistered.end())
.detail("IsActive", self->activeConfigNodes.find(clientAddress) != self->activeConfigNodes.end());
self->clients.erase(clientUID);
self->clientFailures.erase(clientUID);
if (isCoordinator) {
self->registrationResponses.erase(clientAddress);
if (self->activeConfigNodes.find(clientAddress) != self->activeConfigNodes.end()) {
self->activeConfigNodes.erase(clientAddress);
if (self->registrationResponsesUnregistered.find(clientAddress) !=
self->registrationResponsesUnregistered.end()) {
self->registrationResponsesUnregistered.erase(clientAddress);
self->disallowUnregistered = false;
// See comment where this promise is reset below.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
}
}
}
}
return Void();
}
// Determines whether the registering ConfigNode is allowed to start
// serving configuration database requests and snapshot data. In order to
// ensure strict serializability, some nodes may be temporarily restricted
// from participation until the other nodes in the system are brought up to
// date.
ACTOR static Future<Void> registerNodeInternal(ConfigBroadcaster* broadcaster,
ConfigBroadcasterImpl* self,
ConfigBroadcastInterface configBroadcastInterface) {
if (self->configDBType == ConfigDBType::SIMPLE) {
self->consumerFuture = self->consumer->consume(*broadcaster);
wait(success(brokenPromiseToNever(
configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{ 0, {}, -1, -1 }))));
return Void();
}
state NetworkAddress address = configBroadcastInterface.address();
// Ask the registering ConfigNode whether it has registered in the past.
state ConfigBroadcastRegisteredReply reply = wait(
brokenPromiseToNever(configBroadcastInterface.registered.getReply(ConfigBroadcastRegisteredRequest{})));
self->largestConfigNodeVersion = std::max(self->largestConfigNodeVersion, reply.lastSeenVersion);
state bool registered = reply.registered;
TraceEvent("ConfigBroadcasterRegisterNodeReceivedRegistrationReply", self->id)
.detail("Address", address)
.detail("Registered", registered)
.detail("DisallowUnregistered", self->disallowUnregistered)
.detail("LastSeenVersion", reply.lastSeenVersion);
if (self->activeConfigNodes.find(address) != self->activeConfigNodes.end()) {
self->activeConfigNodes.erase(address);
if (self->registrationResponsesUnregistered.find(address) !=
self->registrationResponsesUnregistered.end()) {
self->registrationResponsesUnregistered.erase(address);
// If an active but unregistered node died, reset the
// disallowUnregistered flag so that it can be made active
// again when it re-registers.
self->disallowUnregistered = false;
// Since a node can die and re-register before the broadcaster
// receives notice that the node has died, we need to check for
// re-registration of a node here. There are two places that can
// reset the promise to allow new nodes, so make sure the promise
// is actually set before resetting it. This prevents the case where a node
// dies, re-registers, and waits on the promise, and only then does the
// broadcaster receive the death notification and reset the promise again.
if (self->newConfigNodesAllowed.isSet()) {
self->newConfigNodesAllowed.reset();
}
}
}
int responsesRemaining = self->coordinators - (int)self->registrationResponses.size();
int nodesTillQuorum = self->coordinators / 2 + 1 - (int)self->activeConfigNodes.size();
if (registered) {
self->registeredConfigNodes.insert(address);
self->activeConfigNodes.insert(address);
self->disallowUnregistered = true;
} else if ((self->activeConfigNodes.size() < self->coordinators / 2 + 1 && !self->disallowUnregistered) ||
(self->registrationResponsesUnregistered.size() < self->coordinators / 2 &&
responsesRemaining <= nodesTillQuorum)) {
// Received a registration request from an unregistered node. There
// are two cases where we want to allow unregistered nodes to
// register:
// * the cluster is just starting and no nodes are registered
// * there are registered and unregistered nodes, but the
// registered nodes may not represent a majority due to previous
// data loss. In this case, unregistered nodes must be allowed
// to register so they can be rolled forward and form a quorum.
// But only a minority of unregistered nodes should be allowed
// to register so they cannot override the registered nodes as
// a source of truth
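// For illustration: with coordinators = 3, a quorum is 3 / 2 + 1 = 2 active
// nodes, and the second clause of the condition above admits unregistered
// nodes only while registrationResponsesUnregistered.size() < 3 / 2 = 1, i.e.
// at most one unregistered node can be admitted through it.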
self->activeConfigNodes.insert(address);
self->registrationResponsesUnregistered.insert(address);
if ((self->activeConfigNodes.size() >= self->coordinators / 2 + 1 ||
self->registrationResponsesUnregistered.size() >= self->coordinators / 2 + 1) &&
self->newConfigNodesAllowed.canBeSet()) {
self->newConfigNodesAllowed.send(Void());
}
} else {
self->disallowUnregistered = true;
}
self->registrationResponses.insert(address);
// Read previous coordinators and fetch snapshot from them if they
// exist. This path should only be hit once after the coordinators are
// changed.
wait(yield());
Optional<Value> previousCoordinators = wait(self->previousCoordinatorsFuture);
TraceEvent("ConfigBroadcasterRegisterNodeReadPreviousCoordinators", self->id)
.detail("PreviousCoordinators", previousCoordinators)
.detail("HasStartedConsumer", self->previousCoordinatorsSnapshotFuture.isValid());
if (previousCoordinators.present()) {
if (!self->previousCoordinatorsSnapshotFuture.isValid()) {
// Create a consumer to read a snapshot from the previous
// coordinators. The snapshot will be forwarded to the new
// coordinators to bring them up to date.
size_t previousCoordinatorsHash = std::hash<std::string>()(previousCoordinators.get().toString());
if (previousCoordinatorsHash != self->coordinatorsHash) {
ServerCoordinators previousCoordinatorsData(Reference<IClusterConnectionRecord>(
new ClusterConnectionMemoryRecord(previousCoordinators.get().toString())));
TraceEvent("ConfigBroadcasterRegisterNodeStartingConsumer", self->id).log();
self->previousCoordinatorsConsumer = IConfigConsumer::createPaxos(
previousCoordinatorsData, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL, true);
self->previousCoordinatorsSnapshotFuture =
self->previousCoordinatorsConsumer->readSnapshot(*broadcaster);
} else {
// If the cluster controller restarts without a coordinator
// change having taken place, there is no need to read a
// previous snapshot.
self->previousCoordinatorsSnapshotFuture = Void();
}
}
wait(self->previousCoordinatorsSnapshotFuture);
}
state bool sendSnapshot =
self->previousCoordinatorsConsumer && reply.lastSeenVersion <= self->mostRecentVersion;
// If a coordinator change is ongoing, a quorum of ConfigNodes are
// already registered, and the largest version at least one of those
// ConfigNodes knows about is greater than the version of the latest
// snapshot the broadcaster has, don't send a snapshot to any
// ConfigNodes. This could end up overwriting committed data. Consider
// the following scenario, with three ConfigNodes:
//
// T=0:
// A: v5
// T=1:
// change coordinators, new coordinators are B, C, D
// T=2:
// B: v5, C: v5, D: v5
// T=3:
// B: v5, C: v10, D: v10
// (some commits happen on only C and D)
// (previousCoordinatorsKey has not been cleared yet)
// T=4:
// D dies and loses its data
// T=5:
// D starts
// B: v5 (registered=yes), C: v10 (registered=yes), D: v0 (registered=no)
// Broadcaster: has an old snapshot, only knows about v5
// self->mostRecentVersion=5
// T=6:
// B, C, D (re-)register with broadcaster
//
// At T=6, the broadcaster would send snapshots to B and D because the
// largest version each of them knows about (5 and 0) is less than or equal to
// self->mostRecentVersion (5). But this would cause a majority of
// nodes to think v5 is the latest committed version, causing C to be
// rolled back, and losing commit data between versions 5 and 10.
//
// This is a special case where the coordinators are being changed.
// During a coordinator change, a majority of ConfigNodes being
// registered means the coordinator change already took place, and it
// is being retried due to some failure. In that case, we don't want to
// resend snapshots if a majority of the new ConfigNodes are
// registered, because they could have been accepting commits. Instead,
// let the rollback/rollforward algorithm update the out of date nodes.
if (self->previousCoordinatorsConsumer && self->largestConfigNodeVersion > self->mostRecentVersion &&
self->registeredConfigNodes.size() >= self->coordinators / 2 + 1) {
sendSnapshot = false;
}
// Unregistered nodes need to wait for either:
// 1. A quorum of registered nodes to register and send their
// snapshots, so the unregistered nodes can be rolled forward, or
// 2. A quorum of unregistered nodes to contact the broadcaster (this
// means there is no previous data in the configuration database).
// The above conditions do not apply when changing coordinators, as a
// snapshot of the current state of the configuration database needs to
// be sent to all new coordinators.
TraceEvent("ConfigBroadcasterRegisterNodeDetermineEligibility", self->id)
.detail("Registered", registered)
.detail("SendSnapshot", sendSnapshot);
if (!registered && !sendSnapshot) {
wait(self->newConfigNodesAllowed.getFuture());
}
sendSnapshot = sendSnapshot || (!registered && self->snapshot.size() > 0);
TraceEvent("ConfigBroadcasterRegisterNodeSendingReadyRequest", self->id)
.detail("ConfigNodeAddress", address)
.detail("SendSnapshot", sendSnapshot)
.detail("SnapshotVersion", self->mostRecentVersion)
.detail("SnapshotSize", self->snapshot.size())
.detail("LargestLiveVersion", self->largestLiveVersion);
if (sendSnapshot) {
Version liveVersion = std::max(self->largestLiveVersion, self->mostRecentVersion);
wait(success(brokenPromiseToNever(configBroadcastInterface.ready.getReply(ConfigBroadcastReadyRequest{
self->coordinatorsHash, self->snapshot, self->mostRecentVersion, liveVersion }))));
} else {
wait(success(brokenPromiseToNever(configBroadcastInterface.ready.getReply(
ConfigBroadcastReadyRequest{ self->coordinatorsHash, {}, -1, -1 }))));
}
// Start the consumer last, so at least some nodes will be registered.
if (!self->consumerFuture.isValid()) {
if (sendSnapshot) {
self->consumer->allowSpecialCaseRollforward();
}
self->consumerFuture = self->consumer->consume(*broadcaster);
}
return Void();
}
ACTOR static Future<Void> registerNode(ConfigBroadcaster* self,
ConfigBroadcasterImpl* impl,
ConfigBroadcastInterface broadcastInterface,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
bool isCoordinator) {
state BroadcastClientDetails client(
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface));
if (impl->clients.count(broadcastInterface.id())) {
// Client already registered
return Void();
}
TraceEvent(SevDebug, "ConfigBroadcasterRegisteringWorker", impl->id)
.detail("ClientID", broadcastInterface.id())
.detail("MostRecentVersion", impl->mostRecentVersion)
.detail("IsCoordinator", isCoordinator);
if (isCoordinator) {
// A client re-registering will cause the old actor to be
// cancelled.
impl->registrationActors[broadcastInterface.address()] =
registerNodeInternal(self, impl, broadcastInterface);
}
// Push full snapshot to worker if it isn't up to date.
wait(impl->pushSnapshot(impl->mostRecentVersion, client));
impl->clients[broadcastInterface.id()] = client;
impl->clientFailures[broadcastInterface.id()] =
waitForFailure(impl, watcher, broadcastInterface.id(), broadcastInterface.address(), isCoordinator);
return Void();
}
public:
Future<Void> registerNode(ConfigBroadcaster& self,
ConfigBroadcastInterface const& broadcastInterface,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
bool isCoordinator) {
return registerNode(&self, this, broadcastInterface, lastSeenVersion, configClassSet, watcher, isCoordinator);
}
// Updates the broadcaster's knowledge of which replicas are fully up to
// date, based on data gathered by the consumer.
void updateKnownReplicas(std::vector<ConfigFollowerInterface> const& readReplicas) {
if (!newConfigNodesAllowed.canBeSet()) {
return;
}
for (const auto& cfi : readReplicas) {
this->activeConfigNodes.insert(cfi.address());
}
if (activeConfigNodes.size() >= coordinators / 2 + 1) {
disallowUnregistered = true;
newConfigNodesAllowed.send(Void());
}
}
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
if (mostRecentVersion >= 0) {
TraceEvent(SevInfo, "ConfigBroadcasterApplyingChanges", id)
.detail("ChangesSize", changes.size())
.detail("AnnotationsSize", annotations.size())
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("NewMostRecentVersion", mostRecentVersion)
.detail("ActiveReplicas", readReplicas.size());
addChanges(changes, mostRecentVersion, annotations);
}
updateKnownReplicas(readReplicas);
}
template <class Snapshot>
void applySnapshotAndChanges(Snapshot&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators) {
TraceEvent(SevInfo, "ConfigBroadcasterApplyingSnapshotAndChanges", id)
.detail("CurrentMostRecentVersion", this->mostRecentVersion)
.detail("SnapshotSize", snapshot.size())
.detail("SnapshotVersion", snapshotVersion)
.detail("ChangesSize", changes.size())
.detail("ChangesVersion", changesVersion)
.detail("AnnotationsSize", annotations.size())
.detail("ActiveReplicas", readReplicas.size())
.detail("LargestLiveVersion", largestLiveVersion)
.detail("FromPreviousCoordinators", fromPreviousCoordinators);
// Avoid updating state if the snapshot contains no mutations, or if it
// contains old mutations. This can happen when the set of coordinators
// is changed, and a new coordinator comes online that has not yet had
// the current configuration database pushed to it, or when a new
// coordinator contains state from an old configuration database
// generation.
if ((snapshot.size() != 0 || changes.size() != 0) &&
(snapshotVersion > this->mostRecentVersion || changesVersion > this->mostRecentVersion)) {
this->snapshot = std::forward<Snapshot>(snapshot);
this->lastCompactedVersion = snapshotVersion;
this->largestLiveVersion = std::max(this->largestLiveVersion, largestLiveVersion);
addChanges(changes, changesVersion, annotations);
actors.add(pushSnapshotAndChanges(this, snapshotVersion));
}
if (!fromPreviousCoordinators) {
updateKnownReplicas(readReplicas);
}
}
ConfigBroadcasterImpl(ConfigFollowerInterface const& cfi) : ConfigBroadcasterImpl() {
configDBType = ConfigDBType::SIMPLE;
coordinators = 1;
consumer = IConfigConsumer::createTestSimple(cfi, 0.5, Optional<double>{});
TraceEvent(SevDebug, "ConfigBroadcasterStartingConsumer", id).detail("Consumer", consumer->getID());
}
ConfigBroadcasterImpl(ServerCoordinators const& coordinators,
ConfigDBType configDBType,
Future<Optional<Value>> previousCoordinatorsFuture)
: ConfigBroadcasterImpl() {
this->configDBType = configDBType;
this->coordinators = coordinators.configServers.size();
if (configDBType != ConfigDBType::DISABLED) {
if (configDBType == ConfigDBType::SIMPLE) {
consumer = IConfigConsumer::createSimple(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
} else {
this->previousCoordinatorsFuture = previousCoordinatorsFuture;
consumer = IConfigConsumer::createPaxos(coordinators, 0.5, SERVER_KNOBS->COMPACTION_INTERVAL);
}
coordinatorsHash = std::hash<std::string>()(coordinators.ccr->getConnectionString().toString());
TraceEvent(SevInfo, "ConfigBroadcasterStartingConsumer", id)
.detail("Consumer", consumer->getID())
.detail("UsingSimpleConsumer", configDBType == ConfigDBType::SIMPLE)
.detail("CoordinatorsCount", this->coordinators)
.detail("CoordinatorsHash", coordinatorsHash)
.detail("CompactionInterval", SERVER_KNOBS->COMPACTION_INTERVAL);
}
}
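// Builds a JSON status object describing the mutation history, commit
// annotations, and the current knob snapshot grouped by config class.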
JsonBuilderObject getStatus() const {
JsonBuilderObject result;
JsonBuilderArray mutationsArray;
for (const auto& versionedMutation : mutationHistory) {
JsonBuilderObject mutationObject;
mutationObject["version"] = versionedMutation.version;
const auto& mutation = versionedMutation.mutation;
mutationObject["type"] = mutation.isSet() ? "set" : "clear";
mutationObject["config_class"] = mutation.getConfigClass().orDefault("<global>"_sr);
mutationObject["knob_name"] = mutation.getKnobName();
if (mutation.isSet()) {
mutationObject["knob_value"] = mutation.getValue().toString();
}
mutationsArray.push_back(std::move(mutationObject));
}
result["mutations"] = std::move(mutationsArray);
JsonBuilderArray commitsArray;
for (const auto& versionedAnnotation : annotationHistory) {
JsonBuilderObject commitObject;
commitObject["version"] = versionedAnnotation.version;
commitObject["description"] = versionedAnnotation.annotation.description;
commitObject["timestamp"] = versionedAnnotation.annotation.timestamp;
commitsArray.push_back(std::move(commitObject));
}
result["commits"] = std::move(commitsArray);
JsonBuilderObject snapshotObject;
std::map<Optional<Key>, std::vector<std::pair<Key, Value>>> snapshotMap;
for (const auto& [configKey, value] : snapshot) {
snapshotMap[configKey.configClass.castTo<Key>()].emplace_back(configKey.knobName, value.toString());
}
for (const auto& [configClass, kvs] : snapshotMap) {
JsonBuilderObject kvsObject;
for (const auto& [knobName, knobValue] : kvs) {
kvsObject[knobName] = knobValue;
}
snapshotObject[configClass.orDefault("<global>"_sr)] = std::move(kvsObject);
}
result["snapshot"] = std::move(snapshotObject);
result["last_compacted_version"] = lastCompactedVersion;
result["most_recent_version"] = mostRecentVersion;
return result;
}
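// Discards mutation and annotation history at or below compactionVersion.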
void compact(Version compactionVersion) {
{
auto it = std::find_if(mutationHistory.begin(), mutationHistory.end(), [compactionVersion](const auto& vm) {
return vm.version > compactionVersion;
});
mutationHistory.erase(mutationHistory.begin(), it);
}
{
auto it = std::find_if(annotationHistory.begin(),
annotationHistory.end(),
[compactionVersion](const auto& va) { return va.version > compactionVersion; });
annotationHistory.erase(annotationHistory.begin(), it);
}
lastCompactedVersion = compactionVersion;
}
Future<Void> getError() const { return consumerFuture || actors.getResult(); }
Future<Void> getClientFailure(UID clientUID) const { return clientFailures.find(clientUID)->second; }
UID getID() const { return id; }
static void runPendingRequestStoreTest(bool includeGlobalMutation, int expectedMatches);
};
ConfigBroadcaster::ConfigBroadcaster() {}
ConfigBroadcaster::ConfigBroadcaster(ConfigFollowerInterface const& cfi)
: impl(PImpl<ConfigBroadcasterImpl>::create(cfi)) {}
ConfigBroadcaster::ConfigBroadcaster(ServerCoordinators const& coordinators,
ConfigDBType configDBType,
Future<Optional<Value>> previousCoordinatorsFuture)
: impl(PImpl<ConfigBroadcasterImpl>::create(coordinators, configDBType, previousCoordinatorsFuture)) {}
ConfigBroadcaster::ConfigBroadcaster(ConfigBroadcaster&&) = default;
ConfigBroadcaster& ConfigBroadcaster::operator=(ConfigBroadcaster&&) = default;
ConfigBroadcaster::~ConfigBroadcaster() = default;
Future<Void> ConfigBroadcaster::registerNode(ConfigBroadcastInterface const& broadcastInterface,
Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
bool isCoordinator) {
return impl->registerNode(*this, broadcastInterface, lastSeenVersion, configClassSet, watcher, isCoordinator);
}
void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version mostRecentVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas) {
impl->applyChanges(changes, mostRecentVersion, annotations, readReplicas);
}
void ConfigBroadcaster::applySnapshotAndChanges(
std::map<ConfigKey, KnobValue> const& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators) {
impl->applySnapshotAndChanges(snapshot,
snapshotVersion,
changes,
changesVersion,
annotations,
readReplicas,
largestLiveVersion,
fromPreviousCoordinators);
}
void ConfigBroadcaster::applySnapshotAndChanges(
std::map<ConfigKey, KnobValue>&& snapshot,
Version snapshotVersion,
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations,
std::vector<ConfigFollowerInterface> const& readReplicas,
Version largestLiveVersion,
bool fromPreviousCoordinators) {
impl->applySnapshotAndChanges(std::move(snapshot),
snapshotVersion,
changes,
changesVersion,
annotations,
readReplicas,
largestLiveVersion,
fromPreviousCoordinators);
}
Future<Void> ConfigBroadcaster::getError() const {
return impl->getError();
}
Future<Void> ConfigBroadcaster::getClientFailure(UID clientUID) const {
return impl->getClientFailure(clientUID);
}
UID ConfigBroadcaster::getID() const {
return impl->getID();
}
JsonBuilderObject ConfigBroadcaster::getStatus() const {
return impl->getStatus();
}
void ConfigBroadcaster::compact(Version compactionVersion) {
impl->compact(compactionVersion);
}
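// Sends a lock request, tagged with the hash of the current coordinators, to
// every ConfigNode and waits for a majority to acknowledge.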
ACTOR static Future<Void> lockConfigNodesImpl(ServerCoordinators coordinators) {
if (coordinators.configServers.empty()) {
return Void();
}
CoordinatorsHash coordinatorsHash = std::hash<std::string>()(coordinators.ccr->getConnectionString().toString());
std::vector<Future<Void>> lockRequests;
lockRequests.reserve(coordinators.configServers.size());
for (int i = 0; i < coordinators.configServers.size(); i++) {
if (coordinators.configServers[i].hostname.present()) {
lockRequests.push_back(retryGetReplyFromHostname(ConfigFollowerLockRequest{ coordinatorsHash },
coordinators.configServers[i].hostname.get(),
WLTOKEN_CONFIGFOLLOWER_LOCK));
} else {
lockRequests.push_back(
retryBrokenPromise(coordinators.configServers[i].lock, ConfigFollowerLockRequest{ coordinatorsHash }));
}
}
int quorum_size = lockRequests.size() / 2 + 1;
wait(quorum(lockRequests, quorum_size));
return Void();
}
Future<Void> ConfigBroadcaster::lockConfigNodes(ServerCoordinators coordinators) {
return lockConfigNodesImpl(coordinators);
}