add knob enabling cross cluster support (default true)

This commit is contained in:
Dan Lambright 2021-04-05 15:52:48 -04:00
parent 065c4fdd5a
commit 60d27d05d8
6 changed files with 57 additions and 42 deletions

View File

@ -568,17 +568,18 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
info.forward = forward.get().serializedInfo;
req.reply.send(CachedSerialization<ClientDBInfo>(info));
} else {
StringRef reqClusterName = getClusterName(req.clusterKey);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName) ||
ccf->getConnectionString().coordinators() != req.coordinators) {
TraceEvent(SevWarnAlways, "CCFMismatch")
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
if (! SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
getClusterName(req.clusterKey).compare(clusterName)) {
TraceEvent(SevError, "CCFMismatch")
.detail("RequestType", "OpenDatabaseCoordRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.clusterKey)
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
req.reply.sendError(wrong_connection_file());
} else {
regs.getInterface(req.clusterKey, id).openDatabase.send(req);
}
regs.getInterface(req.clusterKey, id).openDatabase.send(req);
}
}
when(ElectionResultRequest req = waitNext(interf.electionResult.getFuture())) {
@ -586,17 +587,19 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
if (forward.present()) {
req.reply.send(forward.get());
} else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName) ||
ccf->getConnectionString().coordinators() != req.coordinators) {
TraceEvent(SevWarnAlways, "CCFMismatch")
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
if (! SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
getClusterName(req.key).compare(clusterName)) {
TraceEvent(SevError, "CCFMismatch")
.detail("RequestType", "ElectionResultRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key)
.detail("ClusterKey", ccf->getConnectionString().clusterKey())
.detail("IncomingCoordinators", describeList(req.coordinators, req.coordinators.size()));
req.reply.sendError(wrong_connection_file());
} else {
regs.getInterface(req.key, id).electionResult.send(req);
}
regs.getInterface(req.key, id).electionResult.send(req);
}
}
when(GetLeaderRequest req = waitNext(interf.getLeader.getFuture())) {
@ -604,16 +607,18 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
if (forward.present())
req.reply.send(forward.get());
else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
if (! SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
getClusterName(req.key).compare(clusterName)) {
TraceEvent(SevError, "CCFMismatch")
.detail("RequestType", "GetLeaderRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key)
.detail("Key", reqClusterName).detail("Key2",clusterName);
.detail("ClusterKey", ccf->getConnectionString().clusterKey());
req.reply.sendError(wrong_connection_file());
} else {
regs.getInterface(req.key, id).getLeader.send(req);
}
regs.getInterface(req.key, id).getLeader.send(req);
}
}
when(CandidacyRequest req = waitNext(interf.candidacy.getFuture())) {
@ -621,15 +626,17 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
if (forward.present())
req.reply.send(forward.get());
else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
if (! SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
getClusterName(req.key).compare(clusterName)) {
TraceEvent(SevError, "CCFMismatch")
.detail("RequestType", "CandidacyRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
req.reply.sendError(wrong_connection_file());
} else {
regs.getInterface(req.key, id).candidacy.send(req);
}
regs.getInterface(req.key, id).candidacy.send(req);
}
}
when(LeaderHeartbeatRequest req = waitNext(interf.leaderHeartbeat.getFuture())) {
@ -637,15 +644,17 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
if (forward.present())
req.reply.send(LeaderHeartbeatReply{ false });
else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
if (! SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
getClusterName(req.key).compare(clusterName)) {
TraceEvent(SevError, "CCFMismatch")
.detail("RequestType", "LeaderHeartbeatRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
req.reply.sendError(wrong_connection_file());
} else {
regs.getInterface(req.key, id).leaderHeartbeat.send(req);
}
regs.getInterface(req.key, id).leaderHeartbeat.send(req);
}
}
when(ForwardRequest req = waitNext(interf.forward.getFuture())) {
@ -653,17 +662,19 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
if (forward.present())
req.reply.send(Void());
else {
StringRef reqClusterName = getClusterName(req.key);
StringRef clusterName = getClusterName(ccf->getConnectionString().clusterKey());
if (reqClusterName.compare(clusterName)) {
TraceEvent(SevWarnAlways, "CCFMismatch")
StringRef clusterName = ccf->getConnectionString().clusterKeyName();
if (! SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT &&
getClusterName(req.key).compare(clusterName)) {
TraceEvent(SevError, "CCFMismatch")
.detail("RequestType", "ForwardRequest")
.detail("LocalCS", ccf->getConnectionString().toString())
.detail("IncomingClusterKey", req.key);
req.reply.sendError(wrong_connection_file());
} else {
forwarders.add(
LeaderRegisterCollection::setForward(&regs, req.key, ClusterConnectionString(req.conn.toString())));
regs.getInterface(req.key, id).forward.send(req);
}
forwarders.add(LeaderRegisterCollection::setForward(&regs, req.key,
ClusterConnectionString(req.conn.toString())));
regs.getInterface(req.key, id).forward.send(req);
}
}
when(wait(forwarders.getResult())) {

View File

@ -631,7 +631,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// Coordination
init( COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL, 1.0 ); if( randomize && BUGGIFY ) COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL = 10.0;
init( FORWARD_REQUEST_TOO_OLD, 600.0 ); if( randomize && BUGGIFY ) FORWARD_REQUEST_TOO_OLD = 60.0;
init( ENABLE_CROSS_CLUSTER_SUPPORT, true ); if( randomize && BUGGIFY ) ENABLE_CROSS_CLUSTER_SUPPORT = false;
// Buggification

View File

@ -559,9 +559,7 @@ public:
// Coordination
double COORDINATED_STATE_ONCONFLICT_POLL_INTERVAL;
double FORWARD_REQUEST_TOO_OLD;
bool ENABLE_CROSS_CLUSTER_SUPPORT; // Allow a coordinator to serve requests whose connection string does not match
// the local copy
bool ENABLE_CROSS_CLUSTER_SUPPORT; // Allow a coordinator to serve requests whose connection string does not match the local descriptor
// Buggification
double BUGGIFIED_EVENTUAL_CONSISTENCY;

View File

@ -1691,7 +1691,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors,
} else if (testConfig.extraDB == 2) {
// The DR database is a new database
g_simulator.extraDB =
new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
new ClusterConnectionString(extraCoordinatorAddresses, LiteralStringRef("ExtraCluster:0"));
} else if (testConfig.extraDB == 3) {
// The DR database is the same database
g_simulator.extraDB = new ClusterConnectionString(coordinatorAddresses, LiteralStringRef("TestCluster:0"));

View File

@ -270,6 +270,7 @@ struct ConfigureDatabaseWorkload : TestWorkload {
return Void();
}
state int randomChoice = deterministicRandom()->randomInt(0, 8);
if (randomChoice == 0) {
wait(success(
runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) -> Future<Optional<Value>> {
@ -316,8 +317,13 @@ struct ConfigureDatabaseWorkload : TestWorkload {
} else if (randomChoice == 4) {
//TraceEvent("ConfigureTestQuorumBegin").detail("NewQuorum", s);
auto ch = autoQuorumChange();
std::string desiredClusterName = "NewName%d";
if (! SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT) {
// if configuration does not allow changing the descriptor, pass empty string (keep old descriptor)
desiredClusterName = "";
}
if (deterministicRandom()->randomInt(0, 2))
ch = nameQuorumChange(format("NewName%d", deterministicRandom()->randomInt(0, 100)), ch);
ch = nameQuorumChange(format(desiredClusterName.c_str(), deterministicRandom()->randomInt(0, 100)), ch);
wait(success(changeQuorum(cx, ch)));
//TraceEvent("ConfigureTestConfigureEnd").detail("NewQuorum", s);
} else if (randomChoice == 5) {

View File

@ -936,7 +936,8 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
// test change coordinators and cluster description
// we randomly pick one process(not coordinator) and add it, in this case, it should always succeed
{
state std::string new_cluster_description = deterministicRandom()->randomAlphaNumeric(8);
// choose a new description if configuration allows transactions across differently named clusters
state std::string new_cluster_description = SERVER_KNOBS->ENABLE_CROSS_CLUSTER_SUPPORT ? deterministicRandom()->randomAlphaNumeric(8) : cs.clusterKeyName().toString();
state std::string new_coordinator_process;
state std::vector<std::string> old_coordinators_processes;
state bool possible_to_add_coordinator;