From 8ef8b59fccd45858a7609dbefae02b4b66196baf Mon Sep 17 00:00:00 2001
From: Jingyu Zhou <jingyuzhou@gmail.com>
Date: Fri, 28 Jun 2019 10:15:37 -0700
Subject: [PATCH] Use ThreadFuture for getApproximateSize

Change return type to int64_t and fix C and Python binding to use the correct
type.
---
 bindings/c/CMakeLists.txt                   |  4 +++
 bindings/c/fdb_c.cpp                        |  4 +--
 bindings/c/foundationdb/fdb_c.h             |  4 +--
 bindings/python/fdb/impl.py                 | 10 +++-----
 fdbclient/IClientApi.h                      |  2 +-
 fdbclient/MultiVersionTransaction.actor.cpp | 27 +++++++++++++--------
 fdbclient/MultiVersionTransaction.h         |  6 ++---
 fdbclient/NativeAPI.actor.cpp               | 11 ++++++---
 fdbclient/NativeAPI.actor.h                 |  2 +-
 fdbclient/ReadYourWrites.actor.cpp          | 12 +++++++++
 fdbclient/ReadYourWrites.h                  |  6 ++++-
 fdbclient/ThreadSafeTransaction.actor.cpp   |  4 +--
 fdbclient/ThreadSafeTransaction.h           |  2 +-
 13 files changed, 62 insertions(+), 32 deletions(-)

diff --git a/bindings/c/CMakeLists.txt b/bindings/c/CMakeLists.txt
index a5897a08d4..b30a78bd84 100644
--- a/bindings/c/CMakeLists.txt
+++ b/bindings/c/CMakeLists.txt
@@ -60,16 +60,20 @@ if(NOT WIN32)
   if(OPEN_FOR_IDE)
     add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)
     add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h)
+    add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h)
     add_library(mako OBJECT ${MAKO_SRCS})
   else()
     add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
     add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)
+    add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h)
     add_executable(mako ${MAKO_SRCS})
     strip_debug_symbols(fdb_c_performance_test)
     strip_debug_symbols(fdb_c_ryw_benchmark)
+    strip_debug_symbols(fdb_c_txn_size_test)
   endif()
   target_link_libraries(fdb_c_performance_test PRIVATE fdb_c)
   target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c)
+  target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c)
   # do not set RPATH for mako
   set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
   target_link_libraries(mako PRIVATE fdb_c)
diff --git a/bindings/c/fdb_c.cpp b/bindings/c/fdb_c.cpp
index e4f44e8ae4..467b2b0c11 100644
--- a/bindings/c/fdb_c.cpp
+++ b/bindings/c/fdb_c.cpp
@@ -585,8 +585,8 @@ fdb_error_t fdb_transaction_get_committed_version( FDBTransaction* tr,
 }
 
 extern "C" DLLEXPORT
-fdb_error_t fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size) {
-	CATCH_AND_RETURN(*out_size = TXN(tr)->getApproximateSize(););
+FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
+	return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
 }
 
 extern "C" DLLEXPORT
diff --git a/bindings/c/foundationdb/fdb_c.h b/bindings/c/foundationdb/fdb_c.h
index b302d1a6ff..d3e904ecd9 100644
--- a/bindings/c/foundationdb/fdb_c.h
+++ b/bindings/c/foundationdb/fdb_c.h
@@ -225,8 +225,8 @@ extern "C" {
     fdb_transaction_get_committed_version( FDBTransaction* tr,
                                            int64_t* out_version );
 
-	DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
-	fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size);
+	DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
+	fdb_transaction_get_approximate_size(FDBTransaction* tr);
 
     DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr );
 
diff --git a/bindings/python/fdb/impl.py b/bindings/python/fdb/impl.py
index 6bfc8b8cce..e3f68e98c9 100644
--- a/bindings/python/fdb/impl.py
+++ b/bindings/python/fdb/impl.py
@@ -542,9 +542,8 @@ class Transaction(TransactionRead):
         return version.value
 
     def get_approximate_size(self):
-        size = ctypes.c_int()
-        self.capi.fdb_transaction_get_approximate_size(self.tpointer, ctypes.byref(size))
-        return size.value
+        """Get the approximate commit size of the transaction."""
+        return FutureVersion(self.capi.fdb_transaction_get_approximate_size(self.tpointer))
 
     def get_versionstamp(self):
         return Key(self.capi.fdb_transaction_get_versionstamp(self.tpointer))
@@ -1458,9 +1457,8 @@ def init_c_api():
     _capi.fdb_transaction_get_committed_version.restype = ctypes.c_int
     _capi.fdb_transaction_get_committed_version.errcheck = check_error_code
 
-    _capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int)]
-    _capi.fdb_transaction_get_approximate_size.restype = ctypes.c_int
-    _capi.fdb_transaction_get_approximate_size.errcheck = check_error_code
+    _capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p]
+    _capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p
 
     _capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p]
     _capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p
diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h
index db794fb6dd..b3e6217054 100644
--- a/fdbclient/IClientApi.h
+++ b/fdbclient/IClientApi.h
@@ -61,7 +61,7 @@ public:
 
 	virtual ThreadFuture<Void> commit() = 0;
 	virtual Version getCommittedVersion() = 0;
-	virtual uint32_t getApproximateSize() = 0;
+	virtual ThreadFuture<int64_t> getApproximateSize() = 0;
 
 	virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) = 0;
 
diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp
index ee633d8747..ff26f73e14 100644
--- a/fdbclient/MultiVersionTransaction.actor.cpp
+++ b/fdbclient/MultiVersionTransaction.actor.cpp
@@ -195,10 +195,18 @@ Version DLTransaction::getCommittedVersion() {
 	return version;
 }
 
-uint32_t DLTransaction::getApproximateSize() {
-	int32_t size;
-	throwIfError(api->transactionGetApproximateSize(tr, &size));
-	return size;
+ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
+	if(!api->transactionGetApproximateSize) {
+		return unsupported_operation();
+	}
+
+	FdbCApi::FDBFuture *f = api->transactionGetApproximateSize(tr);
+	return toThreadFuture<int64_t>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
+		int64_t size;
+		FdbCApi::fdb_error_t error = api->futureGetVersion(f, &size);
+		ASSERT(!error);
+		return size;
+	});
 }
 
 void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
@@ -293,7 +301,8 @@ void DLApi::init() {
 	loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op");
 	loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit");
 	loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version");
-	loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size");
+	// TODO: change to 620 for 6.2 release
+	loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size", headerVersion >= 610);
 	loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch");
 	loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error");
 	loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
@@ -602,12 +611,10 @@ Version MultiVersionTransaction::getCommittedVersion() {
 	return invalidVersion;
 }
 
-uint32_t MultiVersionTransaction::getApproximateSize() {
+ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
 	auto tr = getTransaction();
-	if (tr.transaction) {
-		return tr.transaction->getApproximateSize();
-	}
-	return 0;
+	auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture<int64_t>(Never());
+	return abortableFuture(f, tr.onChange);
 }
 
 void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h
index 86b4f72fb6..be52927022 100644
--- a/fdbclient/MultiVersionTransaction.h
+++ b/fdbclient/MultiVersionTransaction.h
@@ -84,7 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
 	
 	FDBFuture* (*transactionCommit)(FDBTransaction *tr);
 	fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion);
-	fdb_error_t (*transactionGetApproximateSize)(FDBTransaction *tr, int32_t *outSize);
+	FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr);
 	FDBFuture* (*transactionWatch)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength);
 	FDBFuture* (*transactionOnError)(FDBTransaction *tr, fdb_error_t error);
 	void (*transactionReset)(FDBTransaction *tr);
@@ -144,7 +144,7 @@ public:
 
 	ThreadFuture<Void> commit() override;
 	Version getCommittedVersion() override;
-	uint32_t getApproximateSize() override;
+	ThreadFuture<int64_t> getApproximateSize() override;
 
 	void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
 
@@ -241,7 +241,7 @@ public:
 
 	ThreadFuture<Void> commit() override;
 	Version getCommittedVersion() override;
-	uint32_t getApproximateSize() override;
+	ThreadFuture<int64_t> getApproximateSize() override;
 
 	void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
 
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index 19931005f8..2160804393 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -2314,6 +2314,7 @@ void Transaction::addReadConflictRange( KeyRangeRef const& keys ) {
 	}
 
 	tr.transaction.read_conflict_ranges.push_back_deep( tr.arena, r );
+	printf("%x read_conflict_ranges size: %d, %d\n", this, tr.transaction.read_conflict_ranges.size(), tr.transaction.read_conflict_ranges.expectedSize());
 }
 
 void Transaction::makeSelfConflicting() {
@@ -2337,6 +2338,7 @@ void Transaction::set( const KeyRef& key, const ValueRef& value, bool addConflic
 	auto r = singleKeyRange( key, req.arena );
 	auto v = ValueRef( req.arena, value );
 	t.mutations.push_back( req.arena, MutationRef( MutationRef::SetValue, r.begin, v ) );
+	printf("%x set mutation size: %d, %d\n", this, t.mutations.size(), t.mutations.expectedSize());
 
 	if( addConflictRange ) {
 		t.write_conflict_ranges.push_back( req.arena, r );
@@ -2470,6 +2472,7 @@ void Transaction::addWriteConflictRange( const KeyRangeRef& keys ) {
 	}
 
 	t.write_conflict_ranges.push_back_deep( req.arena, r );
+	printf("%x write_conflict_ranges size: %d, %d\n", this, t.write_conflict_ranges.size(), t.write_conflict_ranges.expectedSize());
 }
 
 double Transaction::getBackoff(int errCode) {
@@ -2824,7 +2827,7 @@ Future<Void> Transaction::commitMutations() {
 		cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
 		cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());
 
-		size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize();
+		size_t transactionSize = getSize();
 		if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
 			TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction")
 				.suppressFor(1.0)
@@ -3176,9 +3179,11 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
 	return versionstampPromise.getFuture();
 }
 
-uint32_t Transaction::getApproximateSize() {
-	return tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
+uint32_t Transaction::getSize() {
+	auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
 	       tr.transaction.write_conflict_ranges.expectedSize();
+	printf("%x approximate size: %d, mutation size: %d\n", this, s, tr.transaction.mutations.size());
+	return s;
 }
 
 Future<Void> Transaction::onError( Error const& e ) {
diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h
index 3f07fd1e04..28d19b8807 100644
--- a/fdbclient/NativeAPI.actor.h
+++ b/fdbclient/NativeAPI.actor.h
@@ -284,7 +284,7 @@ public:
 
 	Promise<Standalone<StringRef>> versionstampPromise;
 
-	uint32_t getApproximateSize();
+	uint32_t getSize();
 	Future<Void> onError( Error const& e );
 	void flushTrLogsIfEnabled();
 
diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp
index 013fd61596..ad5bc42071 100644
--- a/fdbclient/ReadYourWrites.actor.cpp
+++ b/fdbclient/ReadYourWrites.actor.cpp
@@ -1366,6 +1366,7 @@ void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys )
 		return;
 	}
 
+	approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 	if(options.readYourWritesDisabled) {
 		tr.addReadConflictRange(r);
 		return;
@@ -1605,6 +1606,7 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value )
 		throw key_outside_legal_range();
 
 	if(options.readYourWritesDisabled ) {
+		approximateSize += key.expectedSize() + value.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 		return tr.set(key, value, addWriteConflict);
 	}
 
@@ -1616,6 +1618,7 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value )
 	
 	KeyRef k = KeyRef( arena, key );
 	ValueRef v = ValueRef( arena, value );
+	approximateSize += k.expectedSize() + v.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 
 	writes.mutate(k, MutationRef::SetValue, v, addWriteConflict);
 	RYWImpl::triggerWatches(this, key, value);
@@ -1633,6 +1636,7 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
 		throw key_outside_legal_range();
 
 	if( options.readYourWritesDisabled ) {
+		approximateSize += range.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 		return tr.clear(range, addWriteConflict);
 	}
 	
@@ -1653,6 +1657,7 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
 	}
 
 	r = KeyRangeRef( arena, r );
+	approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 
 	writes.clear(r, addWriteConflict);
 	RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
@@ -1676,6 +1681,7 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) {
 	}
 	
 	KeyRangeRef r = singleKeyRange( key, arena );
+	approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 
 	//SOMEDAY: add an optimized single key clear to write map
 	writes.clear(r, addWriteConflict);
@@ -1700,6 +1706,7 @@ Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
 	if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
 		return key_too_large();
 
+	approximateSize += key.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 	return RYWImpl::watch(this, key);
 }
 
@@ -1730,6 +1737,7 @@ void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys )
 		return;
 	}
 
+	approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
 	if(options.readYourWritesDisabled) {
 		tr.addWriteConflictRange(r);
 		return;
@@ -1855,6 +1863,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N
 	r.resetPromise = Promise<Void>();
 	deferredError = std::move( r.deferredError );
 	retries = r.retries;
+	approximateSize = r.approximateSize;
 	timeoutActor = r.timeoutActor;
 	creationTime = r.creationTime;
 	commitStarted = r.commitStarted;
@@ -1871,6 +1880,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
 	arena( std::move(r.arena) ), 
 	reading( std::move(r.reading) ),
 	retries( r.retries ), 
+	approximateSize(0),
 	creationTime( r.creationTime ), 
 	deferredError( std::move(r.deferredError) ), 
 	timeoutActor( std::move(r.timeoutActor) ),
@@ -1922,6 +1932,7 @@ void ReadYourWritesTransaction::resetRyow() {
 	readConflicts = CoalescedKeyRefRangeMap<bool>();
 	watchMap.clear();
 	reading = AndFuture();
+	approximateSize = 0;
 	commitStarted = false;
 
 	deferredError = Error();
@@ -1942,6 +1953,7 @@ void ReadYourWritesTransaction::cancel() {
 
 void ReadYourWritesTransaction::reset() {
 	retries = 0;
+	approximateSize = 0;
 	creationTime = now();
 	timeoutActor.cancel();
 	persistentOptions.clear();
diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h
index 1dd04445ea..4df1a92389 100644
--- a/fdbclient/ReadYourWrites.h
+++ b/fdbclient/ReadYourWrites.h
@@ -27,6 +27,9 @@
 #include "fdbclient/RYWIterator.h"
 #include <list>
 
+// Estimated number of overhead bytes per mutation.
+#define RYW_TRANSACTION_SIZE_OVERHEAD  30
+
 //SOMEDAY: Optimize getKey to avoid using getRange
 
 struct ReadYourWritesTransactionOptions {
@@ -98,7 +101,7 @@ public:
 
 	Future<Void> commit();
 	Version getCommittedVersion() { return tr.getCommittedVersion(); }
-	uint32_t getApproximateSize() { return tr.getApproximateSize(); }
+	int64_t getApproximateSize() { return approximateSize; }
 	Future<Standalone<StringRef>> getVersionstamp();
 
 	void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
@@ -141,6 +144,7 @@ private:
 	Promise<Void> resetPromise;
 	AndFuture reading;
 	int retries;
+	int64_t approximateSize;
 	Future<Void> timeoutActor;
 	double creationTime;
 	bool commitStarted;
diff --git a/fdbclient/ThreadSafeTransaction.actor.cpp b/fdbclient/ThreadSafeTransaction.actor.cpp
index 35f69bc8b4..761b705798 100644
--- a/fdbclient/ThreadSafeTransaction.actor.cpp
+++ b/fdbclient/ThreadSafeTransaction.actor.cpp
@@ -274,8 +274,8 @@ Version ThreadSafeTransaction::getCommittedVersion() {
 	return tr->getCommittedVersion();
 }
 
-uint32_t ThreadSafeTransaction::getApproximateSize() {
-	return tr->getApproximateSize();
+ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
+	return onMainThread([this]() -> Future<int64_t> { return tr->getApproximateSize(); });
 }
 
 ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {
diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h
index 2177b890e2..59188e5f9a 100644
--- a/fdbclient/ThreadSafeTransaction.h
+++ b/fdbclient/ThreadSafeTransaction.h
@@ -87,7 +87,7 @@ public:
 
 	ThreadFuture<Void> commit() override;
 	Version getCommittedVersion() override;
-	uint32_t getApproximateSize() override;
+	ThreadFuture<int64_t> getApproximateSize() override;
 
 	void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ) override;