diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index 154ac9723f..0d7288de52 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -49,6 +49,8 @@ public: virtual void addReadConflictRange(const KeyRangeRef& keys) = 0; virtual ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) = 0; + virtual ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) = 0; virtual void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) = 0; virtual void set(const KeyRef& key, const ValueRef& value) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index 966d0f6504..a3359eb6c8 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -159,6 +159,23 @@ ThreadFuture DLTransaction::getEstimatedRangeSizeBytes(const KeyRangeRe }); } +ThreadFuture>> DLTransaction::getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) { + if (!api->transactionGetRangeSplitPoints) { + return unsupported_operation(); + } + FdbCApi::FDBFuture* f = api->transactionGetRangeSplitPoints(tr, range.begin.begin(), range.begin.size(), + range.end.begin(), range.end.size(), chunkSize); + + return toThreadFuture>>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { + const FdbCApi::FDBKey* splitKeys; + int keysArrayLength; + FdbCApi::fdb_error_t error = api->futureGetKeyArray(f, &splitKeys, &keysArrayLength); + ASSERT(!error); + return Standalone>(VectorRef((KeyRef*)splitKeys, keysArrayLength), Arena()); + }); +} + void DLTransaction::addReadConflictRange(const KeyRangeRef& keys) { throwIfError(api->transactionAddConflictRange(tr, keys.begin.begin(), keys.begin.size(), keys.end.begin(), keys.end.size(), FDBConflictRangeTypes::READ)); } @@ -322,12 +339,15 @@ void DLApi::init() { loadClientFunction(&api->transactionCancel, lib, fdbCPath, "fdb_transaction_cancel"); loadClientFunction(&api->transactionAddConflictRange, lib, fdbCPath, "fdb_transaction_add_conflict_range"); loadClientFunction(&api->transactionGetEstimatedRangeSizeBytes, lib, fdbCPath, "fdb_transaction_get_estimated_range_size_bytes", headerVersion >= 630); + loadClientFunction(&api->transactionGetRangeSplitPoints, lib, fdbCPath, "fdb_transaction_get_range_split_points", + headerVersion >= 630); loadClientFunction(&api->futureGetInt64, lib, fdbCPath, headerVersion >= 620 ? "fdb_future_get_int64" : "fdb_future_get_version"); loadClientFunction(&api->futureGetError, lib, fdbCPath, "fdb_future_get_error"); loadClientFunction(&api->futureGetKey, lib, fdbCPath, "fdb_future_get_key"); loadClientFunction(&api->futureGetValue, lib, fdbCPath, "fdb_future_get_value"); loadClientFunction(&api->futureGetStringArray, lib, fdbCPath, "fdb_future_get_string_array"); + loadClientFunction(&api->futureGetKeyArray, lib, fdbCPath, "fdb_future_get_key_array"); loadClientFunction(&api->futureGetKeyValueArray, lib, fdbCPath, "fdb_future_get_keyvalue_array"); loadClientFunction(&api->futureSetCallback, lib, fdbCPath, "fdb_future_set_callback"); loadClientFunction(&api->futureCancel, lib, fdbCPath, "fdb_future_cancel"); @@ -568,6 +588,14 @@ ThreadFuture MultiVersionTransaction::getEstimatedRangeSizeBytes(const return abortableFuture(f, tr.onChange); } +ThreadFuture>> MultiVersionTransaction::getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) { + auto tr = getTransaction(); + auto f = tr.transaction ? tr.transaction->getRangeSplitPoints(range, chunkSize) + : ThreadFuture>>(Never()); + return abortableFuture(f, tr.onChange); +} + void MultiVersionTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) { auto tr = getTransaction(); if(tr.transaction) { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index c803032cc7..ebf40726e0 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -35,6 +35,10 @@ struct FdbCApi : public ThreadSafeReferenceCounted { typedef struct transaction FDBTransaction; #pragma pack(push, 4) + typedef struct key { + const uint8_t* key; + int keyLength; + } FDBKey; typedef struct keyvalue { const void *key; int keyLength; @@ -84,7 +88,11 @@ struct FdbCApi : public ThreadSafeReferenceCounted { FDBFuture* (*transactionGetEstimatedRangeSizeBytes)(FDBTransaction* tr, uint8_t const* begin_key_name, int begin_key_name_length, uint8_t const* end_key_name, int end_key_name_length); - + + FDBFuture* (*transactionGetRangeSplitPoints)(FDBTransaction* tr, uint8_t const* begin_key_name, + int begin_key_name_length, uint8_t const* end_key_name, + int end_key_name_length, int64_t chunkSize); + FDBFuture* (*transactionCommit)(FDBTransaction *tr); fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion); FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr); @@ -103,6 +111,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { fdb_error_t (*futureGetKey)(FDBFuture *f, uint8_t const **outKey, int *outKeyLength); fdb_error_t (*futureGetValue)(FDBFuture *f, fdb_bool_t *outPresent, uint8_t const **outValue, int *outValueLength); fdb_error_t (*futureGetStringArray)(FDBFuture *f, const char ***outStrings, int *outCount); + fdb_error_t (*futureGetKeyArray)(FDBFuture* f, FDBKey const** outKeys, int* outCount); fdb_error_t (*futureGetKeyValueArray)(FDBFuture *f, FDBKeyValue const ** outKV, int *outCount, fdb_bool_t *outMore); fdb_error_t (*futureSetCallback)(FDBFuture *f, FDBCallback callback, void *callback_parameter); void (*futureCancel)(FDBFuture *f); @@ -133,7 +142,9 @@ public: ThreadFuture>> getAddressesForKey(const KeyRef& key) override; ThreadFuture> getVersionstamp() override; ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; - + ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) override; + void addReadConflictRange(const KeyRangeRef& keys) override; void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override; @@ -237,6 +248,8 @@ public: void addReadConflictRange(const KeyRangeRef& keys) override; ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; + ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) override; void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override; void set(const KeyRef& key, const ValueRef& value) override; diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index c69ca87467..0b092c2d58 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1397,6 +1397,16 @@ Future ReadYourWritesTransaction::getEstimatedRangeSizeBytes(const KeyR return map(waitOrError(tr.getStorageMetrics(keys, -1), resetPromise.getFuture()), [](const StorageMetrics& m) { return m.bytes; }); } +Future>> ReadYourWritesTransaction::getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) { + if (checkUsedDuringCommit()) { + throw used_during_commit(); + } + if (resetPromise.isSet()) return resetPromise.getFuture().getError(); + + return waitOrError(tr.getRangeSplitPoints(range, chunkSize), resetPromise.getFuture()); +} + void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) { if(checkUsedDuringCommit()) { throw used_during_commit(); diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 16edbb7277..493ff06b27 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -86,6 +86,7 @@ public: [[nodiscard]] Future>> getAddressesForKey(const Key& key); Future getEstimatedRangeSizeBytes( const KeyRangeRef& keys ); + Future>> getRangeSplitPoints(const KeyRangeRef& range, int64_t chunkSize); void addReadConflictRange( KeyRangeRef const& keys ); void makeSelfConflicting() { tr.makeSelfConflicting(); } diff --git a/fdbclient/ThreadSafeTransaction.actor.cpp b/fdbclient/ThreadSafeTransaction.actor.cpp index 8b9d75e2e2..2c375ea28b 100644 --- a/fdbclient/ThreadSafeTransaction.actor.cpp +++ b/fdbclient/ThreadSafeTransaction.actor.cpp @@ -164,6 +164,16 @@ ThreadFuture ThreadSafeTransaction::getEstimatedRangeSizeBytes( const K } ); } +ThreadFuture>> ThreadSafeTransaction::getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) { + KeyRange r = range; + + ReadYourWritesTransaction* tr = this->tr; + return onMainThread([tr, r, chunkSize]() -> Future>> { + tr->checkDeferredError(); + return tr->getRangeSplitPoints(r, chunkSize); + }); +} ThreadFuture< Standalone > ThreadSafeTransaction::getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot, bool reverse ) { KeySelector b = begin; diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index 8e364ed3a3..0702daa541 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -72,6 +72,8 @@ public: ThreadFuture>> getAddressesForKey(const KeyRef& key) override; ThreadFuture> getVersionstamp() override; ThreadFuture getEstimatedRangeSizeBytes(const KeyRangeRef& keys) override; + ThreadFuture>> getRangeSplitPoints(const KeyRangeRef& range, + int64_t chunkSize) override; void addReadConflictRange( const KeyRangeRef& keys ) override; void makeSelfConflicting();