mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 09:58:50 +08:00
Refactor ConfigBroadcaster
This commit is contained in:
parent
9d0843254f
commit
a5749de6b6
@ -6,6 +6,8 @@ set(FDBSERVER_SRCS
|
||||
BackupProgress.actor.h
|
||||
BackupWorker.actor.cpp
|
||||
ClusterController.actor.cpp
|
||||
ConfigBroadcaster.actor.cpp
|
||||
ConfigBroadcaster.h
|
||||
ConfigFollowerInterface.cpp
|
||||
ConfigFollowerInterface.h
|
||||
ConflictSet.h
|
||||
@ -26,7 +28,7 @@ set(FDBSERVER_SRCS
|
||||
FDBExecHelper.actor.h
|
||||
GrvProxyServer.actor.cpp
|
||||
IConfigDatabaseNode.h
|
||||
IConfigBroadcaster.h
|
||||
IConfigConsumer.h
|
||||
IDiskQueue.h
|
||||
IKeyValueContainer.h
|
||||
IKeyValueStore.h
|
||||
@ -93,7 +95,7 @@ set(FDBSERVER_SRCS
|
||||
ResolverInterface.h
|
||||
ServerDBInfo.actor.h
|
||||
ServerDBInfo.h
|
||||
SimpleConfigBroadcaster.actor.cpp
|
||||
SimpleConfigConsumer.actor.cpp
|
||||
SimpleConfigDatabaseNode.actor.cpp
|
||||
SimulatedCluster.actor.cpp
|
||||
SimulatedCluster.h
|
||||
|
@ -31,7 +31,7 @@
|
||||
#include "fdbserver/CoordinationInterface.h"
|
||||
#include "fdbserver/DataDistributorInterface.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/IConfigBroadcaster.h"
|
||||
#include "fdbserver/ConfigBroadcaster.h"
|
||||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "fdbserver/LeaderElection.h"
|
||||
@ -3419,7 +3419,7 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
|
||||
ServerCoordinators coordinators,
|
||||
LocalityData locality) {
|
||||
state ClusterControllerData self(interf, locality, coordinators);
|
||||
state SimpleConfigBroadcaster configBroadcaster(coordinators);
|
||||
state ConfigBroadcaster configBroadcaster(coordinators);
|
||||
state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY);
|
||||
state uint64_t step = 0;
|
||||
state Future<ErrorOr<Void>> error = errorOr(actorCollection(self.addActor.getFuture()));
|
||||
|
180
fdbserver/ConfigBroadcaster.actor.cpp
Normal file
180
fdbserver/ConfigBroadcaster.actor.cpp
Normal file
@ -0,0 +1,180 @@
|
||||
/*
|
||||
* ConfigBroadcaster.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/ConfigBroadcaster.h"
|
||||
#include "fdbserver/IConfigConsumer.h"
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
class ConfigBroadcasterImpl {
|
||||
std::map<Key, Endpoint::Token> configClassToToken;
|
||||
std::map<Endpoint::Token, ReplyPromise<ConfigFollowerGetChangesRequest>> tokenToReply;
|
||||
std::map<Endpoint::Token, std::vector<Key>> tokenToConfigClasses;
|
||||
std::map<ConfigKey, Value> snapshot;
|
||||
std::deque<Standalone<VersionedConfigMutationRef>> versionedMutations;
|
||||
Version lastCompactedVersion;
|
||||
Version mostRecentVersion;
|
||||
std::unique_ptr<IConfigConsumer> consumer;
|
||||
ActorCollection actors{ false };
|
||||
|
||||
CounterCollection cc;
|
||||
Counter compactRequest;
|
||||
Counter successfulChangeRequest;
|
||||
Counter failedChangeRequest;
|
||||
Counter fullDBRequest;
|
||||
Future<Void> logger;
|
||||
|
||||
ConfigBroadcasterImpl()
|
||||
: lastCompactedVersion(0), mostRecentVersion(0), cc("ConfigBroadcaster"), compactRequest("CompactRequest", cc),
|
||||
successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc),
|
||||
fullDBRequest("FullDBRequest", cc) {
|
||||
logger = traceCounters(
|
||||
"ConfigBroadcasterMetrics", UID{}, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigBroadcasterMetrics");
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> serve(ConfigBroadcaster* self, ConfigBroadcasterImpl* impl, ConfigFollowerInterface cfi) {
|
||||
wait(impl->consumer->getInitialSnapshot(*self));
|
||||
impl->actors.add(impl->consumer->consume(*self));
|
||||
loop {
|
||||
choose {
|
||||
when(ConfigFollowerGetVersionRequest req = waitNext(cfi.getVersion.getFuture())) {
|
||||
req.reply.send(impl->mostRecentVersion);
|
||||
}
|
||||
when(ConfigFollowerGetFullDatabaseRequest req = waitNext(cfi.getFullDatabase.getFuture())) {
|
||||
++impl->fullDBRequest;
|
||||
ConfigFollowerGetFullDatabaseReply reply;
|
||||
reply.database = impl->snapshot;
|
||||
for (const auto& versionedMutation : impl->versionedMutations) {
|
||||
const auto& version = versionedMutation.version;
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
if (version > req.version) {
|
||||
break;
|
||||
}
|
||||
TraceEvent(SevDebug, "BroadcasterAppendingMutationToFullDBOutput")
|
||||
.detail("ReqVersion", req.version)
|
||||
.detail("MutationVersion", version)
|
||||
.detail("ConfigClass", mutation.getConfigClass())
|
||||
.detail("KnobName", mutation.getKnobName())
|
||||
.detail("KnobValue", mutation.getValue());
|
||||
if (mutation.isSet()) {
|
||||
reply.database[mutation.getKey()] = mutation.getValue();
|
||||
} else {
|
||||
reply.database.erase(mutation.getKey());
|
||||
}
|
||||
}
|
||||
req.reply.send(reply);
|
||||
}
|
||||
when(ConfigFollowerGetChangesRequest req = waitNext(cfi.getChanges.getFuture())) {
|
||||
if (req.lastSeenVersion < impl->lastCompactedVersion) {
|
||||
req.reply.sendError(version_already_compacted());
|
||||
++impl->failedChangeRequest;
|
||||
continue;
|
||||
}
|
||||
ConfigFollowerGetChangesReply reply;
|
||||
reply.mostRecentVersion = impl->mostRecentVersion;
|
||||
for (const auto& versionedMutation : impl->versionedMutations) {
|
||||
if (versionedMutation.version > req.lastSeenVersion) {
|
||||
TraceEvent(SevDebug, "BroadcasterSendingChangeMutation")
|
||||
.detail("Version", versionedMutation.version)
|
||||
.detail("ReqLastSeenVersion", req.lastSeenVersion)
|
||||
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
|
||||
.detail("KnobName", versionedMutation.mutation.getKnobName())
|
||||
.detail("KnobValue", versionedMutation.mutation.getValue());
|
||||
reply.versionedMutations.push_back_deep(reply.versionedMutations.arena(),
|
||||
versionedMutation);
|
||||
}
|
||||
}
|
||||
req.reply.send(reply);
|
||||
++impl->successfulChangeRequest;
|
||||
}
|
||||
when(ConfigFollowerCompactRequest req = waitNext(cfi.compact.getFuture())) {
|
||||
++impl->compactRequest;
|
||||
while (!impl->versionedMutations.empty()) {
|
||||
const auto& versionedMutation = impl->versionedMutations.front();
|
||||
const auto& version = versionedMutation.version;
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
if (version > req.version) {
|
||||
break;
|
||||
} else {
|
||||
TraceEvent(SevDebug, "BroadcasterCompactingMutation")
|
||||
.detail("ReqVersion", req.version)
|
||||
.detail("MutationVersion", version)
|
||||
.detail("ConfigClass", mutation.getConfigClass())
|
||||
.detail("KnobName", mutation.getKnobName())
|
||||
.detail("KnobValue", mutation.getValue())
|
||||
.detail("LastCompactedVersion", impl->lastCompactedVersion);
|
||||
if (mutation.isSet()) {
|
||||
impl->snapshot[mutation.getKey()] = mutation.getValue();
|
||||
} else {
|
||||
impl->snapshot.erase(mutation.getKey());
|
||||
}
|
||||
impl->lastCompactedVersion = version;
|
||||
impl->versionedMutations.pop_front();
|
||||
}
|
||||
}
|
||||
req.reply.send(Void());
|
||||
}
|
||||
when(wait(impl->actors.getResult())) { ASSERT(false); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
Future<Void> serve(ConfigBroadcaster* self, ConfigFollowerInterface const& cfi) { return serve(self, this, cfi); }
|
||||
|
||||
void addVersionedMutations(Standalone<VectorRef<VersionedConfigMutationRef>> const& versionedMutations,
|
||||
Version mostRecentVersion) {
|
||||
// TODO: Implement
|
||||
}
|
||||
|
||||
void setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion) {
|
||||
this->snapshot = std::move(snapshot);
|
||||
this->lastCompactedVersion = lastCompactedVersion;
|
||||
}
|
||||
|
||||
ConfigBroadcasterImpl(ClusterConnectionString const& ccs) : ConfigBroadcasterImpl() {
|
||||
consumer = std::make_unique<SimpleConfigConsumer>(ccs);
|
||||
}
|
||||
|
||||
ConfigBroadcasterImpl(ServerCoordinators const& coordinators) : ConfigBroadcasterImpl() {
|
||||
consumer = std::make_unique<SimpleConfigConsumer>(coordinators);
|
||||
}
|
||||
};
|
||||
|
||||
ConfigBroadcaster::ConfigBroadcaster(ClusterConnectionString const& ccs)
|
||||
: impl(std::make_unique<ConfigBroadcasterImpl>(ccs)) {}
|
||||
|
||||
ConfigBroadcaster::ConfigBroadcaster(ServerCoordinators const& coordinators)
|
||||
: impl(std::make_unique<ConfigBroadcasterImpl>(coordinators)) {}
|
||||
|
||||
ConfigBroadcaster::~ConfigBroadcaster() = default;
|
||||
|
||||
Future<Void> ConfigBroadcaster::serve(ConfigFollowerInterface const& cfi) {
|
||||
return impl->serve(this, cfi);
|
||||
}
|
||||
|
||||
void ConfigBroadcaster::addVersionedMutations(
|
||||
Standalone<VectorRef<VersionedConfigMutationRef>> const& versionedMutations,
|
||||
Version mostRecentVersion) {
|
||||
impl->addVersionedMutations(versionedMutations, mostRecentVersion);
|
||||
}
|
||||
|
||||
void ConfigBroadcaster::setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompactedVersion) {
|
||||
impl->setSnapshot(std::move(snapshot), lastCompactedVersion);
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* IConfigBroadcaster.h
|
||||
* ConfigBroadcaster.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
@ -26,18 +26,14 @@
|
||||
#include "flow/flow.h"
|
||||
#include <memory>
|
||||
|
||||
class IConfigBroadcaster {
|
||||
public:
|
||||
virtual Future<Void> serve(ConfigFollowerInterface const&) = 0;
|
||||
};
|
||||
|
||||
class SimpleConfigBroadcaster : public IConfigBroadcaster {
|
||||
std::unique_ptr<class SimpleConfigBroadcasterImpl> impl;
|
||||
class ConfigBroadcaster {
|
||||
std::unique_ptr<class ConfigBroadcasterImpl> impl;
|
||||
|
||||
public:
|
||||
// TODO: Clean up constructors
|
||||
SimpleConfigBroadcaster(ClusterConnectionString const&);
|
||||
SimpleConfigBroadcaster(ServerCoordinators const&);
|
||||
~SimpleConfigBroadcaster();
|
||||
Future<Void> serve(ConfigFollowerInterface const&) override;
|
||||
ConfigBroadcaster(ClusterConnectionString const&);
|
||||
ConfigBroadcaster(ServerCoordinators const&);
|
||||
~ConfigBroadcaster();
|
||||
Future<Void> serve(ConfigFollowerInterface const&);
|
||||
void addVersionedMutations(Standalone<VectorRef<VersionedConfigMutationRef>> const&, Version mostRecentVersion);
|
||||
void setSnapshot(std::map<ConfigKey, Value>&& snapshot, Version lastCompectedVersion);
|
||||
};
|
46
fdbserver/IConfigConsumer.h
Normal file
46
fdbserver/IConfigConsumer.h
Normal file
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* ConfigBroadcaster.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/CoordinationInterface.h"
|
||||
#include "fdbserver/ConfigBroadcaster.h"
|
||||
#include "fdbserver/CoordinationInterface.h"
|
||||
#include "fdbserver/ConfigFollowerInterface.h"
|
||||
#include "flow/flow.h"
|
||||
#include <memory>
|
||||
|
||||
class IConfigConsumer {
|
||||
public:
|
||||
virtual ~IConfigConsumer() = default;
|
||||
virtual Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) = 0;
|
||||
virtual Future<Void> consume(ConfigBroadcaster& broadcaster) = 0;
|
||||
};
|
||||
|
||||
class SimpleConfigConsumer : public IConfigConsumer {
|
||||
std::unique_ptr<class SimpleConfigConsumerImpl> impl;
|
||||
|
||||
public:
|
||||
SimpleConfigConsumer(ClusterConnectionString const& ccs);
|
||||
SimpleConfigConsumer(ServerCoordinators const& coordinators);
|
||||
~SimpleConfigConsumer();
|
||||
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) override;
|
||||
Future<Void> consume(ConfigBroadcaster& broadcaster) override;
|
||||
};
|
@ -1,243 +0,0 @@
|
||||
/*
|
||||
* SimpleConfigBroadcaster.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/IConfigBroadcaster.h"
|
||||
|
||||
class SimpleConfigBroadcasterImpl {
|
||||
ConfigFollowerInterface subscriber;
|
||||
std::map<ConfigKey, Value> database;
|
||||
// TODO: Should create fewer arenas
|
||||
std::deque<Standalone<VersionedConfigMutationRef>> versionedMutations;
|
||||
Version lastCompactedVersion;
|
||||
Version mostRecentVersion;
|
||||
ActorCollection actors{ false };
|
||||
|
||||
CounterCollection cc;
|
||||
Counter compactRequestIn;
|
||||
Counter successfulChangeRequestIn;
|
||||
Counter failedChangeRequestIn;
|
||||
Counter fullDBRequestIn;
|
||||
Counter compactRequestOut;
|
||||
Counter successfulChangeRequestOut;
|
||||
Counter failedChangeRequestOut;
|
||||
Counter fullDBRequestOut;
|
||||
Future<Void> logger;
|
||||
|
||||
static const double POLLING_INTERVAL; // TODO: Make knob?
|
||||
static const double COMPACTION_INTERVAL; // TODO: Make knob?
|
||||
|
||||
ACTOR static Future<Void> fetchUpdates(SimpleConfigBroadcasterImpl *self) {
|
||||
loop {
|
||||
try {
|
||||
ConfigFollowerGetChangesReply reply = wait(self->subscriber.getChanges.getReply(
|
||||
ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} }));
|
||||
++self->successfulChangeRequestOut;
|
||||
for (const auto& versionedMutation : reply.versionedMutations) {
|
||||
TraceEvent(SevDebug, "BroadcasterFetchedMutation")
|
||||
.detail("Version", versionedMutation.version)
|
||||
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
|
||||
.detail("KnobName", versionedMutation.mutation.getKnobName())
|
||||
.detail("KnobValue", versionedMutation.mutation.getValue());
|
||||
self->versionedMutations.push_back(versionedMutation);
|
||||
}
|
||||
self->mostRecentVersion = reply.mostRecentVersion;
|
||||
wait(delayJittered(POLLING_INTERVAL));
|
||||
} catch (Error& e) {
|
||||
++self->failedChangeRequestOut;
|
||||
if (e.code() == error_code_version_already_compacted) {
|
||||
ConfigFollowerGetVersionReply versionReply =
|
||||
wait(self->subscriber.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
ASSERT(versionReply.version > self->mostRecentVersion);
|
||||
self->mostRecentVersion = versionReply.version;
|
||||
ConfigFollowerGetFullDatabaseReply dbReply = wait(self->subscriber.getFullDatabase.getReply(
|
||||
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, {} }));
|
||||
self->database = dbReply.database;
|
||||
++self->fullDBRequestOut;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> compactor(SimpleConfigBroadcasterImpl* self) {
|
||||
loop {
|
||||
wait(delayJittered(COMPACTION_INTERVAL));
|
||||
// TODO: Enable compaction once bugs are fixed
|
||||
// wait(self->subscriber.compact.getReply(ConfigFollowerCompactRequest{ self->mostRecentVersion }));
|
||||
//++self->compactRequestOut;
|
||||
}
|
||||
}
|
||||
|
||||
void traceQueuedMutations() {
|
||||
TraceEvent te("SimpleConfigBroadcasterQueuedMutations");
|
||||
te.detail("Size", versionedMutations.size());
|
||||
int index = 0;
|
||||
for (const auto &versionedMutation : versionedMutations) {
|
||||
te.detail(format("Version%d", index), versionedMutation.version);
|
||||
te.detail(format("ConfigClass%d", index), versionedMutation.mutation.getConfigClass());
|
||||
te.detail(format("KnobName%d", index), versionedMutation.mutation.getKnobName());
|
||||
te.detail(format("KnobValue%d", index), versionedMutation.mutation.getValue());
|
||||
++index;
|
||||
}
|
||||
}
|
||||
|
||||
static void removeRange(std::map<Key, Value> &database, KeyRef begin, KeyRef end) {
|
||||
ASSERT(end >= begin);
|
||||
auto b = database.lower_bound(begin);
|
||||
auto e = database.lower_bound(end);
|
||||
database.erase(b, e);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> serve(SimpleConfigBroadcasterImpl* self, ConfigFollowerInterface publisher) {
|
||||
ConfigFollowerGetVersionReply versionReply =
|
||||
wait(self->subscriber.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
self->mostRecentVersion = versionReply.version;
|
||||
ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber.getFullDatabase.getReply(
|
||||
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, {} }));
|
||||
TraceEvent(SevDebug, "BroadcasterGotInitialFullDB")
|
||||
.detail("Version", self->mostRecentVersion)
|
||||
.detail("PublisherID", publisher.id());
|
||||
self->database = reply.database;
|
||||
self->actors.add(fetchUpdates(self));
|
||||
self->actors.add(compactor(self));
|
||||
loop {
|
||||
//self->traceQueuedMutations();
|
||||
choose {
|
||||
when(ConfigFollowerGetVersionRequest req = waitNext(publisher.getVersion.getFuture())) {
|
||||
req.reply.send(self->mostRecentVersion);
|
||||
}
|
||||
when(ConfigFollowerGetFullDatabaseRequest req = waitNext(publisher.getFullDatabase.getFuture())) {
|
||||
++self->fullDBRequestIn;
|
||||
ConfigFollowerGetFullDatabaseReply reply;
|
||||
reply.database = self->database;
|
||||
for (const auto &versionedMutation : self->versionedMutations) {
|
||||
const auto &version = versionedMutation.version;
|
||||
const auto &mutation = versionedMutation.mutation;
|
||||
if (version > req.version) {
|
||||
break;
|
||||
}
|
||||
TraceEvent(SevDebug, "BroadcasterAppendingMutationToFullDBOutput")
|
||||
.detail("ReqVersion", req.version)
|
||||
.detail("MutationVersion", version)
|
||||
.detail("ConfigClass", mutation.getConfigClass())
|
||||
.detail("KnobName", mutation.getKnobName())
|
||||
.detail("KnobValue", mutation.getValue());
|
||||
if (mutation.isSet()) {
|
||||
reply.database[mutation.getKey()] = mutation.getValue();
|
||||
} else {
|
||||
reply.database.erase(mutation.getKey());
|
||||
}
|
||||
}
|
||||
req.reply.send(reply);
|
||||
}
|
||||
when(ConfigFollowerGetChangesRequest req = waitNext(publisher.getChanges.getFuture())) {
|
||||
if (req.lastSeenVersion < self->lastCompactedVersion) {
|
||||
req.reply.sendError(version_already_compacted());
|
||||
++self->failedChangeRequestIn;
|
||||
continue;
|
||||
}
|
||||
ConfigFollowerGetChangesReply reply;
|
||||
reply.mostRecentVersion = self->mostRecentVersion;
|
||||
for (const auto &versionedMutation : self->versionedMutations) {
|
||||
if (versionedMutation.version > req.lastSeenVersion) {
|
||||
TraceEvent(SevDebug, "BroadcasterSendingChangeMutation")
|
||||
.detail("Version", versionedMutation.version)
|
||||
.detail("ReqLastSeenVersion", req.lastSeenVersion)
|
||||
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
|
||||
.detail("KnobName", versionedMutation.mutation.getKnobName())
|
||||
.detail("KnobValue", versionedMutation.mutation.getValue());
|
||||
reply.versionedMutations.push_back_deep(reply.versionedMutations.arena(),
|
||||
versionedMutation);
|
||||
}
|
||||
}
|
||||
req.reply.send(reply);
|
||||
++self->successfulChangeRequestIn;
|
||||
}
|
||||
when(ConfigFollowerCompactRequest req = waitNext(publisher.compact.getFuture())) {
|
||||
++self->compactRequestIn;
|
||||
while (!self->versionedMutations.empty()) {
|
||||
const auto& versionedMutation = self->versionedMutations.front();
|
||||
const auto& version = versionedMutation.version;
|
||||
const auto& mutation = versionedMutation.mutation;
|
||||
if (version > req.version) {
|
||||
break;
|
||||
} else {
|
||||
TraceEvent(SevDebug, "BroadcasterCompactingMutation")
|
||||
.detail("ReqVersion", req.version)
|
||||
.detail("MutationVersion", version)
|
||||
.detail("ConfigClass", mutation.getConfigClass())
|
||||
.detail("KnobName", mutation.getKnobName())
|
||||
.detail("KnobValue", mutation.getValue())
|
||||
.detail("LastCompactedVersion", self->lastCompactedVersion);
|
||||
if (mutation.isSet()) {
|
||||
self->database[mutation.getKey()] = mutation.getValue();
|
||||
} else {
|
||||
self->database.erase(mutation.getKey());
|
||||
}
|
||||
self->lastCompactedVersion = version;
|
||||
self->versionedMutations.pop_front();
|
||||
}
|
||||
}
|
||||
req.reply.send(Void());
|
||||
}
|
||||
when(wait(self->actors.getResult())) { ASSERT(false); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SimpleConfigBroadcasterImpl()
|
||||
: lastCompactedVersion(0), mostRecentVersion(0), cc("ConfigBroadcaster"),
|
||||
compactRequestIn("CompactRequestIn", cc), successfulChangeRequestIn("SuccessfulChangeRequestIn", cc),
|
||||
failedChangeRequestIn("FailedChangeRequestIn", cc), fullDBRequestIn("FullDBRequestIn", cc),
|
||||
compactRequestOut("CompactRequestOut", cc), successfulChangeRequestOut("SuccessfulChangeRequestOut", cc),
|
||||
failedChangeRequestOut("FailedChangeRequestOut", cc), fullDBRequestOut("FullDBRequestOut", cc) {
|
||||
logger = traceCounters(
|
||||
"ConfigBroadcasterMetrics", UID{}, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigBroadcasterMetrics");
|
||||
}
|
||||
|
||||
public:
|
||||
SimpleConfigBroadcasterImpl(ClusterConnectionString const& ccs) : SimpleConfigBroadcasterImpl() {
|
||||
auto coordinators = ccs.coordinators();
|
||||
std::sort(coordinators.begin(), coordinators.end());
|
||||
subscriber = ConfigFollowerInterface(coordinators[0]);
|
||||
}
|
||||
|
||||
SimpleConfigBroadcasterImpl(ServerCoordinators const& coordinators) : SimpleConfigBroadcasterImpl() {
|
||||
subscriber = ConfigFollowerInterface(coordinators.configServers[0]);
|
||||
}
|
||||
|
||||
Future<Void> serve(ConfigFollowerInterface const& publisher) { return serve(this, publisher); }
|
||||
};
|
||||
|
||||
const double SimpleConfigBroadcasterImpl::POLLING_INTERVAL = 0.5;
|
||||
const double SimpleConfigBroadcasterImpl::COMPACTION_INTERVAL = 5.0;
|
||||
|
||||
SimpleConfigBroadcaster::SimpleConfigBroadcaster(ClusterConnectionString const& ccs)
|
||||
: impl(std::make_unique<SimpleConfigBroadcasterImpl>(ccs)) {}
|
||||
|
||||
SimpleConfigBroadcaster::SimpleConfigBroadcaster(ServerCoordinators const& coordinators)
|
||||
: impl(std::make_unique<SimpleConfigBroadcasterImpl>(coordinators)) {}
|
||||
|
||||
SimpleConfigBroadcaster::~SimpleConfigBroadcaster() = default;
|
||||
|
||||
Future<Void> SimpleConfigBroadcaster::serve(ConfigFollowerInterface const& publisher) {
|
||||
return impl->serve(publisher);
|
||||
}
|
138
fdbserver/SimpleConfigConsumer.actor.cpp
Normal file
138
fdbserver/SimpleConfigConsumer.actor.cpp
Normal file
@ -0,0 +1,138 @@
|
||||
/*
|
||||
* SimpleConfigBroadcaster.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/IConfigConsumer.h"
|
||||
#include "fdbserver/ConfigBroadcaster.h"
|
||||
|
||||
class SimpleConfigConsumerImpl {
|
||||
ConfigFollowerInterface cfi;
|
||||
|
||||
Version mostRecentVersion{ 0 };
|
||||
|
||||
CounterCollection cc;
|
||||
Counter compactRequest;
|
||||
Counter successfulChangeRequest;
|
||||
Counter failedChangeRequest;
|
||||
Counter fullDBRequest;
|
||||
Future<Void> logger;
|
||||
|
||||
static const double POLLING_INTERVAL; // TODO: Make knob?
|
||||
static const double COMPACTION_INTERVAL; // TODO: Make knob?
|
||||
|
||||
ACTOR static Future<Void> compactor(SimpleConfigConsumerImpl* self) {
|
||||
loop {
|
||||
wait(delayJittered(COMPACTION_INTERVAL));
|
||||
// TODO: Enable compaction once bugs are fixed
|
||||
// wait(self->cfi.compact.getReply(ConfigFollowerCompactRequest{ self->mostRecentVersion }));
|
||||
//++self->compactRequest;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> fetchChanges(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
|
||||
loop {
|
||||
try {
|
||||
ConfigFollowerGetChangesReply reply =
|
||||
wait(self->cfi.getChanges.getReply(ConfigFollowerGetChangesRequest{ self->mostRecentVersion, {} }));
|
||||
++self->successfulChangeRequest;
|
||||
for (const auto& versionedMutation : reply.versionedMutations) {
|
||||
TraceEvent(SevDebug, "ConsumerFetchedMutation")
|
||||
.detail("Version", versionedMutation.version)
|
||||
.detail("ConfigClass", versionedMutation.mutation.getConfigClass())
|
||||
.detail("KnobName", versionedMutation.mutation.getKnobName())
|
||||
.detail("KnobValue", versionedMutation.mutation.getValue());
|
||||
}
|
||||
self->mostRecentVersion = reply.mostRecentVersion;
|
||||
broadcaster->addVersionedMutations(reply.versionedMutations, reply.mostRecentVersion);
|
||||
wait(delayJittered(POLLING_INTERVAL));
|
||||
} catch (Error& e) {
|
||||
++self->failedChangeRequest;
|
||||
if (e.code() == error_code_version_already_compacted) {
|
||||
ConfigFollowerGetVersionReply versionReply =
|
||||
wait(self->cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
ASSERT(versionReply.version > self->mostRecentVersion);
|
||||
self->mostRecentVersion = versionReply.version;
|
||||
ConfigFollowerGetFullDatabaseReply dbReply = wait(self->cfi.getFullDatabase.getReply(
|
||||
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, {} }));
|
||||
// TODO: Remove unnecessary copy
|
||||
auto snapshot = dbReply.database;
|
||||
broadcaster->setSnapshot(std::move(snapshot), self->mostRecentVersion);
|
||||
++self->fullDBRequest;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> getInitialSnapshot(SimpleConfigConsumerImpl* self, ConfigBroadcaster* broadcaster) {
|
||||
ConfigFollowerGetVersionReply versionReply =
|
||||
wait(self->cfi.getVersion.getReply(ConfigFollowerGetVersionRequest{}));
|
||||
self->mostRecentVersion = versionReply.version;
|
||||
ConfigFollowerGetFullDatabaseReply reply = wait(
|
||||
self->cfi.getFullDatabase.getReply(ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, {} }));
|
||||
TraceEvent(SevDebug, "ConfigGotInitialSnapshot").detail("Version", self->mostRecentVersion);
|
||||
// TODO: Remove unnecessary copy
|
||||
auto snapshot = reply.database;
|
||||
broadcaster->setSnapshot(std::move(snapshot), self->mostRecentVersion);
|
||||
return Void();
|
||||
}
|
||||
|
||||
SimpleConfigConsumerImpl()
|
||||
: mostRecentVersion(0), cc("ConfigConsumer"), compactRequest("CompactRequest", cc),
|
||||
successfulChangeRequest("SuccessfulChangeRequest", cc), failedChangeRequest("FailedChangeRequest", cc),
|
||||
fullDBRequest("FullDBRequest", cc) {
|
||||
logger = traceCounters(
|
||||
"ConfigConsumerMetrics", UID{}, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ConfigConsumerMetrics");
|
||||
}
|
||||
|
||||
public:
|
||||
SimpleConfigConsumerImpl(ClusterConnectionString const& ccs) : SimpleConfigConsumerImpl() {
|
||||
auto coordinators = ccs.coordinators();
|
||||
std::sort(coordinators.begin(), coordinators.end());
|
||||
cfi = ConfigFollowerInterface(coordinators[0]);
|
||||
}
|
||||
|
||||
SimpleConfigConsumerImpl(ServerCoordinators const& coordinators) : SimpleConfigConsumerImpl() {
|
||||
cfi = ConfigFollowerInterface(coordinators.configServers[0]);
|
||||
}
|
||||
|
||||
Future<Void> getInitialSnapshot(ConfigBroadcaster& broadcaster) { return getInitialSnapshot(this, &broadcaster); }
|
||||
|
||||
Future<Void> consume(ConfigBroadcaster& broadcaster) { return fetchChanges(this, &broadcaster) || compactor(this); }
|
||||
};
|
||||
|
||||
const double SimpleConfigConsumerImpl::POLLING_INTERVAL = 0.5;
|
||||
const double SimpleConfigConsumerImpl::COMPACTION_INTERVAL = 5.0;
|
||||
|
||||
SimpleConfigConsumer::SimpleConfigConsumer(ClusterConnectionString const& ccs)
|
||||
: impl(std::make_unique<SimpleConfigConsumerImpl>(ccs)) {}
|
||||
|
||||
SimpleConfigConsumer::SimpleConfigConsumer(ServerCoordinators const& coordinators)
|
||||
: impl(std::make_unique<SimpleConfigConsumerImpl>(coordinators)) {}
|
||||
|
||||
Future<Void> SimpleConfigConsumer::getInitialSnapshot(ConfigBroadcaster& broadcaster) {
|
||||
return impl->getInitialSnapshot(broadcaster);
|
||||
}
|
||||
|
||||
Future<Void> SimpleConfigConsumer::consume(ConfigBroadcaster& broadcaster) {
|
||||
return impl->consume(broadcaster);
|
||||
}
|
||||
|
||||
SimpleConfigConsumer::~SimpleConfigConsumer() = default;
|
Loading…
x
Reference in New Issue
Block a user