diff --git a/fdbclient/GenericManagementAPI.actor.h b/fdbclient/GenericManagementAPI.actor.h index 63611ee2cf..36309ce73d 100644 --- a/fdbclient/GenericManagementAPI.actor.h +++ b/fdbclient/GenericManagementAPI.actor.h @@ -125,6 +125,21 @@ bool isCompleteConfiguration(std::map<std::string, std::string> const& options); ConfigureAutoResult parseConfig(StatusObject const& status); +template <typename Transaction, class T> +struct transaction_future_type { + using type = typename Transaction::template FutureT<T>; +}; + +template <typename Transaction, class T> +struct transaction_future_type<Transaction*, T> { + using type = typename transaction_future_type<Transaction, T>::type; +}; + +template <typename Transaction, class T> +struct transaction_future_type<Reference<Transaction>, T> { + using type = typename transaction_future_type<Transaction, T>::type; +}; + // Management API written in template code to support both IClientAPI and NativeAPI namespace ManagementAPI { @@ -636,7 +651,8 @@ Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, TenantN tr->setOption(FDBTransactionOptions::RAW_ACCESS); tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); - Optional<Value> val = wait(safeThreadFutureToFuture(tr->get(tenantMapKey))); + state typename transaction_future_type<Transaction, Optional<Value>>::type tenantFuture = tr->get(tenantMapKey); + Optional<Value> val = wait(safeThreadFutureToFuture(tenantFuture)); return val.map<TenantMapEntry>([](Optional<Value> v) { return decodeTenantEntry(v.get()); }); } @@ -688,10 +704,13 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN tr->setOption(FDBTransactionOptions::LOCK_AWARE); state Future<Optional<TenantMapEntry>> tenantEntryFuture = tryGetTenantTransaction(tr, name); - state Future<Optional<Value>> tenantDataPrefixFuture = safeThreadFutureToFuture(tr->get(tenantDataPrefixKey)); - state Future<Optional<Value>> lastIdFuture = safeThreadFutureToFuture(tr->get(tenantLastIdKey)); + state typename transaction_future_type<Transaction, Optional<Value>>::type tenantDataPrefixFuture = + tr->get(tenantDataPrefixKey); + state typename transaction_future_type<Transaction, Optional<Value>>::type lastIdFuture = tr->get(tenantLastIdKey); + state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture = + tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr)); - Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr)))); + Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture)); if (!tenantMode.present() || tenantMode.get() == StringRef(format("%d", TenantMode::DISABLED))) { throw tenants_disabled(); @@ -702,13 +721,15 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN return Optional<TenantMapEntry>(); } - state Optional<Value> lastIdVal = wait(lastIdFuture); - Optional<Value> tenantDataPrefix = wait(tenantDataPrefixFuture); + state Optional<Value> lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture)); + Optional<Value> tenantDataPrefix = wait(safeThreadFutureToFuture(tenantDataPrefixFuture)); state TenantMapEntry newTenant(lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0, tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr); - RangeResult contents = wait(safeThreadFutureToFuture(tr->getRange(prefixRange(newTenant.prefix), 1))); + state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture = + tr->getRange(prefixRange(newTenant.prefix), 1); + RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture)); if (!contents.empty()) { throw tenant_prefix_allocator_conflict(); } @@ -774,7 +795,9 @@ Future<Void> deleteTenantTransaction(Transaction tr, TenantNameRef name) { return Void(); } - RangeResult contents = wait(safeThreadFutureToFuture(tr->getRange(prefixRange(tenantEntry.get().prefix), 1))); + state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture = + tr->getRange(prefixRange(tenantEntry.get().prefix), 1); + RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture)); if (!contents.empty()) { throw tenant_not_empty(); } @@ -832,8 +855,9 @@ Future<std::map<TenantName, TenantMapEntry>> listTenantsTransaction(Transaction tr->setOption(FDBTransactionOptions::RAW_ACCESS); tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); - RangeResult results = wait(safeThreadFutureToFuture( - tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit))); + state typename transaction_future_type<Transaction, RangeResult>::type listFuture = + tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit); + RangeResult results = wait(safeThreadFutureToFuture(listFuture)); std::map<TenantName, TenantMapEntry> tenants; for (auto kv : results) { diff --git a/fdbclient/HighContentionPrefixAllocator.actor.h b/fdbclient/HighContentionPrefixAllocator.actor.h index 814e653e5a..57e185cd78 100644 --- a/fdbclient/HighContentionPrefixAllocator.actor.h +++ b/fdbclient/HighContentionPrefixAllocator.actor.h @@ -65,8 +65,9 @@ private: state int64_t window = 0; loop { - RangeResult range = - wait(safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True))); + state typename TransactionT::template FutureT<RangeResult> rangeFuture = + tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True); + RangeResult range = wait(safeThreadFutureToFuture(rangeFuture)); if (range.size() > 0) { start = self->counters.unpack(range[0].key).getInt(0); @@ -83,11 +84,12 @@ private: int64_t inc = 1; tr->atomicOp(self->counters.get(start).key(), StringRef((uint8_t*)&inc, 8), MutationRef::AddValue); - Future<Optional<Value>> countFuture = - safeThreadFutureToFuture(tr->get(self->counters.get(start).key(), Snapshot::True)); + + state typename TransactionT::template FutureT<Optional<Value>> countFuture = + tr->get(self->counters.get(start).key(), Snapshot::True); // } - Optional<Value> countValue = wait(countFuture); + Optional<Value> countValue = wait(safeThreadFutureToFuture(countFuture)); int64_t count = 0; if (countValue.present()) { @@ -110,15 +112,17 @@ private: state int64_t candidate = deterministicRandom()->randomInt(start, start + window); // if thread safety is needed, this should be locked { - state Future<RangeResult> latestCounterFuture = - safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True)); - state Future<Optional<Value>> candidateValueFuture = - safeThreadFutureToFuture(tr->get(self->recent.get(candidate).key())); + state typename TransactionT::template FutureT<RangeResult> latestCounterFuture = + tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True); + state typename TransactionT::template FutureT<Optional<Value>> candidateValueFuture = + tr->get(self->recent.get(candidate).key()); tr->setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); tr->set(self->recent.get(candidate).key(), ValueRef()); // } - wait(success(latestCounterFuture) && success(candidateValueFuture)); + wait(success(safeThreadFutureToFuture(latestCounterFuture)) && + success(safeThreadFutureToFuture(candidateValueFuture))); + int64_t currentWindowStart = 0; if (latestCounterFuture.get().size() > 0) { currentWindowStart = self->counters.unpack(latestCounterFuture.get()[0].key).getInt(0); diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index bb6999b7c3..4752a05443 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -459,6 +459,10 @@ public: std::vector<Reference<Watch>> watches; Span span; + // used in template functions as returned Future type + template <typename Type> + using FutureT = Future<Type>; + private: Future<Version> getReadVersion(uint32_t flags); diff --git a/flow/ThreadHelper.actor.h b/flow/ThreadHelper.actor.h index 6627b9e25e..9430b98280 100644 --- a/flow/ThreadHelper.actor.h +++ b/flow/ThreadHelper.actor.h @@ -614,7 +614,7 @@ private: // If instead, this actor is cancelled, we will also cancel the underlying "threadFuture" // Note: we are required to have unique ownership of the "threadFuture" ACTOR template <class T> -Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) { +Future<T> safeThreadFutureToFutureImpl(ThreadFuture<T> threadFuture) { Promise<Void> ready; Future<Void> onReady = ready.getFuture(); UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, ready.extractRawPointer()); @@ -635,10 +635,45 @@ Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) { return threadFuture.get(); } -// do nothing, just for template functions' calls +// The allow anonymous_future type is used to prevent misuse of ThreadFutures. +// For Standalone types, the memory in some cases is actually stored in the ThreadFuture object, +// in which case we expect the caller to keep that ThreadFuture around until the result is no +// longer needed. +// +// We can provide some compile-time detection of this misuse by disallowing anonymous thread futures +// being passed in for certain types. +template <typename T> +struct allow_anonymous_future : std::true_type {}; + +template <typename T> +struct allow_anonymous_future<Standalone<T>> : std::false_type {}; + +template <typename T> +struct allow_anonymous_future<Optional<Standalone<T>>> : std::false_type {}; + template <class T> -Future<T> safeThreadFutureToFuture(Future<T> future) { - // do nothing +typename std::enable_if<allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture( + const ThreadFuture<T>& threadFuture) { + return safeThreadFutureToFutureImpl(threadFuture); +} + +template <class T> +typename std::enable_if<!allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture( + ThreadFuture<T>& threadFuture) { + return safeThreadFutureToFutureImpl(threadFuture); +} + +template <class T> +typename std::enable_if<allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture( + const Future<T>& future) { + // Do nothing + return future; +} + +template <class T> +typename std::enable_if<!allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture( + Future<T>& future) { + // Do nothing return future; }