Add broadcaster error check to unit tests

This commit is contained in:
Lukas Joswiak 2021-08-10 11:21:57 -07:00
parent 564a3d69b7
commit 72e55ef72e
3 changed files with 11 additions and 7 deletions

View File

@ -70,6 +70,7 @@ class ConfigBroadcasterImpl {
Version lastCompactedVersion;
Version mostRecentVersion;
std::unique_ptr<IConfigConsumer> consumer;
Future<Void> consumerFuture;
ActorCollection actors{ false };
std::vector<BroadcastClientDetails> clients;
@ -83,10 +84,6 @@ class ConfigBroadcasterImpl {
template <class Changes>
Future<Void> pushChanges(BroadcastClientDetails& client, Changes const& changes) {
if (client.watcher.isReady()) {
clients.erase(std::remove(clients.begin(), clients.end(), client));
}
// Skip if client has already seen the latest version.
if (client.lastSeenVersion >= mostRecentVersion) {
return Void();
@ -172,7 +169,7 @@ public:
ConfigClassSet configClassSet,
Future<Void> watcher,
ConfigBroadcastInterface broadcastInterface) {
actors.add(consumer->consume(*self));
consumerFuture = consumer->consume(*self);
clients.push_back(BroadcastClientDetails{
watcher, std::move(configClassSet), lastSeenVersion, std::move(broadcastInterface) });
this->actors.add(waitForFailure(this, watcher, &clients.back()));
@ -297,6 +294,8 @@ public:
}
}
Future<Void> getError() const { return consumerFuture; }
UID getID() const { return id; }
static void runPendingRequestStoreTest(bool includeGlobalMutation, int expectedMatches);
@ -345,6 +344,10 @@ void ConfigBroadcaster::applySnapshotAndChanges(
impl().applySnapshotAndChanges(std::move(snapshot), snapshotVersion, changes, changesVersion, annotations);
}
Future<Void> ConfigBroadcaster::getError() const {
return impl().getError();
}
UID ConfigBroadcaster::getID() const {
return impl().getID();
}

View File

@ -61,6 +61,7 @@ public:
Standalone<VectorRef<VersionedConfigMutationRef>> const& changes,
Version changesVersion,
Standalone<VectorRef<VersionedConfigCommitAnnotationRef>> const& annotations);
Future<Void> getError() const;
UID getID() const;
JsonBuilderObject getStatus() const;
void compact(Version compactionVersion);

View File

@ -279,7 +279,7 @@ public:
void compact() { broadcaster.compact(lastWrittenVersion); }
Future<Void> getError() const { return readFrom.getError(); }
Future<Void> getError() const { return readFrom.getError() || broadcaster.getError(); }
};
class TransactionEnvironment {
@ -411,7 +411,7 @@ public:
}
Future<Void> clear(Optional<KeyRef> configClass) { return writeTo.clear(configClass); }
Future<Void> check(Optional<int64_t> value) const { return readFrom.checkEventually(value); }
Future<Void> getError() const { return writeTo.getError() || readFrom.getError(); }
Future<Void> getError() const { return writeTo.getError() || readFrom.getError() || broadcaster.getError(); }
};
// These functions give a common interface to all environments, to improve code reuse