/* * genericactors.actor.h * * This source file is part of the FoundationDB open source project * * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once // When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source // version. #include #if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H) #define FLOW_GENERICACTORS_ACTOR_G_H #include "flow/genericactors.actor.g.h" #elif !defined(GENERICACTORS_ACTOR_H) #define GENERICACTORS_ACTOR_H #include #include #include "flow/flow.h" #include "flow/Knobs.h" #include "flow/Util.h" #include "flow/IndexedSet.h" #include "flow/actorcompiler.h" // This must be the last #include. 
#ifdef _MSC_VER #pragma warning(disable : 4355) // 'this' : used in base member initializer list #endif ACTOR template Future traceAfter(Future what, const char* type, const char* key, X value, bool traceErrors = false) { try { T val = wait(what); TraceEvent(type).detail(key, value); return val; } catch (Error& e) { if (traceErrors) TraceEvent(type).errorUnsuppressed(e).detail(key, value); throw; } } ACTOR template Future traceAfterCall(Future what, const char* type, const char* key, X func, bool traceErrors = false) { try { state T val = wait(what); try { TraceEvent(type).detail(key, func(val)); } catch (Error& e) { TraceEvent(SevError, "TraceAfterCallError").error(e); } return val; } catch (Error& e) { if (traceErrors) TraceEvent(type).errorUnsuppressed(e); throw; } } ACTOR template Future> stopAfter(Future what) { state Optional ret = T(); try { T _ = wait(what); ret = Optional(_); } catch (Error& e) { bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled || e.code() == error_code_please_reboot_remote_kv_store; TraceEvent(ok ? 
SevInfo : SevError, "StopAfterError").error(e); if (!ok) { fprintf(stderr, "Fatal Error: %s\n", e.what()); ret = Optional(); } } g_network->stop(); return ret; } template T sorted(T range) { std::sort(range.begin(), range.end()); return range; } template ErrorOr errorOr(T t) { return ErrorOr(t); } ACTOR template Future> errorOr(Future f) { try { T t = wait(f); return ErrorOr(t); } catch (Error& e) { return ErrorOr(e); } } ACTOR template Future throwErrorOr(Future> f) { ErrorOr t = wait(f); if (t.isError()) throw t.getError(); return t.get(); } ACTOR template Future transformErrors(Future f, Error err) { try { T t = wait(f); return t; } catch (Error& e) { if (e.code() == error_code_actor_cancelled) throw e; throw err; } } ACTOR template Future transformError(Future f, Error inErr, Error outErr) { try { T t = wait(f); return t; } catch (Error& e) { if (e.code() == inErr.code()) throw outErr; throw e; } } // Note that the RequestStream version of forwardPromise doesn't exist, because what to do with errors? ACTOR template void forwardEvent(Event* ev, Future input) { try { T value = wait(input); } catch (Error&) { } ev->set(); } ACTOR template void forwardEvent(Event* ev, T* t, Error* err, FutureStream input) { try { T value = waitNext(input); *t = std::move(value); ev->set(); } catch (Error& e) { *err = e; ev->set(); } } ACTOR template Future waitForAllReady(std::vector> results) { state int i = 0; loop { if (i == results.size()) return Void(); try { wait(success(results[i])); } catch (...) 
{ } i++; } } ACTOR template Future timeout(Future what, double time, T timedoutValue, TaskPriority taskID = TaskPriority::DefaultDelay) { Future end = delay(time, taskID); choose { when(T t = wait(what)) { return t; } when(wait(end)) { return timedoutValue; } } } ACTOR template Future> timeout(Future what, double time) { Future end = delay(time); choose { when(T t = wait(what)) { return t; } when(wait(end)) { return Optional(); } } } ACTOR template Future timeoutError(Future what, double time, TaskPriority taskID = TaskPriority::DefaultDelay) { Future end = delay(time, taskID); choose { when(T t = wait(what)) { return t; } when(wait(end)) { throw timed_out(); } } } ACTOR template Future delayed(Future what, double time = 0.0, TaskPriority taskID = TaskPriority::DefaultDelay) { try { state T t = wait(what); wait(delay(time, taskID)); return t; } catch (Error& e) { state Error err = e; wait(delay(time, taskID)); throw err; } } ACTOR template Future recurring(Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay) { loop choose { when(wait(delay(interval, taskID))) { what(); } } } ACTOR template Future trigger(Func what, Future signal) { wait(signal); what(); return Void(); } ACTOR template Future triggerOnError(Func what, Future signal) { try { wait(signal); } catch (Error& e) { what(); } return Void(); } // Waits for a future to complete and cannot be cancelled // Most situations will use the overload below, which does not require a promise ACTOR template void uncancellable(Future what, Promise result) { try { T val = wait(what); result.send(val); } catch (Error& e) { result.sendError(e); } } // Waits for a future to complete and cannot be cancelled ACTOR template [[flow_allow_discard]] Future uncancellable(Future what) { Promise resultPromise; Future result = resultPromise.getFuture(); uncancellable(what, resultPromise); T val = wait(result); return val; } // Holds onto an object until a future either completes or is cancelled // Used to 
prevent the object from being reclaimed // // NOTE: the order of the arguments is important. The arguments will be destructed in // reverse order, and we need the object to be destructed last. ACTOR template Future holdWhile(X object, Future what) { T val = wait(what); return val; } ACTOR template Future holdWhileVoid(X object, Future what) { T val = wait(what); return Void(); } // Assign the future value of what to out template Future store(T& out, Future what) { return map(what, [&out](T const& v) { out = v; return Void(); }); } template Future storeOrThrow(T& out, Future> what, Error e = key_not_found()) { return map(what, [&out, e](Optional const& o) { if (!o.present()) throw e; out = o.get(); return Void(); }); } // Waits for a future to be ready, and then applies an asynchronous function to it. ACTOR template ()(std::declval()).getValue())> Future mapAsync(Future what, F actorFunc) { T val = wait(what); U ret = wait(actorFunc(val)); return ret; } // maps a vector of futures with an asynchronous function template auto mapAsync(std::vector> const& what, F const& actorFunc) { std::vector> ret; ret.reserve(what.size()); for (const auto& f : what) ret.push_back(mapAsync(f, actorFunc)); return ret; } // maps a stream with an asynchronous function ACTOR template ()(std::declval()).getValue())> Future mapAsync(FutureStream input, F actorFunc, PromiseStream output) { state Deque> futures; loop { try { choose { when(T nextInput = waitNext(input)) { futures.push_back(actorFunc(nextInput)); } when(U nextOutput = wait(futures.size() == 0 ? Never() : futures.front())) { output.send(nextOutput); futures.pop_front(); } } } catch (Error& e) { if (e.code() == error_code_end_of_stream) { break; } else { output.sendError(e); throw e; } } } while (futures.size()) { U nextOutput = wait(futures.front()); output.send(nextOutput); futures.pop_front(); } output.sendError(end_of_stream()); return Void(); } // Waits for a future to be ready, and then applies a function to it. 
ACTOR template Future> map(Future what, F func) { T val = wait(what); return func(val); } // maps a vector of futures template auto map(std::vector> const& what, F const& func) { std::vector>> ret; ret.reserve(what.size()); for (const auto& f : what) ret.push_back(map(f, func)); return ret; } // maps a stream ACTOR template Future map(FutureStream input, F func, PromiseStream> output) { loop { try { T nextInput = waitNext(input); output.send(func(nextInput)); } catch (Error& e) { if (e.code() == error_code_end_of_stream) { break; } else throw; } } output.sendError(end_of_stream()); return Void(); } // Returns if the future returns true, otherwise waits forever. ACTOR Future returnIfTrue(Future f); // Returns if the future, when waited on and then evaluated with the predicate, returns true, otherwise waits forever template Future returnIfTrue(Future what, F pred) { return returnIfTrue(map(what, pred)); } // filters a stream ACTOR template Future filter(FutureStream input, F pred, PromiseStream output) { loop { try { T nextInput = waitNext(input); if (func(nextInput)) output.send(nextInput); } catch (Error& e) { if (e.code() == error_code_end_of_stream) { break; } else throw; } } output.sendError(end_of_stream()); return Void(); } // filters a stream asynchronously ACTOR template Future asyncFilter(FutureStream input, F actorPred, PromiseStream output) { state Deque>> futures; state std::pair> p; loop { try { choose { when(T nextInput = waitNext(input)) { futures.emplace_back(nextInput, actorPred(nextInput)); } when(bool pass = wait(futures.size() == 0 ? 
Never() : futures.front().second)) { if (pass) output.send(futures.front().first); futures.pop_front(); } } } catch (Error& e) { if (e.code() == error_code_end_of_stream) { break; } else { throw e; } } } while (futures.size()) { p = futures.front(); bool pass = wait(p.second); if (pass) output.send(p.first); futures.pop_front(); } output.sendError(end_of_stream()); return Void(); } template struct WorkerCache { // SOMEDAY: Would we do better to use "unreliable" (at most once) transport for the initialize requests and get rid // of this? It doesn't provide true at most once behavior because things are removed from the cache after they have // terminated. bool exists(UID id) { return id_interface.count(id) != 0; } void set(UID id, const Future& onReady) { ASSERT(!exists(id)); id_interface[id] = onReady; } Future get(UID id) { ASSERT(exists(id)); return id_interface[id]; } Future removeOnReady(UID id, Future const& ready) { return removeOnReady(this, id, ready); } private: ACTOR static Future removeOnReady(WorkerCache* self, UID id, Future ready) { try { wait(ready); self->id_interface.erase(id); return Void(); } catch (Error& e) { self->id_interface.erase(id); throw; } } std::map> id_interface; }; template class AsyncMap : NonCopyable { public: // Represents a complete function from keys to values (K -> V) // All values not explicitly inserted map to V() // If this isn't appropriate, use V=Optional AsyncMap() : defaultValue(), destructing(false) {} virtual ~AsyncMap() { destructing = true; items.clear(); } void set(K const& k, V const& v) { auto& i = items[k]; if (i.value != v) setUnconditional(k, v, i); } void setUnconditional(K const& k, V const& v) { setUnconditional(k, v, items[k]); } void triggerAll() { std::vector> ps; for (auto it = items.begin(); it != items.end(); ++it) { ps.resize(ps.size() + 1); ps.back().swap(it->second.change); } std::vector> noDestroy = ps; // See explanation of noDestroy in setUnconditional() for (auto p = ps.begin(); p != ps.end(); 
++p) p->send(Void()); } void triggerRange(K const& begin, K const& end) { std::vector> ps; for (auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it) { ps.resize(ps.size() + 1); ps.back().swap(it->second.change); } std::vector> noDestroy = ps; // See explanation of noDestroy in setUnconditional() for (auto p = ps.begin(); p != ps.end(); ++p) p->send(Void()); } void trigger(K const& key) { if (items.count(key) != 0) { auto& i = items[key]; Promise trigger; i.change.swap(trigger); Promise noDestroy = trigger; // See explanation of noDestroy in setUnconditional() if (i.value == defaultValue) items.erase(key); trigger.send(Void()); } } void clear(K const& k) { set(k, V()); } V const& get(K const& k) const { auto it = items.find(k); if (it != items.end()) return it->second.value; else return defaultValue; } int count(K const& k) const { auto it = items.find(k); if (it != items.end()) return 1; return 0; } virtual Future onChange(K const& k) { // throws broken_promise if this is destroyed auto& item = items[k]; if (item.value == defaultValue) return destroyOnCancel(this, k, item.change.getFuture()); return item.change.getFuture(); } std::vector getKeys() const { std::vector keys; keys.reserve(items.size()); for (auto i = items.begin(); i != items.end(); ++i) keys.push_back(i->first); return keys; } void resetNoWaiting() { for (auto i = items.begin(); i != items.end(); ++i) ASSERT(i->second.change.getFuture().getFutureReferenceCount() == 1); items.clear(); } protected: // Invariant: Every item in the map either has value!=defaultValue xor a destroyOnCancel actor waiting on // change.getFuture() struct P { V value; Promise change; P() : value() {} }; std::map items; const V defaultValue; bool destructing; void setUnconditional(K const& k, V const& v, P& i) { Promise trigger; i.change.swap(trigger); Promise noDestroy = trigger; // The send(Void()) or even V::operator= could cause destroyOnCancel, // which could undo the change to i.value here. 
Keeping the promise reference count >= 2 // prevents destroyOnCancel from erasing anything from the map. if (v == defaultValue) { items.erase(k); } else { i.value = v; } trigger.send(Void()); } ACTOR Future destroyOnCancel(AsyncMap* self, K key, Future change) { try { wait(change); return Void(); } catch (Error& e) { if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 && change.getPromiseReferenceCount() == 1) { if (EXPENSIVE_VALIDATION) { auto& p = self->items[key]; ASSERT(p.change.getFuture() == change); } self->items.erase(key); } throw; } } }; template class ReferencedObject : NonCopyable, public ReferenceCounted> { public: ReferencedObject() : value() {} ReferencedObject(V const& v) : value(v) {} ReferencedObject(V&& v) : value(std::move(v)) {} ReferencedObject(ReferencedObject&& r) : value(std::move(r.value)) {} void operator=(ReferencedObject&& r) { value = std::move(r.value); } V const& get() const { return value; } V& mutate() { return value; } void set(V const& v) { value = v; } void set(V&& v) { value = std::move(v); } static Reference> from(V const& v) { return makeReference>(v); } static Reference> from(V&& v) { return makeReference>(std::move(v)); } private: V value; }; template class AsyncVar : NonCopyable, public ReferenceCounted> { public: AsyncVar() : value() {} AsyncVar(V const& v) : value(v) {} AsyncVar(AsyncVar&& av) : value(std::move(av.value)), nextChange(std::move(av.nextChange)) {} void operator=(AsyncVar&& av) { value = std::move(av.value); nextChange = std::move(av.nextChange); } V const& get() const { return value; } Future onChange() const { return nextChange.getFuture(); } void set(V const& v) { if (v != value) setUnconditional(v); } void setUnconditional(V const& v) { Promise t; this->nextChange.swap(t); this->value = v; t.send(Void()); } void trigger() { Promise t; this->nextChange.swap(t); t.send(Void()); } private: V value; Promise nextChange; }; class AsyncTrigger : NonCopyable 
{ public: AsyncTrigger() {} AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {} void operator=(AsyncTrigger&& at) { v = std::move(at.v); } Future onTrigger() const { return v.onChange(); } void trigger() { v.trigger(); } private: AsyncVar v; }; // Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes // the AsyncTrigger is triggered. ACTOR template void forward(Reference const> from, AsyncTrigger* to) { loop { wait(from->onChange()); to->trigger(); } } class Debouncer : NonCopyable { public: explicit Debouncer(double delay) { worker = debounceWorker(this, delay); } Debouncer(Debouncer&& at) = default; Debouncer& operator=(Debouncer&& at) = default; Future onTrigger() { return output.onChange(); } void trigger() { input.setUnconditional(Void()); } private: AsyncVar input; AsyncVar output; Future worker; ACTOR Future debounceWorker(Debouncer* self, double bounceTime) { loop { wait(self->input.onChange()); loop { choose { when(wait(self->input.onChange())) {} when(wait(delay(bounceTime))) { break; } } } self->output.setUnconditional(Void()); } } }; ACTOR template Future asyncDeserialize(Reference>> input, Reference>> output) { loop { if (input->get().size()) { ObjectReader reader(input->get().begin(), IncludeVersion()); T res; reader.deserialize(res); output->set(res); } else output->set(Optional()); wait(input->onChange()); } } ACTOR template void forwardVector(Future values, std::vector> out) { V in = wait(values); ASSERT(in.size() == out.size()); for (int i = 0; i < out.size(); i++) out[i].send(in[i]); } ACTOR template Future delayedAsyncVar(Reference> in, Reference> out, double time) { try { loop { wait(delay(time)); out->set(in->get()); wait(in->onChange()); } } catch (Error& e) { out->set(in->get()); throw; } } ACTOR template Future setAfter(Reference> var, double time, T val) { wait(delay(time)); var->set(val); return Void(); } ACTOR template Future resetAfter(Reference> var, double time, T val, int warningLimit = -1, double 
warningResetDelay = 0, const char* context = nullptr) { state bool isEqual = var->get() == val; state Future resetDelay = isEqual ? Never() : delay(time); state int resetCount = 0; state double lastReset = now(); loop { choose { when(wait(resetDelay)) { var->set(val); if (now() - lastReset > warningResetDelay) { resetCount = 0; } resetCount++; if (context && warningLimit >= 0 && resetCount > warningLimit) { TraceEvent(SevWarnAlways, context) .detail("ResetCount", resetCount) .detail("LastReset", now() - lastReset); } lastReset = now(); isEqual = true; resetDelay = Never(); } when(wait(var->onChange())) {} } if (isEqual && var->get() != val) { isEqual = false; resetDelay = delay(time); } if (!isEqual && var->get() == val) { isEqual = true; resetDelay = Never(); } } } ACTOR template Future setWhenDoneOrError(Future condition, Reference> var, T val) { try { wait(condition); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) throw; } var->set(val); return Void(); } Future allTrue(const std::vector>& all); Future anyTrue(std::vector>> const& input, Reference> const& output); Future cancelOnly(std::vector> const& futures); Future timeoutWarningCollector(FutureStream const& input, double const& logDelay, const char* const& context, UID const& id); Future quorumEqualsTrue(std::vector> const& futures, int const& required); Future lowPriorityDelay(double const& waitTime); ACTOR template Future streamHelper(PromiseStream output, PromiseStream errors, Future input) { try { T value = wait(input); output.send(value); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) throw; errors.send(e); } return Void(); } template Future makeStream(const std::vector>& futures, PromiseStream& stream, PromiseStream& errors) { std::vector> forwarders; forwarders.reserve(futures.size()); for (int f = 0; f < futures.size(); f++) forwarders.push_back(streamHelper(stream, errors, futures[f])); return cancelOnly(forwarders); } template class QuorumCallback; template struct 
Quorum final : SAV { int antiQuorum; int count; static inline int sizeFor(int count) { return sizeof(Quorum) + sizeof(QuorumCallback) * count; } void destroy() override { int size = sizeFor(this->count); this->~Quorum(); freeFast(size, this); } void cancel() override { int cancelled_callbacks = 0; for (int i = 0; i < count; i++) if (callbacks()[i].next) { callbacks()[i].remove(); callbacks()[i].next = 0; ++cancelled_callbacks; } if (canBeSet()) sendError(actor_cancelled()); for (int i = 0; i < cancelled_callbacks; i++) delPromiseRef(); } explicit Quorum(int quorum, int count) : SAV(1, count), antiQuorum(count - quorum + 1), count(count) { if (!quorum) this->send(Void()); } void oneSuccess() { if (getPromiseReferenceCount() == antiQuorum && canBeSet()) this->sendAndDelPromiseRef(Void()); else delPromiseRef(); } void oneError(Error err) { if (canBeSet()) this->sendErrorAndDelPromiseRef(err); else delPromiseRef(); } QuorumCallback* callbacks() { return (QuorumCallback*)(this + 1); } }; template class QuorumCallback : public Callback { public: void fire(const T& value) override { Callback::remove(); Callback::next = 0; head->oneSuccess(); } void error(Error error) override { Callback::remove(); Callback::next = 0; head->oneError(error); } private: template friend Future quorum(std::vector> const& results, int n); Quorum* head; QuorumCallback() = default; QuorumCallback(Future future, Quorum* head) : head(head) { future.addCallbackAndClear(this); } }; template Future quorum(std::vector> const& results, int n) { ASSERT(n >= 0 && n <= results.size()); int size = Quorum::sizeFor(results.size()); Quorum* q = new (allocateFast(size)) Quorum(n, results.size()); QuorumCallback* nextCallback = q->callbacks(); for (auto& r : results) { if (r.isReady()) { new (nextCallback) QuorumCallback(); nextCallback->next = 0; if (r.isError()) q->oneError(r.getError()); else q->oneSuccess(); } else new (nextCallback) QuorumCallback(r, q); ++nextCallback; } return Future(q); } ACTOR template 
Future smartQuorum(std::vector> results, int required, double extraSeconds, TaskPriority taskID = TaskPriority::DefaultDelay) { if (results.empty() && required == 0) return Void(); wait(quorum(results, required)); choose { when(wait(quorum(results, (int)results.size()))) { return Void(); } when(wait(delay(extraSeconds, taskID))) { return Void(); } } } template Future waitForAll(std::vector> const& results) { if (results.empty()) return Void(); return quorum(results, (int)results.size()); } template Future waitForAny(std::vector> const& results) { if (results.empty()) return Void(); return quorum(results, 1); } ACTOR Future shortCircuitAny(std::vector> f); ACTOR template Future> getAll(std::vector> input) { if (input.empty()) return std::vector(); wait(quorum(input, input.size())); std::vector output; output.reserve(input.size()); for (int i = 0; i < input.size(); i++) output.push_back(input[i].get()); return output; } ACTOR template Future> appendAll(std::vector>> input) { wait(quorum(input, input.size())); std::vector output; size_t sz = 0; for (const auto& f : input) { sz += f.get().size(); } output.reserve(sz); for (int i = 0; i < input.size(); i++) { auto const& r = input[i].get(); output.insert(output.end(), r.begin(), r.end()); } return output; } ACTOR template Future onEqual(Future in, T equalTo) { T t = wait(in); if (t == equalTo) return Void(); wait(Never()); // never return throw internal_error(); // does not happen } ACTOR template Future success(Future of) { T t = wait(of); (void)t; return Void(); } ACTOR template Future ready(Future f) { try { wait(success(f)); } catch (...) 
{ } return Void(); } ACTOR template Future waitAndForward(FutureStream input) { T output = waitNext(input); return output; } ACTOR template Future reportErrorsExcept(Future in, const char* context, UID id, std::set const* pExceptErrors) { try { T t = wait(in); return t; } catch (Error& e) { if (e.code() != error_code_actor_cancelled && (!pExceptErrors || !pExceptErrors->count(e.code()))) TraceEvent(SevError, context, id).error(e); throw; } } template Future reportErrors(Future const& in, const char* context, UID id = UID()) { return reportErrorsExcept(in, context, id, nullptr); } ACTOR template Future require(Future> in, int errorCode) { Optional o = wait(in); if (o.present()) { return o.get(); } else { throw Error(errorCode); } } ACTOR template Future waitForFirst(std::vector> items) { state PromiseStream resultStream; state PromiseStream errorStream; state Future forCancellation = makeStream(items, resultStream, errorStream); state FutureStream resultFutureStream = resultStream.getFuture(); state FutureStream errorFutureStream = errorStream.getFuture(); choose { when(T val = waitNext(resultFutureStream)) { forCancellation = Future(); return val; } when(Error e = waitNext(errorFutureStream)) { forCancellation = Future(); throw e; } } } ACTOR template Future tag(Future future, T what) { wait(future); return what; } ACTOR template Future tag(Future future, T what, PromiseStream stream) { wait(future); stream.send(what); return Void(); } ACTOR template Future tagError(Future future, Error e) { wait(future); throw e; } ACTOR template Future detach(Future f) { T x = wait(f); return x; } // If the future is ready, yields and returns. Otherwise, returns when future is set. 
// A future that is already ready is re-delivered (value or error) after yield(); a pending future is returned
// unchanged.
template <class T>
Future<T> orYield(Future<T> f) {
	if (f.isReady()) {
		if (f.isError())
			return tagError<T>(yield(), f.getError()); // T cannot be deduced from the arguments here
		else
			return tag(yield(), f.get());
	} else
		return f;
}

Future<Void> orYield(Future<Void> f);

// Returns the value of whichever of lhs / rhs becomes ready first.
ACTOR template <class T>
Future<T> chooseActor(Future<T> lhs, Future<T> rhs) {
	choose {
		when(T t = wait(lhs)) { return t; }
		when(T t = wait(rhs)) { return t; }
	}
}

// set && set -> set
// error && x -> error
// all others -> unset
inline Future<Void> operator&&(Future<Void> const& lhs, Future<Void> const& rhs) {
	// When one side is already ready, no combining actor is needed: an error wins, otherwise the result is
	// simply the other side.
	if (lhs.isReady()) {
		if (lhs.isError())
			return lhs;
		else
			return rhs;
	}
	if (rhs.isReady()) {
		if (rhs.isError())
			return rhs;
		else
			return lhs;
	}

	return waitForAll(std::vector<Future<Void>>{ lhs, rhs });
}

// error || unset -> error
// unset || unset -> unset
// all others -> set
inline Future<Void> operator||(Future<Void> const& lhs, Future<Void> const& rhs) {
	if (lhs.isReady()) {
		if (lhs.isError())
			return lhs;
		if (rhs.isReady())
			return rhs;
		return lhs;
	}

	return chooseActor(lhs, rhs);
}

// Passes `in` through, except that broken_promise turns into waiting forever.
ACTOR template <class T>
Future<T> brokenPromiseToNever(Future<T> in) {
	try {
		T t = wait(in);
		return t;
	} catch (Error& e) {
		if (e.code() != error_code_broken_promise)
			throw;
		wait(Never()); // never return
		throw internal_error(); // does not happen
	}
}

// Passes `in` through, except that broken_promise is replaced by request_maybe_delivered.
ACTOR template <class T>
Future<T> brokenPromiseToMaybeDelivered(Future<T> in) {
	try {
		T t = wait(in);
		return t;
	} catch (Error& e) {
		if (e.code() == error_code_broken_promise) {
			throw request_maybe_delivered();
		}
		throw;
	}
}

// Moves *pOutputPromise into the actor, then sends `value` to it once `signal` completes.
ACTOR template <class T, class U>
void tagAndForward(Promise<T>* pOutputPromise, U value, Future<Void> signal) {
	state Promise<T> out(std::move(*pOutputPromise));
	wait(signal);
	out.send(std::move(value));
}

// Sends `value` to *pOutput once `signal` completes. The stream is used through the pointer after the wait,
// so it has to outlive `signal`.
ACTOR template <class T>
void tagAndForward(PromiseStream<T>* pOutput, T value, Future<Void> signal) {
	wait(signal);
	pOutput->send(std::move(value));
}

// Moves *pOutputPromise into the actor, then sends error `value` to it once `signal` completes.
ACTOR template <class T>
void tagAndForwardError(Promise<T>* pOutputPromise, Error value, Future<Void> signal) {
	state Promise<T> out(std::move(*pOutputPromise));
	wait(signal);
	out.sendError(value);
}

// Sends error `value` to *pOutput once `signal` completes. The stream is used through the pointer after the
// wait, so it has to outlive `signal`.
ACTOR template <class T>
void tagAndForwardError(PromiseStream<T>* pOutput, Error value, Future<Void> signal) {
	wait(signal);
	pOutput->sendError(value);
}
ACTOR template Future waitOrError(Future f, Future errorSignal) { choose { when(T val = wait(f)) { return val; } when(wait(errorSignal)) { ASSERT(false); throw internal_error(); } } } // A low-overhead FIFO mutex made with no internal queue structure (no list, deque, vector, etc) // The lock is implemented as a Promise, which is returned to callers in a convenient wrapper // called Lock. // // Usage: // Lock lock = wait(mutex.take()); // lock.release(); // Next waiter will get the lock, OR // lock.error(e); // Next waiter will get e, future waiters will see broken_promise // lock = Lock(); // Or let Lock and any copies go out of scope. All waiters will see broken_promise. struct FlowMutex { FlowMutex() { lastPromise.send(Void()); } bool available() { return lastPromise.isSet(); } struct Lock { void release() { promise.send(Void()); } void error(Error e = broken_promise()) { promise.sendError(e); } // This is exposed in case the caller wants to use/copy it directly Promise promise; }; Future take() { Lock newLock; Future f = lastPromise.isSet() ? newLock : tag(lastPromise.getFuture(), newLock); lastPromise = newLock.promise; return f; } private: Promise lastPromise; }; ACTOR template Future forwardErrors(Future f, PromiseStream output) { try { T val = wait(f); return val; } catch (Error& e) { output.sendError(e); throw; } } struct FlowLock : NonCopyable, public ReferenceCounted { // FlowLock implements a nonblocking critical section: there can be only a limited number of clients executing code // between wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock // is fewer than the number of permits, and release() makes the caller no longer a holder of the lock. 
release() // only runs waiting take()rs after the caller wait()s struct Releaser : NonCopyable { FlowLock* lock; int remaining; Releaser() : lock(0), remaining(0) {} Releaser(FlowLock& lock, int64_t amount = 1) : lock(&lock), remaining(amount) {} Releaser(Releaser&& r) noexcept : lock(r.lock), remaining(r.remaining) { r.remaining = 0; } void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; } void release(int64_t amount = -1) { if (amount == -1 || amount > remaining) amount = remaining; if (remaining) lock->release(amount); remaining -= amount; } ~Releaser() { if (remaining) lock->release(remaining); } }; FlowLock() : permits(1), active(0) {} explicit FlowLock(int64_t permits) : permits(permits), active(0) {} Future take(TaskPriority taskID = TaskPriority::DefaultYield, int64_t amount = 1) { if (active + amount <= permits || active == 0) { active += amount; return safeYieldActor(this, taskID, amount); } return takeActor(this, taskID, amount); } void release(int64_t amount = 1) { ASSERT((active > 0 || amount == 0) && active - amount >= 0); active -= amount; while (!takers.empty()) { if (active + takers.begin()->second <= permits || active == 0) { std::pair, int64_t> next = std::move(*takers.begin()); active += next.second; takers.pop_front(); next.first.send(Void()); } else { break; } } } Future releaseWhen(Future const& signal, int amount = 1) { return releaseWhenActor(this, signal, amount); } // returns when any permits are available, having taken as many as possible up to the given amount, and modifies // amount to the number of permits taken Future takeUpTo(int64_t& amount) { return takeMoreActor(this, &amount); } int64_t available() const { return permits - active; } int64_t activePermits() const { return active; } int waiters() const { return takers.size(); } // Try to send error to all current and future waiters // Only works if broken_on_destruct.canBeSet() void kill(Error e = 
broken_promise()) { if (broken_on_destruct.canBeSet()) { auto local = broken_on_destruct; // It could be the case that calling broken_on_destruct destroys this FlowLock local.sendError(e); } } private: std::list, int64_t>> takers; const int64_t permits; int64_t active; Promise broken_on_destruct; ACTOR static Future takeActor(FlowLock* lock, TaskPriority taskID, int64_t amount) { state std::list, int64_t>>::iterator it = lock->takers.emplace(lock->takers.end(), Promise(), amount); try { wait(it->first.getFuture()); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) { lock->takers.erase(it); lock->release(0); } throw; } try { double duration = BUGGIFY_WITH_PROB(.001) ? deterministicRandom()->random01() * FLOW_KNOBS->BUGGIFY_FLOW_LOCK_RELEASE_DELAY : 0.0; choose { when(wait(delay(duration, taskID))) { } // So release()ing the lock doesn't cause arbitrary code to run on the stack when(wait(lock->broken_on_destruct.getFuture())) {} } return Void(); } catch (...) { TEST(true); // If we get cancelled here, we are holding the lock but the caller doesn't know, so release it lock->release(amount); throw; } } ACTOR static Future takeMoreActor(FlowLock* lock, int64_t* amount) { wait(lock->take()); int64_t extra = std::min(lock->available(), *amount - 1); lock->active += extra; *amount = 1 + extra; return Void(); } ACTOR static Future safeYieldActor(FlowLock* lock, TaskPriority taskID, int64_t amount) { try { choose { when(wait(yield(taskID))) {} when(wait(lock->broken_on_destruct.getFuture())) {} } return Void(); } catch (Error& e) { lock->release(amount); throw; } } ACTOR static Future releaseWhenActor(FlowLock* self, Future signal, int64_t amount) { wait(signal); self->release(amount); return Void(); } }; struct NotifiedInt { NotifiedInt(int64_t val = 0) : val(val) {} Future whenAtLeast(int64_t limit) { if (val >= limit) return Void(); Promise p; waiting.emplace(limit, p); return p.getFuture(); } int64_t get() const { return val; } void set(int64_t v) { 
ASSERT(v >= val); if (v != val) { val = v; std::vector> toSend; while (waiting.size() && v >= waiting.top().first) { Promise p = std::move(waiting.top().second); waiting.pop(); toSend.push_back(p); } for (auto& p : toSend) { p.send(Void()); } } } void operator=(int64_t v) { set(v); } NotifiedInt(NotifiedInt&& r) noexcept : waiting(std::move(r.waiting)), val(r.val) {} void operator=(NotifiedInt&& r) noexcept { waiting = std::move(r.waiting); val = r.val; } private: typedef std::pair> Item; struct ItemCompare { bool operator()(const Item& a, const Item& b) { return a.first > b.first; } }; std::priority_queue, ItemCompare> waiting; int64_t val; }; struct BoundedFlowLock : NonCopyable, public ReferenceCounted { // BoundedFlowLock is different from a FlowLock in that it has a bound on how many locks can be taken from the // oldest outstanding lock. For instance, with a FlowLock that has two permits, if one permit is taken but never // released, the other permit can be reused an unlimited amount of times, but with a BoundedFlowLock, it can only be // reused a fixed number of times. 
struct Releaser : NonCopyable { BoundedFlowLock* lock; int64_t permitNumber; Releaser() : lock(nullptr), permitNumber(0) {} Releaser(BoundedFlowLock* lock, int64_t permitNumber) : lock(lock), permitNumber(permitNumber) {} Releaser(Releaser&& r) noexcept : lock(r.lock), permitNumber(r.permitNumber) { r.permitNumber = 0; } void operator=(Releaser&& r) { if (permitNumber) lock->release(permitNumber); lock = r.lock; permitNumber = r.permitNumber; r.permitNumber = 0; } void release() { if (permitNumber) { lock->release(permitNumber); } permitNumber = 0; } ~Releaser() { if (permitNumber) lock->release(permitNumber); } }; BoundedFlowLock() : minOutstanding(0), nextPermitNumber(0), unrestrictedPermits(1), boundedPermits(0) {} explicit BoundedFlowLock(int64_t unrestrictedPermits, int64_t boundedPermits) : minOutstanding(0), nextPermitNumber(0), unrestrictedPermits(unrestrictedPermits), boundedPermits(boundedPermits) {} Future take() { return takeActor(this); } void release(int64_t permitNumber) { outstanding.erase(permitNumber); updateMinOutstanding(); } private: IndexedSet outstanding; NotifiedInt minOutstanding; int64_t nextPermitNumber; const int64_t unrestrictedPermits; const int64_t boundedPermits; void updateMinOutstanding() { auto it = outstanding.index(unrestrictedPermits - 1); if (it == outstanding.end()) { minOutstanding.set(nextPermitNumber); } else { minOutstanding.set(*it); } } ACTOR static Future takeActor(BoundedFlowLock* lock) { state int64_t permitNumber = ++lock->nextPermitNumber; lock->outstanding.insert(permitNumber, 1); lock->updateMinOutstanding(); wait(lock->minOutstanding.whenAtLeast(std::max(0, permitNumber - lock->boundedPermits))); return permitNumber; } }; ACTOR template Future yieldPromiseStream(FutureStream input, PromiseStream output, TaskPriority taskID = TaskPriority::DefaultYield) { loop { T f = waitNext(input); output.send(f); wait(yield(taskID)); } } struct YieldedFutureActor final : SAV, ActorCallback, FastAllocated { Error 
in_error_state; typedef ActorCallback CB1; using FastAllocated::operator new; using FastAllocated::operator delete; YieldedFutureActor(Future&& f) : SAV(1, 1), in_error_state(Error::fromCode(UNSET_ERROR_CODE)) { f.addYieldedCallbackAndClear(static_cast*>(this)); } void cancel() override { if (!SAV::canBeSet()) return; // Cancel could be invoked *by* a callback within finish(). Otherwise it's guaranteed that we are // waiting either on the original future or on a delay(). ActorCallback::remove(); SAV::sendErrorAndDelPromiseRef(actor_cancelled()); } void destroy() override { delete this; } #ifdef ENABLE_SAMPLING LineageReference* lineageAddr() { return currentLineage; } #endif void a_callback_fire(ActorCallback*, Void) { if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) { in_error_state = Error::fromCode(SET_ERROR_CODE); if (check_yield()) doYield(); else finish(); } else { // We hit this case when and only when the delay() created by a previous doYield() fires. Then we want to // get at least one task done, regardless of what check_yield() would say. finish(); } } void a_callback_error(ActorCallback*, Error const& err) { ASSERT(int16_t(in_error_state.code()) == UNSET_ERROR_CODE); in_error_state = err; if (check_yield()) doYield(); else finish(); } void finish() { ActorCallback::remove(); if (int16_t(in_error_state.code()) == SET_ERROR_CODE) SAV::sendAndDelPromiseRef(Void()); else SAV::sendErrorAndDelPromiseRef(in_error_state); } void doYield() { // Since we are being fired, we are the first callback in the ring, and `prev` is the source future Callback* source = CB1::prev; ASSERT(source->next == static_cast(this)); // Remove the source future from the ring. 
All the remaining callbacks in the ring should be yielded, since // yielded callbacks are installed at the end CB1::prev = source->prev; CB1::prev->next = static_cast(this); // The source future's ring is now empty, since we have removed all the callbacks source->next = source->prev = source; source->unwait(); // Link all the callbacks, including this one, into the ring of a delay future so that after a short time they // will be fired again delay(0, g_network->getCurrentTask()).addCallbackChainAndClear(static_cast(this)); } }; inline Future yieldedFuture(Future f) { if (f.isReady()) return yield(); else return Future(new YieldedFutureActor(std::move(f))); } // An AsyncMap that uses a yieldedFuture in its onChange method. template class YieldedAsyncMap : public AsyncMap { public: Future onChange(K const& k) override { // throws broken_promise if this is destroyed auto& item = AsyncMap::items[k]; if (item.value == AsyncMap::defaultValue) return destroyOnCancelYield(this, k, item.change.getFuture()); return yieldedFuture(item.change.getFuture()); } ACTOR static Future destroyOnCancelYield(YieldedAsyncMap* self, K key, Future change) { try { wait(yieldedFuture(change)); return Void(); } catch (Error& e) { if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 && change.getPromiseReferenceCount() == 1) { if (EXPENSIVE_VALIDATION) { auto& p = self->items[key]; ASSERT(p.change.getFuture() == change); } self->items.erase(key); } throw; } } }; ACTOR template Future delayActionJittered(Future what, double time) { wait(delayJittered(time)); T t = wait(what); return t; } class AndFuture { public: AndFuture() {} AndFuture(AndFuture const& f) { futures = f.futures; } AndFuture(AndFuture&& f) noexcept { futures = std::move(f.futures); } AndFuture(Future const& f) { futures.push_back(f); } AndFuture(Error const& e) { futures.push_back(e); } operator Future() { return getFuture(); } void operator=(AndFuture const& f) { futures = 
f.futures; } void operator=(AndFuture&& f) noexcept { futures = std::move(f.futures); } void operator=(Future const& f) { futures.push_back(f); } void operator=(Error const& e) { futures.push_back(e); } Future getFuture() { if (futures.empty()) return Void(); if (futures.size() == 1) return futures[0]; Future f = waitForAll(futures); futures = std::vector>{ f }; return f; } bool isReady() { for (int i = futures.size() - 1; i >= 0; --i) { if (!futures[i].isReady()) { return false; } else if (!futures[i].isError()) { swapAndPop(&futures, i); } } return true; } bool isError() { for (int i = 0; i < futures.size(); i++) if (futures[i].isError()) return true; return false; } void cleanup() { for (int i = 0; i < futures.size(); i++) { if (futures[i].isReady() && !futures[i].isError()) { swapAndPop(&futures, i--); } } } void add(Future const& f) { if (!f.isReady() || f.isError()) futures.push_back(f); } void add(AndFuture f) { add(f.getFuture()); } private: std::vector> futures; }; // Performs an unordered merge of a and b. ACTOR template Future unorderedMergeStreams(FutureStream a, FutureStream b, PromiseStream output) { state Future aFuture = waitAndForward(a); state Future bFuture = waitAndForward(b); state bool aOpen = true; state bool bOpen = true; loop { try { choose { when(T val = wait(aFuture)) { output.send(val); aFuture = waitAndForward(a); } when(T val = wait(bFuture)) { output.send(val); bFuture = waitAndForward(b); } } } catch (Error& e) { if (e.code() != error_code_end_of_stream) { output.sendError(e); break; } ASSERT(!aFuture.isError() || !bFuture.isError() || aFuture.getError().code() == bFuture.getError().code()); if (aFuture.isError()) { aFuture = Never(); aOpen = false; } if (bFuture.isError()) { bFuture = Never(); bOpen = false; } if (!aOpen && !bOpen) { output.sendError(e); break; } } } return Void(); } // Returns the ordered merge of a and b, assuming that a and b are both already ordered (prefer a over b if keys are // equal). 
T must be a class that implements compare() ACTOR template Future orderedMergeStreams(FutureStream a, FutureStream b, PromiseStream output) { state Optional savedKVa; state bool aOpen; state Optional savedKVb; state bool bOpen; aOpen = bOpen = true; loop { if (aOpen && !savedKVa.present()) { try { T KVa = waitNext(a); savedKVa = Optional(KVa); } catch (Error& e) { if (e.code() == error_code_end_of_stream) { aOpen = false; if (!bOpen) { output.sendError(e); } } else { output.sendError(e); break; } } } if (bOpen && !savedKVb.present()) { try { T KVb = waitNext(b); savedKVb = Optional(KVb); } catch (Error& e) { if (e.code() == error_code_end_of_stream) { bOpen = false; if (!aOpen) { output.sendError(e); } } else { output.sendError(e); break; } } } if (!aOpen) { output.send(savedKVb.get()); savedKVb = Optional(); } else if (!bOpen) { output.send(savedKVa.get()); savedKVa = Optional(); } else { int cmp = savedKVa.get().compare(savedKVb.get()); if (cmp == 0) { // prefer a output.send(savedKVa.get()); savedKVa = Optional(); savedKVb = Optional(); } else if (cmp < 0) { output.send(savedKVa.get()); savedKVa = Optional(); } else { output.send(savedKVb.get()); savedKVb = Optional(); } } } return Void(); } ACTOR template Future timeReply(Future replyToTime, PromiseStream timeOutput) { state double startTime = now(); try { T _ = wait(replyToTime); wait(delay(0)); timeOutput.send(now() - startTime); } catch (Error& e) { // Ignore broken promises. They typically occur during shutdown and our callers don't want to have to create // brokenPromiseToNever actors to ignore them. For what it's worth we are breaking timeOutput to pass the pain // along. 
if (e.code() != error_code_broken_promise) throw; } return Void(); } ACTOR template Future forward(Future from, Promise to) { try { T res = wait(from); to.send(res); return res; } catch (Error& e) { if (e.code() != error_code_actor_cancelled) { to.sendError(e); } throw e; } } // Monad ACTOR template Future()(std::declval()))> fmap(Fun fun, Future f) { T val = wait(f); return fun(val); } ACTOR template Future()(std::declval()).getValue())> runAfter(Future lhs, Fun rhs) { T val1 = wait(lhs); decltype(std::declval()(std::declval()).getValue()) res = wait(rhs(val1)); return res; } ACTOR template Future runAfter(Future lhs, Future rhs) { T val1 = wait(lhs); U res = wait(rhs); return res; } template auto operator>>=(Future lhs, Fun&& rhs) -> Future()))> { return runAfter(lhs, std::forward(rhs)); } template Future operator>>(Future const& lhs, Future const& rhs) { return runAfter(lhs, rhs); } /* * IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit * responsible for handling the output does not need to have knowledge of how the output is generated */ template class IAsyncListener : public ReferenceCounted> { public: virtual ~IAsyncListener() = default; virtual Output const& get() const = 0; virtual Future onChange() const = 0; template static Reference create(Reference const> const& input, F const& f); template static Reference create(Reference> const& input, F const& f); static Reference create(Reference> const& output); }; namespace IAsyncListenerImpl { template class AsyncListener final : public IAsyncListener { // Order matters here, output must outlive monitorActor AsyncVar output; Future monitorActor; ACTOR static Future monitor(Reference const> input, AsyncVar* output, F f) { loop { wait(input->onChange()); output->set(f(input->get())); } } public: AsyncListener(Reference const> const& input, F const& f) : output(f(input->get())), monitorActor(monitor(input, &output, f)) {} Output const& get() const override { 
return output.get(); } Future onChange() const override { return output.onChange(); } }; } // namespace IAsyncListenerImpl template template Reference> IAsyncListener::create(Reference const> const& input, F const& f) { return makeReference>(input, f); } template template Reference> IAsyncListener::create(Reference> const& input, F const& f) { return create(Reference const>(input), f); } template Reference> IAsyncListener::create(Reference> const& input) { auto identity = [](const auto& x) { return x; }; return makeReference>(input, identity); } // A weak reference type to wrap a future Reference object. // Once the future is complete, this object holds a pointer to the referenced object but does // not contribute to its reference count. // // WARNING: this class will not be aware when the underlying object is destroyed. It is up to the // user to make sure that an UnsafeWeakFutureReference is discarded at the same time the object is. template class UnsafeWeakFutureReference { public: UnsafeWeakFutureReference() {} UnsafeWeakFutureReference(Future> future) : data(new UnsafeWeakFutureReferenceData(future)) {} // Returns a future to obtain a normal reference handle // If the future is ready, this creates a Reference to wrap the object Future> get() { if (!data) { return Reference(); } else if (data->ptr.present()) { return Reference::addRef(data->ptr.get()); } else { return data->future; } } // Returns the raw pointer, if the object is ready // Note: this should be used with care, as this pointer is not counted as a reference to the object and // it could be deleted if all normal references are destroyed. 
Optional getPtrIfReady() { return data->ptr; } private: // A class to hold the state for an UnsafeWeakFutureReference struct UnsafeWeakFutureReferenceData : public ReferenceCounted, NonCopyable { Optional ptr; Future> future; Future moveResultFuture; UnsafeWeakFutureReferenceData(Future> future) : future(future) { moveResultFuture = moveResult(this); } // Waits for the future to complete and then stores the pointer in local storage // When this completes, we will no longer be counted toward the reference count of the object ACTOR Future moveResult(UnsafeWeakFutureReferenceData* self) { Reference result = wait(self->future); self->ptr = result.getPtr(); self->future = Future>(); return Void(); } }; Reference data; }; // Call a lambda every seconds ACTOR template Future repeatEvery(double interval, Fn fn) { loop { wait(delay(interval)); fn(); } } #include "flow/unactorcompiler.h" #endif