Make default and persistent options specifyable via annotations to fdb.options. Fix some issues with persisting these options in the multi-version client. Make size limit option not persistent.

This commit is contained in:
A.J. Beamon 2019-06-28 13:24:32 -07:00
parent 35b6277a50
commit 2035b36257
16 changed files with 276 additions and 150 deletions

View File

@ -228,6 +228,30 @@ func (o NetworkOptions) SetEnableSlowTaskProfiling() error {
return o.setOpt(71, nil)
}
// Enable client buggify - will make requests randomly fail (intended for client testing)
func (o NetworkOptions) SetClientBuggifyEnable() error {
return o.setOpt(80, nil)
}
// Disable client buggify
func (o NetworkOptions) SetClientBuggifyDisable() error {
return o.setOpt(81, nil)
}
// Set the probability of a CLIENT_BUGGIFY section being active for the current execution.
//
// Parameter: probability expressed as a percentage between 0 and 100
func (o NetworkOptions) SetClientBuggifySectionActivatedProbability(param int64) error {
return o.setOpt(82, int64ToBytes(param))
}
// Set the probability of an active CLIENT_BUGGIFY section being fired. A section will only fire if it was activated
//
// Parameter: probability expressed as a percentage between 0 and 100
func (o NetworkOptions) SetClientBuggifySectionFiredProbability(param int64) error {
return o.setOpt(83, int64ToBytes(param))
}
// Set the size of the client location cache. Raising this value can boost performance in very large databases where clients access data in a near-random pattern. Defaults to 100000.
//
// Parameter: Max location cache entries
@ -277,7 +301,7 @@ func (o DatabaseOptions) SetTransactionMaxRetryDelay(param int64) error {
return o.setOpt(502, int64ToBytes(param))
}
// Set the maximum transaction size which, if exceeded, will cause the transaction to be cancelled. Default to 10,000,000 bytes.
// Set the maximum transaction size in bytes. This sets the ``size_limit`` option on each transaction created by this database. See the transaction option description for more information.
//
// Parameter: value in bytes
func (o DatabaseOptions) SetTransactionSizeLimit(param int64) error {
@ -409,7 +433,7 @@ func (o TransactionOptions) SetMaxRetryDelay(param int64) error {
return o.setOpt(502, int64ToBytes(param))
}
// Set the maximum transaction size which, if exceeded, will cause the transaction to be cancelled. Valid parameter values are ``[32, 10,000,000]```.
// Set the transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit.
//
// Parameter: value in bytes
func (o TransactionOptions) SetSizeLimit(param int64) error {

View File

@ -1,6 +1,6 @@
#!/usr/bin/python
#
# size_limit.py
# size_limit_tests.py
#
# This source file is part of the FoundationDB open source project
#
@ -21,44 +21,56 @@
import fdb
import sys
fdb.api_version(610)
if __name__ == '__main__':
fdb.api_version(610)
@fdb.transactional
def setValue(tr, key, value):
tr[key] = value
tr[key] = value
@fdb.transactional
def setValueWithLimit(tr, key, value, limit):
tr.options.set_size_limit(limit)
tr[key] = value
tr.options.set_size_limit(limit)
tr[key] = value
def run(clusterFile):
db = fdb.open(clusterFile)
db.options.set_transaction_timeout(2000) # 2 seconds
db.options.set_transaction_retry_limit(3)
value = 'a' * 1024
def test_size_limit_option(db):
db.options.set_transaction_timeout(2000) # 2 seconds
db.options.set_transaction_retry_limit(3)
value = 'a' * 1024
setValue(db, 't1', value)
assert(value == db['t1'])
setValue(db, 't1', value)
assert(value == db['t1'])
try:
db.options.set_transaction_size_limit(1000)
setValue(db, 't2', value)
assert(False) # not reached
except fdb.impl.FDBError as e:
assert(e.code == 2101) # Transaction exceeds byte limit (2101)
try:
db.options.set_transaction_size_limit(1000)
setValue(db, 't2', value)
assert(False) # not reached
except fdb.FDBError as e:
assert(e.code == 2101) # Transaction exceeds byte limit (2101)
# Per transaction option overrides database option
db.options.set_transaction_size_limit(1000000)
try:
setValueWithLimit(db, 't3', value, 1000)
assert(False) # not reached
except fdb.impl.FDBError as e:
assert(e.code == 2101) # Transaction exceeds byte limit (2101)
# Per transaction option overrides database option
db.options.set_transaction_size_limit(1000000)
try:
setValueWithLimit(db, 't3', value, 1000)
assert(False) # not reached
except fdb.FDBError as e:
assert(e.code == 2101) # Transaction exceeds byte limit (2101)
# DB default survives on_error reset
db.options.set_transaction_size_limit(1000)
tr = db.create_transaction()
try:
tr['t4'] = 'bar'
tr.on_error(fdb.FDBError(1007)).wait()
setValue(tr, 't4', value)
tr.commit().wait()
assert(False) # not reached
except fdb.FDBError as e:
assert(e.code == 2101) # Transaction exceeds byte limit (2101)
# Expect a cluster file as input. This test will write to the FDB cluster, so
# be aware of potential side effects.
if __name__ == '__main__':
clusterFile = sys.argv[1]
run(clusterFile)
clusterFile = sys.argv[1]
db = fdb.open(clusterFile)
test_size_limit_option(db)

View File

@ -48,6 +48,8 @@ from cancellation_timeout_tests import test_retry_limits
from cancellation_timeout_tests import test_db_retry_limits
from cancellation_timeout_tests import test_combinations
from size_limit_tests import test_size_limit_option
random.seed(0)
if len(sys.argv) == 4:
@ -557,6 +559,8 @@ class Tester:
test_locality(db)
test_predicates()
test_size_limit_option(db)
except fdb.FDBError as e:
print("Unit tests failed: %s" % e.description)
traceback.print_exc()

View File

@ -399,7 +399,7 @@
.. |option-set-size-limit-blurb| replace::
Set the maximum transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit. The value set by this limit will persist across transaction resets.
Set the transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit.
.. |option-set-timeout-blurb1| replace::

View File

@ -154,10 +154,6 @@ public:
int outstandingWatches;
int maxOutstandingWatches;
double transactionTimeout;
int transactionMaxRetries;
double transactionMaxBackoff;
int transactionMaxSize; // Max size in bytes.
int snapshotRywEnabled;
Future<Void> logger;
@ -180,6 +176,8 @@ public:
HealthMetrics healthMetrics;
double healthMetricsLastUpdated;
double detailedHealthMetricsLastUpdated;
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
};
#endif

View File

@ -23,8 +23,11 @@
#define FDBCLIENT_FDBOPTIONS_H
#include <string>
#include <list>
#include <map>
#include "flow/Arena.h"
struct FDBOptionInfo {
std::string name;
std::string comment;
@ -32,9 +35,14 @@ struct FDBOptionInfo {
bool hasParameter;
bool hidden;
bool persistent;
FDBOptionInfo(std::string name, std::string comment, std::string parameterComment, bool hasParameter, bool hidden)
: name(name), comment(comment), parameterComment(parameterComment), hasParameter(hasParameter), hidden(hidden) { }
// If non-negative, this specifies the code for the transaction option that this option is the default value for.
int defaultFor;
FDBOptionInfo(std::string name, std::string comment, std::string parameterComment, bool hasParameter, bool hidden, bool persistent, int defaultFor)
: name(name), comment(comment), parameterComment(parameterComment), hasParameter(hasParameter), hidden(hidden), persistent(persistent),
defaultFor(defaultFor) { }
FDBOptionInfo() { }
};
@ -54,6 +62,29 @@ public:
FDBOptionInfoMap() { T::init(); }
};
#define ADD_OPTION_INFO( type, var, name, comment, parameterComment, hasParameter, hidden ) type::optionInfo[var] = FDBOptionInfo(name, comment, parameterComment, hasParameter, hidden);
template<class T>
class UniqueOrderedOptionList {
public:
typedef std::list<std::pair<typename T::Option, Optional<Standalone<StringRef>>>> OptionList;
private:
OptionList options;
std::map<typename T::Option, typename OptionList::iterator> optionsIndexMap;
public:
void addOption(typename T::Option option, Optional<Standalone<StringRef>> value) {
auto itr = optionsIndexMap.find(option);
if(itr != optionsIndexMap.end()) {
options.erase(itr->second);
}
options.push_back(std::make_pair(option, value));
optionsIndexMap[option] = --options.end();
}
typename OptionList::const_iterator begin() const { return options.cbegin(); }
typename OptionList::const_iterator end() const { return options.cend(); }
};
#define ADD_OPTION_INFO( type, var, name, comment, parameterComment, hasParameter, hidden, persistent, defaultFor ) type::optionInfo[var] = FDBOptionInfo(name, comment, parameterComment, hasParameter, hidden, persistent, defaultFor);
#endif

View File

@ -408,17 +408,36 @@ void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void *hookParame
}
// MultiVersionTransaction
MultiVersionTransaction::MultiVersionTransaction(Reference<MultiVersionDatabase> db) : db(db) {
MultiVersionTransaction::MultiVersionTransaction(Reference<MultiVersionDatabase> db, UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions) : db(db) {
setDefaultOptions(defaultOptions);
updateTransaction();
}
// SOMEDAY: This function is unsafe if it's possible to set Database options that affect subsequently created transactions. There are currently no such options.
void MultiVersionTransaction::setDefaultOptions(UniqueOrderedOptionList<FDBTransactionOptions> options) {
MutexHolder holder(db->dbState->optionLock);
std::copy(options.begin(), options.end(), std::back_inserter(persistentOptions));
}
void MultiVersionTransaction::updateTransaction() {
auto currentDb = db->dbState->dbVar->get();
TransactionInfo newTr;
if(currentDb.value) {
newTr.transaction = currentDb.value->createTransaction();
Optional<StringRef> timeout;
for (auto option : persistentOptions) {
if(option.first == FDBTransactionOptions::TIMEOUT) {
timeout = option.second.castTo<StringRef>();
}
else {
newTr.transaction->setOption(option.first, option.second.castTo<StringRef>());
}
}
if(timeout.present()) {
newTr.transaction->setOption(FDBTransactionOptions::TIMEOUT, timeout);
}
}
newTr.onChange = currentDb.onChange;
@ -574,6 +593,9 @@ Version MultiVersionTransaction::getCommittedVersion() {
}
void MultiVersionTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
if(MultiVersionApi::apiVersionAtLeast(610) && FDBTransactionOptions::optionInfo[option].persistent) {
persistentOptions.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
}
auto tr = getTransaction();
if(tr.transaction) {
tr.transaction->setOption(option, value);
@ -593,6 +615,8 @@ ThreadFuture<Void> MultiVersionTransaction::onError(Error const& e) {
}
void MultiVersionTransaction::reset() {
persistentOptions.clear();
setDefaultOptions(db->dbState->transactionDefaultOptions);
updateTransaction();
}
@ -630,13 +654,12 @@ Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Refer
}
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
return Reference<ITransaction>(new MultiVersionTransaction(Reference<MultiVersionDatabase>::addRef(this)));
return Reference<ITransaction>(new MultiVersionTransaction(Reference<MultiVersionDatabase>::addRef(this), dbState->transactionDefaultOptions));
}
void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
MutexHolder holder(dbState->optionLock);
auto itr = FDBDatabaseOptions::optionInfo.find(option);
if(itr != FDBDatabaseOptions::optionInfo.end()) {
TraceEvent("SetDatabaseOption").detail("Option", itr->second.name);
@ -646,11 +669,18 @@ void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional
throw invalid_option();
}
if(dbState->db) {
dbState->db->setOption(option, value);
int defaultFor = FDBDatabaseOptions::optionInfo[option].defaultFor;
if (defaultFor >= 0) {
ASSERT(FDBTransactionOptions::optionInfo.find((FDBTransactionOptions::Option)defaultFor) !=
FDBTransactionOptions::optionInfo.end());
dbState->transactionDefaultOptions.addOption((FDBTransactionOptions::Option)defaultFor, value.castTo<Standalone<StringRef>>());
}
dbState->options.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
if(dbState->db) {
dbState->db->setOption(option, value);
}
}
void MultiVersionDatabase::Connector::connect() {
@ -811,6 +841,11 @@ void MultiVersionDatabase::DatabaseState::cancelConnections() {
// MultiVersionApi
bool MultiVersionApi::apiVersionAtLeast(int minVersion) {
ASSERT(MultiVersionApi::api->apiVersion != 0);
return MultiVersionApi::api->apiVersion >= minVersion;
}
// runOnFailedClients should be used cautiously. Some failed clients may not have successfully loaded all symbols.
void MultiVersionApi::runOnExternalClients(std::function<void(Reference<ClientInfo>)> func, bool runOnFailedClients) {
bool newFailure = false;

View File

@ -210,7 +210,7 @@ class MultiVersionDatabase;
class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<MultiVersionTransaction> {
public:
MultiVersionTransaction(Reference<MultiVersionDatabase> db);
MultiVersionTransaction(Reference<MultiVersionDatabase> db, UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions);
void cancel();
void setVersion(Version v);
@ -261,6 +261,9 @@ private:
TransactionInfo getTransaction();
void updateTransaction();
void setDefaultOptions(UniqueOrderedOptionList<FDBTransactionOptions> options);
std::vector<std::pair<FDBTransactionOptions::Option, Optional<Standalone<StringRef>>>> persistentOptions;
};
struct ClientInfo : ThreadSafeReferenceCounted<ClientInfo> {
@ -341,6 +344,7 @@ private:
std::vector<Reference<Connector>> connectionAttempts;
std::vector<std::pair<FDBDatabaseOptions::Option, Optional<Standalone<StringRef>>>> options;
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaultOptions;
Mutex optionLock;
};
@ -370,6 +374,8 @@ public:
bool callbackOnMainThread;
bool localClientDisabled;
static bool apiVersionAtLeast(int minVersion);
private:
MultiVersionApi();

View File

@ -279,7 +279,6 @@ struct TrInfoChunk {
ACTOR static Future<Void> transactionInfoCommitActor(Transaction *tr, std::vector<TrInfoChunk> *chunks) {
state const Key clientLatencyAtomicCtr = CLIENT_LATENCY_INFO_CTR_PREFIX.withPrefix(fdbClientInfoPrefixRange.begin);
state int retryCount = 0;
loop{
try {
tr->reset();
@ -296,9 +295,6 @@ ACTOR static Future<Void> transactionInfoCommitActor(Transaction *tr, std::vecto
return Void();
}
catch (Error& e) {
retryCount++;
if (retryCount == 10)
throw;
wait(tr->onError(e));
}
}
@ -516,15 +512,13 @@ DatabaseContext::DatabaseContext(
lockAware(lockAware), apiVersion(apiVersion), provisional(false),
transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0),
transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0),
transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), transactionTimeout(0.0), transactionMaxRetries(-1),
transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0)
{
metadataVersionCache.resize(CLIENT_KNOBS->METADATA_VERSION_CACHE_SIZE);
maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES;
transactionMaxBackoff = CLIENT_KNOBS->FAILURE_MAX_DELAY;
transactionMaxSize = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
snapshotRywEnabled = apiVersionAtLeast(300) ? 1 : 0;
logger = databaseLogger( this );
@ -745,52 +739,43 @@ uint64_t extractHexOption( StringRef value ) {
}
void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value) {
switch(option) {
case FDBDatabaseOptions::LOCATION_CACHE_SIZE:
locationCacheSize = (int)extractIntOption(value, 0, std::numeric_limits<int>::max());
break;
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ) );
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
case FDBDatabaseOptions::MAX_WATCHES:
maxOutstandingWatches = (int)extractIntOption(value, 0, CLIENT_KNOBS->ABSOLUTE_MAX_WATCHES);
break;
case FDBDatabaseOptions::DATACENTER_ID:
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
case FDBDatabaseOptions::TRANSACTION_TIMEOUT:
if( !apiVersionAtLeast(610) ) {
throw invalid_option();
}
transactionTimeout = extractIntOption(value, 0, std::numeric_limits<int>::max())/1000.0;
break;
case FDBDatabaseOptions::TRANSACTION_RETRY_LIMIT:
transactionMaxRetries = (int)extractIntOption(value, -1, std::numeric_limits<int>::max());
break;
case FDBDatabaseOptions::TRANSACTION_MAX_RETRY_DELAY:
validateOptionValue(value, true);
transactionMaxBackoff = extractIntOption(value, 0, std::numeric_limits<int32_t>::max()) / 1000.0;
break;
case FDBDatabaseOptions::TRANSACTION_SIZE_LIMIT:
validateOptionValue(value, true);
transactionMaxSize = extractIntOption(value, 32, CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT);
break;
case FDBDatabaseOptions::SNAPSHOT_RYW_ENABLE:
validateOptionValue(value, false);
snapshotRywEnabled++;
break;
case FDBDatabaseOptions::SNAPSHOT_RYW_DISABLE:
validateOptionValue(value, false);
snapshotRywEnabled--;
break;
int defaultFor = FDBDatabaseOptions::optionInfo[option].defaultFor;
if (defaultFor >= 0) {
ASSERT(FDBTransactionOptions::optionInfo.find((FDBTransactionOptions::Option)defaultFor) !=
FDBTransactionOptions::optionInfo.end());
transactionDefaults.addOption((FDBTransactionOptions::Option)option, value.castTo<Standalone<StringRef>>());
}
else {
switch(option) {
case FDBDatabaseOptions::LOCATION_CACHE_SIZE:
locationCacheSize = (int)extractIntOption(value, 0, std::numeric_limits<int>::max());
break;
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ) );
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
case FDBDatabaseOptions::MAX_WATCHES:
maxOutstandingWatches = (int)extractIntOption(value, 0, CLIENT_KNOBS->ABSOLUTE_MAX_WATCHES);
break;
case FDBDatabaseOptions::DATACENTER_ID:
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
case FDBDatabaseOptions::SNAPSHOT_RYW_ENABLE:
validateOptionValue(value, false);
snapshotRywEnabled++;
break;
case FDBDatabaseOptions::SNAPSHOT_RYW_DISABLE:
validateOptionValue(value, false);
snapshotRywEnabled--;
break;
}
}
}
@ -839,6 +824,11 @@ Database Database::createDatabase( std::string connFileName, int apiVersion, Loc
return Database::createDatabase(rccf, apiVersion, clientLocality);
}
const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDefaults() const {
ASSERT(db);
return db->transactionDefaults;
}
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
Cluster::Cluster( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<int>> connectedCoordinatorsNum, int apiVersion )
@ -2457,8 +2447,6 @@ double Transaction::getBackoff(int errCode) {
}
TransactionOptions::TransactionOptions(Database const& cx) {
maxBackoff = cx->transactionMaxBackoff;
sizeLimit = cx->transactionMaxSize;
reset(cx);
if (BUGGIFY) {
commitOnFirstProxy = true;
@ -2472,11 +2460,9 @@ TransactionOptions::TransactionOptions() {
}
void TransactionOptions::reset(Database const& cx) {
double oldMaxBackoff = maxBackoff;
uint32_t oldSizeLimit = sizeLimit;
memset(this, 0, sizeof(*this));
maxBackoff = cx->apiVersionAtLeast(610) ? oldMaxBackoff : cx->transactionMaxBackoff;
sizeLimit = oldSizeLimit;
maxBackoff = CLIENT_KNOBS->DEFAULT_MAX_BACKOFF;
sizeLimit = CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT;
lockAware = cx->lockAware;
}
@ -2503,7 +2489,6 @@ void Transaction::reset() {
void Transaction::fullReset() {
reset();
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
options.maxBackoff = getDatabase()->transactionMaxBackoff;
}
int Transaction::apiVersionAtLeast(int minVersion) const {
@ -3150,8 +3135,7 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
}
Future<Void> Transaction::onError( Error const& e ) {
if (e.code() == error_code_success)
{
if (e.code() == error_code_success) {
return client_invalid_operation();
}
if (e.code() == error_code_not_committed ||
@ -3175,7 +3159,7 @@ Future<Void> Transaction::onError( Error const& e ) {
double backoff = getBackoff(e.code());
reset();
return delay( backoff, info.taskID );
return delay(backoff, info.taskID);
}
if (e.code() == error_code_transaction_too_old ||
e.code() == error_code_future_version)
@ -3187,7 +3171,7 @@ Future<Void> Transaction::onError( Error const& e ) {
double maxBackoff = options.maxBackoff;
reset();
return delay( std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), info.taskID );
return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), info.taskID);
}
if(g_network->isSimulated() && ++numErrors % 10 == 0)

View File

@ -90,6 +90,8 @@ public:
inline DatabaseContext* extractPtr() { return db.extractPtr(); }
DatabaseContext* operator->() const { return db.getPtr(); }
const UniqueOrderedOptionList<FDBTransactionOptions>& getTransactionDefaults() const;
private:
Reference<DatabaseContext> db;
};

View File

@ -1124,7 +1124,8 @@ public:
};
ReadYourWritesTransaction::ReadYourWritesTransaction( Database const& cx ) : cache(&arena), writes(&arena), tr(cx), retries(0), creationTime(now()), commitStarted(false), options(tr), deferredError(cx->deferredError) {
resetTimeout();
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions));
applyPersistentOptions();
}
ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
@ -1473,36 +1474,16 @@ void ReadYourWritesTransaction::writeRangeToNativeTransaction( KeyRangeRef const
}
ReadYourWritesTransactionOptions::ReadYourWritesTransactionOptions(Transaction const& tr) {
Database cx = tr.getDatabase();
timeoutInSeconds = cx->transactionTimeout;
maxRetries = cx->transactionMaxRetries;
reset(tr);
}
void ReadYourWritesTransactionOptions::reset(Transaction const& tr) {
double oldTimeout = timeoutInSeconds;
int oldMaxRetries = maxRetries;
memset(this, 0, sizeof(*this));
if( tr.apiVersionAtLeast(610) ) {
// Starting in API version 610, these options are not cleared after reset.
timeoutInSeconds = oldTimeout;
maxRetries = oldMaxRetries;
}
else {
Database cx = tr.getDatabase();
maxRetries = cx->transactionMaxRetries;
timeoutInSeconds = cx->transactionTimeout;
}
timeoutInSeconds = 0.0;
maxRetries = -1;
snapshotRywEnabled = tr.getDatabase()->snapshotRywEnabled;
}
void ReadYourWritesTransactionOptions::fullReset(Transaction const& tr) {
reset(tr);
Database cx = tr.getDatabase();
maxRetries = cx->transactionMaxRetries;
timeoutInSeconds = cx->transactionTimeout;
}
bool ReadYourWritesTransactionOptions::getAndResetWriteConflictDisabled() {
bool disabled = nextWriteDisableConflictRange;
nextWriteDisableConflictRange = false;
@ -1777,7 +1758,15 @@ Future<Standalone<StringRef>> ReadYourWritesTransaction::getVersionstamp() {
return waitOrError(tr.getVersionstamp(), resetPromise.getFuture());
}
void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
setOptionImpl(option, value);
if(FDBTransactionOptions::optionInfo[option].persistent) {
persistentOptions.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
}
}
void ReadYourWritesTransaction::setOptionImpl( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
switch(option) {
case FDBTransactionOptions::READ_YOUR_WRITES_DISABLE:
validateOptionValue(value, false);
@ -1815,8 +1804,8 @@ void ReadYourWritesTransaction::setOption( FDBTransactionOptions::Option option,
case FDBTransactionOptions::TIMEOUT:
options.timeoutInSeconds = extractIntOption(value, 0, std::numeric_limits<int>::max())/1000.0;
resetTimeout();
break;
resetTimeout();
break;
case FDBTransactionOptions::RETRY_LIMIT:
options.maxRetries = (int)extractIntOption(value, -1, std::numeric_limits<int>::max());
@ -1872,6 +1861,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) BOOST_N
transactionDebugInfo = r.transactionDebugInfo;
cache.arena = &arena;
writes.arena = &arena;
persistentOptions = std::move(r.persistentOptions);
}
ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) BOOST_NOEXCEPT :
@ -1894,12 +1884,29 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
readConflicts = std::move(r.readConflicts);
watchMap = std::move( r.watchMap );
r.resetPromise = Promise<Void>();
persistentOptions = std::move(r.persistentOptions);
}
Future<Void> ReadYourWritesTransaction::onError(Error const& e) {
return RYWImpl::onError( this, e );
}
void ReadYourWritesTransaction::applyPersistentOptions() {
Optional<StringRef> timeout;
for (auto option : persistentOptions) {
if(option.first == FDBTransactionOptions::TIMEOUT) {
timeout = option.second.castTo<StringRef>();
}
else {
setOptionImpl(option.first, option.second.castTo<StringRef>());
}
}
if(timeout.present()) {
setOptionImpl(FDBTransactionOptions::TIMEOUT, timeout);
}
}
void ReadYourWritesTransaction::resetRyow() {
Promise<Void> oldReset = resetPromise;
resetPromise = Promise<Void>();
@ -1917,7 +1924,7 @@ void ReadYourWritesTransaction::resetRyow() {
if(tr.apiVersionAtLeast(16)) {
options.reset(tr);
resetTimeout();
applyPersistentOptions();
}
if ( !oldReset.isSet() )
@ -1933,9 +1940,11 @@ void ReadYourWritesTransaction::reset() {
retries = 0;
creationTime = now();
timeoutActor.cancel();
options.fullReset(tr);
persistentOptions.clear();
options.reset(tr);
transactionDebugInfo.clear();
tr.fullReset();
std::copy(tr.getDatabase().getTransactionDefaults().begin(), tr.getDatabase().getTransactionDefaults().end(), std::back_inserter(persistentOptions));
resetRyow();
}

View File

@ -44,7 +44,6 @@ struct ReadYourWritesTransactionOptions {
ReadYourWritesTransactionOptions() {}
explicit ReadYourWritesTransactionOptions(Transaction const& tr);
void reset(Transaction const& tr);
void fullReset(Transaction const& tr);
bool getAndResetWriteConflictDisabled();
};
@ -160,6 +159,10 @@ private:
void debugLogRetries(Optional<Error> error = Optional<Error>());
void setOptionImpl( FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>() );
void applyPersistentOptions();
std::vector<std::pair<FDBTransactionOptions::Option, Optional<Standalone<StringRef>>>> persistentOptions;
ReadYourWritesTransactionOptions options;
};

View File

@ -53,6 +53,8 @@ Reference<ITransaction> ThreadSafeDatabase::createTransaction() {
void ThreadSafeDatabase::setOption( FDBDatabaseOptions::Option option, Optional<StringRef> value) {
DatabaseContext *db = this->db;
Standalone<Optional<StringRef>> passValue = value;
// ThreadSafeDatabase is not allowed to do anything with options except pass them through to RYW.
onMainThreadVoid( [db, option, passValue](){
db->checkDeferredError();
db->setOption(option, passValue.contents());
@ -274,6 +276,8 @@ ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {
void ThreadSafeTransaction::setOption( FDBTransactionOptions::Option option, Optional<StringRef> value ) {
ReadYourWritesTransaction *tr = this->tr;
Standalone<Optional<StringRef>> passValue = value;
// ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW.
onMainThreadVoid( [tr, option, passValue](){ tr->setOption(option, passValue.contents()); }, &tr->deferredError );
}

View File

@ -47,8 +47,8 @@ namespace vexillographer
private static string getCInfoLine(Option o, string indent, string structName)
{
return String.Format("{0}ADD_OPTION_INFO({1}, {2}, \"{2}\", \"{3}\", \"{4}\", {5}, {6})",
indent, structName, o.name.ToUpper(), o.comment, o.getParameterComment(), (o.paramDesc != null).ToString().ToLower(), o.hidden.ToString().ToLower());
return String.Format("{0}ADD_OPTION_INFO({1}, {2}, \"{2}\", \"{3}\", \"{4}\", {5}, {6}, {7}, {8})",
indent, structName, o.name.ToUpper(), o.comment, o.getParameterComment(), (o.paramDesc != null).ToString().ToLower(), o.hidden.ToString().ToLower(), o.persistent.ToString().ToLower(), o.defaultFor);
}
private static void writeCppInfo(TextWriter outFile, Scope scope, IEnumerable<Option> options)

View File

@ -148,16 +148,20 @@ description is not currently required but encouraged.
description="Specify the datacenter ID that was passed to fdbserver processes running in the same datacenter as this client, for better location-aware load balancing." />
<Option name="transaction_timeout" code="500"
paramType="Int" paramDescription="value in milliseconds of timeout"
description="Set a timeout in milliseconds which, when elapsed, will cause each transaction automatically to be cancelled. This sets the ``timeout`` option of each transaction created by this database. See the transaction option description for more information. Using this option requires that the API version is 610 or higher." />
description="Set a timeout in milliseconds which, when elapsed, will cause each transaction automatically to be cancelled. This sets the ``timeout`` option of each transaction created by this database. See the transaction option description for more information. Using this option requires that the API version is 610 or higher."
defaultFor="500"/>
<Option name="transaction_retry_limit" code="501"
paramType="Int" paramDescription="number of times to retry"
description="Set a timeout in milliseconds which, when elapsed, will cause a transaction automatically to be cancelled. This sets the ``retry_limit`` option of each transaction created by this database. See the transaction option description for more information." />
description="Set a timeout in milliseconds which, when elapsed, will cause a transaction automatically to be cancelled. This sets the ``retry_limit`` option of each transaction created by this database. See the transaction option description for more information."
defaultFor="501"/>
<Option name="transaction_max_retry_delay" code="502"
paramType="Int" paramDescription="value in milliseconds of maximum delay"
description="Set the maximum amount of backoff delay incurred in the call to ``onError`` if the error is retryable. This sets the ``max_retry_delay`` option of each transaction created by this database. See the transaction option description for more information." />
description="Set the maximum amount of backoff delay incurred in the call to ``onError`` if the error is retryable. This sets the ``max_retry_delay`` option of each transaction created by this database. See the transaction option description for more information."
defaultFor="502"/>
<Option name="transaction_size_limit" code="503"
paramType="Int" paramDescription="value in bytes"
description="Set the maximum transaction size which, if exceeded, will cause the transaction to be cancelled. Default to 10,000,000 bytes." />
description="Set the maximum transaction size in bytes. This sets the ``size_limit`` option on each transaction created by this database. See the transaction option description for more information."
defaultFor="503"/>
<Option name="snapshot_ryw_enable" code="26"
description="Snapshot read operations will see the results of writes done in the same transaction. This is the default behavior." />
<Option name="snapshot_ryw_disable" code="27"
@ -206,16 +210,19 @@ description is not currently required but encouraged.
description="Enables tracing for this transaction and logs results to the client trace logs. The DEBUG_TRANSACTION_IDENTIFIER option must be set before using this option, and client trace logging must be enabled and to get log output." />
<Option name="timeout" code="500"
paramType="Int" paramDescription="value in milliseconds of timeout"
description="Set a timeout in milliseconds which, when elapsed, will cause the transaction automatically to be cancelled. Valid parameter values are ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and any future uses of the transaction will throw an exception. The transaction can be used again after it is reset. Prior to API version 610, like all other transaction options, the timeout must be reset after a call to ``onError``. If the API version is 610 or greater, the timeout is not reset after an ``onError`` call. This allows the user to specify a longer timeout on specific transactions than the default timeout specified through the ``transaction_timeout`` database option without the shorter database timeout cancelling transactions that encounter a retryable error. Note that at all API versions, it is safe and legal to set the timeout each time the transaction begins, so most code written assuming the older behavior can be upgraded to the newer behavior without requiring any modification, and the caller is not required to implement special logic in retry loops to only conditionally set this option." />
description="Set a timeout in milliseconds which, when elapsed, will cause the transaction automatically to be cancelled. Valid parameter values are ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and any future uses of the transaction will throw an exception. The transaction can be used again after it is reset. Prior to API version 610, like all other transaction options, the timeout must be reset after a call to ``onError``. If the API version is 610 or greater, the timeout is not reset after an ``onError`` call. This allows the user to specify a longer timeout on specific transactions than the default timeout specified through the ``transaction_timeout`` database option without the shorter database timeout cancelling transactions that encounter a retryable error. Note that at all API versions, it is safe and legal to set the timeout each time the transaction begins, so most code written assuming the older behavior can be upgraded to the newer behavior without requiring any modification, and the caller is not required to implement special logic in retry loops to only conditionally set this option."
persistent="true" />
<Option name="retry_limit" code="501"
paramType="Int" paramDescription="number of times to retry"
description="Set a maximum number of retries after which additional calls to ``onError`` will throw the most recently seen error code. Valid parameter values are ``[-1, INT_MAX]``. If set to -1, will disable the retry limit. Prior to API version 610, like all other transaction options, the retry limit must be reset after a call to ``onError``. If the API version is 610 or greater, the retry limit is not reset after an ``onError`` call. Note that at all API versions, it is safe and legal to set the retry limit each time the transaction begins, so most code written assuming the older behavior can be upgraded to the newer behavior without requiring any modification, and the caller is not required to implement special logic in retry loops to only conditionally set this option." />
description="Set a maximum number of retries after which additional calls to ``onError`` will throw the most recently seen error code. Valid parameter values are ``[-1, INT_MAX]``. If set to -1, will disable the retry limit. Prior to API version 610, like all other transaction options, the retry limit must be reset after a call to ``onError``. If the API version is 610 or greater, the retry limit is not reset after an ``onError`` call. Note that at all API versions, it is safe and legal to set the retry limit each time the transaction begins, so most code written assuming the older behavior can be upgraded to the newer behavior without requiring any modification, and the caller is not required to implement special logic in retry loops to only conditionally set this option."
persistent="true"/>
<Option name="max_retry_delay" code="502"
paramType="Int" paramDescription="value in milliseconds of maximum delay"
description="Set the maximum amount of backoff delay incurred in the call to ``onError`` if the error is retryable. Defaults to 1000 ms. Valid parameter values are ``[0, INT_MAX]``. If the maximum retry delay is less than the current retry delay of the transaction, then the current retry delay will be clamped to the maximum retry delay. Prior to API version 610, like all other transaction options, the maximum retry delay must be reset after a call to ``onError``. If the API version is 610 or greater, the retry limit is not reset after an ``onError`` call. Note that at all API versions, it is safe and legal to set the maximum retry delay each time the transaction begins, so most code written assuming the older behavior can be upgraded to the newer behavior without requiring any modification, and the caller is not required to implement special logic in retry loops to only conditionally set this option."/>
description="Set the maximum amount of backoff delay incurred in the call to ``onError`` if the error is retryable. Defaults to 1000 ms. Valid parameter values are ``[0, INT_MAX]``. If the maximum retry delay is less than the current retry delay of the transaction, then the current retry delay will be clamped to the maximum retry delay. Prior to API version 610, like all other transaction options, the maximum retry delay must be reset after a call to ``onError``. If the API version is 610 or greater, the retry limit is not reset after an ``onError`` call. Note that at all API versions, it is safe and legal to set the maximum retry delay each time the transaction begins, so most code written assuming the older behavior can be upgraded to the newer behavior without requiring any modification, and the caller is not required to implement special logic in retry loops to only conditionally set this option."
persistent="true"/>
<Option name="size_limit" code="503"
paramType="Int" paramDescription="value in bytes"
description="Set the maximum transaction size which, if exceeded, will cause the transaction to be cancelled. Valid parameter values are ``[32, 10,000,000]```." />
description="Set the transaction size limit in bytes. The size is calculated by combining the sizes of all keys and values written or mutated, all key ranges cleared, and all read and write conflict ranges. (In other words, it includes the total size of all data included in the request to the cluster to commit the transaction.) Large transactions can cause performance problems on FoundationDB clusters, so setting this limit to a smaller value than the default can help prevent the client from accidentally degrading the cluster's performance. This value must be at least 32 and cannot be set to higher than 10,000,000, the default transaction size limit." />
<Option name="snapshot_ryw_enable" code="600"
description="Snapshot read operations will see the results of writes done in the same transaction. This is the default behavior." />
<Option name="snapshot_ryw_disable" code="601"

View File

@ -54,7 +54,9 @@ namespace vexillographer
public string paramDesc { get; set; }
public int code { get; set; }
public bool hidden { get; set; }
private string _comment;
public bool persistent { get; set; }
public int defaultFor { get; set; }
private string _comment;
public string comment {
get {
return _comment == null ? "" : _comment;
@ -132,7 +134,10 @@ namespace vexillographer
var paramTypeStr = oDoc.AttributeOrNull("paramType");
ParamType p = paramTypeStr == null ? ParamType.None : (ParamType)Enum.Parse(typeof(ParamType), paramTypeStr);
bool hidden = oDoc.AttributeOrNull("hidden") == "true";
string disableOn = oDoc.AttributeOrNull("disableOn");
bool persistent = oDoc.AttributeOrNull("persistent") == "true";
String defaultForString = oDoc.AttributeOrNull("defaultFor");
int defaultFor = defaultForString == null ? -1 : int.Parse(defaultForString);
string disableOn = oDoc.AttributeOrNull("disableOn");
bool disabled = false;
if(disableOn != null)
{
@ -150,7 +155,9 @@ namespace vexillographer
paramType = p,
paramDesc = oDoc.AttributeOrNull("paramDescription"),
comment = oDoc.AttributeOrNull("description"),
hidden = hidden
hidden = hidden,
persistent = persistent,
defaultFor = defaultFor
});
}
}