/*
 * genericactors.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.

// Waits on the futures of `all` one at a time, in index order.
// Returns false as soon as one of them yields false (later futures are not
// waited on); returns true once every future has yielded true, or
// immediately when `all` is empty.
ACTOR Future<bool> allTrue( std::vector<Future<bool>> all ) {
	state int i = 0;
	while (i != all.size()) {
		bool r = wait( all[i] );
		if (!r)
			return false;
		i++;
	}
	return true;
}

// Keeps `output` equal to the logical OR of the current values of `input`.
// Each iteration reads every input, publishes the result through
// output->set(), then sleeps until any one of the inputs reports a change.
// The loop has no exit, so this actor only stops when it is cancelled or
// when one of the awaited futures throws.
ACTOR Future<Void> anyTrue( std::vector<Reference<AsyncVar<bool>>> input, Reference<AsyncVar<bool>> output ) {
	loop {
		bool oneTrue = false;
		std::vector<Future<Void>> changes;
		// Iterate by const reference: copying a Reference only to read
		// through it would bump and drop its reference count for nothing.
		for (const auto& it : input) {
			if ( it->get() )
				oneTrue = true;
			changes.push_back( it->onChange() );
		}
		output->set( oneTrue );
		wait( waitForAny(changes) );
	}
}

// Holds `futures` for as long as this actor is alive and never completes on
// its own.
// NOTE(review): the element type of `futures` is not visible in the text this
// was restored from; Future<Void> is assumed - confirm against the header.
ACTOR Future<Void> cancelOnly( std::vector<Future<Void>> futures ) {
	// We don't do anything with futures except hold them, we never return,
	// but if we are cancelled we (naturally) drop the futures
	wait( Never() );
	return Void();
}

// Counts the items arriving on `input` and, each time a `logDelay` timer
// fires, emits one SevWarn TraceEvent named `context` (tagged with `id`)
// carrying the count as "LateProcessCount" and the interval as
// "LoggingDelay". Nothing is logged for an interval in which no item
// arrived. After the timer fires, the timer is restarted and the counter is
// reset. The loop has no exit.
ACTOR Future<Void> timeoutWarningCollector( FutureStream<Void> input, double logDelay, const char* context, UID id ) {
	state uint64_t counter = 0;
	state Future<Void> end = delay( logDelay );
	loop choose {
		when ( waitNext( input ) ) {
			counter++;
		}
		when ( wait( end ) ) {
			if ( counter )
				TraceEvent(SevWarn, context, id).detail("LateProcessCount", counter).detail("LoggingDelay", logDelay);
			end = delay( logDelay );
			counter = 0;
		}
	}
}

// Resolves to true once `required` of `futures` have yielded true, or to
// false once so many have yielded false that `required` trues can no longer
// be reached (futures.size() - required + 1 falses).
//
// NOTE(review): the text this file was restored from was truncated between
// the loop condition below and the header of shortCircuitAny. The loop body
// and the choose block are reconstructed from the visible declarations
// (true_futures / false_futures / required) using the onEqual() and quorum()
// helpers - verify against the upstream file before merging.
ACTOR Future<bool> quorumEqualsTrue( std::vector<Future<bool>> futures, int required ) {
	state std::vector< Future<Void> > true_futures;
	state std::vector< Future<Void> > false_futures;
	true_futures.reserve(futures.size());
	false_futures.reserve(futures.size());
	for (int i = 0; i < futures.size(); i++) {
		true_futures.push_back( onEqual( futures[i], true ) );
		false_futures.push_back( onEqual( futures[i], false ) );
	}

	choose {
		when( wait( quorum( true_futures, required ) ) ) {
			return true;
		}
		when( wait( quorum( false_futures, futures.size() - required + 1 ) ) ) {
			return false;
		}
	}
}

// Resolves to true as soon as any future in `f` yields true, without waiting
// for the rest; resolves to false only after every future has completed and
// none of them yielded true.
ACTOR Future<bool> shortCircuitAny( std::vector<Future<bool>> f ) {
	// One helper per input; each helper completes only if its input is true.
	std::vector<Future<Void>> sc;
	sc.reserve(f.size());
	for (const Future<bool>& fut : f) {
		sc.push_back(returnIfTrue(fut));
	}

	choose {
		when( wait( waitForAll( f ) ) ) {
			// Handle a possible race condition? If the _last_ term to
			// be evaluated triggers the waitForAll before bubbling
			// out of the returnIfTrue quorum
			for (const auto& fut : f) {
				if ( fut.get() ) {
					return true;
				}
			}
			return false;
		}
		when( wait( waitForAny( sc ) ) ) {
			return true;
		}
	}
}

// Returns `f` unchanged while it is still pending. When `f` is already ready
// the caller gets a future routed through yield() instead: plain yield() for
// a successful `f`, or tagError<Void>(yield(), error) when `f` holds an error
// (presumably delivering that error once the yield completes - see tagError).
Future<Void> orYield( Future<Void> f ) {
	if (f.isReady()) {
		if (f.isError())
			// tagError's result type cannot be deduced from its arguments,
			// so it has to be spelled out.
			return tagError<Void>(yield(), f.getError());
		else
			return yield();
	}
	else
		return f;
}

// Completes when `f` yields true. When `f` yields false this actor waits on
// Never(), i.e. it stays pending until it is cancelled.
ACTOR Future<Void> returnIfTrue( Future<bool> f ) {
	bool b = wait( f );
	if ( b ) {
		return Void();
	}
	wait( Never() );
	// Only reached if the wait on Never() above were to complete.
	throw internal_error();
}

// Waits for `waitTime` in total, split into
// FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT equal delays that are each issued at
// TaskPriority::Low.
ACTOR Future<Void> lowPriorityDelay( double waitTime ) {
	state int loopCount = 0;
	while (loopCount < FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT) {
		wait(delay(waitTime/FLOW_KNOBS->LOW_PRIORITY_DELAY_COUNT, TaskPriority::Low));
		loopCount++;
	}
	return Void();
}