1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-06-02 03:12:12 +08:00

Reenable consumer in config broadcaster

This commit is contained in:
sfc-gh-tclinkenbeard 2021-08-17 12:08:59 -07:00
parent 616a01d01d
commit 0bacc310ef
2 changed files with 14 additions and 9 deletions

@ -204,32 +204,37 @@ class ConfigBroadcasterImpl {
return Void();
}
ACTOR static Future<Void> registerWorker(ConfigBroadcasterImpl* self,
ACTOR static Future<Void> registerWorker(ConfigBroadcaster* self,
ConfigBroadcasterImpl* impl,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
state BroadcastClientDetails client(
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface));
if (!impl->consumerFuture.isValid()) {
impl->consumerFuture = impl->consumer->consume(*self);
}
if (self->clients.count(broadcastInterface.id())) {
if (impl->clients.count(broadcastInterface.id())) {
// Client already registered
return Void();
}
// Push full snapshot to worker if it isn't up to date.
wait(self->pushSnapshot(self->mostRecentVersion, client));
self->clients[broadcastInterface.id()] = client;
self->actors.add(waitForFailure(self, watcher, broadcastInterface.id()));
wait(impl->pushSnapshot(impl->mostRecentVersion, client));
impl->clients[broadcastInterface.id()] = client;
impl->actors.add(waitForFailure(impl, watcher, broadcastInterface.id()));
return Void();
}
public:
Future<Void> registerWorker(Version lastSeenVersion,
Future<Void> registerWorker(ConfigBroadcaster& self,
Version lastSeenVersion,
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
return registerWorker(this, lastSeenVersion, configClassSet, watcher, broadcastInterface);
return registerWorker(&self, this, lastSeenVersion, configClassSet, watcher, broadcastInterface);
}
void applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
@ -355,7 +360,7 @@ Future<Void> ConfigBroadcaster::registerWorker(Version lastSeenVersion,
ConfigClassSet const& configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
return impl->registerWorker(lastSeenVersion, configClassSet, watcher, broadcastInterface);
return impl->registerWorker(*this, lastSeenVersion, configClassSet, watcher, broadcastInterface);
}
void ConfigBroadcaster::applyChanges(Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,

@ -164,7 +164,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES rare/CheckRelocation.toml)
add_fdb_test(TEST_FILES rare/ClogUnclog.toml)
add_fdb_test(TEST_FILES rare/CloggedCycleWithKills.toml)
add_fdb_test(TEST_FILES rare/ConfigIncrement.toml IGNORE)
add_fdb_test(TEST_FILES rare/ConfigIncrement.toml)
add_fdb_test(TEST_FILES rare/ConflictRangeCheck.toml)
add_fdb_test(TEST_FILES rare/ConflictRangeRYOWCheck.toml)
add_fdb_test(TEST_FILES rare/CycleRollbackClogged.toml)