Use ThreadFuture for getApproximateSize

Change return type to int64_t and fix C and Python binding to use the correct
type.
This commit is contained in:
Jingyu Zhou 2019-06-28 10:15:37 -07:00
parent c91e712576
commit 8ef8b59fcc
13 changed files with 62 additions and 32 deletions

View File

@ -60,16 +60,20 @@ if(NOT WIN32)
if(OPEN_FOR_IDE)
add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)
add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h)
add_library(fdb_c_txn_size_test OBJECT test/txn_size_test.c test/test.h)
add_library(mako OBJECT ${MAKO_SRCS})
else()
add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)
add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h)
add_executable(mako ${MAKO_SRCS})
strip_debug_symbols(fdb_c_performance_test)
strip_debug_symbols(fdb_c_ryw_benchmark)
strip_debug_symbols(fdb_c_txn_size_test)
endif()
target_link_libraries(fdb_c_performance_test PRIVATE fdb_c)
target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c)
target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c)
# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
target_link_libraries(mako PRIVATE fdb_c)

View File

@ -585,8 +585,8 @@ fdb_error_t fdb_transaction_get_committed_version( FDBTransaction* tr,
}
extern "C" DLLEXPORT
fdb_error_t fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size) {
CATCH_AND_RETURN(*out_size = TXN(tr)->getApproximateSize(););
FDBFuture* fdb_transaction_get_approximate_size(FDBTransaction* tr) {
return (FDBFuture*)TXN(tr)->getApproximateSize().extractPtr();
}
extern "C" DLLEXPORT

View File

@ -225,8 +225,8 @@ extern "C" {
fdb_transaction_get_committed_version( FDBTransaction* tr,
int64_t* out_version );
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
fdb_transaction_get_approximate_size(FDBTransaction* tr);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr );

View File

@ -542,9 +542,8 @@ class Transaction(TransactionRead):
return version.value
def get_approximate_size(self):
size = ctypes.c_int()
self.capi.fdb_transaction_get_approximate_size(self.tpointer, ctypes.byref(size))
return size.value
"""Get the approximate commit size of the transaction."""
return FutureVersion(self.capi.fdb_transaction_get_approximate_size(self.tpointer))
def get_versionstamp(self):
return Key(self.capi.fdb_transaction_get_versionstamp(self.tpointer))
@ -1458,9 +1457,8 @@ def init_c_api():
_capi.fdb_transaction_get_committed_version.restype = ctypes.c_int
_capi.fdb_transaction_get_committed_version.errcheck = check_error_code
_capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_int)]
_capi.fdb_transaction_get_approximate_size.restype = ctypes.c_int
_capi.fdb_transaction_get_approximate_size.errcheck = check_error_code
_capi.fdb_transaction_get_approximate_size.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_approximate_size.restype = ctypes.c_void_p
_capi.fdb_transaction_get_versionstamp.argtypes = [ctypes.c_void_p]
_capi.fdb_transaction_get_versionstamp.restype = ctypes.c_void_p

View File

@ -61,7 +61,7 @@ public:
virtual ThreadFuture<Void> commit() = 0;
virtual Version getCommittedVersion() = 0;
virtual uint32_t getApproximateSize() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) = 0;

View File

@ -195,10 +195,18 @@ Version DLTransaction::getCommittedVersion() {
return version;
}
uint32_t DLTransaction::getApproximateSize() {
int32_t size;
throwIfError(api->transactionGetApproximateSize(tr, &size));
return size;
ThreadFuture<int64_t> DLTransaction::getApproximateSize() {
if(!api->transactionGetApproximateSize) {
return unsupported_operation();
}
FdbCApi::FDBFuture *f = api->transactionGetApproximateSize(tr);
return toThreadFuture<int64_t>(api, f, [](FdbCApi::FDBFuture *f, FdbCApi *api) {
int64_t size;
FdbCApi::fdb_error_t error = api->futureGetVersion(f, &size);
ASSERT(!error);
return size;
});
}
void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
@ -293,7 +301,8 @@ void DLApi::init() {
loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op");
loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit");
loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version");
loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size");
// TODO: change to 620 for 6.2 release
loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size", headerVersion >= 610);
loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch");
loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error");
loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
@ -602,12 +611,10 @@ Version MultiVersionTransaction::getCommittedVersion() {
return invalidVersion;
}
uint32_t MultiVersionTransaction::getApproximateSize() {
ThreadFuture<int64_t> MultiVersionTransaction::getApproximateSize() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getApproximateSize();
}
return 0;
auto f = tr.transaction ? tr.transaction->getApproximateSize() : ThreadFuture<int64_t>(Never());
return abortableFuture(f, tr.onChange);
}
void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {

View File

@ -84,7 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
FDBFuture* (*transactionCommit)(FDBTransaction *tr);
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion);
fdb_error_t (*transactionGetApproximateSize)(FDBTransaction *tr, int32_t *outSize);
FDBFuture* (*transactionGetApproximateSize)(FDBTransaction *tr);
FDBFuture* (*transactionWatch)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength);
FDBFuture* (*transactionOnError)(FDBTransaction *tr, fdb_error_t error);
void (*transactionReset)(FDBTransaction *tr);
@ -144,7 +144,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
uint32_t getApproximateSize() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
@ -241,7 +241,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
uint32_t getApproximateSize() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;

View File

@ -2314,6 +2314,7 @@ void Transaction::addReadConflictRange( KeyRangeRef const& keys ) {
}
tr.transaction.read_conflict_ranges.push_back_deep( tr.arena, r );
printf("%x read_conflict_ranges size: %d, %d\n", this, tr.transaction.read_conflict_ranges.size(), tr.transaction.read_conflict_ranges.expectedSize());
}
void Transaction::makeSelfConflicting() {
@ -2337,6 +2338,7 @@ void Transaction::set( const KeyRef& key, const ValueRef& value, bool addConflic
auto r = singleKeyRange( key, req.arena );
auto v = ValueRef( req.arena, value );
t.mutations.push_back( req.arena, MutationRef( MutationRef::SetValue, r.begin, v ) );
printf("%x set mutation size: %d, %d\n", this, t.mutations.size(), t.mutations.expectedSize());
if( addConflictRange ) {
t.write_conflict_ranges.push_back( req.arena, r );
@ -2470,6 +2472,7 @@ void Transaction::addWriteConflictRange( const KeyRangeRef& keys ) {
}
t.write_conflict_ranges.push_back_deep( req.arena, r );
printf("%x write_conflict_ranges size: %d, %d\n", this, t.write_conflict_ranges.size(), t.write_conflict_ranges.expectedSize());
}
double Transaction::getBackoff(int errCode) {
@ -2824,7 +2827,7 @@ Future<Void> Transaction::commitMutations() {
cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());
size_t transactionSize = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize();
size_t transactionSize = getSize();
if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
TraceEvent(!g_network->isSimulated() ? SevWarnAlways : SevWarn, "LargeTransaction")
.suppressFor(1.0)
@ -3176,9 +3179,11 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
return versionstampPromise.getFuture();
}
uint32_t Transaction::getApproximateSize() {
return tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
uint32_t Transaction::getSize() {
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
tr.transaction.write_conflict_ranges.expectedSize();
printf("%x approximate size: %d, mutation size: %d\n", this, s, tr.transaction.mutations.size());
return s;
}
Future<Void> Transaction::onError( Error const& e ) {

View File

@ -284,7 +284,7 @@ public:
Promise<Standalone<StringRef>> versionstampPromise;
uint32_t getApproximateSize();
uint32_t getSize();
Future<Void> onError( Error const& e );
void flushTrLogsIfEnabled();

View File

@ -1366,6 +1366,7 @@ void ReadYourWritesTransaction::addReadConflictRange( KeyRangeRef const& keys )
return;
}
approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
if(options.readYourWritesDisabled) {
tr.addReadConflictRange(r);
return;
@ -1605,6 +1606,7 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value )
throw key_outside_legal_range();
if(options.readYourWritesDisabled ) {
approximateSize += key.expectedSize() + value.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
return tr.set(key, value, addWriteConflict);
}
@ -1616,6 +1618,7 @@ void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value )
KeyRef k = KeyRef( arena, key );
ValueRef v = ValueRef( arena, value );
approximateSize += k.expectedSize() + v.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
writes.mutate(k, MutationRef::SetValue, v, addWriteConflict);
RYWImpl::triggerWatches(this, key, value);
@ -1633,6 +1636,7 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
throw key_outside_legal_range();
if( options.readYourWritesDisabled ) {
approximateSize += range.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
return tr.clear(range, addWriteConflict);
}
@ -1653,6 +1657,7 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
}
r = KeyRangeRef( arena, r );
approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
writes.clear(r, addWriteConflict);
RYWImpl::triggerWatches(this, r, Optional<ValueRef>());
@ -1676,6 +1681,7 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) {
}
KeyRangeRef r = singleKeyRange( key, arena );
approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
//SOMEDAY: add an optimized single key clear to write map
writes.clear(r, addWriteConflict);
@ -1700,6 +1706,7 @@ Future<Void> ReadYourWritesTransaction::watch(const Key& key) {
if (key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT))
return key_too_large();
approximateSize += key.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
return RYWImpl::watch(this, key);
}
@ -1730,6 +1737,7 @@ void ReadYourWritesTransaction::addWriteConflictRange( KeyRangeRef const& keys )
return;
}
approximateSize += r.expectedSize() + RYW_TRANSACTION_SIZE_OVERHEAD;
if(options.readYourWritesDisabled) {
tr.addWriteConflictRange(r);
return;
@ -1855,6 +1863,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N
r.resetPromise = Promise<Void>();
deferredError = std::move( r.deferredError );
retries = r.retries;
approximateSize = r.approximateSize;
timeoutActor = r.timeoutActor;
creationTime = r.creationTime;
commitStarted = r.commitStarted;
@ -1871,6 +1880,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
arena( std::move(r.arena) ),
reading( std::move(r.reading) ),
retries( r.retries ),
approximateSize(0),
creationTime( r.creationTime ),
deferredError( std::move(r.deferredError) ),
timeoutActor( std::move(r.timeoutActor) ),
@ -1922,6 +1932,7 @@ void ReadYourWritesTransaction::resetRyow() {
readConflicts = CoalescedKeyRefRangeMap<bool>();
watchMap.clear();
reading = AndFuture();
approximateSize = 0;
commitStarted = false;
deferredError = Error();
@ -1942,6 +1953,7 @@ void ReadYourWritesTransaction::cancel() {
void ReadYourWritesTransaction::reset() {
retries = 0;
approximateSize = 0;
creationTime = now();
timeoutActor.cancel();
persistentOptions.clear();

View File

@ -27,6 +27,9 @@
#include "fdbclient/RYWIterator.h"
#include <list>
// Estimated number of overhead bytes per mutation.
#define RYW_TRANSACTION_SIZE_OVERHEAD 30
//SOMEDAY: Optimize getKey to avoid using getRange
struct ReadYourWritesTransactionOptions {
@ -98,7 +101,7 @@ public:
Future<Void> commit();
Version getCommittedVersion() { return tr.getCommittedVersion(); }
uint32_t getApproximateSize() { return tr.getApproximateSize(); }
int64_t getApproximateSize() { return approximateSize; }
Future<Standalone<StringRef>> getVersionstamp();
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
@ -141,6 +144,7 @@ private:
Promise<Void> resetPromise;
AndFuture reading;
int retries;
int64_t approximateSize;
Future<Void> timeoutActor;
double creationTime;
bool commitStarted;

View File

@ -274,8 +274,8 @@ Version ThreadSafeTransaction::getCommittedVersion() {
return tr->getCommittedVersion();
}
uint32_t ThreadSafeTransaction::getApproximateSize() {
return tr->getApproximateSize();
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
return onMainThread([this]() -> Future<int64_t> { return tr->getApproximateSize(); });
}
ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {

View File

@ -87,7 +87,7 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
uint32_t getApproximateSize() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ) override;