diff --git a/bindings/go/src/fdb/generated.go b/bindings/go/src/fdb/generated.go index 2a7c40f6aa..782b108fda 100644 --- a/bindings/go/src/fdb/generated.go +++ b/bindings/go/src/fdb/generated.go @@ -228,6 +228,30 @@ func (o NetworkOptions) SetEnableSlowTaskProfiling() error { return o.setOpt(71, nil) } +// Enable client buggify - will make requests randomly fail (intended for client testing) +func (o NetworkOptions) SetClientBuggifyEnable() error { + return o.setOpt(80, nil) +} + +// Disable client buggify +func (o NetworkOptions) SetClientBuggifyDisable() error { + return o.setOpt(81, nil) +} + +// Set the probability of a CLIENT_BUGGIFY section being active for the current execution. +// +// Parameter: probability expressed as a percentage between 0 and 100 +func (o NetworkOptions) SetClientBuggifySectionActivatedProbability(param int64) error { + return o.setOpt(82, int64ToBytes(param)) +} + +// Set the probability of an active CLIENT_BUGGIFY section being fired. A section will only fire if it was activated +// +// Parameter: probability expressed as a percentage between 0 and 100 +func (o NetworkOptions) SetClientBuggifySectionFiredProbability(param int64) error { + return o.setOpt(83, int64ToBytes(param)) +} + // Set the size of the client location cache. Raising this value can boost performance in very large databases where clients access data in a near-random pattern. Defaults to 100000. // // Parameter: Max location cache entries @@ -277,7 +301,7 @@ func (o DatabaseOptions) SetTransactionMaxRetryDelay(param int64) error { return o.setOpt(502, int64ToBytes(param)) } -// Set the maximum transaction size which, if exceeded, will cause the transaction to be cancelled. Default to 10,000,000 bytes. +// Set the maximum transaction size in bytes. This sets the ``size_limit`` option on each transaction created by this database. See the transaction option description for more information. // // Parameter: value in bytes func (o DatabaseOptions) SetTransactionSizeLimit(param int64) error { @@ -409,7 +433,7 @@ func (o TransactionOptions) SetMaxRetryDelay(param int64) error { return o.setOpt(502, int64ToBytes(param)) } -// Set the maximum transaction size which, if exceeded, will cause the transaction to be cancelled. Valid parameter values are ``[32, 10,000,000]```. +// Set the transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit. // // Parameter: value in bytes func (o TransactionOptions) SetSizeLimit(param int64) error { diff --git a/bindings/python/tests/size_limit.py b/bindings/python/tests/size_limit.py index 6d08f15efc..3072e153f8 100644 --- a/bindings/python/tests/size_limit.py +++ b/bindings/python/tests/size_limit.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -# size_limit.py +# size_limit_tests.py # # This source file is part of the FoundationDB open source project # @@ -21,44 +21,56 @@ import fdb import sys -fdb.api_version(610) +if __name__ == '__main__': + fdb.api_version(610) @fdb.transactional def setValue(tr, key, value): - tr[key] = value + tr[key] = value @fdb.transactional def setValueWithLimit(tr, key, value, limit): - tr.options.set_size_limit(limit) - tr[key] = value + tr.options.set_size_limit(limit) + tr[key] = value -def run(clusterFile): - db = fdb.open(clusterFile) - db.options.set_transaction_timeout(2000) # 2 seconds - db.options.set_transaction_retry_limit(3) - value = 'a' * 1024 +def test_size_limit_option(db): + db.options.set_transaction_timeout(2000) # 2 seconds + db.options.set_transaction_retry_limit(3) + value = 'a' * 1024 - setValue(db, 't1', value) - assert(value == db['t1']) + setValue(db, 't1', value) + assert(value == db['t1']) - try: - db.options.set_transaction_size_limit(1000) - setValue(db, 't2', value) - assert(False) # not reached - except fdb.impl.FDBError as e: - assert(e.code == 2101) # Transaction exceeds byte limit (2101) + try: + db.options.set_transaction_size_limit(1000) + setValue(db, 't2', value) + assert(False) # not reached + except fdb.FDBError as e: + assert(e.code == 2101) # Transaction exceeds byte limit (2101) - # Per transaction option overrides database option - db.options.set_transaction_size_limit(1000000) - try: - setValueWithLimit(db, 't3', value, 1000) - assert(False) # not reached - except fdb.impl.FDBError as e: - assert(e.code == 2101) # Transaction exceeds byte limit (2101) + # Per transaction option overrides database option + db.options.set_transaction_size_limit(1000000) + try: + setValueWithLimit(db, 't3', value, 1000) + assert(False) # not reached + except fdb.FDBError as e: + assert(e.code == 2101) # Transaction exceeds byte limit (2101) + # DB default survives on_error reset + db.options.set_transaction_size_limit(1000) + tr = db.create_transaction() + try: + tr['t4'] = 'bar' + tr.on_error(fdb.FDBError(1007)).wait() + setValue(tr, 't4', value) + tr.commit().wait() + assert(False) # not reached + except fdb.FDBError as e: + assert(e.code == 2101) # Transaction exceeds byte limit (2101) # Expect a cluster file as input. This test will write to the FDB cluster, so # be aware of potential side effects. if __name__ == '__main__': - clusterFile = sys.argv[1] - run(clusterFile) \ No newline at end of file + clusterFile = sys.argv[1] + db = fdb.open(clusterFile) + test_size_limit_option(db) diff --git a/bindings/python/tests/tester.py b/bindings/python/tests/tester.py index 3023cc5cb8..95aa36ea3e 100644 --- a/bindings/python/tests/tester.py +++ b/bindings/python/tests/tester.py @@ -48,6 +48,8 @@ from cancellation_timeout_tests import test_retry_limits from cancellation_timeout_tests import test_db_retry_limits from cancellation_timeout_tests import test_combinations +from size_limit_tests import test_size_limit_option + random.seed(0) if len(sys.argv) == 4: @@ -557,6 +559,8 @@ class Tester: test_locality(db) test_predicates() + test_size_limit_option(db) + except fdb.FDBError as e: print("Unit tests failed: %s" % e.description) traceback.print_exc() diff --git a/documentation/sphinx/source/api-common.rst.inc b/documentation/sphinx/source/api-common.rst.inc index 292cd36ec5..3c99c45382 100644 --- a/documentation/sphinx/source/api-common.rst.inc +++ b/documentation/sphinx/source/api-common.rst.inc @@ -399,7 +399,7 @@ .. |option-set-size-limit-blurb| replace:: - Set the maximum transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit. The value set by this limit will persist across transaction resets. + Set the transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit. .. |option-set-timeout-blurb1| replace:: diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index eca185e8f8..537d329fce 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -154,10 +154,6 @@ public: int outstandingWatches; int maxOutstandingWatches; - double transactionTimeout; - int transactionMaxRetries; - double transactionMaxBackoff; - int transactionMaxSize; // Max size in bytes. int snapshotRywEnabled; Future logger; @@ -180,6 +176,8 @@ public: HealthMetrics healthMetrics; double healthMetricsLastUpdated; double detailedHealthMetricsLastUpdated; + + UniqueOrderedOptionList transactionDefaults; }; #endif diff --git a/fdbclient/FDBOptions.h b/fdbclient/FDBOptions.h index e23cab582a..dc3a1d0075 100644 --- a/fdbclient/FDBOptions.h +++ b/fdbclient/FDBOptions.h @@ -23,8 +23,11 @@ #define FDBCLIENT_FDBOPTIONS_H #include +#include #include +#include "flow/Arena.h" + struct FDBOptionInfo { std::string name; std::string comment; @@ -32,9 +35,14 @@ struct FDBOptionInfo { bool hasParameter; bool hidden; + bool persistent; - FDBOptionInfo(std::string name, std::string comment, std::string parameterComment, bool hasParameter, bool hidden) - : name(name), comment(comment), parameterComment(parameterComment), hasParameter(hasParameter), hidden(hidden) { } + // If non-negative, this specifies the code for the transaction option that this option is the default value for. + int defaultFor; + + FDBOptionInfo(std::string name, std::string comment, std::string parameterComment, bool hasParameter, bool hidden, bool persistent, int defaultFor) + : name(name), comment(comment), parameterComment(parameterComment), hasParameter(hasParameter), hidden(hidden), persistent(persistent), + defaultFor(defaultFor) { } FDBOptionInfo() { } }; @@ -54,6 +62,29 @@ public: FDBOptionInfoMap() { T::init(); } }; -#define ADD_OPTION_INFO( type, var, name, comment, parameterComment, hasParameter, hidden ) type::optionInfo[var] = FDBOptionInfo(name, comment, parameterComment, hasParameter, hidden); +template +class UniqueOrderedOptionList { +public: + typedef std::list>>> OptionList; + +private: + OptionList options; + std::map optionsIndexMap; + +public: + void addOption(typename T::Option option, Optional> value) { + auto itr = optionsIndexMap.find(option); + if(itr != optionsIndexMap.end()) { + options.erase(itr->second); + } + options.push_back(std::make_pair(option, value)); + optionsIndexMap[option] = --options.end(); + } + + typename OptionList::const_iterator begin() const { return options.cbegin(); } + typename OptionList::const_iterator end() const { return options.cend(); } +}; + +#define ADD_OPTION_INFO( type, var, name, comment, parameterComment, hasParameter, hidden, persistent, defaultFor ) type::optionInfo[var] = FDBOptionInfo(name, comment, parameterComment, hasParameter, hidden, persistent, defaultFor); #endif \ No newline at end of file diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index c2ef5ad1b6..1f51dd3680 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -408,17 +408,36 @@ void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParame } // MultiVersionTransaction -MultiVersionTransaction::MultiVersionTransaction(Reference db) : db(db) { +MultiVersionTransaction::MultiVersionTransaction(Reference db, UniqueOrderedOptionList defaultOptions) : db(db) { + setDefaultOptions(defaultOptions); updateTransaction(); } -// SOMEDAY: This function is unsafe if it's possible to set Database options that affect subsequently created transactions. There are currently no such options. +void MultiVersionTransaction::setDefaultOptions(UniqueOrderedOptionList options) { + MutexHolder holder(db->dbState->optionLock); + std::copy(options.begin(), options.end(), std::back_inserter(persistentOptions)); +} + void MultiVersionTransaction::updateTransaction() { auto currentDb = db->dbState->dbVar->get(); TransactionInfo newTr; if(currentDb.value) { newTr.transaction = currentDb.value->createTransaction(); + + Optional timeout; + for (auto option : persistentOptions) { + if(option.first == FDBTransactionOptions::TIMEOUT) { + timeout = option.second.castTo(); + } + else { + newTr.transaction->setOption(option.first, option.second.castTo()); + } + } + + if(timeout.present()) { + newTr.transaction->setOption(FDBTransactionOptions::TIMEOUT, timeout); + } } newTr.onChange = currentDb.onChange; @@ -574,6 +593,9 @@ Version MultiVersionTransaction::getCommittedVersion() { } void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional value) { + if(MultiVersionApi::apiVersionAtLeast(610) && FDBTransactionOptions::optionInfo[option].persistent) { + persistentOptions.push_back(std::make_pair(option, value.castTo>())); + } auto tr = getTransaction(); if(tr.transaction) { tr.transaction->setOption(option, value); @@ -593,6 +615,8 @@ ThreadFuture MultiVersionTransaction::onError(Error const& e) { } void MultiVersionTransaction::reset() { + persistentOptions.clear(); + setDefaultOptions(db->dbState->transactionDefaultOptions); updateTransaction(); } @@ -630,13 +654,12 @@ Reference MultiVersionDatabase::debugCreateFromExistingDatabase(Refer } Reference MultiVersionDatabase::createTransaction() { - return Reference(new MultiVersionTransaction(Reference::addRef(this))); + return Reference(new MultiVersionTransaction(Reference::addRef(this), dbState->transactionDefaultOptions)); } void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional value) { MutexHolder holder(dbState->optionLock); - auto itr = FDBDatabaseOptions::optionInfo.find(option); if(itr != FDBDatabaseOptions::optionInfo.end()) { TraceEvent("SetDatabaseOption").detail("Option", itr->second.name); @@ -646,11 +669,18 @@ void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional throw invalid_option(); } - if(dbState->db) { - dbState->db->setOption(option, value); + int defaultFor = FDBDatabaseOptions::optionInfo[option].defaultFor; + if (defaultFor >= 0) { + ASSERT(FDBTransactionOptions::optionInfo.find((FDBTransactionOptions::Option)defaultFor) != + FDBTransactionOptions::optionInfo.end()); + dbState->transactionDefaultOptions.addOption((FDBTransactionOptions::Option)defaultFor, value.castTo>()); } dbState->options.push_back(std::make_pair(option, value.castTo>())); + + if(dbState->db) { + dbState->db->setOption(option, value); + } } void MultiVersionDatabase::Connector::connect() { @@ -811,6 +841,11 @@ void MultiVersionDatabase::DatabaseState::cancelConnections() { // MultiVersionApi +bool MultiVersionApi::apiVersionAtLeast(int minVersion) { + ASSERT(MultiVersionApi::api->apiVersion != 0); + return MultiVersionApi::api->apiVersion >= minVersion; +} + // runOnFailedClients should be used cautiously. Some failed clients may not have successfully loaded all symbols. void MultiVersionApi::runOnExternalClients(std::function)> func, bool runOnFailedClients) { bool newFailure = false; diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index 6ddaeac8fa..b1a1c3372a 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -210,7 +210,7 @@ class MultiVersionDatabase; class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted { public: - MultiVersionTransaction(Reference db); + MultiVersionTransaction(Reference db, UniqueOrderedOptionList defaultOptions); void cancel(); void setVersion(Version v); @@ -261,6 +261,9 @@ private: TransactionInfo getTransaction(); void updateTransaction(); + void setDefaultOptions(UniqueOrderedOptionList options); + + std::vector>>> persistentOptions; }; struct ClientInfo : ThreadSafeReferenceCounted { @@ -341,6 +344,7 @@ private: std::vector> connectionAttempts; std::vector>>> options; + UniqueOrderedOptionList transactionDefaultOptions; Mutex optionLock; }; @@ -370,6 +374,8 @@ public: bool callbackOnMainThread; bool localClientDisabled; + static bool apiVersionAtLeast(int minVersion); + private: MultiVersionApi(); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 6cea540491..4f0770f680 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -279,7 +279,6 @@ struct TrInfoChunk { ACTOR static Future transactionInfoCommitActor(Transaction *tr, std::vector *chunks) { state const Key clientLatencyAtomicCtr = CLIENT_LATENCY_INFO_CTR_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin); - state int retryCount = 0; loop{ try { tr->reset(); @@ -296,9 +295,6 @@ ACTOR static Future transactionInfoCommitActor(Transaction *tr, std::vecto return Void(); } catch (Error& e) { - retryCount++; - if (retryCount == 10) - throw; wait(tr->onError(e)); } } @@ -516,15 +512,13 @@ DatabaseContext::DatabaseContext( lockAware(lockAware), apiVersion(apiVersion), provisional(false), transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0), transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0), - transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), transactionTimeout(0.0), transactionMaxRetries(-1), + transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0) { metadataVersionCache.resize(CLIENT_KNOBS->METADATA_VERSION_CACHE_SIZE); maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES; - transactionMaxBackoff = CLIENT_KNOBS->FAILURE_MAX_DELAY; - transactionMaxSize = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; snapshotRywEnabled = apiVersionAtLeast(300) ? 1 : 0; logger = databaseLogger( this ); @@ -745,52 +739,43 @@ uint64_t extractHexOption( StringRef value ) { } void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional value) { - switch(option) { - case FDBDatabaseOptions::LOCATION_CACHE_SIZE: - locationCacheSize = (int)extractIntOption(value, 0, std::numeric_limits::max()); - break; - case FDBDatabaseOptions::MACHINE_ID: - clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone(value.get()) : Optional>(), clientLocality.machineId(), clientLocality.dcId() ); - if( clientInfo->get().proxies.size() ) - masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies, clientLocality ) ); - server_interf.clear(); - locationCache.insert( allKeys, Reference() ); - break; - case FDBDatabaseOptions::MAX_WATCHES: - maxOutstandingWatches = (int)extractIntOption(value, 0, CLIENT_KNOBS->ABSOLUTE_MAX_WATCHES); - break; - case FDBDatabaseOptions::DATACENTER_ID: - clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone(value.get()) : Optional>()); - if( clientInfo->get().proxies.size() ) - masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies, clientLocality )); - server_interf.clear(); - locationCache.insert( allKeys, Reference() ); - break; - case FDBDatabaseOptions::TRANSACTION_TIMEOUT: - if( !apiVersionAtLeast(610) ) { - throw invalid_option(); - } - transactionTimeout = extractIntOption(value, 0, std::numeric_limits::max())/1000.0; - break; - case FDBDatabaseOptions::TRANSACTION_RETRY_LIMIT: - transactionMaxRetries = (int)extractIntOption(value, -1, std::numeric_limits::max()); - break; - case FDBDatabaseOptions::TRANSACTION_MAX_RETRY_DELAY: - validateOptionValue(value, true); - transactionMaxBackoff = extractIntOption(value, 0, std::numeric_limits::max()) / 1000.0; - break; - case FDBDatabaseOptions::TRANSACTION_SIZE_LIMIT: - validateOptionValue(value, true); - transactionMaxSize = extractIntOption(value, 32, CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT); - break; - case FDBDatabaseOptions::SNAPSHOT_RYW_ENABLE: - validateOptionValue(value, false); - snapshotRywEnabled++; - break; - case FDBDatabaseOptions::SNAPSHOT_RYW_DISABLE: - validateOptionValue(value, false); - snapshotRywEnabled--; - break; + int defaultFor = FDBDatabaseOptions::optionInfo[option].defaultFor; + if (defaultFor >= 0) { + ASSERT(FDBTransactionOptions::optionInfo.find((FDBTransactionOptions::Option)defaultFor) != + FDBTransactionOptions::optionInfo.end()); + transactionDefaults.addOption((FDBTransactionOptions::Option)option, value.castTo>()); + } + else { + switch(option) { + case FDBDatabaseOptions::LOCATION_CACHE_SIZE: + locationCacheSize = (int)extractIntOption(value, 0, std::numeric_limits::max()); + break; + case FDBDatabaseOptions::MACHINE_ID: + clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone(value.get()) : Optional>(), clientLocality.machineId(), clientLocality.dcId() ); + if( clientInfo->get().proxies.size() ) + masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies, clientLocality ) ); + server_interf.clear(); + locationCache.insert( allKeys, Reference() ); + break; + case FDBDatabaseOptions::MAX_WATCHES: + maxOutstandingWatches = (int)extractIntOption(value, 0, CLIENT_KNOBS->ABSOLUTE_MAX_WATCHES); + break; + case FDBDatabaseOptions::DATACENTER_ID: + clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone(value.get()) : Optional>()); + if( clientInfo->get().proxies.size() ) + masterProxies = Reference( new ProxyInfo( clientInfo->get().proxies, clientLocality )); + server_interf.clear(); + locationCache.insert( allKeys, Reference() ); + break; + case FDBDatabaseOptions::SNAPSHOT_RYW_ENABLE: + validateOptionValue(value, false); + snapshotRywEnabled++; + break; + case FDBDatabaseOptions::SNAPSHOT_RYW_DISABLE: + validateOptionValue(value, false); + snapshotRywEnabled--; + break; + } } } @@ -839,6 +824,11 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, Loc return Database::createDatabase(rccf, apiVersion, clientLocality); } +const UniqueOrderedOptionList& Database::getTransactionDefaults() const { + ASSERT(db); + return db->transactionDefaults; +} + extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs); Cluster::Cluster( Reference connFile, Reference> connectedCoordinatorsNum, int apiVersion ) @@ -2457,8 +2447,6 @@ double Transaction::getBackoff(int errCode) { } TransactionOptions::TransactionOptions(Database const& cx) { - maxBackoff = cx->transactionMaxBackoff; - sizeLimit = cx->transactionMaxSize; reset(cx); if (BUGGIFY) { commitOnFirstProxy = true; @@ -2472,11 +2460,9 @@ TransactionOptions::TransactionOptions() { } void TransactionOptions::reset(Database const& cx) { - double oldMaxBackoff = maxBackoff; - uint32_t oldSizeLimit = sizeLimit; memset(this, 0, sizeof(*this)); - maxBackoff = cx->apiVersionAtLeast(610) ? oldMaxBackoff : cx->transactionMaxBackoff; - sizeLimit = oldSizeLimit; + maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF; + sizeLimit = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT; lockAware = cx->lockAware; } @@ -2503,7 +2489,6 @@ void Transaction::reset() { void Transaction::fullReset() { reset(); backoff = CLIENT_KNOBS->DEFAULT_BACKOFF; - options.maxBackoff = getDatabase()->transactionMaxBackoff; } int Transaction::apiVersionAtLeast(int minVersion) const { @@ -3150,8 +3135,7 @@ Future> Transaction::getVersionstamp() { } Future Transaction::onError( Error const& e ) { - if (e.code() == error_code_success) - { + if (e.code() == error_code_success) { return client_invalid_operation(); } if (e.code() == error_code_not_committed || @@ -3175,7 +3159,7 @@ Future Transaction::onError( Error const& e ) { double backoff = getBackoff(e.code()); reset(); - return delay( backoff, info.taskID ); + return delay(backoff, info.taskID); } if (e.code() == error_code_transaction_too_old || e.code() == error_code_future_version) @@ -3187,7 +3171,7 @@ Future Transaction::onError( Error const& e ) { double maxBackoff = options.maxBackoff; reset(); - return delay( std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), info.taskID ); + return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), info.taskID); } if(g_network->isSimulated() && ++numErrors % 10 == 0) diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 0fbf76cfe4..4419646da3 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -90,6 +90,8 @@ public: inline DatabaseContext* extractPtr() { return db.extractPtr(); } DatabaseContext* operator->() const { return db.getPtr(); } + const UniqueOrderedOptionList& getTransactionDefaults() const; + private: Reference db; }; diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 7d006dd85a..8bc1f3683c 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1124,7 +1124,8 @@ public: }; ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx ) : cache(&arena), writes(&arena), tr(cx), retries(0), creationTime(now()), commitStarted(false), options(tr), deferredError(cx->deferredError) { - resetTimeout(); + std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions)); + applyPersistentOptions(); } ACTOR Future timebomb(double endTime, Promise resetPromise) { @@ -1473,36 +1474,16 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const } ReadYourWritesTransactionOptions::ReadYourWritesTransactionOptions(Transaction const& tr) { - Database cx = tr.getDatabase(); - timeoutInSeconds = cx->transactionTimeout; - maxRetries = cx->transactionMaxRetries; reset(tr); } void ReadYourWritesTransactionOptions::reset(Transaction const& tr) { - double oldTimeout = timeoutInSeconds; - int oldMaxRetries = maxRetries; memset(this, 0, sizeof(*this)); - if( tr.apiVersionAtLeast(610) ) { - // Starting in API version 610, these options are not cleared after reset. - timeoutInSeconds = oldTimeout; - maxRetries = oldMaxRetries; - } - else { - Database cx = tr.getDatabase(); - maxRetries = cx->transactionMaxRetries; - timeoutInSeconds = cx->transactionTimeout; - } + timeoutInSeconds = 0.0; + maxRetries = -1; snapshotRywEnabled = tr.getDatabase()->snapshotRywEnabled; } -void ReadYourWritesTransactionOptions::fullReset(Transaction const& tr) { - reset(tr); - Database cx = tr.getDatabase(); - maxRetries = cx->transactionMaxRetries; - timeoutInSeconds = cx->transactionTimeout; -} - bool ReadYourWritesTransactionOptions::getAndResetWriteConflictDisabled() { bool disabled = nextWriteDisableConflictRange; nextWriteDisableConflictRange = false; @@ -1777,7 +1758,15 @@ Future> ReadYourWritesTransaction::getVersionstamp() { return waitOrError(tr.getVersionstamp(), resetPromise.getFuture()); } -void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional value ) { +void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional value ) { + setOptionImpl(option, value); + + if(FDBTransactionOptions::optionInfo[option].persistent) { + persistentOptions.push_back(std::make_pair(option, value.castTo>())); + } +} + +void ReadYourWritesTransaction::setOptionImpl( FDBTransactionOptions::Option option, Optional value ) { switch(option) { case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE: validateOptionValue(value, false); @@ -1815,8 +1804,8 @@ void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, case FDBTransactionOptions::TIMEOUT: options.timeoutInSeconds = extractIntOption(value, 0, std::numeric_limits::max())/1000.0; - resetTimeout(); - break; + resetTimeout(); + break; case FDBTransactionOptions::RETRY_LIMIT: options.maxRetries = (int)extractIntOption(value, -1, std::numeric_limits::max()); @@ -1872,6 +1861,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N transactionDebugInfo = r.transactionDebugInfo; cache.arena = &arena; writes.arena = &arena; + persistentOptions = std::move(r.persistentOptions); } ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT : @@ -1894,12 +1884,29 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& readConflicts = std::move(r.readConflicts); watchMap = std::move( r.watchMap ); r.resetPromise = Promise(); + persistentOptions = std::move(r.persistentOptions); } Future ReadYourWritesTransaction::onError(Error const& e) { return RYWImpl::onError( this, e ); } +void ReadYourWritesTransaction::applyPersistentOptions() { + Optional timeout; + for (auto option : persistentOptions) { + if(option.first == FDBTransactionOptions::TIMEOUT) { + timeout = option.second.castTo(); + } + else { + setOptionImpl(option.first, option.second.castTo()); + } + } + + if(timeout.present()) { + setOptionImpl(FDBTransactionOptions::TIMEOUT, timeout); + } +} + void ReadYourWritesTransaction::resetRyow() { Promise oldReset = resetPromise; resetPromise = Promise(); @@ -1917,7 +1924,7 @@ void ReadYourWritesTransaction::resetRyow() { if(tr.apiVersionAtLeast(16)) { options.reset(tr); - resetTimeout(); + applyPersistentOptions(); } if ( !oldReset.isSet() ) @@ -1933,9 +1940,11 @@ void ReadYourWritesTransaction::reset() { retries = 0; creationTime = now(); timeoutActor.cancel(); - options.fullReset(tr); + persistentOptions.clear(); + options.reset(tr); transactionDebugInfo.clear(); tr.fullReset(); + std::copy(tr.getDatabase().getTransactionDefaults().begin(), tr.getDatabase().getTransactionDefaults().end(), std::back_inserter(persistentOptions)); resetRyow(); } diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index c5d4e0fafc..f4b93eabcc 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -44,7 +44,6 @@ struct ReadYourWritesTransactionOptions { ReadYourWritesTransactionOptions() {} explicit ReadYourWritesTransactionOptions(Transaction const& tr); void reset(Transaction const& tr); - void fullReset(Transaction const& tr); bool getAndResetWriteConflictDisabled(); }; @@ -160,6 +159,10 @@ private: void debugLogRetries(Optional error = Optional()); + void setOptionImpl( FDBTransactionOptions::Option option, Optional value = Optional() ); + void applyPersistentOptions(); + + std::vector>>> persistentOptions; ReadYourWritesTransactionOptions options; }; diff --git a/fdbclient/ThreadSafeTransaction.actor.cpp b/fdbclient/ThreadSafeTransaction.actor.cpp index 130b1652ce..d341ac3c5d 100644 --- a/fdbclient/ThreadSafeTransaction.actor.cpp +++ b/fdbclient/ThreadSafeTransaction.actor.cpp @@ -53,6 +53,8 @@ Reference ThreadSafeDatabase::createTransaction() { void ThreadSafeDatabase::setOption( FDBDatabaseOptions::Option option, Optional value) { DatabaseContext *db = this->db; Standalone> passValue = value; + + // ThreadSafeDatabase is not allowed to do anything with options except pass them through to RYW. onMainThreadVoid( [db, option, passValue](){ db->checkDeferredError(); db->setOption(option, passValue.contents()); @@ -274,6 +276,8 @@ ThreadFuture> ThreadSafeTransaction::getVersionstamp() { void ThreadSafeTransaction::setOption( FDBTransactionOptions::Option option, Optional value ) { ReadYourWritesTransaction *tr = this->tr; Standalone> passValue = value; + + // ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW. onMainThreadVoid( [tr, option, passValue](){ tr->setOption(option, passValue.contents()); }, &tr->deferredError ); } diff --git a/fdbclient/vexillographer/cpp.cs b/fdbclient/vexillographer/cpp.cs index 0d17cbdb5c..4ea844f6ca 100644 --- a/fdbclient/vexillographer/cpp.cs +++ b/fdbclient/vexillographer/cpp.cs @@ -47,8 +47,8 @@ namespace vexillographer private static string getCInfoLine(Option o, string indent, string structName) { - return String.Format("{0}ADD_OPTION_INFO({1}, {2}, \"{2}\", \"{3}\", \"{4}\", {5}, {6})", - indent, structName, o.name.ToUpper(), o.comment, o.getParameterComment(), (o.paramDesc != null).ToString().ToLower(), o.hidden.ToString().ToLower()); + return String.Format("{0}ADD_OPTION_INFO({1}, {2}, \"{2}\", \"{3}\", \"{4}\", {5}, {6}, {7}, {8})", + indent, structName, o.name.ToUpper(), o.comment, o.getParameterComment(), (o.paramDesc != null).ToString().ToLower(), o.hidden.ToString().ToLower(), o.persistent.ToString().ToLower(), o.defaultFor); } private static void writeCppInfo(TextWriter outFile, Scope scope, IEnumerable