mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-02 19:25:52 +08:00
* EncryptKeyProxy server APIs for simulation runs. Description diff-2: FlowSingleton util class Bug fixes diff-1: Expected errors returned to the caller Major changes proposed are: 1. EncryptKeyProxy server APIs: 1.1. Lookup Cipher details via BaseCipherId 1.2. Lookup latest Cipher details via encryption domainId. 2. EncyrptKeyProxy implements caches indexed by: baseCipherId & encyrptDomainId 3. Periodic task to refresh domainId indexed cache to support 'limiting cipher lifetime' abilities if supported by external KMS solutions. Testing EncyrptKeyProxyTest workload to validate the newly added code.
2088 lines
56 KiB
C++
2088 lines
56 KiB
C++
/*
|
|
* genericactors.actor.h
|
|
*
|
|
* This source file is part of the FoundationDB open source project
|
|
*
|
|
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
|
|
// version.
|
|
#include "flow/FastRef.h"
|
|
#include "flow/network.h"
|
|
#include <utility>
|
|
#include <functional>
|
|
#if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
|
|
#define FLOW_GENERICACTORS_ACTOR_G_H
|
|
#include "flow/genericactors.actor.g.h"
|
|
#elif !defined(GENERICACTORS_ACTOR_H)
|
|
#define GENERICACTORS_ACTOR_H
|
|
|
|
#include <list>
|
|
#include <utility>
|
|
|
|
#include "flow/flow.h"
|
|
#include "flow/Knobs.h"
|
|
#include "flow/Util.h"
|
|
#include "flow/IndexedSet.h"
|
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
|
|
|
#ifdef _MSC_VER
|
|
#pragma warning(disable : 4355) // 'this' : used in base member initializer list
|
|
#endif
|
|
|
|
ACTOR template <class T, class X>
|
|
Future<T> traceAfter(Future<T> what, const char* type, const char* key, X value, bool traceErrors = false) {
|
|
try {
|
|
T val = wait(what);
|
|
TraceEvent(type).detail(key, value);
|
|
return val;
|
|
} catch (Error& e) {
|
|
if (traceErrors)
|
|
TraceEvent(type).errorUnsuppressed(e).detail(key, value);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T, class X>
|
|
Future<T> traceAfterCall(Future<T> what, const char* type, const char* key, X func, bool traceErrors = false) {
|
|
try {
|
|
state T val = wait(what);
|
|
try {
|
|
TraceEvent(type).detail(key, func(val));
|
|
} catch (Error& e) {
|
|
TraceEvent(SevError, "TraceAfterCallError").error(e);
|
|
}
|
|
return val;
|
|
} catch (Error& e) {
|
|
if (traceErrors)
|
|
TraceEvent(type).errorUnsuppressed(e);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Optional<T>> stopAfter(Future<T> what) {
|
|
state Optional<T> ret = T();
|
|
try {
|
|
T _ = wait(what);
|
|
ret = Optional<T>(_);
|
|
} catch (Error& e) {
|
|
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete ||
|
|
e.code() == error_code_actor_cancelled || e.code() == error_code_please_reboot_remote_kv_store;
|
|
TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e);
|
|
if (!ok) {
|
|
fprintf(stderr, "Fatal Error: %s\n", e.what());
|
|
ret = Optional<T>();
|
|
}
|
|
}
|
|
g_network->stop();
|
|
return ret;
|
|
}
|
|
|
|
template <class T>
|
|
T sorted(T range) {
|
|
std::sort(range.begin(), range.end());
|
|
return range;
|
|
}
|
|
|
|
template <class T>
|
|
ErrorOr<T> errorOr(T t) {
|
|
return ErrorOr<T>(t);
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<ErrorOr<T>> errorOr(Future<T> f) {
|
|
try {
|
|
T t = wait(f);
|
|
return ErrorOr<T>(t);
|
|
} catch (Error& e) {
|
|
return ErrorOr<T>(e);
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> throwErrorOr(Future<ErrorOr<T>> f) {
|
|
ErrorOr<T> t = wait(f);
|
|
if (t.isError())
|
|
throw t.getError();
|
|
return t.get();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> transformErrors(Future<T> f, Error err) {
|
|
try {
|
|
T t = wait(f);
|
|
return t;
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_actor_cancelled)
|
|
throw e;
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> transformError(Future<T> f, Error inErr, Error outErr) {
|
|
try {
|
|
T t = wait(f);
|
|
return t;
|
|
} catch (Error& e) {
|
|
if (e.code() == inErr.code())
|
|
throw outErr;
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
// Note that the RequestStream<T> version of forwardPromise doesn't exist, because what to do with errors?
|
|
|
|
ACTOR template <class T>
|
|
void forwardEvent(Event* ev, Future<T> input) {
|
|
try {
|
|
T value = wait(input);
|
|
} catch (Error&) {
|
|
}
|
|
ev->set();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
void forwardEvent(Event* ev, T* t, Error* err, FutureStream<T> input) {
|
|
try {
|
|
T value = waitNext(input);
|
|
*t = std::move(value);
|
|
ev->set();
|
|
} catch (Error& e) {
|
|
*err = e;
|
|
ev->set();
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> waitForAllReady(std::vector<Future<T>> results) {
|
|
state int i = 0;
|
|
loop {
|
|
if (i == results.size())
|
|
return Void();
|
|
try {
|
|
wait(success(results[i]));
|
|
} catch (...) {
|
|
}
|
|
i++;
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> timeout(Future<T> what, double time, T timedoutValue, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
|
Future<Void> end = delay(time, taskID);
|
|
choose {
|
|
when(T t = wait(what)) { return t; }
|
|
when(wait(end)) { return timedoutValue; }
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Optional<T>> timeout(Future<T> what, double time) {
|
|
Future<Void> end = delay(time);
|
|
choose {
|
|
when(T t = wait(what)) { return t; }
|
|
when(wait(end)) { return Optional<T>(); }
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> timeoutError(Future<T> what, double time, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
|
Future<Void> end = delay(time, taskID);
|
|
choose {
|
|
when(T t = wait(what)) { return t; }
|
|
when(wait(end)) { throw timed_out(); }
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> delayed(Future<T> what, double time = 0.0, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
|
try {
|
|
state T t = wait(what);
|
|
wait(delay(time, taskID));
|
|
return t;
|
|
} catch (Error& e) {
|
|
state Error err = e;
|
|
wait(delay(time, taskID));
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
// wait <interval> then call what() in a loop forever
|
|
ACTOR template <class Func>
|
|
Future<Void> recurring(Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay) {
|
|
loop choose {
|
|
when(wait(delay(interval, taskID))) { what(); }
|
|
}
|
|
}
|
|
|
|
ACTOR template <class Func>
|
|
Future<Void> trigger(Func what, Future<Void> signal) {
|
|
wait(signal);
|
|
what();
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class Func>
|
|
Future<Void> triggerOnError(Func what, Future<Void> signal) {
|
|
try {
|
|
wait(signal);
|
|
} catch (Error& e) {
|
|
what();
|
|
}
|
|
|
|
return Void();
|
|
}
|
|
|
|
// Waits for a future to complete and cannot be cancelled
|
|
// Most situations will use the overload below, which does not require a promise
|
|
ACTOR template <class T>
|
|
void uncancellable(Future<T> what, Promise<T> result) {
|
|
try {
|
|
T val = wait(what);
|
|
result.send(val);
|
|
} catch (Error& e) {
|
|
result.sendError(e);
|
|
}
|
|
}
|
|
|
|
// Waits for a future to complete and cannot be cancelled
|
|
ACTOR template <class T>
|
|
[[flow_allow_discard]] Future<T> uncancellable(Future<T> what) {
|
|
Promise<T> resultPromise;
|
|
Future<T> result = resultPromise.getFuture();
|
|
|
|
uncancellable(what, resultPromise);
|
|
T val = wait(result);
|
|
|
|
return val;
|
|
}
|
|
|
|
// Holds onto an object until a future either completes or is cancelled
|
|
// Used to prevent the object from being reclaimed
|
|
//
|
|
// NOTE: the order of the arguments is important. The arguments will be destructed in
|
|
// reverse order, and we need the object to be destructed last.
|
|
ACTOR template <class T, class X>
|
|
Future<T> holdWhile(X object, Future<T> what) {
|
|
T val = wait(what);
|
|
return val;
|
|
}
|
|
|
|
ACTOR template <class T, class X>
|
|
Future<Void> holdWhileVoid(X object, Future<T> what) {
|
|
T val = wait(what);
|
|
return Void();
|
|
}
|
|
|
|
// Assign the future value of what to out
|
|
template <class T>
|
|
Future<Void> store(T& out, Future<T> what) {
|
|
return map(what, [&out](T const& v) {
|
|
out = v;
|
|
return Void();
|
|
});
|
|
}
|
|
|
|
template <class T>
|
|
Future<Void> storeOrThrow(T& out, Future<Optional<T>> what, Error e = key_not_found()) {
|
|
return map(what, [&out, e](Optional<T> const& o) {
|
|
if (!o.present())
|
|
throw e;
|
|
out = o.get();
|
|
return Void();
|
|
});
|
|
}
|
|
|
|
// Waits for a future to be ready, and then applies an asynchronous function to it.
|
|
ACTOR template <class T, class F, class U = decltype(std::declval<F>()(std::declval<T>()).getValue())>
|
|
Future<U> mapAsync(Future<T> what, F actorFunc) {
|
|
T val = wait(what);
|
|
U ret = wait(actorFunc(val));
|
|
return ret;
|
|
}
|
|
|
|
// maps a vector of futures with an asynchronous function
|
|
template <class T, class F>
|
|
auto mapAsync(std::vector<Future<T>> const& what, F const& actorFunc) {
|
|
std::vector<std::invoke_result_t<F, T>> ret;
|
|
ret.reserve(what.size());
|
|
for (const auto& f : what)
|
|
ret.push_back(mapAsync(f, actorFunc));
|
|
return ret;
|
|
}
|
|
|
|
// maps a stream with an asynchronous function
|
|
ACTOR template <class T, class F, class U = decltype(std::declval<F>()(std::declval<T>()).getValue())>
|
|
Future<Void> mapAsync(FutureStream<T> input, F actorFunc, PromiseStream<U> output) {
|
|
state Deque<Future<U>> futures;
|
|
|
|
loop {
|
|
try {
|
|
choose {
|
|
when(T nextInput = waitNext(input)) { futures.push_back(actorFunc(nextInput)); }
|
|
when(U nextOutput = wait(futures.size() == 0 ? Never() : futures.front())) {
|
|
output.send(nextOutput);
|
|
futures.pop_front();
|
|
}
|
|
}
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
break;
|
|
} else {
|
|
output.sendError(e);
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
|
|
while (futures.size()) {
|
|
U nextOutput = wait(futures.front());
|
|
output.send(nextOutput);
|
|
futures.pop_front();
|
|
}
|
|
|
|
output.sendError(end_of_stream());
|
|
|
|
return Void();
|
|
}
|
|
|
|
// Waits for a future to be ready, and then applies a function to it.
|
|
ACTOR template <class T, class F>
|
|
Future<std::invoke_result_t<F, T>> map(Future<T> what, F func) {
|
|
T val = wait(what);
|
|
return func(val);
|
|
}
|
|
|
|
// maps a vector of futures
|
|
template <class T, class F>
|
|
auto map(std::vector<Future<T>> const& what, F const& func) {
|
|
std::vector<Future<std::invoke_result_t<F, T>>> ret;
|
|
ret.reserve(what.size());
|
|
for (const auto& f : what)
|
|
ret.push_back(map(f, func));
|
|
return ret;
|
|
}
|
|
|
|
// maps a stream
|
|
ACTOR template <class T, class F>
|
|
Future<Void> map(FutureStream<T> input, F func, PromiseStream<std::invoke_result_t<F, T>> output) {
|
|
loop {
|
|
try {
|
|
T nextInput = waitNext(input);
|
|
output.send(func(nextInput));
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
break;
|
|
} else
|
|
throw;
|
|
}
|
|
}
|
|
|
|
output.sendError(end_of_stream());
|
|
|
|
return Void();
|
|
}
|
|
|
|
// Returns if the future returns true, otherwise waits forever.
|
|
ACTOR Future<Void> returnIfTrue(Future<bool> f);
|
|
|
|
// Returns if the future, when waited on and then evaluated with the predicate, returns true, otherwise waits forever
|
|
template <class T, class F>
|
|
Future<Void> returnIfTrue(Future<T> what, F pred) {
|
|
return returnIfTrue(map(what, pred));
|
|
}
|
|
|
|
// filters a stream
|
|
ACTOR template <class T, class F>
|
|
Future<Void> filter(FutureStream<T> input, F pred, PromiseStream<T> output) {
|
|
loop {
|
|
try {
|
|
T nextInput = waitNext(input);
|
|
if (func(nextInput))
|
|
output.send(nextInput);
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
break;
|
|
} else
|
|
throw;
|
|
}
|
|
}
|
|
|
|
output.sendError(end_of_stream());
|
|
|
|
return Void();
|
|
}
|
|
|
|
// filters a stream asynchronously
|
|
ACTOR template <class T, class F>
|
|
Future<Void> asyncFilter(FutureStream<T> input, F actorPred, PromiseStream<T> output) {
|
|
state Deque<std::pair<T, Future<bool>>> futures;
|
|
state std::pair<T, Future<bool>> p;
|
|
|
|
loop {
|
|
try {
|
|
choose {
|
|
when(T nextInput = waitNext(input)) { futures.emplace_back(nextInput, actorPred(nextInput)); }
|
|
when(bool pass = wait(futures.size() == 0 ? Never() : futures.front().second)) {
|
|
if (pass)
|
|
output.send(futures.front().first);
|
|
futures.pop_front();
|
|
}
|
|
}
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
break;
|
|
} else {
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
|
|
while (futures.size()) {
|
|
p = futures.front();
|
|
bool pass = wait(p.second);
|
|
if (pass)
|
|
output.send(p.first);
|
|
futures.pop_front();
|
|
}
|
|
|
|
output.sendError(end_of_stream());
|
|
|
|
return Void();
|
|
}
|
|
|
|
template <class T>
|
|
struct WorkerCache {
|
|
// SOMEDAY: Would we do better to use "unreliable" (at most once) transport for the initialize requests and get rid
|
|
// of this? It doesn't provide true at most once behavior because things are removed from the cache after they have
|
|
// terminated.
|
|
bool exists(UID id) { return id_interface.count(id) != 0; }
|
|
void set(UID id, const Future<T>& onReady) {
|
|
ASSERT(!exists(id));
|
|
id_interface[id] = onReady;
|
|
}
|
|
Future<T> get(UID id) {
|
|
ASSERT(exists(id));
|
|
return id_interface[id];
|
|
}
|
|
|
|
Future<Void> removeOnReady(UID id, Future<Void> const& ready) { return removeOnReady(this, id, ready); }
|
|
|
|
private:
|
|
ACTOR static Future<Void> removeOnReady(WorkerCache* self, UID id, Future<Void> ready) {
|
|
try {
|
|
wait(ready);
|
|
self->id_interface.erase(id);
|
|
return Void();
|
|
} catch (Error& e) {
|
|
self->id_interface.erase(id);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
std::map<UID, Future<T>> id_interface;
|
|
};
|
|
|
|
template <class K, class V>
|
|
class AsyncMap : NonCopyable {
|
|
public:
|
|
// Represents a complete function from keys to values (K -> V)
|
|
// All values not explicitly inserted map to V()
|
|
// If this isn't appropriate, use V=Optional<X>
|
|
|
|
AsyncMap() : defaultValue(), destructing(false) {}
|
|
|
|
virtual ~AsyncMap() {
|
|
destructing = true;
|
|
items.clear();
|
|
}
|
|
|
|
void set(K const& k, V const& v) {
|
|
auto& i = items[k];
|
|
if (i.value != v)
|
|
setUnconditional(k, v, i);
|
|
}
|
|
void setUnconditional(K const& k, V const& v) { setUnconditional(k, v, items[k]); }
|
|
void triggerAll() {
|
|
std::vector<Promise<Void>> ps;
|
|
for (auto it = items.begin(); it != items.end(); ++it) {
|
|
ps.resize(ps.size() + 1);
|
|
ps.back().swap(it->second.change);
|
|
}
|
|
std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
|
|
for (auto p = ps.begin(); p != ps.end(); ++p)
|
|
p->send(Void());
|
|
}
|
|
void triggerRange(K const& begin, K const& end) {
|
|
std::vector<Promise<Void>> ps;
|
|
for (auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it) {
|
|
ps.resize(ps.size() + 1);
|
|
ps.back().swap(it->second.change);
|
|
}
|
|
std::vector<Promise<Void>> noDestroy = ps; // See explanation of noDestroy in setUnconditional()
|
|
for (auto p = ps.begin(); p != ps.end(); ++p)
|
|
p->send(Void());
|
|
}
|
|
void trigger(K const& key) {
|
|
if (items.count(key) != 0) {
|
|
auto& i = items[key];
|
|
Promise<Void> trigger;
|
|
i.change.swap(trigger);
|
|
Promise<Void> noDestroy = trigger; // See explanation of noDestroy in setUnconditional()
|
|
|
|
if (i.value == defaultValue)
|
|
items.erase(key);
|
|
|
|
trigger.send(Void());
|
|
}
|
|
}
|
|
void clear(K const& k) { set(k, V()); }
|
|
V const& get(K const& k) const {
|
|
auto it = items.find(k);
|
|
if (it != items.end())
|
|
return it->second.value;
|
|
else
|
|
return defaultValue;
|
|
}
|
|
int count(K const& k) const {
|
|
auto it = items.find(k);
|
|
if (it != items.end())
|
|
return 1;
|
|
return 0;
|
|
}
|
|
virtual Future<Void> onChange(K const& k) { // throws broken_promise if this is destroyed
|
|
auto& item = items[k];
|
|
if (item.value == defaultValue)
|
|
return destroyOnCancel(this, k, item.change.getFuture());
|
|
return item.change.getFuture();
|
|
}
|
|
std::vector<K> getKeys() const {
|
|
std::vector<K> keys;
|
|
keys.reserve(items.size());
|
|
for (auto i = items.begin(); i != items.end(); ++i)
|
|
keys.push_back(i->first);
|
|
return keys;
|
|
}
|
|
void resetNoWaiting() {
|
|
for (auto i = items.begin(); i != items.end(); ++i)
|
|
ASSERT(i->second.change.getFuture().getFutureReferenceCount() == 1);
|
|
items.clear();
|
|
}
|
|
|
|
protected:
|
|
// Invariant: Every item in the map either has value!=defaultValue xor a destroyOnCancel actor waiting on
|
|
// change.getFuture()
|
|
struct P {
|
|
V value;
|
|
Promise<Void> change;
|
|
P() : value() {}
|
|
};
|
|
std::map<K, P> items;
|
|
const V defaultValue;
|
|
bool destructing;
|
|
|
|
void setUnconditional(K const& k, V const& v, P& i) {
|
|
Promise<Void> trigger;
|
|
i.change.swap(trigger);
|
|
Promise<Void> noDestroy =
|
|
trigger; // The send(Void()) or even V::operator= could cause destroyOnCancel,
|
|
// which could undo the change to i.value here. Keeping the promise reference count >= 2
|
|
// prevents destroyOnCancel from erasing anything from the map.
|
|
if (v == defaultValue) {
|
|
items.erase(k);
|
|
} else {
|
|
i.value = v;
|
|
}
|
|
|
|
trigger.send(Void());
|
|
}
|
|
|
|
ACTOR Future<Void> destroyOnCancel(AsyncMap* self, K key, Future<Void> change) {
|
|
try {
|
|
wait(change);
|
|
return Void();
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 &&
|
|
change.getPromiseReferenceCount() == 1) {
|
|
if (EXPENSIVE_VALIDATION) {
|
|
auto& p = self->items[key];
|
|
ASSERT(p.change.getFuture() == change);
|
|
}
|
|
self->items.erase(key);
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
};
|
|
|
|
template <class V>
|
|
class ReferencedObject : NonCopyable, public ReferenceCounted<ReferencedObject<V>> {
|
|
public:
|
|
ReferencedObject() : value() {}
|
|
ReferencedObject(V const& v) : value(v) {}
|
|
ReferencedObject(V&& v) : value(std::move(v)) {}
|
|
ReferencedObject(ReferencedObject&& r) : value(std::move(r.value)) {}
|
|
|
|
void operator=(ReferencedObject&& r) { value = std::move(r.value); }
|
|
|
|
V const& get() const { return value; }
|
|
|
|
V& mutate() { return value; }
|
|
|
|
void set(V const& v) { value = v; }
|
|
|
|
void set(V&& v) { value = std::move(v); }
|
|
|
|
static Reference<ReferencedObject<V>> from(V const& v) { return makeReference<ReferencedObject<V>>(v); }
|
|
|
|
static Reference<ReferencedObject<V>> from(V&& v) { return makeReference<ReferencedObject<V>>(std::move(v)); }
|
|
|
|
private:
|
|
V value;
|
|
};
|
|
|
|
template <class V>
|
|
class AsyncVar : NonCopyable, public ReferenceCounted<AsyncVar<V>> {
|
|
public:
|
|
AsyncVar() : value() {}
|
|
AsyncVar(V const& v) : value(v) {}
|
|
AsyncVar(AsyncVar&& av) : value(std::move(av.value)), nextChange(std::move(av.nextChange)) {}
|
|
void operator=(AsyncVar&& av) {
|
|
value = std::move(av.value);
|
|
nextChange = std::move(av.nextChange);
|
|
}
|
|
|
|
V const& get() const { return value; }
|
|
Future<Void> onChange() const { return nextChange.getFuture(); }
|
|
void set(V const& v) {
|
|
if (v != value)
|
|
setUnconditional(v);
|
|
}
|
|
void setUnconditional(V const& v) {
|
|
Promise<Void> t;
|
|
this->nextChange.swap(t);
|
|
this->value = v;
|
|
t.send(Void());
|
|
}
|
|
void trigger() {
|
|
Promise<Void> t;
|
|
this->nextChange.swap(t);
|
|
t.send(Void());
|
|
}
|
|
|
|
private:
|
|
V value;
|
|
Promise<Void> nextChange;
|
|
};
|
|
|
|
class AsyncTrigger : NonCopyable {
|
|
public:
|
|
AsyncTrigger() {}
|
|
AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}
|
|
void operator=(AsyncTrigger&& at) { v = std::move(at.v); }
|
|
Future<Void> onTrigger() const { return v.onChange(); }
|
|
void trigger() { v.trigger(); }
|
|
|
|
private:
|
|
AsyncVar<Void> v;
|
|
};
|
|
|
|
// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
|
|
// the AsyncTrigger is triggered.
|
|
ACTOR template <class T>
|
|
void forward(Reference<AsyncVar<T> const> from, AsyncTrigger* to) {
|
|
loop {
|
|
wait(from->onChange());
|
|
to->trigger();
|
|
}
|
|
}
|
|
|
|
class Debouncer : NonCopyable {
|
|
public:
|
|
explicit Debouncer(double delay) { worker = debounceWorker(this, delay); }
|
|
Debouncer(Debouncer&& at) = default;
|
|
Debouncer& operator=(Debouncer&& at) = default;
|
|
Future<Void> onTrigger() { return output.onChange(); }
|
|
void trigger() { input.setUnconditional(Void()); }
|
|
|
|
private:
|
|
AsyncVar<Void> input;
|
|
AsyncVar<Void> output;
|
|
Future<Void> worker;
|
|
|
|
ACTOR Future<Void> debounceWorker(Debouncer* self, double bounceTime) {
|
|
loop {
|
|
wait(self->input.onChange());
|
|
loop {
|
|
choose {
|
|
when(wait(self->input.onChange())) {}
|
|
when(wait(delay(bounceTime))) { break; }
|
|
}
|
|
}
|
|
self->output.setUnconditional(Void());
|
|
}
|
|
}
|
|
};
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> asyncDeserialize(Reference<AsyncVar<Standalone<StringRef>>> input,
|
|
Reference<AsyncVar<Optional<T>>> output) {
|
|
loop {
|
|
if (input->get().size()) {
|
|
ObjectReader reader(input->get().begin(), IncludeVersion());
|
|
T res;
|
|
reader.deserialize(res);
|
|
output->set(res);
|
|
} else
|
|
output->set(Optional<T>());
|
|
wait(input->onChange());
|
|
}
|
|
}
|
|
|
|
ACTOR template <class V, class T>
|
|
void forwardVector(Future<V> values, std::vector<Promise<T>> out) {
|
|
V in = wait(values);
|
|
ASSERT(in.size() == out.size());
|
|
for (int i = 0; i < out.size(); i++)
|
|
out[i].send(in[i]);
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> delayedAsyncVar(Reference<AsyncVar<T>> in, Reference<AsyncVar<T>> out, double time) {
|
|
try {
|
|
loop {
|
|
wait(delay(time));
|
|
out->set(in->get());
|
|
wait(in->onChange());
|
|
}
|
|
} catch (Error& e) {
|
|
out->set(in->get());
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> setAfter(Reference<AsyncVar<T>> var, double time, T val) {
|
|
wait(delay(time));
|
|
var->set(val);
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> resetAfter(Reference<AsyncVar<T>> var,
|
|
double time,
|
|
T val,
|
|
int warningLimit = -1,
|
|
double warningResetDelay = 0,
|
|
const char* context = nullptr) {
|
|
state bool isEqual = var->get() == val;
|
|
state Future<Void> resetDelay = isEqual ? Never() : delay(time);
|
|
state int resetCount = 0;
|
|
state double lastReset = now();
|
|
loop {
|
|
choose {
|
|
when(wait(resetDelay)) {
|
|
var->set(val);
|
|
if (now() - lastReset > warningResetDelay) {
|
|
resetCount = 0;
|
|
}
|
|
resetCount++;
|
|
if (context && warningLimit >= 0 && resetCount > warningLimit) {
|
|
TraceEvent(SevWarnAlways, context)
|
|
.detail("ResetCount", resetCount)
|
|
.detail("LastReset", now() - lastReset);
|
|
}
|
|
lastReset = now();
|
|
isEqual = true;
|
|
resetDelay = Never();
|
|
}
|
|
when(wait(var->onChange())) {}
|
|
}
|
|
if (isEqual && var->get() != val) {
|
|
isEqual = false;
|
|
resetDelay = delay(time);
|
|
}
|
|
if (!isEqual && var->get() == val) {
|
|
isEqual = true;
|
|
resetDelay = Never();
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> setWhenDoneOrError(Future<Void> condition, Reference<AsyncVar<T>> var, T val) {
|
|
try {
|
|
wait(condition);
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_actor_cancelled)
|
|
throw;
|
|
}
|
|
var->set(val);
|
|
return Void();
|
|
}
|
|
|
|
Future<bool> allTrue(const std::vector<Future<bool>>& all);
|
|
Future<Void> anyTrue(std::vector<Reference<AsyncVar<bool>>> const& input, Reference<AsyncVar<bool>> const& output);
|
|
Future<Void> cancelOnly(std::vector<Future<Void>> const& futures);
|
|
Future<Void> timeoutWarningCollector(FutureStream<Void> const& input,
|
|
double const& logDelay,
|
|
const char* const& context,
|
|
UID const& id);
|
|
Future<bool> quorumEqualsTrue(std::vector<Future<bool>> const& futures, int const& required);
|
|
Future<Void> lowPriorityDelay(double const& waitTime);
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> streamHelper(PromiseStream<T> output, PromiseStream<Error> errors, Future<T> input) {
|
|
try {
|
|
T value = wait(input);
|
|
output.send(value);
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_actor_cancelled)
|
|
throw;
|
|
errors.send(e);
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
template <class T>
|
|
Future<Void> makeStream(const std::vector<Future<T>>& futures, PromiseStream<T>& stream, PromiseStream<Error>& errors) {
|
|
std::vector<Future<Void>> forwarders;
|
|
forwarders.reserve(futures.size());
|
|
for (int f = 0; f < futures.size(); f++)
|
|
forwarders.push_back(streamHelper(stream, errors, futures[f]));
|
|
return cancelOnly(forwarders);
|
|
}
|
|
|
|
template <class T>
|
|
class QuorumCallback;
|
|
|
|
template <class T>
|
|
struct Quorum final : SAV<Void> {
|
|
int antiQuorum;
|
|
int count;
|
|
|
|
static inline int sizeFor(int count) { return sizeof(Quorum<T>) + sizeof(QuorumCallback<T>) * count; }
|
|
|
|
void destroy() override {
|
|
int size = sizeFor(this->count);
|
|
this->~Quorum();
|
|
freeFast(size, this);
|
|
}
|
|
void cancel() override {
|
|
int cancelled_callbacks = 0;
|
|
for (int i = 0; i < count; i++)
|
|
if (callbacks()[i].next) {
|
|
callbacks()[i].remove();
|
|
callbacks()[i].next = 0;
|
|
++cancelled_callbacks;
|
|
}
|
|
if (canBeSet())
|
|
sendError(actor_cancelled());
|
|
for (int i = 0; i < cancelled_callbacks; i++)
|
|
delPromiseRef();
|
|
}
|
|
explicit Quorum(int quorum, int count) : SAV<Void>(1, count), antiQuorum(count - quorum + 1), count(count) {
|
|
if (!quorum)
|
|
this->send(Void());
|
|
}
|
|
void oneSuccess() {
|
|
if (getPromiseReferenceCount() == antiQuorum && canBeSet())
|
|
this->sendAndDelPromiseRef(Void());
|
|
else
|
|
delPromiseRef();
|
|
}
|
|
void oneError(Error err) {
|
|
if (canBeSet())
|
|
this->sendErrorAndDelPromiseRef(err);
|
|
else
|
|
delPromiseRef();
|
|
}
|
|
|
|
QuorumCallback<T>* callbacks() { return (QuorumCallback<T>*)(this + 1); }
|
|
};
|
|
|
|
template <class T>
|
|
class QuorumCallback : public Callback<T> {
|
|
public:
|
|
void fire(const T& value) override {
|
|
Callback<T>::remove();
|
|
Callback<T>::next = 0;
|
|
head->oneSuccess();
|
|
}
|
|
void error(Error error) override {
|
|
Callback<T>::remove();
|
|
Callback<T>::next = 0;
|
|
head->oneError(error);
|
|
}
|
|
|
|
private:
|
|
template <class U>
|
|
friend Future<Void> quorum(std::vector<Future<U>> const& results, int n);
|
|
Quorum<T>* head;
|
|
QuorumCallback() = default;
|
|
QuorumCallback(Future<T> future, Quorum<T>* head) : head(head) { future.addCallbackAndClear(this); }
|
|
};
|
|
|
|
template <class T>
|
|
Future<Void> quorum(std::vector<Future<T>> const& results, int n) {
|
|
ASSERT(n >= 0 && n <= results.size());
|
|
|
|
int size = Quorum<T>::sizeFor(results.size());
|
|
Quorum<T>* q = new (allocateFast(size)) Quorum<T>(n, results.size());
|
|
|
|
QuorumCallback<T>* nextCallback = q->callbacks();
|
|
for (auto& r : results) {
|
|
if (r.isReady()) {
|
|
new (nextCallback) QuorumCallback<T>();
|
|
nextCallback->next = 0;
|
|
if (r.isError())
|
|
q->oneError(r.getError());
|
|
else
|
|
q->oneSuccess();
|
|
} else
|
|
new (nextCallback) QuorumCallback<T>(r, q);
|
|
++nextCallback;
|
|
}
|
|
return Future<Void>(q);
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> smartQuorum(std::vector<Future<T>> results,
|
|
int required,
|
|
double extraSeconds,
|
|
TaskPriority taskID = TaskPriority::DefaultDelay) {
|
|
if (results.empty() && required == 0)
|
|
return Void();
|
|
wait(quorum(results, required));
|
|
choose {
|
|
when(wait(quorum(results, (int)results.size()))) { return Void(); }
|
|
when(wait(delay(extraSeconds, taskID))) { return Void(); }
|
|
}
|
|
}
|
|
|
|
template <class T>
|
|
Future<Void> waitForAll(std::vector<Future<T>> const& results) {
|
|
if (results.empty())
|
|
return Void();
|
|
return quorum(results, (int)results.size());
|
|
}
|
|
|
|
template <class T>
|
|
Future<Void> waitForAny(std::vector<Future<T>> const& results) {
|
|
if (results.empty())
|
|
return Void();
|
|
return quorum(results, 1);
|
|
}
|
|
|
|
ACTOR Future<bool> shortCircuitAny(std::vector<Future<bool>> f);
|
|
|
|
ACTOR template <class T>
|
|
Future<std::vector<T>> getAll(std::vector<Future<T>> input) {
|
|
if (input.empty())
|
|
return std::vector<T>();
|
|
wait(quorum(input, input.size()));
|
|
|
|
std::vector<T> output;
|
|
output.reserve(input.size());
|
|
for (int i = 0; i < input.size(); i++)
|
|
output.push_back(input[i].get());
|
|
return output;
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<std::vector<T>> appendAll(std::vector<Future<std::vector<T>>> input) {
|
|
wait(quorum(input, input.size()));
|
|
|
|
std::vector<T> output;
|
|
size_t sz = 0;
|
|
for (const auto& f : input) {
|
|
sz += f.get().size();
|
|
}
|
|
output.reserve(sz);
|
|
|
|
for (int i = 0; i < input.size(); i++) {
|
|
auto const& r = input[i].get();
|
|
output.insert(output.end(), r.begin(), r.end());
|
|
}
|
|
return output;
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> onEqual(Future<T> in, T equalTo) {
|
|
T t = wait(in);
|
|
if (t == equalTo)
|
|
return Void();
|
|
wait(Never()); // never return
|
|
throw internal_error(); // does not happen
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> success(Future<T> of) {
|
|
T t = wait(of);
|
|
(void)t;
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> ready(Future<T> f) {
|
|
try {
|
|
wait(success(f));
|
|
} catch (...) {
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> waitAndForward(FutureStream<T> input) {
|
|
T output = waitNext(input);
|
|
return output;
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> reportErrorsExcept(Future<T> in, const char* context, UID id, std::set<int> const* pExceptErrors) {
|
|
try {
|
|
T t = wait(in);
|
|
return t;
|
|
} catch (Error& e) {
|
|
if (e.code() != error_code_actor_cancelled && (!pExceptErrors || !pExceptErrors->count(e.code())))
|
|
TraceEvent(SevError, context, id).error(e);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
template <class T>
|
|
Future<T> reportErrors(Future<T> const& in, const char* context, UID id = UID()) {
|
|
return reportErrorsExcept(in, context, id, nullptr);
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> require(Future<Optional<T>> in, int errorCode) {
|
|
Optional<T> o = wait(in);
|
|
if (o.present()) {
|
|
return o.get();
|
|
} else {
|
|
throw Error(errorCode);
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> waitForFirst(std::vector<Future<T>> items) {
|
|
state PromiseStream<T> resultStream;
|
|
state PromiseStream<Error> errorStream;
|
|
|
|
state Future<Void> forCancellation = makeStream(items, resultStream, errorStream);
|
|
|
|
state FutureStream<T> resultFutureStream = resultStream.getFuture();
|
|
state FutureStream<Error> errorFutureStream = errorStream.getFuture();
|
|
|
|
choose {
|
|
when(T val = waitNext(resultFutureStream)) {
|
|
forCancellation = Future<Void>();
|
|
return val;
|
|
}
|
|
when(Error e = waitNext(errorFutureStream)) {
|
|
forCancellation = Future<Void>();
|
|
throw e;
|
|
}
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> tag(Future<Void> future, T what) {
|
|
wait(future);
|
|
return what;
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> tag(Future<Void> future, T what, PromiseStream<T> stream) {
|
|
wait(future);
|
|
stream.send(what);
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> tagError(Future<Void> future, Error e) {
|
|
wait(future);
|
|
throw e;
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> detach(Future<T> f) {
|
|
T x = wait(f);
|
|
return x;
|
|
}
|
|
|
|
// If the future is ready, yields and returns. Otherwise, returns when future is set.
|
|
template <class T>
|
|
Future<T> orYield(Future<T> f) {
|
|
if (f.isReady()) {
|
|
if (f.isError())
|
|
return tagError<T>(yield(), f.getError());
|
|
else
|
|
return tag(yield(), f.get());
|
|
} else
|
|
return f;
|
|
}
|
|
|
|
Future<Void> orYield(Future<Void> f);
|
|
|
|
ACTOR template <class T>
|
|
Future<T> chooseActor(Future<T> lhs, Future<T> rhs) {
|
|
choose {
|
|
when(T t = wait(lhs)) { return t; }
|
|
when(T t = wait(rhs)) { return t; }
|
|
}
|
|
}
|
|
|
|
// set && set -> set
|
|
// error && x -> error
|
|
// all others -> unset
|
|
inline Future<Void> operator&&(Future<Void> const& lhs, Future<Void> const& rhs) {
|
|
if (lhs.isReady()) {
|
|
if (lhs.isError())
|
|
return lhs;
|
|
else
|
|
return rhs;
|
|
}
|
|
if (rhs.isReady()) {
|
|
if (rhs.isError())
|
|
return rhs;
|
|
else
|
|
return lhs;
|
|
}
|
|
|
|
return waitForAll(std::vector<Future<Void>>{ lhs, rhs });
|
|
}
|
|
|
|
// error || unset -> error
|
|
// unset || unset -> unset
|
|
// all others -> set
|
|
inline Future<Void> operator||(Future<Void> const& lhs, Future<Void> const& rhs) {
|
|
if (lhs.isReady()) {
|
|
if (lhs.isError())
|
|
return lhs;
|
|
if (rhs.isReady())
|
|
return rhs;
|
|
return lhs;
|
|
}
|
|
|
|
return chooseActor(lhs, rhs);
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> brokenPromiseToNever(Future<T> in) {
|
|
try {
|
|
T t = wait(in);
|
|
return t;
|
|
} catch (Error& e) {
|
|
if (e.code() != error_code_broken_promise)
|
|
throw;
|
|
wait(Never()); // never return
|
|
throw internal_error(); // does not happen
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> brokenPromiseToMaybeDelivered(Future<T> in) {
|
|
try {
|
|
T t = wait(in);
|
|
return t;
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_broken_promise) {
|
|
throw request_maybe_delivered();
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR template <class T, class U>
|
|
void tagAndForward(Promise<T>* pOutputPromise, U value, Future<Void> signal) {
|
|
state Promise<T> out(std::move(*pOutputPromise));
|
|
wait(signal);
|
|
out.send(std::move(value));
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
void tagAndForward(PromiseStream<T>* pOutput, T value, Future<Void> signal) {
|
|
wait(signal);
|
|
pOutput->send(std::move(value));
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
void tagAndForwardError(Promise<T>* pOutputPromise, Error value, Future<Void> signal) {
|
|
state Promise<T> out(std::move(*pOutputPromise));
|
|
wait(signal);
|
|
out.sendError(value);
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
void tagAndForwardError(PromiseStream<T>* pOutput, Error value, Future<Void> signal) {
|
|
wait(signal);
|
|
pOutput->sendError(value);
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> waitOrError(Future<T> f, Future<Void> errorSignal) {
|
|
choose {
|
|
when(T val = wait(f)) { return val; }
|
|
when(wait(errorSignal)) {
|
|
ASSERT(false);
|
|
throw internal_error();
|
|
}
|
|
}
|
|
}
|
|
|
|
// A low-overhead FIFO mutex made with no internal queue structure (no list, deque, vector, etc)
|
|
// The lock is implemented as a Promise<Void>, which is returned to callers in a convenient wrapper
|
|
// called Lock.
|
|
//
|
|
// Usage:
|
|
// Lock lock = wait(mutex.take());
|
|
// lock.release(); // Next waiter will get the lock, OR
|
|
// lock.error(e); // Next waiter will get e, future waiters will see broken_promise
|
|
// lock = Lock(); // Or let Lock and any copies go out of scope. All waiters will see broken_promise.
|
|
struct FlowMutex {
|
|
FlowMutex() { lastPromise.send(Void()); }
|
|
|
|
bool available() { return lastPromise.isSet(); }
|
|
|
|
struct Lock {
|
|
void release() { promise.send(Void()); }
|
|
|
|
void error(Error e = broken_promise()) { promise.sendError(e); }
|
|
|
|
// This is exposed in case the caller wants to use/copy it directly
|
|
Promise<Void> promise;
|
|
};
|
|
|
|
Future<Lock> take() {
|
|
Lock newLock;
|
|
Future<Lock> f = lastPromise.isSet() ? newLock : tag(lastPromise.getFuture(), newLock);
|
|
lastPromise = newLock.promise;
|
|
return f;
|
|
}
|
|
|
|
private:
|
|
Promise<Void> lastPromise;
|
|
};
|
|
|
|
ACTOR template <class T, class V>
|
|
Future<T> forwardErrors(Future<T> f, PromiseStream<V> output) {
|
|
try {
|
|
T val = wait(f);
|
|
return val;
|
|
} catch (Error& e) {
|
|
output.sendError(e);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
struct FlowLock : NonCopyable, public ReferenceCounted<FlowLock> {
|
|
// FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code
|
|
// between wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock
|
|
// is fewer than the number of permits, and release() makes the caller no longer a holder of the lock. release()
|
|
// only runs waiting take()rs after the caller wait()s
|
|
|
|
struct Releaser : NonCopyable {
|
|
FlowLock* lock;
|
|
int remaining;
|
|
Releaser() : lock(0), remaining(0) {}
|
|
Releaser(FlowLock& lock, int64_t amount = 1) : lock(&lock), remaining(amount) {}
|
|
Releaser(Releaser&& r) noexcept : lock(r.lock), remaining(r.remaining) { r.remaining = 0; }
|
|
void operator=(Releaser&& r) {
|
|
if (remaining)
|
|
lock->release(remaining);
|
|
lock = r.lock;
|
|
remaining = r.remaining;
|
|
r.remaining = 0;
|
|
}
|
|
|
|
void release(int64_t amount = -1) {
|
|
if (amount == -1 || amount > remaining)
|
|
amount = remaining;
|
|
|
|
if (remaining)
|
|
lock->release(amount);
|
|
remaining -= amount;
|
|
}
|
|
|
|
~Releaser() {
|
|
if (remaining)
|
|
lock->release(remaining);
|
|
}
|
|
};
|
|
|
|
FlowLock() : permits(1), active(0) {}
|
|
explicit FlowLock(int64_t permits) : permits(permits), active(0) {}
|
|
|
|
Future<Void> take(TaskPriority taskID = TaskPriority::DefaultYield, int64_t amount = 1) {
|
|
if (active + amount <= permits || active == 0) {
|
|
active += amount;
|
|
return safeYieldActor(this, taskID, amount);
|
|
}
|
|
return takeActor(this, taskID, amount);
|
|
}
|
|
void release(int64_t amount = 1) {
|
|
ASSERT((active > 0 || amount == 0) && active - amount >= 0);
|
|
active -= amount;
|
|
|
|
while (!takers.empty()) {
|
|
if (active + takers.begin()->second <= permits || active == 0) {
|
|
std::pair<Promise<Void>, int64_t> next = std::move(*takers.begin());
|
|
active += next.second;
|
|
takers.pop_front();
|
|
next.first.send(Void());
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
Future<Void> releaseWhen(Future<Void> const& signal, int amount = 1) {
|
|
return releaseWhenActor(this, signal, amount);
|
|
}
|
|
|
|
// returns when any permits are available, having taken as many as possible up to the given amount, and modifies
|
|
// amount to the number of permits taken
|
|
Future<Void> takeUpTo(int64_t& amount) { return takeMoreActor(this, &amount); }
|
|
|
|
int64_t available() const { return permits - active; }
|
|
int64_t activePermits() const { return active; }
|
|
int waiters() const { return takers.size(); }
|
|
|
|
// Try to send error to all current and future waiters
|
|
// Only works if broken_on_destruct.canBeSet()
|
|
void kill(Error e = broken_promise()) {
|
|
if (broken_on_destruct.canBeSet()) {
|
|
auto local = broken_on_destruct;
|
|
// It could be the case that calling broken_on_destruct destroys this FlowLock
|
|
local.sendError(e);
|
|
}
|
|
}
|
|
|
|
private:
|
|
std::list<std::pair<Promise<Void>, int64_t>> takers;
|
|
const int64_t permits;
|
|
int64_t active;
|
|
Promise<Void> broken_on_destruct;
|
|
|
|
ACTOR static Future<Void> takeActor(FlowLock* lock, TaskPriority taskID, int64_t amount) {
|
|
state std::list<std::pair<Promise<Void>, int64_t>>::iterator it =
|
|
lock->takers.emplace(lock->takers.end(), Promise<Void>(), amount);
|
|
|
|
try {
|
|
wait(it->first.getFuture());
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_actor_cancelled) {
|
|
lock->takers.erase(it);
|
|
lock->release(0);
|
|
}
|
|
throw;
|
|
}
|
|
try {
|
|
double duration = BUGGIFY_WITH_PROB(.001)
|
|
? deterministicRandom()->random01() * FLOW_KNOBS->BUGGIFY_FLOW_LOCK_RELEASE_DELAY
|
|
: 0.0;
|
|
choose {
|
|
when(wait(delay(duration, taskID))) {
|
|
} // So release()ing the lock doesn't cause arbitrary code to run on the stack
|
|
when(wait(lock->broken_on_destruct.getFuture())) {}
|
|
}
|
|
return Void();
|
|
} catch (...) {
|
|
TEST(true); // If we get cancelled here, we are holding the lock but the caller doesn't know, so release it
|
|
lock->release(amount);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR static Future<Void> takeMoreActor(FlowLock* lock, int64_t* amount) {
|
|
wait(lock->take());
|
|
int64_t extra = std::min(lock->available(), *amount - 1);
|
|
lock->active += extra;
|
|
*amount = 1 + extra;
|
|
return Void();
|
|
}
|
|
|
|
ACTOR static Future<Void> safeYieldActor(FlowLock* lock, TaskPriority taskID, int64_t amount) {
|
|
try {
|
|
choose {
|
|
when(wait(yield(taskID))) {}
|
|
when(wait(lock->broken_on_destruct.getFuture())) {}
|
|
}
|
|
return Void();
|
|
} catch (Error& e) {
|
|
lock->release(amount);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
ACTOR static Future<Void> releaseWhenActor(FlowLock* self, Future<Void> signal, int64_t amount) {
|
|
wait(signal);
|
|
self->release(amount);
|
|
return Void();
|
|
}
|
|
};
|
|
|
|
struct NotifiedInt {
|
|
NotifiedInt(int64_t val = 0) : val(val) {}
|
|
|
|
Future<Void> whenAtLeast(int64_t limit) {
|
|
if (val >= limit)
|
|
return Void();
|
|
Promise<Void> p;
|
|
waiting.emplace(limit, p);
|
|
return p.getFuture();
|
|
}
|
|
|
|
int64_t get() const { return val; }
|
|
|
|
void set(int64_t v) {
|
|
ASSERT(v >= val);
|
|
if (v != val) {
|
|
val = v;
|
|
|
|
std::vector<Promise<Void>> toSend;
|
|
while (waiting.size() && v >= waiting.top().first) {
|
|
Promise<Void> p = std::move(waiting.top().second);
|
|
waiting.pop();
|
|
toSend.push_back(p);
|
|
}
|
|
for (auto& p : toSend) {
|
|
p.send(Void());
|
|
}
|
|
}
|
|
}
|
|
|
|
void operator=(int64_t v) { set(v); }
|
|
|
|
NotifiedInt(NotifiedInt&& r) noexcept : waiting(std::move(r.waiting)), val(r.val) {}
|
|
void operator=(NotifiedInt&& r) noexcept {
|
|
waiting = std::move(r.waiting);
|
|
val = r.val;
|
|
}
|
|
|
|
private:
|
|
typedef std::pair<int64_t, Promise<Void>> Item;
|
|
struct ItemCompare {
|
|
bool operator()(const Item& a, const Item& b) { return a.first > b.first; }
|
|
};
|
|
std::priority_queue<Item, std::vector<Item>, ItemCompare> waiting;
|
|
int64_t val;
|
|
};
|
|
|
|
struct BoundedFlowLock : NonCopyable, public ReferenceCounted<BoundedFlowLock> {
|
|
// BoundedFlowLock is different from a FlowLock in that it has a bound on how many locks can be taken from the
|
|
// oldest outstanding lock. For instance, with a FlowLock that has two permits, if one permit is taken but never
|
|
// released, the other permit can be reused an unlimited amount of times, but with a BoundedFlowLock, it can only be
|
|
// reused a fixed number of times.
|
|
|
|
struct Releaser : NonCopyable {
|
|
BoundedFlowLock* lock;
|
|
int64_t permitNumber;
|
|
Releaser() : lock(nullptr), permitNumber(0) {}
|
|
Releaser(BoundedFlowLock* lock, int64_t permitNumber) : lock(lock), permitNumber(permitNumber) {}
|
|
Releaser(Releaser&& r) noexcept : lock(r.lock), permitNumber(r.permitNumber) { r.permitNumber = 0; }
|
|
void operator=(Releaser&& r) {
|
|
if (permitNumber)
|
|
lock->release(permitNumber);
|
|
lock = r.lock;
|
|
permitNumber = r.permitNumber;
|
|
r.permitNumber = 0;
|
|
}
|
|
|
|
void release() {
|
|
if (permitNumber) {
|
|
lock->release(permitNumber);
|
|
}
|
|
permitNumber = 0;
|
|
}
|
|
|
|
~Releaser() {
|
|
if (permitNumber)
|
|
lock->release(permitNumber);
|
|
}
|
|
};
|
|
|
|
BoundedFlowLock() : minOutstanding(0), nextPermitNumber(0), unrestrictedPermits(1), boundedPermits(0) {}
|
|
explicit BoundedFlowLock(int64_t unrestrictedPermits, int64_t boundedPermits)
|
|
: minOutstanding(0), nextPermitNumber(0), unrestrictedPermits(unrestrictedPermits),
|
|
boundedPermits(boundedPermits) {}
|
|
|
|
Future<int64_t> take() { return takeActor(this); }
|
|
void release(int64_t permitNumber) {
|
|
outstanding.erase(permitNumber);
|
|
updateMinOutstanding();
|
|
}
|
|
|
|
private:
|
|
IndexedSet<int64_t, int64_t> outstanding;
|
|
NotifiedInt minOutstanding;
|
|
int64_t nextPermitNumber;
|
|
const int64_t unrestrictedPermits;
|
|
const int64_t boundedPermits;
|
|
|
|
void updateMinOutstanding() {
|
|
auto it = outstanding.index(unrestrictedPermits - 1);
|
|
if (it == outstanding.end()) {
|
|
minOutstanding.set(nextPermitNumber);
|
|
} else {
|
|
minOutstanding.set(*it);
|
|
}
|
|
}
|
|
|
|
ACTOR static Future<int64_t> takeActor(BoundedFlowLock* lock) {
|
|
state int64_t permitNumber = ++lock->nextPermitNumber;
|
|
lock->outstanding.insert(permitNumber, 1);
|
|
lock->updateMinOutstanding();
|
|
wait(lock->minOutstanding.whenAtLeast(std::max<int64_t>(0, permitNumber - lock->boundedPermits)));
|
|
return permitNumber;
|
|
}
|
|
};
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> yieldPromiseStream(FutureStream<T> input,
|
|
PromiseStream<T> output,
|
|
TaskPriority taskID = TaskPriority::DefaultYield) {
|
|
loop {
|
|
T f = waitNext(input);
|
|
output.send(f);
|
|
wait(yield(taskID));
|
|
}
|
|
}
|
|
|
|
struct YieldedFutureActor final : SAV<Void>,
|
|
ActorCallback<YieldedFutureActor, 1, Void>,
|
|
FastAllocated<YieldedFutureActor> {
|
|
Error in_error_state;
|
|
|
|
typedef ActorCallback<YieldedFutureActor, 1, Void> CB1;
|
|
|
|
using FastAllocated<YieldedFutureActor>::operator new;
|
|
using FastAllocated<YieldedFutureActor>::operator delete;
|
|
|
|
YieldedFutureActor(Future<Void>&& f) : SAV<Void>(1, 1), in_error_state(Error::fromCode(UNSET_ERROR_CODE)) {
|
|
f.addYieldedCallbackAndClear(static_cast<ActorCallback<YieldedFutureActor, 1, Void>*>(this));
|
|
}
|
|
|
|
void cancel() override {
|
|
if (!SAV<Void>::canBeSet())
|
|
return; // Cancel could be invoked *by* a callback within finish(). Otherwise it's guaranteed that we are
|
|
// waiting either on the original future or on a delay().
|
|
ActorCallback<YieldedFutureActor, 1, Void>::remove();
|
|
SAV<Void>::sendErrorAndDelPromiseRef(actor_cancelled());
|
|
}
|
|
|
|
void destroy() override { delete this; }
|
|
|
|
#ifdef ENABLE_SAMPLING
|
|
LineageReference* lineageAddr() { return currentLineage; }
|
|
#endif
|
|
|
|
void a_callback_fire(ActorCallback<YieldedFutureActor, 1, Void>*, Void) {
|
|
if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) {
|
|
in_error_state = Error::fromCode(SET_ERROR_CODE);
|
|
if (check_yield())
|
|
doYield();
|
|
else
|
|
finish();
|
|
} else {
|
|
// We hit this case when and only when the delay() created by a previous doYield() fires. Then we want to
|
|
// get at least one task done, regardless of what check_yield() would say.
|
|
finish();
|
|
}
|
|
}
|
|
void a_callback_error(ActorCallback<YieldedFutureActor, 1, Void>*, Error const& err) {
|
|
ASSERT(int16_t(in_error_state.code()) == UNSET_ERROR_CODE);
|
|
in_error_state = err;
|
|
if (check_yield())
|
|
doYield();
|
|
else
|
|
finish();
|
|
}
|
|
void finish() {
|
|
ActorCallback<YieldedFutureActor, 1, Void>::remove();
|
|
if (int16_t(in_error_state.code()) == SET_ERROR_CODE)
|
|
SAV<Void>::sendAndDelPromiseRef(Void());
|
|
else
|
|
SAV<Void>::sendErrorAndDelPromiseRef(in_error_state);
|
|
}
|
|
void doYield() {
|
|
// Since we are being fired, we are the first callback in the ring, and `prev` is the source future
|
|
Callback<Void>* source = CB1::prev;
|
|
ASSERT(source->next == static_cast<CB1*>(this));
|
|
|
|
// Remove the source future from the ring. All the remaining callbacks in the ring should be yielded, since
|
|
// yielded callbacks are installed at the end
|
|
CB1::prev = source->prev;
|
|
CB1::prev->next = static_cast<CB1*>(this);
|
|
|
|
// The source future's ring is now empty, since we have removed all the callbacks
|
|
source->next = source->prev = source;
|
|
source->unwait();
|
|
|
|
// Link all the callbacks, including this one, into the ring of a delay future so that after a short time they
|
|
// will be fired again
|
|
delay(0, g_network->getCurrentTask()).addCallbackChainAndClear(static_cast<CB1*>(this));
|
|
}
|
|
};
|
|
|
|
inline Future<Void> yieldedFuture(Future<Void> f) {
|
|
if (f.isReady())
|
|
return yield();
|
|
else
|
|
return Future<Void>(new YieldedFutureActor(std::move(f)));
|
|
}
|
|
|
|
// An AsyncMap that uses a yieldedFuture in its onChange method.
|
|
template <class K, class V>
|
|
class YieldedAsyncMap : public AsyncMap<K, V> {
|
|
public:
|
|
Future<Void> onChange(K const& k) override { // throws broken_promise if this is destroyed
|
|
auto& item = AsyncMap<K, V>::items[k];
|
|
if (item.value == AsyncMap<K, V>::defaultValue)
|
|
return destroyOnCancelYield(this, k, item.change.getFuture());
|
|
return yieldedFuture(item.change.getFuture());
|
|
}
|
|
|
|
ACTOR static Future<Void> destroyOnCancelYield(YieldedAsyncMap* self, K key, Future<Void> change) {
|
|
try {
|
|
wait(yieldedFuture(change));
|
|
return Void();
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 &&
|
|
change.getPromiseReferenceCount() == 1) {
|
|
if (EXPENSIVE_VALIDATION) {
|
|
auto& p = self->items[key];
|
|
ASSERT(p.change.getFuture() == change);
|
|
}
|
|
self->items.erase(key);
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
};
|
|
|
|
ACTOR template <class T>
|
|
Future<T> delayActionJittered(Future<T> what, double time) {
|
|
wait(delayJittered(time));
|
|
T t = wait(what);
|
|
return t;
|
|
}
|
|
|
|
class AndFuture {
|
|
public:
|
|
AndFuture() {}
|
|
|
|
AndFuture(AndFuture const& f) { futures = f.futures; }
|
|
|
|
AndFuture(AndFuture&& f) noexcept { futures = std::move(f.futures); }
|
|
|
|
AndFuture(Future<Void> const& f) { futures.push_back(f); }
|
|
|
|
AndFuture(Error const& e) { futures.push_back(e); }
|
|
|
|
operator Future<Void>() { return getFuture(); }
|
|
|
|
void operator=(AndFuture const& f) { futures = f.futures; }
|
|
|
|
void operator=(AndFuture&& f) noexcept { futures = std::move(f.futures); }
|
|
|
|
void operator=(Future<Void> const& f) { futures.push_back(f); }
|
|
|
|
void operator=(Error const& e) { futures.push_back(e); }
|
|
|
|
Future<Void> getFuture() {
|
|
if (futures.empty())
|
|
return Void();
|
|
|
|
if (futures.size() == 1)
|
|
return futures[0];
|
|
|
|
Future<Void> f = waitForAll(futures);
|
|
futures = std::vector<Future<Void>>{ f };
|
|
return f;
|
|
}
|
|
|
|
bool isReady() {
|
|
for (int i = futures.size() - 1; i >= 0; --i) {
|
|
if (!futures[i].isReady()) {
|
|
return false;
|
|
} else if (!futures[i].isError()) {
|
|
swapAndPop(&futures, i);
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool isError() {
|
|
for (int i = 0; i < futures.size(); i++)
|
|
if (futures[i].isError())
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
void cleanup() {
|
|
for (int i = 0; i < futures.size(); i++) {
|
|
if (futures[i].isReady() && !futures[i].isError()) {
|
|
swapAndPop(&futures, i--);
|
|
}
|
|
}
|
|
}
|
|
|
|
void add(Future<Void> const& f) {
|
|
if (!f.isReady() || f.isError())
|
|
futures.push_back(f);
|
|
}
|
|
|
|
void add(AndFuture f) { add(f.getFuture()); }
|
|
|
|
private:
|
|
std::vector<Future<Void>> futures;
|
|
};
|
|
|
|
// Performs an unordered merge of a and b.
|
|
ACTOR template <class T>
|
|
Future<Void> unorderedMergeStreams(FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output) {
|
|
state Future<T> aFuture = waitAndForward(a);
|
|
state Future<T> bFuture = waitAndForward(b);
|
|
state bool aOpen = true;
|
|
state bool bOpen = true;
|
|
|
|
loop {
|
|
try {
|
|
choose {
|
|
when(T val = wait(aFuture)) {
|
|
output.send(val);
|
|
aFuture = waitAndForward(a);
|
|
}
|
|
when(T val = wait(bFuture)) {
|
|
output.send(val);
|
|
bFuture = waitAndForward(b);
|
|
}
|
|
}
|
|
} catch (Error& e) {
|
|
if (e.code() != error_code_end_of_stream) {
|
|
output.sendError(e);
|
|
break;
|
|
}
|
|
|
|
ASSERT(!aFuture.isError() || !bFuture.isError() || aFuture.getError().code() == bFuture.getError().code());
|
|
|
|
if (aFuture.isError()) {
|
|
aFuture = Never();
|
|
aOpen = false;
|
|
}
|
|
if (bFuture.isError()) {
|
|
bFuture = Never();
|
|
bOpen = false;
|
|
}
|
|
|
|
if (!aOpen && !bOpen) {
|
|
output.sendError(e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return Void();
|
|
}
|
|
|
|
// Returns the ordered merge of a and b, assuming that a and b are both already ordered (prefer a over b if keys are
|
|
// equal). T must be a class that implements compare()
|
|
ACTOR template <class T>
|
|
Future<Void> orderedMergeStreams(FutureStream<T> a, FutureStream<T> b, PromiseStream<T> output) {
|
|
state Optional<T> savedKVa;
|
|
state bool aOpen;
|
|
state Optional<T> savedKVb;
|
|
state bool bOpen;
|
|
|
|
aOpen = bOpen = true;
|
|
|
|
loop {
|
|
if (aOpen && !savedKVa.present()) {
|
|
try {
|
|
T KVa = waitNext(a);
|
|
savedKVa = Optional<T>(KVa);
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
aOpen = false;
|
|
if (!bOpen) {
|
|
output.sendError(e);
|
|
}
|
|
} else {
|
|
output.sendError(e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (bOpen && !savedKVb.present()) {
|
|
try {
|
|
T KVb = waitNext(b);
|
|
savedKVb = Optional<T>(KVb);
|
|
} catch (Error& e) {
|
|
if (e.code() == error_code_end_of_stream) {
|
|
bOpen = false;
|
|
if (!aOpen) {
|
|
output.sendError(e);
|
|
}
|
|
} else {
|
|
output.sendError(e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!aOpen) {
|
|
output.send(savedKVb.get());
|
|
savedKVb = Optional<T>();
|
|
} else if (!bOpen) {
|
|
output.send(savedKVa.get());
|
|
savedKVa = Optional<T>();
|
|
} else {
|
|
int cmp = savedKVa.get().compare(savedKVb.get());
|
|
|
|
if (cmp == 0) {
|
|
// prefer a
|
|
output.send(savedKVa.get());
|
|
savedKVa = Optional<T>();
|
|
savedKVb = Optional<T>();
|
|
} else if (cmp < 0) {
|
|
output.send(savedKVa.get());
|
|
savedKVa = Optional<T>();
|
|
} else {
|
|
output.send(savedKVb.get());
|
|
savedKVb = Optional<T>();
|
|
}
|
|
}
|
|
}
|
|
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<Void> timeReply(Future<T> replyToTime, PromiseStream<double> timeOutput) {
|
|
state double startTime = now();
|
|
try {
|
|
T _ = wait(replyToTime);
|
|
wait(delay(0));
|
|
timeOutput.send(now() - startTime);
|
|
} catch (Error& e) {
|
|
// Ignore broken promises. They typically occur during shutdown and our callers don't want to have to create
|
|
// brokenPromiseToNever actors to ignore them. For what it's worth we are breaking timeOutput to pass the pain
|
|
// along.
|
|
if (e.code() != error_code_broken_promise)
|
|
throw;
|
|
}
|
|
return Void();
|
|
}
|
|
|
|
ACTOR template <class T>
|
|
Future<T> forward(Future<T> from, Promise<T> to) {
|
|
try {
|
|
T res = wait(from);
|
|
to.send(res);
|
|
return res;
|
|
} catch (Error& e) {
|
|
if (e.code() != error_code_actor_cancelled) {
|
|
to.sendError(e);
|
|
}
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
// Monad
|
|
|
|
ACTOR template <class Fun, class T>
|
|
Future<decltype(std::declval<Fun>()(std::declval<T>()))> fmap(Fun fun, Future<T> f) {
|
|
T val = wait(f);
|
|
return fun(val);
|
|
}
|
|
|
|
ACTOR template <class T, class Fun>
|
|
Future<decltype(std::declval<Fun>()(std::declval<T>()).getValue())> runAfter(Future<T> lhs, Fun rhs) {
|
|
T val1 = wait(lhs);
|
|
decltype(std::declval<Fun>()(std::declval<T>()).getValue()) res = wait(rhs(val1));
|
|
return res;
|
|
}
|
|
|
|
ACTOR template <class T, class U>
|
|
Future<U> runAfter(Future<T> lhs, Future<U> rhs) {
|
|
T val1 = wait(lhs);
|
|
U res = wait(rhs);
|
|
return res;
|
|
}
|
|
|
|
template <class T, class Fun>
|
|
auto operator>>=(Future<T> lhs, Fun&& rhs) -> Future<decltype(rhs(std::declval<T>()))> {
|
|
return runAfter(lhs, std::forward<Fun>(rhs));
|
|
}
|
|
|
|
template <class T, class U>
|
|
Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
|
|
return runAfter(lhs, rhs);
|
|
}
|
|
|
|
/*
|
|
* IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit
|
|
* responsible for handling the output does not need to have knowledge of how the output is generated
|
|
*/
|
|
template <class Output>
|
|
class IAsyncListener : public ReferenceCounted<IAsyncListener<Output>> {
|
|
public:
|
|
virtual ~IAsyncListener() = default;
|
|
virtual Output const& get() const = 0;
|
|
virtual Future<Void> onChange() const = 0;
|
|
template <class Input, class F>
|
|
static Reference<IAsyncListener> create(Reference<AsyncVar<Input> const> const& input, F const& f);
|
|
template <class Input, class F>
|
|
static Reference<IAsyncListener> create(Reference<AsyncVar<Input>> const& input, F const& f);
|
|
static Reference<IAsyncListener> create(Reference<AsyncVar<Output>> const& output);
|
|
};
|
|
|
|
namespace IAsyncListenerImpl {
|
|
|
|
template <class Input, class Output, class F>
|
|
class AsyncListener final : public IAsyncListener<Output> {
|
|
// Order matters here, output must outlive monitorActor
|
|
AsyncVar<Output> output;
|
|
Future<Void> monitorActor;
|
|
ACTOR static Future<Void> monitor(Reference<AsyncVar<Input> const> input, AsyncVar<Output>* output, F f) {
|
|
loop {
|
|
wait(input->onChange());
|
|
output->set(f(input->get()));
|
|
}
|
|
}
|
|
|
|
public:
|
|
AsyncListener(Reference<AsyncVar<Input> const> const& input, F const& f)
|
|
: output(f(input->get())), monitorActor(monitor(input, &output, f)) {}
|
|
Output const& get() const override { return output.get(); }
|
|
Future<Void> onChange() const override { return output.onChange(); }
|
|
};
|
|
|
|
} // namespace IAsyncListenerImpl
|
|
|
|
template <class Output>
|
|
template <class Input, class F>
|
|
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input> const> const& input,
|
|
F const& f) {
|
|
return makeReference<IAsyncListenerImpl::AsyncListener<Input, Output, F>>(input, f);
|
|
}
|
|
|
|
template <class Output>
|
|
template <class Input, class F>
|
|
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input>> const& input, F const& f) {
|
|
return create(Reference<AsyncVar<Input> const>(input), f);
|
|
}
|
|
|
|
template <class Output>
|
|
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Output>> const& input) {
|
|
auto identity = [](const auto& x) { return x; };
|
|
return makeReference<IAsyncListenerImpl::AsyncListener<Output, Output, decltype(identity)>>(input, identity);
|
|
}
|
|
|
|
// A weak reference type to wrap a future Reference<T> object.
|
|
// Once the future is complete, this object holds a pointer to the referenced object but does
|
|
// not contribute to its reference count.
|
|
//
|
|
// WARNING: this class will not be aware when the underlying object is destroyed. It is up to the
|
|
// user to make sure that an UnsafeWeakFutureReference is discarded at the same time the object is.
|
|
template <class T>
|
|
class UnsafeWeakFutureReference {
|
|
public:
|
|
UnsafeWeakFutureReference() {}
|
|
UnsafeWeakFutureReference(Future<Reference<T>> future) : data(new UnsafeWeakFutureReferenceData(future)) {}
|
|
|
|
// Returns a future to obtain a normal reference handle
|
|
// If the future is ready, this creates a Reference<T> to wrap the object
|
|
Future<Reference<T>> get() {
|
|
if (!data) {
|
|
return Reference<T>();
|
|
} else if (data->ptr.present()) {
|
|
return Reference<T>::addRef(data->ptr.get());
|
|
} else {
|
|
return data->future;
|
|
}
|
|
}
|
|
|
|
// Returns the raw pointer, if the object is ready
|
|
// Note: this should be used with care, as this pointer is not counted as a reference to the object and
|
|
// it could be deleted if all normal references are destroyed.
|
|
Optional<T*> getPtrIfReady() { return data->ptr; }
|
|
|
|
private:
|
|
// A class to hold the state for an UnsafeWeakFutureReference
|
|
struct UnsafeWeakFutureReferenceData : public ReferenceCounted<UnsafeWeakFutureReferenceData>, NonCopyable {
|
|
Optional<T*> ptr;
|
|
Future<Reference<T>> future;
|
|
Future<Void> moveResultFuture;
|
|
|
|
UnsafeWeakFutureReferenceData(Future<Reference<T>> future) : future(future) {
|
|
moveResultFuture = moveResult(this);
|
|
}
|
|
|
|
// Waits for the future to complete and then stores the pointer in local storage
|
|
// When this completes, we will no longer be counted toward the reference count of the object
|
|
ACTOR Future<Void> moveResult(UnsafeWeakFutureReferenceData* self) {
|
|
Reference<T> result = wait(self->future);
|
|
self->ptr = result.getPtr();
|
|
self->future = Future<Reference<T>>();
|
|
return Void();
|
|
}
|
|
};
|
|
|
|
Reference<UnsafeWeakFutureReferenceData> data;
|
|
};
|
|
|
|
// Utility class to provide FLOW compliant singleton pattern.
|
|
// In similuation, the approach allows per-virtual process singleton as desired compared to one singleton instance
|
|
// shared across all virtual processes if 'static singleton' pattern is implemented.
|
|
//
|
|
// API NOTE: Client are expected to pass functor allowing instantiation of the template class
|
|
template <class T>
|
|
class FlowSingleton {
|
|
public:
|
|
static Reference<T> getInstance(std::function<Reference<T>()> func) {
|
|
ASSERT(g_network->isSimulated());
|
|
|
|
auto cItr = instanceMap.find(g_network->getLocalAddress());
|
|
if (cItr == instanceMap.end()) {
|
|
instanceMap.emplace(g_network->getLocalAddress(), func());
|
|
return instanceMap[g_network->getLocalAddress()];
|
|
} else {
|
|
return cItr->second;
|
|
}
|
|
}
|
|
|
|
static int getCount() { return instanceMap.size(); }
|
|
static void resetInstances() { instanceMap.clear(); }
|
|
|
|
private:
|
|
static std::unordered_map<NetworkAddress, Reference<T>> instanceMap;
|
|
};
|
|
|
|
template <class T>
|
|
std::unordered_map<NetworkAddress, Reference<T>> FlowSingleton<T>::instanceMap;
|
|
|
|
#include "flow/unactorcompiler.h"
|
|
|
|
#endif
|