1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-06-01 18:56:00 +08:00

Merge pull request from sfc-gh-ajbeamon/fdb-tenant-client

Add client support for tenants
This commit is contained in:
A.J. Beamon 2022-03-15 12:40:17 -07:00 committed by GitHub
commit e8077b65e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1685 additions and 482 deletions

@ -37,12 +37,14 @@ int g_api_version = 0;
* FDBFuture -> ThreadSingleAssignmentVarBase
* FDBResult -> ThreadSingleAssignmentVarBase
* FDBDatabase -> IDatabase
* FDBTenant -> ITenant
* FDBTransaction -> ITransaction
*/
// Cast helpers from the opaque C handle types to the internal C++ interfaces listed above.
// Every macro argument is parenthesized so that an expression argument (for example pointer
// arithmetic) is cast as a whole, instead of the cast binding only to its first operand.
#define TSAVB(f) ((ThreadSingleAssignmentVarBase*)(f))
#define TSAV(T, f) ((ThreadSingleAssignmentVar<T>*)(f))
#define DB(d) ((IDatabase*)(d))
#define TENANT(t) ((ITenant*)(t))
#define TXN(t) ((ITransaction*)(t))
// Legacy (pre API version 610)
@ -386,6 +388,14 @@ extern "C" DLLEXPORT void fdb_database_destroy(FDBDatabase* d) {
CATCH_AND_DIE(DB(d)->delref(););
}
// Opens the tenant whose name is the `tenant_name_length` bytes at `tenant_name` on database `d`
// and stores the resulting handle in `*out_tenant`. The reference returned by openTenant() is
// detached with extractPtr(), so the handle handed to the caller carries that reference.
// The fdb_error_t result is produced by CATCH_AND_RETURN.
extern "C" DLLEXPORT fdb_error_t fdb_database_open_tenant(FDBDatabase* d,
                                                          uint8_t const* tenant_name,
                                                          int tenant_name_length,
                                                          FDBTenant** out_tenant) {
	CATCH_AND_RETURN(auto openedTenant = DB(d)->openTenant(TenantNameRef(tenant_name, tenant_name_length));
	                 *out_tenant = (FDBTenant*)openedTenant.extractPtr(););
}
extern "C" DLLEXPORT fdb_error_t fdb_database_create_transaction(FDBDatabase* d, FDBTransaction** out_transaction) {
CATCH_AND_RETURN(Reference<ITransaction> tr = DB(d)->createTransaction();
if (g_api_version <= 15) tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
@ -439,6 +449,17 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db
}).extractPtr());
}
// Creates a transaction on `tenant` and stores the resulting handle in `*out_transaction`.
// The reference returned by createTransaction() is detached with extractPtr(), so the handle
// handed to the caller carries that reference. The fdb_error_t result is produced by
// CATCH_AND_RETURN.
extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) {
	CATCH_AND_RETURN(auto createdTr = TENANT(tenant)->createTransaction();
	                 *out_transaction = (FDBTransaction*)createdTr.extractPtr(););
}
// Drops one reference on the tenant behind `tenant` by calling delref().
// Anything thrown by delref() is caught and ignored, so this function never throws
// and reports nothing to the caller.
extern "C" DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant) {
try {
TENANT(tenant)->delref();
} catch (...) {
// Ignored: this function returns void and has no way to report a failure.
}
}
extern "C" DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr) {
try {
TXN(tr)->delref();

@ -67,6 +67,7 @@ extern "C" {
typedef struct FDB_future FDBFuture;
typedef struct FDB_result FDBResult;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_tenant FDBTenant;
typedef struct FDB_transaction FDBTransaction;
typedef int fdb_error_t;
@ -271,6 +272,11 @@ DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_database_set_option(FDBDatabase* d,
uint8_t const* value,
int value_length);
/* Opens a tenant on database `d`. `tenant_name` is the tenant's name as a byte string of
 * `tenant_name_length` bytes; `*out_tenant` is set to the newly created FDBTenant.
 * The caller owns the returned handle and must destroy it with fdb_tenant_destroy(). */
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_database_open_tenant(FDBDatabase* d,
uint8_t const* tenant_name,
int tenant_name_length,
FDBTenant** out_tenant);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_database_create_transaction(FDBDatabase* d,
FDBTransaction** out_transaction);
@ -294,6 +300,11 @@ DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDat
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version);
/* Creates a new transaction on `tenant` and sets `*out_transaction` to it. The transaction
 * operates within the tenant's key-space. The caller owns the returned handle and must
 * destroy it with fdb_transaction_destroy(). */
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant,
FDBTransaction** out_transaction);
/* Destroys an FDBTenant handle obtained from fdb_database_open_tenant(). Only the handle is
 * destroyed; the tenant and its data are unaffected. */
DLLEXPORT void fdb_tenant_destroy(FDBTenant* tenant);
DLLEXPORT void fdb_transaction_destroy(FDBTransaction* tr);
DLLEXPORT void fdb_transaction_cancel(FDBTransaction* tr);

@ -448,16 +448,21 @@ func (o TransactionOptions) SetInitializeNewDatabase() error {
return o.setOpt(300, nil)
}
// Allows this transaction to read and modify system keys (those that start with the byte 0xFF)
// Allows this transaction to read and modify system keys (those that start with the byte 0xFF). Implies raw_access.
func (o TransactionOptions) SetAccessSystemKeys() error {
return o.setOpt(301, nil)
}
// Allows this transaction to read system keys (those that start with the byte 0xFF)
// Allows this transaction to read system keys (those that start with the byte 0xFF). Implies raw_access.
func (o TransactionOptions) SetReadSystemKeys() error {
return o.setOpt(302, nil)
}
// Allows this transaction to access the raw key-space when tenant mode is on.
func (o TransactionOptions) SetRawAccess() error {
return o.setOpt(303, nil)
}
// Not yet implemented.
func (o TransactionOptions) SetDebugRetryLogging(param string) error {
return o.setOpt(401, []byte(param))

@ -192,6 +192,8 @@ class BaseInfo(object):
self.start_timestamp = bb.get_double()
if protocol_version >= PROTOCOL_VERSION_6_3:
self.dc_id = bb.get_bytes_with_length()
if protocol_version >= PROTOCOL_VERSION_7_1:
self.tenant = bb.get_bytes_with_length()
class GetVersionInfo(BaseInfo):
def __init__(self, bb, protocol_version):

@ -6,6 +6,7 @@
.. |database-type| replace:: ``FDBDatabase``
.. |database-class| replace:: :type:`FDBDatabase`
.. |database-auto| replace:: FIXME
.. |tenant-type| replace:: ``FDBTenant``
.. |transaction-class| replace:: FIXME
.. |get-key-func| replace:: :func:`fdb_transaction_get_key()`
.. |get-range-func| replace:: :func:`fdb_transaction_get_range()`
@ -419,9 +420,20 @@ An |database-blurb1| Modifications to a database are performed via transactions.
|option-doc|
.. function:: fdb_error_t fdb_database_open_tenant(FDBDatabase* database, uint8_t const* tenant_name, int tenant_name_length, FDBTenant** out_tenant)
Opens a tenant on the given database. All transactions created by this tenant will operate on the tenant's key-space. The caller assumes ownership of the :type:`FDBTenant` object and must destroy it with :func:`fdb_tenant_destroy()`.
``tenant_name``
The name of the tenant being accessed, as a byte string.
``tenant_name_length``
The length of the tenant name byte string.
``*out_tenant``
Set to point to the newly created :type:`FDBTenant`.
.. function:: fdb_error_t fdb_database_create_transaction(FDBDatabase* database, FDBTransaction** out_transaction)
Creates a new transaction on the given database. The caller assumes ownership of the :type:`FDBTransaction` object and must destroy it with :func:`fdb_transaction_destroy()`.
Creates a new transaction on the given database without using a tenant, meaning that it will operate on the entire database key-space. The caller assumes ownership of the :type:`FDBTransaction` object and must destroy it with :func:`fdb_transaction_destroy()`.
``*out_transaction``
Set to point to the newly created :type:`FDBTransaction`.
@ -486,6 +498,26 @@ An |database-blurb1| Modifications to a database are performed via transactions.
Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated. By default, this value is updated every second.
Tenant
======
|tenant-blurb1|
.. type:: FDBTenant
An opaque type that represents a tenant in the FoundationDB C API.
.. function:: void fdb_tenant_destroy(FDBTenant* tenant)
Destroys an :type:`FDBTenant` object. It must be called exactly once for each successful call to :func:`fdb_database_open_tenant()`. This function only destroys a handle to the tenant -- the tenant and its data will be fine!
.. function:: fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction)
Creates a new transaction on the given tenant. This transaction will operate within the tenant's key-space and cannot access data outside the tenant. The caller assumes ownership of the :type:`FDBTransaction` object and must destroy it with :func:`fdb_transaction_destroy()`.
``*out_transaction``
Set to point to the newly created :type:`FDBTransaction`.
Transaction
===========

@ -74,6 +74,9 @@
.. |database-sync| replace::
The convenience methods provided by |database-type| have the same signature as the corresponding methods of ``Transaction``. However, most of the |database-type| methods are fully synchronous. (An exception is the methods for watches.) As a result, the |database-type| methods do not support the use of :ref:`implicit parallelism with futures <developer-guide-programming-with-futures>`.
.. |tenant-blurb1| replace::
|tenant-type| represents a FoundationDB tenant. Tenants are optional named transaction domains that can be used to provide multiple disjoint key-spaces to client applications. A transaction created in a tenant will be limited to the keys contained within that tenant, and transactions operating on different tenants can use the same key names without interfering with each other.
.. |keysel-blurb1| replace::
FoundationDB's lexicographically ordered data model permits finding keys based on their order (for example, finding the first key in the database greater than a given key). Key selectors represent a description of a key in the database that could be resolved to an actual key by |get-key-func| or used directly as the beginning or end of a range in |get-range-func|.
@ -627,4 +630,4 @@
.. |option-set-distributed-client-tracer| replace::
Sets a tracer to run on the client. Should be set to the same value as the tracer set on the server.
Sets a tracer to run on the client. Should be set to the same value as the tracer set on the server.

@ -7,6 +7,7 @@
.. |database-type| replace:: ``Database``
.. |database-class| replace:: :class:`Database`
.. |database-auto| replace:: the :func:`@fdb.transactional <transactional>` decorator
.. |tenant-type| replace:: FIXME
.. |transaction-class| replace:: :class:`Transaction`
.. |get-key-func| replace:: :func:`Transaction.get_key`
.. |get-range-func| replace:: :func:`Transaction.get_range`

@ -5,6 +5,7 @@
.. |database-type| replace:: ``Database``
.. |database-class| replace:: :class:`Database`
.. |database-auto| replace:: :meth:`Database.transact`
.. |tenant-type| replace:: FIXME
.. |transaction-class| replace:: :class:`Transaction`
.. |get-key-func| replace:: :meth:`Transaction.get_key`
.. |get-range-func| replace:: :meth:`Transaction.get_range`

@ -8,6 +8,7 @@
.. |database-type| replace:: ``Database``
.. |database-class| replace:: ``Database``
.. |database-auto| replace:: FIXME
.. |tenant-type| replace:: FIXME
.. |transaction-class| replace:: ``Transaction``
.. |get-key-func| replace:: get_key()
.. |get-range-func| replace:: get_range()

@ -8,6 +8,7 @@
.. |database-type| replace:: ``Database``
.. |database-class| replace:: ``Database``
.. |database-auto| replace:: FIXME
.. |tenant-type| replace:: FIXME
.. |transaction-class| replace:: ``Transaction``
.. |get-key-func| replace:: get_key()
.. |get-range-func| replace:: get_range()

@ -205,6 +205,7 @@ that process, and wait for necessary data to be moved away.
#. ``\xff\xff/management/failed_locality/<locality>`` Read/write. Indicates that the cluster should consider matching processes as permanently failed. This allows the cluster to avoid maintaining extra state and doing extra work in the hope that these processes come back. See :ref:`removing machines from a cluster <removing-machines-from-a-cluster>` for documentation for the corresponding fdbcli command.
#. ``\xff\xff/management/options/excluded_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/excluded_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/options/failed_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/tenant_map/<tenant>`` Read/write. Setting a key in this range to any value will result in a tenant being created with name ``<tenant>``. Clearing a key in this range will delete the tenant with name ``<tenant>``. Reading all or a portion of this range will return the list of tenants currently present in the cluster, excluding any changes in this transaction. Values read in this range will be JSON objects containing the metadata for the associated tenants.
An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
an ip address and port (e.g. ``127.0.0.1:4500``) or any locality (e.g. ``locality_dcid:primary-satellite`` or

@ -61,6 +61,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( WRONG_SHARD_SERVER_DELAY, .01 ); if( randomize && BUGGIFY ) WRONG_SHARD_SERVER_DELAY = deterministicRandom()->random01(); // FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)
init( FUTURE_VERSION_RETRY_DELAY, .01 ); if( randomize && BUGGIFY ) FUTURE_VERSION_RETRY_DELAY = deterministicRandom()->random01();// FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY;
init( UNKNOWN_TENANT_RETRY_DELAY, 0.0 ); if( randomize && BUGGIFY ) UNKNOWN_TENANT_RETRY_DELAY = deterministicRandom()->random01();
init( REPLY_BYTE_LIMIT, 80000 );
init( DEFAULT_BACKOFF, .01 ); if( randomize && BUGGIFY ) DEFAULT_BACKOFF = deterministicRandom()->random01();
init( DEFAULT_MAX_BACKOFF, 1.0 );
@ -89,6 +90,8 @@ void ClientKnobs::initialize(Randomize randomize) {
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
init( LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD, 60 );
init( LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL, 60 );
init( TENANT_CACHE_EVICTION_SIZE, 100000 );
init( TENANT_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) TENANT_CACHE_EVICTION_SIZE_SIM = 3;
init( GET_RANGE_SHARD_LIMIT, 2 );
init( WARM_RANGE_SHARD_LIMIT, 100 );

@ -60,6 +60,7 @@ public:
double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is
// mostly wrong (e.g. dumping the database after a test)
double FUTURE_VERSION_RETRY_DELAY;
double UNKNOWN_TENANT_RETRY_DELAY;
int REPLY_BYTE_LIMIT;
double DEFAULT_BACKOFF;
double DEFAULT_MAX_BACKOFF;
@ -89,6 +90,8 @@ public:
int LOCATION_CACHE_EVICTION_SIZE_SIM;
double LOCATION_CACHE_ENDPOINT_FAILURE_GRACE_PERIOD;
double LOCATION_CACHE_FAILED_ENDPOINT_RETRY_INTERVAL;
int TENANT_CACHE_EVICTION_SIZE;
int TENANT_CACHE_EVICTION_SIZE_SIM;
int GET_RANGE_SHARD_LIMIT;
int WARM_RANGE_SHARD_LIMIT;

@ -41,7 +41,8 @@ enum class TransactionPriorityType : int { PRIORITY_DEFAULT = 0, PRIORITY_BATCH
static_assert(sizeof(TransactionPriorityType) == 4, "transaction_profiling_analyzer.py assumes this field has size 4");
struct Event {
Event(EventType t, double ts, const Optional<Standalone<StringRef>>& dc) : type(t), startTs(ts) {
Event(EventType t, double ts, const Optional<Standalone<StringRef>>& dc, const Optional<TenantName>& tenant)
: type(t), startTs(ts), tenant(tenant) {
if (dc.present())
dcId = dc.get();
}
@ -49,7 +50,9 @@ struct Event {
template <typename Ar>
Ar& serialize(Ar& ar) {
if (ar.protocolVersion().version() >= (uint64_t)0x0FDB00B063010001LL) {
if (ar.protocolVersion().hasTenants()) {
return serializer(ar, type, startTs, dcId, tenant);
} else if (ar.protocolVersion().version() >= (uint64_t)0x0FDB00B063010001LL) {
return serializer(ar, type, startTs, dcId);
} else {
return serializer(ar, type, startTs);
@ -59,8 +62,10 @@ struct Event {
EventType type{ EventType::UNSET };
double startTs{ 0 };
Key dcId{};
Optional<TenantName> tenant{};
void logEvent(std::string id, int maxFieldLength) const {}
void augmentTraceEvent(TraceEvent& event) const { event.detail("Tenant", tenant); }
};
struct EventGetVersion : public Event {
@ -77,7 +82,9 @@ struct EventGetVersion : public Event {
double latency;
void logEvent(std::string id, int maxFieldLength) const {
TraceEvent("TransactionTrace_GetVersion").detail("TransactionID", id).detail("Latency", latency);
TraceEvent event("TransactionTrace_GetVersion");
event.detail("TransactionID", id).detail("Latency", latency);
augmentTraceEvent(event);
}
};
@ -97,10 +104,9 @@ struct EventGetVersion_V2 : public Event {
TransactionPriorityType priorityType{ TransactionPriorityType::UNSET };
void logEvent(std::string id, int maxFieldLength) const {
TraceEvent("TransactionTrace_GetVersion")
.detail("TransactionID", id)
.detail("Latency", latency)
.detail("PriorityType", priorityType);
TraceEvent event("TransactionTrace_GetVersion");
event.detail("TransactionID", id).detail("Latency", latency).detail("PriorityType", priorityType);
augmentTraceEvent(event);
}
};
@ -110,8 +116,9 @@ struct EventGetVersion_V3 : public Event {
const Optional<Standalone<StringRef>>& dcId,
double lat,
TransactionPriority priority,
Version version)
: Event(EventType::GET_VERSION_LATENCY, ts, dcId), latency(lat), readVersion(version) {
Version version,
const Optional<TenantName>& tenant)
: Event(EventType::GET_VERSION_LATENCY, ts, dcId, tenant), latency(lat), readVersion(version) {
switch (priority) {
// Unfortunately, the enum serialized here disagrees with the enum used elsewhere for the values used by each
// priority
@ -143,17 +150,23 @@ struct EventGetVersion_V3 : public Event {
Version readVersion;
void logEvent(std::string id, int maxFieldLength) const {
TraceEvent("TransactionTrace_GetVersion")
.detail("TransactionID", id)
TraceEvent event("TransactionTrace_GetVersion");
event.detail("TransactionID", id)
.detail("Latency", latency)
.detail("PriorityType", priorityType)
.detail("ReadVersion", readVersion);
augmentTraceEvent(event);
}
};
struct EventGet : public Event {
EventGet(double ts, const Optional<Standalone<StringRef>>& dcId, double lat, int size, const KeyRef& in_key)
: Event(EventType::GET_LATENCY, ts, dcId), latency(lat), valueSize(size), key(in_key) {}
EventGet(double ts,
const Optional<Standalone<StringRef>>& dcId,
double lat,
int size,
const KeyRef& in_key,
const Optional<TenantName>& tenant)
: Event(EventType::GET_LATENCY, ts, dcId, tenant), latency(lat), valueSize(size), key(in_key) {}
EventGet() {}
template <typename Ar>
@ -169,13 +182,14 @@ struct EventGet : public Event {
Key key;
void logEvent(std::string id, int maxFieldLength) const {
TraceEvent("TransactionTrace_Get")
.setMaxEventLength(-1)
TraceEvent event("TransactionTrace_Get");
event.setMaxEventLength(-1)
.detail("TransactionID", id)
.detail("Latency", latency)
.detail("ValueSizeBytes", valueSize)
.setMaxFieldLength(maxFieldLength)
.detail("Key", key);
augmentTraceEvent(event);
}
};
@ -185,8 +199,9 @@ struct EventGetRange : public Event {
double lat,
int size,
const KeyRef& start_key,
const KeyRef& end_key)
: Event(EventType::GET_RANGE_LATENCY, ts, dcId), latency(lat), rangeSize(size), startKey(start_key),
const KeyRef& end_key,
const Optional<TenantName>& tenant)
: Event(EventType::GET_RANGE_LATENCY, ts, dcId, tenant), latency(lat), rangeSize(size), startKey(start_key),
endKey(end_key) {}
EventGetRange() {}
@ -204,14 +219,15 @@ struct EventGetRange : public Event {
Key endKey;
void logEvent(std::string id, int maxFieldLength) const {
TraceEvent("TransactionTrace_GetRange")
.setMaxEventLength(-1)
TraceEvent event("TransactionTrace_GetRange");
event.setMaxEventLength(-1)
.detail("TransactionID", id)
.detail("Latency", latency)
.detail("RangeSizeBytes", rangeSize)
.setMaxFieldLength(maxFieldLength)
.detail("StartKey", startKey)
.detail("EndKey", endKey);
augmentTraceEvent(event);
}
};
@ -234,36 +250,40 @@ struct EventCommit : public Event {
void logEvent(std::string id, int maxFieldLength) const {
for (auto& read_range : req.transaction.read_conflict_ranges) {
TraceEvent("TransactionTrace_Commit_ReadConflictRange")
.setMaxEventLength(-1)
TraceEvent ev1("TransactionTrace_Commit_ReadConflictRange");
ev1.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Begin", read_range.begin)
.detail("End", read_range.end);
augmentTraceEvent(ev1);
}
for (auto& write_range : req.transaction.write_conflict_ranges) {
TraceEvent("TransactionTrace_Commit_WriteConflictRange")
.setMaxEventLength(-1)
TraceEvent ev2("TransactionTrace_Commit_WriteConflictRange");
ev2.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Begin", write_range.begin)
.detail("End", write_range.end);
augmentTraceEvent(ev2);
}
for (auto& mutation : req.transaction.mutations) {
TraceEvent("TransactionTrace_Commit_Mutation")
.setMaxEventLength(-1)
TraceEvent ev3("TransactionTrace_Commit_Mutation");
ev3.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Mutation", mutation);
augmentTraceEvent(ev3);
}
TraceEvent("TransactionTrace_Commit")
.detail("TransactionID", id)
TraceEvent ev4("TransactionTrace_Commit");
ev4.detail("TransactionID", id)
.detail("Latency", latency)
.detail("NumMutations", numMutations)
.detail("CommitSizeBytes", commitBytes);
augmentTraceEvent(ev4);
}
};
@ -275,8 +295,9 @@ struct EventCommit_V2 : public Event {
int mut,
int bytes,
Version version,
const CommitTransactionRequest& commit_req)
: Event(EventType::COMMIT_LATENCY, ts, dcId), latency(lat), numMutations(mut), commitBytes(bytes),
const CommitTransactionRequest& commit_req,
const Optional<TenantName>& tenant)
: Event(EventType::COMMIT_LATENCY, ts, dcId, tenant), latency(lat), numMutations(mut), commitBytes(bytes),
commitVersion(version), req(commit_req) {}
EventCommit_V2() {}
@ -298,43 +319,51 @@ struct EventCommit_V2 : public Event {
void logEvent(std::string id, int maxFieldLength) const {
for (auto& read_range : req.transaction.read_conflict_ranges) {
TraceEvent("TransactionTrace_Commit_ReadConflictRange")
.setMaxEventLength(-1)
TraceEvent ev1("TransactionTrace_Commit_ReadConflictRange");
ev1.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Begin", read_range.begin)
.detail("End", read_range.end);
augmentTraceEvent(ev1);
}
for (auto& write_range : req.transaction.write_conflict_ranges) {
TraceEvent("TransactionTrace_Commit_WriteConflictRange")
.setMaxEventLength(-1)
TraceEvent ev2("TransactionTrace_Commit_WriteConflictRange");
ev2.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Begin", write_range.begin)
.detail("End", write_range.end);
augmentTraceEvent(ev2);
}
for (auto& mutation : req.transaction.mutations) {
TraceEvent("TransactionTrace_Commit_Mutation")
.setMaxEventLength(-1)
TraceEvent ev3("TransactionTrace_Commit_Mutation");
ev3.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Mutation", mutation);
augmentTraceEvent(ev3);
}
TraceEvent("TransactionTrace_Commit")
.detail("TransactionID", id)
TraceEvent ev4("TransactionTrace_Commit");
ev4.detail("TransactionID", id)
.detail("CommitVersion", commitVersion)
.detail("Latency", latency)
.detail("NumMutations", numMutations)
.detail("CommitSizeBytes", commitBytes);
augmentTraceEvent(ev4);
}
};
struct EventGetError : public Event {
EventGetError(double ts, const Optional<Standalone<StringRef>>& dcId, int err_code, const KeyRef& in_key)
: Event(EventType::ERROR_GET, ts, dcId), errCode(err_code), key(in_key) {}
EventGetError(double ts,
const Optional<Standalone<StringRef>>& dcId,
int err_code,
const KeyRef& in_key,
const Optional<TenantName>& tenant)
: Event(EventType::ERROR_GET, ts, dcId, tenant), errCode(err_code), key(in_key) {}
EventGetError() {}
template <typename Ar>
@ -349,12 +378,13 @@ struct EventGetError : public Event {
Key key;
void logEvent(std::string id, int maxFieldLength) const {
TraceEvent("TransactionTrace_GetError")
.setMaxEventLength(-1)
TraceEvent event("TransactionTrace_GetError");
event.setMaxEventLength(-1)
.detail("TransactionID", id)
.detail("ErrCode", errCode)
.setMaxFieldLength(maxFieldLength)
.detail("Key", key);
augmentTraceEvent(event);
}
};
@ -363,8 +393,9 @@ struct EventGetRangeError : public Event {
const Optional<Standalone<StringRef>>& dcId,
int err_code,
const KeyRef& start_key,
const KeyRef& end_key)
: Event(EventType::ERROR_GET_RANGE, ts, dcId), errCode(err_code), startKey(start_key), endKey(end_key) {}
const KeyRef& end_key,
const Optional<TenantName>& tenant)
: Event(EventType::ERROR_GET_RANGE, ts, dcId, tenant), errCode(err_code), startKey(start_key), endKey(end_key) {}
EventGetRangeError() {}
template <typename Ar>
@ -380,13 +411,14 @@ struct EventGetRangeError : public Event {
Key endKey;
void logEvent(std::string id, int maxFieldLength) const {
TraceEvent("TransactionTrace_GetRangeError")
.setMaxEventLength(-1)
TraceEvent event("TransactionTrace_GetRangeError");
event.setMaxEventLength(-1)
.detail("TransactionID", id)
.detail("ErrCode", errCode)
.setMaxFieldLength(maxFieldLength)
.detail("StartKey", startKey)
.detail("EndKey", endKey);
augmentTraceEvent(event);
}
};
@ -394,8 +426,9 @@ struct EventCommitError : public Event {
EventCommitError(double ts,
const Optional<Standalone<StringRef>>& dcId,
int err_code,
const CommitTransactionRequest& commit_req)
: Event(EventType::ERROR_COMMIT, ts, dcId), errCode(err_code), req(commit_req) {}
const CommitTransactionRequest& commit_req,
const Optional<TenantName>& tenant)
: Event(EventType::ERROR_COMMIT, ts, dcId, tenant), errCode(err_code), req(commit_req) {}
EventCommitError() {}
template <typename Ar>
@ -412,32 +445,37 @@ struct EventCommitError : public Event {
void logEvent(std::string id, int maxFieldLength) const {
for (auto& read_range : req.transaction.read_conflict_ranges) {
TraceEvent("TransactionTrace_CommitError_ReadConflictRange")
.setMaxEventLength(-1)
TraceEvent ev1("TransactionTrace_CommitError_ReadConflictRange");
ev1.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Begin", read_range.begin)
.detail("End", read_range.end);
augmentTraceEvent(ev1);
}
for (auto& write_range : req.transaction.write_conflict_ranges) {
TraceEvent("TransactionTrace_CommitError_WriteConflictRange")
.setMaxEventLength(-1)
TraceEvent ev2("TransactionTrace_CommitError_WriteConflictRange");
ev2.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Begin", write_range.begin)
.detail("End", write_range.end);
augmentTraceEvent(ev2);
}
for (auto& mutation : req.transaction.mutations) {
TraceEvent("TransactionTrace_CommitError_Mutation")
.setMaxEventLength(-1)
TraceEvent ev3("TransactionTrace_CommitError_Mutation");
ev3.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Mutation", mutation);
augmentTraceEvent(ev3);
}
TraceEvent("TransactionTrace_CommitError").detail("TransactionID", id).detail("ErrCode", errCode);
TraceEvent ev4("TransactionTrace_CommitError");
ev4.detail("TransactionID", id).detail("ErrCode", errCode);
augmentTraceEvent(ev4);
}
};
} // namespace FdbClientLogEvents

@ -171,9 +171,8 @@ struct CommitTransactionRequest : TimedRequest {
TenantInfo tenantInfo;
CommitTransactionRequest() : CommitTransactionRequest(TenantInfo(), SpanID()) {}
CommitTransactionRequest(TenantInfo const& tenantInfo, SpanID const& context)
: spanContext(context), flags(0), tenantInfo(tenantInfo) {}
CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {}
CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {}
template <class Ar>
void serialize(Ar& ar) {

@ -133,6 +133,7 @@ public:
};
struct WatchParameters : public ReferenceCounted<WatchParameters> {
const TenantInfo tenant;
const Key key;
const Optional<Value> value;
@ -143,7 +144,8 @@ struct WatchParameters : public ReferenceCounted<WatchParameters> {
const Optional<UID> debugID;
const UseProvisionalProxies useProvisionalProxies;
WatchParameters(Key key,
WatchParameters(TenantInfo tenant,
Key key,
Optional<Value> value,
Version version,
TagSet tags,
@ -151,8 +153,8 @@ struct WatchParameters : public ReferenceCounted<WatchParameters> {
TaskPriority taskID,
Optional<UID> debugID,
UseProvisionalProxies useProvisionalProxies)
: key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID), debugID(debugID),
useProvisionalProxies(useProvisionalProxies) {}
: tenant(tenant), key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID),
debugID(debugID), useProvisionalProxies(useProvisionalProxies) {}
};
class WatchMetadata : public ReferenceCounted<WatchMetadata> {
@ -203,6 +205,16 @@ struct EndpointFailureInfo {
double lastRefreshTime = 0;
};
// Bundles a key range with a tenant map entry and a LocationInfo reference.
// DatabaseContext::getCachedLocation() returns an Optional of this type and
// DatabaseContext::getCachedLocations() fills a vector of them.
struct KeyRangeLocationInfo {
TenantMapEntry tenantEntry; // NOTE(review): presumably the entry of the tenant the lookup was made for -- confirm
KeyRange range;
Reference<LocationInfo> locations; // NOTE(review): presumably the location info covering `range` -- confirm
// Default-constructs all three members.
KeyRangeLocationInfo() {}
// Copies each argument into the member of the same name.
KeyRangeLocationInfo(TenantMapEntry tenantEntry, KeyRange range, Reference<LocationInfo> locations)
: tenantEntry(tenantEntry), range(range), locations(locations) {}
};
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
public:
static DatabaseContext* allocateOnForeignThread() {
@ -237,14 +249,22 @@ public:
switchable));
}
std::pair<KeyRange, Reference<LocationInfo>> getCachedLocation(const KeyRef&, Reverse isBackward = Reverse::False);
bool getCachedLocations(const KeyRangeRef&,
std::vector<std::pair<KeyRange, Reference<LocationInfo>>>&,
Optional<KeyRangeLocationInfo> getCachedLocation(const Optional<TenantName>& tenant,
const KeyRef&,
Reverse isBackward = Reverse::False);
bool getCachedLocations(const Optional<TenantName>& tenant,
const KeyRangeRef&,
std::vector<KeyRangeLocationInfo>&,
int limit,
Reverse reverse);
Reference<LocationInfo> setCachedLocation(const KeyRangeRef&, const std::vector<struct StorageServerInterface>&);
void invalidateCache(const KeyRef&, Reverse isBackward = Reverse::False);
void invalidateCache(const KeyRangeRef&);
void cacheTenant(const TenantName& tenant, const TenantMapEntry& tenantEntry);
Reference<LocationInfo> setCachedLocation(const Optional<TenantName>& tenant,
const TenantMapEntry& tenantEntry,
const KeyRangeRef&,
const std::vector<struct StorageServerInterface>&);
void invalidateCachedTenant(const TenantNameRef& tenant);
void invalidateCache(const KeyRef& tenantPrefix, const KeyRef& key, Reverse isBackward = Reverse::False);
void invalidateCache(const KeyRef& tenantPrefix, const KeyRangeRef& keys);
// Records that `endpoint` is failed on a healthy server.
void setFailedEndpointOnHealthyServer(const Endpoint& endpoint);
@ -287,9 +307,9 @@ public:
void removeWatch();
// watch map operations
Reference<WatchMetadata> getWatchMetadata(KeyRef key) const;
Key setWatchMetadata(Reference<WatchMetadata> metadata);
void deleteWatchMetadata(KeyRef key);
Reference<WatchMetadata> getWatchMetadata(int64_t tenantId, KeyRef key) const;
void setWatchMetadata(Reference<WatchMetadata> metadata);
void deleteWatchMetadata(int64_t tenant, KeyRef key);
void clearWatchMetadata();
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value);
@ -407,8 +427,10 @@ public:
// Cache of location information
int locationCacheSize;
int tenantCacheSize;
CoalescedKeyRangeMap<Reference<LocationInfo>> locationCache;
std::unordered_map<Endpoint, EndpointFailureInfo> failedEndpointsOnHealthyServersInfo;
std::unordered_map<TenantName, TenantMapEntry> tenantCache;
std::map<UID, StorageServerInfo*> server_interf;
std::map<UID, BlobWorkerInterface> blobWorker_interf; // blob workers don't change endpoints for the same ID
@ -558,7 +580,8 @@ public:
EventCacheHolder connectToDatabaseEventCacheHolder;
private:
std::unordered_map<Key, Reference<WatchMetadata>> watchMap;
std::unordered_map<std::pair<int64_t, Key>, Reference<WatchMetadata>, boost::hash<std::pair<int64_t, Key>>>
watchMap;
};
#endif

@ -37,6 +37,9 @@ the contents of the system key space.
#include "fdbclient/ClientBooleanParams.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbclient/Status.h"
#include "fdbclient/Subspace.h"
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "flow/actorcompiler.h" // has to be last include
@ -626,6 +629,231 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db,
// used by special keys and fdbcli
std::string generateErrorMessage(const CoordinatorsResult& res);
// Reads the tenant map entry stored for `name` through the supplied transaction.
//
// READ_SYSTEM_KEYS and READ_LOCK_AWARE are set on `tr` before the read. The result is an empty Optional
// when nothing is stored under the tenant's map key, and the decoded entry otherwise. Errors from the
// read propagate to the caller; no retry is attempted here.
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, TenantName name) {
	state Key mapKey = name.withPrefix(tenantMapPrefix);

	tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);

	Optional<Value> encodedEntry = wait(safeThreadFutureToFuture(tr->get(mapKey)));
	if (!encodedEntry.present()) {
		return Optional<TenantMapEntry>();
	}
	return Optional<TenantMapEntry>(decodeTenantEntry(encodedEntry.get()));
}
// Database-level lookup of a tenant map entry.
//
// Creates one transaction from `db` and keeps calling tryGetTenantTransaction() on it; every error is
// handed to the transaction's onError() and the lookup is attempted again once that completes.
// Returns an empty Optional when the tenant has no map entry.
ACTOR template <class DB>
Future<Optional<TenantMapEntry>> tryGetTenant(Reference<DB> db, TenantName name) {
	state Reference<typename DB::TransactionT> txn = db->createTransaction();

	loop {
		try {
			Optional<TenantMapEntry> found = wait(tryGetTenantTransaction(txn, name));
			return found;
		} catch (Error& err) {
			wait(safeThreadFutureToFuture(txn->onError(err)));
		}
	}
}
// Like tryGetTenantTransaction(), but a missing tenant is reported by throwing tenant_not_found()
// instead of returning an empty Optional.
ACTOR template <class Transaction>
Future<TenantMapEntry> getTenantTransaction(Transaction tr, TenantName name) {
	Optional<TenantMapEntry> found = wait(tryGetTenantTransaction(tr, name));
	if (found.present()) {
		return found.get();
	}
	throw tenant_not_found();
}
// Like tryGetTenant(), but a missing tenant is reported by throwing tenant_not_found() instead of
// returning an empty Optional.
ACTOR template <class DB>
Future<TenantMapEntry> getTenant(Reference<DB> db, TenantName name) {
	Optional<TenantMapEntry> found = wait(tryGetTenant(db, name));
	if (found.present()) {
		return found.get();
	}
	throw tenant_not_found();
}
// Creates a tenant with the given name. If the tenant already exists, an empty optional will be returned.
//
// The work is staged on `tr` only; this function never commits. On success the returned entry is the one
// that was written to the tenant map.
//
// Throws:
//   invalid_tenant_name              - `name` starts with "\xff".
//   tenants_disabled                 - the "tenant_mode" configuration key is absent or equals DISABLED.
//   tenant_prefix_allocator_conflict - data already exists under the prefix chosen for the new tenant.
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr, TenantNameRef name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
if (name.startsWith("\xff"_sr)) {
throw invalid_tenant_name();
}
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// Start all three reads before waiting on anything so they are outstanding at the same time.
state Future<Optional<TenantMapEntry>> tenantEntryFuture = tryGetTenantTransaction(tr, name);
state Future<Optional<Value>> tenantDataPrefixFuture = safeThreadFutureToFuture(tr->get(tenantDataPrefixKey));
state Future<Optional<Value>> lastIdFuture = safeThreadFutureToFuture(tr->get(tenantLastIdKey));
Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr))));
// The stored mode is compared against the decimal text of TenantMode::DISABLED.
if (!tenantMode.present() || tenantMode.get() == StringRef(format("%d", TenantMode::DISABLED))) {
throw tenants_disabled();
}
Optional<TenantMapEntry> tenantEntry = wait(tenantEntryFuture);
if (tenantEntry.present()) {
return Optional<TenantMapEntry>();
}
state Optional<Value> lastIdVal = wait(lastIdFuture);
Optional<Value> tenantDataPrefix = wait(tenantDataPrefixFuture);
// New id is one past the last allocated id, or 0 when no id has been recorded yet. The entry is built
// from that id plus the configured tenant data prefix (empty when none is stored).
state TenantMapEntry newTenant(lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0,
tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr);
// Refuse to hand out a prefix that already contains data (a single-row probe is enough).
RangeResult contents = wait(safeThreadFutureToFuture(tr->getRange(prefixRange(newTenant.prefix), 1)));
if (!contents.empty()) {
throw tenant_prefix_allocator_conflict();
}
// Record the newly allocated id and the tenant map entry.
tr->set(tenantLastIdKey, TenantMapEntry::idToPrefix(newTenant.id));
tr->set(tenantMapKey, encodeTenantEntry(newTenant));
return newTenant;
}
// Creates the tenant `name` in `db`, committing the change and retrying through onError() on failure.
//
// Throws tenant_already_exists() only when the tenant is found on the very first attempt; errors raised
// by createTenantTransaction() or by commit are passed to onError(), which decides whether to retry.
ACTOR template <class DB>
Future<Void> createTenant(Reference<DB> db, TenantName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
// True until the first existence check has passed; later attempts skip the check.
// NOTE(review): presumably this keeps a retry after an unknown commit result from reporting
// tenant_already_exists for a tenant this very call created - confirm.
state bool firstTry = true;
loop {
try {
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (entry.present()) {
throw tenant_already_exists();
}
firstTry = false;
}
// Empty when the tenant already exists (possible on a retry); see the trace details below.
state Optional<TenantMapEntry> newTenant = wait(createTenantTransaction(tr, name));
// Simulation-only fault injection: pretend the commit outcome is unknown before committing...
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
// ...and again after a commit that actually went through.
if (BUGGIFY) {
throw commit_unknown_result();
}
// When createTenantTransaction() returned nothing, id and prefix are logged as -1 / "Unknown".
TraceEvent("CreatedTenant")
.detail("Tenant", name)
.detail("TenantId", newTenant.present() ? newTenant.get().id : -1)
.detail("Prefix", newTenant.present() ? (StringRef)newTenant.get().prefix : "Unknown"_sr)
.detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
// Stages the removal of tenant `name` on `tr` (no commit is performed here).
//
// ACCESS_SYSTEM_KEYS and LOCK_AWARE are set on the transaction. If the tenant has no map entry the call
// is a no-op. If at least one key exists under the tenant's prefix, tenant_not_empty() is thrown and
// the map entry is left untouched.
ACTOR template <class Transaction>
Future<Void> deleteTenantTransaction(Transaction tr, TenantNameRef name) {
	state Key mapKey = name.withPrefix(tenantMapPrefix);

	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);

	state Optional<TenantMapEntry> existing = wait(tryGetTenantTransaction(tr, name));
	if (existing.present()) {
		// A one-row probe of the tenant's key range is enough to tell whether it still holds data.
		RangeResult tenantData =
		    wait(safeThreadFutureToFuture(tr->getRange(prefixRange(existing.get().prefix), 1)));
		if (!tenantData.empty()) {
			throw tenant_not_empty();
		}
		tr->clear(mapKey);
	}
	return Void();
}
// Deletes the tenant `name` from `db`, committing the change and retrying through onError() on failure.
//
// Throws tenant_not_found() only when the tenant is missing on the very first attempt; errors raised by
// deleteTenantTransaction() (e.g. tenant_not_empty) or by commit are passed to onError().
ACTOR template <class DB>
Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
// True until the first existence check has passed; later attempts skip the check.
// NOTE(review): presumably this keeps a retry after an unknown commit result from reporting
// tenant_not_found for a tenant this very call deleted - confirm.
state bool firstTry = true;
loop {
try {
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (!entry.present()) {
throw tenant_not_found();
}
firstTry = false;
}
wait(deleteTenantTransaction(tr, name));
// Simulation-only fault injection: pretend the commit outcome is unknown before committing...
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
// ...and again after a commit that actually went through.
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("DeletedTenant").detail("Tenant", name).detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
// Reads up to `limit` tenant map entries whose names fall in [begin, end) and returns them keyed by
// tenant name (the tenant map prefix is stripped from each key).
//
// READ_SYSTEM_KEYS and READ_LOCK_AWARE are set on `tr` before the range read.
ACTOR template <class Transaction>
Future<std::map<TenantName, TenantMapEntry>> listTenantsTransaction(Transaction tr,
                                                                     TenantNameRef begin,
                                                                     TenantNameRef end,
                                                                     int limit) {
	state KeyRange mapRange = KeyRangeRef(begin, end).withPrefix(tenantMapPrefix);

	tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);

	RangeResult rows = wait(safeThreadFutureToFuture(
	    tr->getRange(firstGreaterOrEqual(mapRange.begin), firstGreaterOrEqual(mapRange.end), limit)));

	std::map<TenantName, TenantMapEntry> tenantsByName;
	for (const auto& row : rows) {
		TenantName tenantName = row.key.removePrefix(tenantMapPrefix);
		tenantsByName[tenantName] = decodeTenantEntry(row.value);
	}
	return tenantsByName;
}
// Database-level tenant listing.
//
// Creates one transaction from `db` and keeps calling listTenantsTransaction() on it; every error is
// handed to the transaction's onError() and the listing is attempted again once that completes.
ACTOR template <class DB>
Future<std::map<TenantName, TenantMapEntry>> listTenants(Reference<DB> db,
                                                          TenantName begin,
                                                          TenantName end,
                                                          int limit) {
	state Reference<typename DB::TransactionT> txn = db->createTransaction();

	loop {
		try {
			std::map<TenantName, TenantMapEntry> listing = wait(listTenantsTransaction(txn, begin, end, limit));
			return listing;
		} catch (Error& err) {
			wait(safeThreadFutureToFuture(txn->onError(err)));
		}
	}
}
} // namespace ManagementAPI
#include "flow/unactorcompiler.h"

@ -24,6 +24,7 @@
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "flow/ThreadHelper.actor.h"
@ -109,6 +110,18 @@ public:
// Only if it's a MultiVersionTransaction and the underlying transaction handler is null,
// it will return false
virtual bool isValid() { return true; }
virtual Optional<TenantName> getTenant() = 0;
};
// An interface that represents a tenant opened on a database; its only operation is creating
// transactions. Instances are obtained from IDatabase::openTenant().
class ITenant {
public:
virtual ~ITenant() {}
// Creates a new transaction associated with this tenant.
virtual Reference<ITransaction> createTransaction() = 0;
// Reference-counting hooks; concrete tenants in this file forward them to ThreadSafeReferenceCounted.
virtual void addref() = 0;
virtual void delref() = 0;
};
// An interface that represents a connection to a cluster made by a client
@ -116,6 +129,7 @@ class IDatabase {
public:
virtual ~IDatabase() {}
virtual Reference<ITenant> openTenant(TenantNameRef tenantName) = 0;
virtual Reference<ITransaction> createTransaction() = 0;
virtual void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;
virtual double getMainThreadBusyness() = 0;

@ -48,6 +48,21 @@ Reference<ISingleThreadTransaction> ISingleThreadTransaction::create(Type type,
} else {
result = makeReference<PaxosConfigTransaction>();
}
result->setDatabase(cx);
result->construct(cx);
return result;
}
// Factory for a tenant-bound single-threaded transaction.
//
// Picks the implementation from `type` (RYW, SIMPLE_CONFIG, or anything else -> Paxos config
// transaction), then initializes it through the two-argument construct().
Reference<ISingleThreadTransaction> ISingleThreadTransaction::create(Type type,
                                                                     Database const& cx,
                                                                     TenantName const& tenant) {
	Reference<ISingleThreadTransaction> txn;
	switch (type) {
	case Type::RYW:
		txn = makeReference<ReadYourWritesTransaction>();
		break;
	case Type::SIMPLE_CONFIG:
		txn = makeReference<SimpleConfigTransaction>();
		break;
	default:
		txn = makeReference<PaxosConfigTransaction>();
		break;
	}
	txn->construct(cx, tenant);
	return txn;
}

@ -45,8 +45,15 @@ public:
};
static ISingleThreadTransaction* allocateOnForeignThread(Type);
static Reference<ISingleThreadTransaction> create(Type, Database const&);
virtual void setDatabase(Database const&) = 0;
static Reference<ISingleThreadTransaction> create(Type, Database const&, TenantName const&);
virtual void construct(Database const&) = 0;
// Tenant-aware initialization hook used by create(Type, Database const&, TenantName const&).
// Implementations that support tenants override this; the default trips an assertion.
virtual void construct(Database const&, TenantName const&) {
// By default, a transaction implementation does not support tenants.
ASSERT(false);
}
virtual void setVersion(Version v) = 0;
virtual Future<Version> getReadVersion() = 0;

@ -18,7 +18,9 @@
* limitations under the License.
*/
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GenericManagementAPI.actor.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/MultiVersionAssignmentVars.h"
#include "fdbclient/ClientVersion.h"
@ -382,6 +384,15 @@ void DLTransaction::reset() {
api->transactionReset(tr);
}
// DLTenant
// Creates a transaction on this tenant through the externally loaded client library.
//
// The library entry point must have been loaded (asserted). Any error code returned by the library is
// raised via throwIfError(), matching DLDatabase::createTransaction() and DLDatabase::openTenant();
// previously the result was ignored and an uninitialized handle could be wrapped on failure.
Reference<ITransaction> DLTenant::createTransaction() {
	ASSERT(api->tenantCreateTransaction != nullptr);

	FdbCApi::FDBTransaction* tr = nullptr;
	throwIfError(api->tenantCreateTransaction(tenant, &tr));
	return Reference<ITransaction>(new DLTransaction(api, tr));
}
// DLDatabase
DLDatabase::DLDatabase(Reference<FdbCApi> api, ThreadFuture<FdbCApi::FDBDatabase*> dbFuture) : api(api), db(nullptr) {
addref();
@ -401,9 +412,19 @@ ThreadFuture<Void> DLDatabase::onReady() {
return ready;
}
// Opens `tenantName` through the externally loaded client library and wraps the returned handle.
//
// Throws unsupported_operation() when the library does not export the open-tenant entry point, and
// raises whatever error code the library reports via throwIfError().
Reference<ITenant> DLDatabase::openTenant(TenantNameRef tenantName) {
	if (api->databaseOpenTenant == nullptr) {
		throw unsupported_operation();
	}

	FdbCApi::FDBTenant* rawTenant;
	auto openResult = api->databaseOpenTenant(db, tenantName.begin(), tenantName.size(), &rawTenant);
	throwIfError(openResult);

	return makeReference<DLTenant>(api, rawTenant);
}
// Creates a transaction on this database through the externally loaded client library.
//
// The library is asked for exactly one transaction and its error code is raised via throwIfError().
// (The earlier text issued an additional unchecked call first, whose handle was then overwritten.)
Reference<ITransaction> DLDatabase::createTransaction() {
	FdbCApi::FDBTransaction* tr;
	throwIfError(api->databaseCreateTransaction(db, &tr));
	return Reference<ITransaction>(new DLTransaction(api, tr));
}
@ -522,6 +543,7 @@ void DLApi::init() {
loadClientFunction(&api->stopNetwork, lib, fdbCPath, "fdb_stop_network", headerVersion >= 0);
loadClientFunction(&api->createDatabase, lib, fdbCPath, "fdb_create_database", headerVersion >= 610);
loadClientFunction(&api->databaseOpenTenant, lib, fdbCPath, "fdb_database_open_tenant", headerVersion >= 710);
loadClientFunction(
&api->databaseCreateTransaction, lib, fdbCPath, "fdb_database_create_transaction", headerVersion >= 0);
loadClientFunction(&api->databaseSetOption, lib, fdbCPath, "fdb_database_set_option", headerVersion >= 0);
@ -542,6 +564,10 @@ void DLApi::init() {
loadClientFunction(
&api->databaseCreateSnapshot, lib, fdbCPath, "fdb_database_create_snapshot", headerVersion >= 700);
loadClientFunction(
&api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710);
loadClientFunction(&api->tenantDestroy, lib, fdbCPath, "fdb_tenant_destroy", headerVersion >= 710);
loadClientFunction(&api->transactionSetOption, lib, fdbCPath, "fdb_transaction_set_option", headerVersion >= 0);
loadClientFunction(&api->transactionDestroy, lib, fdbCPath, "fdb_transaction_destroy", headerVersion >= 0);
loadClientFunction(
@ -737,8 +763,9 @@ void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParame
// MultiVersionTransaction
// Builds a multi-version transaction on `db`, optionally bound to `tenant`.
//
// Records the creation time, allocates the timeout signal variable, installs the default options and
// then creates the underlying transaction via updateTransaction(). Only one member-initializer list is
// kept: the one that also stores `tenant`, which updateTransaction() and getTenant() read.
MultiVersionTransaction::MultiVersionTransaction(Reference<MultiVersionDatabase> db,
                                                 Optional<Reference<MultiVersionTenant>> tenant,
                                                 UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions)
  : db(db), tenant(tenant), startTime(timer_monotonic()), timeoutTsav(new ThreadSingleAssignmentVar<Void>()) {
	setDefaultOptions(defaultOptions);
	updateTransaction();
}
@ -749,18 +776,29 @@ void MultiVersionTransaction::setDefaultOptions(UniqueOrderedOptionList<FDBTrans
}
void MultiVersionTransaction::updateTransaction() {
auto currentDb = db->dbState->dbVar->get();
TransactionInfo newTr;
if (currentDb.value) {
newTr.transaction = currentDb.value->createTransaction();
if (tenant.present()) {
ASSERT(tenant.get());
auto currentTenant = tenant.get()->tenantVar->get();
if (currentTenant.value) {
newTr.transaction = currentTenant.value->createTransaction();
}
newTr.onChange = currentTenant.onChange;
} else {
auto currentDb = db->dbState->dbVar->get();
if (currentDb.value) {
newTr.transaction = currentDb.value->createTransaction();
}
newTr.onChange = currentDb.onChange;
}
Optional<StringRef> timeout;
for (auto option : persistentOptions) {
if (option.first == FDBTransactionOptions::TIMEOUT) {
timeout = option.second.castTo<StringRef>();
} else if (currentDb.value) {
} else if (newTr.transaction) {
newTr.transaction->setOption(option.first, option.second.castTo<StringRef>());
}
}
@ -770,13 +808,11 @@ void MultiVersionTransaction::updateTransaction() {
// that might inadvertently fail the transaction.
if (timeout.present()) {
setTimeout(timeout);
if (currentDb.value) {
if (newTr.transaction) {
newTr.transaction->setOption(FDBTransactionOptions::TIMEOUT, timeout);
}
}
newTr.onChange = currentDb.onChange;
lock.enter();
transaction = newTr;
lock.leave();
@ -1041,6 +1077,14 @@ ThreadFuture<Void> MultiVersionTransaction::onError(Error const& e) {
}
}
// Returns the name of the tenant this transaction was created on, or an empty Optional when the
// transaction was created directly on the database.
Optional<TenantName> MultiVersionTransaction::getTenant() {
	if (!tenant.present()) {
		return Optional<TenantName>();
	}
	return tenant.get()->tenantName;
}
// Waits for the specified duration and signals the assignment variable with a timed out error
// This will be canceled if a new timeout is set, in which case the tsav will not be signaled.
ACTOR Future<Void> timeoutImpl(Reference<ThreadSingleAssignmentVar<Void>> tsav, double duration) {
@ -1167,6 +1211,39 @@ bool MultiVersionTransaction::isValid() {
return tr.transaction.isValid();
}
// MultiVersionTenant
// Creates a tenant wrapper for `tenantName` on `db`. The wrapped tenant starts out null and is filled in
// by updateTenant(), which also arranges to run again when the database's underlying connection changes.
MultiVersionTenant::MultiVersionTenant(Reference<MultiVersionDatabase> db, StringRef tenantName)
: tenantVar(new ThreadSafeAsyncVar<Reference<ITenant>>(Reference<ITenant>(nullptr))), tenantName(tenantName), db(db) {
updateTenant();
}
// Nothing to release explicitly; members clean up after themselves.
MultiVersionTenant::~MultiVersionTenant() = default;
// Creates a multi-version transaction bound to this tenant, using the owning database's default
// transaction options. The transaction holds a counted reference to this tenant.
Reference<ITransaction> MultiVersionTenant::createTransaction() {
	Reference<MultiVersionTenant> self = Reference<MultiVersionTenant>::addRef(this);
	return Reference<ITransaction>(new MultiVersionTransaction(db, self, db->dbState->transactionDefaultOptions));
}
// Creates a new underlying tenant object whenever the database connection changes. This change is signaled
// to open transactions via an AsyncVar.
//
// Steps: read the database's current underlying connection, open the tenant on it (or use a null tenant
// when there is no connection), publish the result through tenantVar, then register a continuation on
// the connection's change signal that calls updateTenant() again.
void MultiVersionTenant::updateTenant() {
Reference<ITenant> tenant;
auto currentDb = db->dbState->dbVar->get();
if (currentDb.value) {
tenant = currentDb.value->openTenant(tenantName);
} else {
tenant = Reference<ITenant>(nullptr);
}
tenantVar->set(tenant);
// Only the replacement of tenantUpdater is done under tenantLock; the set() above is not.
MutexHolder holder(tenantLock);
// NOTE(review): the continuation captures a raw `this`. Nothing visible here keeps the
// MultiVersionTenant alive until the continuation runs - confirm the lifetime is guaranteed elsewhere.
// The ErrorOr result is ignored, so the tenant is refreshed whether the change signal completed
// with a value or with an error.
tenantUpdater = mapThreadFuture<Void, Void>(currentDb.onChange, [this](ErrorOr<Void> result) {
updateTenant();
return Void();
});
}
// MultiVersionDatabase
MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
int threadIdx,
@ -1241,9 +1318,14 @@ Reference<IDatabase> MultiVersionDatabase::debugCreateFromExistingDatabase(Refer
return Reference<IDatabase>(new MultiVersionDatabase(MultiVersionApi::api, 0, "", db, db, false));
}
// Opens `tenantName` on this database and returns a MultiVersionTenant that holds a counted reference
// back to the database.
Reference<ITenant> MultiVersionDatabase::openTenant(TenantNameRef tenantName) {
	Reference<MultiVersionDatabase> self = Reference<MultiVersionDatabase>::addRef(this);
	return makeReference<MultiVersionTenant>(self, tenantName);
}
// Creates a multi-version transaction directly on this database (no tenant), using the database's
// default transaction options.
//
// Only the three-argument construction is kept; it matches the MultiVersionTransaction constructor that
// takes an optional tenant. The stale two-argument return that preceded it made this one unreachable.
Reference<ITransaction> MultiVersionDatabase::createTransaction() {
	return Reference<ITransaction>(new MultiVersionTransaction(Reference<MultiVersionDatabase>::addRef(this),
	                                                           Optional<Reference<MultiVersionTenant>>(),
	                                                           dbState->transactionDefaultOptions));
}
void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {

@ -36,6 +36,7 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
typedef struct FDB_result FDBResult;
typedef struct FDB_cluster FDBCluster;
typedef struct FDB_database FDBDatabase;
typedef struct FDB_tenant FDBTenant;
typedef struct FDB_transaction FDBTransaction;
typedef int fdb_error_t;
@ -120,6 +121,10 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
fdb_error_t (*createDatabase)(const char* clusterFilePath, FDBDatabase** db);
// Database
fdb_error_t (*databaseOpenTenant)(FDBDatabase* database,
uint8_t const* tenantName,
int tenantNameLength,
FDBTenant** outTenant);
fdb_error_t (*databaseCreateTransaction)(FDBDatabase* database, FDBTransaction** tr);
fdb_error_t (*databaseSetOption)(FDBDatabase* database,
FDBDatabaseOption option,
@ -140,6 +145,10 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
// Tenant
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
void (*tenantDestroy)(FDBTenant* tenant);
// Transaction
fdb_error_t (*transactionSetOption)(FDBTransaction* tr,
FDBTransactionOption option,
@ -353,6 +362,11 @@ public:
ThreadFuture<Void> onError(Error const& e) override;
void reset() override;
// Not available on this wrapper: always trips ASSERT(false) and then throws internal_error().
// NOTE(review): presumably callers are expected to ask the enclosing MultiVersionTransaction for the
// tenant instead - confirm.
Optional<TenantName> getTenant() override {
ASSERT(false);
throw internal_error();
}
void addref() override { ThreadSafeReferenceCounted<DLTransaction>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLTransaction>::delref(); }
@ -361,6 +375,25 @@ private:
FdbCApi::FDBTransaction* const tr;
};
// An implementation of ITenant that wraps a tenant handle created on an externally loaded client library.
// Transactions are created through that library's function table (FdbCApi).
class DLTenant : public ITenant, ThreadSafeReferenceCounted<DLTenant> {
public:
// Stores the library function table and the raw tenant handle; the handle is released in the destructor.
DLTenant(Reference<FdbCApi> api, FdbCApi::FDBTenant* tenant) : api(api), tenant(tenant) {}
~DLTenant() override {
// Release the handle through the library's tenantDestroy entry point, if a handle is held.
if (tenant) {
api->tenantDestroy(tenant);
}
}
// Creates a transaction on this tenant via the external library.
Reference<ITransaction> createTransaction() override;
// Reference counting is delegated to ThreadSafeReferenceCounted.
void addref() override { ThreadSafeReferenceCounted<DLTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<DLTenant>::delref(); }
private:
const Reference<FdbCApi> api;
FdbCApi::FDBTenant* tenant;
};
// An implementation of IDatabase that wraps a database object created on an externally loaded client library.
// All API calls to that database are routed through the external library.
class DLDatabase : public IDatabase, ThreadSafeReferenceCounted<DLDatabase> {
@ -375,6 +408,7 @@ public:
ThreadFuture<Void> onReady();
Reference<ITenant> openTenant(TenantNameRef tenantName) override;
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;
@ -432,6 +466,7 @@ private:
};
class MultiVersionDatabase;
class MultiVersionTenant;
// An implementation of ITransaction that wraps a transaction created either locally or through a dynamically loaded
// external client. When needed (e.g on cluster version change), the MultiVersionTransaction can automatically replace
@ -439,6 +474,7 @@ class MultiVersionDatabase;
class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted<MultiVersionTransaction> {
public:
MultiVersionTransaction(Reference<MultiVersionDatabase> db,
Optional<Reference<MultiVersionTenant>> tenant,
UniqueOrderedOptionList<FDBTransactionOptions> defaultOptions);
~MultiVersionTransaction() override;
@ -507,6 +543,8 @@ public:
ThreadFuture<Void> onError(Error const& e) override;
void reset() override;
Optional<TenantName> getTenant() override;
void addref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionTransaction>::delref(); }
@ -515,6 +553,7 @@ public:
private:
const Reference<MultiVersionDatabase> db;
const Optional<Reference<MultiVersionTenant>> tenant;
ThreadSpinLock lock;
struct TransactionInfo {
@ -555,6 +594,8 @@ private:
void setDefaultOptions(UniqueOrderedOptionList<FDBTransactionOptions> options);
std::vector<std::pair<FDBTransactionOptions::Option, Optional<Standalone<StringRef>>>> persistentOptions;
const Optional<TenantName> tenantName;
};
struct ClientDesc {
@ -585,6 +626,33 @@ struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
class MultiVersionApi;
// An implementation of ITenant that wraps a tenant created either locally or through a dynamically loaded
// external client. The wrapped ITenant is automatically changed when the MultiVersionDatabase used to create
// it connects with a different version.
class MultiVersionTenant final : public ITenant, ThreadSafeReferenceCounted<MultiVersionTenant> {
public:
// Opens `tenantName` on `db`; the wrapped tenant is created immediately via updateTenant().
MultiVersionTenant(Reference<MultiVersionDatabase> db, StringRef tenantName);
~MultiVersionTenant() override;
// Creates a MultiVersionTransaction bound to this tenant.
Reference<ITransaction> createTransaction() override;
// Reference counting is delegated to ThreadSafeReferenceCounted.
void addref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<MultiVersionTenant>::delref(); }
// The currently wrapped tenant (null while the database has no underlying connection). Read by
// MultiVersionTransaction to create its underlying transaction and to learn about changes.
Reference<ThreadSafeAsyncVar<Reference<ITenant>>> tenantVar;
// Name this tenant was opened with; returned by MultiVersionTransaction::getTenant().
const Standalone<StringRef> tenantName;
private:
// Database this tenant belongs to.
Reference<MultiVersionDatabase> db;
// Held while tenantUpdater is replaced in updateTenant().
Mutex tenantLock;
// Pending continuation that re-runs updateTenant() when the database connection changes.
ThreadFuture<Void> tenantUpdater;
// Creates a new underlying tenant object whenever the database connection changes. This change is signaled
// to open transactions via an AsyncVar.
void updateTenant();
};
// An implementation of IDatabase that wraps a database created either locally or through a dynamically loaded
// external client. The MultiVersionDatabase monitors the protocol version of the cluster and automatically
// replaces the wrapped database when the protocol version changes.
@ -599,6 +667,7 @@ public:
~MultiVersionDatabase() override;
Reference<ITenant> openTenant(TenantNameRef tenantName) override;
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
double getMainThreadBusyness() override;

File diff suppressed because it is too large Load Diff

@ -159,6 +159,7 @@ struct TransactionOptions {
bool expensiveClearCostEstimation : 1;
bool useGrvCache : 1;
bool skipGrvCache : 1;
bool rawAccess : 1;
TransactionPriority priority;
@ -236,6 +237,8 @@ struct Watch : public ReferenceCounted<Watch>, NonCopyable {
struct TransactionState : ReferenceCounted<TransactionState> {
Database cx;
Optional<TenantName> tenant;
int64_t tenantId = TenantInfo::INVALID_TENANT;
Reference<TransactionLogInfo> trLogInfo;
TransactionOptions options;
@ -258,15 +261,19 @@ struct TransactionState : ReferenceCounted<TransactionState> {
// Only available so that Transaction can have a default constructor, for use in state variables
TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID) {}
TransactionState(Database cx, TaskPriority taskID, SpanID spanID, Reference<TransactionLogInfo> trLogInfo)
: cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID) {}
TransactionState(Database cx,
Optional<TenantName> tenant,
TaskPriority taskID,
SpanID spanID,
Reference<TransactionLogInfo> trLogInfo);
Reference<TransactionState> cloneAndReset(Reference<TransactionLogInfo> newTrLogInfo, bool generateNewSpan) const;
TenantInfo getTenantInfo() const;
};
class Transaction : NonCopyable {
public:
explicit Transaction(Database const& cx);
explicit Transaction(Database const& cx, Optional<TenantName> const& tenant = Optional<TenantName>());
~Transaction();
void setVersion(Version v);
@ -440,6 +447,8 @@ public:
return Standalone<VectorRef<KeyRangeRef>>(tr.transaction.write_conflict_ranges, tr.arena);
}
// Returns the tenant name recorded in this transaction's state (empty when no tenant was given).
Optional<TenantName> getTenant() { return trState->tenant; }
Reference<TransactionState> trState;
std::vector<Reference<Watch>> watches;
Span span;

@ -461,6 +461,6 @@ PaxosConfigTransaction::PaxosConfigTransaction() = default;
PaxosConfigTransaction::~PaxosConfigTransaction() = default;
void PaxosConfigTransaction::setDatabase(Database const& cx) {
void PaxosConfigTransaction::construct(Database const& cx) {
impl = PImpl<PaxosConfigTransactionImpl>::create(cx);
}

@ -35,7 +35,7 @@ public:
PaxosConfigTransaction(std::vector<ConfigTransactionInterface> const&);
PaxosConfigTransaction();
~PaxosConfigTransaction();
void setDatabase(Database const&) override;
void construct(Database const&) override;
Future<Version> getReadVersion() override;
Optional<Version> getCachedReadVersion() const override;

@ -1443,17 +1443,21 @@ public:
}
};
// Builds a read-your-writes transaction on `cx`, optionally scoped to `tenantName`.
//
// The tenant is forwarded to the wrapped Transaction (`tr`). After member initialization the database's
// default transaction options are copied into persistentOptions and applied. Only the tenant-aware
// signature and initializer list are kept; the stale single-argument duplicate has been dropped.
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx, Optional<TenantName> tenantName)
  : ISingleThreadTransaction(cx->deferredError), tr(cx, tenantName), cache(&arena), writes(&arena), retries(0),
    approximateSize(0), creationTime(now()), commitStarted(false), versionStampFuture(tr.getVersionstamp()),
    specialKeySpaceWriteMap(std::make_pair(false, Optional<Value>()), specialKeys.end), options(tr) {
	std::copy(
	    cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions));
	applyPersistentOptions();
}
void ReadYourWritesTransaction::setDatabase(Database const& cx) {
*this = ReadYourWritesTransaction(cx);
void ReadYourWritesTransaction::construct(Database const& cx) {
*this = ReadYourWritesTransaction(cx, Optional<TenantName>());
}
// Re-initializes this object as a fresh transaction on `cx` scoped to `tenantName`, by assigning from a
// newly constructed instance.
void ReadYourWritesTransaction::construct(Database const& cx, TenantName const& tenantName) {
*this = ReadYourWritesTransaction(cx, tenantName);
}
ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {

@ -68,10 +68,11 @@ class ReadYourWritesTransaction final : NonCopyable,
public ISingleThreadTransaction,
public FastAllocated<ReadYourWritesTransaction> {
public:
explicit ReadYourWritesTransaction(Database const& cx);
explicit ReadYourWritesTransaction(Database const& cx, Optional<TenantName> tenant = Optional<TenantName>());
~ReadYourWritesTransaction();
void setDatabase(Database const&) override;
void construct(Database const&) override;
void construct(Database const&, TenantName const& tenant) override;
void setVersion(Version v) override { tr.setVersion(v); }
Future<Version> getReadVersion() override;
Optional<Version> getCachedReadVersion() const override { return tr.getCachedReadVersion(); }
@ -190,6 +191,8 @@ public:
void setSpecialKeySpaceErrorMsg(const std::string& msg) { specialKeySpaceErrorMsg = msg; }
Transaction& getTransaction() { return tr; }
// Returns the tenant of the wrapped Transaction (empty when the transaction is not tenant-scoped).
Optional<TenantName> getTenant() { return tr.getTenant(); }
// used in template functions as returned Future type
template <typename Type>
using FutureT = Future<Type>;

@ -286,7 +286,7 @@ void SimpleConfigTransaction::checkDeferredError() const {
impl->checkDeferredError(deferredError);
}
void SimpleConfigTransaction::setDatabase(Database const& cx) {
void SimpleConfigTransaction::construct(Database const& cx) {
impl = PImpl<SimpleConfigTransactionImpl>::create(cx);
}

@ -43,7 +43,7 @@ public:
SimpleConfigTransaction(ConfigTransactionInterface const&);
SimpleConfigTransaction(Database const&);
SimpleConfigTransaction();
void setDatabase(Database const&) override;
void construct(Database const&) override;
~SimpleConfigTransaction();
Future<Version> getReadVersion() override;
Optional<Version> getCachedReadVersion() const override;

@ -28,6 +28,7 @@
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ProcessInterface.h"
#include "fdbclient/GlobalConfig.actor.h"
@ -54,6 +55,8 @@ static bool isAlphaNumeric(const std::string& key) {
}
} // namespace
const KeyRangeRef TenantMapRangeImpl::submoduleRange = KeyRangeRef("tenant_map/"_sr, "tenant_map0"_sr);
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
{ SpecialKeySpace::MODULE::TRANSACTION,
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/"), LiteralStringRef("\xff\xff/transaction0")) },
@ -111,7 +114,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "datadistribution",
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "tenantmap", TenantMapRangeImpl::submoduleRange.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
};
std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiCommandToRange = {
@ -1291,6 +1295,7 @@ void ProcessClassRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef&
}
ACTOR Future<RangeResult> getProcessClassSourceActor(ReadYourWritesTransaction* ryw, KeyRef prefix, KeyRangeRef kr) {
ryw->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
std::vector<ProcessData> _workers = wait(getWorkers(&ryw->getTransaction()));
auto workers = _workers; // strip const
// Note : the sort by string is anti intuition, ex. 1.1.1.1:11 < 1.1.1.1:5
@ -2697,3 +2702,95 @@ Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTr
// exclude locality with failed option as true.
return excludeLocalityCommitActor(ryw, true);
}
// Serves a range read of the tenant map special-key submodule.
//
// `kr` is a range inside "<management module>tenant_map/". The management and submodule prefixes are
// stripped to obtain tenant names, the tenants in that range are listed (at most limitsHint.rows), and
// each one is returned as a key "<same prefix><tenant name>" whose value is a JSON object with the
// fields "id" and "prefix".
ACTOR Future<RangeResult> getTenantList(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) {
	// Number of leading bytes of `kr.begin` that make up the special-key prefix.
	int prefixLength = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
	                   TenantMapRangeImpl::submoduleRange.begin.size();
	state KeyRef managementPrefix = kr.begin.substr(0, prefixLength);

	KeyRangeRef tenantRange =
	    kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
	        .removePrefix(TenantMapRangeImpl::submoduleRange.begin);

	std::map<TenantName, TenantMapEntry> tenantMap = wait(ManagementAPI::listTenantsTransaction(
	    Reference<ReadYourWritesTransaction>::addRef(ryw), tenantRange.begin, tenantRange.end, limitsHint.rows));

	RangeResult listing;
	for (const auto& tenantPair : tenantMap) {
		json_spirit::mObject entryJson;
		entryJson["id"] = tenantPair.second.id;
		entryJson["prefix"] = tenantPair.second.prefix.toString();
		std::string serialized = json_spirit::write_string(json_spirit::mValue(entryJson));

		// Both key and value are copied into the result's arena so they outlive this loop iteration.
		ValueRef resultValue(listing.arena(), serialized);
		KeyRef resultKey = tenantPair.first.withPrefix(managementPrefix, listing.arena());
		listing.push_back(listing.arena(), KeyValueRef(resultKey, resultValue));
	}
	return listing;
}
// Registers this implementation for the special-key range `kr`; all state lives in the base class.
TenantMapRangeImpl::TenantMapRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
// Range reads on the tenant map special keys are answered by the getTenantList() actor.
Future<RangeResult> TenantMapRangeImpl::getRange(ReadYourWritesTransaction* ryw,
                                                 KeyRangeRef kr,
                                                 GetRangeLimits limitsHint) const {
	Future<RangeResult> tenantList = getTenantList(ryw, kr, limitsHint);
	return tenantList;
}
// Stages deletion of every tenant whose name falls in [beginTenant, endTenant).
//
// The tenants are listed with a limit of CLIENT_KNOBS->TOO_MANY; if the listing hits that limit the
// request is rejected with special_keys_api_failure() after recording an error message on `ryw`.
// Otherwise one deleteTenantTransaction() is issued per tenant and all of them are awaited together.
ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw, TenantName beginTenant, TenantName endTenant) {
	std::map<TenantName, TenantMapEntry> matched = wait(
	    ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));

	if (matched.size() == CLIENT_KNOBS->TOO_MANY) {
		// NOTE(review): the event name is spelled "TooLange"; kept as-is here since it is emitted at runtime.
		TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
		    .detail("BeginTenant", beginTenant)
		    .detail("EndTenant", endTenant);
		ryw->setSpecialKeySpaceErrorMsg("too many tenants to range delete");
		throw special_keys_api_failure();
	}

	std::vector<Future<Void>> pendingDeletes;
	for (const auto& entry : matched) {
		pendingDeletes.push_back(ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), entry.first));
	}

	wait(waitForAll(pendingDeletes));
	return Void();
}
// Translates the writes made to the tenant map special keys into tenant management operations.
//
// For every written range inside this module's range:
//   - a set (value present) creates the tenant named by the key;
//   - a single-key clear deletes that tenant;
//   - a range clear deletes every tenant between the begin and end names.
// The returned future completes with an empty error message once all operations have finished.
Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransaction* ryw) {
	auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
	std::vector<Future<Void>> tenantManagementFutures;
	for (auto written : ranges) {
		// Skip ranges that were not written in this transaction.
		if (!written.value().first) {
			continue;
		}

		TenantNameRef tenantName =
		    written.begin()
		        .removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
		        .removePrefix(TenantMapRangeImpl::submoduleRange.begin);

		if (written.value().second.present()) {
			tenantManagementFutures.push_back(
			    success(ManagementAPI::createTenantTransaction(&ryw->getTransaction(), tenantName)));
		} else {
			// For a single key clear, just issue the delete
			if (KeyRangeRef(written.begin(), written.end()).singleKeyRange()) {
				tenantManagementFutures.push_back(
				    ManagementAPI::deleteTenantTransaction(&ryw->getTransaction(), tenantName));
			} else {
				TenantNameRef endTenant = written.end().removePrefix(
				    SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
				if (endTenant.startsWith(submoduleRange.begin)) {
					// Strip the prefix that was just tested. The previous code stripped
					// submoduleRange.end, which is not a prefix of this key and only produced the
					// intended name because both literals have the same length.
					endTenant = endTenant.removePrefix(submoduleRange.begin);
				} else {
					// The clear extends past the tenant map submodule: delete through the last
					// possible tenant name.
					endTenant = "\xff"_sr;
				}

				tenantManagementFutures.push_back(deleteTenantRange(ryw, tenantName, endTenant));
			}
		}
	}

	return tag(waitForAll(tenantManagementFutures), Optional<std::string>());
}

@ -528,5 +528,16 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
// Read-write special key range implementation for the tenant map.
// Reads go through getRange; writes recorded in the transaction's special key space write map are applied
// in commit.
class TenantMapRangeImpl : public SpecialKeyRangeRWImpl {
public:
// Key range of this submodule; its begin key is the prefix stripped from special keys to obtain tenant names.
const static KeyRangeRef submoduleRange;
// kr is forwarded to the SpecialKeyRangeRWImpl base.
explicit TenantMapRangeImpl(KeyRangeRef kr);
// Returns the tenant map entries for the requested key range.
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override;
// Applies pending writes to this range as tenant management operations.
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
#include "flow/unactorcompiler.h"
#endif

@ -1336,6 +1336,8 @@ TenantMapEntry decodeTenantEntry(ValueRef const& value) {
// Key range ["\xff/tenantMap/", "\xff/tenantMap0") and, as tenantMapPrefix, its begin key.
const KeyRangeRef tenantMapKeys("\xff/tenantMap/"_sr, "\xff/tenantMap0"_sr);
const KeyRef tenantMapPrefix = tenantMapKeys.begin;
// The same "/tenantMap/" path under the "\xff\xff" prefix.
const KeyRef tenantMapPrivatePrefix = "\xff\xff/tenantMap/"_sr;
// NOTE(review): presumably holds the most recently allocated tenant id -- confirm against the tenant creation code.
const KeyRef tenantLastIdKey = "\xff/tenantLastId/"_sr;
// NOTE(review): presumably holds a prefix applied to tenant data -- confirm against the tenant creation code.
const KeyRef tenantDataPrefixKey = "\xff/tenantDataPrefix"_sr;
// for tests
void testSSISerdes(StorageServerInterface const& ssi, bool useFB) {

@ -598,6 +598,8 @@ BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value);
// Tenant-related system keys; these are declarations only, the values are defined in a source file.
extern const KeyRangeRef tenantMapKeys;
extern const KeyRef tenantMapPrefix;
extern const KeyRef tenantMapPrivatePrefix;
extern const KeyRef tenantLastIdKey;
extern const KeyRef tenantDataPrefixKey;
// Conversions between a TenantMapEntry and its Value representation.
Value encodeTenantEntry(TenantMapEntry const& tenantEntry);
TenantMapEntry decodeTenantEntry(ValueRef const& value);

@ -23,6 +23,7 @@
#include "fdbclient/ThreadSafeTransaction.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/versions.h"
#include "fdbclient/GenericManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
// Users of ThreadSafeTransaction might share Reference<ThreadSafe...> between different threads as long as they don't
@ -46,9 +47,13 @@ ThreadFuture<Reference<IDatabase>> ThreadSafeDatabase::createFromExistingDatabas
});
}
// Opens a handle to the tenant called tenantName. The returned ThreadSafeTenant holds its own reference to
// this database.
Reference<ITenant> ThreadSafeDatabase::openTenant(TenantNameRef tenantName) {
	Reference<ThreadSafeDatabase> self = Reference<ThreadSafeDatabase>::addRef(this);
	return makeReference<ThreadSafeTenant>(self, tenantName);
}
// Creates a transaction on this database that is not associated with any tenant (an empty Optional<TenantName>
// is passed to the transaction). A SIMPLE_CONFIG transaction is created when this is a configuration database,
// otherwise a RYW transaction.
Reference<ITransaction> ThreadSafeDatabase::createTransaction() {
	auto type = isConfigDB ? ISingleThreadTransaction::Type::SIMPLE_CONFIG : ISingleThreadTransaction::Type::RYW;
	return Reference<ITransaction>(new ThreadSafeTransaction(db, type, Optional<TenantName>()));
}
void ThreadSafeDatabase::setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value) {
@ -139,7 +144,17 @@ ThreadSafeDatabase::~ThreadSafeDatabase() {
onMainThreadVoid([db]() { db->delref(); }, nullptr);
}
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx, ISingleThreadTransaction::Type type) {
// Creates a transaction bound to this tenant's name on the owning database. The transaction type follows the
// owning database: SIMPLE_CONFIG for a configuration database, RYW otherwise.
Reference<ITransaction> ThreadSafeTenant::createTransaction() {
	ISingleThreadTransaction::Type txnType = ISingleThreadTransaction::Type::RYW;
	if (db->isConfigDB) {
		txnType = ISingleThreadTransaction::Type::SIMPLE_CONFIG;
	}
	ThreadSafeTransaction* rawTxn = new ThreadSafeTransaction(db->db, txnType, name);
	return Reference<ITransaction>(rawTxn);
}
// Nothing to release explicitly; the members clean up after themselves.
ThreadSafeTenant::~ThreadSafeTenant() = default;
// Creates a thread-safe transaction wrapper on database `cx`.
//
// type   - which ISingleThreadTransaction implementation to allocate.
// tenant - when present, the transaction is constructed for that tenant; when absent it is constructed
//          without a tenant. The value is also stored in tenantName.
//
// Storage for the underlying transaction is allocated on the calling thread; its construction is queued to run
// on the main thread. The lambda captures tr, cx and tenant by value.
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
                                             ISingleThreadTransaction::Type type,
                                             Optional<TenantName> tenant)
  : tenantName(tenant) {
	// Allocate memory for the transaction from this thread (so the pointer is known for subsequent method calls)
	// but run its constructor on the main thread
	auto tr = this->tr = ISingleThreadTransaction::allocateOnForeignThread(type);
	// No deferred error -- if the construction of the RYW transaction fails, we have no where to put it
	onMainThreadVoid(
	    [tr, cx, tenant]() {
		    cx->addref();
		    if (tenant.present()) {
			    tr->construct(Database(cx), tenant.get());
		    } else {
			    tr->construct(Database(cx));
		    }
	    },
	    nullptr);
}
@ -461,6 +480,10 @@ ThreadFuture<Void> ThreadSafeTransaction::onError(Error const& e) {
return onMainThread([tr, e]() { return tr->onError(e); });
}
// Returns a copy of the tenant name stored in tenantName; the Optional is empty when no tenant was stored.
Optional<TenantName> ThreadSafeTransaction::getTenant() {
return tenantName;
}
void ThreadSafeTransaction::operator=(ThreadSafeTransaction&& r) noexcept {
tr = r.tr;
r.tr = nullptr;

@ -35,6 +35,7 @@ public:
~ThreadSafeDatabase() override;
static ThreadFuture<Reference<IDatabase>> createFromExistingDatabase(Database cx);
Reference<ITenant> openTenant(TenantNameRef tenantName) override;
Reference<ITransaction> createTransaction() override;
void setOption(FDBDatabaseOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
@ -58,6 +59,7 @@ public:
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
private:
friend class ThreadSafeTenant;
friend class ThreadSafeTransaction;
bool isConfigDB{ false };
DatabaseContext* db;
@ -68,11 +70,28 @@ public: // Internal use only
DatabaseContext* unsafeGetPtr() const { return db; }
};
// An implementation of ITenant that holds a reference to the ThreadSafeDatabase it was opened from together
// with its own copy of the tenant name, and creates transactions for that tenant.
class ThreadSafeTenant : public ITenant, ThreadSafeReferenceCounted<ThreadSafeTenant>, NonCopyable {
public:
// Stores the database reference and copies `name` into the Standalone member.
ThreadSafeTenant(Reference<ThreadSafeDatabase> db, StringRef name) : db(db), name(name) {}
~ThreadSafeTenant() override;
// Creates a new transaction for this tenant.
Reference<ITransaction> createTransaction() override;
// Reference counting is forwarded to the ThreadSafeReferenceCounted base.
void addref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::addref(); }
void delref() override { ThreadSafeReferenceCounted<ThreadSafeTenant>::delref(); }
private:
// Database this tenant was opened from.
Reference<ThreadSafeDatabase> db;
// Tenant name, owned by this object.
Standalone<StringRef> name;
};
// An implementation of ITransaction that serializes operations onto the network thread and interacts with the
// lower-level client APIs exposed by ISingleThreadTransaction
class ThreadSafeTransaction : public ITransaction, ThreadSafeReferenceCounted<ThreadSafeTransaction>, NonCopyable {
public:
explicit ThreadSafeTransaction(DatabaseContext* cx, ISingleThreadTransaction::Type type);
explicit ThreadSafeTransaction(DatabaseContext* cx,
ISingleThreadTransaction::Type type,
Optional<TenantName> tenant);
~ThreadSafeTransaction() override;
// Note: used while refactoring fdbcli, need to be removed later
@ -149,6 +168,8 @@ public:
ThreadFuture<Void> checkDeferredError();
ThreadFuture<Void> onError(Error const& e) override;
Optional<TenantName> getTenant() override;
// These are to permit use as state variables in actors:
ThreadSafeTransaction() : tr(nullptr) {}
void operator=(ThreadSafeTransaction&& r) noexcept;
@ -161,6 +182,7 @@ public:
private:
ISingleThreadTransaction* tr;
const Optional<TenantName> tenantName;
};
// An implementation of IClientApi that serializes operations onto the network thread and interacts with the lower-level

@ -230,9 +230,11 @@ description is not currently required but encouraged.
<Option name="initialize_new_database" code="300"
description="This is a write-only transaction which sets the initial configuration. This option is designed for use by database system tools only." />
<Option name="access_system_keys" code="301"
description="Allows this transaction to read and modify system keys (those that start with the byte 0xFF)"/>
description="Allows this transaction to read and modify system keys (those that start with the byte 0xFF). Implies raw_access."/>
<Option name="read_system_keys" code="302"
description="Allows this transaction to read system keys (those that start with the byte 0xFF)"/>
description="Allows this transaction to read system keys (those that start with the byte 0xFF). Implies raw_access."/>
<Option name="raw_access" code="303"
description="Allows this transaction to access the raw key-space when tenant mode is on."/>
<Option name="debug_dump" code="400"
hidden="true" />
<Option name="debug_retry_logging" code="401" paramType="String" paramDescription="Optional transaction name" />

@ -20,6 +20,7 @@
#include <vector>
#include "fdbclient/FDBOptions.g.h"
#include "flow/Util.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbclient/KeyBackedTypes.h"
@ -65,6 +66,7 @@ ACTOR Future<MoveKeysLock> takeMoveKeysLock(Database cx, UID ddId) {
state MoveKeysLock lock;
state UID txnId;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (!g_network->isSimulated()) {
txnId = deterministicRandom()->randomUniqueID();
tr.debugTransaction(txnId);
@ -99,6 +101,7 @@ ACTOR static Future<Void> checkMoveKeysLock(Transaction* tr,
MoveKeysLock lock,
const DDEnabledState* ddEnabledState,
bool isWrite = true) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
if (!ddEnabledState->isDDEnabled()) {
TraceEvent(SevDebug, "DDDisabledByInMemoryCheck").log();
throw movekeys_conflict();
@ -605,6 +608,7 @@ ACTOR Future<Void> checkFetchingState(Database cx,
tr.trState->taskID = TaskPriority::MoveKeys;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
std::vector<Future<Optional<Value>>> serverListEntries;
serverListEntries.reserve(dest.size());
@ -698,6 +702,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
tr.trState->taskID = TaskPriority::MoveKeys;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
releaser.release();
wait(finishMoveKeysParallelismLock->take(TaskPriority::DataDistributionLaunch));
@ -1332,6 +1337,7 @@ ACTOR Future<Void> removeKeysFromFailedServer(Database cx,
try {
tr.trState->taskID = TaskPriority::MoveKeys;
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(checkMoveKeysLock(&tr, lock, ddEnabledState));
TraceEvent("RemoveKeysFromFailedServerLocked")
.detail("ServerID", serverID)

@ -19,6 +19,7 @@
*/
#include <cinttypes>
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/SystemData.h"
#include "flow/ActorCollection.h"
#include "fdbrpc/simulator.h"
@ -233,6 +234,7 @@ ACTOR Future<std::vector<BlobWorkerInterface>> getBlobWorkers(Database cx, bool
if (use_system_priority) {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
RangeResult blobWorkersList = wait(tr.getRange(blobWorkerListKeys, CLIENT_KNOBS->TOO_MANY));
@ -256,6 +258,7 @@ ACTOR Future<std::vector<StorageServerInterface>> getStorageServers(Database cx,
if (use_system_priority) {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));

@ -4450,7 +4450,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server
// we must refresh the cache manually.
data->cx->invalidateCache(keys);
data->cx->invalidateCache(Key(), keys);
loop {
state Transaction tr(data->cx);

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "fdbclient/FDBOptions.g.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/ManagementAPI.actor.h"
@ -50,6 +51,7 @@ struct MoveKeysWorkload : TestWorkload {
// Get the database configuration so as to use proper team size
state Transaction tr(cx);
loop {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
try {
RangeResult res = wait(tr.getRange(configKeys, 1000));
ASSERT(res.size() < 1000);

@ -161,6 +161,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, NetworkAddressHostnameFlag);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageMetadata);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, PerpetualWiggleMetadata);
PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, Tenants);
};
template <>