issue 4252

This commit is contained in:
Dan Lambright 2021-03-30 12:31:10 -04:00
parent ee3e569055
commit 065c4fdd5a
7 changed files with 90 additions and 13 deletions

View File

@ -545,9 +545,15 @@ struct LeaderRegisterCollection {
}
};
// Returns the leading token of a cluster key: whatever StringRef::eat(":") yields when applied
// to the key's contents. Presumably this is the cluster description that precedes the first ':'
// (e.g. "TestCluster" for a key of "TestCluster:0") -- TODO confirm against StringRef::eat.
// NOTE(review): the result is a non-owning view derived from `key`'s contents; it looks like the
// caller must keep the backing key memory alive while using it -- confirm.
StringRef getClusterName(Key key) {
	StringRef remainder = key.contents();
	// eat() is called on a local copy of the view, so `key` itself is left untouched.
	const StringRef namePart = remainder.eat(":");
	return namePart;
}
// leaderServer multiplexes multiple leaderRegisters onto a single LeaderElectionRegInterface,
// creating and destroying them on demand.
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore* pStore, UID id) {
ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore* pStore, UID id,
Reference<ClusterConnectionFile> ccf) {
state LeaderRegisterCollection regs(pStore);
state ActorCollection forwarders(false);
@ -562,6 +568,16 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
info.forward = forward.get().serializedInfo;
req.reply.send(CachedSerialization<ClientDBInfo>(info));
} else {
StringRef reqClusterName = getClusterName(req.clusterKey);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName) ||
ccf->getConnectionString().coordinators() != req.coordinators) {
TraceEvent(SevWarnAlways, "CCFMismatch")
.detail("RequestType", "OpenDatabaseCoordRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.clusterKey)
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
}
regs.getInterface(req.clusterKey, id).openDatabase.send(req);
}
}
@ -570,6 +586,16 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
if (forward.present()) {
req.reply.send(forward.get());
} else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName) ||
ccf->getConnectionString().coordinators() != req.coordinators) {
TraceEvent(SevWarnAlways, "CCFMismatch")
.detail("RequestType", "ElectionResultRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key)
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
}
regs.getInterface(req.key, id).electionResult.send(req);
}
}
@ -577,30 +603,66 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
req.reply.send(forward.get());
else
else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
.detail("RequestType", "GetLeaderRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key)
.detail("Key", reqClusterName).detail("Key2",clusterName);
}
regs.getInterface(req.key, id).getLeader.send(req);
}
}
when(CandidacyRequest req = waitNext(interf.candidacy.getFuture())) {
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
req.reply.send(forward.get());
else
else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
.detail("RequestType", "CandidacyRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
}
regs.getInterface(req.key, id).candidacy.send(req);
}
}
when(LeaderHeartbeatRequest req = waitNext(interf.leaderHeartbeat.getFuture())) {
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
req.reply.send(LeaderHeartbeatReply{ false });
else
else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
.detail("RequestType", "LeaderHeartbeatRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
}
regs.getInterface(req.key, id).leaderHeartbeat.send(req);
}
}
when(ForwardRequest req = waitNext(interf.forward.getFuture())) {
Optional<LeaderInfo> forward = regs.getForward(req.key);
if (forward.present())
req.reply.send(Void());
else {
forwarders.add(
LeaderRegisterCollection::setForward(&regs, req.key, ClusterConnectionString(req.conn.toString())));
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
.detail("RequestType", "ForwardRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
}
forwarders.add(LeaderRegisterCollection::setForward(&regs, req.key,
ClusterConnectionString(req.conn.toString())));
regs.getInterface(req.key, id).forward.send(req);
}
}
@ -611,7 +673,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
}
}
ACTOR Future<Void> coordinationServer(std::string dataFolder) {
ACTOR Future<Void> coordinationServer(std::string dataFolder, Reference<ClusterConnectionFile> ccf) {
state UID myID = deterministicRandom()->randomUniqueID();
state LeaderElectionRegInterface myLeaderInterface(g_network);
state GenerationRegInterface myInterface(g_network);
@ -622,7 +684,7 @@ ACTOR Future<Void> coordinationServer(std::string dataFolder) {
.detail("Folder", dataFolder);
try {
wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID) ||
wait(localGenerationReg(myInterface, &store) || leaderServer(myLeaderInterface, &store, myID, ccf) ||
store.getError());
throw internal_error();
} catch (Error& e) {

View File

@ -225,6 +225,6 @@ public:
vector<GenerationRegInterface> stateServers;
};
Future<Void> coordinationServer(std::string const& dataFolder);
Future<Void> coordinationServer(std::string const& dataFolder, Reference<ClusterConnectionFile> const& ccf);
#endif

View File

@ -631,6 +631,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// Coordination
init( COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL = 10.0;
init( FORWARD_REQUEST_TOO_OLD, 600.0 ); if( randomize && BUGGIFY ) FORWARD_REQUEST_TOO_OLD = 60.0;
init( ENABLE_CROSS_CLUSTER_SUPPORT, true ); if( randomize && BUGGIFY ) ENABLE_CROSS_CLUSTER_SUPPORT = false;
// Buggification
init( BUGGIFIED_EVENTUAL_CONSISTENCY, 1.0 );

View File

@ -559,6 +559,9 @@ public:
// Coordination
double COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL;
double FORWARD_REQUEST_TOO_OLD;
bool ENABLE_CROSS_CLUSTER_SUPPORT; // Allow a coordinator to serve requests whose connection string does not match
// the local copy
// Buggification
double BUGGIFIED_EVENTUAL_CONSISTENCY;

View File

@ -1605,6 +1605,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
TEST(!useIPv6); // Use IPv4
vector<NetworkAddress> coordinatorAddresses;
vector<NetworkAddress> extraCoordinatorAddresses; // Used by extra DB if the DR db is a new one
if (testConfig.minimumRegions > 1) {
// do not put coordinators in the primary region so that we can kill that region safely
int nonPrimaryDcs = dataCenters / 2;
@ -1614,6 +1615,9 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
auto ip = makeIPAddressForSim(useIPv6, { 2, dc, 1, m });
coordinatorAddresses.push_back(
NetworkAddress(ip, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
auto extraIp = makeIPAddressForSim(useIPv6, { 4, dc, 1, m });
extraCoordinatorAddresses.push_back(
NetworkAddress(extraIp, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back());
}
}
@ -1642,6 +1646,9 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
auto ip = makeIPAddressForSim(useIPv6, { 2, dc, 1, m });
coordinatorAddresses.push_back(
NetworkAddress(ip, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
auto extraIp = makeIPAddressForSim(useIPv6, { 4, dc, 1, m });
extraCoordinatorAddresses.push_back(
NetworkAddress(extraIp, sslEnabled && !sslOnly ? 2 : 1, true, sslEnabled && sslOnly));
TraceEvent("SelectedCoordinator")
.detail("Address", coordinatorAddresses.back())
.detail("M", m)
@ -1678,11 +1685,13 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
// If extraDB==0, leave g_simulator.extraDB as null because the test does not use DR.
if (testConfig.extraDB == 1) {
// The DR database can be either a new database or itself
g_simulator.extraDB = new ClusterConnectionString(
coordinatorAddresses, BUGGIFY ? LiteralStringRef("TestCluster:0") : LiteralStringRef("ExtraCluster:0"));
g_simulator.extraDB =
BUGGIFY ? new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"))
: new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
} else if (testConfig.extraDB == 2) {
// The DR database is a new database
g_simulator.extraDB = new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
g_simulator.extraDB =
new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
} else if (testConfig.extraDB == 3) {
// The DR database is the same database
g_simulator.extraDB = new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"));

View File

@ -2047,7 +2047,7 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
if (coordFolder.size()) {
// SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up
// their files
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder)));
actors.push_back(fileNotFoundToNever(coordinationServer(coordFolder, coordinators.ccf)));
}
state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));

View File

@ -74,6 +74,7 @@ ERROR( disk_adapter_reset, 1050, "The disk queue adpater reset" )
ERROR( batch_transaction_throttled, 1051, "Batch GRV request rate limit exceeded")
ERROR( dd_cancelled, 1052, "Data distribution components cancelled")
ERROR( dd_not_found, 1053, "Data distributor not found")
ERROR( wrong_connection_file, 1054, "Connection file mismatch")
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )