/* * ActorCollection.actor.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "flow/ActorCollection.h" #include "flow/IndexedSet.h" #include "flow/UnitTest.h" #include "flow/actorcompiler.h" // This must be the last #include. ACTOR Future actorCollection( FutureStream> addActor, int* pCount, double *lastChangeTime, double *idleTime, double *allTime, bool returnWhenEmptied ) { state int64_t nextTag = 0; state Map> tag_streamHelper; state PromiseStream complete; state PromiseStream errors; state int count = 0; if (!pCount) pCount = &count; loop choose { when (Future f = waitNext(addActor)) { int64_t t = nextTag++; tag_streamHelper[t] = streamHelper( complete, errors, tag(f, t) ); ++*pCount; if( *pCount == 1 && lastChangeTime && idleTime && allTime) { double currentTime = now(); *idleTime += currentTime - *lastChangeTime; *allTime += currentTime - *lastChangeTime; *lastChangeTime = currentTime; } } when (int64_t t = waitNext(complete.getFuture())) { if (!--*pCount) { if( lastChangeTime && idleTime && allTime) { double currentTime = now(); *allTime += currentTime - *lastChangeTime; *lastChangeTime = currentTime; } if (returnWhenEmptied) return Void(); } tag_streamHelper.erase(t); } when (Error e = waitNext(errors.getFuture())) { throw e; } } } template struct Traceable> { static constexpr bool value = Traceable::value && Traceable::value; static std::string toString(const std::pair& p) { auto tStr = Traceable::toString(p.first); auto uStr = Traceable::toString(p.second); std::string result(tStr.size() + uStr.size() + 3, 'x'); std::copy(tStr.begin(), tStr.end(), result.begin()); auto iter = result.begin() + tStr.size(); *(iter++) = ' '; *(iter++) = '-'; *(iter++) = ' '; std::copy(uStr.begin(), uStr.end(), iter); return result; } };