Address comments.

This commit is contained in:
Renxuan Wang 2022-01-25 18:53:23 -08:00
parent 481587a8c6
commit 8eb7a10404
5 changed files with 27 additions and 36 deletions

View File

@ -547,31 +547,31 @@ ClientLeaderRegInterface::ClientLeaderRegInterface(INetwork* local) {
// This function contacts a coordinator coord to ask who is its nominee.
// Note: for coordinators whose NetworkAddress is parsed out of a hostname, a connection failure will cause this actor
// to throw `coordinators_changed()` error
ACTOR Future<Void> monitorNominee(
Key key,
ClientLeaderRegInterface coord,
AsyncTrigger* nomineeChange,
Optional<LeaderInfo>* info,
Optional<Hostname> hostname = Optional<Hostname>(),
Reference<IClusterConnectionRecord> connRecord = Reference<IClusterConnectionRecord>()) {
ACTOR Future<Void> monitorNominee(Key key,
ClientLeaderRegInterface coord,
AsyncTrigger* nomineeChange,
Optional<LeaderInfo>* info,
Optional<Hostname> hostname = Optional<Hostname>()) {
loop {
if (connRecord.isValid() && connRecord->hasUnresolvedHostnames()) {
wait(connRecord->resolveHostnames());
}
state Optional<LeaderInfo> li;
if (coord.getLeader.getEndpoint().getPrimaryAddress().fromHostname) {
state ErrorOr<Optional<LeaderInfo>> rep =
wait(coord.getLeader.tryGetReply(GetLeaderRequest(key, info->present() ? info->get().changeID : UID()),
TaskPriority::CoordinationReply));
if (rep.isError() && rep.getError().code() == error_code_request_maybe_delivered) {
if (rep.isError()) {
// Connecting to nominee failed, most likely due to connection failed.
TraceEvent("CoordnitorChangedMonitorNominee")
TraceEvent("MonitorNomineeError")
.detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname")
.detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString());
// 50 milliseconds delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(0.05));
throw coordinators_changed();
.detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString())
.error(rep.getError());
if (rep.getError().code() == error_code_request_maybe_delivered) {
// 50 milliseconds delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(0.05));
throw coordinators_changed();
} else {
throw rep.getError();
}
} else if (rep.present()) {
li = rep.get();
}
@ -681,12 +681,8 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
if (r != connRecord->getConnectionString().networkAddressToHostname.end()) {
hostname = r->second;
}
actors.push_back(monitorNominee(coordinators.clusterKey,
coordinators.clientLeaderServers[i],
&nomineeChange,
&nominees[i],
hostname,
connRecord));
actors.push_back(monitorNominee(
coordinators.clusterKey, coordinators.clientLeaderServers[i], &nomineeChange, &nominees[i], hostname));
}
allActors = waitForAll(actors);
@ -725,7 +721,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
wait(nomineeChange.onTrigger() || allActors);
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
connRecord->getMutableConnectionString()->resetToUnresolved();
connRecord->getConnectionString().resetToUnresolved();
break;
} else {
throw e;
@ -919,7 +915,7 @@ ACTOR Future<Void> monitorLeaderAndGetClientInfo(Key clusterKey,
if (e.code() == error_code_coordinators_changed) {
coordinatorsChanged->trigger();
}
return Void();
throw e;
}
}
}
@ -1088,7 +1084,7 @@ ACTOR Future<Void> monitorProxies(
}
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
info.intermediateConnRecord->getMutableConnectionString()->resetToUnresolved();
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
} else {
throw e;
}

View File

@ -359,9 +359,5 @@ Future<Void> MovableCoordinatedState::setExclusive(Value v) {
return impl->setExclusive(v);
}
Future<Void> MovableCoordinatedState::move(ClusterConnectionString const& nc) {
// We assume the new connection string contains only IP addresses. Even `coordinators` command in fdbcli is going to
// support hostnames, those hostnames should have been resolved in place and will be treated as IP address
// afterwards, i.e. no automatic re-resolve as that of the hostname used in cluster file.
ASSERT(nc.hostnames.size() == 0);
return MovableCoordinatedStateImpl::move(impl.get(), nc);
}

View File

@ -268,7 +268,7 @@ ACTOR Future<Void> remoteMonitorLeader(int* clientCount,
Reference<AsyncVar<Optional<LeaderInfo>>> currentElectedLeader,
ElectionResultRequest req,
Reference<AsyncVar<Void>> coordinatorsChanged) {
state bool _coordinatorsChanged = false;
state bool coordinatorsChangeDetected = false;
state Future<Void> coordinatorsChangedOnChange = coordinatorsChanged->onChange();
state Future<Void> currentElectedLeaderOnChange = currentElectedLeader->onChange();
++(*clientCount);
@ -281,14 +281,14 @@ ACTOR Future<Void> remoteMonitorLeader(int* clientCount,
}
when(wait(coordinatorsChangedOnChange)) {
coordinatorsChangedOnChange = coordinatorsChanged->onChange();
_coordinatorsChanged = true;
coordinatorsChangeDetected = true;
break;
}
when(wait(delayJittered(SERVER_KNOBS->CLIENT_REGISTER_INTERVAL))) { break; }
}
}
if (_coordinatorsChanged) {
if (coordinatorsChangeDetected) {
req.reply.sendError(coordinators_changed());
} else {
req.reply.send(currentElectedLeader->get());

View File

@ -2511,7 +2511,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
successIndex = index;
} else {
if (leader.isError() && leader.getError().code() == error_code_coordinators_changed) {
info.intermediateConnRecord->getMutableConnectionString()->resetToUnresolved();
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
throw coordinators_changed();
}
index = (index + 1) % addrs.size();
@ -2539,7 +2539,7 @@ ACTOR Future<Void> monitorLeaderWithDelayedCandidacyImplInternal(Reference<IClus
info = _info;
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
info.intermediateConnRecord->getMutableConnectionString()->resetToUnresolved();
info.intermediateConnRecord->getConnectionString().resetToUnresolved();
} else {
throw e;
}

View File

@ -1052,11 +1052,10 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
Optional<Value> res = wait(tx->get(coordinatorsKey));
ASSERT(res.present()); // Otherwise, database is in a bad state
state ClusterConnectionString csNew(res.get().toString());
ASSERT(csNew.coordinators().size() + csNew.hostnames.size() ==
old_coordinators_processes.size() + 1);
if (csNew.hasUnresolvedHostnames) {
wait(csNew.resolveHostnames());
}
ASSERT(csNew.coordinators().size() == old_coordinators_processes.size() + 1);
// verify the coordinators' addresses
for (const auto& network_address : csNew.coordinators()) {
std::string address_str = network_address.toString();