diff --git a/fdbclient/GenericManagementAPI.actor.h b/fdbclient/GenericManagementAPI.actor.h index 63611ee2cf..36309ce73d 100644 --- a/fdbclient/GenericManagementAPI.actor.h +++ b/fdbclient/GenericManagementAPI.actor.h @@ -125,6 +125,21 @@ bool isCompleteConfiguration(std::map const& options); ConfigureAutoResult parseConfig(StatusObject const& status); +template +struct transaction_future_type { + using type = typename Transaction::template FutureT; +}; + +template +struct transaction_future_type { + using type = typename transaction_future_type::type; +}; + +template +struct transaction_future_type, T> { + using type = typename transaction_future_type::type; +}; + // Management API written in template code to support both IClientAPI and NativeAPI namespace ManagementAPI { @@ -636,7 +651,8 @@ Future> tryGetTenantTransaction(Transaction tr, TenantN tr->setOption(FDBTransactionOptions::RAW_ACCESS); tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); - Optional val = wait(safeThreadFutureToFuture(tr->get(tenantMapKey))); + state typename transaction_future_type>::type tenantFuture = tr->get(tenantMapKey); + Optional val = wait(safeThreadFutureToFuture(tenantFuture)); return val.map([](Optional v) { return decodeTenantEntry(v.get()); }); } @@ -688,10 +704,13 @@ Future> createTenantTransaction(Transaction tr, TenantN tr->setOption(FDBTransactionOptions::LOCK_AWARE); state Future> tenantEntryFuture = tryGetTenantTransaction(tr, name); - state Future> tenantDataPrefixFuture = safeThreadFutureToFuture(tr->get(tenantDataPrefixKey)); - state Future> lastIdFuture = safeThreadFutureToFuture(tr->get(tenantLastIdKey)); + state typename transaction_future_type>::type tenantDataPrefixFuture = + tr->get(tenantDataPrefixKey); + state typename transaction_future_type>::type lastIdFuture = tr->get(tenantLastIdKey); + state typename transaction_future_type>::type tenantModeFuture = + tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr)); - Optional tenantMode = wait(safeThreadFutureToFuture(tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr)))); + Optional tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture)); if (!tenantMode.present() || tenantMode.get() == StringRef(format("%d", TenantMode::DISABLED))) { throw tenants_disabled(); @@ -702,13 +721,15 @@ Future> createTenantTransaction(Transaction tr, TenantN return Optional(); } - state Optional lastIdVal = wait(lastIdFuture); - Optional tenantDataPrefix = wait(tenantDataPrefixFuture); + state Optional lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture)); + Optional tenantDataPrefix = wait(safeThreadFutureToFuture(tenantDataPrefixFuture)); state TenantMapEntry newTenant(lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0, tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr); - RangeResult contents = wait(safeThreadFutureToFuture(tr->getRange(prefixRange(newTenant.prefix), 1))); + state typename transaction_future_type::type prefixRangeFuture = + tr->getRange(prefixRange(newTenant.prefix), 1); + RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture)); if (!contents.empty()) { throw tenant_prefix_allocator_conflict(); } @@ -774,7 +795,9 @@ Future deleteTenantTransaction(Transaction tr, TenantNameRef name) { return Void(); } - RangeResult contents = wait(safeThreadFutureToFuture(tr->getRange(prefixRange(tenantEntry.get().prefix), 1))); + state typename transaction_future_type::type prefixRangeFuture = + tr->getRange(prefixRange(tenantEntry.get().prefix), 1); + RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture)); if (!contents.empty()) { throw tenant_not_empty(); } @@ -832,8 +855,9 @@ Future> listTenantsTransaction(Transaction tr->setOption(FDBTransactionOptions::RAW_ACCESS); tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); - RangeResult results = wait(safeThreadFutureToFuture( - tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit))); + state typename transaction_future_type::type listFuture = + tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit); + RangeResult results = wait(safeThreadFutureToFuture(listFuture)); std::map tenants; for (auto kv : results) { diff --git a/fdbclient/HighContentionPrefixAllocator.actor.h b/fdbclient/HighContentionPrefixAllocator.actor.h index 814e653e5a..57e185cd78 100644 --- a/fdbclient/HighContentionPrefixAllocator.actor.h +++ b/fdbclient/HighContentionPrefixAllocator.actor.h @@ -65,8 +65,9 @@ private: state int64_t window = 0; loop { - RangeResult range = - wait(safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True))); + state typename TransactionT::template FutureT rangeFuture = + tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True); + RangeResult range = wait(safeThreadFutureToFuture(rangeFuture)); if (range.size() > 0) { start = self->counters.unpack(range[0].key).getInt(0); @@ -83,11 +84,12 @@ private: int64_t inc = 1; tr->atomicOp(self->counters.get(start).key(), StringRef((uint8_t*)&inc, 8), MutationRef::AddValue); - Future> countFuture = - safeThreadFutureToFuture(tr->get(self->counters.get(start).key(), Snapshot::True)); + + state typename TransactionT::template FutureT> countFuture = + tr->get(self->counters.get(start).key(), Snapshot::True); // } - Optional countValue = wait(countFuture); + Optional countValue = wait(safeThreadFutureToFuture(countFuture)); int64_t count = 0; if (countValue.present()) { @@ -110,15 +112,17 @@ private: state int64_t candidate = deterministicRandom()->randomInt(start, start + window); // if thread safety is needed, this should be locked { - state Future latestCounterFuture = - safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True)); - state Future> candidateValueFuture = - safeThreadFutureToFuture(tr->get(self->recent.get(candidate).key())); + state typename TransactionT::template FutureT latestCounterFuture = + tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True); + state typename TransactionT::template FutureT> candidateValueFuture = + tr->get(self->recent.get(candidate).key()); tr->setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); tr->set(self->recent.get(candidate).key(), ValueRef()); // } - wait(success(latestCounterFuture) && success(candidateValueFuture)); + wait(success(safeThreadFutureToFuture(latestCounterFuture)) && + success(safeThreadFutureToFuture(candidateValueFuture))); + int64_t currentWindowStart = 0; if (latestCounterFuture.get().size() > 0) { currentWindowStart = self->counters.unpack(latestCounterFuture.get()[0].key).getInt(0); diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index bb6999b7c3..4752a05443 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -459,6 +459,10 @@ public: std::vector> watches; Span span; + // used in template functions as returned Future type + template + using FutureT = Future; + private: Future getReadVersion(uint32_t flags); diff --git a/flow/ThreadHelper.actor.h b/flow/ThreadHelper.actor.h index 556fff89e4..639de040b6 100644 --- a/flow/ThreadHelper.actor.h +++ b/flow/ThreadHelper.actor.h @@ -689,7 +689,7 @@ private: // If instead, this actor is cancelled, we will also cancel the underlying "threadFuture" // Note: we are required to have unique ownership of the "threadFuture" ACTOR template -Future safeThreadFutureToFuture(ThreadFuture threadFuture) { +Future safeThreadFutureToFutureImpl(ThreadFuture threadFuture) { Promise ready; Future onReady = ready.getFuture(); UtilCallback* callback = new UtilCallback(threadFuture, ready.extractRawPointer()); @@ -710,10 +710,45 @@ Future safeThreadFutureToFuture(ThreadFuture threadFuture) { return threadFuture.get(); } -// do nothing, just for template functions' calls +// The allow anonymous_future type is used to prevent misuse of ThreadFutures. +// For Standalone types, the memory in some cases is actually stored in the ThreadFuture object, +// in which case we expect the caller to keep that ThreadFuture around until the result is no +// longer needed. +// +// We can provide some compile-time detection of this misuse by disallowing anonymous thread futures +// being passed in for certain types. +template +struct allow_anonymous_future : std::true_type {}; + +template +struct allow_anonymous_future> : std::false_type {}; + +template +struct allow_anonymous_future>> : std::false_type {}; + template -Future safeThreadFutureToFuture(Future future) { - // do nothing +typename std::enable_if::value, Future>::type safeThreadFutureToFuture( + const ThreadFuture& threadFuture) { + return safeThreadFutureToFutureImpl(threadFuture); +} + +template +typename std::enable_if::value, Future>::type safeThreadFutureToFuture( + ThreadFuture& threadFuture) { + return safeThreadFutureToFutureImpl(threadFuture); +} + +template +typename std::enable_if::value, Future>::type safeThreadFutureToFuture( + const Future& future) { + // Do nothing + return future; +} + +template +typename std::enable_if::value, Future>::type safeThreadFutureToFuture( + Future& future) { + // Do nothing return future; }