Extract IConnection and NetworkAddress out from network.h

This commit is contained in:
Xiaoge Su 2022-12-27 17:29:54 -08:00
parent c11c48fa3f
commit 50de69c897
34 changed files with 405 additions and 326 deletions

View File

@ -21,6 +21,7 @@
#include "fdbclient/AsyncFileS3BlobStore.actor.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "flow/UnitTest.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // has to be last include
Future<int64_t> AsyncFileS3BlobStoreRead::size() const {

View File

@ -24,6 +24,7 @@
#include "flow/IRandom.h"
#include "flow/flow.h"
#include "fdbclient/Knobs.h"
#include "flow/IConnection.h"
#include "fdbclient/S3BlobStore.h"
std::string buildPartitionPath(const std::string& url, const std::string& partition) {

View File

@ -26,6 +26,7 @@
#include "flow/UnitTest.h"
#include "fdbrpc/genericactors.actor.h"
#include "flow/Platform.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // has to be last include
namespace {

View File

@ -33,6 +33,7 @@
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/IConnection.h"
#include <memory>
#include <unordered_map>

View File

@ -22,6 +22,7 @@
#include "flow/flat_buffers.h"
#include "flow/UnitTest.h"
#include "flow/IConnection.h"
#include <boost/algorithm/string.hpp>

View File

@ -20,6 +20,7 @@
#include "fdbclient/S3BlobStore.h"
#include "flow/IConnection.h"
#include "md5/md5.h"
#include "libb64/encode.h"
#include "fdbclient/sha1/SHA1.h"

View File

@ -23,6 +23,7 @@
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include "flow/Knobs.h"
#include "flow/IConnection.h"
#include "fdbclient/IKnobCollection.h"
#include "flow/network.h"
#include <functional>

View File

@ -38,6 +38,8 @@
using RESTConnectionPoolKey = std::pair<std::string, std::string>;
class IConnection;
class RESTConnectionPool : public ReferenceCounted<RESTConnectionPool> {
public:
struct ReusableConnection {

View File

@ -29,6 +29,8 @@
#include "fdbrpc/HTTP.h"
#include "fdbclient/JSONDoc.h"
class IConnection;
// Representation of all the things you need to connect to a blob store instance with some credentials.
// Reference counted because a very large number of them could be needed.
class S3BlobStoreEndpoint : public ReferenceCounted<S3BlobStoreEndpoint> {

View File

@ -49,10 +49,15 @@
#include "flow/ProtocolVersion.h"
#include "flow/UnitTest.h"
#include "flow/WatchFile.actor.h"
#include "flow/IConnection.h"
#define XXH_INLINE_ALL
#include "flow/xxhash.h"
#include "flow/actorcompiler.h" // This must be the last #include.
void removeCachedDNS(const std::string& host, const std::string& service) {
INetworkConnections::net()->removeCachedDNS(host, service);
}
namespace {
NetworkAddressList g_currentDeliveryPeerAddress = NetworkAddressList();
@ -572,9 +577,7 @@ ACTOR Future<Void> connectionMonitor(Reference<Peer> peer) {
}
break;
}
when(wait(peer->resetPing.onTrigger())) {
break;
}
when(wait(peer->resetPing.onTrigger())) { break; }
}
}
}
@ -670,9 +673,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
choose {
when(wait(self->dataToSend.onTrigger())) {}
when(wait(retryConnectF)) {
break;
}
when(wait(retryConnectF)) { break; }
}
}
@ -721,9 +722,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
self->prependConnectPacket();
reader = connectionReader(self->transport, conn, self, Promise<Reference<Peer>>());
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
throw connection_failed();
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { throw connection_failed(); }
}
} catch (Error& e) {
++self->connectFailedCount;
@ -1470,9 +1469,7 @@ ACTOR static Future<Void> connectionIncoming(TransportData* self, Reference<ICon
ASSERT(false);
return Void();
}
when(Reference<Peer> p = wait(onConnected.getFuture())) {
p->onIncomingConnection(p, conn, reader);
}
when(Reference<Peer> p = wait(onConnected.getFuture())) { p->onIncomingConnection(p, conn, reader); }
when(wait(delayJittered(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
CODE_PROBE(true, "Incoming connection timed out");
throw timed_out();

View File

@ -24,6 +24,7 @@
#include "libb64/encode.h"
#include "flow/Knobs.h"
#include <cctype>
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <boost/asio.hpp>
#include "flow/UnitTest.h"
#include "flow/Error.h"
#include "fdbrpc/IPAllowList.h"

View File

@ -30,6 +30,7 @@
#include "flow/Platform.h"
#include "flow/SendBufferIterator.h"
#include "flow/UnitTest.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace boost::asio;

View File

@ -34,6 +34,8 @@
#include "flow/Arena.h"
#include "flow/PKey.h"
class IConnection;
enum { WLTOKEN_ENDPOINT_NOT_FOUND = 0, WLTOKEN_PING_PACKET, WLTOKEN_UNAUTHORIZED_ENDPOINT, WLTOKEN_FIRST_AVAILABLE };
#pragma pack(push, 4)

View File

@ -27,6 +27,8 @@
#include "flow/Net2Packet.h"
#include "flow/IRateControl.h"
class IConnection;
namespace HTTP {
struct is_iless {
bool operator()(const std::string& a, const std::string& b) const { return strcasecmp(a.c_str(), b.c_str()) < 0; }

View File

@ -25,6 +25,7 @@
#include "flow/FastRef.h"
#include "flow/network.h"
#include "flow/flow.h"
#include "flow/IConnection.h"
#include <boost/asio.hpp>

View File

@ -32,6 +32,10 @@
#include "flow/Hostname.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// To avoid diretly access INetworkConnection::net()->removeCachedDNS(), which will require heavy include budget, put
// the call to FlowTransport.actor.cpp as a external function.
extern void removeCachedDNS(const std::string& host, const std::string& service);
ACTOR template <class Req, bool P>
Future<REPLY_TYPE(Req)> retryBrokenPromise(RequestStream<Req, P> to, Req request) {
// Like to.getReply(request), except that a broken_promise exception results in retrying request immediately.
@ -98,7 +102,7 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(Req request, Hostname h
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
removeCachedDNS(hostname.host, hostname.service);
}
}
return reply;
@ -122,7 +126,7 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(Req request,
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
removeCachedDNS(hostname.host, hostname.service);
}
}
return reply;
@ -147,7 +151,7 @@ Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(Req request, Hostname hostname
// Connection failure.
wait(delay(reconnectInterval));
reconnectInterval = std::min(2 * reconnectInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL);
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
removeCachedDNS(hostname.host, hostname.service);
} else {
throw reply.getError();
}
@ -180,7 +184,7 @@ Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(Req request,
wait(delay(reconnectInitInterval));
reconnectInitInterval =
std::min(2 * reconnectInitInterval, FLOW_KNOBS->HOSTNAME_RECONNECT_MAX_INTERVAL);
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
removeCachedDNS(hostname.host, hostname.service);
} else {
throw reply.getError();
}

View File

@ -41,9 +41,11 @@
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbrpc/TokenSign.h"
#include "flow/IUDPSocket.h"
#include "flow/IConnection.h"
enum ClogMode { ClogDefault, ClogAll, ClogSend, ClogReceive };
struct ValidationData {
// global validation that missing refreshed feeds were previously destroyed
std::unordered_set<std::string> allDestroyedChangeFeedIDs;

View File

@ -55,6 +55,7 @@
#include "flow/FaultInjection.h"
#include "flow/TaskQueue.h"
#include "flow/IUDPSocket.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ISimulator* g_simulator = nullptr;

View File

@ -33,6 +33,7 @@
#endif
#include "flow/network.h"
#include "flow/IUDPSocket.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h"
UDPMetricClient::UDPMetricClient()

View File

@ -41,6 +41,7 @@
#include "flow/flow.h"
#include "flow/network.h"
#include "flow/IUDPSocket.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct MetricsRule {

View File

@ -35,6 +35,7 @@
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/IAsyncFile.h"
#include "flow/IConnection.h"
#include "flow/IRandom.h"
#include "flow/Platform.h"
#include "flow/Trace.h"

View File

@ -50,8 +50,9 @@
#include "flow/TypeTraits.h"
#include "flow/FaultInjection.h"
#include "flow/CodeProbeUtils.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "fdbserver/SimulatedCluster.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#undef max
#undef min

View File

@ -25,6 +25,7 @@
#include "flow/UnitTest.h"
#include <inttypes.h>
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
constexpr int WLTOKEN_NETWORKTEST = WLTOKEN_FIRST_AVAILABLE;

View File

@ -24,6 +24,7 @@
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "boost/algorithm/string/predicate.hpp"
#include "flow/IConnection.h"
#undef state
#include "fdbclient/SimpleIni.h"

View File

@ -35,7 +35,7 @@
#include <memory>
#include <functional>
#include "flow/IUDPSocket.h"
#include "flow/IConnection.h"
#include "flow/actorcompiler.h" // has to be last include
namespace {

View File

@ -22,6 +22,7 @@
#include <regex>
#include "flow/IConnection.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -57,6 +57,7 @@
#include "flow/UnitTest.h"
#include "flow/ScopeExit.h"
#include "flow/IUDPSocket.h"
#include "flow/IConnection.h"
#ifdef ADDRESS_SANITIZER
#include <sanitizer/lsan_interface.h>

View File

@ -44,6 +44,7 @@
#include <boost/filesystem.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include "fmt/format.h"

View File

@ -0,0 +1,202 @@
/*
* IConnection.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_ICONNECTION_H
#define FLOW_ICONNECTION_H
#include <cstdint>
#include <limits>
#include <boost/asio/ip/tcp.hpp>
#include "flow/NetworkAddress.h"
class Void;
template <typename T>
class Future;
class IConnection {
public:
// IConnection is reference-counted (use Reference<IConnection>), but the caller must explicitly call close()
virtual void addref() = 0;
virtual void delref() = 0;
// Closes the underlying connection eventually if it is not already closed.
virtual void close() = 0;
virtual Future<Void> acceptHandshake() = 0;
virtual Future<Void> connectHandshake() = 0;
// Precondition: write() has been called and last returned 0
// returns when write() can write at least one byte (or may throw an error if the connection dies)
virtual Future<Void> onWritable() = 0;
// Precondition: read() has been called and last returned 0
// returns when read() can read at least one byte (or may throw an error if the connection dies)
virtual Future<Void> onReadable() = 0;
// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might
// be 0) (or may throw an error if the connection dies)
virtual int read(uint8_t* begin, uint8_t* end) = 0;
// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of
// bytes written (might be 0) (or may throw an error if the connection dies) The SendBuffer chain cannot be empty,
// and the limit must be positive. Important non-obvious behavior: The caller is committing to write the contents
// of the buffer chain up to the limit. If all of those bytes could not be sent in this call to write() then
// further calls must be made to write the remainder. An IConnection implementation can make decisions based on the
// entire byte set that the caller was attempting to write even if it is unable to write all of it immediately. Due
// to limitations of TLSConnection, callers must also avoid reallocations that reduce the amount of written data in
// the first buffer in the chain.
virtual int write(SendBuffer const* buffer, int limit = std::numeric_limits<int>::max()) = 0;
// Returns the network address and port of the other end of the connection. In the case of an incoming connection,
// this may not be an address we can connect to!
virtual NetworkAddress getPeerAddress() const = 0;
// Returns whether the peer is trusted.
// For TLS-enabled connections, this is true if the peer has presented a valid chain of certificates trusted by the
// local endpoint. For non-TLS connections this is always true for any valid open connection.
virtual bool hasTrustedPeer() const = 0;
virtual UID getDebugID() const = 0;
// At present, implemented by Sim2Conn where we want to disable bits flip for connections between parent process and
// child process, also reduce latency for this kind of connection
virtual bool isStableConnection() const { throw unsupported_operation(); }
virtual boost::asio::ip::tcp::socket& getSocket() = 0;
};
// forward declare SendBuffer, declared in serialize.h
class SendBuffer;
class IListener {
public:
virtual void addref() = 0;
virtual void delref() = 0;
// Returns one incoming connection when it is available. Do not cancel unless you are done with the listener!
virtual Future<Reference<IConnection>> accept() = 0;
virtual NetworkAddress getListenAddress() const = 0;
};
// DNSCache is a class maintaining a <hostname, vector<NetworkAddress>> mapping.
class DNSCache {
public:
DNSCache() = default;
explicit DNSCache(const std::map<std::string, std::vector<NetworkAddress>>& dnsCache)
: hostnameToAddresses(dnsCache) {}
Optional<std::vector<NetworkAddress>> find(const std::string& host, const std::string& service);
void add(const std::string& host, const std::string& service, const std::vector<NetworkAddress>& addresses);
void remove(const std::string& host, const std::string& service);
void clear();
// Convert hostnameToAddresses to string. The format is:
// hostname1,host1Address1,host1Address2;hostname2,host2Address1,host2Address2...
std::string toString();
static DNSCache parseFromString(const std::string& s);
private:
std::map<std::string, std::vector<NetworkAddress>> hostnameToAddresses;
};
class IUDPSocket;
class INetworkConnections {
public:
// Methods for making and accepting network connections. Logically this is part of the INetwork abstraction
// that abstracts all interaction with the physical world; it is separated out to make it easy for e.g. transport
// security to override only these operations without having to delegate everything in INetwork.
// Make an outgoing connection to the given address. May return an error or block indefinitely in case of
// connection problems!
virtual Future<Reference<IConnection>> connect(NetworkAddress toAddr,
boost::asio::ip::tcp::socket* existingSocket = nullptr) = 0;
virtual Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr) = 0;
// Make an outgoing udp connection and connect to the passed address.
virtual Future<Reference<IUDPSocket>> createUDPSocket(NetworkAddress toAddr) = 0;
// Make an outgoing udp connection without establishing a connection
virtual Future<Reference<IUDPSocket>> createUDPSocket(bool isV6 = false) = 0;
virtual void addMockTCPEndpoint(const std::string& host,
const std::string& service,
const std::vector<NetworkAddress>& addresses) = 0;
virtual void removeMockTCPEndpoint(const std::string& host, const std::string& service) = 0;
virtual void parseMockDNSFromString(const std::string& s) = 0;
virtual std::string convertMockDNSToString() = 0;
// Resolve host name and service name (such as "http" or can be a plain number like "80") to a list of 1 or more
// NetworkAddresses
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint(const std::string& host,
const std::string& service) = 0;
// Similar to resolveTCPEndpoint(), except that this one uses DNS cache.
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpointWithDNSCache(const std::string& host,
const std::string& service) = 0;
// Resolve host name and service name. This one should only be used when resolving asynchronously is impossible. For
// all other cases, resolveTCPEndpoint() should be preferred.
virtual std::vector<NetworkAddress> resolveTCPEndpointBlocking(const std::string& host,
const std::string& service) = 0;
// Resolve host name and service name with DNS cache. This one should only be used when resolving asynchronously is
// impossible. For all other cases, resolveTCPEndpointWithDNSCache() should be preferred.
virtual std::vector<NetworkAddress> resolveTCPEndpointBlockingWithDNSCache(const std::string& host,
const std::string& service) = 0;
// Convenience function to resolve host/service and connect to one of its NetworkAddresses randomly
// isTLS has to be a parameter here because it is passed to connect() as part of the toAddr object.
virtual Future<Reference<IConnection>> connect(const std::string& host,
const std::string& service,
bool isTLS = false);
// Listen for connections on the given local address
virtual Reference<IListener> listen(NetworkAddress localAddr) = 0;
static INetworkConnections* net() {
return static_cast<INetworkConnections*>((void*)g_network->global(INetwork::enNetworkConnections));
}
// If a DNS name can be resolved to both and IPv4 and IPv6 addresses, we want IPv6 addresses when running the
// clusters on IPv6.
// This function takes a vector of addresses and return a random one, preferring IPv6 over IPv4.
static NetworkAddress pickOneAddress(const std::vector<NetworkAddress>& addresses) {
std::vector<NetworkAddress> ipV6Addresses;
for (const NetworkAddress& addr : addresses) {
if (addr.isV6()) {
ipV6Addresses.push_back(addr);
}
}
if (ipV6Addresses.size() > 0) {
return ipV6Addresses[deterministicRandom()->randomInt(0, ipV6Addresses.size())];
}
return addresses[deterministicRandom()->randomInt(0, addresses.size())];
}
void removeCachedDNS(const std::string& host, const std::string& service) { dnsCache.remove(host, service); }
DNSCache dnsCache;
// Returns the interface that should be used to make and accept socket connections
};
#endif // FLOW_ICONNECTION_H

View File

@ -0,0 +1,146 @@
#ifndef FLOW_NETWORKADDRESS_H
#define FLOW_NETWORKADDRESS_H
#include "flow/BooleanParam.h"
#include "flow/Trace.h"
#include "flow/IPAddress.h"
FDB_DECLARE_BOOLEAN_PARAM(NetworkAddressFromHostname);
struct NetworkAddress {
constexpr static FileIdentifier file_identifier = 14155727;
// A NetworkAddress identifies a particular running server (i.e. a TCP endpoint).
IPAddress ip;
uint16_t port;
uint16_t flags;
bool fromHostname;
enum { FLAG_PRIVATE = 1, FLAG_TLS = 2 };
NetworkAddress()
: ip(IPAddress(0)), port(0), flags(FLAG_PRIVATE), fromHostname(NetworkAddressFromHostname::False) {}
NetworkAddress(const IPAddress& address,
uint16_t port,
bool isPublic,
bool isTLS,
NetworkAddressFromHostname fromHostname = NetworkAddressFromHostname::False)
: ip(address), port(port), flags((isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0)),
fromHostname(fromHostname) {}
NetworkAddress(uint32_t ip,
uint16_t port,
bool isPublic,
bool isTLS,
NetworkAddressFromHostname fromHostname = NetworkAddressFromHostname::False)
: NetworkAddress(IPAddress(ip), port, isPublic, isTLS, fromHostname) {}
NetworkAddress(uint32_t ip, uint16_t port)
: NetworkAddress(ip, port, false, false, NetworkAddressFromHostname::False) {}
NetworkAddress(const IPAddress& ip, uint16_t port)
: NetworkAddress(ip, port, false, false, NetworkAddressFromHostname::False) {}
bool operator==(NetworkAddress const& r) const { return ip == r.ip && port == r.port && flags == r.flags; }
bool operator!=(NetworkAddress const& r) const { return !(*this == r); }
bool operator<(NetworkAddress const& r) const {
if (flags != r.flags)
return flags < r.flags;
else if (ip != r.ip)
return ip < r.ip;
return port < r.port;
}
bool operator>(NetworkAddress const& r) const { return r < *this; }
bool operator<=(NetworkAddress const& r) const { return !(*this > r); }
bool operator>=(NetworkAddress const& r) const { return !(*this < r); }
bool isValid() const { return ip.isValid() || port != 0; }
bool isPublic() const { return !(flags & FLAG_PRIVATE); }
bool isTLS() const { return (flags & FLAG_TLS) != 0; }
bool isV6() const { return ip.isV6(); }
size_t hash() const {
size_t result = 0;
if (ip.isV6()) {
uint16_t* ptr = (uint16_t*)ip.toV6().data();
result = ((size_t)ptr[5] << 32) | ((size_t)ptr[6] << 16) | ptr[7];
} else {
result = ip.toV4();
}
return (result << 16) + port;
}
static NetworkAddress parse(std::string const&); // May throw connection_string_invalid
static Optional<NetworkAddress> parseOptional(std::string const&);
static std::vector<NetworkAddress> parseList(std::string const&);
std::string toString() const;
template <class Ar>
void serialize(Ar& ar) {
if constexpr (is_fb_function<Ar>) {
serializer(ar, ip, port, flags, fromHostname);
} else {
if (ar.isDeserializing && !ar.protocolVersion().hasIPv6()) {
uint32_t ipV4;
serializer(ar, ipV4, port, flags);
ip = IPAddress(ipV4);
} else {
serializer(ar, ip, port, flags);
}
if (ar.protocolVersion().hasNetworkAddressHostnameFlag()) {
serializer(ar, fromHostname);
}
}
}
};
template <>
struct Traceable<NetworkAddress> : std::true_type {
static std::string toString(const NetworkAddress& value) { return value.toString(); }
};
namespace std {
template <>
struct hash<NetworkAddress> {
size_t operator()(const NetworkAddress& na) const { return na.hash(); }
};
} // namespace std
struct NetworkAddressList {
NetworkAddress address;
Optional<NetworkAddress> secondaryAddress{};
bool operator==(NetworkAddressList const& r) const {
return address == r.address && secondaryAddress == r.secondaryAddress;
}
bool operator!=(NetworkAddressList const& r) const {
return address != r.address || secondaryAddress != r.secondaryAddress;
}
bool operator<(NetworkAddressList const& r) const {
if (address != r.address)
return address < r.address;
return secondaryAddress < r.secondaryAddress;
}
NetworkAddress getTLSAddress() const {
if (!secondaryAddress.present() || address.isTLS()) {
return address;
}
return secondaryAddress.get();
}
std::string toString() const {
if (!secondaryAddress.present()) {
return address.toString();
}
return address.toString() + ", " + secondaryAddress.get().toString();
}
bool contains(const NetworkAddress& r) const {
return address == r || (secondaryAddress.present() && secondaryAddress.get() == r);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, address, secondaryAddress);
}
};
#endif // FLOW_NETWORKADDRESS_H

View File

@ -22,159 +22,19 @@
#define FLOW_OPENNETWORK_H
#pragma once
#include "flow/NetworkAddress.h"
#include "flow/IPAddress.h"
#include "flow/TaskPriority.h"
#include <string>
#include <stdint.h>
#include <atomic>
#include <boost/asio/ip/tcp.hpp>
#include "flow/Arena.h"
#include "flow/BooleanParam.h"
#include "flow/IRandom.h"
#include "flow/ProtocolVersion.h"
#include "flow/Trace.h"
#include "flow/TaskPriority.h"
#include "flow/WriteOnlySet.h"
#include "flow/IPAddress.h"
class Void;
FDB_DECLARE_BOOLEAN_PARAM(NetworkAddressFromHostname);
struct NetworkAddress {
constexpr static FileIdentifier file_identifier = 14155727;
// A NetworkAddress identifies a particular running server (i.e. a TCP endpoint).
IPAddress ip;
uint16_t port;
uint16_t flags;
bool fromHostname;
enum { FLAG_PRIVATE = 1, FLAG_TLS = 2 };
NetworkAddress()
: ip(IPAddress(0)), port(0), flags(FLAG_PRIVATE), fromHostname(NetworkAddressFromHostname::False) {}
NetworkAddress(const IPAddress& address,
uint16_t port,
bool isPublic,
bool isTLS,
NetworkAddressFromHostname fromHostname = NetworkAddressFromHostname::False)
: ip(address), port(port), flags((isPublic ? 0 : FLAG_PRIVATE) | (isTLS ? FLAG_TLS : 0)),
fromHostname(fromHostname) {}
NetworkAddress(uint32_t ip,
uint16_t port,
bool isPublic,
bool isTLS,
NetworkAddressFromHostname fromHostname = NetworkAddressFromHostname::False)
: NetworkAddress(IPAddress(ip), port, isPublic, isTLS, fromHostname) {}
NetworkAddress(uint32_t ip, uint16_t port)
: NetworkAddress(ip, port, false, false, NetworkAddressFromHostname::False) {}
NetworkAddress(const IPAddress& ip, uint16_t port)
: NetworkAddress(ip, port, false, false, NetworkAddressFromHostname::False) {}
bool operator==(NetworkAddress const& r) const { return ip == r.ip && port == r.port && flags == r.flags; }
bool operator!=(NetworkAddress const& r) const { return !(*this == r); }
bool operator<(NetworkAddress const& r) const {
if (flags != r.flags)
return flags < r.flags;
else if (ip != r.ip)
return ip < r.ip;
return port < r.port;
}
bool operator>(NetworkAddress const& r) const { return r < *this; }
bool operator<=(NetworkAddress const& r) const { return !(*this > r); }
bool operator>=(NetworkAddress const& r) const { return !(*this < r); }
bool isValid() const { return ip.isValid() || port != 0; }
bool isPublic() const { return !(flags & FLAG_PRIVATE); }
bool isTLS() const { return (flags & FLAG_TLS) != 0; }
bool isV6() const { return ip.isV6(); }
size_t hash() const {
size_t result = 0;
if (ip.isV6()) {
uint16_t* ptr = (uint16_t*)ip.toV6().data();
result = ((size_t)ptr[5] << 32) | ((size_t)ptr[6] << 16) | ptr[7];
} else {
result = ip.toV4();
}
return (result << 16) + port;
}
static NetworkAddress parse(std::string const&); // May throw connection_string_invalid
static Optional<NetworkAddress> parseOptional(std::string const&);
static std::vector<NetworkAddress> parseList(std::string const&);
std::string toString() const;
template <class Ar>
void serialize(Ar& ar) {
if constexpr (is_fb_function<Ar>) {
serializer(ar, ip, port, flags, fromHostname);
} else {
if (ar.isDeserializing && !ar.protocolVersion().hasIPv6()) {
uint32_t ipV4;
serializer(ar, ipV4, port, flags);
ip = IPAddress(ipV4);
} else {
serializer(ar, ip, port, flags);
}
if (ar.protocolVersion().hasNetworkAddressHostnameFlag()) {
serializer(ar, fromHostname);
}
}
}
};
template <>
struct Traceable<NetworkAddress> : std::true_type {
static std::string toString(const NetworkAddress& value) { return value.toString(); }
};
namespace std {
template <>
struct hash<NetworkAddress> {
size_t operator()(const NetworkAddress& na) const { return na.hash(); }
};
} // namespace std
struct NetworkAddressList {
NetworkAddress address;
Optional<NetworkAddress> secondaryAddress{};
bool operator==(NetworkAddressList const& r) const {
return address == r.address && secondaryAddress == r.secondaryAddress;
}
bool operator!=(NetworkAddressList const& r) const {
return address != r.address || secondaryAddress != r.secondaryAddress;
}
bool operator<(NetworkAddressList const& r) const {
if (address != r.address)
return address < r.address;
return secondaryAddress < r.secondaryAddress;
}
NetworkAddress getTLSAddress() const {
if (!secondaryAddress.present() || address.isTLS()) {
return address;
}
return secondaryAddress.get();
}
std::string toString() const {
if (!secondaryAddress.present()) {
return address.toString();
}
return address.toString() + ", " + secondaryAddress.get().toString();
}
bool contains(const NetworkAddress& r) const {
return address == r || (secondaryAddress.present() && secondaryAddress.get() == r);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, address, secondaryAddress);
}
};
std::string toIPVectorString(std::vector<uint32_t> ips);
std::string toIPVectorString(const std::vector<IPAddress>& ips);
std::string formatIpPort(const IPAddress& ip, uint16_t port);
@ -260,72 +120,6 @@ public:
virtual Future<int64_t> read() = 0;
};
// forward declare SendBuffer, declared in serialize.h
class SendBuffer;
class IConnection {
public:
// IConnection is reference-counted (use Reference<IConnection>), but the caller must explicitly call close()
virtual void addref() = 0;
virtual void delref() = 0;
// Closes the underlying connection eventually if it is not already closed.
virtual void close() = 0;
virtual Future<Void> acceptHandshake() = 0;
virtual Future<Void> connectHandshake() = 0;
// Precondition: write() has been called and last returned 0
// returns when write() can write at least one byte (or may throw an error if the connection dies)
virtual Future<Void> onWritable() = 0;
// Precondition: read() has been called and last returned 0
// returns when read() can read at least one byte (or may throw an error if the connection dies)
virtual Future<Void> onReadable() = 0;
// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might
// be 0) (or may throw an error if the connection dies)
virtual int read(uint8_t* begin, uint8_t* end) = 0;
// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of
// bytes written (might be 0) (or may throw an error if the connection dies) The SendBuffer chain cannot be empty,
// and the limit must be positive. Important non-obvious behavior: The caller is committing to write the contents
// of the buffer chain up to the limit. If all of those bytes could not be sent in this call to write() then
// further calls must be made to write the remainder. An IConnection implementation can make decisions based on the
// entire byte set that the caller was attempting to write even if it is unable to write all of it immediately. Due
// to limitations of TLSConnection, callers must also avoid reallocations that reduce the amount of written data in
// the first buffer in the chain.
virtual int write(SendBuffer const* buffer, int limit = std::numeric_limits<int>::max()) = 0;
// Returns the network address and port of the other end of the connection. In the case of an incoming connection,
// this may not be an address we can connect to!
virtual NetworkAddress getPeerAddress() const = 0;
// Returns whether the peer is trusted.
// For TLS-enabled connections, this is true if the peer has presented a valid chain of certificates trusted by the
// local endpoint. For non-TLS connections this is always true for any valid open connection.
virtual bool hasTrustedPeer() const = 0;
virtual UID getDebugID() const = 0;
// At present, implemented by Sim2Conn where we want to disable bits flip for connections between parent process and
// child process, also reduce latency for this kind of connection
virtual bool isStableConnection() const { throw unsupported_operation(); }
virtual boost::asio::ip::tcp::socket& getSocket() = 0;
};
class IListener {
public:
virtual void addref() = 0;
virtual void delref() = 0;
// Returns one incoming connection when it is available. Do not cancel unless you are done with the listener!
virtual Future<Reference<IConnection>> accept() = 0;
virtual NetworkAddress getListenAddress() const = 0;
};
typedef void* flowGlobalType;
typedef NetworkAddress (*NetworkAddressFuncPtr)();
@ -485,103 +279,5 @@ protected:
~INetwork() {} // Please don't try to delete through this interface!
};
// DNSCache is a class maintaining a <hostname, vector<NetworkAddress>> mapping.
class DNSCache {
public:
DNSCache() = default;
explicit DNSCache(const std::map<std::string, std::vector<NetworkAddress>>& dnsCache)
: hostnameToAddresses(dnsCache) {}
Optional<std::vector<NetworkAddress>> find(const std::string& host, const std::string& service);
void add(const std::string& host, const std::string& service, const std::vector<NetworkAddress>& addresses);
void remove(const std::string& host, const std::string& service);
void clear();
// Convert hostnameToAddresses to string. The format is:
// hostname1,host1Address1,host1Address2;hostname2,host2Address1,host2Address2...
std::string toString();
static DNSCache parseFromString(const std::string& s);
private:
std::map<std::string, std::vector<NetworkAddress>> hostnameToAddresses;
};
class IUDPSocket;
class INetworkConnections {
public:
// Methods for making and accepting network connections. Logically this is part of the INetwork abstraction
// that abstracts all interaction with the physical world; it is separated out to make it easy for e.g. transport
// security to override only these operations without having to delegate everything in INetwork.
// Make an outgoing connection to the given address. May return an error or block indefinitely in case of
// connection problems!
virtual Future<Reference<IConnection>> connect(NetworkAddress toAddr,
boost::asio::ip::tcp::socket* existingSocket = nullptr) = 0;
virtual Future<Reference<IConnection>> connectExternal(NetworkAddress toAddr) = 0;
// Make an outgoing udp connection and connect to the passed address.
virtual Future<Reference<IUDPSocket>> createUDPSocket(NetworkAddress toAddr) = 0;
// Make an outgoing udp connection without establishing a connection
virtual Future<Reference<IUDPSocket>> createUDPSocket(bool isV6 = false) = 0;
virtual void addMockTCPEndpoint(const std::string& host,
const std::string& service,
const std::vector<NetworkAddress>& addresses) = 0;
virtual void removeMockTCPEndpoint(const std::string& host, const std::string& service) = 0;
virtual void parseMockDNSFromString(const std::string& s) = 0;
virtual std::string convertMockDNSToString() = 0;
// Resolve host name and service name (such as "http" or can be a plain number like "80") to a list of 1 or more
// NetworkAddresses
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint(const std::string& host,
const std::string& service) = 0;
// Similar to resolveTCPEndpoint(), except that this one uses DNS cache.
virtual Future<std::vector<NetworkAddress>> resolveTCPEndpointWithDNSCache(const std::string& host,
const std::string& service) = 0;
// Resolve host name and service name. This one should only be used when resolving asynchronously is impossible. For
// all other cases, resolveTCPEndpoint() should be preferred.
virtual std::vector<NetworkAddress> resolveTCPEndpointBlocking(const std::string& host,
const std::string& service) = 0;
// Resolve host name and service name with DNS cache. This one should only be used when resolving asynchronously is
// impossible. For all other cases, resolveTCPEndpointWithDNSCache() should be preferred.
virtual std::vector<NetworkAddress> resolveTCPEndpointBlockingWithDNSCache(const std::string& host,
const std::string& service) = 0;
// Convenience function to resolve host/service and connect to one of its NetworkAddresses randomly
// isTLS has to be a parameter here because it is passed to connect() as part of the toAddr object.
virtual Future<Reference<IConnection>> connect(const std::string& host,
const std::string& service,
bool isTLS = false);
// Listen for connections on the given local address
virtual Reference<IListener> listen(NetworkAddress localAddr) = 0;
static INetworkConnections* net() {
return static_cast<INetworkConnections*>((void*)g_network->global(INetwork::enNetworkConnections));
}
// If a DNS name can be resolved to both and IPv4 and IPv6 addresses, we want IPv6 addresses when running the
// clusters on IPv6.
// This function takes a vector of addresses and return a random one, preferring IPv6 over IPv4.
static NetworkAddress pickOneAddress(const std::vector<NetworkAddress>& addresses) {
std::vector<NetworkAddress> ipV6Addresses;
for (const NetworkAddress& addr : addresses) {
if (addr.isV6()) {
ipV6Addresses.push_back(addr);
}
}
if (ipV6Addresses.size() > 0) {
return ipV6Addresses[deterministicRandom()->randomInt(0, ipV6Addresses.size())];
}
return addresses[deterministicRandom()->randomInt(0, addresses.size())];
}
void removeCachedDNS(const std::string& host, const std::string& service) { dnsCache.remove(host, service); }
DNSCache dnsCache;
// Returns the interface that should be used to make and accept socket connections
};
#endif

View File

@ -28,6 +28,7 @@
#include "flow/flow.h"
#include "flow/ChaosMetrics.h"
#include "flow/UnitTest.h"
#include "flow/IConnection.h"
ChaosMetrics::ChaosMetrics() {
clear();

View File

@ -6,6 +6,7 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include "flow/rte_memcpy.h"
#include "flow/IRandom.h"