coordinatorsKey should not always store IP addresses. (#7204)

* coordinatorsKey should not store IP addresses.

Currently, when we commit a coordinator change, we always convert hostnames to IP addresses and store the converted results in coordinatorsKey (\xff/coordinators). This results in ForwardRequest also sending IP addresses, and receivers will update their cluster files with IPs; we then lose the dynamic IP feature.

* Remove the legacy coordinators() function.

* Update async_resolve().

ip::basic_resolver::async_resolve(const query & q, ResolveHandler && handler) is deprecated.

* Clean code format.

* Fix typo.

* Remove SpecifiedQuorumChange and NoQuorumChange.
This commit is contained in:
Renxuan Wang 2022-05-23 11:42:56 -07:00 committed by GitHub
parent 4015b5b2a0
commit df4e0deb4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 122 additions and 142 deletions

View File

@ -31,7 +31,7 @@
// Determine public IP address by calling the first available coordinator.
// If fail connecting all coordinators, throw bind_failed().
IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
int size = ccs.coordinators().size() + ccs.hostnames.size();
int size = ccs.coords.size() + ccs.hostnames.size();
int index = 0;
loop {
try {
@ -42,10 +42,10 @@ IPAddress determinePublicIPAutomatically(ClusterConnectionString& ccs) {
NetworkAddress coordAddr;
// Try coords first, because they don't need to be resolved.
if (index < ccs.coordinators().size()) {
coordAddr = ccs.coordinators()[index];
if (index < ccs.coords.size()) {
coordAddr = ccs.coords[index];
} else {
Hostname& h = ccs.hostnames[index - ccs.coordinators().size()];
Hostname& h = ccs.hostnames[index - ccs.coords.size()];
Optional<NetworkAddress> resolvedAddr = h.resolveBlocking();
if (!resolvedAddr.present()) {
throw lookup_failed();

View File

@ -66,7 +66,6 @@ public:
ClusterConnectionString(const std::vector<NetworkAddress>& coordinators, Key key);
ClusterConnectionString(const std::vector<Hostname>& hosts, Key key);
std::vector<NetworkAddress> const& coordinators() const { return coords; }
Key clusterKey() const { return key; }
Key clusterKeyName() const {
return keyDesc;

View File

@ -799,8 +799,8 @@ ACTOR Future<Optional<ClusterConnectionString>> getConnectionString(Database cx)
}
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
Reference<IQuorumChange> change,
std::vector<NetworkAddress> desiredCoordinators) {
ClusterConnectionString* conn,
std::string newName) {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
@ -816,34 +816,30 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
tr->getDatabase()->getConnectionRecord()->getConnectionString().clusterKeyName())
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database??
state std::vector<NetworkAddress> oldCoordinators = wait(old.tryResolveHostnames());
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
if (!desiredCoordinators.size()) {
std::vector<NetworkAddress> _desiredCoordinators = wait(change->getDesiredCoordinators(
tr,
oldCoordinators,
Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(old)),
result));
desiredCoordinators = _desiredCoordinators;
if (conn->hostnames.size() + conn->coords.size() == 0) {
conn->hostnames = old.hostnames;
conn->coords = old.coords;
}
std::vector<NetworkAddress> desiredCoordinators = wait(conn->tryResolveHostnames());
if (desiredCoordinators.size() != conn->hostnames.size() + conn->coords.size()) {
TraceEvent("ChangeQuorumCheckerEarlyTermination")
.detail("Reason", "One or more hostnames are unresolvable")
.backtrace();
return CoordinatorsResult::COORDINATOR_UNREACHABLE;
}
if (result != CoordinatorsResult::SUCCESS)
return result;
if (!desiredCoordinators.size())
return CoordinatorsResult::INVALID_NETWORK_ADDRESSES;
std::sort(desiredCoordinators.begin(), desiredCoordinators.end());
std::string newName = change->getDesiredClusterKeyName();
if (newName.empty())
if (newName.empty()) {
newName = old.clusterKeyName().toString();
if (oldCoordinators == desiredCoordinators && old.clusterKeyName() == newName)
}
std::sort(conn->hostnames.begin(), conn->hostnames.end());
std::sort(conn->coords.begin(), conn->coords.end());
std::sort(old.hostnames.begin(), old.hostnames.end());
std::sort(old.coords.begin(), old.coords.end());
if (conn->hostnames == old.hostnames && conn->coords == old.coords && old.clusterKeyName() == newName) {
return CoordinatorsResult::SAME_NETWORK_ADDRESSES;
}
state ClusterConnectionString conn(desiredCoordinators,
StringRef(newName + ':' + deterministicRandom()->randomAlphaNumeric(32)));
conn->parseKey(newName + ':' + deterministicRandom()->randomAlphaNumeric(32));
if (g_network->isSimulated()) {
int i = 0;
@ -868,19 +864,27 @@ ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
}
std::vector<Future<Optional<LeaderInfo>>> leaderServers;
ClientCoordinators coord(Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(conn)));
ClientCoordinators coord(Reference<ClusterConnectionMemoryRecord>(new ClusterConnectionMemoryRecord(*conn)));
leaderServers.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++)
leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader,
GetLeaderRequest(coord.clusterKey, UID()),
TaskPriority::CoordinationReply));
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
if (coord.clientLeaderServers[i].hostname.present()) {
leaderServers.push_back(retryGetReplyFromHostname(GetLeaderRequest(coord.clusterKey, UID()),
coord.clientLeaderServers[i].hostname.get(),
WLTOKEN_CLIENTLEADERREG_GETLEADER,
TaskPriority::CoordinationReply));
} else {
leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader,
GetLeaderRequest(coord.clusterKey, UID()),
TaskPriority::CoordinationReply));
}
}
choose {
when(wait(waitForAll(leaderServers))) {}
when(wait(delay(5.0))) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; }
}
tr->set(coordinatorsKey, conn.toString());
tr->set(coordinatorsKey, conn->toString());
return Optional<CoordinatorsResult>();
}
@ -990,32 +994,6 @@ ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChan
}
}
struct SpecifiedQuorumChange final : IQuorumChange {
std::vector<NetworkAddress> desired;
explicit SpecifiedQuorumChange(std::vector<NetworkAddress> const& desired) : desired(desired) {}
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<IClusterConnectionRecord>,
CoordinatorsResult&) override {
return desired;
}
};
Reference<IQuorumChange> specifiedQuorumChange(std::vector<NetworkAddress> const& addresses) {
return Reference<IQuorumChange>(new SpecifiedQuorumChange(addresses));
}
struct NoQuorumChange final : IQuorumChange {
Future<std::vector<NetworkAddress>> getDesiredCoordinators(Transaction* tr,
std::vector<NetworkAddress> oldCoordinators,
Reference<IClusterConnectionRecord>,
CoordinatorsResult&) override {
return oldCoordinators;
}
};
Reference<IQuorumChange> noQuorumChange() {
return Reference<IQuorumChange>(new NoQuorumChange);
}
struct NameQuorumChange final : IQuorumChange {
std::string newName;
Reference<IQuorumChange> otherChange;
@ -1062,12 +1040,30 @@ struct AutoQuorumChange final : IQuorumChange {
Reference<IClusterConnectionRecord> ccr,
int desiredCount,
std::set<AddressExclusion>* excluded) {
ClusterConnectionString cs = ccr->getConnectionString();
if (oldCoordinators.size() != cs.hostnames.size() + cs.coords.size()) {
return false;
}
// Are there enough coordinators for the redundancy level?
if (oldCoordinators.size() < desiredCount)
return false;
if (oldCoordinators.size() % 2 != 1)
return false;
// Check exclusions
for (auto& c : oldCoordinators) {
if (addressExcluded(*excluded, c))
return false;
}
// Check locality
// FIXME: Actual locality!
std::sort(oldCoordinators.begin(), oldCoordinators.end());
for (int i = 1; i < oldCoordinators.size(); i++)
if (oldCoordinators[i - 1].ip == oldCoordinators[i].ip)
return false; // Multiple coordinators share an IP
// Check availability
ClientCoordinators coord(ccr);
std::vector<Future<Optional<LeaderInfo>>> leaderServers;
@ -1095,19 +1091,6 @@ struct AutoQuorumChange final : IQuorumChange {
}
}
// Check exclusions
for (auto& c : oldCoordinators) {
if (addressExcluded(*excluded, c))
return false;
}
// Check locality
// FIXME: Actual locality!
std::sort(oldCoordinators.begin(), oldCoordinators.end());
for (int i = 1; i < oldCoordinators.size(); i++)
if (oldCoordinators[i - 1].ip == oldCoordinators[i].ip)
return false; // Multiple coordinators share an IP
return true; // The status quo seems fine
}
@ -1149,8 +1132,10 @@ struct AutoQuorumChange final : IQuorumChange {
if (checkAcceptable) {
bool ok = wait(isAcceptable(self.getPtr(), tr, oldCoordinators, ccr, desiredCount, &excluded));
if (ok)
if (ok) {
*err = CoordinatorsResult::SAME_NETWORK_ADDRESSES;
return oldCoordinators;
}
}
std::vector<NetworkAddress> chosen;

View File

@ -55,12 +55,10 @@ struct IQuorumChange : ReferenceCounted<IQuorumChange> {
// Change to use the given set of coordination servers
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr,
Reference<IQuorumChange> change,
std::vector<NetworkAddress> desiredCoordinators);
ClusterConnectionString* conn,
std::string newName);
ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChange> change);
Reference<IQuorumChange> autoQuorumChange(int desired = -1);
Reference<IQuorumChange> noQuorumChange();
Reference<IQuorumChange> specifiedQuorumChange(std::vector<NetworkAddress> const&);
Reference<IQuorumChange> nameQuorumChange(std::string const& name, Reference<IQuorumChange> const& other);
// Exclude the given set of servers from use as state servers. Returns as soon as the change is durable, without

View File

@ -250,7 +250,7 @@ TEST_CASE("/fdbclient/MonitorLeader/ConnectionString/hostname") {
ClusterConnectionString cs(hostnames, LiteralStringRef("TestCluster:0"));
ASSERT(cs.hostnames.size() == 2);
ASSERT(cs.coordinators().size() == 0);
ASSERT(cs.coords.size() == 0);
ASSERT(cs.toString() == connectionString);
}
@ -301,7 +301,7 @@ TEST_CASE("/fdbclient/MonitorLeader/PartialResolve") {
INetworkConnections::net()->addMockTCPEndpoint(hn, port, { address });
ClusterConnectionString cs(connectionString);
state std::vector<NetworkAddress> allCoordinators = wait(cs.tryResolveHostnames());
std::vector<NetworkAddress> allCoordinators = wait(cs.tryResolveHostnames());
ASSERT(allCoordinators.size() == 1 &&
std::find(allCoordinators.begin(), allCoordinators.end(), address) != allCoordinators.end());
@ -460,7 +460,7 @@ ClientCoordinators::ClientCoordinators(Reference<IClusterConnectionRecord> ccr)
for (auto h : cs.hostnames) {
clientLeaderServers.push_back(ClientLeaderRegInterface(h));
}
for (auto s : cs.coordinators()) {
for (auto s : cs.coords) {
clientLeaderServers.push_back(ClientLeaderRegInterface(s));
}
}
@ -866,7 +866,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions,
Key traceLogGroup) {
state ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString();
state int coordinatorsSize = cs.hostnames.size() + cs.coordinators().size();
state int coordinatorsSize = cs.hostnames.size() + cs.coords.size();
state int index = 0;
state int successIndex = 0;
state Optional<double> incorrectTime;
@ -880,7 +880,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
for (const auto& h : cs.hostnames) {
clientLeaderServers.push_back(ClientLeaderRegInterface(h));
}
for (const auto& c : cs.coordinators()) {
for (const auto& c : cs.coords) {
clientLeaderServers.push_back(ClientLeaderRegInterface(c));
}
@ -892,7 +892,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
req.clusterKey = cs.clusterKey();
req.hostnames = cs.hostnames;
req.coordinators = cs.coordinators();
req.coordinators = cs.coords;
req.knownClientInfoID = clientInfo->get().id;
req.supportedVersions = supportedVersions->get();
req.traceLogGroup = traceLogGroup;

View File

@ -409,11 +409,11 @@ public:
PaxosConfigTransactionImpl(Database const& cx) : cx(cx) {
const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString();
ctis.reserve(cs.hostnames.size() + cs.coordinators().size());
ctis.reserve(cs.hostnames.size() + cs.coords.size());
for (const auto& h : cs.hostnames) {
ctis.emplace_back(h);
}
for (const auto& c : cs.coordinators()) {
for (const auto& c : cs.coords) {
ctis.emplace_back(c);
}
getGenerationQuorum = GetGenerationQuorum{ ctis };

View File

@ -162,8 +162,8 @@ class SimpleConfigTransactionImpl {
public:
SimpleConfigTransactionImpl(Database const& cx) : cx(cx) {
const ClusterConnectionString& cs = cx->getConnectionRecord()->getConnectionString();
if (cs.coordinators().size()) {
std::vector<NetworkAddress> coordinators = cs.coordinators();
if (cs.coords.size()) {
std::vector<NetworkAddress> coordinators = cs.coords;
std::sort(coordinators.begin(), coordinators.end());
cti = ConfigTransactionInterface(coordinators[0]);
} else {

View File

@ -1658,7 +1658,6 @@ Future<RangeResult> CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw,
}
ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
state Reference<IQuorumChange> change;
state ClusterConnectionString conn; // We don't care about the Key here.
state std::vector<std::string> process_address_or_hostname_strs;
state Optional<std::string> msg;
@ -1704,15 +1703,7 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
}
}
std::vector<NetworkAddress> addressesVec = wait(conn.tryResolveHostnames());
if (addressesVec.size() != conn.hostnames.size() + conn.coordinators().size()) {
return ManagementAPIError::toJsonString(false, "coordinators", "One or more hostnames are not resolvable.");
} else if (addressesVec.size()) {
change = specifiedQuorumChange(addressesVec);
} else {
change = noQuorumChange();
}
std::string newName;
// check update for cluster_description
Key cluster_decription_key = LiteralStringRef("cluster_description").withPrefix(kr.begin);
auto entry = ryw->getSpecialKeySpaceWriteMap()[cluster_decription_key];
@ -1720,7 +1711,7 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
// check valid description [a-zA-Z0-9_]+
if (entry.second.present() && isAlphaNumeric(entry.second.get().toString())) {
// do the name change
change = nameQuorumChange(entry.second.get().toString(), change);
newName = entry.second.get().toString();
} else {
// throw the error
return ManagementAPIError::toJsonString(
@ -1728,13 +1719,11 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
}
}
ASSERT(change.isValid());
TraceEvent(SevDebug, "SKSChangeCoordinatorsStart")
.detail("NewAddresses", describe(addressesVec))
.detail("NewConnectionString", conn.toString())
.detail("Description", entry.first ? entry.second.get().toString() : "");
Optional<CoordinatorsResult> r = wait(changeQuorumChecker(&ryw->getTransaction(), change, addressesVec));
Optional<CoordinatorsResult> r = wait(changeQuorumChecker(&ryw->getTransaction(), &conn, newName));
TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish")
.detail("Result", r.present() ? static_cast<int>(r.get()) : -1); // -1 means success
@ -1803,9 +1792,20 @@ ACTOR static Future<RangeResult> CoordinatorsAutoImplActor(ReadYourWritesTransac
throw special_keys_api_failure();
}
for (const auto& address : _desiredCoordinators) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += address.toString();
if (result == CoordinatorsResult::SAME_NETWORK_ADDRESSES) {
for (const auto& host : old.hostnames) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += host.toString();
}
for (const auto& coord : old.coords) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += coord.toString();
}
} else {
for (const auto& address : _desiredCoordinators) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += address.toString();
}
}
res.push_back_deep(res.arena(), KeyValueRef(kr.begin, Value(autoCoordinatorsKey)));
return res;

View File

@ -104,7 +104,7 @@ ServerCoordinators::ServerCoordinators(Reference<IClusterConnectionRecord> ccr)
stateServers.emplace_back(h);
configServers.emplace_back(h);
}
for (auto s : cs.coordinators()) {
for (auto s : cs.coords) {
leaderElectionServers.emplace_back(s);
stateServers.emplace_back(s);
configServers.emplace_back(s);

View File

@ -857,7 +857,7 @@ std::pair<NetworkAddressList, NetworkAddressList> buildNetworkAddresses(
NetworkAddressList listenNetworkAddresses;
std::vector<Hostname>& hostnames = connectionRecord.getConnectionString().hostnames;
const std::vector<NetworkAddress>& coords = connectionRecord.getConnectionString().coordinators();
const std::vector<NetworkAddress>& coords = connectionRecord.getConnectionString().coords;
ASSERT(hostnames.size() + coords.size() > 0);
for (int ii = 0; ii < publicAddressStrs.size(); ++ii) {

View File

@ -2978,7 +2978,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
Reference<AsyncVar<Value>> result,
MonitorLeaderInfo info) {
ClusterConnectionString cs = info.intermediateConnRecord->getConnectionString();
state int coordinatorsSize = cs.hostnames.size() + cs.coordinators().size();
state int coordinatorsSize = cs.hostnames.size() + cs.coords.size();
state ElectionResultRequest request;
state int index = 0;
state int successIndex = 0;
@ -2988,14 +2988,14 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderWithDelayedCandidacyImplOneGenerati
for (const auto& h : cs.hostnames) {
leaderElectionServers.push_back(LeaderElectionRegInterface(h));
}
for (const auto& c : cs.coordinators()) {
for (const auto& c : cs.coords) {
leaderElectionServers.push_back(LeaderElectionRegInterface(c));
}
deterministicRandom()->randomShuffle(leaderElectionServers);
request.key = cs.clusterKey();
request.hostnames = cs.hostnames;
request.coordinators = cs.coordinators();
request.coordinators = cs.coords;
loop {
LeaderElectionRegInterface interf = leaderElectionServers[index];

View File

@ -956,7 +956,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
state std::vector<std::string> process_addresses;
boost::split(
process_addresses, coordinator_processes_key.get().toString(), [](char c) { return c == ','; });
ASSERT(process_addresses.size() == cs.coordinators().size() + cs.hostnames.size());
ASSERT(process_addresses.size() == cs.coords.size() + cs.hostnames.size());
// compare the coordinator process network addresses one by one
std::vector<NetworkAddress> coordinators = wait(cs.tryResolveHostnames());
for (const auto& network_address : coordinators) {
@ -1080,8 +1080,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
ClusterConnectionString csNew(res.get().toString());
// verify the cluster decription
ASSERT(new_cluster_description == csNew.clusterKeyName().toString());
ASSERT(csNew.hostnames.size() + csNew.coordinators().size() ==
old_coordinators_processes.size() + 1);
ASSERT(csNew.hostnames.size() + csNew.coords.size() == old_coordinators_processes.size() + 1);
std::vector<NetworkAddress> newCoordinators = wait(csNew.tryResolveHostnames());
// verify the coordinators' addresses
for (const auto& network_address : newCoordinators) {

View File

@ -1847,35 +1847,34 @@ ACTOR static Future<std::vector<NetworkAddress>> resolveTCPEndpoint_impl(Net2* s
Promise<std::vector<NetworkAddress>> promise;
state Future<std::vector<NetworkAddress>> result = promise.getFuture();
tcpResolver.async_resolve(tcp::resolver::query(host, service),
[=](const boost::system::error_code& ec, tcp::resolver::iterator iter) {
if (ec) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
return;
}
tcpResolver.async_resolve(host, service, [=](const boost::system::error_code& ec, tcp::resolver::iterator iter) {
if (ec) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
return;
}
std::vector<NetworkAddress> addrs;
std::vector<NetworkAddress> addrs;
tcp::resolver::iterator end;
while (iter != end) {
auto endpoint = iter->endpoint();
auto addr = endpoint.address();
if (addr.is_v6()) {
addrs.emplace_back(IPAddress(addr.to_v6().to_bytes()), endpoint.port());
} else {
addrs.emplace_back(addr.to_v4().to_ulong(), endpoint.port());
}
++iter;
}
tcp::resolver::iterator end;
while (iter != end) {
auto endpoint = iter->endpoint();
auto addr = endpoint.address();
if (addr.is_v6()) {
addrs.emplace_back(IPAddress(addr.to_v6().to_bytes()), endpoint.port());
} else {
addrs.emplace_back(addr.to_v4().to_ulong(), endpoint.port());
}
++iter;
}
if (addrs.empty()) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
} else {
promise.send(addrs);
}
});
if (addrs.empty()) {
self->dnsCache.remove(host, service);
promise.sendError(lookup_failed());
} else {
promise.send(addrs);
}
});
wait(ready(result));
tcpResolver.cancel();