mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-15 02:18:39 +08:00
Add transaction getApproximateSize() API
The size is the summation of expected size of mutations, read conflict ranges, and write conflict ranges.
This commit is contained in:
parent
7e919e361c
commit
c50a675bf0
@ -584,6 +584,11 @@ fdb_error_t fdb_transaction_get_committed_version( FDBTransaction* tr,
|
||||
*out_version = TXN(tr)->getCommittedVersion(); );
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT
|
||||
fdb_error_t fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size) {
|
||||
CATCH_AND_RETURN(*out_size = TXN(tr)->getApproximateSize(););
|
||||
}
|
||||
|
||||
extern "C" DLLEXPORT
|
||||
FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr )
|
||||
{
|
||||
|
@ -225,6 +225,9 @@ extern "C" {
|
||||
fdb_transaction_get_committed_version( FDBTransaction* tr,
|
||||
int64_t* out_version );
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t
|
||||
fdb_transaction_get_approximate_size(FDBTransaction* tr, uint32_t* out_size);
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_versionstamp( FDBTransaction* tr );
|
||||
|
||||
DLLEXPORT WARN_UNUSED_RESULT FDBFuture*
|
||||
|
@ -20,12 +20,13 @@
|
||||
|
||||
#include "fdb_flow.h"
|
||||
|
||||
#include "flow/DeterministicRandom.h"
|
||||
#include "flow/SystemMonitor.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <cinttypes>
|
||||
|
||||
#include "flow/DeterministicRandom.h"
|
||||
#include "flow/SystemMonitor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
using namespace FDB;
|
||||
|
||||
THREAD_FUNC networkThread(void* fdb) {
|
||||
@ -147,6 +148,7 @@ namespace FDB {
|
||||
|
||||
void setOption(FDBTransactionOption option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
|
||||
uint32_t getApproximateSize() override;
|
||||
Future<Void> onError(Error const& e) override;
|
||||
|
||||
void cancel() override;
|
||||
@ -408,6 +410,12 @@ namespace FDB {
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t TransactionImpl::getApproximateSize() {
|
||||
uint32_t size;
|
||||
throw_on_error(fdb_transaction_get_approximate_size(tr, &size));
|
||||
return size;
|
||||
}
|
||||
|
||||
Future<Void> TransactionImpl::onError(Error const& e) {
|
||||
return backToFuture< Void >( fdb_transaction_on_error( tr, e.code() ), [](Reference<CFuture> f) {
|
||||
throw_on_error( fdb_future_get_error( f->f ) );
|
||||
@ -422,4 +430,5 @@ namespace FDB {
|
||||
void TransactionImpl::reset() {
|
||||
fdb_transaction_reset( tr );
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace FDB
|
||||
|
@ -112,6 +112,7 @@ namespace FDB {
|
||||
|
||||
virtual Future<Void> commit() = 0;
|
||||
virtual Version getCommittedVersion() = 0;
|
||||
virtual uint32_t getApproximateSize() = 0;
|
||||
virtual Future<FDBStandalone<StringRef>> getVersionstamp() = 0;
|
||||
};
|
||||
|
||||
|
@ -18,16 +18,17 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
#include "flow/DeterministicRandom.h"
|
||||
#include "bindings/flow/Tuple.h"
|
||||
#include "bindings/flow/FDBLoanerTypes.h"
|
||||
|
||||
#include "Tester.actor.h"
|
||||
#ifdef __linux__
|
||||
#include <string.h>
|
||||
#endif
|
||||
|
||||
#include "bindings/flow/Tuple.h"
|
||||
#include "bindings/flow/FDBLoanerTypes.h"
|
||||
#include "fdbrpc/fdbrpc.h"
|
||||
#include "flow/DeterministicRandom.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
// Otherwise we have to type setupNetwork(), FDB::open(), etc.
|
||||
using namespace FDB;
|
||||
|
||||
|
@ -805,6 +805,21 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
|
||||
return (jlong)version;
|
||||
}
|
||||
|
||||
JNIEXPORT uint32_t JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getApproximateSize(JNIEnv *jenv, jobject, jlong tPtr) {
|
||||
if( !tPtr ) {
|
||||
throwParamNotNull(jenv);
|
||||
return 0;
|
||||
}
|
||||
FDBTransaction *tr = (FDBTransaction *)tPtr;
|
||||
uint32_t size;
|
||||
fdb_error_t err = fdb_transaction_get_approximate_size(tr, &size);
|
||||
if (err) {
|
||||
safeThrow(jenv, getThrowable(jenv, err));
|
||||
return 0;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getVersionstamp(JNIEnv *jenv, jobject, jlong tPtr) {
|
||||
if (!tPtr) {
|
||||
throwParamNotNull(jenv);
|
||||
|
@ -61,6 +61,7 @@ public:
|
||||
|
||||
virtual ThreadFuture<Void> commit() = 0;
|
||||
virtual Version getCommittedVersion() = 0;
|
||||
virtual uint32_t getApproximateSize() = 0;
|
||||
|
||||
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) = 0;
|
||||
|
||||
|
@ -195,6 +195,12 @@ Version DLTransaction::getCommittedVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
uint32_t DLTransaction::getApproximateSize() {
|
||||
int32_t size;
|
||||
throwIfError(api->transactionGetApproximateSize(tr, &size));
|
||||
return size;
|
||||
}
|
||||
|
||||
void DLTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
|
||||
throwIfError(api->transactionSetOption(tr, option, value.present() ? value.get().begin() : NULL, value.present() ? value.get().size() : 0));
|
||||
}
|
||||
@ -287,6 +293,7 @@ void DLApi::init() {
|
||||
loadClientFunction(&api->transactionAtomicOp, lib, fdbCPath, "fdb_transaction_atomic_op");
|
||||
loadClientFunction(&api->transactionCommit, lib, fdbCPath, "fdb_transaction_commit");
|
||||
loadClientFunction(&api->transactionGetCommittedVersion, lib, fdbCPath, "fdb_transaction_get_committed_version");
|
||||
loadClientFunction(&api->transactionGetApproximateSize, lib, fdbCPath, "fdb_transaction_get_approximate_size");
|
||||
loadClientFunction(&api->transactionWatch, lib, fdbCPath, "fdb_transaction_watch");
|
||||
loadClientFunction(&api->transactionOnError, lib, fdbCPath, "fdb_transaction_on_error");
|
||||
loadClientFunction(&api->transactionReset, lib, fdbCPath, "fdb_transaction_reset");
|
||||
@ -595,6 +602,14 @@ Version MultiVersionTransaction::getCommittedVersion() {
|
||||
return invalidVersion;
|
||||
}
|
||||
|
||||
uint32_t MultiVersionTransaction::getApproximateSize() {
|
||||
auto tr = getTransaction();
|
||||
if (tr.transaction) {
|
||||
return tr.transaction->getApproximateSize();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
|
||||
if(MultiVersionApi::apiVersionAtLeast(610) && FDBTransactionOptions::optionInfo[option].persistent) {
|
||||
persistentOptions.emplace_back(option, value.castTo<Standalone<StringRef>>());
|
||||
|
@ -84,6 +84,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
|
||||
|
||||
FDBFuture* (*transactionCommit)(FDBTransaction *tr);
|
||||
fdb_error_t (*transactionGetCommittedVersion)(FDBTransaction *tr, int64_t *outVersion);
|
||||
fdb_error_t (*transactionGetApproximateSize)(FDBTransaction *tr, int32_t *outSize);
|
||||
FDBFuture* (*transactionWatch)(FDBTransaction *tr, uint8_t const *keyName, int keyNameLength);
|
||||
FDBFuture* (*transactionOnError)(FDBTransaction *tr, fdb_error_t error);
|
||||
void (*transactionReset)(FDBTransaction *tr);
|
||||
@ -116,41 +117,42 @@ public:
|
||||
DLTransaction(Reference<FdbCApi> api, FdbCApi::FDBTransaction *tr) : api(api), tr(tr) {}
|
||||
~DLTransaction() { api->transactionDestroy(tr); }
|
||||
|
||||
void cancel();
|
||||
void setVersion(Version v);
|
||||
ThreadFuture<Version> getReadVersion();
|
||||
void cancel() override;
|
||||
void setVersion(Version v) override;
|
||||
ThreadFuture<Version> getReadVersion() override;
|
||||
|
||||
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false);
|
||||
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp();
|
||||
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false) override;
|
||||
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
|
||||
|
||||
void addReadConflictRange(const KeyRangeRef& keys);
|
||||
void addReadConflictRange(const KeyRangeRef& keys) override;
|
||||
|
||||
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType);
|
||||
void set(const KeyRef& key, const ValueRef& value);
|
||||
void clear(const KeyRef& begin, const KeyRef& end);
|
||||
void clear(const KeyRangeRef& range);
|
||||
void clear(const KeyRef& key);
|
||||
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
|
||||
void set(const KeyRef& key, const ValueRef& value) override;
|
||||
void clear(const KeyRef& begin, const KeyRef& end) override;
|
||||
void clear(const KeyRangeRef& range) override;
|
||||
void clear(const KeyRef& key) override;
|
||||
|
||||
ThreadFuture<Void> watch(const KeyRef& key);
|
||||
ThreadFuture<Void> watch(const KeyRef& key) override;
|
||||
|
||||
void addWriteConflictRange(const KeyRangeRef& keys);
|
||||
void addWriteConflictRange(const KeyRangeRef& keys) override;
|
||||
|
||||
ThreadFuture<Void> commit();
|
||||
Version getCommittedVersion();
|
||||
ThreadFuture<Void> commit() override;
|
||||
Version getCommittedVersion() override;
|
||||
uint32_t getApproximateSize() override;
|
||||
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>());
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
|
||||
|
||||
ThreadFuture<Void> onError(Error const& e);
|
||||
void reset();
|
||||
ThreadFuture<Void> onError(Error const& e) override;
|
||||
void reset() override;
|
||||
|
||||
void addref() { ThreadSafeReferenceCounted<DLTransaction>::addref(); }
|
||||
void delref() { ThreadSafeReferenceCounted<DLTransaction>::delref(); }
|
||||
void addref() override { ThreadSafeReferenceCounted<DLTransaction>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<DLTransaction>::delref(); }
|
||||
|
||||
private:
|
||||
const Reference<FdbCApi> api;
|
||||
@ -165,11 +167,11 @@ public:
|
||||
|
||||
ThreadFuture<Void> onReady();
|
||||
|
||||
Reference<ITransaction> createTransaction();
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
|
||||
Reference<ITransaction> createTransaction() override;
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
|
||||
void addref() { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
|
||||
void delref() { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
|
||||
void addref() override { ThreadSafeReferenceCounted<DLDatabase>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<DLDatabase>::delref(); }
|
||||
|
||||
private:
|
||||
const Reference<FdbCApi> api;
|
||||
@ -181,18 +183,18 @@ class DLApi : public IClientApi {
|
||||
public:
|
||||
DLApi(std::string fdbCPath);
|
||||
|
||||
void selectApiVersion(int apiVersion);
|
||||
const char* getClientVersion();
|
||||
void selectApiVersion(int apiVersion) override;
|
||||
const char* getClientVersion() override;
|
||||
|
||||
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
|
||||
void setupNetwork();
|
||||
void runNetwork();
|
||||
void stopNetwork();
|
||||
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
void setupNetwork() override;
|
||||
void runNetwork() override;
|
||||
void stopNetwork() override;
|
||||
|
||||
Reference<IDatabase> createDatabase(const char *clusterFilePath);
|
||||
Reference<IDatabase> createDatabase(const char *clusterFilePath) override;
|
||||
Reference<IDatabase> createDatabase609(const char *clusterFilePath); // legacy database creation
|
||||
|
||||
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
|
||||
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override;
|
||||
|
||||
private:
|
||||
const std::string fdbCPath;
|
||||
@ -212,41 +214,42 @@ class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<
|
||||
public:
|
||||
MultiVersionTransaction(Reference<MultiVersionDatabase> db, UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions);
|
||||
|
||||
void cancel();
|
||||
void setVersion(Version v);
|
||||
ThreadFuture<Version> getReadVersion();
|
||||
void cancel() override;
|
||||
void setVersion(Version v) override;
|
||||
ThreadFuture<Version> getReadVersion() override;
|
||||
|
||||
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false);
|
||||
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false);
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp();
|
||||
ThreadFuture<Optional<Value>> get(const KeyRef& key, bool snapshot=false) override;
|
||||
ThreadFuture<Key> getKey(const KeySelectorRef& key, bool snapshot=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange(const KeyRangeRef& keys, int limit, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<RangeResultRef>> getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot=false, bool reverse=false) override;
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
|
||||
|
||||
void addReadConflictRange(const KeyRangeRef& keys);
|
||||
void addReadConflictRange(const KeyRangeRef& keys) override;
|
||||
|
||||
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType);
|
||||
void set(const KeyRef& key, const ValueRef& value);
|
||||
void clear(const KeyRef& begin, const KeyRef& end);
|
||||
void clear(const KeyRangeRef& range);
|
||||
void clear(const KeyRef& key);
|
||||
void atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) override;
|
||||
void set(const KeyRef& key, const ValueRef& value) override;
|
||||
void clear(const KeyRef& begin, const KeyRef& end) override;
|
||||
void clear(const KeyRangeRef& range) override;
|
||||
void clear(const KeyRef& key) override;
|
||||
|
||||
ThreadFuture<Void> watch(const KeyRef& key);
|
||||
ThreadFuture<Void> watch(const KeyRef& key) override;
|
||||
|
||||
void addWriteConflictRange(const KeyRangeRef& keys);
|
||||
void addWriteConflictRange(const KeyRangeRef& keys) override;
|
||||
|
||||
ThreadFuture<Void> commit();
|
||||
Version getCommittedVersion();
|
||||
ThreadFuture<Void> commit() override;
|
||||
Version getCommittedVersion() override;
|
||||
uint32_t getApproximateSize() override;
|
||||
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>());
|
||||
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value=Optional<StringRef>()) override;
|
||||
|
||||
ThreadFuture<Void> onError(Error const& e);
|
||||
void reset();
|
||||
ThreadFuture<Void> onError(Error const& e) override;
|
||||
void reset() override;
|
||||
|
||||
void addref() { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
|
||||
void delref() { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
|
||||
void addref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
|
||||
|
||||
private:
|
||||
const Reference<MultiVersionDatabase> db;
|
||||
@ -289,11 +292,11 @@ public:
|
||||
MultiVersionDatabase(MultiVersionApi *api, std::string clusterFilePath, Reference<IDatabase> db, bool openConnectors=true);
|
||||
~MultiVersionDatabase();
|
||||
|
||||
Reference<ITransaction> createTransaction();
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
|
||||
Reference<ITransaction> createTransaction() override;
|
||||
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
|
||||
void addref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
|
||||
void delref() { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
|
||||
void addref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<MultiVersionDatabase>::delref(); }
|
||||
|
||||
static Reference<IDatabase> debugCreateFromExistingDatabase(Reference<IDatabase> db);
|
||||
|
||||
@ -354,16 +357,16 @@ private:
|
||||
|
||||
class MultiVersionApi : public IClientApi {
|
||||
public:
|
||||
void selectApiVersion(int apiVersion);
|
||||
const char* getClientVersion();
|
||||
void selectApiVersion(int apiVersion) override;
|
||||
const char* getClientVersion() override;
|
||||
|
||||
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>());
|
||||
void setupNetwork();
|
||||
void runNetwork();
|
||||
void stopNetwork();
|
||||
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter);
|
||||
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
|
||||
void setupNetwork() override;
|
||||
void runNetwork() override;
|
||||
void stopNetwork() override;
|
||||
void addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParameter) override;
|
||||
|
||||
Reference<IDatabase> createDatabase(const char *clusterFilePath);
|
||||
Reference<IDatabase> createDatabase(const char *clusterFilePath) override;
|
||||
static MultiVersionApi* api;
|
||||
|
||||
Reference<ClientInfo> getLocalClient();
|
||||
|
@ -3176,6 +3176,11 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
|
||||
return versionstampPromise.getFuture();
|
||||
}
|
||||
|
||||
uint32_t Transaction::getApproximateSize() {
|
||||
return tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
|
||||
tr.transaction.write_conflict_ranges.expectedSize();
|
||||
}
|
||||
|
||||
Future<Void> Transaction::onError( Error const& e ) {
|
||||
if (e.code() == error_code_success) {
|
||||
return client_invalid_operation();
|
||||
|
@ -284,6 +284,7 @@ public:
|
||||
|
||||
Promise<Standalone<StringRef>> versionstampPromise;
|
||||
|
||||
uint32_t getApproximateSize();
|
||||
Future<Void> onError( Error const& e );
|
||||
void flushTrLogsIfEnabled();
|
||||
|
||||
|
@ -98,6 +98,7 @@ public:
|
||||
|
||||
Future<Void> commit();
|
||||
Version getCommittedVersion() { return tr.getCommittedVersion(); }
|
||||
uint32_t getApproximateSize() { return tr.getApproximateSize(); }
|
||||
Future<Standalone<StringRef>> getVersionstamp();
|
||||
|
||||
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
|
||||
|
@ -271,13 +271,15 @@ ThreadFuture< Void > ThreadSafeTransaction::commit() {
|
||||
|
||||
Version ThreadSafeTransaction::getCommittedVersion() {
|
||||
// This should be thread safe when called legally, but it is fragile
|
||||
Version v = tr->getCommittedVersion();
|
||||
return v;
|
||||
return tr->getCommittedVersion();
|
||||
}
|
||||
|
||||
uint32_t ThreadSafeTransaction::getApproximateSize() {
|
||||
return tr->getApproximateSize();
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {
|
||||
ReadYourWritesTransaction *tr = this->tr;
|
||||
return onMainThread([tr]() -> Future < Standalone<StringRef> > {
|
||||
return onMainThread([this]() -> Future < Standalone<StringRef> > {
|
||||
return tr->getVersionstamp();
|
||||
});
|
||||
}
|
||||
|
@ -55,54 +55,54 @@ public:
|
||||
explicit ThreadSafeTransaction(DatabaseContext* cx);
|
||||
~ThreadSafeTransaction();
|
||||
|
||||
void cancel();
|
||||
void setVersion( Version v );
|
||||
ThreadFuture<Version> getReadVersion();
|
||||
void cancel() override;
|
||||
void setVersion( Version v ) override;
|
||||
ThreadFuture<Version> getReadVersion() override;
|
||||
|
||||
ThreadFuture< Optional<Value> > get( const KeyRef& key, bool snapshot = false );
|
||||
ThreadFuture< Key > getKey( const KeySelectorRef& key, bool snapshot = false );
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot = false, bool reverse = false );
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false );
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, int limit, bool snapshot = false, bool reverse = false ) {
|
||||
return getRange( firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limit, snapshot, reverse );
|
||||
ThreadFuture< Optional<Value> > get( const KeyRef& key, bool snapshot = false ) override;
|
||||
ThreadFuture< Key > getKey( const KeySelectorRef& key, bool snapshot = false ) override;
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, int limit, bool snapshot = false, bool reverse = false ) override;
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeySelectorRef& begin, const KeySelectorRef& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) override;
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, int limit, bool snapshot = false, bool reverse = false ) override {
|
||||
return getRange( firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limit, snapshot, reverse ) override;
|
||||
}
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) {
|
||||
ThreadFuture< Standalone<RangeResultRef> > getRange( const KeyRangeRef& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) override {
|
||||
return getRange( firstGreaterOrEqual(keys.begin), firstGreaterOrEqual(keys.end), limits, snapshot, reverse );
|
||||
}
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key) override;
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp() override;
|
||||
|
||||
ThreadFuture<Standalone<VectorRef<const char*>>> getAddressesForKey(const KeyRef& key);
|
||||
|
||||
void addReadConflictRange( const KeyRangeRef& keys );
|
||||
void addReadConflictRange( const KeyRangeRef& keys ) override;
|
||||
void makeSelfConflicting();
|
||||
|
||||
void atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType );
|
||||
void set( const KeyRef& key, const ValueRef& value );
|
||||
void clear( const KeyRef& begin, const KeyRef& end);
|
||||
void clear( const KeyRangeRef& range );
|
||||
void clear( const KeyRef& key );
|
||||
void atomicOp( const KeyRef& key, const ValueRef& value, uint32_t operationType ) override;
|
||||
void set( const KeyRef& key, const ValueRef& value ) override;
|
||||
void clear( const KeyRef& begin, const KeyRef& end) override;
|
||||
void clear( const KeyRangeRef& range ) override;
|
||||
void clear( const KeyRef& key ) override;
|
||||
|
||||
ThreadFuture< Void > watch( const KeyRef& key );
|
||||
ThreadFuture< Void > watch( const KeyRef& key ) override;
|
||||
|
||||
void addWriteConflictRange( const KeyRangeRef& keys );
|
||||
void addWriteConflictRange( const KeyRangeRef& keys ) override;
|
||||
|
||||
ThreadFuture<Void> commit();
|
||||
Version getCommittedVersion();
|
||||
ThreadFuture<Standalone<StringRef>> getVersionstamp();
|
||||
ThreadFuture<Void> commit() override;
|
||||
Version getCommittedVersion() override;
|
||||
uint32_t getApproximateSize() override;
|
||||
|
||||
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
|
||||
void setOption( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() ) override;
|
||||
|
||||
ThreadFuture<Void> checkDeferredError();
|
||||
ThreadFuture<Void> onError( Error const& e );
|
||||
ThreadFuture<Void> onError( Error const& e ) override;
|
||||
|
||||
// These are to permit use as state variables in actors:
|
||||
ThreadSafeTransaction() : tr(NULL) {}
|
||||
void operator=(ThreadSafeTransaction&& r) BOOST_NOEXCEPT;
|
||||
ThreadSafeTransaction(ThreadSafeTransaction&& r) BOOST_NOEXCEPT;
|
||||
|
||||
void reset();
|
||||
void reset() override;
|
||||
|
||||
void addref() { ThreadSafeReferenceCounted<ThreadSafeTransaction>::addref(); }
|
||||
void delref() { ThreadSafeReferenceCounted<ThreadSafeTransaction>::delref(); }
|
||||
void addref() override { ThreadSafeReferenceCounted<ThreadSafeTransaction>::addref(); }
|
||||
void delref() override { ThreadSafeReferenceCounted<ThreadSafeTransaction>::delref(); }
|
||||
|
||||
private:
|
||||
ReadYourWritesTransaction *tr;
|
||||
|
Loading…
x
Reference in New Issue
Block a user