/* * NativeAPI.actor.h * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H) #define FDBCLIENT_NATIVEAPI_ACTOR_G_H #include "fdbclient/NativeAPI.actor.g.h" #elif !defined(FDBCLIENT_NATIVEAPI_ACTOR_H) #define FDBCLIENT_NATIVEAPI_ACTOR_H #include "flow/flow.h" #include "flow/TDMetric.actor.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/MasterProxyInterface.h" #include "fdbclient/FDBOptions.g.h" #include "fdbclient/CoordinationInterface.h" #include "fdbclient/ClusterInterface.h" #include "fdbclient/ClientLogEvents.h" #include "flow/actorcompiler.h" // has to be last include // Incomplete types that are reference counted class DatabaseContext; template <> void addref( DatabaseContext* ptr ); template <> void delref( DatabaseContext* ptr ); void validateOptionValue(Optional value, bool shouldBePresent); void enableClientInfoLogging(); struct NetworkOptions { std::string localAddress; std::string clusterFile; Optional traceDirectory; uint64_t traceRollSize; uint64_t traceMaxLogsSize; std::string traceLogGroup; std::string traceFormat; Optional logClientInfo; Standalone> supportedVersions; bool slowTaskProfilingEnabled; // The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE are located in Trace.h. NetworkOptions() : localAddress(""), clusterFile(""), traceDirectory(Optional()), traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"), traceFormat("xml"), slowTaskProfilingEnabled(false) {} }; class Database { public: enum { API_VERSION_LATEST = -1 }; static Database createDatabase( Reference connFile, int apiVersion, LocalityData const& clientLocality=LocalityData(), DatabaseContext *preallocatedDb=nullptr ); static Database createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality=LocalityData() ); Database() {} // an uninitialized database can be destructed or reassigned safely; that's it void operator= ( Database const& rhs ) { db = rhs.db; } Database( Database const& rhs ) : db(rhs.db) {} Database(Database&& r) BOOST_NOEXCEPT : db(std::move(r.db)) {} void operator= (Database&& r) BOOST_NOEXCEPT { db = std::move(r.db); } // For internal use by the native client: explicit Database(Reference cx) : db(cx) {} explicit Database( DatabaseContext* cx ) : db(cx) {} inline DatabaseContext* getPtr() const { return db.getPtr(); } inline DatabaseContext* extractPtr() { return db.extractPtr(); } DatabaseContext* operator->() const { return db.getPtr(); } private: Reference db; }; void setNetworkOption(FDBNetworkOptions::Option option, Optional value = Optional() ); // Configures the global networking machinery void setupNetwork(uint64_t transportId = 0, bool useMetrics = false); // This call blocks while the network is running. To use the API in a single-threaded // environment, the calling program must have ACTORs already launched that are waiting // to use the network. In this case, the program can terminate by calling stopNetwork() // from a callback, thereby releasing this call to return. In a multithreaded setup // this call can be called from a dedicated "networking" thread. All the network-based // callbacks will happen on this second thread. When a program is finished, the // call stopNetwork (from a non-networking thread) can cause the runNetwork() call to // return. // // Throws network_already_setup if g_network has already been initalized void runNetwork(); // See above. Can be called from a thread that is not the "networking thread" // // Throws network_not_setup if g_network has not been initalized void stopNetwork(); /* * Starts and holds the monitorLeader and failureMonitorClient actors */ class Cluster : public ReferenceCounted, NonCopyable { public: Cluster(Reference connFile, Reference> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST); Cluster(Reference connFile, Reference>> clusterInterface, Reference> connectedCoordinatorsNum); ~Cluster(); Reference>> getClusterInterface(); Reference getConnectionFile() { return connectionFile; } Future onConnected(); private: void init(Reference connFile, bool startClientInfoMonitor, Reference> connectedCoordinatorsNum, int apiVersion=Database::API_VERSION_LATEST); Reference>> clusterInterface; Reference connectionFile; Future leaderMon; Future failMon; Future connected; }; struct StorageMetrics; struct TransactionOptions { double maxBackoff; uint32_t getReadVersionFlags; uint32_t customTransactionSizeLimit; bool checkWritesEnabled : 1; bool causalWriteRisky : 1; bool commitOnFirstProxy : 1; bool debugDump : 1; bool lockAware : 1; bool readOnly : 1; bool firstInBatch : 1; TransactionOptions(Database const& cx); TransactionOptions(); void reset(Database const& cx); }; struct TransactionInfo { Optional debugID; int taskID; bool useProvisionalProxies; explicit TransactionInfo( int taskID ) : taskID(taskID), useProvisionalProxies(false) {} }; struct TransactionLogInfo : public ReferenceCounted, NonCopyable { enum LoggingLocation { DONT_LOG = 0, TRACE_LOG = 1, DATABASE = 2 }; TransactionLogInfo() : logLocation(DONT_LOG) {} TransactionLogInfo(LoggingLocation location) : logLocation(location) {} TransactionLogInfo(std::string id, LoggingLocation location) : logLocation(location), identifier(id) {} void setIdentifier(std::string id) { identifier = id; } void logTo(LoggingLocation loc) { logLocation = logLocation | loc; } template void addLog(const T& event) { if(logLocation & TRACE_LOG) { ASSERT(!identifier.empty()) event.logEvent(identifier); } if (flushed) { return; } if(logLocation & DATABASE) { logsAdded = true; static_assert(std::is_base_of::value, "Event should be derived class of FdbClientLogEvents::Event"); trLogWriter << event; } } BinaryWriter trLogWriter{ IncludeVersion() }; bool logsAdded{ false }; bool flushed{ false }; int logLocation; std::string identifier; }; struct Watch : public ReferenceCounted, NonCopyable { Key key; Optional value; bool valuePresent; Optional setValue; bool setPresent; Promise onChangeTrigger; Promise onSetWatchTrigger; Future watchFuture; Watch() : watchFuture(Never()), valuePresent(false), setPresent(false) { } Watch(Key key) : key(key), watchFuture(Never()), valuePresent(false), setPresent(false) { } Watch(Key key, Optional val) : key(key), value(val), watchFuture(Never()), valuePresent(true), setPresent(false) { } void setWatch(Future watchFuture); }; class Transaction : NonCopyable { public: explicit Transaction( Database const& cx ); ~Transaction(); void preinitializeOnForeignThread() { committedVersion = invalidVersion; } void setVersion( Version v ); Future getReadVersion() { return getReadVersion(0); } Future< Optional > get( const Key& key, bool snapshot = false ); Future< Void > watch( Reference watch ); Future< Key > getKey( const KeySelector& key, bool snapshot = false ); //Future< Optional > get( const KeySelectorRef& key ); Future< Standalone > getRange( const KeySelector& begin, const KeySelector& end, int limit, bool snapshot = false, bool reverse = false ); Future< Standalone > getRange( const KeySelector& begin, const KeySelector& end, GetRangeLimits limits, bool snapshot = false, bool reverse = false ); Future< Standalone > getRange( const KeyRange& keys, int limit, bool snapshot = false, bool reverse = false ) { return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ), KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limit, snapshot, reverse ); } Future< Standalone > getRange( const KeyRange& keys, GetRangeLimits limits, bool snapshot = false, bool reverse = false ) { return getRange( KeySelector( firstGreaterOrEqual(keys.begin), keys.arena() ), KeySelector( firstGreaterOrEqual(keys.end), keys.arena() ), limits, snapshot, reverse ); } Future< Standalone>> getAddressesForKey (const Key& key ); void enableCheckWrites(); void addReadConflictRange( KeyRangeRef const& keys ); void addWriteConflictRange( KeyRangeRef const& keys ); void makeSelfConflicting(); Future< Void > warmRange( Database cx, KeyRange keys ); Future< StorageMetrics > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit ); Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit ); Future< Standalone> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated ); // If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key void set( const KeyRef& key, const ValueRef& value, bool addConflictRange = true ); void atomicOp( const KeyRef& key, const ValueRef& value, MutationRef::Type operationType, bool addConflictRange = true ); void clear( const KeyRangeRef& range, bool addConflictRange = true ); void clear( const KeyRef& key, bool addConflictRange = true ); Future commit(); // Throws not_committed or commit_unknown_result errors in normal operation void setOption( FDBTransactionOptions::Option option, Optional value = Optional() ); Version getCommittedVersion() { return committedVersion; } // May be called only after commit() returns success Future> getVersionstamp(); // Will be fulfilled only after commit() returns success Promise> versionstampPromise; Future onError( Error const& e ); void flushTrLogsIfEnabled(); // These are to permit use as state variables in actors: Transaction() : info( TaskDefaultEndpoint ) {} void operator=(Transaction&& r) BOOST_NOEXCEPT; void reset(); void fullReset(); double getBackoff(int errCode); void debugTransaction(UID dID) { info.debugID = dID; } Future commitMutations(); void setupWatches(); void cancelWatches(Error const& e = transaction_cancelled()); TransactionInfo info; int numErrors; std::vector> watches; int apiVersionAtLeast(int minVersion) const; void checkDeferredError(); Database getDatabase() const { return cx; } static Reference createTrLogInfoProbabilistically(const Database& cx); TransactionOptions options; double startTime; Reference trLogInfo; private: Future getReadVersion(uint32_t flags); void setPriority(uint32_t priorityFlag); Database cx; double backoff; Version committedVersion; CommitTransactionRequest tr; Future readVersion; Promise> metadataVersion; vector>> extraConflictRanges; Promise commitResult; Future committing; }; ACTOR Future waitForCommittedVersion(Database cx, Version version); std::string unprintable( const std::string& ); int64_t extractIntOption( Optional value, int64_t minValue = std::numeric_limits::min(), int64_t maxValue = std::numeric_limits::max() ); #include "flow/unactorcompiler.h" #endif