mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 10:22:20 +08:00
Merge pull request #3528 from Daniel-B-Smith/declval
s/fake/std::declval/
This commit is contained in:
commit
9a65b1fbd9
@ -1365,12 +1365,12 @@ const char* StartThreadFunc::name = "START_THREAD";
|
|||||||
REGISTER_INSTRUCTION_FUNC(StartThreadFunc);
|
REGISTER_INSTRUCTION_FUNC(StartThreadFunc);
|
||||||
|
|
||||||
ACTOR template <class Function>
|
ACTOR template <class Function>
|
||||||
Future<decltype(fake<Function>()(Reference<ReadTransaction>()).getValue())> read(Reference<Database> db,
|
Future<decltype(std::declval<Function>()(Reference<ReadTransaction>()).getValue())> read(Reference<Database> db,
|
||||||
Function func) {
|
Function func) {
|
||||||
state Reference<ReadTransaction> tr = db->createTransaction();
|
state Reference<ReadTransaction> tr = db->createTransaction();
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
state decltype(fake<Function>()(Reference<ReadTransaction>()).getValue()) result = wait(func(tr));
|
state decltype(std::declval<Function>()(Reference<ReadTransaction>()).getValue()) result = wait(func(tr));
|
||||||
return result;
|
return result;
|
||||||
} catch (Error& e) {
|
} catch (Error& e) {
|
||||||
wait(tr->onError(e));
|
wait(tr->onError(e));
|
||||||
|
@ -27,6 +27,8 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
#include "flow/IDispatched.h"
|
#include "flow/IDispatched.h"
|
||||||
#include "bindings/flow/fdb_flow.h"
|
#include "bindings/flow/fdb_flow.h"
|
||||||
#include "bindings/flow/IDirectory.h"
|
#include "bindings/flow/IDirectory.h"
|
||||||
@ -57,7 +59,7 @@ struct FlowTesterStack {
|
|||||||
void push(Future<Standalone<StringRef>> value) {
|
void push(Future<Standalone<StringRef>> value) {
|
||||||
data.push_back(StackItem(index, value));
|
data.push_back(StackItem(index, value));
|
||||||
}
|
}
|
||||||
|
|
||||||
void push(Standalone<StringRef> value) {
|
void push(Standalone<StringRef> value) {
|
||||||
push(Future<Standalone<StringRef>>(value));
|
push(Future<Standalone<StringRef>>(value));
|
||||||
}
|
}
|
||||||
@ -86,10 +88,10 @@ struct FlowTesterStack {
|
|||||||
items.push_back(data.back());
|
items.push_back(data.back());
|
||||||
data.pop_back();
|
data.pop_back();
|
||||||
count--;
|
count--;
|
||||||
}
|
}
|
||||||
return items;
|
return items;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<std::vector<FDB::Tuple>> waitAndPop(int count);
|
Future<std::vector<FDB::Tuple>> waitAndPop(int count);
|
||||||
Future<FDB::Tuple> waitAndPop();
|
Future<FDB::Tuple> waitAndPop();
|
||||||
|
|
||||||
@ -106,7 +108,7 @@ struct FlowTesterStack {
|
|||||||
|
|
||||||
struct InstructionData : public ReferenceCounted<InstructionData> {
|
struct InstructionData : public ReferenceCounted<InstructionData> {
|
||||||
bool isDatabase;
|
bool isDatabase;
|
||||||
bool isSnapshot;
|
bool isSnapshot;
|
||||||
StringRef instruction;
|
StringRef instruction;
|
||||||
Reference<FDB::Transaction> tr;
|
Reference<FDB::Transaction> tr;
|
||||||
|
|
||||||
@ -153,7 +155,7 @@ struct DirectoryOrSubspace {
|
|||||||
return "DirectorySubspace";
|
return "DirectorySubspace";
|
||||||
}
|
}
|
||||||
else if(directory.present()) {
|
else if(directory.present()) {
|
||||||
return "IDirectory";
|
return "IDirectory";
|
||||||
}
|
}
|
||||||
else if(subspace.present()) {
|
else if(subspace.present()) {
|
||||||
return "Subspace";
|
return "Subspace";
|
||||||
@ -169,10 +171,10 @@ struct DirectoryTesterData {
|
|||||||
int directoryListIndex;
|
int directoryListIndex;
|
||||||
int directoryErrorIndex;
|
int directoryErrorIndex;
|
||||||
|
|
||||||
Reference<FDB::IDirectory> directory() {
|
Reference<FDB::IDirectory> directory() {
|
||||||
ASSERT(directoryListIndex < directoryList.size());
|
ASSERT(directoryListIndex < directoryList.size());
|
||||||
ASSERT(directoryList[directoryListIndex].directory.present());
|
ASSERT(directoryList[directoryListIndex].directory.present());
|
||||||
return directoryList[directoryListIndex].directory.get();
|
return directoryList[directoryListIndex].directory.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
FDB::Subspace* subspace() {
|
FDB::Subspace* subspace() {
|
||||||
@ -220,10 +222,10 @@ struct FlowTesterData : public ReferenceCounted<FlowTesterData> {
|
|||||||
std::string tupleToString(FDB::Tuple const& tuple);
|
std::string tupleToString(FDB::Tuple const& tuple);
|
||||||
|
|
||||||
ACTOR template <class F>
|
ACTOR template <class F>
|
||||||
Future<decltype(fake<F>()().getValue())> executeMutation(Reference<InstructionData> instruction, F func) {
|
Future<decltype(std::declval<F>()().getValue())> executeMutation(Reference<InstructionData> instruction, F func) {
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
state decltype(fake<F>()().getValue()) result = wait(func());
|
state decltype(std::declval<F>()().getValue()) result = wait(func());
|
||||||
if(instruction->isDatabase) {
|
if(instruction->isDatabase) {
|
||||||
wait(instruction->tr->commit());
|
wait(instruction->tr->commit());
|
||||||
}
|
}
|
||||||
|
@ -27,18 +27,21 @@
|
|||||||
#elif !defined(FDBCLIENT_RUNTRANSACTION_ACTOR_H)
|
#elif !defined(FDBCLIENT_RUNTRANSACTION_ACTOR_H)
|
||||||
#define FDBCLIENT_RUNTRANSACTION_ACTOR_H
|
#define FDBCLIENT_RUNTRANSACTION_ACTOR_H
|
||||||
|
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
#include "flow/flow.h"
|
#include "flow/flow.h"
|
||||||
#include "fdbclient/ReadYourWrites.h"
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
ACTOR template < class Function >
|
ACTOR template <class Function>
|
||||||
Future<decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
|
Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())> runRYWTransaction(
|
||||||
runRYWTransaction(Database cx, Function func) {
|
Database cx, Function func) {
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||||
loop{
|
loop{
|
||||||
try {
|
try {
|
||||||
// func should be idempodent; otherwise, retry will get undefined result
|
// func should be idempodent; otherwise, retry will get undefined result
|
||||||
state decltype( fake<Function>()( Reference<ReadYourWritesTransaction>() ).getValue()) result = wait(func(tr));
|
state decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue()) result =
|
||||||
|
wait(func(tr));
|
||||||
wait(tr->commit());
|
wait(tr->commit());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@ -48,13 +51,14 @@ runRYWTransaction(Database cx, Function func) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR template < class Function >
|
ACTOR template <class Function>
|
||||||
Future<decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
|
Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
|
||||||
runRYWTransactionFailIfLocked(Database cx, Function func) {
|
runRYWTransactionFailIfLocked(Database cx, Function func) {
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||||
loop{
|
loop{
|
||||||
try {
|
try {
|
||||||
state decltype( fake<Function>()( Reference<ReadYourWritesTransaction>() ).getValue()) result = wait(func(tr));
|
state decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue()) result =
|
||||||
|
wait(func(tr));
|
||||||
wait(tr->commit());
|
wait(tr->commit());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@ -66,11 +70,11 @@ runRYWTransactionFailIfLocked(Database cx, Function func) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR template < class Function >
|
ACTOR template <class Function>
|
||||||
Future<decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
|
Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())> runRYWTransactionNoRetry(
|
||||||
runRYWTransactionNoRetry(Database cx, Function func) {
|
Database cx, Function func) {
|
||||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||||
state decltype(fake<Function>()(Reference<ReadYourWritesTransaction>()).getValue()) result = wait(func(tr));
|
state decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue()) result = wait(func(tr));
|
||||||
wait(tr->commit());
|
wait(tr->commit());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#endif
|
#endif
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
// Until we move to C++20, we'll need something to take the place of operator<=>.
|
// Until we move to C++20, we'll need something to take the place of operator<=>.
|
||||||
// This is as good a place as any, I guess.
|
// This is as good a place as any, I guess.
|
||||||
@ -137,7 +138,9 @@ public:
|
|||||||
|
|
||||||
// The following functions have fixed implementations for now:
|
// The following functions have fixed implementations for now:
|
||||||
template <class C>
|
template <class C>
|
||||||
decltype((fake<const C>()[0])) randomChoice( const C& c ) { return c[randomInt(0,(int)c.size())]; }
|
decltype((std::declval<const C>()[0])) randomChoice(const C& c) {
|
||||||
|
return c[randomInt(0, (int)c.size())];
|
||||||
|
}
|
||||||
|
|
||||||
template <class C>
|
template <class C>
|
||||||
void randomShuffle( C& container ) {
|
void randomShuffle( C& container ) {
|
||||||
@ -158,13 +161,13 @@ extern FILE* randLog;
|
|||||||
// Sets the seed for the deterministic random number generator on the current thread
|
// Sets the seed for the deterministic random number generator on the current thread
|
||||||
void setThreadLocalDeterministicRandomSeed(uint32_t seed);
|
void setThreadLocalDeterministicRandomSeed(uint32_t seed);
|
||||||
|
|
||||||
// Returns the random number generator that can be seeded. This generator should only
|
// Returns the random number generator that can be seeded. This generator should only
|
||||||
// be used in contexts where the choice to call it is deterministic.
|
// be used in contexts where the choice to call it is deterministic.
|
||||||
//
|
//
|
||||||
// This generator is only deterministic if given a seed using setThreadLocalDeterministicRandomSeed
|
// This generator is only deterministic if given a seed using setThreadLocalDeterministicRandomSeed
|
||||||
Reference<IRandom> deterministicRandom();
|
Reference<IRandom> deterministicRandom();
|
||||||
|
|
||||||
// A random number generator that cannot be manually seeded and may be called in
|
// A random number generator that cannot be manually seeded and may be called in
|
||||||
// non-deterministic contexts.
|
// non-deterministic contexts.
|
||||||
Reference<IRandom> nondeterministicRandom();
|
Reference<IRandom> nondeterministicRandom();
|
||||||
|
|
||||||
|
@ -135,9 +135,6 @@ do { \
|
|||||||
#include <functional>
|
#include <functional>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// fake<T>() is for use in decltype expressions only - there is no implementation
|
|
||||||
template <class T> T fake();
|
|
||||||
|
|
||||||
// g++ requires that non-dependent names have to be looked up at
|
// g++ requires that non-dependent names have to be looked up at
|
||||||
// template definition, which makes circular dependencies a royal
|
// template definition, which makes circular dependencies a royal
|
||||||
// pain. (For whatever it's worth, g++ appears to be adhering to spec
|
// pain. (For whatever it's worth, g++ appears to be adhering to spec
|
||||||
|
@ -28,6 +28,8 @@
|
|||||||
#elif !defined(FLOW_THREADHELPER_ACTOR_H)
|
#elif !defined(FLOW_THREADHELPER_ACTOR_H)
|
||||||
#define FLOW_THREADHELPER_ACTOR_H
|
#define FLOW_THREADHELPER_ACTOR_H
|
||||||
|
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
#include "flow/flow.h"
|
#include "flow/flow.h"
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
@ -163,12 +165,12 @@ public:
|
|||||||
Error error;
|
Error error;
|
||||||
ThreadCallback *callback;
|
ThreadCallback *callback;
|
||||||
|
|
||||||
bool isReady() {
|
bool isReady() {
|
||||||
ThreadSpinLockHolder holder(mutex);
|
ThreadSpinLockHolder holder(mutex);
|
||||||
return isReadyUnsafe();
|
return isReadyUnsafe();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isError() {
|
bool isError() {
|
||||||
ThreadSpinLockHolder holder(mutex);
|
ThreadSpinLockHolder holder(mutex);
|
||||||
return isErrorUnsafe();
|
return isErrorUnsafe();
|
||||||
}
|
}
|
||||||
@ -180,7 +182,7 @@ public:
|
|||||||
return error.code();
|
return error.code();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool canBeSet() {
|
bool canBeSet() {
|
||||||
ThreadSpinLockHolder holder(mutex);
|
ThreadSpinLockHolder holder(mutex);
|
||||||
return canBeSetUnsafe();
|
return canBeSetUnsafe();
|
||||||
}
|
}
|
||||||
@ -203,8 +205,8 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
ThreadSingleAssignmentVarBase() : status(Unset), callback(NULL), valueReferenceCount(0) {} //, referenceCount(1) {}
|
ThreadSingleAssignmentVarBase() : status(Unset), callback(NULL), valueReferenceCount(0) {} //, referenceCount(1) {}
|
||||||
~ThreadSingleAssignmentVarBase() {
|
~ThreadSingleAssignmentVarBase() {
|
||||||
this->mutex.assertNotEntered();
|
this->mutex.assertNotEntered();
|
||||||
|
|
||||||
if(callback)
|
if(callback)
|
||||||
callback->destroy();
|
callback->destroy();
|
||||||
@ -229,7 +231,7 @@ public:
|
|||||||
ASSERT(false); // Promise fulfilled twice
|
ASSERT(false); // Promise fulfilled twice
|
||||||
}
|
}
|
||||||
error = err;
|
error = err;
|
||||||
status = ErrorSet;
|
status = ErrorSet;
|
||||||
if (!callback) {
|
if (!callback) {
|
||||||
this->mutex.leave();
|
this->mutex.leave();
|
||||||
return;
|
return;
|
||||||
@ -461,11 +463,7 @@ public:
|
|||||||
ThreadFuture(ThreadFuture<T>&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) {
|
ThreadFuture(ThreadFuture<T>&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) {
|
||||||
rhs.sav = 0;
|
rhs.sav = 0;
|
||||||
}
|
}
|
||||||
ThreadFuture( const T& presentValue )
|
ThreadFuture(const T& presentValue) : sav(new ThreadSingleAssignmentVar<T>()) { sav->send(presentValue); }
|
||||||
: sav(new ThreadSingleAssignmentVar<T>())
|
|
||||||
{
|
|
||||||
sav->send(presentValue);
|
|
||||||
}
|
|
||||||
ThreadFuture( Never )
|
ThreadFuture( Never )
|
||||||
: sav(new ThreadSingleAssignmentVar<T>())
|
: sav(new ThreadSingleAssignmentVar<T>())
|
||||||
{
|
{
|
||||||
@ -575,14 +573,16 @@ ACTOR template <class F> void doOnMainThreadVoid( Future<Void> signal, F f, Erro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class F> ThreadFuture< decltype(fake<F>()().getValue()) > onMainThread( F f ) {
|
template <class F>
|
||||||
|
ThreadFuture<decltype(std::declval<F>()().getValue())> onMainThread(F f) {
|
||||||
Promise<Void> signal;
|
Promise<Void> signal;
|
||||||
auto returnValue = new ThreadSingleAssignmentVar< decltype(fake<F>()().getValue()) >();
|
auto returnValue = new ThreadSingleAssignmentVar<decltype(std::declval<F>()().getValue())>();
|
||||||
returnValue->addref(); // For the ThreadFuture we return
|
returnValue->addref(); // For the ThreadFuture we return
|
||||||
Future<Void> cancelFuture = doOnMainThread<decltype(fake<F>()().getValue()), F>( signal.getFuture(), f, returnValue );
|
Future<Void> cancelFuture =
|
||||||
|
doOnMainThread<decltype(std::declval<F>()().getValue()), F>(signal.getFuture(), f, returnValue);
|
||||||
returnValue->setCancel( std::move(cancelFuture) );
|
returnValue->setCancel( std::move(cancelFuture) );
|
||||||
g_network->onMainThread( std::move(signal), TaskPriority::DefaultOnMainThread );
|
g_network->onMainThread( std::move(signal), TaskPriority::DefaultOnMainThread );
|
||||||
return ThreadFuture<decltype(fake<F>()().getValue())>( returnValue );
|
return ThreadFuture<decltype(std::declval<F>()().getValue())>(returnValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class V>
|
template <class V>
|
||||||
|
@ -713,7 +713,7 @@ namespace actorcompiler
|
|||||||
}
|
}
|
||||||
|
|
||||||
var iter = getIteratorName(cx);
|
var iter = getIteratorName(cx);
|
||||||
state.Add(new StateVar { SourceLine = stmt.FirstSourceLine, name = iter, type = "decltype(std::begin(fake<" + container.type + ">()))", initializer = null });
|
state.Add(new StateVar { SourceLine = stmt.FirstSourceLine, name = iter, type = "decltype(std::begin(std::declval<" + container.type + ">()))", initializer = null });
|
||||||
var equivalent = new ForStatement {
|
var equivalent = new ForStatement {
|
||||||
initExpression = iter + " = std::begin(" + stmt.rangeExpression + ")",
|
initExpression = iter + " = std::begin(" + stmt.rangeExpression + ")",
|
||||||
condExpression = iter + " != std::end(" + stmt.rangeExpression + ")",
|
condExpression = iter + " != std::end(" + stmt.rangeExpression + ")",
|
||||||
|
12
flow/flow.h
12
flow/flow.h
@ -882,20 +882,20 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
template <class Request>
|
template <class Request>
|
||||||
decltype(fake<Request>().reply) const& getReplyPromise(Request const& r) { return r.reply; }
|
decltype(std::declval<Request>().reply) const& getReplyPromise(Request const& r) {
|
||||||
|
return r.reply;
|
||||||
|
}
|
||||||
|
|
||||||
// Neither of these implementations of REPLY_TYPE() works on both MSVC and g++, so...
|
// Neither of these implementations of REPLY_TYPE() works on both MSVC and g++, so...
|
||||||
#ifdef __GNUG__
|
#ifdef __GNUG__
|
||||||
#define REPLY_TYPE(RequestType) decltype( getReplyPromise( fake<RequestType>() ).getFuture().getValue() )
|
#define REPLY_TYPE(RequestType) decltype(getReplyPromise(std::declval<RequestType>()).getFuture().getValue())
|
||||||
//#define REPLY_TYPE(RequestType) decltype( getReplyFuture( fake<RequestType>() ).getValue() )
|
//#define REPLY_TYPE(RequestType) decltype( getReplyFuture( std::declval<RequestType>() ).getValue() )
|
||||||
#else
|
#else
|
||||||
template <class T>
|
template <class T>
|
||||||
struct ReplyType {
|
struct ReplyType {
|
||||||
// Doing this calculation directly in the return value declaration for PromiseStream<T>::getReply()
|
// Doing this calculation directly in the return value declaration for PromiseStream<T>::getReply()
|
||||||
// breaks IntelliSense in VS2010; this is a workaround.
|
// breaks IntelliSense in VS2010; this is a workaround.
|
||||||
typedef decltype(fake<T>().reply.getFuture().getValue()) Type;
|
typedef decltype(std::declval<T>().reply.getFuture().getValue()) Type;
|
||||||
};
|
};
|
||||||
template <class T> class ReplyPromise;
|
template <class T> class ReplyPromise;
|
||||||
template <class T>
|
template <class T>
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
#define GENERICACTORS_ACTOR_H
|
#define GENERICACTORS_ACTOR_H
|
||||||
|
|
||||||
#include <list>
|
#include <list>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
#include "flow/flow.h"
|
#include "flow/flow.h"
|
||||||
#include "flow/Knobs.h"
|
#include "flow/Knobs.h"
|
||||||
@ -299,9 +300,8 @@ Future<Void> storeOrThrow(T &out, Future<Optional<T>> what, Error e = key_not_fo
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Waits for a future to be ready, and then applies an asynchronous function to it.
|
//Waits for a future to be ready, and then applies an asynchronous function to it.
|
||||||
ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
|
ACTOR template <class T, class F, class U = decltype(std::declval<F>()(std::declval<T>()).getValue())>
|
||||||
Future<U> mapAsync(Future<T> what, F actorFunc)
|
Future<U> mapAsync(Future<T> what, F actorFunc) {
|
||||||
{
|
|
||||||
T val = wait(what);
|
T val = wait(what);
|
||||||
U ret = wait(actorFunc(val));
|
U ret = wait(actorFunc(val));
|
||||||
return ret;
|
return ret;
|
||||||
@ -318,8 +318,8 @@ std::vector<Future<std::invoke_result_t<F, T>>> mapAsync(std::vector<Future<T>>
|
|||||||
}
|
}
|
||||||
|
|
||||||
//maps a stream with an asynchronous function
|
//maps a stream with an asynchronous function
|
||||||
ACTOR template<class T, class F, class U = decltype( fake<F>()(fake<T>()).getValue() )>
|
ACTOR template <class T, class F, class U = decltype(std::declval<F>()(std::declval<T>()).getValue())>
|
||||||
Future<Void> mapAsync( FutureStream<T> input, F actorFunc, PromiseStream<U> output ) {
|
Future<Void> mapAsync(FutureStream<T> input, F actorFunc, PromiseStream<U> output) {
|
||||||
state Deque<Future<U>> futures;
|
state Deque<Future<U>> futures;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@ -861,7 +861,7 @@ Future<T> ioTimeoutError( Future<T> what, double time ) {
|
|||||||
Future<Void> end = lowPriorityDelay( time );
|
Future<Void> end = lowPriorityDelay( time );
|
||||||
choose {
|
choose {
|
||||||
when( T t = wait( what ) ) { return t; }
|
when( T t = wait( what ) ) { return t; }
|
||||||
when( wait( end ) ) {
|
when(wait(end)) {
|
||||||
Error err = io_timeout();
|
Error err = io_timeout();
|
||||||
if(g_network->isSimulated()) {
|
if(g_network->isSimulated()) {
|
||||||
err = err.asInjectedFault();
|
err = err.asInjectedFault();
|
||||||
@ -1364,8 +1364,7 @@ struct NotifiedInt {
|
|||||||
NotifiedInt( int64_t val = 0 ) : val(val) {}
|
NotifiedInt( int64_t val = 0 ) : val(val) {}
|
||||||
|
|
||||||
Future<Void> whenAtLeast( int64_t limit ) {
|
Future<Void> whenAtLeast( int64_t limit ) {
|
||||||
if (val >= limit)
|
if (val >= limit) return Void();
|
||||||
return Void();
|
|
||||||
Promise<Void> p;
|
Promise<Void> p;
|
||||||
waiting.push( std::make_pair(limit,p) );
|
waiting.push( std::make_pair(limit,p) );
|
||||||
return p.getFuture();
|
return p.getFuture();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user