1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-06-01 10:45:56 +08:00

Merge branch 'master' into feature-release-notes-7.0-master

This commit is contained in:
negoyal 2021-06-25 16:27:19 -07:00
commit bc03553365
19 changed files with 631 additions and 107 deletions

@ -4,6 +4,9 @@ import sys
import subprocess
import logging
import functools
import json
import time
import random
def enable_logging(level=logging.ERROR):
"""Enable logging in the function with the specified logging level
@ -24,7 +27,7 @@ def enable_logging(level=logging.ERROR):
handler.setLevel(level)
logger.addHandler(handler)
# pass the logger to the decorated function
result = func(logger, *args,**kwargs)
result = func(logger, *args, **kwargs)
return result
return wrapper
return func_decorator
@ -38,6 +41,15 @@ def run_fdbcli_command(*args):
commands = command_template + ["{}".format(' '.join(args))]
return subprocess.run(commands, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
def run_fdbcli_command_and_get_error(*args):
"""run the fdbcli statement: fdbcli --exec '<arg1> <arg2> ... <argN>'.
Returns:
string: Stderr output from fdbcli
"""
commands = command_template + ["{}".format(' '.join(args))]
return subprocess.run(commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stderr.decode('utf-8').strip()
@enable_logging()
def advanceversion(logger):
# get current read version
@ -82,6 +94,245 @@ def maintenance(logger):
output3 = run_fdbcli_command('maintenance')
assert output3 == no_maintenance_output
@enable_logging()
def setclass(logger):
output1 = run_fdbcli_command('setclass')
class_type_line_1 = output1.split('\n')[-1]
logger.debug(class_type_line_1)
# check process' network address
assert '127.0.0.1' in class_type_line_1
network_address = ':'.join(class_type_line_1.split(':')[:2])
logger.debug("Network address: {}".format(network_address))
# check class type
assert 'unset' in class_type_line_1
# check class source
assert 'command_line' in class_type_line_1
# set class to a random valid type
class_types = ['storage', 'storage', 'transaction', 'resolution',
'commit_proxy', 'grv_proxy', 'master', 'stateless', 'log',
'router', 'cluster_controller', 'fast_restore', 'data_distributor',
'coordinator', 'ratekeeper', 'storage_cache', 'backup'
]
random_class_type = random.choice(class_types)
logger.debug("Change to type: {}".format(random_class_type))
run_fdbcli_command('setclass', network_address, random_class_type)
# check the set successful
output2 = run_fdbcli_command('setclass')
class_type_line_2 = output2.split('\n')[-1]
logger.debug(class_type_line_2)
# check process' network address
assert network_address in class_type_line_2
# check class type changed to the specified value
assert random_class_type in class_type_line_2
# check class source
assert 'set_class' in class_type_line_2
# set back to default
run_fdbcli_command('setclass', network_address, 'default')
# everything should be back to the same as before
output3 = run_fdbcli_command('setclass')
class_type_line_3 = output3.split('\n')[-1]
logger.debug(class_type_line_3)
assert class_type_line_3 == class_type_line_1
@enable_logging()
def lockAndUnlock(logger):
# lock an unlocked database, should be successful
output1 = run_fdbcli_command('lock')
# UID is 32 bytes
lines = output1.split('\n')
lock_uid = lines[0][-32:]
assert lines[1] == 'Database locked.'
logger.debug("UID: {}".format(lock_uid))
assert get_value_from_status_json(True, 'cluster', 'database_lock_state', 'locked')
# lock a locked database, should get the error code 1038
output2 = run_fdbcli_command_and_get_error("lock")
assert output2 == 'ERROR: Database is locked (1038)'
# unlock the database
process = subprocess.Popen(command_template + ['unlock ' + lock_uid], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
line1 = process.stdout.readline()
# The randome passphrease we need to confirm to proceed the unlocking
line2 = process.stdout.readline()
logger.debug("Random passphrase: {}".format(line2))
output3, err = process.communicate(input=line2)
# No error and unlock was successful
assert err is None
assert output3.decode('utf-8').strip() == 'Database unlocked.'
assert not get_value_from_status_json(True, 'cluster', 'database_lock_state', 'locked')
@enable_logging()
def kill(logger):
output1 = run_fdbcli_command('kill')
lines = output1.split('\n')
assert len(lines) == 2
assert lines[1].startswith('127.0.0.1:')
address = lines[1]
logger.debug("Address: {}".format(address))
old_generation = get_value_from_status_json(False, 'cluster', 'generation')
# This is currently an issue with fdbcli,
# where you need to first run 'kill' to initialize processes' list
# and then specify the certain process to kill
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
#
output2, err = process.communicate(input='kill; kill {}\n'.format(address).encode())
logger.debug(output2)
# wait for a second for the cluster recovery
time.sleep(1)
new_generation = get_value_from_status_json(True, 'cluster', 'generation')
logger.debug("Old: {}, New: {}".format(old_generation, new_generation))
assert new_generation > old_generation
@enable_logging()
def suspend(logger):
output1 = run_fdbcli_command('suspend')
lines = output1.split('\n')
assert len(lines) == 2
assert lines[1].startswith('127.0.0.1:')
address = lines[1]
logger.debug("Address: {}".format(address))
db_available = get_value_from_status_json(False, 'client', 'database_status', 'available')
assert db_available
# use pgrep to get the pid of the fdb process
pinfos = subprocess.check_output(['pgrep', '-a', 'fdbserver']).decode().strip().split('\n')
port = address.split(':')[1]
logger.debug("Port: {}".format(port))
# use the port number to find the exact fdb process we are connecting to
pinfo = list(filter(lambda x: port in x, pinfos))
assert len(pinfo) == 1
pid = pinfo[0].split(' ')[0]
logger.debug("Pid: {}".format(pid))
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
# suspend the process for enough long time
output2, err = process.communicate(input='suspend; suspend 3600 {}\n'.format(address).encode())
# the cluster should be unavailable after the only process being suspended
assert not get_value_from_status_json(False, 'client', 'database_status', 'available')
# check the process pid still exists
pids = subprocess.check_output(['pidof', 'fdbserver']).decode().strip()
logger.debug("PIDs: {}".format(pids))
assert pid in pids
# kill the process by pid
kill_output = subprocess.check_output(['kill', pid]).decode().strip()
logger.debug("Kill result: {}".format(kill_output))
# The process should come back after a few time
duration = 0 # seconds we already wait
while not get_value_from_status_json(False, 'client', 'database_status', 'available') and duration < 60:
logger.debug("Sleep for 1 second to wait cluster recovery")
time.sleep(1)
duration += 1
# at most after 60 seconds, the cluster should be available
assert get_value_from_status_json(False, 'client', 'database_status', 'available')
def get_value_from_status_json(retry, *args):
while True:
result = json.loads(run_fdbcli_command('status', 'json'))
if result['client']['database_status']['available'] or not retry:
break
for arg in args:
assert arg in result
result = result[arg]
return result
@enable_logging()
def consistencycheck(logger):
consistency_check_on_output = 'ConsistencyCheck is on'
consistency_check_off_output = 'ConsistencyCheck is off'
output1 = run_fdbcli_command('consistencycheck')
assert output1 == consistency_check_on_output
run_fdbcli_command('consistencycheck', 'off')
output2 = run_fdbcli_command('consistencycheck')
assert output2 == consistency_check_off_output
run_fdbcli_command('consistencycheck', 'on')
output3 = run_fdbcli_command('consistencycheck')
assert output3 == consistency_check_on_output
@enable_logging()
def cache_range(logger):
# this command is currently experimental
# just test we can set and clear the cached range
run_fdbcli_command('cache_range', 'set', 'a', 'b')
run_fdbcli_command('cache_range', 'clear', 'a', 'b')
@enable_logging()
def datadistribution(logger):
output1 = run_fdbcli_command('datadistribution', 'off')
assert output1 == 'Data distribution is turned off.'
output2 = run_fdbcli_command('datadistribution', 'on')
assert output2 == 'Data distribution is turned on.'
output3 = run_fdbcli_command('datadistribution', 'disable', 'ssfailure')
assert output3 == 'Data distribution is disabled for storage server failures.'
# While we disable ssfailure, maintenance should fail
error_msg = run_fdbcli_command_and_get_error('maintenance', 'on', 'fake_zone_id', '1')
assert error_msg == "ERROR: Maintenance mode cannot be used while data distribution is disabled for storage server failures. Use 'datadistribution on' to reenable data distribution."
output4 = run_fdbcli_command('datadistribution', 'enable', 'ssfailure')
assert output4 == 'Data distribution is enabled for storage server failures.'
output5 = run_fdbcli_command('datadistribution', 'disable', 'rebalance')
assert output5 == 'Data distribution is disabled for rebalance.'
output6 = run_fdbcli_command('datadistribution', 'enable', 'rebalance')
assert output6 == 'Data distribution is enabled for rebalance.'
time.sleep(1)
@enable_logging()
def transaction(logger):
"""This test will cover the transaction related fdbcli commands.
In particular,
'begin', 'rollback', 'option'
'getversion', 'get', 'getrange', 'clear', 'clearrange', 'set', 'commit'
"""
err1 = run_fdbcli_command_and_get_error('set', 'key', 'value')
assert err1 == 'ERROR: writemode must be enabled to set or clear keys in the database.'
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
transaction_flow = ['writemode on', 'begin', 'getversion', 'set key value', 'get key', 'commit']
output1, _ = process.communicate(input='\n'.join(transaction_flow).encode())
# split the output into lines
lines = list(filter(len, output1.decode().split('\n')))[-4:]
assert lines[0] == 'Transaction started'
read_version = int(lines[1])
logger.debug("Read version {}".format(read_version))
# line[1] is the printed read version
assert lines[2] == "`key' is `value'"
assert lines[3].startswith('Committed (') and lines[3].endswith(')')
# validate commit version is larger than the read version
commit_verion = int(lines[3][len('Committed ('):-1])
logger.debug("Commit version: {}".format(commit_verion))
assert commit_verion >= read_version
# check the transaction is committed
output2 = run_fdbcli_command('get', 'key')
assert output2 == "`key' is `value'"
# test rollback and read-your-write behavior
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
transaction_flow = [
'writemode on', 'begin', 'getrange a z',
'clear key', 'get key',
# 'option on READ_YOUR_WRITES_DISABLE', 'get key',
'rollback'
]
output3, _ = process.communicate(input='\n'.join(transaction_flow).encode())
lines = list(filter(len, output3.decode().split('\n')))[-5:]
# lines[0] == "Transaction started" and lines[1] == 'Range limited to 25 keys'
assert lines[2] == "`key' is `value'"
assert lines[3] == "`key': not found"
assert lines[4] == "Transaction rolled back"
# make sure the rollback works
output4 = run_fdbcli_command('get', 'key')
assert output4 == "`key' is `value'"
# test read_your_write_disable option and clear the inserted key
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
transaction_flow = [
'writemode on', 'begin',
'option on READ_YOUR_WRITES_DISABLE',
'clear key', 'get key',
'commit'
]
output6, _ = process.communicate(input='\n'.join(transaction_flow).encode())
lines = list(filter(len, output6.decode().split('\n')))[-4:]
assert lines[1] == 'Option enabled for current transaction'
# the get key should still return the value even we clear it in the transaction
assert lines[2] == "`key' is `value'"
# Check the transaction is committed
output7 = run_fdbcli_command('get', 'key')
assert output7 == "`key': not found"
if __name__ == '__main__':
# fdbcli_tests.py <path_to_fdbcli_binary> <path_to_fdb_cluster_file>
assert len(sys.argv) == 3, "Please pass arguments: <path_to_fdbcli_binary> <path_to_fdb_cluster_file>"
@ -90,4 +341,13 @@ if __name__ == '__main__':
# tests for fdbcli commands
# assertions will fail if fdbcli does not work as expected
advanceversion()
cache_range()
consistencycheck()
datadistribution()
kill()
lockAndUnlock()
maintenance()
setclass()
suspend()
transaction()

@ -17,3 +17,4 @@ The following documents give detailed descriptions of the API for each language:
api-c
api-error-codes
special-keys
global-configuration

@ -0,0 +1,130 @@
.. _global-configuration:
.. default-domain:: cpp
.. highlight:: cpp
====================
Global Configuration
====================
The global configuration framework is an eventually consistent configuration
mechanism to efficiently make runtime changes to all clients and servers. It
works by broadcasting updates made to the global configuration key space,
relying on individual machines to store existing configuration in-memory.
The global configuration framework provides a key-value interface to all
processes and clients in a FoundationDB cluster.
The global configuration framework is internal to FoundationDB and clients will
usually have no need to interact with it. The API is provided here for
reference.
Reading data
------------
The global configuration framework is exposed through the
``GlobalConfig::globalConfig()`` static function. There are separate ways to
read a value, depending on if it is an object or a primitive.
.. function:: template<class T> const T get(KeyRef name, T defaultVal)
Returns the value associated with ``name`` stored in global configuration,
or ``defaultVal`` if no key matching ``name`` exists. This templated
function is enabled only when the ``std::is_arithmetic<T>`` specialization
returns true.
.. code-block:: cpp
auto& config = GlobalConfig::globalConfig();
double value = config.get<double>("path/to/key", 1.0);
.. function:: const Reference<ConfigValue> get(KeyRef name)
Returns the value associated with ``name`` stored in global configuration.
.. code-block:: cpp
auto& config = GlobalConfig::globalConfig();
auto configValue = config.get("path/to/key");
// Check if value exists
ASSERT(configValue.value.has_value());
// Cast to correct type
auto str = std::any_cast<StringRef>(configValue.value);
.. function:: const std::map<KeyRef, Reference<ConfigValue>> get(KeyRangeRef range)
Returns all values in the specified range.
.. type:: ConfigValue
Holds a global configuration value and the arena where it lives. ::
struct ConfigValue : ReferenceCounted<ConfigValue> {
Arena arena;
std::any value;
}
``arena``
The arena where the value (and the associated key) lives in memory.
``value``
The stored value.
Writing data
------------
Writing data to global configuration is done using transactions written to the
special key space range ``\xff\xff/global_config/ - \xff\xff/global_config/0``.
Values must always be encoded according to the :ref:`api-python-tuple-layer`.
.. code-block:: cpp
// In GlobalConfig.actor.h
extern const KeyRef myGlobalConfigKey;
// In GlobalConfig.actor.cpp
const KeyRef myGlobalConfigKey = LiteralStringRef("config/key");
// When you want to set the value..
Tuple value = Tuple().appendDouble(1.5);
FDBTransaction* tr = ...;
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(GlobalConfig::prefixedKey(myGlobalConfigKey), value.pack());
// commit transaction
The client is responsible for avoiding conflicts with other global
configuration keys. For most uses, it is recommended to create a new key space.
For example, an application that wants to write configuration data should use
the ``global_config/config/`` namespace, instead of storing keys in the top
level ``global_config/`` key space.
^^^^^^^^^^^^^
Clearing data
^^^^^^^^^^^^^
Data can be removed from global configuration using standard transaction
semantics. Submit a clear or clear range to the appropriate global
configuration keys in the special key space to clear data.
Watching data
-------------
Global configuration provides functionality to watch for changes.
.. function:: Future<Void> onInitialized()
Returns a ``Future`` which will be triggered when global configuration has
been successfully initialized and populated with data.
.. function:: Future<Void> onChange()
Returns a ``Future`` which will be triggered when any key-value pair in
global configuration changes.
.. function:: void trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn)
Registers a callback to be called when the value for global configuration
key ``key`` is changed. The callback function takes a single argument, an
optional which will be populated with the updated value when ``key`` is
changed, or an empty optional if the value was cleared. If the value is an
allocated object, its memory remains in control of global configuration.

@ -234,6 +234,27 @@ command string The fdbcli command corresponding to this ope
message string Help text explaining the reason this operation failed
========================== ======== ===============
Global configuration module
---------------------------
The global configuration module provides an interface to read and write values
to :doc:`global-configuration`. In general, clients should not read and write
the global configuration special key space keys directly, but should instead
use the global configuration functions.
#. ``\xff\xff/global_config/<key> := <value>`` Read/write. Reading keys in the range will return a tuple decoded string representation of the value for the given key. Writing a value will update all processes in the cluster with the new key-value pair. Values must be written using the :ref:`api-python-tuple-layer`.
Tracing module
--------------
The tracing module provides read and write access to a transactions' tracing
data. Every transaction contains a unique identifier which follows the
transaction through the system. By providing access to set this identifier,
clients can connect FoundationDB transactions to outside events.
#. ``\xff\xff/tracing/transaction_id := <transaction_id>`` Read/write. A 64-bit integer transaction ID which follows the transaction as it moves through FoundationDB. All transactions are assigned a random transaction ID on creation, and this key can be read to surface the randomly generated ID. Alternatively, set this key to provide a custom identifier. When setting this key, provide a string in the form of a 64-bit integer, which will be automatically converted to the appropriate type.
#. ``\xff\xff/tracing/token := <tracing_enabled>`` Read/write. Set to true/false to enable or disable tracing for the transaction, respectively. If read, returns a 64-bit integer set to 0 if tracing has been disabled, or a random 64-bit integer otherwise (this integers value has no meaning to the client other than to determine whether the transaction will be traced).
.. [#conflicting_keys] In practice, the transaction probably committed successfully. However, if you're running multiple resolvers then it's possible for a transaction to cause another to abort even if it doesn't commit successfully.
.. [#max_read_transaction_life_versions] The number 5000000 comes from the server knob MAX_READ_TRANSACTION_LIFE_VERSIONS
.. [#special_key_space_enable_writes] Enabling this option enables other transaction options, such as ``ACCESS_SYSTEM_KEYS``. This may change in the future.

@ -80,8 +80,9 @@ ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,
ASSERT(res.size() <= 1);
if (!clearSSFailureZoneString && res.size() == 1 && res[0].key == fdb_cli::ignoreSSFailureSpecialKey) {
if (printWarning) {
printf("ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
fprintf(stderr,
"ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
}
return false;
}
@ -112,8 +113,9 @@ ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db,
ASSERT(res.size() <= 1);
if (res.size() == 1 && res[0].key == fdb_cli::ignoreSSFailureSpecialKey) {
if (printWarning) {
printf("ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
fprintf(stderr,
"ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
}
return false;
}

@ -566,7 +566,8 @@ void initHelp() {
"change the class of a process",
"If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
"`default' resets the process class to the class specified on the command line. The available "
"classes are `unset', `storage', `transaction', `resolution', `proxy', `master', `test', `unset', "
"classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
"`master', `test', "
"`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
"`coordinator', `ratekeeper', `storage_cache', `backup', and `default'.");
helpMap["status"] =
@ -3604,9 +3605,10 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state std::string passPhrase = deterministicRandom()->randomAlphaNumeric(10);
warn.cancel(); // don't warn while waiting on user input
printf("Unlocking the database is a potentially dangerous operation.\n");
Optional<std::string> input = wait(linenoise.read(
format("Repeat the following passphrase if you would like to proceed (%s) : ",
passPhrase.c_str())));
printf("%s\n", passPhrase.c_str());
fflush(stdout);
Optional<std::string> input =
wait(linenoise.read(format("Repeat the above passphrase if you would like to proceed:")));
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db);
if (input.present() && input.get() == passPhrase) {
UID unlockUID = UID::fromString(tokens[1].toString());

@ -39,6 +39,8 @@ class IKnobCollection {
static std::unique_ptr<IKnobCollection> globalKnobCollection;
public:
virtual ~IKnobCollection() = default;
enum class Type {
CLIENT,
SERVER,

@ -35,7 +35,7 @@
// in order.
template <class T>
class ParallelStream {
BoundedFlowLock semaphore;
Reference<BoundedFlowLock> semaphore;
struct FragmentConstructorTag {
explicit FragmentConstructorTag() = default;
};
@ -43,14 +43,14 @@ class ParallelStream {
public:
// A Fragment is a single stream that will get results to be merged back into the main output stream
class Fragment : public ReferenceCounted<Fragment> {
ParallelStream* parallelStream;
Reference<BoundedFlowLock> semaphore;
PromiseStream<T> stream;
BoundedFlowLock::Releaser releaser;
friend class ParallelStream;
public:
Fragment(ParallelStream* parallelStream, int64_t permitNumber, FragmentConstructorTag)
: parallelStream(parallelStream), releaser(&parallelStream->semaphore, permitNumber) {}
Fragment(Reference<BoundedFlowLock> semaphore, int64_t permitNumber, FragmentConstructorTag)
: semaphore(semaphore), releaser(semaphore.getPtr(), permitNumber) {}
template <class U>
void send(U&& value) {
stream.send(std::forward<U>(value));
@ -105,14 +105,15 @@ public:
}
}
ParallelStream(PromiseStream<T> results, size_t bufferLimit) : results(results), semaphore(1, bufferLimit) {
ParallelStream(PromiseStream<T> results, size_t bufferLimit) : results(results) {
semaphore = makeReference<BoundedFlowLock>(1, bufferLimit);
flusher = flushToClient(this);
}
// Creates a fragment to get merged into the main output stream
ACTOR static Future<Fragment*> createFragmentImpl(ParallelStream<T>* self) {
int64_t permitNumber = wait(self->semaphore.take());
auto fragment = makeReference<Fragment>(self, permitNumber, FragmentConstructorTag());
int64_t permitNumber = wait(self->semaphore->take());
auto fragment = makeReference<Fragment>(self->semaphore, permitNumber, FragmentConstructorTag());
self->fragments.send(fragment);
return fragment.getPtr();
}

@ -462,6 +462,7 @@ void ServerKnobs::initialize(Randomize _randomize, ClientKnobs* clientKnobs, IsS
init( REPLACE_INTERFACE_CHECK_DELAY, 5.0 );
init( COORDINATOR_REGISTER_INTERVAL, 5.0 );
init( CLIENT_REGISTER_INTERVAL, 600.0 );
init( CLUSTER_CONTROLLER_ENABLE_WORKER_HEALTH_MONITOR, false );
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );

@ -389,6 +389,7 @@ public:
double REPLACE_INTERFACE_CHECK_DELAY;
double COORDINATOR_REGISTER_INTERVAL;
double CLIENT_REGISTER_INTERVAL;
bool CLUSTER_CONTROLLER_ENABLE_WORKER_HEALTH_MONITOR;
// Knobs used to select the best policy (via monte carlo)
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)

@ -70,7 +70,6 @@ set(FDBSERVER_SRCS
OldTLogServer_6_2.actor.cpp
OnDemandStore.actor.cpp
OnDemandStore.h
Orderer.actor.h
PaxosConfigConsumer.actor.cpp
PaxosConfigConsumer.h
PaxosConfigDatabaseNode.actor.cpp

@ -2719,6 +2719,66 @@ public:
return idUsed;
}
// Updates work health signals in `workerHealth` based on `req`.
void updateWorkerHealth(const UpdateWorkerHealthRequest& req) {
std::string degradedPeersString;
for (int i = 0; i < req.degradedPeers.size(); ++i) {
degradedPeersString += i == 0 ? "" : " " + req.degradedPeers[i].toString();
}
TraceEvent("ClusterControllerUpdateWorkerHealth")
.detail("WorkerAddress", req.address)
.detail("DegradedPeers", degradedPeersString);
// `req.degradedPeers` contains the latest peer performance view from the worker. Clear the worker if the
// requested worker doesn't see any degraded peers.
if (req.degradedPeers.empty()) {
workerHealth.erase(req.address);
return;
}
double currentTime = now();
// Current `workerHealth` doesn't have any information about the incoming worker. Add the worker into
// `workerHealth`.
if (workerHealth.find(req.address) == workerHealth.end()) {
workerHealth[req.address] = {};
for (const auto& degradedPeer : req.degradedPeers) {
workerHealth[req.address].degradedPeers[degradedPeer] = { currentTime, currentTime };
}
return;
}
// The incoming worker already exists in `workerHealth`.
auto& health = workerHealth[req.address];
// First, remove any degraded peers recorded in the `workerHealth`, but aren't in the incoming request. These
// machines network performance should have recovered.
std::unordered_set<NetworkAddress> recoveredPeers;
for (const auto& [peer, times] : health.degradedPeers) {
recoveredPeers.insert(peer);
}
for (const auto& peer : req.degradedPeers) {
if (recoveredPeers.find(peer) != recoveredPeers.end()) {
recoveredPeers.erase(peer);
}
}
for (const auto& peer : recoveredPeers) {
health.degradedPeers.erase(peer);
}
// Update the worker's degradedPeers.
for (const auto& peer : req.degradedPeers) {
auto it = health.degradedPeers.find(peer);
if (it == health.degradedPeers.end()) {
health.degradedPeers[peer] = { currentTime, currentTime };
continue;
}
it->second.lastRefreshTime = currentTime;
}
}
std::map<Optional<Standalone<StringRef>>, WorkerInfo> id_worker;
std::map<Optional<Standalone<StringRef>>, ProcessClass>
id_class; // contains the mapping from process id to process class from the database
@ -2757,6 +2817,18 @@ public:
Optional<UID> recruitingRatekeeperID;
AsyncVar<bool> recruitRatekeeper;
// Stores the health information from a particular worker's perspective.
struct WorkerHealth {
struct DegradedTimes {
double startTime = 0;
double lastRefreshTime = 0;
};
std::unordered_map<NetworkAddress, DegradedTimes> degradedPeers;
// TODO(zhewu): Include disk and CPU signals.
};
std::unordered_map<NetworkAddress, WorkerHealth> workerHealth;
CounterCollection clusterControllerMetrics;
Counter openDatabaseRequests;
@ -4537,6 +4609,11 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
++self.registerMasterRequests;
clusterRegisterMaster(&self, req);
}
when(UpdateWorkerHealthRequest req = waitNext(interf.updateWorkerHealth.getFuture())) {
if (SERVER_KNOBS->CLUSTER_CONTROLLER_ENABLE_WORKER_HEALTH_MONITOR) {
self.updateWorkerHealth(req);
}
}
when(GetServerDBInfoRequest req = waitNext(interf.getServerDBInfo.getFuture())) {
self.addActor.send(clusterGetServerInfo(&self.db, req.knownServerInfoID, req.reply));
}
@ -4631,3 +4708,67 @@ ACTOR Future<Void> clusterController(Reference<ClusterConnectionFile> connFile,
hasConnected = true;
}
}
namespace {
// Tests `ClusterControllerData::updateWorkerHealth()` can update `ClusterControllerData::workerHealth` based on
// `UpdateWorkerHealth` request correctly.
TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
// Create a testing ClusterControllerData. Most of the internal states do not matter in this test.
state ClusterControllerData data(ClusterControllerFullInterface(),
LocalityData(),
ServerCoordinators(Reference<ClusterConnectionFile>(new ClusterConnectionFile())));
state NetworkAddress workerAddress(IPAddress(0x01010101), 1);
state NetworkAddress badPeer1(IPAddress(0x02020202), 1);
state NetworkAddress badPeer2(IPAddress(0x03030303), 1);
state NetworkAddress badPeer3(IPAddress(0x04040404), 1);
// Create a `UpdateWorkerHealthRequest` with two bad peers, and they should appear in the `workerAddress`'s
// degradedPeers.
{
UpdateWorkerHealthRequest req;
req.address = workerAddress;
req.degradedPeers.push_back(badPeer1);
req.degradedPeers.push_back(badPeer2);
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
ASSERT_EQ(health.degradedPeers.size(), 2);
ASSERT(health.degradedPeers.find(badPeer1) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer1].startTime, health.degradedPeers[badPeer1].lastRefreshTime);
ASSERT(health.degradedPeers.find(badPeer2) != health.degradedPeers.end());
}
// Create a `UpdateWorkerHealthRequest` with two bad peers, one from the previous test and a new one.
// The one from the previous test should have lastRefreshTime updated.
// The other one from the previous test not included in this test should be removed.
{
// Make the time to move so that now() guarantees to return a larger value than before.
wait(delay(0.001));
UpdateWorkerHealthRequest req;
req.address = workerAddress;
req.degradedPeers.push_back(badPeer1);
req.degradedPeers.push_back(badPeer3);
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
ASSERT_EQ(health.degradedPeers.size(), 2);
ASSERT(health.degradedPeers.find(badPeer1) != health.degradedPeers.end());
ASSERT_LT(health.degradedPeers[badPeer1].startTime, health.degradedPeers[badPeer1].lastRefreshTime);
ASSERT(health.degradedPeers.find(badPeer2) == health.degradedPeers.end());
ASSERT(health.degradedPeers.find(badPeer3) != health.degradedPeers.end());
}
// Create a `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should remove the worker from
// `workerHealth`.
{
UpdateWorkerHealthRequest req;
req.address = workerAddress;
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) == data.workerHealth.end());
}
return Void();
}
} // namespace

@ -1,78 +0,0 @@
/*
* Orderer.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_ORDERER_ACTOR_G_H)
#define FDBSERVER_ORDERER_ACTOR_G_H
#include "fdbserver/Orderer.actor.g.h"
#elif !defined(FDBSERVER_ORDERER_ACTOR_H)
#define FDBSERVER_ORDERER_ACTOR_H
#include "fdbclient/Notified.h"
#include "flow/actorcompiler.h" // This must be the last #include.
template <class Seq>
class Orderer {
public:
explicit Orderer(Seq s) : ready(s), started(false) {}
void reset(Seq s) {
ready = NotifiedVersion(s);
started = false;
}
Future<bool> order(Seq s, TaskPriority taskID = TaskPriority::DefaultYield) {
if (ready.get() < s)
return waitAndOrder(this, s, taskID);
else
return dedup(s);
}
void complete(Seq s) {
ASSERT(s == ready.get() && started);
started = false;
ready.set(s + 1);
}
Seq getNextSequence() {
return ready.get();
} // Returns the next sequence number which has *not* been returned from order()
Future<Void> whenNextSequenceAtLeast(Seq v) { return ready.whenAtLeast(v); }
private:
ACTOR static Future<bool> waitAndOrder(Orderer<Seq>* self, Seq s, TaskPriority taskID) {
wait(self->ready.whenAtLeast(s));
wait(yield(taskID) || self->shutdown.getFuture());
return self->dedup(s);
}
bool dedup(Seq s) {
if (s != ready.get() || started)
return false;
started = true;
return true;
}
bool started;
NotifiedVersion ready; // FIXME: Notified<Seq>
Promise<Void> shutdown; // Never set, only broken on destruction
};
#include "flow/unactorcompiler.h"
#endif

@ -18,18 +18,19 @@
* limitations under the License.
*/
#include "flow/ActorCollection.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/ResolverInterface.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/Orderer.actor.h"
#include "fdbserver/StorageMetrics.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/ResolverInterface.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/StorageMetrics.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {

@ -149,6 +149,7 @@ struct ClusterControllerFullInterface {
RequestStream<struct RegisterWorkerRequest> registerWorker;
RequestStream<struct GetWorkersRequest> getWorkers;
RequestStream<struct RegisterMasterRequest> registerMaster;
RequestStream<struct UpdateWorkerHealthRequest> updateWorkerHealth;
RequestStream<struct GetServerDBInfoRequest>
getServerDBInfo; // only used by testers; the cluster controller will send the serverDBInfo to workers
@ -160,7 +161,8 @@ struct ClusterControllerFullInterface {
return clientInterface.hasMessage() || recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() || recruitStorage.getFuture().isReady() ||
registerWorker.getFuture().isReady() || getWorkers.getFuture().isReady() ||
registerMaster.getFuture().isReady() || getServerDBInfo.getFuture().isReady();
registerMaster.getFuture().isReady() || updateWorkerHealth.getFuture().isReady() ||
getServerDBInfo.getFuture().isReady();
}
void initEndpoints() {
@ -171,6 +173,7 @@ struct ClusterControllerFullInterface {
registerWorker.getEndpoint(TaskPriority::ClusterControllerWorker);
getWorkers.getEndpoint(TaskPriority::ClusterController);
registerMaster.getEndpoint(TaskPriority::ClusterControllerRegister);
updateWorkerHealth.getEndpoint(TaskPriority::ClusterController);
getServerDBInfo.getEndpoint(TaskPriority::ClusterController);
}
@ -187,6 +190,7 @@ struct ClusterControllerFullInterface {
registerWorker,
getWorkers,
registerMaster,
updateWorkerHealth,
getServerDBInfo);
}
};
@ -418,6 +422,20 @@ struct GetWorkersRequest {
}
};
struct UpdateWorkerHealthRequest {
constexpr static FileIdentifier file_identifier = 5789927;
NetworkAddress address;
std::vector<NetworkAddress> degradedPeers;
template <class Ar>
void serialize(Ar& ar) {
if constexpr (!is_fb_function<Ar>) {
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, address, degradedPeers);
}
};
struct InitializeTLogRequest {
constexpr static FileIdentifier file_identifier = 15604392;
UID recruitmentID;

@ -687,6 +687,7 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
addressesInDbAndPrimaryDc(interf.addresses(), dbInfo) && ccInterface->get().present()) {
nextHealthCheckDelay = delay(SERVER_KNOBS->WORKER_HEALTH_MONITOR_INTERVAL);
const auto& allPeers = FlowTransport::transport().getAllPeers();
UpdateWorkerHealthRequest req;
for (const auto& [address, peer] : allPeers) {
if (peer->pingLatencies.getPopulationSize() < SERVER_KNOBS->PEER_LATENCY_CHECK_MIN_POPULATION) {
// Ignore peers that don't have enough samples.
@ -724,9 +725,14 @@ ACTOR Future<Void> healthMonitor(Reference<AsyncVar<Optional<ClusterControllerFu
.detail("Count", peer->pingLatencies.getPopulationSize())
.detail("TimeoutCount", peer->timeoutCount);
// TODO(zhewu): Keep track of degraded peers and send them to cluster controller.
req.degradedPeers.push_back(address);
}
}
if (!req.degradedPeers.empty()) {
req.address = FlowTransport::transport().getLocalAddress();
ccInterface->get().get().updateWorkerHealth.send(req);
}
}
choose {
when(wait(nextHealthCheckDelay)) {}

@ -369,6 +369,8 @@ public:
}
template <class T>
void serializeBinaryItem(const T& t) {
static_assert(is_binary_serializable<T>::value,
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
*(T*)writeBytes(sizeof(T)) = t;
}
void* getData() { return data; }
@ -543,6 +545,8 @@ public:
}
template <class T>
void serializeBinaryItem(const T& t) {
static_assert(is_binary_serializable<T>::value,
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
writeBytes(&t, sizeof(T));
}
@ -577,6 +581,8 @@ public:
template <class T>
void serializeBinaryItem(T& t) {
static_assert(is_binary_serializable<T>::value,
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
t = *(T*)(static_cast<Impl*>(this)->readBytes(sizeof(T)));
}
@ -808,6 +814,8 @@ struct PacketWriter {
void serializeBytes(StringRef bytes) { serializeBytes(bytes.begin(), bytes.size()); }
template <class T>
void serializeBinaryItem(const T& t) {
static_assert(is_binary_serializable<T>::value,
"Object must be binary serializable, see BINARY_SERIALIZABLE macro");
if (sizeof(T) <= buffer->bytes_unwritten()) {
*(T*)(buffer->data() + buffer->bytes_written) = t;
buffer->bytes_written += sizeof(T);

@ -88,6 +88,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES SpecificUnitTest.txt IGNORE)
add_fdb_test(TEST_FILES StorageMetricsSampleTests.txt IGNORE)
add_fdb_test(TEST_FILES WorkerTests.txt IGNORE)
add_fdb_test(TEST_FILES ClusterControllerTests.txt IGNORE)
add_fdb_test(TEST_FILES StorageServerInterface.txt)
add_fdb_test(TEST_FILES StreamingWrite.txt IGNORE)
add_fdb_test(TEST_FILES SystemData.txt)

@ -0,0 +1,7 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/fdbserver/clustercontroller/