mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-16 19:02:20 +08:00
Merge pull request #6717 from sfc-gh-ajbeamon/thread-future-safety-check
Disallow anonymous standalone thread futures in safeThreadFutureToFuture
This commit is contained in:
commit
5861ff2dc6
@ -125,6 +125,21 @@ bool isCompleteConfiguration(std::map<std::string, std::string> const& options);
|
|||||||
|
|
||||||
ConfigureAutoResult parseConfig(StatusObject const& status);
|
ConfigureAutoResult parseConfig(StatusObject const& status);
|
||||||
|
|
||||||
|
template <typename Transaction, class T>
|
||||||
|
struct transaction_future_type {
|
||||||
|
using type = typename Transaction::template FutureT<T>;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Transaction, class T>
|
||||||
|
struct transaction_future_type<Transaction*, T> {
|
||||||
|
using type = typename transaction_future_type<Transaction, T>::type;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Transaction, class T>
|
||||||
|
struct transaction_future_type<Reference<Transaction>, T> {
|
||||||
|
using type = typename transaction_future_type<Transaction, T>::type;
|
||||||
|
};
|
||||||
|
|
||||||
// Management API written in template code to support both IClientAPI and NativeAPI
|
// Management API written in template code to support both IClientAPI and NativeAPI
|
||||||
namespace ManagementAPI {
|
namespace ManagementAPI {
|
||||||
|
|
||||||
@ -636,7 +651,8 @@ Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, TenantN
|
|||||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||||
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||||
|
|
||||||
Optional<Value> val = wait(safeThreadFutureToFuture(tr->get(tenantMapKey)));
|
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantFuture = tr->get(tenantMapKey);
|
||||||
|
Optional<Value> val = wait(safeThreadFutureToFuture(tenantFuture));
|
||||||
return val.map<TenantMapEntry>([](Optional<Value> v) { return decodeTenantEntry(v.get()); });
|
return val.map<TenantMapEntry>([](Optional<Value> v) { return decodeTenantEntry(v.get()); });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -688,10 +704,13 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN
|
|||||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||||
|
|
||||||
state Future<Optional<TenantMapEntry>> tenantEntryFuture = tryGetTenantTransaction(tr, name);
|
state Future<Optional<TenantMapEntry>> tenantEntryFuture = tryGetTenantTransaction(tr, name);
|
||||||
state Future<Optional<Value>> tenantDataPrefixFuture = safeThreadFutureToFuture(tr->get(tenantDataPrefixKey));
|
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantDataPrefixFuture =
|
||||||
state Future<Optional<Value>> lastIdFuture = safeThreadFutureToFuture(tr->get(tenantLastIdKey));
|
tr->get(tenantDataPrefixKey);
|
||||||
|
state typename transaction_future_type<Transaction, Optional<Value>>::type lastIdFuture = tr->get(tenantLastIdKey);
|
||||||
|
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture =
|
||||||
|
tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr));
|
||||||
|
|
||||||
Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr))));
|
Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture));
|
||||||
|
|
||||||
if (!tenantMode.present() || tenantMode.get() == StringRef(format("%d", TenantMode::DISABLED))) {
|
if (!tenantMode.present() || tenantMode.get() == StringRef(format("%d", TenantMode::DISABLED))) {
|
||||||
throw tenants_disabled();
|
throw tenants_disabled();
|
||||||
@ -702,13 +721,15 @@ Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantN
|
|||||||
return Optional<TenantMapEntry>();
|
return Optional<TenantMapEntry>();
|
||||||
}
|
}
|
||||||
|
|
||||||
state Optional<Value> lastIdVal = wait(lastIdFuture);
|
state Optional<Value> lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture));
|
||||||
Optional<Value> tenantDataPrefix = wait(tenantDataPrefixFuture);
|
Optional<Value> tenantDataPrefix = wait(safeThreadFutureToFuture(tenantDataPrefixFuture));
|
||||||
|
|
||||||
state TenantMapEntry newTenant(lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0,
|
state TenantMapEntry newTenant(lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0,
|
||||||
tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr);
|
tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr);
|
||||||
|
|
||||||
RangeResult contents = wait(safeThreadFutureToFuture(tr->getRange(prefixRange(newTenant.prefix), 1)));
|
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
|
||||||
|
tr->getRange(prefixRange(newTenant.prefix), 1);
|
||||||
|
RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture));
|
||||||
if (!contents.empty()) {
|
if (!contents.empty()) {
|
||||||
throw tenant_prefix_allocator_conflict();
|
throw tenant_prefix_allocator_conflict();
|
||||||
}
|
}
|
||||||
@ -774,7 +795,9 @@ Future<Void> deleteTenantTransaction(Transaction tr, TenantNameRef name) {
|
|||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
RangeResult contents = wait(safeThreadFutureToFuture(tr->getRange(prefixRange(tenantEntry.get().prefix), 1)));
|
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
|
||||||
|
tr->getRange(prefixRange(tenantEntry.get().prefix), 1);
|
||||||
|
RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture));
|
||||||
if (!contents.empty()) {
|
if (!contents.empty()) {
|
||||||
throw tenant_not_empty();
|
throw tenant_not_empty();
|
||||||
}
|
}
|
||||||
@ -832,8 +855,9 @@ Future<std::map<TenantName, TenantMapEntry>> listTenantsTransaction(Transaction
|
|||||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||||
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
|
||||||
|
|
||||||
RangeResult results = wait(safeThreadFutureToFuture(
|
state typename transaction_future_type<Transaction, RangeResult>::type listFuture =
|
||||||
tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit)));
|
tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit);
|
||||||
|
RangeResult results = wait(safeThreadFutureToFuture(listFuture));
|
||||||
|
|
||||||
std::map<TenantName, TenantMapEntry> tenants;
|
std::map<TenantName, TenantMapEntry> tenants;
|
||||||
for (auto kv : results) {
|
for (auto kv : results) {
|
||||||
|
@ -65,8 +65,9 @@ private:
|
|||||||
state int64_t window = 0;
|
state int64_t window = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
RangeResult range =
|
state typename TransactionT::template FutureT<RangeResult> rangeFuture =
|
||||||
wait(safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True)));
|
tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True);
|
||||||
|
RangeResult range = wait(safeThreadFutureToFuture(rangeFuture));
|
||||||
|
|
||||||
if (range.size() > 0) {
|
if (range.size() > 0) {
|
||||||
start = self->counters.unpack(range[0].key).getInt(0);
|
start = self->counters.unpack(range[0].key).getInt(0);
|
||||||
@ -83,11 +84,12 @@ private:
|
|||||||
|
|
||||||
int64_t inc = 1;
|
int64_t inc = 1;
|
||||||
tr->atomicOp(self->counters.get(start).key(), StringRef((uint8_t*)&inc, 8), MutationRef::AddValue);
|
tr->atomicOp(self->counters.get(start).key(), StringRef((uint8_t*)&inc, 8), MutationRef::AddValue);
|
||||||
Future<Optional<Value>> countFuture =
|
|
||||||
safeThreadFutureToFuture(tr->get(self->counters.get(start).key(), Snapshot::True));
|
state typename TransactionT::template FutureT<Optional<Value>> countFuture =
|
||||||
|
tr->get(self->counters.get(start).key(), Snapshot::True);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
Optional<Value> countValue = wait(countFuture);
|
Optional<Value> countValue = wait(safeThreadFutureToFuture(countFuture));
|
||||||
|
|
||||||
int64_t count = 0;
|
int64_t count = 0;
|
||||||
if (countValue.present()) {
|
if (countValue.present()) {
|
||||||
@ -110,15 +112,17 @@ private:
|
|||||||
state int64_t candidate = deterministicRandom()->randomInt(start, start + window);
|
state int64_t candidate = deterministicRandom()->randomInt(start, start + window);
|
||||||
|
|
||||||
// if thread safety is needed, this should be locked {
|
// if thread safety is needed, this should be locked {
|
||||||
state Future<RangeResult> latestCounterFuture =
|
state typename TransactionT::template FutureT<RangeResult> latestCounterFuture =
|
||||||
safeThreadFutureToFuture(tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True));
|
tr->getRange(self->counters.range(), 1, Snapshot::True, Reverse::True);
|
||||||
state Future<Optional<Value>> candidateValueFuture =
|
state typename TransactionT::template FutureT<Optional<Value>> candidateValueFuture =
|
||||||
safeThreadFutureToFuture(tr->get(self->recent.get(candidate).key()));
|
tr->get(self->recent.get(candidate).key());
|
||||||
tr->setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE);
|
tr->setOption(FDBTransactionOptions::NEXT_WRITE_NO_WRITE_CONFLICT_RANGE);
|
||||||
tr->set(self->recent.get(candidate).key(), ValueRef());
|
tr->set(self->recent.get(candidate).key(), ValueRef());
|
||||||
// }
|
// }
|
||||||
|
|
||||||
wait(success(latestCounterFuture) && success(candidateValueFuture));
|
wait(success(safeThreadFutureToFuture(latestCounterFuture)) &&
|
||||||
|
success(safeThreadFutureToFuture(candidateValueFuture)));
|
||||||
|
|
||||||
int64_t currentWindowStart = 0;
|
int64_t currentWindowStart = 0;
|
||||||
if (latestCounterFuture.get().size() > 0) {
|
if (latestCounterFuture.get().size() > 0) {
|
||||||
currentWindowStart = self->counters.unpack(latestCounterFuture.get()[0].key).getInt(0);
|
currentWindowStart = self->counters.unpack(latestCounterFuture.get()[0].key).getInt(0);
|
||||||
|
@ -459,6 +459,10 @@ public:
|
|||||||
std::vector<Reference<Watch>> watches;
|
std::vector<Reference<Watch>> watches;
|
||||||
Span span;
|
Span span;
|
||||||
|
|
||||||
|
// used in template functions as returned Future type
|
||||||
|
template <typename Type>
|
||||||
|
using FutureT = Future<Type>;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Future<Version> getReadVersion(uint32_t flags);
|
Future<Version> getReadVersion(uint32_t flags);
|
||||||
|
|
||||||
|
@ -689,7 +689,7 @@ private:
|
|||||||
// If instead, this actor is cancelled, we will also cancel the underlying "threadFuture"
|
// If instead, this actor is cancelled, we will also cancel the underlying "threadFuture"
|
||||||
// Note: we are required to have unique ownership of the "threadFuture"
|
// Note: we are required to have unique ownership of the "threadFuture"
|
||||||
ACTOR template <class T>
|
ACTOR template <class T>
|
||||||
Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) {
|
Future<T> safeThreadFutureToFutureImpl(ThreadFuture<T> threadFuture) {
|
||||||
Promise<Void> ready;
|
Promise<Void> ready;
|
||||||
Future<Void> onReady = ready.getFuture();
|
Future<Void> onReady = ready.getFuture();
|
||||||
UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, ready.extractRawPointer());
|
UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, ready.extractRawPointer());
|
||||||
@ -710,10 +710,45 @@ Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) {
|
|||||||
return threadFuture.get();
|
return threadFuture.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
// do nothing, just for template functions' calls
|
// The allow anonymous_future type is used to prevent misuse of ThreadFutures.
|
||||||
|
// For Standalone types, the memory in some cases is actually stored in the ThreadFuture object,
|
||||||
|
// in which case we expect the caller to keep that ThreadFuture around until the result is no
|
||||||
|
// longer needed.
|
||||||
|
//
|
||||||
|
// We can provide some compile-time detection of this misuse by disallowing anonymous thread futures
|
||||||
|
// being passed in for certain types.
|
||||||
|
template <typename T>
|
||||||
|
struct allow_anonymous_future : std::true_type {};
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
struct allow_anonymous_future<Standalone<T>> : std::false_type {};
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
struct allow_anonymous_future<Optional<Standalone<T>>> : std::false_type {};
|
||||||
|
|
||||||
template <class T>
|
template <class T>
|
||||||
Future<T> safeThreadFutureToFuture(Future<T> future) {
|
typename std::enable_if<allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture(
|
||||||
// do nothing
|
const ThreadFuture<T>& threadFuture) {
|
||||||
|
return safeThreadFutureToFutureImpl(threadFuture);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
typename std::enable_if<!allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture(
|
||||||
|
ThreadFuture<T>& threadFuture) {
|
||||||
|
return safeThreadFutureToFutureImpl(threadFuture);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
typename std::enable_if<allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture(
|
||||||
|
const Future<T>& future) {
|
||||||
|
// Do nothing
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
typename std::enable_if<!allow_anonymous_future<T>::value, Future<T>>::type safeThreadFutureToFuture(
|
||||||
|
Future<T>& future) {
|
||||||
|
// Do nothing
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user