1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-26 01:10:04 +08:00

Merge remote-tracking branch 'upstream/release-6.0' into feature-bigint-support-go-ruby-aj

This commit is contained in:
Alec Grieser 2018-11-13 11:28:10 -08:00
commit 7e4f84c60e
No known key found for this signature in database
GPG Key ID: CAF63551C60D3462
130 changed files with 1486 additions and 863 deletions

@ -345,7 +345,7 @@ bool FDBLibTLSSession::verify_peer() {
if (!rc) {
// log the various failure reasons
for (std::string reason : verify_failure_reasons) {
TraceEvent(reason.c_str(), uid);
TraceEvent(reason.c_str(), uid).suppressFor(1.0);
}
}

@ -58,14 +58,14 @@ class Result:
self.key_tuple = subspace.unpack(key)
self.values = values
def key(self, specification):
return self.key_tuple[specification.key_start_index:]
def matches_key(self, rhs, specification):
if not isinstance(rhs, Result):
return False
left_key = self.key_tuple[specification.key_start_index:]
right_key = rhs.key_tuple[specification.key_start_index:]
return left_key == right_key
return self.key(specification) == rhs.key(specification)
def matches(self, rhs, specification):
if not self.matches_key(rhs, specification):

@ -75,33 +75,46 @@ class ResultSet(object):
util.get_logger().info('Comparing results from \'%s\'...' % repr(util.subspace_to_tuple(self.specification.subspace)))
num_errors = 0
has_filtered_error = False
# Tracks the current result being evaluated for each tester
indices = [0 for i in range(len(self.tester_results))]
name_length = max([len(name) for name in self.tester_results.keys()])
has_filtered_error = False
while True:
# Gets the next result for each tester
results = {i: r[indices[i]] for i, r in enumerate(self.tester_results.values()) if len(r) > indices[i]}
if len(results) == 0:
break
# Attempt to 'align' the results. If two results have matching sequence numbers, then they should be compared.
# Only those testers which have a result matching the minimum current sequence number will be included. All
# others are considered to have not produced a result and will be evaluated in a future iteration.
sequence_nums = [r.sequence_num(self.specification) for r in results.values()]
if any([s is not None for s in sequence_nums]):
results = {i: r for i, r in results.items() if r.sequence_num(self.specification) == min(sequence_nums)}
else:
results = {i: r for i, r in results.items() if r.matches_key(min(results.values()), self.specification)}
# If these results aren't using sequence numbers, then we match two results based on whether they share the same key
else:
min_key = min([r.key(self.specification) for r in results.values()])
results = {i: r for i, r in results.items() if r.key(self.specification) == min_key}
# Increment the indices for those testers which produced a result in this iteration
for i in results.keys():
indices[i] += 1
# Fill in 'None' values for testers that didn't produce a result and generate an output string describing the results
all_results = {i: results[i] if i in results else None for i in range(len(self.tester_results))}
result_str = '\n'.join([' %-*s - %s' % (name_length, self.tester_results.keys()[i], r) for i, r in all_results.items()])
result_list = results.values()
# If any of our results matches the global error filter, we ignore the result
if any(r.matches_global_error_filter(self.specification) for r in result_list):
has_filtered_error = True
# The result is considered correct if every tester produced a value and all the values meet the matching criteria
if len(results) < len(all_results) or not all(result_list[0].matches(r, self.specification) for r in result_list):
util.get_logger().error('\nIncorrect result: \n%s' % result_str)
num_errors += 1

@ -547,12 +547,21 @@ Virtual machines
Processes running in different VMs on a single machine will appear to FoundationDB as being hardware isolated. FoundationDB takes pains to assure that data replication is protected from hardware-correlated failures. If FoundationDB is run in multiple VMs on a single machine this protection will be subverted. An administrator can inform FoundationDB of this hardware sharing, however, by specifying a machine ID using the ``locality_machineid`` parameter in :ref:`foundationdb.conf <foundationdb-conf>`. All processes on VMs that share hardware should specify the same ``locality_machineid``.
Datacenters
------------
-----------
FoundationDB is datacenter aware and supports operation across datacenters. In a multiple-datacenter configuration, it is recommended that you set the :ref:`redundancy mode <configuration-choosing-redundancy-mode>` to ``three_datacenter`` and that you set the ``locality_dcid`` parameter for all FoundationDB processes in :ref:`foundationdb.conf <foundationdb-conf>`.
If you specify the ``--datacenter_id`` option to any FoundationDB process in your cluster, you should specify it to all such processes. Processes which do not have a specified datacenter ID on the command line are considered part of a default "unset" datacenter. FoundationDB will incorrectly believe that these processes are failure-isolated from other datacenters, which can reduce performance and fault tolerance.
(Re)creating a database
-----------------------
Installing FoundationDB packages usually creates a new database on the cluster automatically. However, if a cluster does not have a database configured (because the package installation failed to create it, you deleted your data files, or you did not install from the packages, etc.), then you may need to create it manually using the ``configure new`` command in ``fdbcli`` with the desired redundancy mode and storage engine::
> configure new single memory
.. warning:: In a cluster that hasn't been configured, running ``configure new`` will cause the processes in the cluster to delete all data files in their data directories. If a process is reusing an existing data directory, be sure to backup any files that you want to keep. Do not use ``configure new`` to fix a previously working cluster that reports itself missing unless you are certain any necessary data files are safe.
.. _administration-removing:
Uninstalling

@ -242,7 +242,7 @@
.. |option-tls-plugin-blurb| replace::
Sets the :ref:`TLS plugin <configuring-tls-plugin>` to load. This option, if used, must be set before any other TLS options.
Sets the :ref:`TLS plugin <configuring-tls>` to load. This option, if used, must be set before any other TLS options.
.. |option-tls-cert-path-blurb| replace::

@ -1064,7 +1064,7 @@ the most part, this also implies that ``T == fdb.tuple.unpack(fdb.tuple.pack(T))
.. method:: pack(tuple, prefix=b'')
Returns a key (byte string) encoding the specified tuple. If ``prefix`` is set, it will prefix the serialized
bytes with the prefix string. This throws an error if any of the tuple's items are incomplete `Versionstamp`
bytes with the prefix string. This throws an error if any of the tuple's items are incomplete :class:`Versionstamp`
instances.
.. method:: pack_with_versionstamp(tuple, prefix=b'')
@ -1074,8 +1074,8 @@ the most part, this also implies that ``T == fdb.tuple.unpack(fdb.tuple.pack(T))
recurse down nested tuples if there are any to find one.) If so, it will produce a byte string
that can be fed into :meth:`fdb.Transaction.set_versionstamped_key` and correctly fill in the
versionstamp information at commit time so that when the key is re-read and deserialized, the
only difference is that the `Versionstamp` instance is complete and has the transaction version
filled in. This throws an error if there are no incomplete `Versionstamp` instances in the tuple
only difference is that the :class:`Versionstamp` instance is complete and has the transaction version
filled in. This throws an error if there are no incomplete :class:`Versionstamp` instances in the tuple
or if there is more than one.
.. method:: unpack(key)

@ -870,7 +870,7 @@ All future objects are a subclass of the :class:`Future` type.
|future-cancel-blurb|
.. classmethod:: Future.wait_for_any(*futures) -> Fixnum
.. classmethod:: Future.wait_for_any(\*futures) -> Fixnum
Does not return until at least one of the given future objects is ready. Returns the index in the parameter list of a ready future object.

@ -1,8 +1,8 @@
.. _backups:
######################
######################################################
Backup, Restore, and Replication for Disaster Recovery
######################
######################################################
.. include:: guide-common.rst.inc
@ -323,7 +323,7 @@ Optionally, the user can specify a minimum RESTORABILITY guarantee with one of t
.. program:: fdbbackup describe
``describe``
----------
------------
The ``describe`` subcommand will analyze the given backup and print a summary of the snapshot and mutation data versions it contains as well as the version range of restorability the backup can currently provide.

@ -99,7 +99,7 @@ For large clusters, you can manually set the allocated number of processes of a
Set the process using ``configure [proxies|resolvers|logs]=<N>``, where ``<N>`` is an integer greater than 0, or -1 to reset the value to its default.
For recommendations on appropriate values for process types in large clusters, see :ref:`configuration-large-cluster-performance`.
For recommendations on appropriate values for process types in large clusters, see :ref:`guidelines-process-class-config`.
coordinators
------------

@ -263,7 +263,8 @@ Contains default parameters for all fdbserver processes on this machine. These s
* ``locality_dcid``: Data center identifier key. All processes physically located in a data center should share the id. No default value. If you are depending on data center based replication this must be set on all processes.
* ``locality_data_hall``: Data hall identifier key. All processes physically located in a data hall should share the id. No default value. If you are depending on data hall based replication this must be set on all processes.
* ``io_trust_seconds``: Time in seconds that a read or write operation is allowed to take before timing out with an error. If an operation times out, all future operations on that file will fail with an error as well. Only has an effect when using AsyncFileKAIO in Linux. If unset, defaults to 0 which means timeout is disabled.
.. note:: In addition to the options above, TLS settings as described for the :ref:`TLS plugin <configuring-tls-plugin>` can be specified in the [fdbserver] section.
.. note:: In addition to the options above, TLS settings as described for the :ref:`TLS plugin <configuring-tls>` can be specified in the [fdbserver] section.
``[fdbserver.<ID>]`` section(s)
---------------------------------

2
documentation/sphinx/source/guide-common.rst.inc Normal file → Executable file

@ -31,7 +31,7 @@
``coordinators auto`` selects processes based on IP address. If your cluster has processes on the same machine with different IP addresses, ``coordinators auto`` may select a set of coordinators that are not fault tolerant. To ensure maximal fault tolerance, we recommend selecting coordinators according to the criteria in :ref:`configuration-choosing-coordination-servers` and setting them manually.
.. |conf-file-change-detection| replace::
Whenever the ``foundationdb.conf`` file changes, the ``fdbmonitor`` daemon automatically detects the changes and starts, stops, or restarts child processes as necessary.
Whenever the ``foundationdb.conf`` file changes, the ``fdbmonitor`` daemon automatically detects the changes and starts, stops, or restarts child processes as necessary. Note that changes to the configuration file contents must be made *atomically*. It is recommended to save the modified file to a temporary filename and then move/rename it into place, replacing the original. Some text editors do this automatically when saving.
.. |package-deb-clients| replace::
foundationdb-clients\_\ |release|\ -1\_amd64.deb

@ -5,7 +5,7 @@ Local Development
Download the FoundationDB package
=================================
:doc:`Download the FoundationDB package <downloads>` for macOS (FoundationDB-*.pkg) onto your local development machine.
:doc:`Download the FoundationDB package <downloads>` for macOS (FoundationDB-\*.pkg) onto your local development machine.
Install the FoundationDB binaries
=================================

@ -10,7 +10,7 @@ Language support
* FoundationDB now supports :doc:`Ruby </api-ruby>`
* FoundationDB now supports :doc:`Node.js </api-node>`
* FoundationDB now supports Node.js
* FoundationDB now supports `Java </javadoc/index.html>`_ and other JVM languages.

@ -223,12 +223,12 @@ Node
----
* Support for API version 200 and backwards compatibility with previous API versions.
* New APIs for allocating and managing keyspace (:ref:`Directory <developer-guide-directories>`).
* Support for the :ref:`Promise/A+ specification <api-node-promises>` with supporting utilities.
* Support for the Promise/A+ specification with supporting utilities.
* Futures can take multiple callbacks. Callbacks can be added if the original function was called with a callback. The Future type is exposed in our binding.
* Added ``as_foundationdb_key`` and ``as_foundationdb_value`` support.
* Node prints a stack trace if an error occurs in a callback from V8.
* Snapshot transactions can be used in retry loops.
* The :ref:`methods <api-node-setAndWatch>` ``db.setAndWatch`` and ``db.clearAndWatch`` now return an object with a watch member instead of a future.
* The methods ``db.setAndWatch`` and ``db.clearAndWatch`` now return an object with a watch member instead of a future.
* Fix: Could not use the ``'this'`` pointer with the retry decorator.
* Fix: Node transactional decorator didn't return a result to the caller if the function was called with a transaction.
* Fix: The program could sometimes crash when watches were manually cancelled.

@ -47,7 +47,7 @@ Fixes
Java
----
* The `ReadTransaction` interface supports the ability to set transaction options.
* The ``ReadTransaction`` interface supports the ability to set transaction options.
Other Changes
-------------

@ -2,7 +2,7 @@
Release Notes
#############
6.0.14
6.0.15
======
Features
@ -30,6 +30,7 @@ Performance
* Significantly reduced master recovery times for clusters with large amounts of data. [6.0.14] `(PR #836) <https://github.com/apple/foundationdb/pull/836>`_
* Reduced read and commit latencies for clusters which are processing transactions larger than 1MB. [6.0.14] `(PR #851) <https://github.com/apple/foundationdb/pull/851>`_
* Significantly reduced recovery times when executing rollbacks on the memory storage engine. [6.0.14] `(PR #821) <https://github.com/apple/foundationdb/pull/821>`_
* Clients update their key location cache much more efficiently after storage server reboots. [6.0.15] `(PR #892) <https://github.com/apple/foundationdb/pull/892>`_
Fixes
-----
@ -59,6 +60,8 @@ Fixes
* Excluding a process that was both the cluster controller and something else would cause two recoveries instead of one. [6.0.12] `(PR #784) <https://github.com/apple/foundationdb/pull/784>`_
* Configuring from ``three_datacenter`` to ``three_datacenter_fallback`` would cause a lot of unnecessary data movement. [6.0.12] `(PR #782) <https://github.com/apple/foundationdb/pull/782>`_
* Very rarely, backup snapshots would stop making progress. [6.0.14] `(PR #837) <https://github.com/apple/foundationdb/pull/837>`_
* Sometimes data distribution calculated the size of a shard incorrectly. [6.0.15] `(PR #892) <https://github.com/apple/foundationdb/pull/892>`_
* Changing the storage engine configuration would not effect which storage engine was used by the transaction logs. [6.0.15] `(PR #892) <https://github.com/apple/foundationdb/pull/892>`_
Fixes only impacting 6.0.0+
---------------------------
@ -74,6 +77,10 @@ Fixes only impacting 6.0.0+
* The transaction logs were doing a lot of unnecessary disk writes. [6.0.12] `(PR #784) <https://github.com/apple/foundationdb/pull/784>`_
* The master will recover the transaction state store from local transaction logs if possible. [6.0.12] `(PR #801) <https://github.com/apple/foundationdb/pull/801>`_
* A bug in status collection led to various workload metrics being missing and the cluster reporting unhealthy. [6.0.13] `(PR #834) <https://github.com/apple/foundationdb/pull/834>`_
* Data distribution did not stop tracking certain unhealthy teams, leading to incorrect status reporting. [6.0.15] `(PR #892) <https://github.com/apple/foundationdb/pull/892>`_
* Fixed a variety of problems related to changing between different region configurations. [6.0.15] `(PR #892) <https://github.com/apple/foundationdb/pull/892>`_
* fdbcli protects against configuration changes which could cause irreversible damage to a cluster. [6.0.15] `(PR #892) <https://github.com/apple/foundationdb/pull/892>`_
* Significantly reduced both client and server memory usage in clusters with large amounts of data and usable_regions=2. [6.0.15] `(PR #892) <https://github.com/apple/foundationdb/pull/892>`_
Status
------
@ -93,8 +100,10 @@ Bindings
* Java: the `Versionstamp::getUserVersion() </javadoc/com/apple/foundationdb/tuple/Versionstamp.html#getUserVersion-->`_ method did not handle user versions greater than ``0x00FF`` due to operator precedence errors. [6.0.11] `(Issue #761) <https://github.com/apple/foundationdb/issues/761>`_
* Python: bindings didn't work with Python 3.7 because of the new `async` keyword. [6.0.13] `(Issue #830) <https://github.com/apple/foundationdb/issues/830>`_
* Go: `PrefixRange` didn't correctly return an error if it failed to generate the range. [6.0.15] `(PR #878) <https://github.com/apple/foundationdb/pull/878>`_
* Go: Add Tuple layer support for `uint`, `uint64`, and `*big.Int` integers up to 255 bytes. Integer values will be decoded into the first of `int64`, `uint64`, or `*big.Int` in which they fit. [6.0.15]
* Ruby: Add Tuple layer support for integers up to 255 bytes. [6.0.15]
* Go: Add Tuple layer support for `uint`, `uint64`, and `*big.Int` integers up to 255 bytes. Integer values will be decoded into the first of `int64`, `uint64`, or `*big.Int` in which they fit. `(PR #889) <https://github.com/apple/foundationdb/pull/889>`_ [6.0.15]
* Ruby: Add Tuple layer support for integers up to 255 bytes. `(PR #889) <https://github.com/apple/foundationdb/pull/889>`_ [6.0.15]
* Python: bindings didn't work with Python 3.7 because of the new ``async`` keyword. [6.0.13] `(Issue #830) <https://github.com/apple/foundationdb/issues/830>`_
* Go: ``PrefixRange`` didn't correctly return an error if it failed to generate the range. [6.0.15] `(PR #878) <https://github.com/apple/foundationdb/pull/878>`_
Other Changes
-------------

@ -100,7 +100,7 @@ Parameters and client bindings
------------------------------
The default LibreSSL-based implementation
=================================
=========================================
FoundationDB offers TLS based on the LibreSSL library. By default, it will be enabled automatically when participating in a TLS-enabled cluster.
@ -230,7 +230,7 @@ Field Well known name
``subjectAltName`` Subject Alternative Name
================== ========================
Within a subject alternative name requirement, the value specified is required to have the form ``prefix:value``, where the prefix specifies the type of value being matched against. The following prefixes are supported.
Within a subject alternative name requirement, the value specified is required to have the form ``prefix:value``, where the prefix specifies the type of value being matched against. The following prefixes are supported:
====== ===========================
Prefix Well known name
@ -239,7 +239,7 @@ DNS Domain Name
URI Uniform Resource Identifier
IP IP Address
EMAIL Email Address
====== ============================
====== ===========================
The following operators are supported:

@ -30,12 +30,12 @@
#include "fdbclient/Status.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbrpc/Platform.h"
#include "fdbrpc/BlobStore.h"
#include "fdbclient/BlobStore.h"
#include "fdbclient/json_spirit/json_spirit_writer_template.h"
#include "fdbrpc/Platform.h"
#include <stdarg.h>
#include <stdio.h>
#include <algorithm> // std::transform

@ -1501,11 +1501,18 @@ ACTOR Future<Void> commitTransaction( Reference<ReadYourWritesTransaction> tr )
ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Reference<ClusterConnectionFile> ccf, LineNoise* linenoise, Future<Void> warn ) {
state ConfigurationResult::Type result;
state int startToken = 1;
state bool force = false;
if (tokens.size() < 2)
result = ConfigurationResult::NO_OPTIONS_PROVIDED;
else {
if(tokens[startToken] == LiteralStringRef("FORCE")) {
force = true;
startToken = 2;
}
state Optional<ConfigureAutoResult> conf;
if( tokens[1] == LiteralStringRef("auto") ) {
if( tokens[startToken] == LiteralStringRef("auto") ) {
StatusObject s = wait( makeInterruptable(StatusClient::statusFetcher( ccf )) );
if(warn.isValid())
warn.cancel();
@ -1565,7 +1572,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
}
}
ConfigurationResult::Type r = wait( makeInterruptable( changeConfig( db, std::vector<StringRef>(tokens.begin()+1,tokens.end()), conf) ) );
ConfigurationResult::Type r = wait( makeInterruptable( changeConfig( db, std::vector<StringRef>(tokens.begin()+startToken,tokens.end()), conf, force) ) );
result = r;
}
@ -1577,7 +1584,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
case ConfigurationResult::CONFLICTING_OPTIONS:
case ConfigurationResult::UNKNOWN_OPTION:
case ConfigurationResult::INCOMPLETE_CONFIGURATION:
printUsage(tokens[0]);
printUsage(LiteralStringRef("configure"));
ret=true;
break;
case ConfigurationResult::INVALID_CONFIGURATION:
@ -1592,6 +1599,31 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
printf("Database created\n");
ret=false;
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
printf("ERROR: The database is unavailable\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID:
printf("ERROR: All storage servers must be in one of the known regions\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions > 1, all regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGIONS_CHANGED:
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
printf("Type `configure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::SUCCESS:
printf("Configuration changed\n");
ret=false;
@ -1603,7 +1635,7 @@ ACTOR Future<bool> configure( Database db, std::vector<StringRef> tokens, Refere
return ret;
}
ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDatabase) {
ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDatabase, bool force) {
std::string contents(readFileBytes(filePath, 100000));
json_spirit::mValue config;
if(!json_spirit::read_string( contents, config )) {
@ -1643,7 +1675,7 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
return true;
}
}
ConfigurationResult::Type result = wait( makeInterruptable( changeConfig(db, configString) ) );
ConfigurationResult::Type result = wait( makeInterruptable( changeConfig(db, configString, force) ) );
// Real errors get thrown from makeInterruptable and printed by the catch block in cli(), but
// there are various results specific to changeConfig() that we need to report:
bool ret;
@ -1676,6 +1708,31 @@ ACTOR Future<bool> fileConfigure(Database db, std::string filePath, bool isNewDa
printf("Database created\n");
ret=false;
break;
case ConfigurationResult::DATABASE_UNAVAILABLE:
printf("ERROR: The database is unavailable\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::STORAGE_IN_UNKNOWN_DCID:
printf("ERROR: All storage servers must be in one of the known regions\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGION_NOT_FULLY_REPLICATED:
printf("ERROR: When usable_regions > 1, All regions with priority >= 0 must be fully replicated before changing the configuration\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::MULTIPLE_ACTIVE_REGIONS:
printf("ERROR: When changing usable_regions, only one region can have priority >= 0\n");
printf("Type `fileconfigure FORCE <FILENAME>' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::REGIONS_CHANGED:
printf("ERROR: The region configuration cannot be changed while simultaneously changing usable_regions\n");
printf("Type `fileconfigure FORCE <TOKEN>*' to configure without this check\n");
ret=false;
break;
case ConfigurationResult::SUCCESS:
printf("Configuration changed\n");
ret=false;
@ -2550,8 +2607,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "fileconfigure")) {
if (tokens.size() == 2 || (tokens.size() == 3 && tokens[1] == LiteralStringRef("new"))) {
bool err = wait( fileConfigure( db, tokens.back().toString(), tokens.size() == 3 ) );
if (tokens.size() == 2 || (tokens.size() == 3 && (tokens[1] == LiteralStringRef("new") || tokens[1] == LiteralStringRef("FORCE")) )) {
bool err = wait( fileConfigure( db, tokens.back().toString(), tokens[1] == LiteralStringRef("new"), tokens[1] == LiteralStringRef("FORCE") ) );
if (err) is_error = true;
} else {
printUsage(tokens[0]);

@ -18,8 +18,8 @@
* limitations under the License.
*/
#include "AsyncFileBlobStore.actor.h"
#include "AsyncFileReadAhead.actor.h"
#include "fdbclient/AsyncFileBlobStore.actor.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/UnitTest.h"
Future<int64_t> AsyncFileBlobStoreRead::size() {

@ -30,13 +30,13 @@
#include <sstream>
#include <time.h>
#include "IAsyncFile.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/serialize.h"
#include "flow/Net2Packet.h"
#include "IRateControl.h"
#include "BlobStore.h"
#include "md5/md5.h"
#include "libb64/encode.h"
#include "fdbrpc/IRateControl.h"
#include "fdbclient/BlobStore.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
ACTOR template<typename T> static Future<T> joinErrorGroup(Future<T> f, Promise<Void> p) {
try {

@ -22,9 +22,9 @@
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/Hash3.h"
#include "fdbrpc/AsyncFileBlobStore.actor.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/Platform.h"
#include "fdbclient/AsyncFileBlobStore.actor.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/ReadYourWrites.h"

@ -20,13 +20,13 @@
#include "BlobStore.h"
#include "md5/md5.h"
#include "libb64/encode.h"
#include "sha1/SHA1.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include "fdbclient/sha1/SHA1.h"
#include "time.h"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include "IAsyncFile.h"
#include "fdbrpc/IAsyncFile.h"
json_spirit::mObject BlobStoreEndpoint::Stats::getJSON() {
json_spirit::mObject o;

@ -25,9 +25,9 @@
#include "flow/flow.h"
#include "flow/Net2Packet.h"
#include "fdbclient/Knobs.h"
#include "IRateControl.h"
#include "HTTP.h"
#include "JSONDoc.h"
#include "fdbrpc/IRateControl.h"
#include "fdbclient/HTTP.h"
#include "fdbclient/JSONDoc.h"
// Representation of all the things you need to connect to a blob store instance with some credentials.
// Reference counted because a very large number of them could be needed.

@ -54,8 +54,11 @@ void parseReplicationPolicy(IRepPolicyRef* policy, ValueRef const& v) {
void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
try {
StatusObject statusObj = BinaryReader::fromStringRef<StatusObject>(v, IncludeVersion());
StatusArray regionArray = statusObj["regions"].get_array();
regions->clear();
if(statusObj["regions"].type() != json_spirit::array_type) {
return;
}
StatusArray regionArray = statusObj["regions"].get_array();
for (StatusObjectReader dc : regionArray) {
RegionInfo info;
json_spirit::mArray datacenters;

@ -32,22 +32,19 @@
#include "EventTypes.actor.h"
#include "fdbrpc/ContinuousSample.h"
class LocationInfo : public MultiInterface<StorageServerInterface> {
class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
public:
static Reference<LocationInfo> getInterface( DatabaseContext *cx, std::vector<StorageServerInterface> const& alternatives, LocalityData const& clientLocality );
static Reference<StorageServerInfo> getInterface( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality );
void notifyContextDestroyed();
virtual ~LocationInfo();
virtual ~StorageServerInfo();
private:
DatabaseContext *cx;
LocationInfo( DatabaseContext* cx, vector<StorageServerInterface> const& shards, LocalityData const& clientLocality ) : cx(cx), MultiInterface( shards, clientLocality ) {}
StorageServerInfo( DatabaseContext *cx, StorageServerInterface const& interf, LocalityData const& locality ) : cx(cx), ReferencedInterface<StorageServerInterface>(interf, locality) {}
};
class ProxyInfo : public MultiInterface<MasterProxyInterface> {
public:
ProxyInfo( vector<MasterProxyInterface> const& proxies, LocalityData const& clientLocality ) : MultiInterface( proxies, clientLocality, ALWAYS_FRESH ) {}
};
typedef MultiInterface<ReferencedInterface<StorageServerInterface>> LocationInfo;
typedef MultiInterface<MasterProxyInterface> ProxyInfo;
class DatabaseContext : public ReferenceCounted<DatabaseContext>, NonCopyable {
public:
@ -125,7 +122,7 @@ public:
int locationCacheSize;
CoalescedKeyRangeMap< Reference<LocationInfo> > locationCache;
std::map< std::vector<UID>, LocationInfo* > ssid_locationInfo;
std::map< UID, StorageServerInfo* > server_interf;
// for logging/debugging (relic of multi-db support)
Standalone<StringRef> dbName;

@ -91,6 +91,11 @@ static std::string describe( const int item ) {
return format("%d", item);
}
template <class T>
static std::string describe( Reference<T> const& item ) {
return item->toString();
}
template <class T>
static std::string describe( T const& item ) {
return item.toString();

@ -18,11 +18,12 @@
* limitations under the License.
*/
#include "HTTP.h"
#include "md5/md5.h"
#include "libb64/encode.h"
#include "fdbclient/HTTP.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"
#include "fdbclient/xml2json.hpp"
#include <cctype>
#include "xml2json.hpp"
namespace HTTP {

@ -20,11 +20,17 @@
#include "flow/flow.h"
#include "flow/Net2Packet.h"
#include "IRateControl.h"
#include "fdbrpc/IRateControl.h"
#include "fdbclient/Knobs.h"
namespace HTTP {
typedef std::map<std::string, std::string> Headers;
struct is_iless {
bool operator() (const std::string &a, const std::string &b) const {
return strcasecmp(a.c_str(), b.c_str()) < 0;
}
};
typedef std::map<std::string, std::string, is_iless> Headers;
std::string urlEncode(const std::string &s);

2
fdbclient/JsonBuilder.h Executable file → Normal file

@ -5,7 +5,7 @@
#include <cmath>
#include "flow/flow.h"
#include "flow/Trace.h"
#include "fdbrpc/JSONDoc.h"
#include "fdbclient/JSONDoc.h"
class JsonBuilder;
class JsonBuilderObject;

@ -51,6 +51,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( DEFAULT_MAX_BACKOFF, 1.0 );
init( BACKOFF_GROWTH_RATE, 2.0 );
init( RESOURCE_CONSTRAINED_MAX_BACKOFF, 30.0 );
init( PROXY_COMMIT_OVERHEAD_BYTES, 23 ); //The size of serializing 7 tags (3 primary, 3 remote, 1 log router) + 2 for the tag length
init( TRANSACTION_SIZE_LIMIT, 1e7 );
init( KEY_SIZE_LIMIT, 1e4 );
@ -61,7 +62,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MAX_BATCH_SIZE, 20 ); if( randomize && BUGGIFY ) MAX_BATCH_SIZE = 1; // Note that SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE is set to match this value
init( GRV_BATCH_TIMEOUT, 0.005 ); if( randomize && BUGGIFY ) GRV_BATCH_TIMEOUT = 0.1;
init( LOCATION_CACHE_EVICTION_SIZE, 100000 );
init( LOCATION_CACHE_EVICTION_SIZE, 300000 );
init( LOCATION_CACHE_EVICTION_SIZE_SIM, 10 ); if( randomize && BUGGIFY ) LOCATION_CACHE_EVICTION_SIZE_SIM = 3;
init( GET_RANGE_SHARD_LIMIT, 2 );

@ -49,6 +49,7 @@ public:
double DEFAULT_MAX_BACKOFF;
double BACKOFF_GROWTH_RATE;
double RESOURCE_CONSTRAINED_MAX_BACKOFF;
int PROXY_COMMIT_OVERHEAD_BYTES;
int64_t TRANSACTION_SIZE_LIMIT;
int64_t KEY_SIZE_LIMIT;

@ -267,39 +267,119 @@ ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration( Database cx ) {
}
}
ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std::string, std::string> m ) {
ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std::string, std::string> m, bool force ) {
state StringRef initIdKey = LiteralStringRef( "\xff/init_id" );
state Transaction tr(cx);
if (!m.size())
if (!m.size()) {
return ConfigurationResult::NO_OPTIONS_PROVIDED;
}
// make sure we have essential configuration options
std::string initKey = configKeysPrefix.toString() + "initialized";
state bool creating = m.count( initKey ) != 0;
if (creating) {
m[initIdKey.toString()] = g_random->randomUniqueID().toString();
if (!isCompleteConfiguration(m))
if (!isCompleteConfiguration(m)) {
return ConfigurationResult::INCOMPLETE_CONFIGURATION;
} else {
state Future<DatabaseConfiguration> fConfig = getDatabaseConfiguration(cx);
Void _ = wait( success(fConfig) || delay(1.0) );
if(fConfig.isReady()) {
DatabaseConfiguration config = fConfig.get();
for(auto kv : m) {
config.set(kv.first, kv.second);
}
if(!config.isValid()) {
return ConfigurationResult::INVALID_CONFIGURATION;
}
}
}
state Future<Void> tooLong = delay(4.5);
loop {
try {
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
if(!creating && !force) {
state Future<Standalone<RangeResultRef>> fConfig = tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY);
Void _ = wait( success(fConfig) || tooLong );
if(!fConfig.isReady()) {
return ConfigurationResult::DATABASE_UNAVAILABLE;
}
if(fConfig.isReady()) {
ASSERT( fConfig.get().size() < CLIENT_KNOBS->TOO_MANY );
state DatabaseConfiguration oldConfig;
oldConfig.fromKeyValues((VectorRef<KeyValueRef>) fConfig.get());
state DatabaseConfiguration newConfig = oldConfig;
for(auto kv : m) {
newConfig.set(kv.first, kv.second);
}
if(!newConfig.isValid()) {
return ConfigurationResult::INVALID_CONFIGURATION;
}
if(oldConfig.usableRegions != newConfig.usableRegions) {
//cannot change region configuration
std::map<Key,int32_t> dcId_priority;
for(auto& it : newConfig.regions) {
dcId_priority[it.dcId] = it.priority;
}
for(auto& it : oldConfig.regions) {
if(!dcId_priority.count(it.dcId) || dcId_priority[it.dcId] != it.priority) {
return ConfigurationResult::REGIONS_CHANGED;
}
}
//must only have one region with priority >= 0
int activeRegionCount = 0;
for(auto& it : newConfig.regions) {
if(it.priority >= 0) {
activeRegionCount++;
}
}
if(activeRegionCount > 1) {
return ConfigurationResult::MULTIPLE_ACTIVE_REGIONS;
}
}
state Future<Standalone<RangeResultRef>> fServerList = (newConfig.regions.size()) ? tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) : Future<Standalone<RangeResultRef>>();
if(newConfig.usableRegions==2) {
//all regions with priority >= 0 must be fully replicated
state std::vector<Future<Optional<Value>>> replicasFutures;
for(auto& it : newConfig.regions) {
if(it.priority >= 0) {
replicasFutures.push_back(tr.get(datacenterReplicasKeyFor(it.dcId)));
}
}
Void _ = wait( waitForAll(replicasFutures) || tooLong );
for(auto& it : replicasFutures) {
if(!it.isReady()) {
return ConfigurationResult::DATABASE_UNAVAILABLE;
}
if(!it.get().present()) {
return ConfigurationResult::REGION_NOT_FULLY_REPLICATED;
}
}
}
if(newConfig.regions.size()) {
//all storage servers must be in one of the regions
Void _ = wait( success(fServerList) || tooLong );
if(!fServerList.isReady()) {
return ConfigurationResult::DATABASE_UNAVAILABLE;
}
Standalone<RangeResultRef> serverList = fServerList.get();
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
std::set<Key> newDcIds;
for(auto& it : newConfig.regions) {
newDcIds.insert(it.dcId);
}
for(auto& s : serverList) {
auto ssi = decodeServerListValue( s.value );
if ( !ssi.locality.dcId().present() || !newDcIds.count(ssi.locality.dcId().get()) ) {
return ConfigurationResult::STORAGE_IN_UNKNOWN_DCID;
}
}
}
}
}
if (creating) {
tr.setOption( FDBTransactionOptions::INITIALIZE_NEW_DATABASE );
tr.addReadConflictRange( singleKeyRange( initIdKey ) );
@ -635,7 +715,7 @@ ACTOR Future<ConfigurationResult::Type> autoConfig( Database cx, ConfigureAutoRe
}
}
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf ) {
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf, bool force ) {
if( modes.size() && modes[0] == LiteralStringRef("auto") && conf.present() ) {
return autoConfig(cx, conf.get());
}
@ -644,16 +724,16 @@ Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<
auto r = buildConfiguration( modes, m );
if (r != ConfigurationResult::SUCCESS)
return r;
return changeConfig(cx, m);
return changeConfig(cx, m, force);
}
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& modes ) {
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& modes, bool force ) {
TraceEvent("ChangeConfig").detail("Mode", modes);
std::map<std::string,std::string> m;
auto r = buildConfiguration( modes, m );
if (r != ConfigurationResult::SUCCESS)
return r;
return changeConfig(cx, m);
return changeConfig(cx, m, force);
}
ACTOR Future<vector<ProcessData>> getWorkers( Transaction* tr ) {

@ -49,6 +49,11 @@ public:
INVALID_CONFIGURATION,
DATABASE_ALREADY_CREATED,
DATABASE_CREATED,
DATABASE_UNAVAILABLE,
STORAGE_IN_UNKNOWN_DCID,
REGION_NOT_FULLY_REPLICATED,
MULTIPLE_ACTIVE_REGIONS,
REGIONS_CHANGED,
SUCCESS
};
};
@ -104,11 +109,11 @@ ConfigurationResult::Type buildConfiguration( std::string const& modeString, std
bool isCompleteConfiguration( std::map<std::string, std::string> const& options );
// All versions of changeConfig apply the given set of configuration tokens to the database, and return a ConfigurationResult (or error).
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& configMode ); // Accepts tokens separated by spaces in a single string
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& configMode, bool force ); // Accepts tokens separated by spaces in a single string
ConfigureAutoResult parseConfig( StatusObject const& status );
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf ); // Accepts a vector of configuration tokens
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::map<std::string, std::string> const& m ); // Accepts a full configuration in key/value format (from buildConfiguration)
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf, bool force ); // Accepts a vector of configuration tokens
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::map<std::string, std::string> const& m, bool const& force ); // Accepts a full configuration in key/value format (from buildConfiguration)
Future<DatabaseConfiguration> getDatabaseConfiguration( Database const& cx );
Future<Void> waitForFullReplication( Database const& cx );

@ -28,6 +28,7 @@
struct MasterProxyInterface {
enum { LocationAwareLoadBalance = 1 };
enum { AlwaysFresh = 1 };
LocalityData locality;
RequestStream< struct CommitTransactionRequest > commit;
@ -96,12 +97,12 @@ struct CommitTransactionRequest {
}
};
static inline int getBytes( CommitTransactionRequest const& r ) {
static inline int getBytes( CommitTransactionRequest const& r ) {
// SOMEDAY: Optimize
//return r.arena.getSize(); // NOT correct because arena can be shared!
int total = sizeof(r);
for(auto m = r.transaction.mutations.begin(); m != r.transaction.mutations.end(); ++m)
total += m->expectedSize();
total += m->expectedSize() + CLIENT_KNOBS->PROXY_COMMIT_OVERHEAD_BYTES;
for(auto i = r.transaction.read_conflict_ranges.begin(); i != r.transaction.read_conflict_ranges.end(); ++i)
total += i->expectedSize();
for(auto i = r.transaction.write_conflict_ranges.begin(); i != r.transaction.write_conflict_ranges.end(); ++i)

@ -73,38 +73,40 @@ static void initTLSOptions() {
static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");
static const Key CLIENT_LATENCY_INFO_CTR_PREFIX = LiteralStringRef("client_latency_counter/");
Reference<LocationInfo> LocationInfo::getInterface( DatabaseContext *cx, std::vector<StorageServerInterface> const& alternatives, LocalityData const& clientLocality ) {
std::vector<UID> handles;
for( auto const& alternative : alternatives )
handles.push_back( alternative.getVersion.getEndpoint().token ); // getVersion here was a random choice
std::sort( handles.begin(), handles.end() );
ASSERT( handles.size() );
Reference<StorageServerInfo> StorageServerInfo::getInterface( DatabaseContext *cx, StorageServerInterface const& ssi, LocalityData const& locality ) {
auto it = cx->server_interf.find( ssi.id() );
if( it != cx->server_interf.end() ) {
if(it->second->interf.getVersion.getEndpoint().token != ssi.getVersion.getEndpoint().token) {
if(it->second->interf.locality == ssi.locality) {
//FIXME: load balance holds pointers to individual members of the interface, and this assignment will swap out the object they are
// pointing to. This is technically correct, but is very unnatural. We may want to refactor load balance to take an AsyncVar<Reference<Interface>>
// so that it is notified when the interface changes.
it->second->interf = ssi;
} else {
it->second->notifyContextDestroyed();
Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
cx->server_interf[ ssi.id() ] = loc.getPtr();
return loc;
}
}
auto it = cx->ssid_locationInfo.find( handles );
if( it != cx->ssid_locationInfo.end() ) {
return Reference<LocationInfo>::addRef( it->second );
return Reference<StorageServerInfo>::addRef( it->second );
}
Reference<LocationInfo> loc( new LocationInfo(cx, alternatives, clientLocality) );
cx->ssid_locationInfo[ handles ] = loc.getPtr();
Reference<StorageServerInfo> loc( new StorageServerInfo(cx, ssi, locality) );
cx->server_interf[ ssi.id() ] = loc.getPtr();
return loc;
}
void LocationInfo::notifyContextDestroyed() {
void StorageServerInfo::notifyContextDestroyed() {
cx = NULL;
}
LocationInfo::~LocationInfo() {
StorageServerInfo::~StorageServerInfo() {
if( cx ) {
std::vector<UID> handles;
for( auto const& alternative : getAlternatives() )
handles.push_back( alternative.v.getVersion.getEndpoint().token ); // must match above choice of UID
std::sort( handles.begin(), handles.end() );
ASSERT_ABORT( handles.size() );
auto it = cx->ssid_locationInfo.find( handles );
if( it != cx->ssid_locationInfo.end() )
cx->ssid_locationInfo.erase( it );
auto it = cx->server_interf.find( interf.id() );
if( it != cx->server_interf.end() )
cx->server_interf.erase( it );
cx = NULL;
}
}
@ -498,6 +500,7 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo )
{
try {
state Optional<std::string> incorrectConnectionString;
loop {
OpenDatabaseRequest req;
req.knownClientInfoID = outInfo->get().id;
@ -508,11 +511,18 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
ClusterConnectionString fileConnectionString;
if (ccf && !ccf->fileContentsUpToDate(fileConnectionString)) {
req.issues = LiteralStringRef("incorrect_cluster_file_contents");
std::string connectionString = ccf->getConnectionString().toString();
if(ccf->canGetFilename()) {
TraceEvent(SevWarnAlways, "IncorrectClusterFileContents").detail("Filename", ccf->getFilename())
// Don't log a SevWarnAlways the first time to account for transient issues (e.g. someone else changing the file right before us)
TraceEvent(incorrectConnectionString.present() && incorrectConnectionString.get() == connectionString ? SevWarnAlways : SevWarn, "IncorrectClusterFileContents")
.detail("Filename", ccf->getFilename())
.detail("ConnectionStringFromFile", fileConnectionString.toString())
.detail("CurrentConnectionString", ccf->getConnectionString().toString());
.detail("CurrentConnectionString", connectionString);
}
incorrectConnectionString = connectionString;
}
else {
incorrectConnectionString = Optional<std::string>();
}
choose {
@ -555,9 +565,9 @@ Database DatabaseContext::create( Reference<AsyncVar<ClientDBInfo>> info, Future
DatabaseContext::~DatabaseContext() {
monitorMasterProxiesInfoChange.cancel();
for(auto it = ssid_locationInfo.begin(); it != ssid_locationInfo.end(); it = ssid_locationInfo.erase(it))
for(auto it = server_interf.begin(); it != server_interf.end(); it = server_interf.erase(it))
it->second->notifyContextDestroyed();
ASSERT_ABORT( ssid_locationInfo.empty() );
ASSERT_ABORT( server_interf.empty() );
locationCache.insert( allKeys, Reference<LocationInfo>() );
}
@ -603,8 +613,14 @@ bool DatabaseContext::getCachedLocations( const KeyRangeRef& range, vector<std::
}
Reference<LocationInfo> DatabaseContext::setCachedLocation( const KeyRangeRef& keys, const vector<StorageServerInterface>& servers ) {
vector<Reference<ReferencedInterface<StorageServerInterface>>> serverRefs;
serverRefs.reserve(servers.size());
for(auto& interf : servers) {
serverRefs.push_back( StorageServerInfo::getInterface( this, interf, clientLocality ) );
}
int maxEvictionAttempts = 100, attempts = 0;
Reference<LocationInfo> loc = LocationInfo::getInterface( this, servers, clientLocality);
Reference<LocationInfo> loc = Reference<LocationInfo>( new LocationInfo(serverRefs) );
while( locationCache.size() > locationCacheSize && attempts < maxEvictionAttempts) {
TEST( true ); // NativeAPI storage server locationCache entry evicted
attempts++;
@ -663,8 +679,8 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
ssid_locationInfo.clear();
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ) );
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
case FDBDatabaseOptions::MAX_WATCHES:
@ -674,7 +690,7 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
ssid_locationInfo.clear();
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
}
@ -924,7 +940,7 @@ Reference<ProxyInfo> DatabaseContext::getMasterProxies() {
return masterProxies;
}
//Actor which will wait until the ProxyInfo returned by the DatabaseContext cx is not NULL
//Actor which will wait until the MultiInterface<MasterProxyInterface> returned by the DatabaseContext cx is not NULL
ACTOR Future<Reference<ProxyInfo>> getMasterProxiesFuture(DatabaseContext *cx) {
loop{
Reference<ProxyInfo> proxies = cx->getMasterProxies();
@ -2916,31 +2932,21 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
StorageMetrics permittedError,
int shardLimit )
{
state int tooManyShardsCount = 0;
loop {
state vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskDataDistribution) ) );
vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskDataDistribution) ) );
if( locations.size() == shardLimit ) {
TraceEvent(!g_network->isSimulated() && ++tooManyShardsCount >= 15 ? SevWarnAlways : SevWarn, "WaitStorageMetricsPenalty")
.detail("Keys", printable(keys))
.detail("Locations", locations.size())
.detail("Limit", CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT)
.detail("JitteredSecondsOfPenitence", CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY);
Void _ = wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
// make sure that the next getKeyRangeLocations() call will actually re-fetch the range
cx->invalidateCache( keys );
} else {
tooManyShardsCount = 0;
//SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
//SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
if(locations.size() < shardLimit) {
try {
Future<StorageMetrics> fx;
if (locations.size() > 1) {
StorageMetrics x = wait( waitStorageMetricsMultipleLocations( locations, min, max, permittedError ) );
return x;
fx = waitStorageMetricsMultipleLocations( locations, min, max, permittedError );
} else {
WaitMetricsRequest req( keys, min, max );
StorageMetrics x = wait( loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution ) );
return x;
fx = loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskDataDistribution );
}
StorageMetrics x = wait(fx);
return x;
} catch (Error& e) {
if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
TraceEvent(SevError, "WaitStorageMetricsError").error(e);
@ -2949,6 +2955,14 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
cx->invalidateCache(keys);
Void _ = wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, TaskDataDistribution));
}
} else {
TraceEvent(SevWarn, "WaitStorageMetricsPenalty")
.detail("Keys", printable(keys))
.detail("Limit", CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT)
.detail("JitteredSecondsOfPenitence", CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY);
Void _ = wait(delayJittered(CLIENT_KNOBS->STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, TaskDataDistribution));
// make sure that the next getKeyRangeLocations() call will actually re-fetch the range
cx->invalidateCache( keys );
}
}
}

@ -21,7 +21,7 @@
#ifndef FDBCLIENT_STATUS_H
#define FDBCLIENT_STATUS_H
#include "../fdbrpc/JSONDoc.h"
#include "fdbclient/JSONDoc.h"
// Reads the entire string s as a JSON value
// Throws if no value can be parsed or if s contains data after the first JSON value

@ -36,6 +36,7 @@ struct StorageServerInterface {
};
enum { LocationAwareLoadBalance = 1 };
enum { AlwaysFresh = 0 };
LocalityData locality;
UID uniqueID;

@ -258,6 +258,29 @@ int decodeDatacenterReplicasValue( ValueRef const& value ) {
return s;
}
// "\xff\x02/tLogDatacenters/[[datacenterID]]"
extern const KeyRangeRef tLogDatacentersKeys;
extern const KeyRef tLogDatacentersPrefix;
const Key tLogDatacentersKeyFor( Optional<Value> dcID );
const KeyRangeRef tLogDatacentersKeys(
LiteralStringRef("\xff\x02/tLogDatacenters/"),
LiteralStringRef("\xff\x02/tLogDatacenters0") );
const KeyRef tLogDatacentersPrefix = tLogDatacentersKeys.begin;
const Key tLogDatacentersKeyFor( Optional<Value> dcID ) {
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
wr.serializeBytes( tLogDatacentersKeys.begin );
wr << dcID;
return wr.toStringRef();
}
Optional<Value> decodeTLogDatacentersKey( KeyRef const& key ) {
Optional<Value> dcID;
BinaryReader rd( key.removePrefix(tLogDatacentersKeys.begin), AssumeVersion(currentProtocolVersion) );
rd >> dcID;
return dcID;
}
const KeyRef primaryDatacenterKey = LiteralStringRef("\xff/primaryDatacenter");
// serverListKeys.contains(k) iff k.startsWith( serverListKeys.begin ) because '/'+1 == '0'
@ -395,6 +418,8 @@ const KeyRef minRequiredCommitVersionKey = LiteralStringRef("\xff/minRequiredCom
const KeyRef globalKeysPrefix = LiteralStringRef("\xff/globals");
const KeyRef lastEpochEndKey = LiteralStringRef("\xff/globals/lastEpochEnd");
const KeyRef lastEpochEndPrivateKey = LiteralStringRef("\xff\xff/globals/lastEpochEnd");
const KeyRef rebootWhenDurableKey = LiteralStringRef("\xff/globals/rebootWhenDurable");
const KeyRef rebootWhenDurablePrivateKey = LiteralStringRef("\xff\xff/globals/rebootWhenDurable");
const KeyRef fastLoggingEnabled = LiteralStringRef("\xff/globals/fastLoggingEnabled");
const KeyRef fastLoggingEnabledPrivateKey = LiteralStringRef("\xff\xff/globals/fastLoggingEnabled");

@ -78,7 +78,7 @@ const Value tagLocalityListValue( int8_t const& );
Optional<Value> decodeTagLocalityListKey( KeyRef const& );
int8_t decodeTagLocalityListValue( ValueRef const& );
// "\xff\x02/DatacenterReplicas/[[datacenterID]]" := "[[replicas]]"
// "\xff\x02/datacenterReplicas/[[datacenterID]]" := "[[replicas]]"
extern const KeyRangeRef datacenterReplicasKeys;
extern const KeyRef datacenterReplicasPrefix;
const Key datacenterReplicasKeyFor( Optional<Value> dcID );
@ -86,6 +86,12 @@ const Value datacenterReplicasValue( int const& );
Optional<Value> decodeDatacenterReplicasKey( KeyRef const& );
int decodeDatacenterReplicasValue( ValueRef const& );
// "\xff\x02/tLogDatacenters/[[datacenterID]]"
extern const KeyRangeRef tLogDatacentersKeys;
extern const KeyRef tLogDatacentersPrefix;
const Key tLogDatacentersKeyFor( Optional<Value> dcID );
Optional<Value> decodeTLogDatacentersKey( KeyRef const& );
extern const KeyRef primaryDatacenterKey;
// "\xff/serverList/[[serverID]]" := "[[StorageServerInterface]]"
@ -148,6 +154,8 @@ std::pair<vector<std::pair<UID, NetworkAddress>>,vector<std::pair<UID, NetworkAd
extern const KeyRef globalKeysPrefix;
extern const KeyRef lastEpochEndKey;
extern const KeyRef lastEpochEndPrivateKey;
extern const KeyRef rebootWhenDurableKey;
extern const KeyRef rebootWhenDurablePrivateKey;
extern const KeyRef fastLoggingEnabled;
extern const KeyRef fastLoggingEnabledPrivateKey;

49
fdbclient/fdbclient.vcxproj Executable file → Normal file

@ -20,13 +20,14 @@
</ProjectConfiguration>
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="KeyRangeMap.actor.cpp" />
<ActorCompiler Include="NativeAPI.actor.cpp" />
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="AsyncFileBlobStore.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="Atomic.h" />
<ClInclude Include="BackupContainer.h" />
<ClInclude Include="BackupAgent.h" />
<ClInclude Include="BlobStore.h" />
<ClInclude Include="ClientDBInfo.h" />
<ClInclude Include="ClientLogEvents.h" />
<ClInclude Include="ClientWorkerInterface.h" />
@ -34,19 +35,21 @@
<ClInclude Include="CommitTransaction.h" />
<ClInclude Include="CoordinationInterface.h" />
<ClInclude Include="DatabaseConfiguration.h" />
<ActorCompiler Include="DatabaseContext.h" />
<ClInclude Include="DatabaseContext.h" />
<ActorCompiler Include="EventTypes.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="KeyBackedTypes.h" />
<ClInclude Include="MetricLogger.h" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ClInclude Include="FailureMonitorClient.h" />
<ClInclude Include="FDBOptions.g.h" />
<ClInclude Include="FDBOptions.h" />
<ClInclude Include="FDBTypes.h" />
<ClInclude Include="HTTP.h" />
<ClInclude Include="KeyBackedTypes.h" />
<ClInclude Include="MetricLogger.h" />
<ClInclude Include="FailureMonitorClient.h" />
<ClInclude Include="IClientApi.h" />
<ClInclude Include="JsonBuilder.h" />
<ClInclude Include="JSONDoc.h" />
<ClInclude Include="json_spirit\json_spirit_error_position.h" />
<ClInclude Include="json_spirit\json_spirit_reader_template.h" />
<ClInclude Include="json_spirit\json_spirit_value.h" />
@ -54,8 +57,13 @@
<ClInclude Include="json_spirit\json_spirit_writer_template.h" />
<ClInclude Include="KeyRangeMap.h" />
<ClInclude Include="Knobs.h" />
<ClInclude Include="libb64\cdecode.h" />
<ClInclude Include="libb64\cencode.h" />
<ClInclude Include="libb64\decode.h" />
<ClInclude Include="libb64\encode.h" />
<ClInclude Include="ManagementAPI.h" />
<ClInclude Include="MasterProxyInterface.h" />
<ClInclude Include="md5\md5.h" />
<ClInclude Include="MonitorLeader.h" />
<ClInclude Include="MultiVersionAssignmentVars.h" />
<ClInclude Include="MultiVersionTransaction.h" />
@ -66,41 +74,52 @@
<ActorCompiler Include="RunTransaction.actor.h" />
<ClInclude Include="RYWIterator.h" />
<ClInclude Include="Schemas.h" />
<ClInclude Include="sha1\SHA1.h" />
<ClInclude Include="SnapshotCache.h" />
<ClInclude Include="Status.h" />
<ClInclude Include="StatusClient.h" />
<ClInclude Include="StorageServerInterface.h" />
<ClInclude Include="Subspace.h" />
<ClInclude Include="SystemData.h" />
<ClInclude Include="TaskBucket.h" />
<ClInclude Include="ThreadSafeTransaction.h" />
<ClInclude Include="Tuple.h" />
<ActorCompiler Include="VersionedMap.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="VersionedMap.h" />
<ClInclude Include="WriteMap.h" />
<ClInclude Include="Subspace.h" />
<ClInclude Include="Tuple.h" />
<ClInclude Include="JsonBuilder.h" />
<ClInclude Include="xml2json.hpp" />
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="FailureMonitorClient.actor.cpp" />
<ActorCompiler Include="ReadYourWrites.actor.cpp" />
<ActorCompiler Include="AsyncFileBlobStore.actor.cpp" />
<ClCompile Include="AutoPublicAddress.cpp" />
<ActorCompiler Include="BackupAgentBase.actor.cpp" />
<ActorCompiler Include="BackupContainer.actor.cpp" />
<ActorCompiler Include="BlobStore.actor.cpp" />
<ActorCompiler Include="DatabaseBackupAgent.actor.cpp" />
<ClCompile Include="DatabaseConfiguration.cpp" />
<ClCompile Include="AutoPublicAddress.cpp" />
<ActorCompiler Include="FailureMonitorClient.actor.cpp" />
<ClCompile Include="FDBOptions.g.cpp" />
<ActorCompiler Include="FileBackupAgent.actor.cpp" />
<ActorCompiler Include="HTTP.actor.cpp" />
<ActorCompiler Include="KeyRangeMap.actor.cpp" />
<ClCompile Include="Knobs.cpp" />
<ClCompile Include="libb64\cdecode.c" />
<ClCompile Include="libb64\cencode.c" />
<ClCompile Include="md5\md5.c" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ActorCompiler Include="MonitorLeader.actor.cpp" />
<ActorCompiler Include="ManagementAPI.actor.cpp" />
<ActorCompiler Include="MultiVersionTransaction.actor.cpp" />
<ActorCompiler Include="NativeAPI.actor.cpp" />
<ActorCompiler Include="ReadYourWrites.actor.cpp" />
<ClCompile Include="RYWIterator.cpp" />
<ActorCompiler Include="StatusClient.actor.cpp" />
<ClCompile Include="Schemas.cpp" />
<ClCompile Include="SystemData.cpp" />
<ClCompile Include="sha1\SHA1.cpp" />
<ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
<ActorCompiler Include="TaskBucket.actor.cpp" />
<ClCompile Include="Subspace.cpp" />

24
fdbrpc/xml2json.hpp → fdbclient/xml2json.hpp Executable file → Normal file

@ -9,19 +9,19 @@
#include <string>
#include <cctype>
#include "rapidxml/rapidxml.hpp"
#include "rapidxml/rapidxml_utils.hpp"
#include "rapidxml/rapidxml_print.hpp"
#include "fdbclient/rapidxml/rapidxml.hpp"
#include "fdbclient/rapidxml/rapidxml_utils.hpp"
#include "fdbclient/rapidxml/rapidxml_print.hpp"
#include "rapidjson/document.h"
#include "rapidjson/prettywriter.h"
#include "rapidjson/encodedstream.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/reader.h"
#include "rapidjson/writer.h"
#include "rapidjson/filereadstream.h"
#include "rapidjson/filewritestream.h"
#include "rapidjson/error/en.h"
#include "fdbclient/rapidjson/document.h"
#include "fdbclient/rapidjson/prettywriter.h"
#include "fdbclient/rapidjson/encodedstream.h"
#include "fdbclient/rapidjson/stringbuffer.h"
#include "fdbclient/rapidjson/reader.h"
#include "fdbclient/rapidjson/writer.h"
#include "fdbclient/rapidjson/filereadstream.h"
#include "fdbclient/rapidjson/filewritestream.h"
#include "fdbclient/rapidjson/error/en.h"
/* [Start] This part is configurable */
static const char xml2json_text_additional_name[] = "#text";

@ -635,13 +635,15 @@ bool argv_equal(const char** a1, const char** a2)
return true;
}
void kill_process(uint64_t id) {
void kill_process(uint64_t id, bool wait = true) {
pid_t pid = id_pid[id];
log_msg(SevInfo, "Killing process %d\n", pid);
kill(pid, SIGTERM);
waitpid(pid, NULL, 0);
if(wait) {
waitpid(pid, NULL, 0);
}
pid_id.erase(pid);
id_pid.erase(id);
@ -1367,8 +1369,19 @@ int main(int argc, char** argv) {
signal(SIGCHLD, SIG_IGN);
sigprocmask(SIG_SETMASK, &normal_mask, NULL);
/* Send SIGHUP to all child processes */
kill(0, SIGHUP);
/* If daemonized, setsid() was called earlier so we can just kill our entire new process group */
if(daemonize) {
kill(0, SIGHUP);
}
else {
/* Otherwise kill each process individually but don't wait on them yet */
auto i = id_pid.begin();
auto iEnd = id_pid.end();
while(i != iEnd) {
// Must advance i before calling kill_process() which erases the entry at i
kill_process((i++)->first, false);
}
}
/* Wait for all child processes (says POSIX.1-2001) */
/* POSIX.1-2001 specifies that if the disposition of SIGCHLD is set to SIG_IGN, then children that terminate do not become zombies and a call to wait()

@ -420,7 +420,7 @@ struct Peer : NonCopyable {
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
}
else {
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).detail("PeerAddr", self->destination);
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
}
if (conn) {

@ -154,9 +154,9 @@ void addLaggingRequest(Future<Optional<Reply>> reply, Promise<Void> requestFinis
// Keep trying to get a reply from any of servers until success or cancellation; tries to take into account
// failMon's information for load balancing and avoiding failed servers
// If ALL the servers are failed and the list of servers is not fresh, throws an exception to let the caller refresh the list of servers
ACTOR template <class Interface, class Request>
ACTOR template <class Interface, class Request, class Multi>
Future< REPLY_TYPE(Request) > loadBalance(
Reference<MultiInterface<Interface>> alternatives,
Reference<MultiInterface<Multi>> alternatives,
RequestStream<Request> Interface::* channel,
Request request = Request(),
int taskID = TaskDefaultPromiseEndpoint,
@ -272,39 +272,37 @@ Future< REPLY_TYPE(Request) > loadBalance(
if(!stream && !firstRequest.isValid() ) {
// Everything is down! Wait for someone to be up.
if(now() - g_network->networkMetrics.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) {
g_network->networkMetrics.oldestAlternativesFailure = now();
}
double serversValidTime = alternatives->getRetrievedAt();
double minDelay = std::min(FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED - (now() - serversValidTime), FLOW_KNOBS->ALTERNATIVES_FAILURE_MIN_DELAY);
double delay = std::max(std::min((now()-g_network->networkMetrics.oldestAlternativesFailure)*FLOW_KNOBS->ALTERNATIVES_FAILURE_DELAY_RATIO, FLOW_KNOBS->ALTERNATIVES_FAILURE_MAX_DELAY), minDelay);
if(serversValidTime == ALWAYS_FRESH)
delay = ALWAYS_FRESH;
// Making this SevWarn means a lot of clutter
if(now() - g_network->networkMetrics.newestAlternativesFailure > 1 || g_random->random01() < 0.01) {
TraceEvent("AllAlternativesFailed")
.detail("Interval", FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED)
.detail("ServersValidTime", serversValidTime)
.detail("Alternatives", alternatives->description())
.detail("Delay", delay);
}
g_network->networkMetrics.newestAlternativesFailure = now();
if (delay < 0) {
throw all_alternatives_failed();
}
vector<Future<Void>> ok( alternatives->size() );
for(int i=0; i<ok.size(); i++)
for(int i=0; i<ok.size(); i++) {
ok[i] = IFailureMonitor::failureMonitor().onStateEqual( alternatives->get(i, channel).getEndpoint(), FailureStatus(false) );
choose {
when ( Void _ = wait( quorum( ok, 1 ) ) ) {}
when ( Void _ = wait( ::delayJittered( delay ) ) ) {
throw all_alternatives_failed();
}
if(!alternatives->alwaysFresh()) {
if(now() - g_network->networkMetrics.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) {
g_network->networkMetrics.oldestAlternativesFailure = now();
}
double delay = std::max(std::min((now()-g_network->networkMetrics.oldestAlternativesFailure)*FLOW_KNOBS->ALTERNATIVES_FAILURE_DELAY_RATIO, FLOW_KNOBS->ALTERNATIVES_FAILURE_MAX_DELAY), FLOW_KNOBS->ALTERNATIVES_FAILURE_MIN_DELAY);
// Making this SevWarn means a lot of clutter
if(now() - g_network->networkMetrics.newestAlternativesFailure > 1 || g_random->random01() < 0.01) {
TraceEvent("AllAlternativesFailed")
.detail("Interval", FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED)
.detail("Alternatives", alternatives->description())
.detail("Delay", delay);
}
g_network->networkMetrics.newestAlternativesFailure = now();
choose {
when ( Void _ = wait( quorum( ok, 1 ) ) ) {}
when ( Void _ = wait( ::delayJittered( delay ) ) ) {
throw all_alternatives_failed();
}
}
} else {
Void _ = wait( quorum( ok, 1 ) );
}
numAttempts = 0; // now that we've got a server back, reset the backoff
@ -408,17 +406,4 @@ Future< REPLY_TYPE(Request) > loadBalance(
}
}
// This wrapper just helps the compiler accept the coercion from Reference<Multi>
// to Reference<MultiInterface<Interface>>; it forwards all arguments unchanged to
// the main loadBalance() implementation above.
template <class Interface, class Request, class Multi>
inline Future< REPLY_TYPE(Request) > loadBalance(
	Reference<Multi> alternatives,
	RequestStream<Request> Interface::* channel,
	Request request = Request(),
	int taskID = TaskDefaultPromiseEndpoint,
	bool atMostOnce = false,
	QueueModel* model = NULL)
{
	return loadBalance( Reference<MultiInterface<Interface>>(alternatives), channel, request, taskID, atMostOnce, model );
}
#endif

@ -268,6 +268,7 @@ struct LBLocalityData {
enum { Present = 0 };
static LocalityData getLocality( Interface const& ) { return LocalityData(); }
static NetworkAddress getAddress( Interface const& ) { return NetworkAddress(); }
static bool alwaysFresh() { return true; }
};
// Template specialization that only works for interfaces with a .locality member.
@ -277,6 +278,7 @@ struct LBLocalityData<Interface, typename std::enable_if< Interface::LocationAwa
enum { Present = 1 };
static LocalityData getLocality( Interface const& i ) { return i.locality; }
static NetworkAddress getAddress( Interface const& i ) { return i.address(); }
static bool alwaysFresh() { return Interface::AlwaysFresh; }
};
struct LBDistance {

@ -22,8 +22,6 @@
#define FLOW_MULTIINTERFACE_H
#pragma once
#define ALWAYS_FRESH 1e99
extern uint64_t debug_lastLoadBalanceResultEndpointToken;
template <class K, class V>
@ -42,10 +40,27 @@ template <class K, class V> bool operator < ( K const& l, KVPair<K,V> const& r )
template <class K, class V>
std::string describe( KVPair<K,V> const& p ) { return format("%d ", p.k) + describe(p.v); }
// Reference-counted wrapper around a single interface of type T, caching the
// locality-based load-balance distance computed once at construction time.
template <class T>
struct ReferencedInterface : public ReferenceCounted<ReferencedInterface<T>> {
	T interf;
	int8_t distance;

	// Human-readable description; delegates to the wrapped interface.
	std::string toString() const { return interf.toString(); }

	// Interfaces that carry no locality information are treated as DISTANT.
	ReferencedInterface(T const& interf, LocalityData const& locality = LocalityData())
		: interf(interf),
		  distance( LBLocalityData<T>::Present
		                ? loadBalanceDistance( locality, LBLocalityData<T>::getLocality( interf ), LBLocalityData<T>::getAddress( interf ) )
		                : LBDistance::DISTANT ) {
	}

	virtual ~ReferencedInterface() {}

	// Ordering predicate for sorting: interfaces with a smaller (closer) distance first.
	static bool sort_by_distance(Reference<ReferencedInterface<T>> lhs, Reference<ReferencedInterface<T>> rhs) {
		return lhs->distance < rhs->distance;
	}
};
template <class T>
class MultiInterface : public ReferenceCounted<MultiInterface<T>> {
public:
MultiInterface( const vector<T>& v, LocalityData const& locality = LocalityData(), double timeNow = now() ) : retrievedAt( timeNow ), bestCount(0) {
MultiInterface( const vector<T>& v, LocalityData const& locality = LocalityData() ) : bestCount(0) {
for(int i=0; i<v.size(); i++)
alternatives.push_back(KVPair<int,T>(LBDistance::DISTANT,v[i]));
g_random->randomShuffle(alternatives);
@ -59,7 +74,7 @@ public:
}
int size() const { return alternatives.size(); }
int countBest() const {
int countBest() const {
return bestCount;
}
LBDistance::Type bestDistance() const {
@ -67,6 +82,9 @@ public:
return LBDistance::DISTANT;
return (LBDistance::Type) alternatives[0].k;
}
bool alwaysFresh() const {
return LBLocalityData<T>::alwaysFresh();
}
template <class F>
F const& get( int index, F T::*member ) const {
@ -76,27 +94,66 @@ public:
T const& getInterface(int index) { return alternatives[index].v; }
UID getId( int index ) const { return alternatives[index].v.id(); }
//vector<T> const& get() { return alternatives; }
double getRetrievedAt() const { return retrievedAt; }
virtual ~MultiInterface() {}
// void alwaysFresh() { retrievedAt = FLOW_KNOBS->ALWAYS_FRESH; }
// void freshen() { retrievedAt = now(); }
std::string description() {
return describe( alternatives );
}
protected:
vector<KVPair<int,T>> const& getAlternatives() { return alternatives; }
private:
vector<KVPair<int,T>> alternatives;
double retrievedAt;
int bestCount;
int16_t bestCount;
};
// Specialization of MultiInterface for interfaces already wrapped in a
// reference-counted ReferencedInterface<T>. Alternatives are shuffled and then
// stably sorted by their cached locality distance, so the closest interfaces
// appear first.
template <class T>
class MultiInterface<ReferencedInterface<T>> : public ReferenceCounted<MultiInterface<ReferencedInterface<T>>> {
public:
	MultiInterface( const vector<Reference<ReferencedInterface<T>>>& v ) : alternatives(v), bestCount(0) {
		// Shuffle before the stable sort so that alternatives at the same
		// distance remain in random relative order.
		g_random->randomShuffle(alternatives);
		if ( LBLocalityData<T>::Present ) {
			std::stable_sort( alternatives.begin(), alternatives.end(), ReferencedInterface<T>::sort_by_distance );
		}
		if(size()) {
			// bestCount = number of leading alternatives that share the smallest distance.
			for(int i = 1; i < alternatives.size(); i++) {
				if(alternatives[i]->distance > alternatives[0]->distance) {
					bestCount = i;
					return;
				}
			}
			// All alternatives are at the same distance.
			bestCount = size();
		}
	}

	int size() const { return alternatives.size(); }
	// Number of alternatives in the closest distance class (the prefix after sorting).
	int countBest() const {
		return bestCount;
	}
	// Distance of the closest alternative; DISTANT when the set is empty.
	LBDistance::Type bestDistance() const {
		if( !size() )
			return LBDistance::DISTANT;
		return (LBDistance::Type) alternatives[0]->distance;
	}
	bool alwaysFresh() const {
		return LBLocalityData<T>::alwaysFresh();
	}

	// Access a member (e.g. a RequestStream) of the index-th wrapped interface.
	template <class F>
	F const& get( int index, F T::*member ) const {
		return alternatives[index]->interf.*member;
	}

	T const& getInterface(int index) { return alternatives[index]->interf; }
	UID getId( int index ) const { return alternatives[index]->interf.id(); }

	virtual ~MultiInterface() {}

	std::string description() {
		return describe( alternatives );
	}
private:
	vector<Reference<ReferencedInterface<T>>> alternatives;
	int16_t bestCount;
};
template <class Ar, class T> void load(Ar& ar, Reference<MultiInterface<T>>&) { ASSERT(false); } //< required for Future<T>
#endif
#endif

@ -76,7 +76,7 @@ ACTOR static Future<Void> handshake( TLSConnection* self ) {
int r = self->session->handshake();
if ( r == ITLSSession::SUCCESS ) break;
if ( r == ITLSSession::FAILED ) {
TraceEvent("TLSConnectionHandshakeError", self->getDebugID());
TraceEvent("TLSConnectionHandshakeError", self->getDebugID()).suppressFor(1.0);
throw connection_failed();
}
ASSERT( r == ITLSSession::WANT_WRITE || r == ITLSSession::WANT_READ );

Some files were not shown because too many files have changed in this diff Show More