/*
 * genericactors.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source version.
#if defined(NO_INTELLISENSE) && !defined(FLOW_GENERICACTORS_ACTOR_G_H)
#define FLOW_GENERICACTORS_ACTOR_G_H
#include "flow/genericactors.actor.g.h"
#elif !defined(GENERICACTORS_ACTOR_H)
#define GENERICACTORS_ACTOR_H

#include <utility>

#include "flow/flow.h"
#include "flow/Knobs.h"
#include "flow/Util.h"
#include "flow/IndexedSet.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#pragma warning( disable: 4355 ) // 'this' : used in base member initializer list ACTOR template Future traceAfter(Future what, const char* type, const char* key, X value, bool traceErrors = false) { try { T val = wait(what); TraceEvent(type).detail(key, value); return val; } catch( Error &e ) { if(traceErrors) TraceEvent(type).error(e,true).detail(key, value); throw; } } ACTOR template Future traceAfterCall(Future what, const char* type, const char* key, X func, bool traceErrors = false) { try { state T val = wait(what); try { TraceEvent(type).detail(key, func(val)); } catch( Error &e ) { TraceEvent(SevError, "TraceAfterCallError").error(e); } return val; } catch( Error &e ) { if(traceErrors) TraceEvent(type).error(e,true); throw; } } ACTOR template Future> stopAfter( Future what ) { state Optional ret = T(); try { T _ = wait(what); ret = Optional(_); } catch (Error& e) { bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete || e.code() == error_code_actor_cancelled; TraceEvent(ok ? 
SevInfo : SevError, "StopAfterError").error(e); if(!ok) { fprintf(stderr, "Fatal Error: %s\n", e.what()); ret = Optional(); } } g_network->stop(); return ret; } template T sorted(T range) { std::sort(range.begin(), range.end()); return range; } template ErrorOr errorOr( T t ) { return ErrorOr(t); } ACTOR template Future> errorOr( Future f ) { try { T t = wait(f); return ErrorOr(t); } catch (Error& e) { return ErrorOr(e); } } ACTOR template Future throwErrorOr( Future> f ) { ErrorOr t = wait(f); if(t.isError()) throw t.getError(); return t.get(); } ACTOR template Future transformErrors( Future f, Error err ) { try { T t = wait( f ); return t; } catch( Error &e ) { if( e.code() == error_code_actor_cancelled ) throw e; throw err; } } ACTOR template Future transformError( Future f, Error inErr, Error outErr ) { try { T t = wait( f ); return t; } catch( Error &e ) { if( e.code() == inErr.code() ) throw outErr; throw e; } } // Note that the RequestStream version of forwardPromise doesn't exist, because what to do with errors? ACTOR template void forwardEvent( Event* ev, Future input ) { try { T value = wait(input); } catch (Error&) { } ev->set(); } ACTOR template void forwardEvent( Event* ev, T* t, Error* err, FutureStream input ) { try { T value = waitNext(input); *t = std::move(value); ev->set(); } catch (Error& e) { *err = e; ev->set(); } } ACTOR template Future waitForAllReady( std::vector> results ) { state int i = 0; loop { if (i == results.size()) return Void(); try { wait(success(results[i])); } catch (...) 
{ } i++; } } ACTOR template Future timeout( Future what, double time, T timedoutValue, TaskPriority taskID = TaskPriority::DefaultDelay ) { Future end = delay( time, taskID ); choose { when( T t = wait( what ) ) { return t; } when( wait( end ) ) { return timedoutValue; } } } ACTOR template Future> timeout( Future what, double time ) { Future end = delay( time ); choose { when( T t = wait( what ) ) { return t; } when( wait( end ) ) { return Optional(); } } } ACTOR template Future timeoutError( Future what, double time, TaskPriority taskID = TaskPriority::DefaultDelay ) { Future end = delay( time, taskID ); choose { when( T t = wait( what ) ) { return t; } when( wait( end ) ) { throw timed_out(); } } } ACTOR template Future delayed( Future what, double time = 0.0, TaskPriority taskID = TaskPriority::DefaultDelay ) { try { state T t = wait( what ); wait( delay( time, taskID ) ); return t; } catch( Error &e ) { state Error err = e; wait( delay( time, taskID ) ); throw err; } } ACTOR template Future recurring( Func what, double interval, TaskPriority taskID = TaskPriority::DefaultDelay ) { loop choose { when ( wait( delay( interval, taskID ) ) ) { what(); } } } ACTOR template Future trigger( Func what, Future signal ) { wait( signal ); what(); return Void(); } ACTOR template Future triggerOnError( Func what, Future signal ) { try { wait( signal ); } catch(Error &e) { what(); } return Void(); } //Waits for a future to complete and cannot be cancelled //Most situations will use the overload below, which does not require a promise ACTOR template void uncancellable(Future what, Promise result) { try { T val = wait(what); result.send(val); } catch( Error &e ) { result.sendError(e); } } // Waits for a future to complete and cannot be cancelled ACTOR template [[flow_allow_discard]] Future uncancellable(Future what) { Promise resultPromise; Future result = resultPromise.getFuture(); uncancellable(what, resultPromise); T val = wait(result); return val; } //Holds onto an object 
until a future either completes or is cancelled //Used to prevent the object from being reclaimed // // NOTE: the order of the arguments is important. The arguments will be destructed in // reverse order, and we need the object to be destructed last. ACTOR template Future holdWhile(X object, Future what) { T val = wait(what); return val; } ACTOR template Future holdWhileVoid(X object, Future what) { T val = wait(what); return Void(); } // Assign the future value of what to out template Future store(T &out, Future what) { return map(what, [&out](T const &v) { out = v; return Void(); }); } template Future storeOrThrow(T &out, Future> what, Error e = key_not_found()) { return map(what, [&out,e](Optional const &o) { if(!o.present()) throw e; out = o.get(); return Void(); }); } //Waits for a future to be ready, and then applies an asynchronous function to it. ACTOR template()(fake()).getValue() )> Future mapAsync(Future what, F actorFunc) { T val = wait(what); U ret = wait(actorFunc(val)); return ret; } //maps a vector of futures with an asynchronous function template std::vector>> mapAsync(std::vector> const& what, F const& actorFunc) { std::vector> ret; ret.reserve(what.size()); for (const auto& f : what) ret.push_back(mapAsync(f, actorFunc)); return ret; } //maps a stream with an asynchronous function ACTOR template()(fake()).getValue() )> Future mapAsync( FutureStream input, F actorFunc, PromiseStream output ) { state Deque> futures; loop { try { choose { when( T nextInput = waitNext( input ) ) { futures.push_back( actorFunc(nextInput) ); } when( U nextOutput = wait( futures.size() == 0 ? 
Never() : futures.front() ) ) { output.send( nextOutput ); futures.pop_front(); } } } catch ( Error& e ) { if( e.code() == error_code_end_of_stream ) { break; } else { output.sendError( e ); throw e; } } } while(futures.size()) { U nextOutput = wait( futures.front() ); output.send( nextOutput ); futures.pop_front(); } output.sendError(end_of_stream()); return Void(); } //Waits for a future to be ready, and then applies a function to it. ACTOR template Future> map(Future what, F func) { T val = wait(what); return func(val); } //maps a vector of futures template std::vector>> map(std::vector> const& what, F const& func) { std::vector>> ret; ret.reserve(what.size()); for (const auto& f : what) ret.push_back(map(f, func)); return ret; } //maps a stream ACTOR template Future map( FutureStream input, F func, PromiseStream> output ) { loop { try { T nextInput = waitNext( input ); output.send(func(nextInput)); } catch ( Error& e ) { if( e.code() == error_code_end_of_stream ) { break; } else throw; } } output.sendError(end_of_stream()); return Void(); } //Returns if the future returns true, otherwise waits forever. 
ACTOR Future returnIfTrue( Future f ); //Returns if the future, when waited on and then evaluated with the predicate, returns true, otherwise waits forever template Future returnIfTrue( Future what, F pred) { return returnIfTrue( map( what, pred ) ); } //filters a stream ACTOR template Future filter( FutureStream input, F pred, PromiseStream output ) { loop { try { T nextInput = waitNext( input ); if(func(nextInput)) output.send(nextInput); } catch ( Error& e ) { if( e.code() == error_code_end_of_stream ) { break; } else throw; } } output.sendError(end_of_stream()); return Void(); } //filters a stream asynchronously ACTOR template Future asyncFilter( FutureStream input, F actorPred, PromiseStream output ) { state Deque>> futures; state std::pair> p; loop { try { choose { when ( T nextInput = waitNext(input) ) { futures.push_back( std::pair>(nextInput, actorPred(nextInput)) ); } when ( bool pass = wait( futures.size() == 0 ? Never() : futures.front().second ) ) { if(pass) output.send(futures.front().first); futures.pop_front(); } } } catch ( Error& e ) { if( e.code() == error_code_end_of_stream ) { break; } else { throw e; } } } while(futures.size()) { p = futures.front(); bool pass = wait( p.second ); if(pass) output.send(p.first); futures.pop_front(); } output.sendError(end_of_stream()); return Void(); } template struct WorkerCache { // SOMEDAY: Would we do better to use "unreliable" (at most once) transport for the initialize requests and get rid of this? // It doesn't provide true at most once behavior because things are removed from the cache after they have terminated. 
bool exists( UID id ) { return id_interface.count( id ) != 0; } void set( UID id, const Future& onReady ) { ASSERT( !exists( id ) ); id_interface[ id ] = onReady; } Future get( UID id ) { ASSERT( exists( id ) ); return id_interface[ id ]; } Future removeOnReady( UID id, Future const& ready ) { return removeOnReady( this, id, ready ); } private: ACTOR static Future removeOnReady( WorkerCache* self, UID id, Future ready ) { try { wait(ready); self->id_interface.erase(id); return Void(); } catch ( Error &e ) { self->id_interface.erase(id); throw; } } std::map> id_interface; }; template class AsyncMap : NonCopyable { public: // Represents a complete function from keys to values (K -> V) // All values not explicitly inserted map to V() // If this isn't appropriate, use V=Optional AsyncMap() : defaultValue(), destructing(false) {} virtual ~AsyncMap() { destructing = true; items.clear(); } void set( K const& k, V const& v ) { auto& i = items[k]; if (i.value != v) setUnconditional(k,v,i); } void setUnconditional( K const& k, V const& v ) { setUnconditional(k,v,items[k]); } void triggerAll() { std::vector> ps; for(auto it = items.begin(); it != items.end(); ++it){ ps.resize(ps.size()+1); ps.back().swap( it->second.change ); } std::vector> noDestroy = ps; // See explanation of noDestroy in setUnconditional() for(auto p=ps.begin(); p!=ps.end(); ++p) p->send(Void()); } void triggerRange( K const& begin, K const& end ) { std::vector> ps; for(auto it = items.lower_bound(begin); it != items.end() && it->first < end; ++it){ ps.resize(ps.size()+1); ps.back().swap( it->second.change ); } std::vector> noDestroy = ps; // See explanation of noDestroy in setUnconditional() for(auto p=ps.begin(); p!=ps.end(); ++p) p->send(Void()); } void trigger( K const& key ) { if( items.count(key) != 0 ) { auto& i = items[key]; Promise trigger; i.change.swap(trigger); Promise noDestroy = trigger; // See explanation of noDestroy in setUnconditional() if (i.value == defaultValue) items.erase(key); 
trigger.send(Void()); } } void clear( K const& k ) { set(k, V()); } V const& get( K const& k ) { auto it = items.find(k); if (it != items.end()) return it->second.value; else return defaultValue; } int count( K const& k ) { auto it = items.find(k); if (it != items.end()) return 1; return 0; } virtual Future onChange( K const& k ) { // throws broken_promise if this is destroyed auto &item = items[k]; if (item.value == defaultValue) return destroyOnCancel( this, k, item.change.getFuture() ); return item.change.getFuture(); } std::vector getKeys() { std::vector keys; keys.reserve(items.size()); for(auto i = items.begin(); i != items.end(); ++i) keys.push_back( i->first ); return keys; } void resetNoWaiting() { for(auto i = items.begin(); i != items.end(); ++i) ASSERT( i->second.change.getFuture().getFutureReferenceCount() == 1 ); items.clear(); } protected: // Invariant: Every item in the map either has value!=defaultValue xor a destroyOnCancel actor waiting on change.getFuture() struct P { V value; Promise change; P() : value() {} }; std::map items; const V defaultValue; bool destructing; void setUnconditional( K const& k, V const& v, P& i ) { Promise trigger; i.change.swap(trigger); Promise noDestroy = trigger; // The send(Void()) or even V::operator= could cause destroyOnCancel, // which could undo the change to i.value here. Keeping the promise reference count >= 2 // prevents destroyOnCancel from erasing anything from the map. 
if (v == defaultValue) items.erase(k); else i.value = v; trigger.send(Void()); } ACTOR Future destroyOnCancel( AsyncMap* self, K key, Future change ) { try { wait(change); return Void(); } catch (Error& e) { if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount()==1 && change.getPromiseReferenceCount()==1) { if(EXPENSIVE_VALIDATION) { auto& p = self->items[key]; ASSERT(p.change.getFuture() == change); } self->items.erase(key); } throw; } } }; template class ReferencedObject : NonCopyable, public ReferenceCounted> { public: ReferencedObject() : value() {} ReferencedObject(V const& v) : value(v) {} ReferencedObject(V&& v) : value(std::move(v)) {} ReferencedObject(ReferencedObject&& r) : value(std::move(r.value)) {} void operator=(ReferencedObject&& r) { value = std::move(r.value); } V const& get() const { return value; } V& mutate() { return value; } void set(V const& v) { value = v; } void set(V&& v) { value = std::move(v); } static Reference> from(V const& v) { return Reference>(new ReferencedObject(v)); } static Reference> from(V&& v) { return Reference>(new ReferencedObject(std::move(v))); } private: V value; }; template class AsyncVar : NonCopyable, public ReferenceCounted> { public: AsyncVar() : value() {} AsyncVar( V const& v ) : value(v) {} AsyncVar(AsyncVar&& av) : value(std::move(av.value)), nextChange(std::move(av.nextChange)) {} void operator=(AsyncVar&& av) { value = std::move(av.value); nextChange = std::move(av.nextChange); } V const& get() const { return value; } Future onChange() const { return nextChange.getFuture(); } void set( V const& v ) { if (v != value) setUnconditional(v); } void setUnconditional( V const& v ) { Promise t; this->nextChange.swap(t); this->value = v; t.send(Void()); } void trigger() { Promise t; this->nextChange.swap(t); t.send(Void()); } private: V value; Promise nextChange; }; class AsyncTrigger : NonCopyable { public: AsyncTrigger() {} AsyncTrigger(AsyncTrigger&& at) : 
v(std::move(at.v)) {} void operator=(AsyncTrigger&& at) { v = std::move(at.v); } Future onTrigger() { return v.onChange(); } void trigger() { v.trigger(); } private: AsyncVar v; }; class Debouncer : NonCopyable { public: explicit Debouncer( double delay ) { worker = debounceWorker(this, delay); } Debouncer(Debouncer&& at) : input(std::move(at.input)), output(std::move(at.output)) {} void operator=(Debouncer&& at) { input = std::move(at.input); output = std::move(at.output); } Future onTrigger() { return output.onChange(); } void trigger() { input.setUnconditional(Void()); } private: AsyncVar input; AsyncVar output; Future worker; ACTOR Future debounceWorker( Debouncer* self, double bounceTime ) { loop { wait( self->input.onChange() ); loop { choose { when(wait( self->input.onChange() )) {} when(wait( delay(bounceTime) )) { break; } } } self->output.setUnconditional(Void()); } } }; ACTOR template Future asyncDeserialize(Reference>> input, Reference>> output) { loop { if (input->get().size()) { ObjectReader reader(input->get().begin(), IncludeVersion()); T res; reader.deserialize(res); output->set(res); } else output->set( Optional() ); wait( input->onChange() ); } } ACTOR template void forwardVector( Future values, std::vector> out ) { V in = wait( values ); ASSERT (in.size() == out.size()); for(int i=0; i Future delayedAsyncVar(Reference> in, Reference> out, double time) { try { loop { wait( delay( time ) ); out->set( in->get() ); wait( in->onChange() ); } } catch (Error& e) { out->set( in->get() ); throw; } } ACTOR template Future setAfter(Reference> var, double time, T val) { wait( delay( time ) ); var->set( val ); return Void(); } ACTOR template Future resetAfter( Reference> var, double time, T val, int warningLimit = -1, double warningResetDelay = 0, const char* context = NULL ) { state bool isEqual = var->get() == val; state Future resetDelay = isEqual ? 
Never() : delay(time); state int resetCount = 0; state double lastReset = now(); loop { choose { when( wait( resetDelay ) ) { var->set( val ); if(now() - lastReset > warningResetDelay) { resetCount = 0; } resetCount++; if(context && warningLimit >= 0 && resetCount > warningLimit) { TraceEvent(SevWarnAlways, context).detail("ResetCount", resetCount).detail("LastReset", now() - lastReset); } lastReset = now(); isEqual = true; resetDelay = Never(); } when( wait( var->onChange() ) ) {} } if( isEqual && var->get() != val ) { isEqual = false; resetDelay = delay(time); } if( !isEqual && var->get() == val ) { isEqual = true; resetDelay = Never(); } } } ACTOR template Future setWhenDoneOrError( Future condition, Reference> var, T val ) { try { wait( condition ); } catch ( Error& e ) { if (e.code() == error_code_actor_cancelled) throw; } var->set( val ); return Void(); } Future allTrue( const std::vector>& all ); Future anyTrue( std::vector>> const& input, Reference> const& output ); Future cancelOnly( std::vector> const& futures ); Future timeoutWarningCollector( FutureStream const& input, double const& logDelay, const char* const& context, UID const& id ); Future quorumEqualsTrue( std::vector> const& futures, int const& required ); Future lowPriorityDelay( double const& waitTime ); ACTOR template Future ioTimeoutError( Future what, double time ) { Future end = lowPriorityDelay( time ); choose { when( T t = wait( what ) ) { return t; } when( wait( end ) ) { Error err = io_timeout(); if(g_network->isSimulated()) { err = err.asInjectedFault(); } TraceEvent(SevError, "IoTimeoutError").error(err); throw err; } } } ACTOR template Future streamHelper( PromiseStream output, PromiseStream errors, Future input ) { try { T value = wait(input); output.send(value); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) throw; errors.send(e); } return Void(); } template Future makeStream( const std::vector>& futures, PromiseStream& stream, PromiseStream& errors ) { 
std::vector> forwarders; forwarders.reserve(futures.size()); for(int f=0; f class QuorumCallback; template struct Quorum : SAV { int antiQuorum; int count; static inline int sizeFor(int count) { return sizeof(Quorum) + sizeof(QuorumCallback)*count; } virtual void destroy() { int size = sizeFor(this->count); this->~Quorum(); freeFast(size, this); } virtual void cancel() { int cancelled_callbacks = 0; for (int i = 0; i < count; i++) if (callbacks()[i].next) { callbacks()[i].remove(); callbacks()[i].next = 0; ++cancelled_callbacks; } if (canBeSet()) sendError(actor_cancelled()); for (int i = 0; i < cancelled_callbacks; i++) delPromiseRef(); } explicit Quorum(int quorum, int count) : SAV(1, count), antiQuorum(count - quorum + 1), count(count) { if (!quorum) this->send(Void()); } void oneSuccess() { if (getPromiseReferenceCount() == antiQuorum && canBeSet()) this->sendAndDelPromiseRef(Void()); else delPromiseRef(); } void oneError(Error err) { if (canBeSet()) this->sendErrorAndDelPromiseRef(err); else delPromiseRef(); } QuorumCallback* callbacks() { return (QuorumCallback*)(this + 1); } }; template class QuorumCallback : public Callback { public: virtual void fire(const T& value) { Callback::remove(); Callback::next = 0; head->oneSuccess(); } virtual void error(Error error) { Callback::remove(); Callback::next = 0; head->oneError(error); } private: template friend Future quorum(std::vector> const& results, int n); Quorum* head; QuorumCallback() = default; QuorumCallback(Future future, Quorum* head) : head(head) { future.addCallbackAndClear(this); } }; template Future quorum(std::vector> const& results, int n) { ASSERT(n >= 0 && n <= results.size()); int size = Quorum::sizeFor(results.size()); Quorum* q = new (allocateFast(size)) Quorum(n, results.size()); QuorumCallback* nextCallback = q->callbacks(); for (auto& r : results) { if (r.isReady()) { new (nextCallback) QuorumCallback(); nextCallback->next = 0; if (r.isError()) q->oneError(r.getError()); else q->oneSuccess(); 
} else new (nextCallback) QuorumCallback(r, q); ++nextCallback; } return Future(q); } ACTOR template Future smartQuorum( std::vector> results, int required, double extraSeconds, TaskPriority taskID = TaskPriority::DefaultDelay ) { if (results.empty() && required == 0) return Void(); wait(quorum(results, required)); choose { when (wait(quorum(results, (int)results.size()))) {return Void();} when (wait(delay(extraSeconds, taskID))) {return Void(); } } } template Future waitForAll( std::vector> const& results ) { if (results.empty()) return Void(); return quorum( results, (int)results.size() ); } template Future waitForAny( std::vector> const& results ) { if (results.empty()) return Void(); return quorum( results, 1 ); } ACTOR Future shortCircuitAny( std::vector> f ); ACTOR template Future> getAll( std::vector> input ) { if (input.empty()) return std::vector(); wait( quorum( input, input.size() ) ); std::vector output; output.reserve(input.size()); for(int i=0; i Future> appendAll( std::vector>> input ) { wait( quorum( input, input.size() ) ); std::vector output; size_t sz = 0; for (const auto& f : input) { sz += f.get().size(); } output.reserve(sz); for(int i=0; i Future onEqual( Future in, T equalTo ) { T t = wait(in); if ( t == equalTo ) return Void(); wait(Never()); // never return throw internal_error(); // does not happen } ACTOR template Future success( Future of ) { T t = wait( of ); (void)t; return Void(); } ACTOR template Future ready( Future f ) { try { wait(success(f)); } catch (...) 
{ } return Void(); } ACTOR template Future waitAndForward( FutureStream input ) { T output = waitNext( input ); return output; } ACTOR template Future reportErrorsExcept( Future in, const char* context, UID id, std::set const* pExceptErrors ) { try { T t = wait( in ); return t; } catch (Error& e) { if (e.code() != error_code_actor_cancelled && (!pExceptErrors || !pExceptErrors->count(e.code()))) TraceEvent(SevError, context, id).error(e); throw; } } template Future reportErrors( Future const& in, const char* context, UID id = UID() ) { return reportErrorsExcept(in, context, id, NULL); } ACTOR template Future require( Future> in, int errorCode ) { Optional o = wait(in); if (o.present()) { return o.get(); } else { throw Error(errorCode); } } ACTOR template Future waitForFirst( std::vector> items ) { state PromiseStream resultStream; state PromiseStream errorStream; state Future forCancellation = makeStream( items, resultStream, errorStream ); state FutureStream resultFutureStream = resultStream.getFuture(); state FutureStream errorFutureStream = errorStream.getFuture(); choose { when (T val = waitNext( resultFutureStream )) { forCancellation = Future(); return val; } when (Error e = waitNext( errorFutureStream )) { forCancellation = Future(); throw e; } } } ACTOR template Future tag( Future future, T what ) { wait(future); return what; } ACTOR template Future tag( Future future, T what, PromiseStream stream ) { wait( future ); stream.send( what ); return Void(); } ACTOR template Future tagError( Future future, Error e) { wait(future); throw e; } //If the future is ready, yields and returns. Otherwise, returns when future is set. 
template Future orYield( Future f ) { if(f.isReady()) { if(f.isError()) return tagError(yield(), f.getError()); else return tag(yield(), f.get()); } else return f; } Future orYield( Future f ); ACTOR template Future chooseActor( Future lhs, Future rhs ) { choose { when ( T t = wait(lhs) ) { return t; } when ( T t = wait(rhs) ) { return t; } } } // set && set -> set // error && x -> error // all others -> unset inline Future operator &&( Future const& lhs, Future const& rhs ) { if(lhs.isReady()) { if(lhs.isError()) return lhs; else return rhs; } if(rhs.isReady()) { if(rhs.isError()) return rhs; else return lhs; } return waitForAll(std::vector>{ lhs, rhs }); } // error || unset -> error // unset || unset -> unset // all others -> set inline Future operator ||( Future const& lhs, Future const& rhs ) { if(lhs.isReady()) { if(lhs.isError()) return lhs; if(rhs.isReady()) return rhs; return lhs; } return chooseActor( lhs, rhs ); } ACTOR template Future brokenPromiseToNever( Future in ) { try { T t = wait(in); return t; } catch (Error& e) { if (e.code() != error_code_broken_promise) throw; wait(Never()); // never return throw internal_error(); // does not happen } } ACTOR template Future brokenPromiseToMaybeDelivered( Future in ) { try { T t = wait(in); return t; } catch (Error& e) { if (e.code() == error_code_broken_promise) { throw request_maybe_delivered(); } throw; } } ACTOR template void tagAndForward( Promise* pOutputPromise, T value, Future signal ) { state Promise out( std::move(*pOutputPromise) ); wait( signal ); out.send(value); } ACTOR template void tagAndForwardError( Promise* pOutputPromise, Error value, Future signal ) { state Promise out( std::move(*pOutputPromise) ); wait( signal ); out.sendError(value); } ACTOR template Future waitOrError(Future f, Future errorSignal) { choose { when(T val = wait(f)) { return val; } when(wait(errorSignal)) { ASSERT(false); throw internal_error(); } } } struct FlowLock : NonCopyable, public ReferenceCounted { // FlowLock 
implements a nonblocking critical section: there can be only a limited number of clients executing code between // wait(take()) and release(). Not thread safe. take() returns only when the number of holders of the lock is fewer than the // number of permits, and release() makes the caller no longer a holder of the lock. release() only runs waiting take()rs // after the caller wait()s struct Releaser : NonCopyable { FlowLock* lock; int remaining; Releaser() : lock(0), remaining(0) {} Releaser( FlowLock& lock, int64_t amount = 1 ) : lock(&lock), remaining(amount) {} Releaser(Releaser&& r) BOOST_NOEXCEPT : lock(r.lock), remaining(r.remaining) { r.remaining = 0; } void operator=(Releaser&& r) { if (remaining) lock->release(remaining); lock = r.lock; remaining = r.remaining; r.remaining = 0; } void release( int64_t amount = -1 ) { if( amount == -1 || amount > remaining ) amount = remaining; if (remaining) lock->release( amount ); remaining -= amount; } ~Releaser() { if (remaining) lock->release(remaining); } }; FlowLock() : permits(1), active(0) {} explicit FlowLock(int64_t permits) : permits(permits), active(0) {} Future take(TaskPriority taskID = TaskPriority::DefaultYield, int64_t amount = 1) { if (active + amount <= permits || active == 0) { active += amount; return safeYieldActor(this, taskID, amount); } return takeActor(this, taskID, amount); } void release( int64_t amount = 1 ) { ASSERT( (active > 0 || amount == 0) && active - amount >= 0 ); active -= amount; while( !takers.empty() ) { if( active + takers.begin()->second <= permits || active == 0 ) { std::pair< Promise, int64_t > next = std::move( *takers.begin() ); active += next.second; takers.pop_front(); next.first.send(Void()); } else { break; } } } Future releaseWhen( Future const& signal, int amount = 1 ) { return releaseWhenActor( this, signal, amount ); } // returns when any permits are available, having taken as many as possible up to the given amount, and modifies amount to the number of permits taken 
Future takeUpTo(int64_t& amount) { return takeMoreActor(this, &amount); } int64_t available() const { return permits - active; } int64_t activePermits() const { return active; } int waiters() const { return takers.size(); } private: std::list< std::pair< Promise, int64_t > > takers; const int64_t permits; int64_t active; Promise broken_on_destruct; ACTOR static Future takeActor(FlowLock* lock, TaskPriority taskID, int64_t amount) { state std::list, int64_t>>::iterator it = lock->takers.insert(lock->takers.end(), std::make_pair(Promise(), amount)); try { wait( it->first.getFuture() ); } catch (Error& e) { if (e.code() == error_code_actor_cancelled) { lock->takers.erase(it); lock->release(0); } throw; } try { double duration = BUGGIFY_WITH_PROB(.001) ? deterministicRandom()->random01()*FLOW_KNOBS->BUGGIFY_FLOW_LOCK_RELEASE_DELAY : 0.0; choose{ when(wait(delay(duration, taskID))) {} // So release()ing the lock doesn't cause arbitrary code to run on the stack when(wait(lock->broken_on_destruct.getFuture())) {} } return Void(); } catch (...) 
{ TEST(true); // If we get cancelled here, we are holding the lock but the caller doesn't know, so release it lock->release(amount); throw; } } ACTOR static Future takeMoreActor(FlowLock* lock, int64_t* amount) { wait(lock->take()); int64_t extra = std::min( lock->available(), *amount-1 ); lock->active += extra; *amount = 1 + extra; return Void(); } ACTOR static Future safeYieldActor(FlowLock* lock, TaskPriority taskID, int64_t amount) { try { choose{ when(wait(yield(taskID))) {} when(wait(lock->broken_on_destruct.getFuture())) {} } return Void(); } catch (Error& e) { lock->release(amount); throw; } } ACTOR static Future releaseWhenActor( FlowLock* self, Future signal, int64_t amount ) { wait(signal); self->release(amount); return Void(); } }; struct NotifiedInt { NotifiedInt( int64_t val = 0 ) : val(val) {} Future whenAtLeast( int64_t limit ) { if (val >= limit) return Void(); Promise p; waiting.push( std::make_pair(limit,p) ); return p.getFuture(); } int64_t get() const { return val; } void set( int64_t v ) { ASSERT( v >= val ); if (v != val) { val = v; std::vector> toSend; while ( waiting.size() && v >= waiting.top().first ) { Promise p = std::move(waiting.top().second); waiting.pop(); toSend.push_back(p); } for(auto& p : toSend) { p.send(Void()); } } } void operator=( int64_t v ) { set( v ); } NotifiedInt(NotifiedInt&& r) BOOST_NOEXCEPT : waiting(std::move(r.waiting)), val(r.val) {} void operator=(NotifiedInt&& r) BOOST_NOEXCEPT { waiting = std::move(r.waiting); val = r.val; } private: typedef std::pair> Item; struct ItemCompare { bool operator()(const Item& a, const Item& b) { return a.first > b.first; } }; std::priority_queue, ItemCompare> waiting; int64_t val; }; struct BoundedFlowLock : NonCopyable, public ReferenceCounted { // BoundedFlowLock is different from a FlowLock in that it has a bound on how many locks can be taken from the oldest outstanding lock. 
// For instance, with a FlowLock that has two permits, if one permit is taken but never released, the other permit can be reused an unlimited // amount of times, but with a BoundedFlowLock, it can only be reused a fixed number of times. struct Releaser : NonCopyable { BoundedFlowLock* lock; int64_t permitNumber; Releaser() : lock(nullptr), permitNumber(0) {} Releaser( BoundedFlowLock* lock, int64_t permitNumber ) : lock(lock), permitNumber(permitNumber) {} Releaser(Releaser&& r) BOOST_NOEXCEPT : lock(r.lock), permitNumber(r.permitNumber) { r.permitNumber = 0; } void operator=(Releaser&& r) { if (permitNumber) lock->release(permitNumber); lock = r.lock; permitNumber = r.permitNumber; r.permitNumber = 0; } void release() { if (permitNumber) { lock->release(permitNumber); } permitNumber = 0; } ~Releaser() { if (permitNumber) lock->release(permitNumber); } }; BoundedFlowLock() : unrestrictedPermits(1), boundedPermits(0), nextPermitNumber(0), minOutstanding(0) {} explicit BoundedFlowLock(int64_t unrestrictedPermits, int64_t boundedPermits) : unrestrictedPermits(unrestrictedPermits), boundedPermits(boundedPermits), nextPermitNumber(0), minOutstanding(0) {} Future take() { return takeActor(this); } void release( int64_t permitNumber ) { outstanding.erase(permitNumber); updateMinOutstanding(); } private: IndexedSet outstanding; NotifiedInt minOutstanding; int64_t nextPermitNumber; const int64_t unrestrictedPermits; const int64_t boundedPermits; void updateMinOutstanding() { auto it = outstanding.index(unrestrictedPermits-1); if(it == outstanding.end()) { minOutstanding.set(nextPermitNumber); } else { minOutstanding.set(*it); } } ACTOR static Future takeActor(BoundedFlowLock* lock) { state int64_t permitNumber = ++lock->nextPermitNumber; lock->outstanding.insert(permitNumber, 1); lock->updateMinOutstanding(); wait( lock->minOutstanding.whenAtLeast(std::max(0, permitNumber - lock->boundedPermits)) ); return permitNumber; } }; ACTOR template Future yieldPromiseStream( 
FutureStream input, PromiseStream output, TaskPriority taskID = TaskPriority::DefaultYield ) { loop { T f = waitNext( input ); output.send( f ); wait( yield( taskID ) ); } } struct YieldedFutureActor : SAV, ActorCallback, FastAllocated { Error in_error_state; typedef ActorCallback CB1; using FastAllocated::operator new; using FastAllocated::operator delete; YieldedFutureActor(Future && f) : SAV(1, 1), in_error_state(Error::fromCode(UNSET_ERROR_CODE)) { f.addYieldedCallbackAndClear(static_cast< ActorCallback< YieldedFutureActor, 1, Void >* >(this)); } void cancel() { if (!SAV::canBeSet()) return; // Cancel could be invoked *by* a callback within finish(). Otherwise it's guaranteed that we are waiting either on the original future or on a delay(). ActorCallback::remove(); SAV::sendErrorAndDelPromiseRef(actor_cancelled()); } virtual void destroy() { delete this; } void a_callback_fire(ActorCallback*, Void) { if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) { in_error_state = Error::fromCode(SET_ERROR_CODE); if (check_yield()) doYield(); else finish(); } else { // We hit this case when and only when the delay() created by a previous doYield() fires. Then we want to get at least one task done, regardless of what check_yield() would say. finish(); } } void a_callback_error(ActorCallback*, Error const& err) { ASSERT(int16_t(in_error_state.code()) == UNSET_ERROR_CODE); in_error_state = err; if (check_yield()) doYield(); else finish(); } void finish() { ActorCallback::remove(); if (int16_t(in_error_state.code()) == SET_ERROR_CODE) SAV::sendAndDelPromiseRef(Void()); else SAV::sendErrorAndDelPromiseRef(in_error_state); } void doYield() { // Since we are being fired, we are the first callback in the ring, and `prev` is the source future Callback* source = CB1::prev; ASSERT(source->next == static_cast(this)); // Remove the source future from the ring. 
All the remaining callbacks in the ring should be yielded, since yielded callbacks are installed at the end CB1::prev = source->prev; CB1::prev->next = static_cast(this); // The source future's ring is now empty, since we have removed all the callbacks source->next = source->prev = source; source->unwait(); // Link all the callbacks, including this one, into the ring of a delay future so that after a short time they will be fired again delay(0, g_network->getCurrentTask()).addCallbackChainAndClear(static_cast< CB1* >(this)); } }; inline Future yieldedFuture(Future f) { if (f.isReady()) return yield(); else return Future(new YieldedFutureActor(std::move(f))); } //An AsyncMap that uses a yieldedFuture in its onChange method. template class YieldedAsyncMap : public AsyncMap { public: Future onChange(K const& k) { // throws broken_promise if this is destroyed auto &item = AsyncMap::items[k]; if (item.value == AsyncMap::defaultValue) return destroyOnCancelYield(this, k, item.change.getFuture()); return yieldedFuture(item.change.getFuture()); } ACTOR static Future destroyOnCancelYield( YieldedAsyncMap* self, K key, Future change ) { try { wait(yieldedFuture(change)); return Void(); } catch (Error& e) { if (e.code() == error_code_actor_cancelled && !self->destructing && change.getFutureReferenceCount() == 1 && change.getPromiseReferenceCount() == 1) { if(EXPENSIVE_VALIDATION) { auto& p = self->items[key]; ASSERT(p.change.getFuture() == change); } self->items.erase(key); } throw; } } }; ACTOR template Future delayActionJittered( Future what, double time ) { wait( delayJittered( time ) ); T t = wait( what ); return t; } class AndFuture { public: AndFuture() { } AndFuture(AndFuture const& f) { futures = f.futures; } AndFuture(AndFuture&& f) BOOST_NOEXCEPT { futures = std::move(f.futures); } AndFuture(Future const& f) { futures.push_back(f); } AndFuture(Error const& e) { futures.push_back(e); } operator Future() { return getFuture(); } void operator=(AndFuture const& f) { 
futures = f.futures; } void operator=(AndFuture&& f) BOOST_NOEXCEPT { futures = std::move(f.futures); } void operator=(Future const& f) { futures.push_back(f); } void operator=(Error const& e) { futures.push_back(e); } Future getFuture() { if(futures.empty()) return Void(); if(futures.size() == 1) return futures[0]; Future f = waitForAll(futures); futures = std::vector>{ f }; return f; } bool isReady() { for( int i = futures.size() - 1; i >= 0; --i ) { if( !futures[i].isReady() ) { return false; } else if(!futures[i].isError()) { swapAndPop(&futures, i); } } return true; } bool isError() { for( int i = 0; i < futures.size(); i++ ) if( futures[i].isError() ) return true; return false; } void cleanup() { for( int i = 0; i < futures.size(); i++ ) { if( futures[i].isReady() && !futures[i].isError() ) { swapAndPop(&futures, i--); } } } void add(Future const& f) { if(!f.isReady() || f.isError()) futures.push_back(f); } void add(AndFuture f) { add(f.getFuture()); } private: std::vector> futures; }; // Performs an unordered merge of a and b. ACTOR template Future unorderedMergeStreams(FutureStream a, FutureStream b, PromiseStream output) { state Future aFuture = waitAndForward(a); state Future bFuture = waitAndForward(b); state bool aOpen = true; state bool bOpen = true; loop{ try { choose { when(T val = wait(aFuture)) { output.send(val); aFuture = waitAndForward(a); } when(T val = wait(bFuture)) { output.send(val); bFuture = waitAndForward(b); } } } catch (Error &e) { if (e.code() != error_code_end_of_stream) { output.sendError(e); break; } ASSERT(!aFuture.isError() || !bFuture.isError() || aFuture.getError().code() == bFuture.getError().code()); if (aFuture.isError()) { aFuture = Never(); aOpen = false; } if (bFuture.isError()) { bFuture = Never(); bOpen = false; } if (!aOpen && !bOpen) { output.sendError(e); break; } } } return Void(); } // Returns the ordered merge of a and b, assuming that a and b are both already ordered (prefer a over b if keys are equal). 
T must be a class that implements compare() ACTOR template Future orderedMergeStreams( FutureStream a, FutureStream b, PromiseStream output ) { state Optional savedKVa; state bool aOpen; state Optional savedKVb; state bool bOpen; aOpen = bOpen = true; loop { if ( aOpen && !savedKVa.present() ) { try { T KVa = waitNext( a ); savedKVa = Optional( KVa ); } catch ( Error& e ) { if ( e.code() == error_code_end_of_stream ) { aOpen = false; if (!bOpen) { output.sendError(e); } } else { output.sendError(e); break; } } } if ( bOpen && !savedKVb.present() ) { try { T KVb = waitNext( b ); savedKVb = Optional( KVb ); } catch ( Error& e ) { if ( e.code() == error_code_end_of_stream ) { bOpen = false; if (!aOpen) { output.sendError(e); } } else { output.sendError(e); break; } } } if (!aOpen) { output.send( savedKVb.get() ); savedKVb = Optional(); } else if (!bOpen) { output.send( savedKVa.get() ); savedKVa = Optional(); } else { int cmp = savedKVa.get().compare( savedKVb.get() ); if ( cmp == 0 ) { // prefer a output.send( savedKVa.get() ); savedKVa = Optional(); savedKVb = Optional(); } else if ( cmp < 0 ) { output.send( savedKVa.get() ); savedKVa = Optional(); } else { output.send( savedKVb.get() ); savedKVb = Optional(); } } } return Void(); } ACTOR template Future timeReply(Future replyToTime, PromiseStream timeOutput){ state double startTime = now(); try { T _ = wait(replyToTime); wait( delay(0) ); timeOutput.send(now() - startTime); } catch( Error &e ) { // Ignore broken promises. They typically occur during shutdown and our callers don't want to have to create brokenPromiseToNever actors to ignore them. For what it's worth we are breaking timeOutput to pass the pain along. if( e.code() != error_code_broken_promise ) throw; } return Void(); } #include "flow/unactorcompiler.h" #endif