Do not throw lookup_failed when resolving fails.

Instead, return an empty Optional<NetworkAddress>. For resolveWithRetry(), still return NetworkAddress because it retries until succeed.
This commit is contained in:
Renxuan Wang 2022-04-08 11:05:57 -07:00 committed by Zhe Wu
parent 0f894509d9
commit 938e8ed996
5 changed files with 87 additions and 71 deletions

View File

@ -364,6 +364,32 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString") {
return Void();
}
TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") {
std::string connectionString = "TestCluster:0@host.name:1234,host-name:5678";
std::string hn = "host-name", port = "5678";
state NetworkAddress address = NetworkAddress::parse("1.0.0.0:5678");
INetworkConnections::net()->addMockTCPEndpoint(hn, port, { address });
state ClusterConnectionString cs(connectionString);
state std::unordered_set<NetworkAddress> coordinatorAddresses;
std::vector<Future<Void>> fs;
for (auto& hostname : cs.hostnames) {
fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void {
if (addr.present()) {
coordinatorAddresses.insert(addr.get());
}
return Void();
}));
}
wait(waitForAll(fs));
ASSERT(coordinatorAddresses.size() == 1 && coordinatorAddresses.count(address) == 1);
return Void();
}
TEST_CASE("/flow/FlatBuffers/LeaderInfo") {
{
LeaderInfo in;

View File

@ -80,22 +80,21 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
// A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname.
// If resolving fails, return lookup_failed().
// Otherwise, return tryGetReply(request).
try {
NetworkAddress address = wait(hostname.resolve());
*to = RequestStream<Req>(Endpoint::wellKnown({ address }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
}
}
return reply;
} catch (...) {
Optional<NetworkAddress> address = wait(hostname.resolve());
if (!address.present()) {
return ErrorOr<REPLY_TYPE(Req)>(lookup_failed());
}
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
}
}
return reply;
}
ACTOR template <class Req>
@ -107,22 +106,21 @@ Future<ErrorOr<REPLY_TYPE(Req)>> tryGetReplyFromHostname(RequestStream<Req>* to,
// A wrapper of tryGetReply(request), except that the request is sent to an address resolved from a hostname.
// If resolving fails, return lookup_failed().
// Otherwise, return tryGetReply(request).
try {
NetworkAddress address = wait(hostname.resolve());
*to = RequestStream<Req>(Endpoint::wellKnown({ address }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
}
}
return reply;
} catch (...) {
Optional<NetworkAddress> address = wait(hostname.resolve());
if (!address.present()) {
return ErrorOr<REPLY_TYPE(Req)>(lookup_failed());
}
*to = RequestStream<Req>(Endpoint::wellKnown({ address.get() }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID));
if (reply.isError()) {
resetReply(request);
if (reply.getError().code() == error_code_request_maybe_delivered) {
// Connection failure.
hostname.resetToUnresolved();
INetworkConnections::net()->removeCachedDNS(hostname.host, hostname.service);
}
}
return reply;
}
ACTOR template <class Req>
@ -134,7 +132,7 @@ Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to,
// Suitable for use with hostname, where RequestStream is NOT initialized yet.
// Not normally useful for endpoints initialized with NetworkAddress.
loop {
state NetworkAddress address = wait(hostname.resolveWithRetry());
NetworkAddress address = wait(hostname.resolveWithRetry());
*to = RequestStream<Req>(Endpoint::wellKnown({ address }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request));
if (reply.isError()) {
@ -162,7 +160,7 @@ Future<REPLY_TYPE(Req)> retryGetReplyFromHostname(RequestStream<Req>* to,
// Suitable for use with hostname, where RequestStream is NOT initialized yet.
// Not normally useful for endpoints initialized with NetworkAddress.
loop {
state NetworkAddress address = wait(hostname.resolveWithRetry());
NetworkAddress address = wait(hostname.resolveWithRetry());
*to = RequestStream<Req>(Endpoint::wellKnown({ address }, token));
ErrorOr<REPLY_TYPE(Req)> reply = wait(to->tryGetReply(request, taskID));
if (reply.isError()) {

View File

@ -2549,15 +2549,14 @@ ACTOR Future<Void> clusterControllerCore(Reference<IClusterConnectionRecord> con
state std::unordered_set<NetworkAddress> coordinatorAddresses;
std::vector<Future<Void>> fs;
for (auto& hostname : ccs.hostnames) {
fs.push_back(map(hostname.resolve(), [&](NetworkAddress const& addr) -> Void {
coordinatorAddresses.insert(addr);
fs.push_back(map(hostname.resolve(), [&](Optional<NetworkAddress> const& addr) -> Void {
if (addr.present()) {
coordinatorAddresses.insert(addr.get());
}
return Void();
}));
}
try {
wait(waitForAll(fs));
} catch (...) {
}
wait(waitForAll(fs));
for (const auto& coord : ccs.coordinators()) {
coordinatorAddresses.insert(coord);

View File

@ -46,7 +46,7 @@ void Hostname::resetToUnresolved() {
}
}
ACTOR Future<NetworkAddress> resolveImpl(Hostname* self) {
ACTOR Future<Optional<NetworkAddress>> resolveImpl(Hostname* self) {
loop {
if (self->status == Hostname::UNRESOLVED) {
self->status = Hostname::RESOLVING;
@ -67,7 +67,7 @@ ACTOR Future<NetworkAddress> resolveImpl(Hostname* self) {
self->status = Hostname::UNRESOLVED;
self->resolveFinish.trigger();
self->resolvedAddress = Optional<NetworkAddress>();
throw lookup_failed();
return Optional<NetworkAddress>();
}
} else if (self->status == Hostname::RESOLVING) {
wait(self->resolveFinish.onTrigger());
@ -86,18 +86,19 @@ ACTOR Future<NetworkAddress> resolveImpl(Hostname* self) {
ACTOR Future<NetworkAddress> resolveWithRetryImpl(Hostname* self) {
loop {
try {
NetworkAddress address = wait(resolveImpl(self));
return address;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) {
throw;
Optional<NetworkAddress> address = wait(resolveImpl(self));
if (address.present()) {
return address.get();
}
wait(delay(FLOW_KNOBS->HOSTNAME_RESOLVE_DELAY));
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);
throw;
}
}
}
Future<NetworkAddress> Hostname::resolve() {
Future<Optional<NetworkAddress>> Hostname::resolve() {
return resolveImpl(this);
}
@ -105,7 +106,7 @@ Future<NetworkAddress> Hostname::resolveWithRetry() {
return resolveWithRetryImpl(this);
}
NetworkAddress Hostname::resolveBlocking() {
Optional<NetworkAddress> Hostname::resolveBlocking() {
if (status != RESOLVED) {
try {
std::vector<NetworkAddress> addresses =
@ -118,14 +119,12 @@ NetworkAddress Hostname::resolveBlocking() {
}
resolvedAddress = address;
status = RESOLVED;
return resolvedAddress.get();
} catch (...) {
status = UNRESOLVED;
resolvedAddress = Optional<NetworkAddress>();
throw lookup_failed();
}
}
return resolvedAddress.get();
return resolvedAddress;
}
TEST_CASE("/flow/Hostname/hostname") {
@ -183,12 +182,8 @@ TEST_CASE("/flow/Hostname/hostname") {
ASSERT(hn3.status == Hostname::UNRESOLVED && !hn3.resolvedAddress.present());
ASSERT(hn4.status == Hostname::UNRESOLVED && !hn4.resolvedAddress.present());
try {
NetworkAddress _ = wait(hn2.resolve());
} catch (Error& e) {
ASSERT(e.code() == error_code_lookup_failed);
}
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());
state Optional<NetworkAddress> emptyAddress = wait(hn2.resolve());
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present() && !emptyAddress.present());
try {
NetworkAddress _ = wait(timeoutError(hn2.resolveWithRetry(), 1));
@ -197,36 +192,34 @@ TEST_CASE("/flow/Hostname/hostname") {
}
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());
try {
NetworkAddress _ = hn2.resolveBlocking();
} catch (Error& e) {
ASSERT(e.code() == error_code_lookup_failed);
}
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());
emptyAddress = hn2.resolveBlocking();
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present() && !emptyAddress.present());
state NetworkAddress address = NetworkAddress::parse("127.0.0.0:1234");
INetworkConnections::net()->addMockTCPEndpoint("host-name", "1234", { address });
state NetworkAddress addressSource = NetworkAddress::parse("127.0.0.0:1234");
INetworkConnections::net()->addMockTCPEndpoint("host-name", "1234", { addressSource });
// Test resolve.
state NetworkAddress addr = wait(hn2.resolve());
state Optional<NetworkAddress> optionalAddress = wait(hn2.resolve());
ASSERT(hn2.status == Hostname::RESOLVED);
ASSERT(hn2.resolvedAddress.present() && hn2.resolvedAddress.get() == address && addr == address);
ASSERT(hn2.resolvedAddress.get() == addressSource && optionalAddress.get() == addressSource);
optionalAddress = Optional<NetworkAddress>();
// Test resolveWithRetry.
hn2.resetToUnresolved();
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());
wait(store(addr, hn2.resolveWithRetry()));
state NetworkAddress address = wait(hn2.resolveWithRetry());
ASSERT(hn2.status == Hostname::RESOLVED);
ASSERT(hn2.resolvedAddress.present() && hn2.resolvedAddress.get() == address && addr == address);
ASSERT(hn2.resolvedAddress.get() == addressSource && address == addressSource);
// Test resolveBlocking.
hn2.resetToUnresolved();
ASSERT(hn2.status == Hostname::UNRESOLVED && !hn2.resolvedAddress.present());
addr = hn2.resolveBlocking();
optionalAddress = hn2.resolveBlocking();
ASSERT(hn2.status == Hostname::RESOLVED);
ASSERT(hn2.resolvedAddress.present() && hn2.resolvedAddress.get() == address && addr == address);
ASSERT(hn2.resolvedAddress.get() == addressSource && optionalAddress.get() == addressSource);
optionalAddress = Optional<NetworkAddress>();
return Void();
}

View File

@ -74,10 +74,10 @@ struct Hostname {
Optional<NetworkAddress> resolvedAddress;
enum HostnameStatus { UNRESOLVED, RESOLVING, RESOLVED };
Future<NetworkAddress> resolve();
Future<Optional<NetworkAddress>> resolve();
Future<NetworkAddress> resolveWithRetry();
NetworkAddress resolveBlocking(); // This one should only be used when resolving asynchronously is impossible.
// For all other cases, resolve() should be preferred.
Optional<NetworkAddress> resolveBlocking(); // This one should only be used when resolving asynchronously is
// impossible. For all other cases, resolve() should be preferred.
void resetToUnresolved();
HostnameStatus status = UNRESOLVED;
AsyncTrigger resolveFinish;