diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt index a5897a08d4..b30a78bd84 100644 --- a/bindings/c/CMakeLists.txt +++ b/bindings/c/CMakeLists.txt @@ -60,16 +60,20 @@ if(NOT WIN32) if(OPEN_FOR_IDE) add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h) add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h) + add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h) add_library(mako OBJECT ${MAKO_SRCS}) else() add_executable(fdb_c_performance_test test/performance_test.c test/test.h) add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h) + add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h) add_executable(mako ${MAKO_SRCS}) strip_debug_symbols(fdb_c_performance_test) strip_debug_symbols(fdb_c_ryw_benchmark) + strip_debug_symbols(fdb_c_txn_size_test) endif() target_link_libraries(fdb_c_performance_test PRIVATE fdb_c) target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c) + target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c) # do not set RPATH for mako set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE) target_link_libraries(mako PRIVATE fdb_c) diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp index e4f44e8ae4..467b2b0c11 100644 --- a/bindings/c/fdb_c.cpp +++ b/bindings/c/fdb_c.cpp @@ -585,8 +585,8 @@ fdb_error_t fdb_transaction_get_committed_version( FDBTransaction* tr, } extern "C" DLLEXPORT -fdb_error_t fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size) { - CATCH_AND_RETURN(*out_size = TXN(tr)->getApproximateSize();); +FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) { + return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr(); } extern "C" DLLEXPORT diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h index b302d1a6ff..d3e904ecd9 100644 --- a/bindings/c/foundationdb/fdb_c.h +++ b/bindings/c/foundationdb/fdb_c.h @@ -225,8 +225,8 @@ extern "C" { fdb_transaction_get_committed_version( FDBTransaction* tr, int64_t* out_version ); - DLLEXPORT WARN_UNUSED_RESULT fdb_error_t - fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size); + DLLEXPORT WARN_UNUSED_RESULT FDBFuture* + fdb_transaction_get_approximate_size(FDBTransaction* tr); DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr ); diff --git a/bindings/python/fdb/impl.py b/bindings/python/fdb/impl.py index 6bfc8b8cce..e3f68e98c9 100644 --- a/bindings/python/fdb/impl.py +++ b/bindings/python/fdb/impl.py @@ -542,9 +542,8 @@ class Transaction(TransactionRead): return version.value def get_approximate_size(self): - size = ctypes.c_int() - self.capi.fdb_transaction_get_approximate_size(self.tpointer, ctypes.byref(size)) - return size.value + """Get the approximate commit size of the transaction.""" + return FutureVersion(self.capi.fdb_transaction_get_approximate_size(self.tpointer)) def get_versionstamp(self): return Key(self.capi.fdb_transaction_get_versionstamp(self.tpointer)) @@ -1458,9 +1457,8 @@ def init_c_api(): _capi.fdb_transaction_get_committed_version.restype = ctypes.c_int _capi.fdb_transaction_get_committed_version.errcheck = check_error_code - _capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int)] - _capi.fdb_transaction_get_approximate_size.restype = ctypes.c_int - _capi.fdb_transaction_get_approximate_size.errcheck = check_error_code + _capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p] + _capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p _capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p] _capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index db794fb6dd..b3e6217054 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -61,7 +61,7 @@ public: virtual ThreadFuture commit() = 0; virtual Version getCommittedVersion() = 0; - virtual uint32_t getApproximateSize() = 0; + virtual ThreadFuture getApproximateSize() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value=Optional()) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index ee633d8747..ff26f73e14 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -195,10 +195,18 @@ Version DLTransaction::getCommittedVersion() { return version; } -uint32_t DLTransaction::getApproximateSize() { - int32_t size; - throwIfError(api->transactionGetApproximateSize(tr, &size)); - return size; +ThreadFuture DLTransaction::getApproximateSize() { + if(!api->transactionGetApproximateSize) { + return unsupported_operation(); + } + + FdbCApi::FDBFuture *f = api->transactionGetApproximateSize(tr); + return toThreadFuture(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) { + int64_t size; + FdbCApi::fdb_error_t error = api->futureGetVersion(f, &size); + ASSERT(!error); + return size; + }); } void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional value) { @@ -293,7 +301,8 @@ void DLApi::init() { loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op"); loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit"); loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version"); - loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size"); + // TODO: change to 620 for 6.2 release + loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size", headerVersion >= 610); loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch"); loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error"); loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset"); @@ -602,12 +611,10 @@ Version MultiVersionTransaction::getCommittedVersion() { return invalidVersion; } -uint32_t MultiVersionTransaction::getApproximateSize() { +ThreadFuture MultiVersionTransaction::getApproximateSize() { auto tr = getTransaction(); - if (tr.transaction) { - return tr.transaction->getApproximateSize(); - } - return 0; + auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture(Never()); + return abortableFuture(f, tr.onChange); } void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional value) { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 86b4f72fb6..be52927022 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -84,7 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted { FDBFuture* (*transactionCommit)(FDBTransaction *tr); fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion); - fdb_error_t (*transactionGetApproximateSize)(FDBTransaction *tr, int32_t *outSize); + FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr); FDBFuture* (*transactionWatch)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength); FDBFuture* (*transactionOnError)(FDBTransaction *tr, fdb_error_t error); void (*transactionReset)(FDBTransaction *tr); @@ -144,7 +144,7 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; - uint32_t getApproximateSize() override; + ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value=Optional()) override; @@ -241,7 +241,7 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; - uint32_t getApproximateSize() override; + ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value=Optional()) override; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 19931005f8..2160804393 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -2314,6 +2314,7 @@ void Transaction::addReadConflictRange( KeyRangeRef const& keys ) { } tr.transaction.read_conflict_ranges.push_back_deep( tr.arena, r ); + printf("%x read_conflict_ranges size: %d, %d\n", this, tr.transaction.read_conflict_ranges.size(), tr.transaction.read_conflict_ranges.expectedSize()); } void Transaction::makeSelfConflicting() { @@ -2337,6 +2338,7 @@ void Transaction::set( const KeyRef& key, const ValueRef& value, bool addConflic auto r = singleKeyRange( key, req.arena ); auto v = ValueRef( req.arena, value ); t.mutations.push_back( req.arena, MutationRef( MutationRef::SetValue, r.begin, v ) ); + printf("%x set mutation size: %d, %d\n", this, t.mutations.size(), t.mutations.expectedSize()); if( addConflictRange ) { t.write_conflict_ranges.push_back( req.arena, r ); @@ -2470,6 +2472,7 @@ void Transaction::addWriteConflictRange( const KeyRangeRef& keys ) { } t.write_conflict_ranges.push_back_deep( req.arena, r ); + printf("%x write_conflict_ranges size: %d, %d\n", this, t.write_conflict_ranges.size(), t.write_conflict_ranges.expectedSize()); } double Transaction::getBackoff(int errCode) { @@ -2824,7 +2827,7 @@ Future Transaction::commitMutations() { cx->mutationsPerCommit.addSample(tr.transaction.mutations.size()); cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize()); - size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize(); + size_t transactionSize = getSize(); if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) { TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction") .suppressFor(1.0) @@ -3176,9 +3179,11 @@ Future> Transaction::getVersionstamp() { return versionstampPromise.getFuture(); } -uint32_t Transaction::getApproximateSize() { - return tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + +uint32_t Transaction::getSize() { + auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize(); + printf("%x approximate size: %d, mutation size: %d\n", this, s, tr.transaction.mutations.size()); + return s; } Future Transaction::onError( Error const& e ) { diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 3f07fd1e04..28d19b8807 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -284,7 +284,7 @@ public: Promise> versionstampPromise; - uint32_t getApproximateSize(); + uint32_t getSize(); Future onError( Error const& e ); void flushTrLogsIfEnabled(); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 013fd61596..ad5bc42071 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1366,6 +1366,7 @@ void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys ) return; } + approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; if(options.readYourWritesDisabled) { tr.addReadConflictRange(r); return; @@ -1605,6 +1606,7 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) throw key_outside_legal_range(); if(options.readYourWritesDisabled ) { + approximateSize += key.expectedSize() + value.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; return tr.set(key, value, addWriteConflict); } @@ -1616,6 +1618,7 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) KeyRef k = KeyRef( arena, key ); ValueRef v = ValueRef( arena, value ); + approximateSize += k.expectedSize() + v.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; writes.mutate(k, MutationRef::SetValue, v, addWriteConflict); RYWImpl::triggerWatches(this, key, value); @@ -1633,6 +1636,7 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) { throw key_outside_legal_range(); if( options.readYourWritesDisabled ) { + approximateSize += range.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; return tr.clear(range, addWriteConflict); } @@ -1653,6 +1657,7 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) { } r = KeyRangeRef( arena, r ); + approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; writes.clear(r, addWriteConflict); RYWImpl::triggerWatches(this, r, Optional()); @@ -1676,6 +1681,7 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) { } KeyRangeRef r = singleKeyRange( key, arena ); + approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; //SOMEDAY: add an optimized single key clear to write map writes.clear(r, addWriteConflict); @@ -1700,6 +1706,7 @@ Future ReadYourWritesTransaction::watch(const Key& key) { if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) return key_too_large(); + approximateSize += key.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; return RYWImpl::watch(this, key); } @@ -1730,6 +1737,7 @@ void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys ) return; } + approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD; if(options.readYourWritesDisabled) { tr.addWriteConflictRange(r); return; @@ -1855,6 +1863,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N r.resetPromise = Promise(); deferredError = std::move( r.deferredError ); retries = r.retries; + approximateSize = r.approximateSize; timeoutActor = r.timeoutActor; creationTime = r.creationTime; commitStarted = r.commitStarted; @@ -1871,6 +1880,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& arena( std::move(r.arena) ), reading( std::move(r.reading) ), retries( r.retries ), + approximateSize(0), creationTime( r.creationTime ), deferredError( std::move(r.deferredError) ), timeoutActor( std::move(r.timeoutActor) ), @@ -1922,6 +1932,7 @@ void ReadYourWritesTransaction::resetRyow() { readConflicts = CoalescedKeyRefRangeMap(); watchMap.clear(); reading = AndFuture(); + approximateSize = 0; commitStarted = false; deferredError = Error(); @@ -1942,6 +1953,7 @@ void ReadYourWritesTransaction::cancel() { void ReadYourWritesTransaction::reset() { retries = 0; + approximateSize = 0; creationTime = now(); timeoutActor.cancel(); persistentOptions.clear(); diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 1dd04445ea..4df1a92389 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -27,6 +27,9 @@ #include "fdbclient/RYWIterator.h" #include +// Estimated number of overhead bytes per mutation. +#define RYW_TRANSACTION_SIZE_OVERHEAD 30 + //SOMEDAY: Optimize getKey to avoid using getRange struct ReadYourWritesTransactionOptions { @@ -98,7 +101,7 @@ public: Future commit(); Version getCommittedVersion() { return tr.getCommittedVersion(); } - uint32_t getApproximateSize() { return tr.getApproximateSize(); } + int64_t getApproximateSize() { return approximateSize; } Future> getVersionstamp(); void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ); @@ -141,6 +144,7 @@ private: Promise resetPromise; AndFuture reading; int retries; + int64_t approximateSize; Future timeoutActor; double creationTime; bool commitStarted; diff --git a/fdbclient/ThreadSafeTransaction.actor.cpp b/fdbclient/ThreadSafeTransaction.actor.cpp index 35f69bc8b4..761b705798 100644 --- a/fdbclient/ThreadSafeTransaction.actor.cpp +++ b/fdbclient/ThreadSafeTransaction.actor.cpp @@ -274,8 +274,8 @@ Version ThreadSafeTransaction::getCommittedVersion() { return tr->getCommittedVersion(); } -uint32_t ThreadSafeTransaction::getApproximateSize() { - return tr->getApproximateSize(); +ThreadFuture ThreadSafeTransaction::getApproximateSize() { + return onMainThread([this]() -> Future { return tr->getApproximateSize(); }); } ThreadFuture> ThreadSafeTransaction::getVersionstamp() { diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index 2177b890e2..59188e5f9a 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -87,7 +87,7 @@ public: ThreadFuture commit() override; Version getCommittedVersion() override; - uint32_t getApproximateSize() override; + ThreadFuture getApproximateSize() override; void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ) override;