Fix some bugs in distribution of configBroadcaster interface

This commit is contained in:
sfc-gh-tclinkenbeard 2021-04-26 18:46:22 -07:00
parent 7211d838cf
commit dc577b6608
5 changed files with 35 additions and 7 deletions

View File

@ -1907,6 +1907,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster, ClusterC
dbInfo.distributor = db->serverInfo->get().distributor;
dbInfo.ratekeeper = db->serverInfo->get().ratekeeper;
dbInfo.latencyBandConfig = db->serverInfo->get().latencyBandConfig;
dbInfo.configBroadcaster = db->serverInfo->get().configBroadcaster;
TraceEvent("CCWDB", cluster->id)
.detail("Lifetime", dbInfo.masterLifetime.toString())

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "flow/IRandom.h"
#include "fdbserver/ConfigFollowerInterface.h"
#include "fdbserver/CoordinationInterface.h"
@ -28,8 +29,18 @@ void ConfigFollowerInterface::setupWellKnownEndpoints() {
compact.makeWellKnownEndpoint(WLTOKEN_CONFIGFOLLOWER_COMPACT, TaskPriority::Coordination);
}
ConfigFollowerInterface::ConfigFollowerInterface() : id(deterministicRandom()->randomUniqueID()) {}
ConfigFollowerInterface::ConfigFollowerInterface(NetworkAddress const& remote)
: getVersion(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETVERSION)),
: id(deterministicRandom()->randomUniqueID()), getVersion(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETVERSION)),
getFullDatabase(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETFULLDB)),
getChanges(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_GETCHANGES)),
compact(Endpoint({ remote }, WLTOKEN_CONFIGFOLLOWER_COMPACT)) {}
bool ConfigFollowerInterface::operator==(ConfigFollowerInterface const& rhs) const {
return id == rhs.id;
}
bool ConfigFollowerInterface::operator!=(ConfigFollowerInterface const& rhs) const {
return !(*this == rhs);
}

View File

@ -143,13 +143,16 @@ struct ConfigFollowerInterface {
RequestStream<ConfigFollowerGetFullDatabaseRequest> getFullDatabase;
RequestStream<ConfigFollowerGetChangesRequest> getChanges;
RequestStream<ConfigFollowerCompactRequest> compact;
UID id;
ConfigFollowerInterface() = default;
ConfigFollowerInterface();
void setupWellKnownEndpoints();
ConfigFollowerInterface(NetworkAddress const& remote);
bool operator==(ConfigFollowerInterface const& rhs) const;
bool operator!=(ConfigFollowerInterface const& rhs) const;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, getVersion, getFullDatabase, getChanges);
serializer(ar, getVersion, getFullDatabase, getChanges, id);
}
};

View File

@ -158,17 +158,28 @@ class LocalConfigurationImpl {
return Void();
}
ACTOR static Future<Void> monitorBroadcaster(Reference<AsyncVar<ServerDBInfo> const> serverDBInfo,
Reference<AsyncVar<ConfigFollowerInterface>> broadcaster) {
loop {
wait(serverDBInfo->onChange());
broadcaster->set(serverDBInfo->get().configBroadcaster);
}
}
ACTOR static Future<Void> consume(LocalConfigurationImpl* self,
Reference<AsyncVar<ServerDBInfo> const> serverDBInfo) {
wait(self->initFuture);
state Future<ConfigFollowerGetChangesReply> getChangesReply = Never();
state Reference<AsyncVar<ConfigFollowerInterface>> broadcaster =
makeReference<AsyncVar<ConfigFollowerInterface>>(serverDBInfo->get().configBroadcaster);
state Future<Void> monitor = monitorBroadcaster(serverDBInfo, broadcaster);
loop {
auto broadcaster = serverDBInfo->get().configBroadcaster;
choose {
when(wait(serverDBInfo->onChange())) {}
when(wait(fetchChanges(self, serverDBInfo->get().configBroadcaster))) {
when(wait(broadcaster->onChange())) {}
when(wait(brokenPromiseToNever(fetchChanges(self, broadcaster->get())))) {
wait(delay(0.5)); // TODO: Make knob?
}
when(wait(monitor)) { ASSERT(false); }
}
}
}

View File

@ -112,7 +112,9 @@ class SimpleConfigBroadcasterImpl {
self->mostRecentVersion = versionReply.version;
ConfigFollowerGetFullDatabaseReply reply = wait(self->subscriber.getFullDatabase.getReply(
ConfigFollowerGetFullDatabaseRequest{ self->mostRecentVersion, {} }));
TraceEvent(SevDebug, "BroadcasterGotInitialFullDB").detail("Version", self->mostRecentVersion);
TraceEvent(SevDebug, "BroadcasterGotInitialFullDB")
.detail("Version", self->mostRecentVersion)
.detail("PublisherID", publisher.id);
self->database = reply.database;
self->actors.add(fetchUpdates(self));
self->actors.add(compactor(self));