
Merge remote-tracking branch 'origin/master' into async-task-thread

This commit is contained in:
sfc-gh-tclinkenbeard 2021-07-25 10:30:25 -07:00
commit 7d1af92239
66 changed files with 1091 additions and 542 deletions
.gitignore
README.md
bindings
c/test/mako
python/tests
cmake
contrib
documentation/sphinx/source
fdbclient
fdbrpc
fdbserver
flow
tests/TestRunner

.gitignore vendored

@ -84,6 +84,7 @@ ipch/
compile_commands.json
flow/actorcompiler/obj
flow/coveragetool/obj
*.code-workspace
# IDE indexing (commonly used tools)
/compile_commands.json

@ -30,7 +30,9 @@ Developers interested in using FoundationDB can get started by downloading and i
Developers on an OS for which there is no binary package, or who would like
to start hacking on the code, can get started by compiling from source.
The official docker image for building is `foundationdb/foundationdb-build`. It has all dependencies installed. To build outside the official docker image you'll need at least these dependencies:
The official docker image for building is [`foundationdb/build`](https://hub.docker.com/r/foundationdb/build), which has all dependencies installed. The Docker image definitions used by FoundationDB team members can be found in the [dedicated repository](https://github.com/FoundationDB/fdb-build-support).
To build outside the official docker image you'll need at least these dependencies:
1. Install [CMake](https://cmake.org/) version 3.13 or higher
1. Install [Mono](http://www.mono-project.com/download/stable/)
@ -77,7 +79,7 @@ describe the actor compiler source file, not the post-processed output files,
and places the output file in the source directory. This file should then be
picked up automatically by any tooling.
Note that if building inside of the `foundationdb/foundationdb-build` docker
Note that if building inside of the `foundationdb/build` docker
image, the resulting paths will still be incorrect and require manual fixing.
One will wish to re-run `cmake` with `-DCMAKE_EXPORT_COMPILE_COMMANDS=OFF` to
prevent it from reverting the manual changes.
@ -138,7 +140,7 @@ You should create a second build-directory which you will use for building and d
### Linux
There are no special requirements for Linux. A docker image can be pulled from
`foundationdb/foundationdb-build` that has all of FoundationDB's dependencies
`foundationdb/build` that has all of FoundationDB's dependencies
pre-installed, and is what the CI uses to build and test PRs.
```

@ -1056,12 +1056,12 @@ void* worker_thread(void* thread_args) {
}
fprintf(debugme,
"DEBUG: worker_id:%d (%d) thread_id:%d (%d) (tid:%d)\n",
"DEBUG: worker_id:%d (%d) thread_id:%d (%d) (tid:%lld)\n",
worker_id,
args->num_processes,
thread_id,
args->num_threads,
(unsigned int)pthread_self());
(uint64_t)pthread_self());
if (args->tpsmax) {
thread_tps = compute_thread_tps(args->tpsmax, worker_id, thread_id, args->num_processes, args->num_threads);

@ -381,12 +381,19 @@ def exclude(logger):
while True:
logger.debug("Excluding process: {}".format(excluded_address))
error_message = run_fdbcli_command_and_get_error('exclude', excluded_address)
if not error_message:
if error_message == 'WARNING: {} is a coordinator!'.format(excluded_address):
# exclude coordinator will print the warning, verify the randomly selected process is the coordinator
coordinator_list = get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')
assert len(coordinator_list) == 1
assert coordinator_list[0]['address'] == excluded_address
break
elif not error_message:
break
else:
logger.debug("Error message: {}\n".format(error_message))
logger.debug("Retry exclude after 1 second")
time.sleep(1)
output2 = run_fdbcli_command('exclude')
# logger.debug(output2)
assert 'There are currently 1 servers or localities being excluded from the database' in output2
assert excluded_address in output2
run_fdbcli_command('include', excluded_address)

@ -416,14 +416,14 @@ function(add_fdbclient_test)
message(STATUS "Adding Client test ${T_NAME}")
if (T_PROCESS_NUMBER)
add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
COMMAND ${Python_EXECUTABLE} ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
--build-dir ${CMAKE_BINARY_DIR}
--process-number ${T_PROCESS_NUMBER}
--
${T_COMMAND})
else()
add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
COMMAND ${Python_EXECUTABLE} ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_cluster.py
--build-dir ${CMAKE_BINARY_DIR}
--
${T_COMMAND})
@ -459,7 +459,7 @@ function(add_multi_fdbclient_test)
endif()
message(STATUS "Adding Client test ${T_NAME}")
add_test(NAME "${T_NAME}"
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_multi_cluster.py
COMMAND ${Python_EXECUTABLE} ${CMAKE_SOURCE_DIR}/tests/TestRunner/tmp_multi_cluster.py
--build-dir ${CMAKE_BINARY_DIR}
--clusters 3
--

@ -536,7 +536,8 @@ namespace SummarizeTest
consoleThread.Join();
var traceFiles = Directory.GetFiles(tempPath, "trace*.*").Where(s => s.EndsWith(".xml") || s.EndsWith(".json")).ToArray();
if (traceFiles.Length == 0)
// if the lack of traces was caused by a process failure, the result will include its stderr
if (process.ExitCode == 0 && traceFiles.Length == 0)
{
if (!traceToStdout)
{

contrib/gen_compile_db.py Executable file → Normal file

documentation/sphinx/source/ha-write-path.rst New file

@ -0,0 +1,195 @@
###################################################
FDB HA Write Path: How a mutation travels in FDB HA
###################################################
| Author: Meng Xu
| Reviewer: Alex Miller, Jingyu Zhou, Lukas Joswiak, Trevor Clinkenbeard
| Audience: FDB developers, SREs and expert users.
This document describes how a mutation is replicated and moved from a proxy to the storage servers (SS) in an FDB High Availability (HA) cluster. Historically, FDB HA has also been called Fearless DR or Multi-region configuration.
To simplify the description, we assume the HA cluster has the following configuration:
* Replication factor = 3 for storage servers. It means each mutation is replicated to 3 storage servers in the primary datacenter (DC) and 3 SSes in the secondary DC.
* Replication factor = 3 for transaction logs (tLogs). It means each mutation is synchronously replicated to 3 primary tLogs and 1 satellite tLog.
* Satellite replication factor = 1 satellite single replication. It means each mutation must be synchronously replicated to 1 satellite tLog before it can be committed.
* The satellite replication factor can be configured with one or two satellites and single, double or triple replicas as described here. We typically use only 1 satellite single replica config.
* Only 1 satellite is configured in the primary DC.
We describe the background knowledge -- Sharding and Tag structure -- before we discuss how a mutation travels in an FDB HA cluster.
Sharding: Which shard goes to which servers?
============================================
A shard is a continuous key range. FDB divides the entire keyspace into thousands of shards. A mutation's key decides which shard it belongs to.
Shard-to-SS mapping is determined by the \xff/keyServers/ system keyspace. In the system keyspace, a shard's begin key is used as the key, the shard's end key is the next key, and the shard's SSes are the value. For example, suppose we have the following key-values in the system keyspace: \xff/keyServers/a=(SS1,SS2,SS3) and \xff/keyServers/b=(SS2,SS4,SS7). This indicates that shard [a,b) will be saved on the storage servers whose IDs are SS1, SS2, SS3, and shard [b, \xff\xff) will be saved on storage servers SS2, SS4, SS7.
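To make the lookup concrete, here is a minimal, self-contained sketch (illustrative only, not FDB code; the std::map stands in for the \xff/keyServers/ keyspace) showing how a key resolves to its shard's storage servers:

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
    // Model the \xff/keyServers/ keyspace as a sorted map from a shard's
    // begin key to the storage servers holding that shard.
    std::map<std::string, std::vector<std::string>> keyServers = {
        { "a", { "SS1", "SS2", "SS3" } }, // shard [a, b)        -> SS1, SS2, SS3
        { "b", { "SS2", "SS4", "SS7" } }, // shard [b, \xff\xff) -> SS2, SS4, SS7
    };

    std::string mutationKey = "banana";

    // The owning shard is the entry with the greatest begin key <= the key.
    auto it = keyServers.upper_bound(mutationKey);
    --it; // safe here: the map always contains a begin key <= mutationKey

    std::cout << "key '" << mutationKey << "' belongs to shard starting at '"
              << it->first << "', stored on:";
    for (const auto& ss : it->second)
        std::cout << ' ' << ss;
    std::cout << '\n';
}
```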
SS-to-tag mapping is decided by the \xff/serverTag/ system keyspace. A tag is a whole number (i.e., a natural number or 0). Each SS is mapped to a tag and vice versa. We use tags to represent SSes in the transaction system to save space and speed up search, because tags are small, continuous numbers (described in the Tag structure section) while SS IDs are random 64-bit UIDs.
Shard-to-tLog mapping is decided by the shard-to-SS mapping and the tLog replication policy. We use an example to explain how it works. Assume a mutation is mapped to SS1, SS2, and SS5, whose tags are respectively 1, 2, 5. The system has four tLogs. We use a function to translate a tag to a tLog index: f(tag) = tLogIndex, where f(tag) is a modular function in the FDB 6.2 and 6.3 implementations. In the example, the mutation's assigned tLogs will be 1, 2, 1, calculated as each tag % 4. As you may notice, the three tags produce only two unique tLog indexes, which does not satisfy the tLog replication policy that requires 3 tLog replicas. The proxy will call the replication policy engine, selectReplicas(), to choose another tLog for the mutation.
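The example above can be reproduced with a short standalone sketch (names are assumed for illustration; FDB's real policy engine, selectReplicas(), is considerably more involved):

```cpp
#include <iostream>
#include <set>
#include <vector>

int main() {
    const int numTLogs = 4;
    std::vector<int> ssTags = { 1, 2, 5 }; // tags of SS1, SS2, SS5

    // f(tag) = tag % numTLogs, the modular function described above.
    std::set<int> tLogIndexes;
    for (int tag : ssTags)
        tLogIndexes.insert(tag % numTLogs); // 1, 2, 1 -> unique {1, 2}

    std::cout << "unique tLogs from modular mapping: " << tLogIndexes.size() << '\n';

    // Only 2 unique tLogs, but the replication policy needs 3 replicas.
    // FDB's proxy would call selectReplicas() here; this sketch just takes
    // the next unused tLog index as a stand-in.
    for (int i = 0; tLogIndexes.size() < 3 && i < numTLogs; ++i)
        tLogIndexes.insert(i);

    std::cout << "tLogs after policy fixup: " << tLogIndexes.size() << '\n';
}
```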
Tag structure
=============
Tag is an overloaded term in FDB. In the early history of FDB, a tag was simply a number used in the SS-to-tag mapping. As FDB evolves, tags are used by different components for different purposes:
* As FDB evolves to HA, tags are used by not only primary tLogs but also satellite tLogs, log router, and remote tLogs;
* As FDB scales and we work to reduce the recovery time, a special tag for transaction state store (txnStateStore) is introduced;
* and so on.
To distinguish the types of tags used for different purposes at different locations (primary DC or remote DC), we introduce the Tag structure, which has two fields:
* locality (int8_t): When it is a non-negative value, it decides which DC id the tag is used in. For example, if it is 0, the tag is used in the primary DC, and the tag's id represents a storage server and is used by primary tLogs to index by storage servers. When it is negative, it decides which type of tag it is. For example, if it is -2, it is a log router tag, and its id is used to decide which log router the tagged mutation should be sent to. The definitions of all localities are in FDBTypes.h; you can find them by searching for tagLocalitySpecial in that file.
* id (uint16_t): Once locality decides which FDB component the tag applies to, id decides which process of that component will be used for the tagged mutation.
* An FDB component in this context means (i) which DC of tLogs, and (ii) which type of tLogs.
To simplify our discussion in this document, we use “tag.id” to represent a tag's id, and tag as the Tag structure that has both locality and id. We represent a Tag as (locality, id).
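A minimal sketch of the structure described above follows; the real definition, with serialization support and the full set of tagLocality constants, lives in FDBTypes.h, so the types and values here only mirror what the text quotes:

```cpp
#include <cstdint>
#include <iostream>

// Sketch of the Tag structure: locality picks the component (or DC), id
// picks the process within that component.
struct Tag {
    int8_t locality;  // >= 0: a DC id; < 0: a special tag type
    uint16_t id;      // which process of the component gets the mutation
};

constexpr int8_t tagLocalityLogRouter = -2; // value quoted in the text

int main() {
    Tag primarySS{ 0, 5 };                    // (0, 5): SS tag in the primary DC
    Tag logRouter{ tagLocalityLogRouter, 3 }; // (-2, 3): a log router tag
    std::cout << "(" << int(primarySS.locality) << ", " << primarySS.id << ")\n"
              << "(" << int(logRouter.locality) << ", " << logRouter.id << ")\n";
}
```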
How does a mutation travel in FDB?
==================================
To simplify the description, we ignore the batching mechanisms in each component of the data path that are used to improve the system's performance.
Figure 1 illustrates how a mutation is routed inside FDB. The solid lines are asynchronous pull operations, while the dotted lines are synchronous push operations.
.. image:: images/FDB_ha_write_path.png
At Client
---------
When an application creates a transaction and writes mutations, its FDB client sends the set of mutations to a proxy, say proxy 0. Now let's focus on one of the normal mutations, say m1, whose key is in the normal keyspace.
At Proxy
--------
**Sequencing.** *The proxy first asks the master for the commit version of this transaction batch*. The master acts as a sequencer for FDB transactions: it determines the order in which transactions commit by assigning each batch a new commit version, paired with the last assigned commit version as its previous commit version. The transaction log system will use the [previous commit version, commit version] pair to determine its commit order, i.e., it only makes this transaction durable after the transaction with the previous commit version has been made durable.
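The chaining of version pairs can be illustrated with a toy sequencer (hypothetical names, not FDB's interfaces): each batch's previous commit version equals the prior batch's commit version, which is exactly what lets tLogs reassemble one consistent timeline:

```cpp
#include <iostream>
#include <utility>
#include <vector>

// Toy sequencer: hands each commit batch a (previous commit version,
// commit version] pair. A batch may only become durable after the batch
// ending at its previous commit version is durable.
struct Sequencer {
    long long lastCommitted = 0;
    std::pair<long long, long long> next() {
        long long prev = lastCommitted;
        lastCommitted += 100; // the size of the version step is arbitrary here
        return { prev, lastCommitted };
    }
};

int main() {
    Sequencer master;
    std::vector<std::pair<long long, long long>> batches;
    for (int i = 0; i < 3; ++i)
        batches.push_back(master.next());

    // Prints (0, 100], (100, 200], (200, 300]: a chain with no gaps.
    for (const auto& [prev, commit] : batches)
        std::cout << "batch (" << prev << ", " << commit << "]\n";
}
```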
**Conflict checking.** *The proxy then checks if the transaction has conflicts* with others by sending mutations to resolvers. Resolvers check if there are conflicts among mutations in different transactions from different proxies. Suppose the mutation m1's transaction passes the conflict check and can be committed.
**Commit mutation messages.** *The proxy then commits the mutations to tLogs*. Each proxy has the shard-to-tag mapping. It assigns Tags (which have locality and id) to the mutation m1. In an HA cluster in FDB 6.2, the mutation has the following Tags:
* 3 tags for primary DC. Assume they are (0, 1), (0, 2), and (0,5). The tag ids are decided by which primary SSes will eventually save the mutation;
* 3 tags for remote DC. Assume they are (1, 3), (1, 6), (1, 10). The tag ids are decided by which remote SSes will eventually save the mutation;
* 1 tag for the log router. Assume it is (-2, 3), where -2 is the locality value for all log router tags. The tag id is randomly chosen by the proxy.
* No tag for satellite tLog. The "satellite TLog locality" -5 in the code is used when recruiting a satellite TLog to tell it that it is a satellite TLog. This causes the TLog to only index log router tags (-2) and not bother indexing any of the >0 tags.
Why do we need log routers? Why can't we let remote tLogs pull data directly from primary tLogs?
The main reason is to avoid shipping the same mutation across the WAN multiple times. If you attach the remote SSes' tags, the same mutation will cross the WAN 3 times. In contrast, the log router tag reduces this to only 1 time.
Why do we randomly assign tag ids for satellite tLogs and log routers?
An alternative is to use the remote SSes' tags to decide which satellite tLog and log router a shard should always go to. We tried that approach before and compared its performance with randomly assigned tags. Evaluation showed that randomly assigning a mutation to satellite tLogs and log routers provides lower latency and higher throughput for these two types of logs. This is somewhat expected: when we randomly assign a mutation to a satellite tLog (and log router), we may assign mutations in the same shard to different satellite tLogs (and log routers). The randomness happens to balance the load on the logs.
The proxy groups mutations with the same tag into messages. The proxy then synchronously pushes these mutation messages to tLogs based on the tags. The proxy cannot acknowledge that the transaction is committed until the message has been made durable on all primary and satellite tLogs.
**Commit empty messages to tLogs.** When a proxy commits a tagged mutation message at version V1 to tLogs, it also has to commit an empty message at the same version V1 to the rest of the tLogs. This makes sure every tLog has the same versions of messages, even though some messages are empty. It is a trick used in FDB to let all tLogs march at the same versions. The reason FDB does this is that the master hands out segments of versions as 'from v1 to v2', and the tLogs need to be able to piece all of them back together into one consistent timeline. It may or may not be a good design decision, because a slow tLog can delay other tLogs of the same kind. We may want to revisit this design later.
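A toy model of this trick (all names hypothetical, no real FDB interfaces) shows how every tLog ends up with an entry at every version:

```cpp
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

int main() {
    const int numTLogs = 4;
    using Version = long long;

    // tLog index -> (version -> message payload; "" models an empty message)
    std::vector<std::map<Version, std::string>> tLogs(numTLogs);

    Version v1 = 101;
    std::set<int> taggedTLogs = { 1, 2 }; // tLogs that own m1's tags

    for (int i = 0; i < numTLogs; ++i) {
        if (taggedTLogs.count(i))
            tLogs[i][v1] = "m1"; // the real mutation message
        else
            tLogs[i][v1] = "";   // empty message at the same version
    }

    // Every tLog now has version 101 and can march forward together.
    for (int i = 0; i < numTLogs; ++i) {
        const std::string& payload = tLogs[i][v1];
        std::cout << "tLog " << i << " @101: "
                  << (payload.empty() ? std::string("<empty>") : payload) << '\n';
    }
}
```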
At primary tLogs and satellite tLogs
------------------------------------
Once a tLog receives mutations pushed by proxies, it builds an index for each tag's mutations. Primary tLogs index both log router tags and the primary DC's SS tags. Satellite tLogs only index log router tags.
If a tLog's mutations cannot be peeked and popped by its consumers (i.e., SSes and log routers) quickly enough, the tLog's memory usage will increase. When buffered mutations exceed 1.5GB (configurable by knob), their in-memory index will be spilled into a “Tag,version->disk location” B-tree.
tLogs also maintain two properties:
* It will not make a mutation at version V1 durable until all mutations before V1 have been made durable;
* It will not pop (i.e., delete) mutations at version V2, until mutations before V2 have been popped.
At primary SS
-------------
**Primary tLog of a SS.** Since an SS's tag is mapped to exactly one tLog, that tLog has all mutations for the SS and is the primary tLog for the SS. When the SS peeks data from tLogs, it will prefer to peek from its primary tLog. If the primary tLog crashes, the SS will contact the rest of the tLogs, ask for mutations with the SS's tag, and merge them together. This complex merge operation is abstracted in the TagPartitionedLogSystem interface.
**Pulling data from tLogs.** Each SS in the primary DC keeps pulling mutations, whose tag is the SS's tag, from tLogs. Once mutations before a version V1 are made durable on an SS, the SS pops the tag up to version V1 from *all* tLogs. The pop operation is an RPC to the tLogs through the TagPartitionedLogSystem interface.
Since the mutation m1 has three tags for primary SSes, the mutation will be made durable on three primary SSes. This marks the end of the mutation's journey in the primary DC.
Now let's look at how the mutation m1 is routed to the remote DC.
At log router
-------------
Log routers are consumers of satellite tLogs or primary tLogs, controlled by the knob LOG_ROUTER_PEEK_FROM_SATELLITES_PREFERRED. By default, the knob is configured for log routers to use satellite tLogs. This relationship is similar to that of primary SSes to primary tLogs.
Each log router tag is mapped to one log router. Each log router keeps pulling mutations, which have the log router's tag, from satellite tLogs. The number of log router tags is always the same as the number of log routers, which is always some multiple N of the number of satellite tLogs. Each log router has a preferred satellite tLog that has all of its mutations, so in the normal steady state, each satellite should have N log routers peeking from it (and only it).
A log router buffers its mutations in memory and waits for the remote tLogs to peek and pop its data. If the buffered data cannot be popped by remote tLogs quickly enough, the log router's memory usage will increase. To avoid running out of memory (OOM), a log router only buffers 5 seconds of mutations in memory. It pauses peeking data from satellite tLogs until its excess buffered mutations have been popped by remote tLogs.
At remote tLogs
---------------
Remote tLogs are consumers of log routers. Each remote tLog keeps pulling mutations, which have the remote tLog's tag, from log routers. Because log router tags are randomly chosen for mutations, a remote tLog's mutations can be spread across all log routers. So each remote tLog must contact all log routers for its data and merge these mutations in increasing version order on the remote tLog.
Once a remote tLog collects and merges mutations from all log routers, it makes them durable on disk, indexes them based on their tags, and pops the mutations from the log routers.
Now the mutation m1 has arrived at the remote tLog, similar to when it arrived at the primary tLog.
At remote SSes
--------------
Similar to how primary SSes pull mutations from primary tLogs, each remote SS keeps pulling mutations, which have its tag, from remote tLogs. Once a remote SS makes mutations up to a version V1 durable, the SS pops its tag up to version V1 from all remote tLogs.
Implementation
==============
* proxy assigns tags to a mutation:
https://github.com/apple/foundationdb/blob/7eabdf784a21bca102f84e7eaf14bafc54605dff/fdbserver/MasterProxyServer.actor.cpp#L1410
Mutation Serialization (WIP)
============================
This section will go into detail on how mutations are serialized as preparation for ingestion into the TagPartitionedLogSystem. This has also been covered at:
https://drive.google.com/file/d/1OaP5bqH2kst1VxD6RWj8h2cdr9rhhBHy/view
The proxy handles splitting transactions into their individual mutations. These mutations are then serialized and synchronously sent to multiple transaction logs.
The process starts in *commitBatch*. Eventually, *assignMutationsToStorageServers* is called to assign mutations to storage servers and serialize them. This function loops over each mutation in each transaction, determining the set of tags for the mutation (which storage servers it will be sent to), and then calling *LogPushData.writeTypedMessage* on the mutation.
The *LogPushData* class is used to hold serialized mutations on a per transaction log basis. Its *messagesWriter* field holds one *BinaryWriter* per transaction log.
*LogPushData.writeTypedMessage* is the function that serializes each mutation and writes it to the correct binary stream to be sent to the corresponding transaction log. Each serialized mutation contains additional metadata about the message, with the format:
.. image:: /images/serialized_mutation_metadata_format.png
* Message size: size of the message, in bytes, excluding the four bytes used for the message size
* Subsequence: integer value used for message ordering
* # of tags: integer value used to indicate the number of tags following
* Tag: serialized *Tag* object, repeated # of tags times for each location
Metadata takes up (10 + 3 * number_of_tags) bytes of each serialized mutation.
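The arithmetic can be checked with a small sketch; the field widths below (4-byte message size, 4-byte subsequence, 2-byte tag count, 3 bytes per Tag from an int8_t locality plus a uint16_t id) are inferred from the stated formula rather than copied from FDB's serializer:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>

// Bytes of metadata preceding a serialized mutation, per the formula above.
std::size_t metadataBytes(std::size_t numTags) {
    const std::size_t messageSize = 4; // excludes its own four bytes
    const std::size_t subsequence = 4;
    const std::size_t tagCount = 2;
    const std::size_t perTag = sizeof(int8_t) + sizeof(uint16_t); // 3 bytes
    return messageSize + subsequence + tagCount + perTag * numTags;
}

int main() {
    for (std::size_t tags : { 1u, 3u, 7u })
        std::cout << tags << " tag(s) -> " << metadataBytes(tags)
                  << " bytes of metadata\n"; // 13, 19, 31
}
```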
There is an additional metadata message prepended to the list of mutations in certain circumstances. To assist with visibility efforts, transaction logs and storage servers need to be able to associate a mutation with the transaction it was part of. This allows individual transactions to be tracked as they travel throughout FDB. Thus, at the beginning of each transaction, a *SpanProtocolMessage* will be written to the message stream before the first mutation for each location. A *SpanProtocolMessage* is a separate message, similar to the *LogProtocolMessage*, which holds metadata about the transaction itself.
An example may work best to illustrate the serialization process. Assume a client submits a transaction consisting of two mutations, m1 and m2. The proxy determines that m1 should be sent to tlogs 1, 2, and 3, while m2 should be sent to tlogs 2, 3, and 4. When m1 is serialized, a *LogProtocolMessage* will be written to the message stream for tlogs 1, 2, and 3 before the serialized m1 is written. Next, when m2 is serialized, a *LogProtocolMessage* will only be written to tlog 4, because tlogs 2 and 3 have already had a *LogProtocolMessage* written to them *for the transaction*. When all mutations in a transaction have been written, the process starts over for the next transaction.
This allows all transaction logs to receive information about the transaction each mutation is a part of. Storage servers will pull this information when pulling mutations, allowing them to track transaction info as well.

documentation/sphinx/source/images/FDB_ha_write_path.png: binary image added (790 KiB, not shown)

documentation/sphinx/source/images/serialized_mutation_metadata_format.png: binary image added (115 KiB, not shown)

@ -2,6 +2,11 @@
Release Notes
#############
6.3.18
======
* The multi-version client API would not propagate errors that occurred when creating databases on external clients. This could result in invalid memory accesses. `(PR #5221) <https://github.com/apple/foundationdb/pull/5221>`_
* Fixed a race between the multi-version client connecting to a cluster and destroying the database that could cause an assertion failure. `(PR #5221) <https://github.com/apple/foundationdb/pull/5221>`_
6.3.15
======

@ -60,6 +60,8 @@ Fixes
* Added a new pre-backup action when creating a backup. Backups can now either verify that the range data is being saved to is empty before the backup begins (current behavior) or clear the range where data is being saved to. Fixes a ``restore_destination_not_empty`` failure after a backup retry due to ``commit_unknown_failure``. `(PR #4595) <https://github.com/apple/foundationdb/pull/4595>`_
* When configured with ``usable_regions=2``, a cluster would not fail over to a region which contained only storage class processes. `(PR #4599) <https://github.com/apple/foundationdb/pull/4599>`_
* If a restore is done using a prefix to remove and specific key ranges to restore, the key range boundaries must begin with the prefix to remove. `(PR #4684) <https://github.com/apple/foundationdb/pull/4684>`_
* The multi-version client API would not propagate errors that occurred when creating databases on external clients. This could result in invalid memory accesses. `(PR #5220) <https://github.com/apple/foundationdb/pull/5220>`_
* Fixed a race between the multi-version client connecting to a cluster and destroying the database that could cause an assertion failure. `(PR #5220) <https://github.com/apple/foundationdb/pull/5220>`_
Status
------

@ -30,6 +30,8 @@ These documents explain the engineering design of FoundationDB, with detailed in
* :doc:`read-write-path` describes how FDB read and write path works.
* :doc:`ha-write-path` describes how FDB write path works in HA setting.
.. toctree::
:maxdepth: 1
:titlesonly:
@ -48,3 +50,4 @@ These documents explain the engineering design of FoundationDB, with detailed in
testing
kv-architecture
read-write-path
ha-write-path

@ -170,9 +170,11 @@ public:
}
Reference<IAsyncFile> f =
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
#if ENCRYPTION_ENABLED
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, AsyncFileEncrypted::Mode::READ_ONLY);
}
#endif
return f;
}
@ -182,11 +184,12 @@ public:
auto outcome = client->create_append_blob(containerName, fileName).get();
return Void();
}));
Reference<IAsyncFile> f =
makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
auto f = makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
#if ENCRYPTION_ENABLED
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, AsyncFileEncrypted::Mode::APPEND_ONLY);
}
#endif
return makeReference<BackupFile>(fileName, f);
}

@ -1127,6 +1127,7 @@ public:
return false;
}
#if ENCRYPTION_ENABLED
ACTOR static Future<Void> createTestEncryptionKeyFile(std::string filename) {
state Reference<IAsyncFile> keyFile = wait(IAsyncFileSystem::filesystem()->open(
filename,
@ -1163,6 +1164,8 @@ public:
StreamCipher::Key::initializeKey(std::move(key));
return Void();
}
#endif // ENCRYPTION_ENABLED
}; // class BackupContainerFileSystemImpl
Future<Reference<IBackupFile>> BackupContainerFileSystem::writeLogFile(Version beginVersion,
@ -1475,13 +1478,22 @@ bool BackupContainerFileSystem::usesEncryption() const {
Future<Void> BackupContainerFileSystem::encryptionSetupComplete() const {
return encryptionSetupFuture;
}
void BackupContainerFileSystem::setEncryptionKey(Optional<std::string> const& encryptionKeyFileName) {
if (encryptionKeyFileName.present()) {
#if ENCRYPTION_ENABLED
encryptionSetupFuture = BackupContainerFileSystemImpl::readEncryptionKey(encryptionKeyFileName.get());
#else
encryptionSetupFuture = Void();
#endif
}
}
Future<Void> BackupContainerFileSystem::createTestEncryptionKeyFile(std::string const &filename) {
#if ENCRYPTION_ENABLED
return BackupContainerFileSystemImpl::createTestEncryptionKeyFile(filename);
#else
return Void();
#endif
}
namespace backup_test {

@ -152,7 +152,6 @@ public:
VectorRef<KeyRangeRef> keyRangesFilter,
bool logsOnly,
Version beginVersion) final;
static Future<Void> createTestEncryptionKeyFile(std::string const& filename);
protected:

@ -20,7 +20,9 @@
#include "fdbclient/AsyncFileS3BlobStore.actor.h"
#include "fdbclient/BackupContainerS3BlobStore.h"
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#include "fdbrpc/AsyncFileEncrypted.h"
#endif
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -171,9 +173,12 @@ std::string BackupContainerS3BlobStore::getURLFormat() {
Future<Reference<IAsyncFile>> BackupContainerS3BlobStore::readFile(const std::string& path) {
Reference<IAsyncFile> f = makeReference<AsyncFileS3BlobStoreRead>(m_bstore, m_bucket, dataPath(path));
#if ENCRYPTION_ENABLED
if (usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, AsyncFileEncrypted::Mode::READ_ONLY);
}
#endif
f = makeReference<AsyncFileReadAheadCache>(f,
m_bstore->knobs.read_block_size,
m_bstore->knobs.read_ahead_blocks,
@ -189,9 +194,11 @@ Future<std::vector<std::string>> BackupContainerS3BlobStore::listURLs(Reference<
Future<Reference<IBackupFile>> BackupContainerS3BlobStore::writeFile(const std::string& path) {
Reference<IAsyncFile> f = makeReference<AsyncFileS3BlobStoreWrite>(m_bstore, m_bucket, dataPath(path));
#if ENCRYPTION_ENABLED
if (usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, AsyncFileEncrypted::Mode::APPEND_ONLY);
}
#endif
return Future<Reference<IBackupFile>>(makeReference<BackupContainerS3BlobStoreImpl::BackupFile>(path, f));
}

@ -196,7 +196,7 @@ public:
Reference<CommitProxyInfo> getCommitProxies(bool useProvisionalProxies);
Future<Reference<CommitProxyInfo>> getCommitProxiesFuture(bool useProvisionalProxies);
Reference<GrvProxyInfo> getGrvProxies(bool useProvisionalProxies);
Future<Void> onProxiesChanged();
Future<Void> onProxiesChanged() const;
Future<HealthMetrics> getHealthMetrics(bool detailed);
// Returns the protocol version reported by the coordinator this client is connected to
@ -255,7 +255,7 @@ public:
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -307,7 +307,7 @@ public:
// trust that the read version (possibly set manually by the application) is actually from the correct cluster.
// Updated everytime we get a GRV response
Version minAcceptableReadVersion = std::numeric_limits<Version>::max();
void validateVersion(Version);
void validateVersion(Version) const;
// Client status updater
struct ClientStatusUpdater {
@ -399,7 +399,7 @@ public:
Future<Void> connected;
// An AsyncVar that reports the coordinator this DatabaseContext is interacting with
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator;
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator;
Reference<AsyncVar<Optional<ClusterInterface>>> statusClusterInterface;
Future<Void> statusLeaderMon;
@ -428,7 +428,6 @@ public:
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
// Adds or updates the specified (SS, TSS) pair in the TSS mapping (if not already present).
// Requests to the storage server will be duplicated to the TSS.
@ -437,6 +436,9 @@ public:
// Removes the storage server and its TSS pair from the TSS mapping (if present).
// Requests to the storage server will no longer be duplicated to its pair TSS.
void removeTssMapping(StorageServerInterface const& ssi);
private:
std::unordered_map<KeyRef, Reference<WatchMetadata>> watchMap;
};
#endif

@ -72,7 +72,7 @@ public:
// to allow global configuration to run transactions on the latest
// database.
template <class T>
static void create(Database& cx, Reference<AsyncVar<T>> db, const ClientDBInfo* dbInfo) {
static void create(Database& cx, Reference<AsyncVar<T> const> db, const ClientDBInfo* dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{ cx };
g_network->setGlobal(INetwork::enGlobalConfig, config);

@ -49,7 +49,7 @@ struct ClientData {
OpenDatabaseRequest getRequest();
ClientData() : clientInfo(new AsyncVar<CachedSerialization<ClientDBInfo>>(CachedSerialization<ClientDBInfo>())) {}
ClientData() : clientInfo(makeReference<AsyncVar<CachedSerialization<ClientDBInfo>>>()) {}
};
struct MonitorLeaderInfo {

@ -591,7 +591,7 @@ Reference<IDatabase> DLApi::createDatabase609(const char* clusterFilePath) {
Reference<IDatabase> DLApi::createDatabase(const char* clusterFilePath) {
if (headerVersion >= 610) {
FdbCApi::FDBDatabase* db;
api->createDatabase(clusterFilePath, &db);
throwIfError(api->createDatabase(clusterFilePath, &db));
return Reference<IDatabase>(new DLDatabase(api, db));
} else {
return DLApi::createDatabase609(clusterFilePath);
@ -895,22 +895,43 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
api->runOnExternalClients(threadIdx, [this](Reference<ClientInfo> client) { dbState->addClient(client); });
if (!externalClientsInitialized.test_and_set()) {
api->runOnExternalClientsAllThreads([&clusterFilePath](Reference<ClientInfo> client) {
// This creates a database to initialize some client state on the external library
// We only do this on 6.2+ clients to avoid some bugs associated with older versions
// This deletes the new database immediately to discard its connections
if (client->protocolVersion.hasCloseUnusedConnection()) {
api->runOnExternalClientsAllThreads([&clusterFilePath](Reference<ClientInfo> client) {
// This creates a database to initialize some client state on the external library.
// We only do this on 6.2+ clients to avoid some bugs associated with older versions.
// This deletes the new database immediately to discard its connections.
//
// Simultaneous attempts to create a database could result in us running this initialization
// code in multiple threads simultaneously. It is necessary that each attempt have a chance
// to run this initialization in case the other fails, and it's safe to run them in parallel.
if (client->protocolVersion.hasCloseUnusedConnection() && !client->initialized) {
try {
Reference<IDatabase> newDb = client->api->createDatabase(clusterFilePath.c_str());
client->initialized = true;
} catch (Error& e) {
// This connection is not initialized. It is still possible to connect with it,
// but we may not see trace logs from this client until a successful connection
// is established.
TraceEvent(SevWarnAlways, "FailedToInitializeExternalClient")
.detail("LibraryPath", client->libPath)
.detail("ClusterFilePath", clusterFilePath)
.error(e);
}
});
}
}
});
// For clients older than 6.2 we create and maintain our database connection
api->runOnExternalClients(threadIdx, [this, &clusterFilePath](Reference<ClientInfo> client) {
if (!client->protocolVersion.hasCloseUnusedConnection()) {
dbState->legacyDatabaseConnections[client->protocolVersion] =
client->api->createDatabase(clusterFilePath.c_str());
try {
dbState->legacyDatabaseConnections[client->protocolVersion] =
client->api->createDatabase(clusterFilePath.c_str());
} catch (Error& e) {
// This connection is discarded
TraceEvent(SevWarnAlways, "FailedToCreateLegacyDatabaseConnection")
.detail("LibraryPath", client->libPath)
.detail("ClusterFilePath", clusterFilePath)
.error(e);
}
}
});
@ -994,7 +1015,7 @@ ThreadFuture<ProtocolVersion> MultiVersionDatabase::getServerProtocol(Optional<P
MultiVersionDatabase::DatabaseState::DatabaseState(std::string clusterFilePath, Reference<IDatabase> versionMonitorDb)
: clusterFilePath(clusterFilePath), versionMonitorDb(versionMonitorDb),
dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))) {}
dbVar(new ThreadSafeAsyncVar<Reference<IDatabase>>(Reference<IDatabase>(nullptr))), closed(false) {}
// Adds a client (local or externally loaded) that can be used to connect to the cluster
void MultiVersionDatabase::DatabaseState::addClient(Reference<ClientInfo> client) {
@ -1058,6 +1079,10 @@ ThreadFuture<Void> MultiVersionDatabase::DatabaseState::monitorProtocolVersion()
// Called when a change to the protocol version of the cluster has been detected.
// Must be called from the main thread
void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion protocolVersion) {
if (closed) {
return;
}
// If the protocol version changed but is still compatible, update our local version but keep the same connection
if (dbProtocolVersion.present() &&
protocolVersion.normalizedVersion() == dbProtocolVersion.get().normalizedVersion()) {
@ -1084,7 +1109,20 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion
.detail("Failed", client->failed)
.detail("External", client->external);
Reference<IDatabase> newDb = client->api->createDatabase(clusterFilePath.c_str());
Reference<IDatabase> newDb;
try {
newDb = client->api->createDatabase(clusterFilePath.c_str());
} catch (Error& e) {
TraceEvent(SevWarnAlways, "MultiVersionClientFailedToCreateDatabase")
.detail("LibraryPath", client->libPath)
.detail("External", client->external)
.detail("ClusterFilePath", clusterFilePath)
.error(e);
// Put the client in a disconnected state until the version changes again
updateDatabase(Reference<IDatabase>(), Reference<ClientInfo>());
return;
}
if (client->external && !MultiVersionApi::apiVersionAtLeast(610)) {
// Old API versions return a future when creating the database, so we need to wait for it
@ -1112,6 +1150,10 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion
// Replaces the active database connection with a new one. Must be called from the main thread.
void MultiVersionDatabase::DatabaseState::updateDatabase(Reference<IDatabase> newDb, Reference<ClientInfo> client) {
if (closed) {
return;
}
if (newDb) {
optionLock.enter();
for (auto option : options) {
@ -1143,12 +1185,28 @@ void MultiVersionDatabase::DatabaseState::updateDatabase(Reference<IDatabase> ne
versionMonitorDb = db;
} else {
// For older clients that don't have an API to get the protocol version, we have to monitor it locally
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
try {
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
} catch (Error& e) {
// We can't create a new database to monitor the cluster version. This means we will continue using the
// previous one, which should hopefully continue to work.
TraceEvent(SevWarnAlways, "FailedToCreateDatabaseForVersionMonitoring")
.detail("ClusterFilePath", clusterFilePath)
.error(e);
}
}
} else {
// We don't have a database connection, so use the local client to monitor the protocol version
db = Reference<IDatabase>();
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
try {
versionMonitorDb = MultiVersionApi::api->getLocalClient()->api->createDatabase(clusterFilePath.c_str());
} catch (Error& e) {
// We can't create a new database to monitor the cluster version. This means we will continue using the
// previous one, which should hopefully continue to work.
TraceEvent(SevWarnAlways, "FailedToCreateDatabaseForVersionMonitoring")
.detail("ClusterFilePath", clusterFilePath)
.error(e);
}
}
dbVar->set(db);
@ -1178,6 +1236,7 @@ void MultiVersionDatabase::DatabaseState::close() {
Reference<DatabaseState> self = Reference<DatabaseState>::addRef(this);
onMainThreadVoid(
[self]() {
self->closed = true;
if (self->protocolVersionMonitor.isValid()) {
self->protocolVersionMonitor.cancel();
}
@ -1255,8 +1314,6 @@ void MultiVersionDatabase::LegacyVersionMonitor::close() {
}
}
std::atomic_flag MultiVersionDatabase::externalClientsInitialized = ATOMIC_FLAG_INIT;
// MultiVersionApi
bool MultiVersionApi::apiVersionAtLeast(int minVersion) {
ASSERT_NE(MultiVersionApi::api->apiVersion, 0);

@ -417,12 +417,15 @@ struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
ProtocolVersion protocolVersion;
IClientApi* api;
bool failed;
std::atomic_bool initialized;
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
ClientInfo() : ClientDesc(std::string(), false), protocolVersion(0), api(nullptr), failed(true) {}
ClientInfo(IClientApi* api) : ClientDesc("internal", false), protocolVersion(0), api(api), failed(false) {}
ClientInfo()
: ClientDesc(std::string(), false), protocolVersion(0), api(nullptr), failed(true), initialized(false) {}
ClientInfo(IClientApi* api)
: ClientDesc("internal", false), protocolVersion(0), api(api), failed(false), initialized(false) {}
ClientInfo(IClientApi* api, std::string libPath)
: ClientDesc(libPath, true), protocolVersion(0), api(api), failed(false) {}
: ClientDesc(libPath, true), protocolVersion(0), api(api), failed(false), initialized(false) {}
void loadProtocolVersion();
bool canReplace(Reference<ClientInfo> other) const;
@ -504,10 +507,9 @@ public:
// this will be a specially created local db.
Reference<IDatabase> versionMonitorDb;
bool closed;
ThreadFuture<Void> changed;
bool cancelled;
ThreadFuture<Void> dbReady;
ThreadFuture<Void> protocolVersionMonitor;
@ -557,10 +559,6 @@ public:
const Reference<DatabaseState> dbState;
friend class MultiVersionTransaction;
// Clients must create a database object in order to initialize some of their state.
// This needs to be done only once, and this flag tracks whether that has happened.
static std::atomic_flag externalClientsInitialized;
};
// An implementation of IClientApi that can choose between multiple different client implementations either provided

@ -285,7 +285,7 @@ std::string unprintable(std::string const& val) {
return s;
}
void DatabaseContext::validateVersion(Version version) {
void DatabaseContext::validateVersion(Version version) const {
// Version could be 0 if the INITIALIZE_NEW_DATABASE option is set. In that case, it is illegal to perform any
// reads. We throw client_invalid_operation because the caller didn't directly set the version, so the
// version_invalid error might be confusing.
@ -650,7 +650,7 @@ ACTOR static Future<Void> clientStatusUpdateActor(DatabaseContext* cx) {
}
}
ACTOR static Future<Void> monitorProxiesChange(Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
ACTOR static Future<Void> monitorProxiesChange(Reference<AsyncVar<ClientDBInfo> const> clientDBInfo,
AsyncTrigger* triggerVar) {
state vector<CommitProxyInterface> curCommitProxies;
state vector<GrvProxyInterface> curGrvProxies;
@ -1085,7 +1085,7 @@ Future<RangeResult> HealthMetricsRangeImpl::getRange(ReadYourWritesTransaction*
DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile,
Reference<AsyncVar<ClientDBInfo>> clientInfo,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Future<Void> clientInfoMonitor,
TaskPriority taskID,
LocalityData const& clientLocality,
@ -1482,7 +1482,7 @@ void DatabaseContext::invalidateCache(const KeyRangeRef& keys) {
locationCache.insert(KeyRangeRef(begin, end), Reference<LocationInfo>());
}
Future<Void> DatabaseContext::onProxiesChanged() {
Future<Void> DatabaseContext::onProxiesChanged() const {
return this->proxiesChangeTrigger.onTrigger();
}
@ -1759,7 +1759,8 @@ Database Database::createDatabase(Reference<ClusterConnectionFile> connFile,
}
auto database = Database(db);
GlobalConfig::create(database, clientInfo, std::addressof(clientInfo->get()));
GlobalConfig::create(
database, Reference<AsyncVar<ClientDBInfo> const>(clientInfo), std::addressof(clientInfo->get()));
return database;
}
@ -5760,7 +5761,7 @@ ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
NetworkAddress coordinatorAddress,
Optional<ProtocolVersion> expectedVersion) {
state Reference<AsyncVar<Optional<ProtocolVersion>>> protocolVersion =
state Reference<AsyncVar<Optional<ProtocolVersion>> const> protocolVersion =
FlowTransport::transport().getPeerProtocolAsyncVar(coordinatorAddress);
loop {
@ -5785,7 +5786,7 @@ ACTOR Future<Optional<ProtocolVersion>> getCoordinatorProtocolFromConnectPacket(
// Returns the protocol version reported by the given coordinator
// If an expected version is given, the future won't return until the protocol version is different than expected
ACTOR Future<ProtocolVersion> getClusterProtocolImpl(
Reference<AsyncVar<Optional<ClientLeaderRegInterface>>> coordinator,
Reference<AsyncVar<Optional<ClientLeaderRegInterface>> const> coordinator,
Optional<ProtocolVersion> expectedVersion) {
state bool needToConnect = true;

@ -335,7 +335,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// KeyValueStoreRocksDB
init( ROCKSDB_BACKGROUND_PARALLELISM, 0 );
init( ROCKSDB_READ_PARALLELISM, 4 );
init( ROCKSDB_MEMTABLE_BYTES, 512 * 1024 * 1024 );
// Use a smaller memtable in simulation to avoid OOMs.
int64_t memtableBytes = isSimulated ? 32 * 1024 : 512 * 1024 * 1024;
init( ROCKSDB_MEMTABLE_BYTES, memtableBytes );
init( ROCKSDB_UNSAFE_AUTO_FSYNC, false );
init( ROCKSDB_PERIODIC_COMPACTION_SECONDS, 0 );
init( ROCKSDB_PREFIX_LEN, 0 );

@ -26,6 +26,8 @@
#include "flow/IRandom.h"
#include "flow/StreamCipher.h"
#if ENCRYPTION_ENABLED
#include <array>
/*
@ -79,3 +81,5 @@ public:
void releaseZeroCopy(void* data, int length, int64_t offset) override;
int64_t debugFD() const override;
};
#endif // ENCRYPTION_ENABLED

@ -1,6 +1,7 @@
set(FDBRPC_SRCS
AsyncFileCached.actor.h
AsyncFileEIO.actor.h
AsyncFileEncrypted.h
AsyncFileKAIO.actor.h
AsyncFileNonDurable.actor.h
AsyncFileReadAhead.actor.h
@ -36,7 +37,6 @@ set(FDBRPC_SRCS
if(WITH_TLS AND NOT WIN32)
set(FDBRPC_SRCS
${FDBRPC_SRCS}
AsyncFileEncrypted.h
AsyncFileEncrypted.actor.cpp)
endif()

@ -1698,7 +1698,7 @@ Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) {
Reference<AsyncVar<Optional<ProtocolVersion>> const> FlowTransport::getPeerProtocolAsyncVar(NetworkAddress addr) {
return self->peers.at(addr)->protocolVersion;
}
@ -1723,4 +1723,4 @@ void FlowTransport::createInstance(bool isClient, uint64_t transportId) {
HealthMonitor* FlowTransport::healthMonitor() {
return &self->healthMonitor;
}
}

@ -252,7 +252,7 @@ public:
//
// Note that this function does not establish a connection to the peer. In order to obtain a peer's protocol
// version, some other mechanism should be used to connect to that peer.
Reference<AsyncVar<Optional<ProtocolVersion>>> getPeerProtocolAsyncVar(NetworkAddress addr);
Reference<AsyncVar<Optional<ProtocolVersion>> const> getPeerProtocolAsyncVar(NetworkAddress addr);
static FlowTransport& transport() {
return *static_cast<FlowTransport*>((void*)g_network->global(INetwork::enFlowTransport));

@ -32,9 +32,7 @@
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileEIO.actor.h"
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#include "fdbrpc/AsyncFileEncrypted.h"
#endif
#include "fdbrpc/AsyncFileWinASIO.actor.h"
#include "fdbrpc/AsyncFileKAIO.actor.h"
#include "flow/AsioReactor.h"
@ -79,14 +77,14 @@ Future<Reference<class IAsyncFile>> Net2FileSystem::open(const std::string& file
static_cast<boost::asio::io_service*>((void*)g_network->global(INetwork::enASIOService)));
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#if ENCRYPTION_ENABLED
if (flags & IAsyncFile::OPEN_ENCRYPTED)
f = map(f, [flags](Reference<IAsyncFile> r) {
auto mode = flags & IAsyncFile::OPEN_READWRITE ? AsyncFileEncrypted::Mode::APPEND_ONLY
: AsyncFileEncrypted::Mode::READ_ONLY;
return Reference<IAsyncFile>(new AsyncFileEncrypted(r, mode));
});
#endif
#endif // ENCRYPTION_ENABLED
return f;
}

@ -33,9 +33,7 @@
#include "flow/Util.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/AsyncFileCached.actor.h"
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#include "fdbrpc/AsyncFileEncrypted.h"
#endif
#include "fdbrpc/AsyncFileNonDurable.actor.h"
#include "flow/crc32c.h"
#include "fdbrpc/TraceFileIO.h"
@ -2477,14 +2475,14 @@ Future<Reference<class IAsyncFile>> Sim2FileSystem::open(const std::string& file
f = AsyncFileDetachable::open(f);
if (FLOW_KNOBS->PAGE_WRITE_CHECKSUM_HISTORY > 0)
f = map(f, [=](Reference<IAsyncFile> r) { return Reference<IAsyncFile>(new AsyncFileWriteChecker(r)); });
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#if ENCRYPTION_ENABLED
if (flags & IAsyncFile::OPEN_ENCRYPTED)
f = map(f, [flags](Reference<IAsyncFile> r) {
auto mode = flags & IAsyncFile::OPEN_READWRITE ? AsyncFileEncrypted::Mode::APPEND_ONLY
: AsyncFileEncrypted::Mode::READ_ONLY;
return Reference<IAsyncFile>(new AsyncFileEncrypted(r, mode));
});
#endif
#endif // ENCRYPTION_ENABLED
return f;
} else
return AsyncFileCached::open(filename, flags, mode);

@ -237,7 +237,7 @@ struct BackupData {
CounterCollection cc;
Future<Void> logger;
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo> const> db, const InitializeBackupRequest& req)
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
@ -987,7 +987,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch recoveryCount, BackupData* self) {
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db, LogEpoch recoveryCount, BackupData* self) {
loop {
bool isDisplaced =
db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED;
@ -1033,7 +1033,7 @@ ACTOR static Future<Void> monitorWorkerPause(BackupData* self) {
ACTOR Future<Void> backupWorker(BackupInterface interf,
InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
state BackupData self(interf.id(), db, req);
state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture());

@ -1596,7 +1596,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
}
}
ACTOR Future<Void> ddMetricsRequestServer(CommitProxyInterface proxy, Reference<AsyncVar<ServerDBInfo>> db) {
ACTOR Future<Void> ddMetricsRequestServer(CommitProxyInterface proxy, Reference<AsyncVar<ServerDBInfo> const> db) {
loop {
choose {
when(state GetDDMetricsRequest req = waitNext(proxy.getDDMetrics.getFuture())) {
@ -1754,7 +1754,8 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
return Void();
}
ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db, ExclusionSafetyCheckRequest req) {
ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo> const> db,
ExclusionSafetyCheckRequest req) {
TraceEvent("SafetyCheckCommitProxyBegin");
state ExclusionSafetyCheckReply reply(false);
if (!db->get().distributor.present()) {
@ -1783,7 +1784,7 @@ ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo>> db,
}
ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
UIDTransactionTagMap<TransactionCommitCostEstimation>* ssTrTagCommitCost) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> nextReply = Never();
@ -1818,7 +1819,7 @@ ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LogEpoch epoch,
Version recoveryTransactionVersion,
bool firstProxy,
@ -2037,7 +2038,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
CommitProxyInterface myInterface) {
loop {
@ -2051,7 +2052,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
InitializeCommitProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string whitelistBinPaths) {
try {
state Future<Void> core = commitProxyServerCore(proxy,

@ -126,7 +126,7 @@ class ReadFromLocalConfigEnvironment {
UID id;
std::string dataDir;
LocalConfiguration localConfiguration;
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> cbfi;
Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> cbfi;
Future<Void> consumer;
ACTOR static Future<Void> checkEventually(LocalConfiguration const* localConfiguration,
@ -168,7 +168,7 @@ public:
return setup();
}
void connectToBroadcaster(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& cbfi) {
void connectToBroadcaster(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& cbfi) {
ASSERT(!this->cbfi);
this->cbfi = cbfi;
consumer = localConfiguration.consume(cbfi);
@ -228,7 +228,7 @@ class BroadcasterToLocalConfigEnvironment {
ACTOR static Future<Void> setup(BroadcasterToLocalConfigEnvironment* self) {
wait(self->readFrom.setup());
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->readFrom.connectToBroadcaster(IAsyncListener<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
return Void();
}
@ -364,7 +364,7 @@ class TransactionToLocalConfigEnvironment {
ACTOR static Future<Void> setup(TransactionToLocalConfigEnvironment* self) {
wait(self->readFrom.setup());
self->readFrom.connectToBroadcaster(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->readFrom.connectToBroadcaster(IAsyncListener<ConfigBroadcastFollowerInterface>::create(self->cbfi));
self->broadcastServer = self->broadcaster.serve(self->cbfi->get());
return Void();
}

@ -5218,7 +5218,7 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
}
ACTOR Future<Void> storageRecruiter(DDTeamCollection* self,
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
const DDEnabledState* ddEnabledState) {
state Future<RecruitStorageReply> fCandidateWorker;
state RecruitStorageRequest lastRequest;
@ -5490,7 +5490,7 @@ ACTOR Future<Void> serverGetTeamRequests(TeamCollectionInterface tci, DDTeamColl
}
}
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
TraceEvent("DDTrackerStarting");
while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) {
TraceEvent("DDTrackerStarting").detail("RecoveryState", (int)db->get().recoveryState);
@ -5516,8 +5516,8 @@ ACTOR Future<Void> monitorHealthyTeams(DDTeamCollection* self) {
ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> teamCollection,
Reference<InitialDataDistribution> initData,
TeamCollectionInterface tci,
Reference<AsyncVar<struct ServerDBInfo>> db,
const DDEnabledState* ddEnabledState) {
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState const* ddEnabledState) {
state DDTeamCollection* self = teamCollection.getPtr();
state Future<Void> loggingTrigger = Void();
state PromiseStream<Void> serverRemoved;
@ -5744,16 +5744,16 @@ ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnab
}
struct DataDistributorData : NonCopyable, ReferenceCounted<DataDistributorData> {
Reference<AsyncVar<struct ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
UID ddId;
PromiseStream<Future<Void>> addActor;
DDTeamCollection* teamCollection;
DataDistributorData(Reference<AsyncVar<ServerDBInfo>> const& db, UID id)
DataDistributorData(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
: dbInfo(db), ddId(id), teamCollection(nullptr) {}
};
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db, double* lastLimited) {
ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo> const> db, double* lastLimited) {
loop {
wait(delay(SERVER_KNOBS->METRIC_UPDATE_RATE));
@ -6121,7 +6121,7 @@ static std::set<int> const& normalDataDistributorErrors() {
return s;
}
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
state ReadYourWritesTransaction tr(cx);
loop {
@ -6265,7 +6265,7 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
}
ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
DDEnabledState* ddEnabledState) {
state Future<Void> dbInfoChange = db->onChange();
if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
@ -6459,7 +6459,7 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
return Void();
}
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<struct ServerDBInfo>> db) {
ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<ServerDBInfo> const> db) {
state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
state Future<Void> collection = actorCollection(self->addActor.getFuture());
state PromiseStream<GetMetricsListRequest> getShardMetricsList;

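The signature churn above (and in the hunks that follow) all serves one change: narrowing `Reference<AsyncVar<ServerDBInfo>>` to `Reference<AsyncVar<ServerDBInfo> const>`, so these actors can read the variable and wait on it but can no longer mutate it. A minimal sketch of why the `const` inside the reference enforces this, using stand-in types rather than Flow's real `Reference`/`AsyncVar`:

```cpp
#include <memory>

// Stand-in for AsyncVar: reads and change-notification are const,
// mutation is non-const.
template <class T>
class AsyncVarLike {
    T value;
public:
    explicit AsyncVarLike(T v) : value(std::move(v)) {}
    T const& get() const { return value; }
    void onChange() const {} // stand-in for Future<Void> onChange() const
    void set(T v) { value = std::move(v); }
};

// Holding a reference-to-const is a compile-time read-only capability.
void reader(std::shared_ptr<AsyncVarLike<int> const> db) {
    int v = db->get(); // OK: get() is const
    (void)v;
    // db->set(42);    // error: set() is not const, unreachable via a const ref
}
```
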
@ -222,7 +222,7 @@ struct GrvProxyData {
Reference<ILogSystem> logSystem;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db;
Reference<AsyncVar<ServerDBInfo> const> db;
Optional<LatencyBandConfig> latencyBandConfig;
double lastStartCommit;
@ -251,7 +251,7 @@ struct GrvProxyData {
GrvProxyData(UID dbgid,
MasterInterface master,
RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Reference<AsyncVar<ServerDBInfo>> db)
Reference<AsyncVar<ServerDBInfo> const> db)
: dbgid(dbgid), stats(dbgid), master(master), getConsistentReadVersion(getConsistentReadVersion),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db), lastStartCommit(0),
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), updateCommitRequests(0), lastCommitTime(0),
@ -275,7 +275,7 @@ ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy,
// Get transaction rate info from RateKeeper.
ACTOR Future<Void> getRate(UID myID,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
int64_t* inTransactionCount,
int64_t* inBatchTransactionCount,
GrvTransactionRateInfo* transactionRateInfo,
@ -375,7 +375,7 @@ void dropRequestFromQueue(Deque<GetReadVersionRequest>* queue, GrvProxyStats* st
}
// Put a GetReadVersion request into the queue corresponding to its priority.
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo> const> db,
SpannedDeque<GetReadVersionRequest>* systemQueue,
SpannedDeque<GetReadVersionRequest>* defaultQueue,
SpannedDeque<GetReadVersionRequest>* batchQueue,
@ -634,7 +634,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
return Void();
}
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo>> db) {
ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<AsyncVar<ServerDBInfo> const> db) {
state Future<Void> nextRequestTimer = Never();
state Future<GetDataDistributorMetricsReply> nextReply = Never();
@ -680,7 +680,7 @@ ACTOR Future<Void> monitorDDMetricsChanges(int64_t* midShardSize, Reference<Asyn
}
ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
PromiseStream<Future<Void>> addActor,
GrvProxyData* grvProxyData,
GetHealthMetricsReply* healthMetricsReply,
@ -898,7 +898,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
MasterInterface master,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db);
state PromiseStream<Future<Void>> addActor;
@ -945,7 +945,7 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
GrvProxyInterface myInterface) {
loop {
@ -959,7 +959,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
wait(core || checkRemoved(db, req.recoveryCount, proxy));

@ -42,31 +42,31 @@ typedef uint32_t QueueID;
// Pager Events
enum class PagerEvents { CacheLookup = 0, CacheHit, CacheMiss, PageWrite, MAXEVENTS };
static const std::string PagerEventsCodes[] = { "Lookup", "Hit", "Miss", "Write" };
static const char* const PagerEventsStrings[] = { "Lookup", "Hit", "Miss", "Write", "Unknown" };
// Reasons for page level events.
enum class PagerEventReasons { PointRead = 0, RangeRead, RangePrefetch, Commit, LazyClear, MetaData, MAXEVENTREASONS };
static const std::string PagerEventReasonsCodes[] = { "Get", "GetR", "GetRPF", "Commit", "LazyClr", "Meta" };
static const char* const PagerEventReasonsStrings[] = {
"Get", "GetR", "GetRPF", "Commit", "LazyClr", "Meta", "Unknown"
};
static const int nonBtreeLevel = 0;
static const std::pair<PagerEvents, PagerEventReasons> possibleEventReasonPairs[] = {
static const std::vector<std::pair<PagerEvents, PagerEventReasons>> possibleEventReasonPairs = {
{ PagerEvents::CacheLookup, PagerEventReasons::Commit },
{ PagerEvents::CacheLookup, PagerEventReasons::LazyClear },
{ PagerEvents::CacheLookup, PagerEventReasons::PointRead },
{ PagerEvents::CacheLookup, PagerEventReasons::RangeRead },
{ PagerEvents::CacheLookup, PagerEventReasons::LazyClear },
{ PagerEvents::CacheLookup, PagerEventReasons::MetaData },
{ PagerEvents::CacheHit, PagerEventReasons::Commit },
{ PagerEvents::CacheHit, PagerEventReasons::LazyClear },
{ PagerEvents::CacheHit, PagerEventReasons::PointRead },
{ PagerEvents::CacheHit, PagerEventReasons::RangeRead },
{ PagerEvents::CacheHit, PagerEventReasons::LazyClear },
{ PagerEvents::CacheHit, PagerEventReasons::MetaData },
{ PagerEvents::CacheHit, PagerEventReasons::Commit },
{ PagerEvents::CacheMiss, PagerEventReasons::Commit },
{ PagerEvents::CacheMiss, PagerEventReasons::LazyClear },
{ PagerEvents::CacheMiss, PagerEventReasons::PointRead },
{ PagerEvents::CacheMiss, PagerEventReasons::RangeRead },
{ PagerEvents::CacheMiss, PagerEventReasons::LazyClear },
{ PagerEvents::CacheMiss, PagerEventReasons::MetaData },
{ PagerEvents::CacheMiss, PagerEventReasons::Commit },
{ PagerEvents::PageWrite, PagerEventReasons::MetaData },
{ PagerEvents::PageWrite, PagerEventReasons::Commit },
{ PagerEvents::PageWrite, PagerEventReasons::LazyClear },
};
static const std::pair<PagerEvents, PagerEventReasons> L0PossibleEventReasonPairs[] = {
static const std::vector<std::pair<PagerEvents, PagerEventReasons>> L0PossibleEventReasonPairs = {
{ PagerEvents::CacheLookup, PagerEventReasons::RangePrefetch },
{ PagerEvents::CacheLookup, PagerEventReasons::MetaData },
{ PagerEvents::CacheHit, PagerEventReasons::RangePrefetch },
@ -167,6 +167,7 @@ public:
virtual Future<Reference<const ArenaPage>> getPhysicalPage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
int priority,
bool cacheable,
bool nohit) = 0;
virtual bool tryEvictPage(LogicalPageID id) = 0;
@ -238,8 +239,9 @@ public:
virtual Future<Reference<ArenaPage>> readPage(PagerEventReasons reason,
unsigned int level,
LogicalPageID pageID,
bool cacheable = true,
bool noHit = false) = 0;
int priority,
bool cacheable,
bool noHit) = 0;
virtual Future<Reference<ArenaPage>> readExtent(LogicalPageID pageID) = 0;
virtual void releaseExtentReadLock() = 0;

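Two details in this header change are easy to miss: the string tables switch from `std::string` objects to `const char* const` arrays (avoiding static constructors) and gain a trailing `"Unknown"` entry, so an out-of-range enum value, including the `MAXEVENTS`/`MAXEVENTREASONS` sentinels, maps to a safe label instead of reading past the array. A hedged sketch of a lookup written against that convention; the helper name is hypothetical:

```cpp
#include <algorithm>

enum class PagerEvents { CacheLookup = 0, CacheHit, CacheMiss, PageWrite, MAXEVENTS };
static const char* const PagerEventsStrings[] = { "Lookup", "Hit", "Miss", "Write", "Unknown" };

// Hypothetical helper: clamp so MAXEVENTS (or a corrupt value) lands on "Unknown".
inline const char* pagerEventName(PagerEvents e) {
    int i = static_cast<int>(e);
    int unknownIdx = static_cast<int>(PagerEvents::MAXEVENTS); // last slot
    return PagerEventsStrings[std::clamp(i, 0, unknownIdx)];
}
```
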
@ -7,6 +7,7 @@
#include <rocksdb/slice_transform.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/table_properties_collectors.h>
#include "fdbserver/CoroFlow.h"
#include "flow/flow.h"
#include "flow/IThreadPool.h"
@ -283,7 +284,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
std::min(value.size(), size_t(a.maxLength)))));
} else {
if (!s.IsNotFound()) {
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "ReadValuePrefix");
TraceEvent(SevError, "RocksDBError")
.detail("Error", s.ToString())
.detail("Method", "ReadValuePrefix");
}
a.result.send(Optional<Value>());
}
@ -367,8 +370,23 @@ struct RocksDBKeyValueStore : IKeyValueStore {
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
explicit RocksDBKeyValueStore(const std::string& path, UID id) : path(path), id(id) {
writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool();
// In simulation, run the reader/writer threads as Coro threads (i.e. in the network thread). The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
// performing the reads in background threads in simulation, the event loop thinks there is no work to do and
// advances time faster than 1 sec/sec. By the time the blocking read actually finishes, simulation has advanced
// time by more than 5 seconds, so every read fails with a transaction_too_old error. Doing blocking IO on the
// main thread solves this issue. There are almost certainly better fixes, but my goal was to get a less
// invasive change merged first and work on a more realistic version if/when we think that would provide
// substantially more confidence in the correctness.
// TODO: Adapt the simulation framework to not advance time quickly when background reads/writes are occurring.
if (g_network->isSimulated()) {
writeThread = CoroThreadPool::createThreadPool();
readThreads = CoroThreadPool::createThreadPool();
} else {
writeThread = createGenericThreadPool();
readThreads = createGenericThreadPool();
}
writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr");
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(new Reader(db), "fdb-rocksdb-re");

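The constructor comment above explains the simulation-time problem; the fix itself is just a conditional pool choice. A condensed sketch of that pattern, assuming the same Flow facilities the diff uses (`g_network`, `CoroThreadPool`, `createGenericThreadPool`); the wrapper function is hypothetical:

```cpp
#include "fdbserver/CoroFlow.h"
#include "flow/IThreadPool.h"
#include "flow/network.h"

// Hypothetical wrapper for the logic above: coroutine "threads" on the
// network thread under simulation (keeping the simulated clock honest
// around blocking disk IO), real OS threads in production.
Reference<IThreadPool> makeRocksDBThreadPool() {
    if (g_network->isSimulated()) {
        return CoroThreadPool::createThreadPool();
    }
    return createGenericThreadPool();
}
```
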
@ -309,9 +309,8 @@ class LocalConfigurationImpl {
}
}
ACTOR static Future<Void> consume(
LocalConfigurationImpl* self,
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> broadcaster) {
ACTOR static Future<Void> consume(LocalConfigurationImpl* self,
Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> broadcaster) {
ASSERT(self->initFuture.isValid() && self->initFuture.isReady());
loop {
choose {
@ -371,7 +370,7 @@ public:
return getKnobs().getTestKnobs();
}
Future<Void> consume(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
Future<Void> consume(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
return consume(this, broadcaster);
}
@ -453,7 +452,7 @@ TestKnobs const& LocalConfiguration::getTestKnobs() const {
}
Future<Void> LocalConfiguration::consume(
Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster) {
return impl().consume(broadcaster);
}

@ -60,7 +60,7 @@ public:
ClientKnobs const& getClientKnobs() const;
ServerKnobs const& getServerKnobs() const;
TestKnobs const& getTestKnobs() const;
Future<Void> consume(Reference<IDependentAsyncVar<ConfigBroadcastFollowerInterface> const> const& broadcaster);
Future<Void> consume(Reference<IAsyncListener<ConfigBroadcastFollowerInterface> const> const& broadcaster);
UID getID() const;
public: // Testing

@ -625,7 +625,7 @@ ACTOR Future<Void> logRouterPop(LogRouterData* self, TLogPopRequest req) {
ACTOR Future<Void> logRouterCore(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
state LogRouterData logRouterData(interf.id(), req);
state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection(addActor.getFuture());
@ -653,7 +653,7 @@ ACTOR Future<Void> logRouterCore(TLogInterface interf,
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
TLogInterface myInterface) {
loop {
@ -670,7 +670,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
TraceEvent("LogRouterStart", interf.id())
.detail("Start", req.startVersion)

@ -291,7 +291,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
NotifiedVersion queueCommitEnd;
Version queueCommitBegin;
@ -321,7 +321,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo)
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
persistentData(persistentData), rawPersistentQueue(persistentQueue),
persistentQueue(new TLogQueue(persistentQueue, dbgid)), dbInfo(dbInfo), queueCommitBegin(0), queueCommitEnd(0),
@ -1568,7 +1568,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
UID tlogId,
UID workerID) {

@ -264,7 +264,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx;
NotifiedVersion queueCommitEnd;
@ -301,7 +301,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded,
std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -2716,7 +2716,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

@ -327,7 +327,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx;
NotifiedVersion queueCommitEnd;
@ -364,7 +364,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded,
std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -3205,7 +3205,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

@ -161,7 +161,7 @@ struct ProxyCommitData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion;
RequestStream<CommitTransactionRequest> commit;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db;
Reference<AsyncVar<ServerDBInfo> const> db;
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Reference<StorageInfo>> storageCache;
@ -239,7 +239,7 @@ struct ProxyCommitData {
RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Version recoveryTransactionVersion,
RequestStream<CommitTransactionRequest> commit,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
logAdapter(nullptr), txnStateStore(nullptr), popRemoteTxs(false), committedVersion(recoveryTransactionVersion),

@ -35,7 +35,7 @@
#include <boost/lexical_cast.hpp>
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0) {
ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int flags = 0) {
loop {
choose {
when(vector<WorkerDetails> w = wait(brokenPromiseToNever(
@ -48,7 +48,7 @@ ACTOR Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>>
}
// Gets the WorkerInterface representing the Master server.
ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers");
loop {
@ -75,7 +75,7 @@ ACTOR Future<WorkerInterface> getMasterWorker(Database cx, Reference<AsyncVar<Se
}
// Gets the WorkerInterface representing the data distributor.
ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<WorkerInterface> getDataDistributorWorker(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("GetDataDistributorWorker").detail("Stage", "GettingWorkers");
loop {
@ -118,7 +118,7 @@ ACTOR Future<int64_t> getDataInFlight(Database cx, WorkerInterface distributorWo
}
// Gets the number of bytes in flight from the data distributor.
ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<int64_t> getDataInFlight(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t dataInFlight = wait(getDataInFlight(cx, distributorInterf));
return dataInFlight;
@ -144,7 +144,7 @@ int64_t getPoppedVersionLag(const TraceEventFields& md) {
return persistentDataDurableVersion - queuePoppedVersion;
}
ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
Optional<Value> coordinators =
@ -177,7 +177,8 @@ ACTOR Future<vector<WorkerInterface>> getCoordWorkers(Database cx, Reference<Asy
}
// This is not robust in the face of a TLog failure
ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
@ -245,7 +246,7 @@ ACTOR Future<vector<StorageServerInterface>> getStorageServers(Database cx, bool
}
ACTOR Future<vector<WorkerInterface>> getStorageWorkers(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool localOnly) {
state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
state std::map<NetworkAddress, WorkerInterface> workersMap;
@ -335,7 +336,7 @@ ACTOR Future<TraceEventFields> getStorageMetricsTimeout(UID storage, WorkerInter
};
// Gets the maximum size of all the storage server queues
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<int64_t> getMaxStorageServerQueueSize(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
@ -399,7 +400,7 @@ ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
// Gets the size of the data distribution queue. If reportInFlight is true, then data in flight is considered part of
// the queue. Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<int64_t> getDataDistributionQueueSize(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
bool reportInFlight) {
WorkerInterface distributorInterf = wait(getDataDistributorWorker(cx, dbInfo));
int64_t inQueue = wait(getDataDistributionQueueSize(cx, distributorInterf, reportInFlight));
@ -516,7 +517,7 @@ ACTOR Future<bool> getTeamCollectionValid(Database cx, WorkerInterface dataDistr
// Gets if the number of process and machine teams does not exceed the maximum allowed number of teams
// Convenience method that first finds the master worker from a zookeeper interface
ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<bool> getTeamCollectionValid(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
WorkerInterface dataDistributorWorker = wait(getDataDistributorWorker(cx, dbInfo));
bool valid = wait(getTeamCollectionValid(cx, dataDistributorWorker));
return valid;
@ -565,7 +566,9 @@ ACTOR Future<bool> getStorageServersRecruiting(Database cx, WorkerInterface dist
}
}
ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo, std::string context) {
ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
if (g_network->isSimulated() && g_simulator.usableRegions > 1) {
bool primaryDead = g_simulator.datacenterDead(g_simulator.primaryDcId);
bool remoteDead = g_simulator.datacenterDead(g_simulator.remoteDcId);
@ -601,7 +604,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx, Reference<AsyncVar<ServerDB
ACTOR Future<Void> reconfigureAfter(Database cx,
double time,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
wait(delay(time));
wait(repairDeadDatacenter(cx, dbInfo, context));
@ -611,7 +614,7 @@ ACTOR Future<Void> reconfigureAfter(Database cx,
// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string phase,
int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6,
@ -747,7 +750,7 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
}
Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string phase,
int64_t dataInFlightGate,
int64_t maxTLogQueueGate,

@ -28,25 +28,26 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h"
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<int64_t> getDataInFlight(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<std::pair<int64_t, int64_t>> getTLogQueueInfo(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getMaxStorageServerQueueSize(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<int64_t> getDataDistributionQueueSize(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
bool const& reportInFlight);
Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo> const> const&);
Future<vector<StorageServerInterface>> getStorageServers(Database const& cx, bool const& use_system_priority = false);
Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo);
Future<vector<WorkerDetails>> getWorkers(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo, int const& flags = 0);
Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
Future<Void> repairDeadDatacenter(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
std::string const& context);
Future<vector<WorkerInterface>> getStorageWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
bool const& localOnly);
Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo);
Future<vector<WorkerInterface>> getCoordWorkers(Database const& cx,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
#include "flow/unactorcompiler.h"
#endif

@ -1408,7 +1408,7 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
}
}
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo>> dbInfo) {
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
state RatekeeperData self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
state Future<Void> timeout = Void();
state std::vector<Future<Void>> tlogTrackers;

@ -354,7 +354,7 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver, InitializeResolverRe
}
}
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo> const> db,
uint64_t recoveryCount,
ResolverInterface myInterface) {
loop {
@ -367,7 +367,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR Future<Void> resolver(ResolverInterface resolver,
InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db) {
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
state Future<Void> core = resolverCore(resolver, initReq);
loop choose {

@ -22,6 +22,7 @@
#include <fstream>
#include <ostream>
#include <sstream>
#include <string_view>
#include <toml.hpp>
#include "fdbrpc/Locality.h"
#include "fdbrpc/simulator.h"
@ -50,6 +51,19 @@ extern const char* getSourceVersion();
using namespace std::literals;
// TODO: Defining these here is just asking for ODR violations.
template <>
std::string describe(bool const& val) {
return val ? "true" : "false";
}
template <>
std::string describe(int const& val) {
return format("%d", val);
}
namespace {
const int MACHINE_REBOOT_TIME = 10;
bool destructed = false;
@ -232,6 +246,7 @@ public:
// 1 = "memory"
// 2 = "memory-radixtree-beta"
// 3 = "ssd-redwood-experimental"
// 4 = "ssd-rocksdb-experimental"
// Requires a comma-separated list of numbers WITHOUT whitespace
std::vector<int> storageEngineExcludeTypes;
// Set the maximum TLog version that can be selected for a test
@ -629,16 +644,6 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
}
}
template <>
std::string describe(bool const& val) {
return val ? "true" : "false";
}
template <>
std::string describe(int const& val) {
return format("%d", val);
}
// Since a datacenter kill is considered to be the same as killing a machine, files cannot be swapped across datacenters
std::map<Optional<Standalone<StringRef>>, std::vector<std::vector<std::string>>> availableFolders;
// process count is no longer needed because it is now the length of the vector of ip's, because it was one ip per
@ -1252,7 +1257,7 @@ void SimulationConfig::setDatacenters(const TestConfig& testConfig) {
// Sets storage engine based on testConfig details
void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
int storage_engine_type = deterministicRandom()->randomInt(0, 4);
int storage_engine_type = deterministicRandom()->randomInt(0, 5);
if (testConfig.storageEngineType.present()) {
storage_engine_type = testConfig.storageEngineType.get();
} else {
@ -1260,7 +1265,7 @@ void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
while (std::find(testConfig.storageEngineExcludeTypes.begin(),
testConfig.storageEngineExcludeTypes.end(),
storage_engine_type) != testConfig.storageEngineExcludeTypes.end()) {
storage_engine_type = deterministicRandom()->randomInt(0, 4);
storage_engine_type = deterministicRandom()->randomInt(0, 5);
}
}
@ -1285,6 +1290,16 @@ void SimulationConfig::setStorageEngine(const TestConfig& testConfig) {
set_config("ssd-redwood-experimental");
break;
}
case 4: {
TEST(true); // Simulated cluster using RocksDB storage engine
set_config("ssd-rocksdb-experimental");
// Tests using the RocksDB engine are necessarily non-deterministic because of RocksDB
// background threads.
TraceEvent(SevWarn, "RocksDBNonDeterminism")
.detail("Explanation", "The RocksDB storage engine is threaded and non-deterministic");
noUnseed = true;
break;
}
default:
ASSERT(false); // Programmer forgot to adjust cases.
}
@ -2081,9 +2096,17 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
using namespace std::literals;
#ifdef SSD_ROCKSDB_EXPERIMENTAL
bool rocksDBEnabled = true;
#else
bool rocksDBEnabled = false;
#endif
// Populates the TestConfig fields according to what is found in the test file.
void checkTestConf(const char* testFile, TestConfig* testConfig) {}
} // namespace
ACTOR void setupAndRun(std::string dataFolder,
const char* testFile,
bool rebooting,
@ -2098,6 +2121,19 @@ ACTOR void setupAndRun(std::string dataFolder,
g_simulator.hasDiffProtocolProcess = testConfig.startIncompatibleProcess;
g_simulator.setDiffProtocol = false;
// The RocksDB storage engine does not support the restarting tests because you cannot consistently get a clean
// snapshot of the storage engine without a snapshotting file system.
// https://github.com/apple/foundationdb/issues/5155
if (std::string_view(testFile).find("restarting") != std::string_view::npos) {
testConfig.storageEngineExcludeTypes.push_back(4);
}
// The RocksDB engine is not always built with the rest of fdbserver. Don't try to use it if it is not included
// in the build.
if (!rocksDBEnabled) {
testConfig.storageEngineExcludeTypes.push_back(4);
}
state ProtocolVersion protocolVersion = currentProtocolVersion;
if (testConfig.startIncompatibleProcess) {
// isolates right most 1 bit of compatibleProtocolVersionMask to make this protocolVersion incompatible

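The engine picker above now draws from `[0, 5)` and re-draws while the pick appears in `storageEngineExcludeTypes` (which the `setupAndRun` hunk populates for restarting tests and RocksDB-less builds). A standalone sketch of that exclude-and-redraw loop, with a plain `std::mt19937` standing in for `deterministicRandom()`:

```cpp
#include <algorithm>
#include <random>
#include <vector>

// Re-draw until the pick is not excluded; like the original, this assumes
// at least one engine type remains allowed, otherwise it would never return.
int pickStorageEngine(const std::vector<int>& excludeTypes, std::mt19937& rng) {
    std::uniform_int_distribution<int> dist(0, 4); // five engine types: 0..4
    int type = dist(rng);
    while (std::find(excludeTypes.begin(), excludeTypes.end(), type) != excludeTypes.end()) {
        type = dist(rng);
    }
    return type;
}
```
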
@ -162,7 +162,7 @@ public:
ProtocolVersion logProtocol;
Reference<ILogSystem> logSystem;
Key ck; // cacheKey
Reference<AsyncVar<ServerDBInfo>> const& db;
Reference<AsyncVar<ServerDBInfo> const> db;
Database cx;
StorageCacheUpdater* updater;
@ -238,7 +238,7 @@ public:
}
} counters;
explicit StorageCacheData(UID thisServerID, uint16_t index, Reference<AsyncVar<ServerDBInfo>> const& db)
explicit StorageCacheData(UID thisServerID, uint16_t index, Reference<AsyncVar<ServerDBInfo> const> const& db)
: /*versionedData(FastAllocPTree<KeyRef>{std::make_shared<int>(0)}), */
thisServerID(thisServerID), index(index), logProtocol(0), db(db), cacheRangeChangeCounter(0),
lastTLogVersion(0), lastVersionWithData(0), peekVersion(0), compactionInProgress(Void()),
@ -2165,7 +2165,9 @@ ACTOR Future<Void> watchInterface(StorageCacheData* self, StorageServerInterface
}
}
ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db) {
ACTOR Future<Void> storageCacheServer(StorageServerInterface ssi,
uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db) {
state StorageCacheData self(ssi.id(), id, db);
state ActorCollection actors(false);
state Future<Void> dbInfoChange = Void();

@ -329,7 +329,7 @@ struct TLogData : NonCopyable {
AsyncVar<bool>
largeDiskQueueCommitBytes; // becomes true when diskQueueCommitBytes is greater than MAX_QUEUE_COMMIT_BYTES
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
Database cx;
NotifiedVersion queueCommitEnd;
@ -372,7 +372,7 @@ struct TLogData : NonCopyable {
UID workerID,
IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
Reference<AsyncVar<bool>> degraded,
std::string folder)
: dbgid(dbgid), workerID(workerID), instanceID(deterministicRandom()->randomUniqueID().first()),
@ -3280,7 +3280,7 @@ ACTOR Future<Void> startSpillingInTenSeconds(TLogData* self, UID tlogId, Referen
// New tLog (if !recoverFrom.size()) or restore from network
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

File diff suppressed because it is too large

@ -831,7 +831,7 @@ ACTOR Future<Void> traceRole(Role role, UID roleId);
struct ServerDBInfo;
class Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
class Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
TaskPriority taskID = TaskPriority::DefaultEndpoint,
LockAware = LockAware::False,
EnableLocalityLoadBalance = EnableLocalityLoadBalance::True);
@ -868,32 +868,32 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Tag seedTag,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder);
ACTOR Future<Void> storageServer(
IKeyValueStore* persistentData,
StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<ClusterConnectionFile>
connFile); // changes pssi->id() to be the recovered ID
ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ServerCoordinators serverCoordinators,
LifetimeToken lifetime,
bool forceRecovery);
ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
InitializeCommitProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string whitelistBinPaths);
ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo>> db);
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,
@ -906,14 +906,18 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
Reference<AsyncVar<UID>> activeSharedTLog);
ACTOR Future<Void> resolver(ResolverInterface resolver,
InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db);
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> logRouter(TLogInterface interf,
InitializeLogRouterRequest req,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf, uint16_t id, Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> backupWorker(BackupInterface bi, InitializeBackupRequest req, Reference<AsyncVar<ServerDBInfo>> db);
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> dataDistributor(DataDistributorInterface ddi, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> ratekeeper(RatekeeperInterface rki, Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> storageCacheServer(StorageServerInterface interf,
uint16_t id,
Reference<AsyncVar<ServerDBInfo> const> db);
ACTOR Future<Void> backupWorker(BackupInterface bi,
InitializeBackupRequest req,
Reference<AsyncVar<ServerDBInfo> const> db);
void registerThreadForProfiling();
void updateCpuProfiler(ProfilerRequest req);
@ -921,7 +925,7 @@ void updateCpuProfiler(ProfilerRequest req);
namespace oldTLog_4_6 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
UID tlogId,
UID workerID);
@ -929,7 +933,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
namespace oldTLog_6_0 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,
@ -944,7 +948,7 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
namespace oldTLog_6_2 {
ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
IDiskQueue* persistentQueue,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
LocalityData locality,
PromiseStream<InitializeTLogRequest> tlogRequests,
UID tlogId,

@ -1815,18 +1815,23 @@ int main(int argc, char* argv[]) {
auto dataFolder = opts.dataFolder.size() ? opts.dataFolder : "simfdb";
std::vector<std::string> directories = platform::listDirectories(dataFolder);
for (int i = 0; i < directories.size(); i++)
if (directories[i].size() != 32 && directories[i] != "." && directories[i] != ".." &&
directories[i] != "backups" && directories[i].find("snap") == std::string::npos) {
const std::set<std::string> allowedDirectories = { ".", "..", "backups", "unittests" };
for (const auto& dir : directories) {
if (dir.size() != 32 && allowedDirectories.count(dir) == 0 && dir.find("snap") == std::string::npos) {
TraceEvent(SevError, "IncompatibleDirectoryFound")
.detail("DataFolder", dataFolder)
.detail("SuspiciousFile", directories[i]);
.detail("SuspiciousFile", dir);
fprintf(stderr,
"ERROR: Data folder `%s' had non fdb file `%s'; please use clean, fdb-only folder\n",
dataFolder.c_str(),
directories[i].c_str());
dir.c_str());
flushAndExit(FDB_EXIT_ERROR);
}
}
std::vector<std::string> files = platform::listFiles(dataFolder);
if ((files.size() > 1 || (files.size() == 1 && files[0] != "restartInfo.ini")) && !opts.restarting) {
TraceEvent(SevError, "IncompatibleFileFound").detail("DataFolder", dataFolder);

@ -228,7 +228,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
ReusableCoordinatedState cstate;
Promise<Void> cstateUpdated;
Reference<AsyncVar<ServerDBInfo>> dbInfo;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController
RecoveryState recoveryState;
@ -255,7 +255,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
Future<Void> logger;
MasterData(Reference<AsyncVar<ServerDBInfo>> const& dbInfo,
MasterData(Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
MasterInterface const& myInterface,
ServerCoordinators const& coordinators,
ClusterControllerFullInterface const& clusterController,
@ -1978,7 +1978,7 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
}
ACTOR Future<Void> masterServer(MasterInterface mi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ServerCoordinators coordinators,
LifetimeToken lifetime,

@ -614,7 +614,7 @@ public:
bool tssInQuarantine;
Key sk;
Reference<AsyncVar<ServerDBInfo>> db;
Reference<AsyncVar<ServerDBInfo> const> db;
Database cx;
ActorCollection actors;
@ -806,7 +806,7 @@ public:
} counters;
StorageServer(IKeyValueStore* storage,
Reference<AsyncVar<ServerDBInfo>> const& db,
Reference<AsyncVar<ServerDBInfo> const> const& db,
StorageServerInterface const& ssi)
: fetchKeysHistograms(), instanceID(deterministicRandom()->randomUniqueID().first()), storage(this, storage),
db(db), actors(false), lastTLogVersion(0), lastVersionWithData(0), restoredVersion(0),
@ -5134,7 +5134,7 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
Tag seedTag,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder) {
state StorageServer self(persistentData, db, ssi);
if (ssi.isTss()) {
@ -5328,7 +5328,7 @@ ACTOR Future<Void> replaceTSSInterface(StorageServer* self, StorageServerInterfa
// for recovering an existing storage server
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Reference<AsyncVar<ServerDBInfo>> db,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder,
Promise<Void> recovered,
Reference<ClusterConnectionFile> connFile) {

@ -122,7 +122,7 @@ ACTOR Future<std::vector<Endpoint>> broadcastDBInfoRequest(UpdateServerDBInfoReq
return notUpdated;
}
ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db,
ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo> const> db,
Reference<AsyncVar<ClientDBInfo>> info) {
state std::vector<UID> lastCommitProxyUIDs;
state std::vector<CommitProxyInterface> lastCommitProxies;
@ -136,7 +136,7 @@ ACTOR static Future<Void> extractClientInfo(Reference<AsyncVar<ServerDBInfo>> db
}
}
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo>> const& db,
Database openDBOnServer(Reference<AsyncVar<ServerDBInfo> const> const& db,
TaskPriority taskID,
LockAware lockAware,
EnableLocalityLoadBalance enableLocalityLoadBalance) {
@ -502,15 +502,15 @@ std::vector<DiskStore> getDiskStores(std::string folder) {
// Register the worker interf to cluster controller (cc) and
// re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change.
ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
WorkerInterface interf,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
ProcessClass initialClass,
Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf,
Reference<AsyncVar<bool>> degraded,
Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
Reference<AsyncVar<bool> const> degraded,
Reference<ClusterConnectionFile> connFile,
Reference<AsyncVar<std::set<std::string>>> issues) {
Reference<AsyncVar<std::set<std::string>> const> issues) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register). The registration request piggybacks the optional distributor interface if it exists.
@ -2303,10 +2303,9 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
auto dbInfo = makeReference<AsyncVar<ServerDBInfo>>();
if (useConfigDB != UseConfigDB::DISABLED) {
actors.push_back(
reportErrors(localConfig.consume(IDependentAsyncVar<ConfigBroadcastFollowerInterface>::create(
dbInfo, [](auto const& info) { return info.configBroadcaster; })),
"LocalConfiguration"));
actors.push_back(reportErrors(localConfig.consume(IAsyncListener<ConfigBroadcastFollowerInterface>::create(
dbInfo, [](auto const& info) { return info.configBroadcaster; })),
"LocalConfiguration"));
}
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo),
"MonitorAndWriteCCPriorityInfo"));

@ -55,7 +55,7 @@ struct UnitTestWorkload : TestWorkload {
if (g_network->isSimulated()) {
testParams.setDataDir(getOption(options, "dataDir"_sr, "simfdb/unittests/"_sr).toString());
} else {
testParams.setDataDir(getOption(options, "dataDir"_sr, "/private/tmp/"_sr).toString());
testParams.setDataDir(getOption(options, "dataDir"_sr, "unittests/"_sr).toString());
}
cleanupAfterTests = getOption(options, "cleanupAfterTests"_sr, true);

@ -222,7 +222,7 @@ double testKeyToDouble(const KeyRef& p, const KeyRef& prefix);
ACTOR Future<Void> databaseWarmer(Database cx);
Future<Void> quietDatabase(Database const& cx,
Reference<AsyncVar<struct ServerDBInfo>> const&,
Reference<AsyncVar<struct ServerDBInfo> const> const&,
std::string phase,
int64_t dataInFlightGate = 2e6,
int64_t maxTLogQueueGate = 5e6,

@ -663,7 +663,16 @@ struct Traceable<Standalone<T>> : std::conditional<Traceable<T>::value, std::tru
static std::string toString(const Standalone<T>& value) { return Traceable<T>::toString(value); }
};
#define LiteralStringRef(str) StringRef((const uint8_t*)(str), sizeof((str)) - 1)
namespace literal_string_ref {
template <class T, int Size>
StringRef LiteralStringRefHelper(const char* str) {
static_assert(std::is_same_v<T, const char(&)[Size]> || std::is_same_v<T, const char[Size]>,
"Argument to LiteralStringRef must be a literal string");
return StringRef(reinterpret_cast<const uint8_t*>(str), Size - 1);
}
} // namespace literal_string_ref
#define LiteralStringRef(str) literal_string_ref::LiteralStringRefHelper<decltype(str), sizeof(str)>(str)
inline StringRef operator"" _sr(const char* str, size_t size) {
return StringRef(reinterpret_cast<const uint8_t*>(str), size);
}

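The new helper makes `LiteralStringRef` a compile-time check: for a genuine literal, `decltype(str)` is `const char(&)[N]` and the `static_assert` passes; for a pointer it is `const char*` and compilation fails instead of silently taking `sizeof(const char*)`. A quick illustration, assuming the macro and `_sr` literal defined above are in scope:

```cpp
StringRef ok = LiteralStringRef("hello"); // decltype is const char(&)[6]: compiles
StringRef ok2 = "hello"_sr;               // equivalent user-defined literal

// const char* p = "hello";
// StringRef bad = LiteralStringRef(p);   // decltype is const char*:
//                                        // the static_assert fires at compile time
```
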
@ -53,6 +53,7 @@ set(FLOW_SRCS
SignalSafeUnwind.cpp
SignalSafeUnwind.h
SimpleOpt.h
StreamCipher.h
SystemMonitor.cpp
SystemMonitor.h
TDMetric.actor.h
@ -100,8 +101,7 @@ set(FLOW_SRCS
if(WITH_TLS AND NOT WIN32)
set(FLOW_SRCS
${FLOW_SRCS}
StreamCipher.cpp
StreamCipher.h)
StreamCipher.cpp)
endif()
add_library(stacktrace stacktrace.amalgamation.cpp stacktrace.h)

@ -60,4 +60,6 @@ TEST_CASE("/flow/IThreadPool/NamedThread") {
return Void();
}
#else
void forceLinkIThreadPoolTests() {}
#endif

@ -20,6 +20,14 @@
#pragma once
#if (!defined(TLS_DISABLED) && !defined(_WIN32))
#define ENCRYPTION_ENABLED 1
#else
#define ENCRYPTION_ENABLED 0
#endif
#if ENCRYPTION_ENABLED
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/flow.h"
@ -78,3 +86,5 @@ public:
StringRef decrypt(unsigned char const* ciphertext, int len, Arena&);
StringRef finish(Arena&);
};
#endif // ENCRYPTION_ENABLED

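With this guard, `flow/StreamCipher.h` can be included unconditionally (matching the CMake change above that always lists the header); the cipher declarations only materialize when `ENCRYPTION_ENABLED` is 1, i.e. TLS builds off Windows. A sketch of a call site written against that convention; the function is illustrative:

```cpp
#include "flow/StreamCipher.h" // now safe to include on every platform

// Illustrative only: report whether the cipher classes were compiled in.
bool encryptionAvailable() {
#if ENCRYPTION_ENABLED
    return true;  // declarations from StreamCipher.h are available
#else
    return false; // TLS-disabled or Windows build: the header is inert
#endif
}
```
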
@ -158,7 +158,7 @@ ACTOR Future<Void> testPublisher(Reference<AsyncVar<DummyState>> input) {
return Void();
}
ACTOR Future<Void> testSubscriber(Reference<IDependentAsyncVar<int>> output, Optional<int> expected) {
ACTOR Future<Void> testSubscriber(Reference<IAsyncListener<int>> output, Optional<int> expected) {
loop {
wait(output->onChange());
ASSERT(expected.present());
@ -170,12 +170,12 @@ ACTOR Future<Void> testSubscriber(Reference<IDependentAsyncVar<int>> output, Opt
} // namespace
TEST_CASE("/flow/genericactors/DependentAsyncVar") {
TEST_CASE("/flow/genericactors/AsyncListener") {
auto input = makeReference<AsyncVar<DummyState>>();
state Future<Void> subscriber1 =
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.changed; }), 100);
testSubscriber(IAsyncListener<int>::create(input, [](auto const& var) { return var.changed; }), 100);
state Future<Void> subscriber2 =
testSubscriber(IDependentAsyncVar<int>::create(input, [](auto const& var) { return var.unchanged; }), {});
testSubscriber(IAsyncListener<int>::create(input, [](auto const& var) { return var.unchanged; }), {});
wait(subscriber1 && testPublisher(input));
ASSERT(!subscriber2.isReady());
return Void();

@ -690,7 +690,7 @@ public:
AsyncTrigger() {}
AsyncTrigger(AsyncTrigger&& at) : v(std::move(at.v)) {}
void operator=(AsyncTrigger&& at) { v = std::move(at.v); }
Future<Void> onTrigger() { return v.onChange(); }
Future<Void> onTrigger() const { return v.onChange(); }
void trigger() { v.trigger(); }
private:
@ -700,7 +700,7 @@ private:
// Binds an AsyncTrigger object to an AsyncVar, so when the AsyncVar changes
// the AsyncTrigger is triggered.
ACTOR template <class T>
void forward(Reference<AsyncVar<T>> from, AsyncTrigger* to) {
void forward(Reference<AsyncVar<T> const> from, AsyncTrigger* to) {
loop {
wait(from->onChange());
to->trigger();
@ -1957,25 +1957,28 @@ Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
}
/*
* IDependentAsyncVar is similar to AsyncVar, but it decouples the input and output, so the translation unit
* IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit
* responsible for handling the output does not need to have knowledge of how the output is generated
*/
template <class Output>
class IDependentAsyncVar : public ReferenceCounted<IDependentAsyncVar<Output>> {
class IAsyncListener : public ReferenceCounted<IAsyncListener<Output>> {
public:
virtual ~IDependentAsyncVar() = default;
virtual ~IAsyncListener() = default;
virtual Output const& get() const = 0;
virtual Future<Void> onChange() const = 0;
template <class Input, class F>
static Reference<IDependentAsyncVar> create(Reference<AsyncVar<Input>> const& input, F const& f);
static Reference<IDependentAsyncVar> create(Reference<AsyncVar<Output>> const& output);
static Reference<IAsyncListener> create(Reference<AsyncVar<Input>> const& input, F const& f);
static Reference<IAsyncListener> create(Reference<AsyncVar<Output>> const& output);
};
namespace IAsyncListenerImpl {
template <class Input, class Output, class F>
class DependentAsyncVar final : public IDependentAsyncVar<Output> {
Reference<AsyncVar<Output>> output;
class AsyncListener final : public IAsyncListener<Output> {
// Order matters here, output must outlive monitorActor
AsyncVar<Output> output;
Future<Void> monitorActor;
ACTOR static Future<Void> monitor(Reference<AsyncVar<Input>> input, Reference<AsyncVar<Output>> output, F f) {
ACTOR static Future<Void> monitor(Reference<AsyncVar<Input> const> input, AsyncVar<Output>* output, F f) {
loop {
wait(input->onChange());
output->set(f(input->get()));
@ -1983,23 +1986,24 @@ class DependentAsyncVar final : public IDependentAsyncVar<Output> {
}
public:
DependentAsyncVar(Reference<AsyncVar<Input>> const& input, F const& f)
: output(makeReference<AsyncVar<Output>>(f(input->get()))), monitorActor(monitor(input, output, f)) {}
Output const& get() const override { return output->get(); }
Future<Void> onChange() const override { return output->onChange(); }
AsyncListener(Reference<AsyncVar<Input> const> const& input, F const& f)
: output(f(input->get())), monitorActor(monitor(input, &output, f)) {}
Output const& get() const override { return output.get(); }
Future<Void> onChange() const override { return output.onChange(); }
};
} // namespace IAsyncListenerImpl
template <class Output>
template <class Input, class F>
Reference<IDependentAsyncVar<Output>> IDependentAsyncVar<Output>::create(Reference<AsyncVar<Input>> const& input,
F const& f) {
return makeReference<DependentAsyncVar<Input, Output, F>>(input, f);
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Input>> const& input, F const& f) {
return makeReference<IAsyncListenerImpl::AsyncListener<Input, Output, F>>(input, f);
}
template <class Output>
Reference<IDependentAsyncVar<Output>> IDependentAsyncVar<Output>::create(Reference<AsyncVar<Output>> const& input) {
Reference<IAsyncListener<Output>> IAsyncListener<Output>::create(Reference<AsyncVar<Output>> const& input) {
auto identity = [](const auto& x) { return x; };
return makeReference<DependentAsyncVar<Output, Output, decltype(identity)>>(input, identity);
return makeReference<IAsyncListenerImpl::AsyncListener<Output, Output, decltype(identity)>>(input, identity);
}
// A weak reference type to wrap a future Reference<T> object.

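To tie the rename together: `IAsyncListener` replaces `IDependentAsyncVar`, now owns its output `AsyncVar` by value (declared before `monitorActor` so it outlives it, per the comment above), and accepts a `const` input reference. A usage sketch mirroring the `fdbd` call site earlier in this commit, assuming the Flow actor compiler and the interfaces named in the diff:

```cpp
// Project one field out of ServerDBInfo; holders of the listener can only
// get() the projection and wait on onChange(), never set() anything.
ACTOR Future<Void> watchBroadcaster(Reference<AsyncVar<ServerDBInfo>> dbInfo) {
    state Reference<IAsyncListener<ConfigBroadcastFollowerInterface>> listener =
        IAsyncListener<ConfigBroadcastFollowerInterface>::create(
            dbInfo, [](auto const& info) { return info.configBroadcaster; });
    loop {
        wait(listener->onChange());
        // react to listener->get(), e.g. reconnect to the new broadcaster
    }
}
```
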
tests/TestRunner/tmp_multi_cluster.py Executable file → Normal file