Revert "Refactor: ClusterController driving cluster-recovery state machine"

This reverts commit dfe9d184ff5dd66bdbbc5b984688ac3ebb15b901.
commit 30b05b469c (parent d174bb2e06)
Author: Aaron Molitor
Date: 2021-12-23 12:27:35 -08:00 (committed by Aaron Molitor)
15 changed files with 2153 additions and 2547 deletions


@@ -1086,7 +1086,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf,
TraceEvent("BackupWorkerDone", self.myId).detail("BackupEpoch", self.backupEpoch);
// Notify master so that this worker can be removed from log system, then this
// worker (for an old epoch's unfinished work) can safely exit.
- wait(brokenPromiseToNever(db->get().clusterInterface.notifyBackupWorkerDone.getReply(
+ wait(brokenPromiseToNever(db->get().master.notifyBackupWorkerDone.getReply(
BackupWorkerDoneRequest(self.myId, self.backupEpoch))));
break;
}
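Both versions of this call lean on the same Flow idiom: getReply() fails with broken_promise when the responding endpoint dies, and brokenPromiseToNever() converts that into a future that never fires, so the backup worker simply waits for the next ServerDBInfo broadcast and retries against whoever then owns the stream. A rough standalone analogy in plain C++ (illustration only, not Flow code), since std::future signals the same condition:

#include <future>
#include <iostream>

int main() {
    std::future<void> reply;
    {
        std::promise<void> p; // the responder dies before answering
        reply = p.get_future();
    } // p destroyed without set_value() -> broken promise recorded in the future
    try {
        reply.get();
    } catch (const std::future_error& e) {
        // Flow's brokenPromiseToNever() would turn this case into Never(),
        // leaving the caller to wait for a fresh interface instead of failing.
        std::cout << "broken promise: " << e.code().message() << "\n";
    }
    return 0;
}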

File diff suppressed because it is too large.


@@ -642,8 +642,6 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
self->commitVersion = versionReply.version;
self->prevVersion = versionReply.prevVersion;
//TraceEvent("CPGetVersion", pProxyCommitData->dbgid).detail("Master", pProxyCommitData->master.id().toString()).detail("CommitVersion", self->commitVersion).detail("PrvVersion", self->prevVersion);
for (auto it : versionReply.resolverChanges) {
auto rs = pProxyCommitData->keyResolvers.modify(it.range);
for (auto r = rs.begin(); r != rs.end(); ++r)
@@ -880,7 +878,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
if (!self->isMyFirstBatch &&
pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get() != self->oldCoordinators.get()) {
- wait(brokenPromiseToNever(pProxyCommitData->db->get().clusterInterface.changeCoordinators.getReply(
+ wait(brokenPromiseToNever(pProxyCommitData->master.changeCoordinators.getReply(
ChangeCoordinatorsRequest(pProxyCommitData->txnStateStore->readValue(coordinatorsKey).get().get()))));
ASSERT(false); // ChangeCoordinatorsRequest should always throw
}
@@ -1095,16 +1093,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
applyMetadataEffect(self);
- if (debugID.present()) {
- g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ApplyMetadaEffect");
- }
determineCommittedTransactions(self);
- if (debugID.present()) {
- g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ApplyMetadaEffect");
- }
if (self->forceRecovery) {
wait(Future<Void>(Never()));
}
@@ -1112,18 +1102,9 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
// First pass
wait(applyMetadataToCommittedTransactions(self));
- if (debugID.present()) {
- g_traceBatch.addEvent(
- "CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.ApplyMetadaToCommittedTxn");
- }
// Second pass
wait(assignMutationsToStorageServers(self));
- if (debugID.present()) {
- g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.AssignMutationToSS");
- }
// Serialize and backup the mutations as a single mutation
if ((pProxyCommitData->vecBackupKeys.size() > 1) && self->logRangeMutations.size()) {
wait(addBackupMutations(pProxyCommitData,
@@ -1260,7 +1241,7 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
}
} catch (Error& e) {
if (e.code() == error_code_broken_promise) {
- throw tlog_failed();
+ throw master_tlog_failed();
}
throw;
}
@@ -1292,10 +1273,8 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
const Optional<UID>& debugID = self->debugID;
- if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2) {
//TraceEvent("CPAdvanceMinVersion", self->pProxyCommitData->dbgid).detail("PrvVersion", self->prevVersion).detail("CommitVersion", self->commitVersion).detail("Master", self->pProxyCommitData->master.id().toString()).detail("TxSize", self->trs.size());
+ if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2)
debug_advanceMinCommittedVersion(UID(), self->commitVersion);
- }
//TraceEvent("ProxyPushed", pProxyCommitData->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
if (debugID.present())
@@ -2020,7 +1999,6 @@ ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveCon
ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master,
- LifetimeToken masterLifetime,
Reference<AsyncVar<ServerDBInfo> const> db,
LogEpoch epoch,
Version recoveryTransactionVersion,
@@ -2035,7 +2013,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
state Future<Void> lastCommitComplete = Void();
state PromiseStream<Future<Void>> addActor;
- state Future<Void> onError = transformError(actorCollection(addActor.getFuture()), broken_promise(), tlog_failed());
+ state Future<Void> onError =
+ transformError(actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed());
state double lastCommit = 0;
state GetHealthMetricsReply healthMetricsReply;
@@ -2047,7 +2026,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
//TraceEvent("CommitProxyInit1", proxy.id());
// Wait until we can load the "real" logsystem, since we don't support switching them currently
- while (!(masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
+ while (!(commitData.db->get().master.id() == master.id() &&
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
//TraceEvent("ProxyInit2", proxy.id()).detail("LSEpoch", db->get().logSystemConfig.epoch).detail("Need", epoch);
wait(commitData.db->onChange());
@@ -2109,7 +2088,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
loop choose {
when(wait(dbInfoChange)) {
dbInfoChange = commitData.db->onChange();
- if (masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
+ if (commitData.db->get().master.id() == master.id() &&
commitData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
for (auto it : commitData.tag_popped) {
@@ -2127,9 +2106,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
const std::vector<CommitTransactionRequest>& trs = batchedRequests.first;
int batchBytes = batchedRequests.second;
//TraceEvent("CommitProxyCTR", proxy.id()).detail("CommitTransactions", trs.size()).detail("TransactionRate", transactionRate).detail("TransactionQueue", transactionQueue.size()).detail("ReleasedTransactionCount", transactionCount);
- //TraceEvent("CommitProxyCore", commitData.dbgid).detail("TxSize", trs.size()).detail("MasterLifetime", masterLifetime.toString()).detail("DbMasterLifetime", commitData.db->get().masterLifetime.toString()).detail("RecoveryState", commitData.db->get().recoveryState).detail("CCInf", commitData.db->get().clusterInterface.id().toString());
if (trs.size() || (commitData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS &&
- masterLifetime.isEqual(commitData.db->get().masterLifetime) &&
now() - lastCommit >= SERVER_KNOBS->MAX_COMMIT_BATCH_INTERVAL)) {
lastCommit = now();
@@ -2178,7 +2155,6 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
try {
state Future<Void> core = commitProxyServerCore(proxy,
req.master,
- req.masterLifetime,
db,
req.recoveryCount,
req.recoveryTransactionVersion,
@@ -2189,7 +2165,7 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
TraceEvent("CommitProxyTerminated", proxy.id()).error(e, true);
if (e.code() != error_code_worker_removed && e.code() != error_code_tlog_stopped &&
- e.code() != error_code_tlog_failed && e.code() != error_code_coordinators_changed &&
+ e.code() != error_code_master_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out &&
e.code() != error_code_failed_to_progress) {
throw;
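The catch block above is a whitelist: a terminating commit proxy swallows the errors expected during a recovery or coordinator change and rethrows everything else. A minimal standalone sketch of that triage (error names are taken from this diff; the helper and main() are illustrative, not FDB code):

#include <algorithm>
#include <array>
#include <cassert>
#include <string_view>

constexpr std::array<std::string_view, 7> kBenign = {
    "worker_removed", "tlog_stopped", "master_tlog_failed",
    "coordinators_changed", "coordinated_state_conflict",
    "new_coordinators_timed_out", "failed_to_progress",
};

bool shouldRethrow(std::string_view errName) {
    // Anything outside the expected set is a genuine failure.
    return std::find(kBenign.begin(), kBenign.end(), errName) == kBenign.end();
}

int main() {
    assert(!shouldRethrow("master_tlog_failed")); // tolerated during recovery
    assert(shouldRethrow("io_error"));            // rethrown
    return 0;
}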


@@ -920,12 +920,12 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
MasterInterface master,
- LifetimeToken masterLifetime,
Reference<AsyncVar<ServerDBInfo> const> db) {
state GrvProxyData grvProxyData(proxy.id(), master, proxy.getConsistentReadVersion, db);
state PromiseStream<Future<Void>> addActor;
- state Future<Void> onError = transformError(actorCollection(addActor.getFuture()), broken_promise(), tlog_failed());
+ state Future<Void> onError =
+ transformError(actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed());
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
@@ -933,14 +933,9 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
addActor.send(waitFailureServer(proxy.waitFailure.getFuture()));
addActor.send(traceRole(Role::GRV_PROXY, proxy.id()));
- TraceEvent("GrvProxyServerCore", proxy.id())
- .detail("MasterId", master.id().toString())
- .detail("MasterLifetime", masterLifetime.toString())
- .detail("RecoveryCount", db->get().recoveryCount);
// Wait until we can load the "real" logsystem, since we don't support switching them currently
- while (!(masterLifetime.isEqual(grvProxyData.db->get().masterLifetime) &&
- grvProxyData.db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS)) {
+ while (!(grvProxyData.db->get().master.id() == master.id() &&
+ grvProxyData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION)) {
wait(grvProxyData.db->onChange());
}
// Do we need to wait for any db info change? Yes. To update latency band.
@@ -961,7 +956,7 @@ ACTOR Future<Void> grvProxyServerCore(GrvProxyInterface proxy,
when(wait(dbInfoChange)) {
dbInfoChange = grvProxyData.db->onChange();
- if (masterLifetime.isEqual(grvProxyData.db->get().masterLifetime) &&
+ if (grvProxyData.db->get().master.id() == master.id() &&
grvProxyData.db->get().recoveryState >= RecoveryState::RECOVERY_TRANSACTION) {
grvProxyData.logSystem =
ILogSystem::fromServerDBInfo(proxy.id(), grvProxyData.db->get(), false, addActor);
@@ -988,13 +983,13 @@ ACTOR Future<Void> grvProxyServer(GrvProxyInterface proxy,
InitializeGrvProxyRequest req,
Reference<AsyncVar<ServerDBInfo> const> db) {
try {
- state Future<Void> core = grvProxyServerCore(proxy, req.master, req.masterLifetime, db);
+ state Future<Void> core = grvProxyServerCore(proxy, req.master, db);
wait(core || checkRemoved(db, req.recoveryCount, proxy));
} catch (Error& e) {
TraceEvent("GrvProxyTerminated", proxy.id()).error(e, true);
if (e.code() != error_code_worker_removed && e.code() != error_code_tlog_stopped &&
- e.code() != error_code_tlog_failed && e.code() != error_code_coordinators_changed &&
+ e.code() != error_code_master_tlog_failed && e.code() != error_code_coordinators_changed &&
e.code() != error_code_coordinated_state_conflict && e.code() != error_code_new_coordinators_timed_out) {
throw;
}


@@ -22,13 +22,11 @@
#define FDBSERVER_MASTERINTERFACE_H
#pragma once
- #include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/DatabaseConfiguration.h"
+ #include "fdbserver/TLogInterface.h"
- #include "fdbclient/Notified.h"
typedef uint64_t DBRecoveryCount;
@@ -36,17 +34,20 @@ struct MasterInterface {
constexpr static FileIdentifier file_identifier = 5979145;
LocalityData locality;
RequestStream<ReplyPromise<Void>> waitFailure;
+ RequestStream<struct TLogRejoinRequest>
+ tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new master
+ RequestStream<struct ChangeCoordinatorsRequest> changeCoordinators;
RequestStream<struct GetCommitVersionRequest> getCommitVersion;
+ RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
// Get the centralized live committed version reported by commit proxies.
RequestStream<struct GetRawCommittedVersionRequest> getLiveCommittedVersion;
// Report a proxy's committed version.
RequestStream<struct ReportRawCommittedVersionRequest> reportLiveCommittedVersion;
- RequestStream<struct UpdateRecoveryDataRequest> updateRecoveryData;
- NetworkAddress address() const { return getCommitVersion.getEndpoint().getPrimaryAddress(); }
- NetworkAddressList addresses() const { return getCommitVersion.getEndpoint().addresses; }
+ NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); }
+ NetworkAddressList addresses() const { return changeCoordinators.getEndpoint().addresses; }
- UID id() const { return getCommitVersion.getEndpoint().token; }
+ UID id() const { return changeCoordinators.getEndpoint().token; }
template <class Archive>
void serialize(Archive& ar) {
if constexpr (!is_fb_function<Archive>) {
@@ -54,28 +55,61 @@
}
serializer(ar, locality, waitFailure);
if (Archive::isDeserializing) {
+ tlogRejoin = RequestStream<struct TLogRejoinRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
+ changeCoordinators =
+ RequestStream<struct ChangeCoordinatorsRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
getCommitVersion =
- RequestStream<struct GetCommitVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
+ RequestStream<struct GetCommitVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(3));
+ notifyBackupWorkerDone =
+ RequestStream<struct BackupWorkerDoneRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
getLiveCommittedVersion =
- RequestStream<struct GetRawCommittedVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
+ RequestStream<struct GetRawCommittedVersionRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(5));
reportLiveCommittedVersion = RequestStream<struct ReportRawCommittedVersionRequest>(
- waitFailure.getEndpoint().getAdjustedEndpoint(3));
- updateRecoveryData =
- RequestStream<struct UpdateRecoveryDataRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
+ waitFailure.getEndpoint().getAdjustedEndpoint(6));
}
}
void initEndpoints() {
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(waitFailure.getReceiver());
+ streams.push_back(tlogRejoin.getReceiver(TaskPriority::MasterTLogRejoin));
+ streams.push_back(changeCoordinators.getReceiver());
streams.push_back(getCommitVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
+ streams.push_back(notifyBackupWorkerDone.getReceiver());
streams.push_back(getLiveCommittedVersion.getReceiver(TaskPriority::GetLiveCommittedVersion));
streams.push_back(reportLiveCommittedVersion.getReceiver(TaskPriority::ReportLiveCommittedVersion));
- streams.push_back(updateRecoveryData.getReceiver(TaskPriority::UpdateRecoveryTransactionVersion));
FlowTransport::transport().addEndpoints(streams);
}
};
+ struct TLogRejoinReply {
+ constexpr static FileIdentifier file_identifier = 11;
+ // false means someone else registered, so we should re-register. true means this master is recovered, so don't
+ // send again to the same master.
+ bool masterIsRecovered;
+ TLogRejoinReply() = default;
+ explicit TLogRejoinReply(bool masterIsRecovered) : masterIsRecovered(masterIsRecovered) {}
+ template <class Ar>
+ void serialize(Ar& ar) {
+ serializer(ar, masterIsRecovered);
+ }
+ };
+ struct TLogRejoinRequest {
+ constexpr static FileIdentifier file_identifier = 15692200;
+ TLogInterface myInterface;
+ ReplyPromise<TLogRejoinReply> reply;
+ TLogRejoinRequest() {}
+ explicit TLogRejoinRequest(const TLogInterface& interf) : myInterface(interf) {}
+ template <class Ar>
+ void serialize(Ar& ar) {
+ serializer(ar, myInterface, reply);
+ }
+ };
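The renumbering in the serialize() hunk above follows from how Flow packs an interface's streams into one contiguous endpoint block: only the first endpoint travels on the wire, and the receiver reconstructs the others by offset, so the getAdjustedEndpoint(i) indices must mirror the registration order in initEndpoints(). A standalone model of that invariant (a sketch, not Flow's implementation):

#include <cassert>
#include <string>
#include <vector>

struct EndpointBlock {
    std::vector<std::string> streams; // registration order == wire order
    // Stand-in for getAdjustedEndpoint(i): resolve the i-th stream.
    const std::string& adjusted(size_t i) const { return streams.at(i); }
};

int main() {
    // Post-revert MasterInterface order, per the initEndpoints() above.
    EndpointBlock master{ { "waitFailure", "tlogRejoin", "changeCoordinators",
                            "getCommitVersion", "notifyBackupWorkerDone",
                            "getLiveCommittedVersion",
                            "reportLiveCommittedVersion" } };
    assert(master.adjusted(3) == "getCommitVersion"); // matches getAdjustedEndpoint(3)
    assert(master.adjusted(6) == "reportLiveCommittedVersion");
    return 0;
}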
struct ChangeCoordinatorsRequest {
constexpr static FileIdentifier file_identifier = 13605416;
Standalone<StringRef> newConnectionString;
@@ -150,26 +184,6 @@ struct GetCommitVersionRequest {
}
};
- struct UpdateRecoveryDataRequest {
- constexpr static FileIdentifier file_identifier = 13605417;
- Version recoveryTransactionVersion;
- Version lastEpochEnd;
- std::vector<CommitProxyInterface> commitProxies;
- ReplyPromise<Void> reply;
- UpdateRecoveryDataRequest() {}
- UpdateRecoveryDataRequest(Version recoveryTransactionVersion,
- Version lastEpochEnd,
- std::vector<CommitProxyInterface> commitProxies)
- : recoveryTransactionVersion(recoveryTransactionVersion), lastEpochEnd(lastEpochEnd),
- commitProxies(commitProxies) {}
- template <class Ar>
- void serialize(Ar& ar) {
- serializer(ar, recoveryTransactionVersion, lastEpochEnd, commitProxies, reply);
- }
- };
struct ReportRawCommittedVersionRequest {
constexpr static FileIdentifier file_identifier = 1853148;
Version version;
@@ -193,6 +207,21 @@ struct ReportRawCommittedVersionRequest {
}
};
+ struct BackupWorkerDoneRequest {
+ constexpr static FileIdentifier file_identifier = 8736351;
+ UID workerUID;
+ LogEpoch backupEpoch;
+ ReplyPromise<Void> reply;
+ BackupWorkerDoneRequest() : workerUID(), backupEpoch(-1) {}
+ BackupWorkerDoneRequest(UID id, LogEpoch epoch) : workerUID(id), backupEpoch(epoch) {}
+ template <class Ar>
+ void serialize(Ar& ar) {
+ serializer(ar, workerUID, backupEpoch, reply);
+ }
+ };
struct LifetimeToken {
UID ccID;
int64_t count;
@@ -202,9 +231,6 @@ struct LifetimeToken {
bool isStillValid(LifetimeToken const& latestToken, bool isLatestID) const {
return ccID == latestToken.ccID && (count >= latestToken.count || isLatestID);
}
- bool isEqual(LifetimeToken const& toCompare) {
- return ccID.compare(toCompare.ccID) == 0 && count == toCompare.count;
- }
std::string toString() const { return ccID.shortString() + format("#%lld", count); }
void operator++() { ++count; }
@@ -214,18 +240,4 @@
}
};
- struct CommitProxyVersionReplies {
- std::map<uint64_t, GetCommitVersionReply> replies;
- NotifiedVersion latestRequestNum;
- CommitProxyVersionReplies(CommitProxyVersionReplies&& r) noexcept
- : replies(std::move(r.replies)), latestRequestNum(std::move(r.latestRequestNum)) {}
- void operator=(CommitProxyVersionReplies&& r) noexcept {
- replies = std::move(r.replies);
- latestRequestNum = std::move(r.latestRequestNum);
- }
- CommitProxyVersionReplies() : latestRequestNum(0) {}
- };
#endif
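For context on the isEqual() helper deleted above: a LifetimeToken stamps each master generation with the issuing cluster controller's ID plus a counter. isStillValid() asks whether a master may still act, while isEqual() asked the stricter question "is this exactly the generation I registered with?", which the reverted code replaces with a plain master-UID comparison. A compilable model of the two checks (UID here is a stand-in type, not FDB's):

#include <cassert>
#include <cstdint>

struct UID {
    uint64_t a = 0, b = 0;
    bool operator==(const UID& o) const { return a == o.a && b == o.b; }
};

struct LifetimeToken {
    UID ccID;
    int64_t count = 0;
    bool isStillValid(const LifetimeToken& latest, bool isLatestID) const {
        return ccID == latest.ccID && (count >= latest.count || isLatestID);
    }
    bool isEqual(const LifetimeToken& o) const {
        return ccID == o.ccID && count == o.count;
    }
};

int main() {
    LifetimeToken mine{ { 1, 1 }, 3 };
    LifetimeToken latest{ { 1, 1 }, 4 };
    assert(!mine.isEqual(latest));             // a newer generation exists,
    assert(!mine.isStillValid(latest, false)); // so this token is stale
    return 0;
}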


@@ -1249,11 +1249,11 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
}
}
- ACTOR Future<Void> rejoinClusterController(TLogData* self,
- TLogInterface tli,
- DBRecoveryCount recoveryCount,
- Future<Void> registerWithCC) {
- state LifetimeToken lastMasterLifetime;
+ ACTOR Future<Void> rejoinMasters(TLogData* self,
+ TLogInterface tli,
+ DBRecoveryCount recoveryCount,
+ Future<Void> registerWithMaster) {
+ state UID lastMasterID(0, 0);
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@@ -1274,21 +1274,18 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
throw worker_removed();
}
- if (registerWithCC.isReady()) {
- if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
+ if (registerWithMaster.isReady()) {
+ if (self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req;
req.myInterface = tli;
- TraceEvent("TLogRejoining", tli.id())
- .detail("ClusterController", self->dbInfo->get().clusterInterface.id())
- .detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
- .detail("LastMasterLifeTime", lastMasterLifetime.toString());
+ TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
choose {
- when(TLogRejoinReply rep = wait(
- brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
+ when(TLogRejoinReply rep =
+ wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
- lastMasterLifetime = self->dbInfo->get().masterLifetime;
+ lastMasterID = self->dbInfo->get().master.id();
}
when(wait(self->dbInfo->onChange())) {}
}
@@ -1296,7 +1293,7 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
- wait(registerWithCC || self->dbInfo->onChange());
+ wait(registerWithMaster || self->dbInfo->onChange());
}
}
}
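The loop rewritten above is the TLog's re-registration state machine: whenever the recovery leader it last registered with changes, it re-sends TLogRejoinRequest, and it only advances its bookmark once the reply reports the master as recovered. Pre-revert the bookmark was the masterLifetime token; post-revert it is again the master's UID. A condensed standalone sketch of just that decision (names illustrative, not FDB's):

#include <cstdint>
#include <iostream>

struct MasterId {
    uint64_t v = 0;
};

// Post-revert criterion: rejoin iff the master UID changed since we last
// successfully registered.
bool needsRejoin(const MasterId& last, const MasterId& current) {
    return last.v != current.v;
}

int main() {
    MasterId last{ 0 };
    for (MasterId cur : { MasterId{ 7 }, MasterId{ 7 }, MasterId{ 9 } }) {
        if (needsRejoin(last, cur)) {
            std::cout << "rejoining master " << cur.v << "\n";
            last = cur; // the real loop advances only on rep.masterIsRecovered
        }
    }
    return 0;
}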
@@ -1482,7 +1479,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
ASSERT(fVers.get().size() == fRecoverCounts.get().size());
state int idx = 0;
- state Promise<Void> registerWithCC;
+ state Promise<Void> registerWithMaster;
for (idx = 0; idx < fVers.get().size(); idx++) {
state KeyRef rawId = fVers.get()[idx].key.removePrefix(persistCurrentVersionKeys.begin);
UID id1 = BinaryReader::fromStringRef<UID>(rawId, Unversioned());
@@ -1516,7 +1513,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
logData->version.set(ver);
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
- logData->removed = rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture());
+ logData->removed = rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture());
removed.push_back(errorOr(logData->removed));
TraceEvent("TLogRestorePersistentStateVer", id1).detail("Ver", ver);
@@ -1620,8 +1617,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self, LocalityData locality)
self->sharedActors.send(tLogCore(self, it.second));
}
- if (registerWithCC.canBeSet())
- registerWithCC.send(Void());
+ if (registerWithMaster.canBeSet())
+ registerWithMaster.send(Void());
return Void();
}


@@ -1729,12 +1729,12 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
return Void();
}
- ACTOR Future<Void> rejoinClusterController(TLogData* self,
- TLogInterface tli,
- DBRecoveryCount recoveryCount,
- Future<Void> registerWithCC,
- bool isPrimary) {
- state LifetimeToken lastMasterLifetime;
+ ACTOR Future<Void> rejoinMasters(TLogData* self,
+ TLogInterface tli,
+ DBRecoveryCount recoveryCount,
+ Future<Void> registerWithMaster,
+ bool isPrimary) {
+ state UID lastMasterID(0, 0);
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@@ -1762,20 +1762,17 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
throw worker_removed();
}
- if (registerWithCC.isReady()) {
- if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
+ if (registerWithMaster.isReady()) {
+ if (self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req(tli);
- TraceEvent("TLogRejoining", tli.id())
- .detail("ClusterController", self->dbInfo->get().clusterInterface.id())
- .detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
- .detail("LastMasterLifeTime", lastMasterLifetime.toString());
+ TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
choose {
- when(TLogRejoinReply rep = wait(
- brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
+ when(TLogRejoinReply rep =
+ wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
- lastMasterLifetime = self->dbInfo->get().masterLifetime;
+ lastMasterID = self->dbInfo->get().master.id();
}
when(wait(self->dbInfo->onChange())) {}
}
@@ -1783,7 +1780,7 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
- wait(registerWithCC || self->dbInfo->onChange());
+ wait(registerWithMaster || self->dbInfo->onChange());
}
}
}
@@ -2387,7 +2384,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
state int idx = 0;
- state Promise<Void> registerWithCC;
+ state Promise<Void> registerWithMaster;
state std::map<UID, TLogInterface> id_interf;
for (idx = 0; idx < fVers.get().size(); idx++) {
state KeyRef rawId = fVers.get()[idx].key.removePrefix(persistCurrentVersionKeys.begin);
@@ -2435,7 +2432,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed =
- rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false);
+ rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture(), false);
removed.push_back(errorOr(logData->removed));
TraceEvent("TLogRestorePersistentStateVer", id1).detail("Ver", ver);
@@ -2537,8 +2534,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
self->sharedActors.send(tLogCore(self, it.second, id_interf[it.first], false));
}
- if (registerWithCC.canBeSet())
- registerWithCC.send(Void());
+ if (registerWithMaster.canBeSet())
+ registerWithMaster.send(Void());
return Void();
}
@@ -2656,7 +2653,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
- logData->removed = rejoinClusterController(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
+ logData->removed = rejoinMasters(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
self->queueOrder.push_back(recruited.id());
TraceEvent("TLogStart", logData->logId).log();


@@ -2174,12 +2174,12 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
return Void();
}
- ACTOR Future<Void> rejoinClusterController(TLogData* self,
- TLogInterface tli,
- DBRecoveryCount recoveryCount,
- Future<Void> registerWithCC,
- bool isPrimary) {
- state LifetimeToken lastMasterLifetime;
+ ACTOR Future<Void> rejoinMasters(TLogData* self,
+ TLogInterface tli,
+ DBRecoveryCount recoveryCount,
+ Future<Void> registerWithMaster,
+ bool isPrimary) {
+ state UID lastMasterID(0, 0);
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@@ -2207,20 +2207,17 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
throw worker_removed();
}
- if (registerWithCC.isReady()) {
- if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
+ if (registerWithMaster.isReady()) {
+ if (self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req(tli);
- TraceEvent("TLogRejoining", tli.id())
- .detail("ClusterController", self->dbInfo->get().clusterInterface.id())
- .detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
- .detail("LastMasterLifeTime", lastMasterLifetime.toString());
+ TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
choose {
- when(TLogRejoinReply rep = wait(
- brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
+ when(TLogRejoinReply rep =
+ wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
- lastMasterLifetime = self->dbInfo->get().masterLifetime;
+ lastMasterID = self->dbInfo->get().master.id();
}
when(wait(self->dbInfo->onChange())) {}
}
@@ -2228,7 +2225,7 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
- wait(registerWithCC || self->dbInfo->onChange());
+ wait(registerWithMaster || self->dbInfo->onChange());
}
}
}
@@ -2849,7 +2846,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
state int idx = 0;
- state Promise<Void> registerWithCC;
+ state Promise<Void> registerWithMaster;
state std::map<UID, TLogInterface> id_interf;
state std::vector<std::pair<Version, UID>> logsByVersion;
for (idx = 0; idx < fVers.get().size(); idx++) {
@@ -2897,7 +2894,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed =
- rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false);
+ rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture(), false);
removed.push_back(errorOr(logData->removed));
logsByVersion.emplace_back(ver, id1);
@@ -3020,8 +3017,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
self->sharedActors.send(tLogCore(self, it.second, id_interf[it.first], false));
}
- if (registerWithCC.canBeSet())
- registerWithCC.send(Void());
+ if (registerWithMaster.canBeSet())
+ registerWithMaster.send(Void());
return Void();
}
@@ -3138,7 +3135,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
- logData->removed = rejoinClusterController(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
+ logData->removed = rejoinMasters(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
self->popOrder.push_back(recruited.id());
self->spillOrder.push_back(recruited.id());


@@ -319,9 +319,9 @@ struct TLogData : NonCopyable {
// data is written from. This value is restored from disk when the tlog
// restarts.
UID durableClusterId;
- // The cluster-controller cluster ID stores the cluster ID read from the txnStateStore.
+ // The master cluster ID stores the cluster ID read from the txnStateStore.
// It is cached in this variable.
- UID ccClusterId;
+ UID masterClusterId;
UID dbgid;
UID workerID;
@@ -783,7 +783,7 @@ void TLogQueue::updateVersionSizes(const TLogQueueEntry& result,
ACTOR Future<Void> tLogLock(TLogData* self, ReplyPromise<TLogLockResult> reply, Reference<LogData> logData) {
state Version stopVersion = logData->version.get();
- TEST(true); // TLog stopped by recovering cluster-controller
+ TEST(true); // TLog stopped by recovering master
TEST(logData->stopped); // logData already stopped
TEST(!logData->stopped); // logData not yet stopped
@@ -2020,7 +2020,7 @@ ACTOR Future<Void> doQueueCommit(TLogData* self,
logData->recoveryComplete.send(Void());
}
- TraceEvent("TLogCommitDurable", self->dbgid).detail("Version", ver);
+ //TraceEvent("TLogCommitDurable", self->dbgid).detail("Version", ver);
if (logData->logSystem->get() &&
(!logData->isPrimary || logData->logRouterPoppedVersion < logData->logRouterPopToVersion)) {
logData->logRouterPoppedVersion = ver;
@@ -2251,12 +2251,12 @@ ACTOR Future<UID> getClusterId(TLogData* self) {
}
}
- ACTOR Future<Void> rejoinClusterController(TLogData* self,
- TLogInterface tli,
- DBRecoveryCount recoveryCount,
- Future<Void> registerWithCC,
- bool isPrimary) {
- state LifetimeToken lastMasterLifetime;
+ ACTOR Future<Void> rejoinMasters(TLogData* self,
+ TLogInterface tli,
+ DBRecoveryCount recoveryCount,
+ Future<Void> registerWithMaster,
+ bool isPrimary) {
+ state UID lastMasterID(0, 0);
loop {
auto const& inf = self->dbInfo->get();
bool isDisplaced =
@@ -2284,27 +2284,24 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
// with a different cluster ID.
state UID clusterId = wait(getClusterId(self));
ASSERT(clusterId.isValid());
- self->ccClusterId = clusterId;
+ self->masterClusterId = clusterId;
ev.detail("ClusterId", clusterId).detail("SelfClusterId", self->durableClusterId);
if (BUGGIFY)
wait(delay(SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01()));
throw worker_removed();
}
- if (registerWithCC.isReady()) {
- if (!lastMasterLifetime.isEqual(self->dbInfo->get().masterLifetime)) {
+ if (registerWithMaster.isReady()) {
+ if (self->dbInfo->get().master.id() != lastMasterID) {
// The TLogRejoinRequest is needed to establish communications with a new master, which doesn't have our
// TLogInterface
TLogRejoinRequest req(tli);
- TraceEvent("TLogRejoining", tli.id())
- .detail("ClusterController", self->dbInfo->get().clusterInterface.id())
- .detail("DbInfoMasterLifeTime", self->dbInfo->get().masterLifetime.toString())
- .detail("LastMasterLifeTime", lastMasterLifetime.toString());
+ TraceEvent("TLogRejoining", tli.id()).detail("Master", self->dbInfo->get().master.id());
choose {
- when(TLogRejoinReply rep = wait(
- brokenPromiseToNever(self->dbInfo->get().clusterInterface.tlogRejoin.getReply(req)))) {
+ when(TLogRejoinReply rep =
+ wait(brokenPromiseToNever(self->dbInfo->get().master.tlogRejoin.getReply(req)))) {
if (rep.masterIsRecovered)
- lastMasterLifetime = self->dbInfo->get().masterLifetime;
+ lastMasterID = self->dbInfo->get().master.id();
}
when(wait(self->dbInfo->onChange())) {}
}
@@ -2312,7 +2309,7 @@ ACTOR Future<Void> rejoinClusterController(TLogData* self,
wait(self->dbInfo->onChange());
}
} else {
- wait(registerWithCC || self->dbInfo->onChange());
+ wait(registerWithMaster || self->dbInfo->onChange());
}
}
}
@@ -2509,13 +2506,13 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
}
// Persist cluster ID once cluster has recovered.
- auto ccClusterId = self->dbInfo->get().clusterId;
+ auto masterClusterId = self->dbInfo->get().clusterId;
if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED &&
!self->durableClusterId.isValid()) {
- ASSERT(ccClusterId.isValid());
- self->durableClusterId = ccClusterId;
+ ASSERT(masterClusterId.isValid());
+ self->durableClusterId = masterClusterId;
self->persistentData->set(
- KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(ccClusterId, Unversioned())));
+ KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(masterClusterId, Unversioned())));
wait(self->persistentData->commit());
}
}
@@ -2961,7 +2958,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
state int idx = 0;
- state Promise<Void> registerWithCC;
+ state Promise<Void> registerWithMaster;
state std::map<UID, TLogInterface> id_interf;
state std::vector<std::pair<Version, UID>> logsByVersion;
for (idx = 0; idx < fVers.get().size(); idx++) {
@@ -3017,7 +3014,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
logData->recoveryCount =
BinaryReader::fromStringRef<DBRecoveryCount>(fRecoverCounts.get()[idx].value, Unversioned());
logData->removed =
- rejoinClusterController(self, recruited, logData->recoveryCount, registerWithCC.getFuture(), false);
+ rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture(), false);
removed.push_back(errorOr(logData->removed));
logsByVersion.emplace_back(ver, id1);
@@ -3140,8 +3137,8 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
self->sharedActors.send(tLogCore(self, it.second, id_interf[it.first], false));
}
- if (registerWithCC.canBeSet())
- registerWithCC.send(Void());
+ if (registerWithMaster.canBeSet())
+ registerWithMaster.send(Void());
return Void();
}
@@ -3266,7 +3263,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->id_data[recruited.id()] = logData;
logData->locality = req.locality;
logData->recoveryCount = req.epoch;
- logData->removed = rejoinClusterController(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
+ logData->removed = rejoinMasters(self, recruited, req.epoch, Future<Void>(Void()), req.isPrimary);
self->popOrder.push_back(recruited.id());
self->spillOrder.push_back(recruited.id());
@@ -3494,13 +3491,13 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
// it should automatically exclude itself to avoid being used in
// the new cluster.
auto recoveryState = self.dbInfo->get().recoveryState;
- if (recoveryState == RecoveryState::FULLY_RECOVERED && self.ccClusterId.isValid() &&
- self.durableClusterId.isValid() && self.ccClusterId != self.durableClusterId) {
+ if (recoveryState == RecoveryState::FULLY_RECOVERED && self.masterClusterId.isValid() &&
+ self.durableClusterId.isValid() && self.masterClusterId != self.durableClusterId) {
state NetworkAddress address = g_network->getLocalAddress();
wait(excludeServers(self.cx, { AddressExclusion{ address.ip, address.port } }));
TraceEvent(SevWarnAlways, "TLogBelongsToExistingCluster")
.detail("ClusterId", self.durableClusterId)
- .detail("NewClusterId", self.ccClusterId);
+ .detail("NewClusterId", self.masterClusterId);
}
// If the tlog has a valid durable cluster ID, we don't want it to
// wipe its data! Throw this error to signal to `tlogTerminated` to


@@ -461,8 +461,8 @@ ACTOR Future<Void> TagPartitionedLogSystem::onError_internal(TagPartitionedLogSy
changes.push_back(self->backupWorkerChanged.onTrigger());
ASSERT(failed.size() >= 1);
- wait(quorum(changes, 1) || tagError<Void>(quorum(failed, 1), tlog_failed()) ||
- tagError<Void>(quorum(backupFailed, 1), backup_worker_failed()));
+ wait(quorum(changes, 1) || tagError<Void>(quorum(failed, 1), master_tlog_failed()) ||
+ tagError<Void>(quorum(backupFailed, 1), master_backup_worker_failed()));
}
}
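The wait() rewritten above races three outcomes: a topology-change trigger, a quorum of one failure among the TLogs, and a quorum of one failure among the backup workers, converting whichever failure wins into a typed error (master_tlog_failed vs. master_backup_worker_failed after this revert). A synchronous standalone sketch of just the classification step, leaving out Flow's future machinery:

#include <cassert>
#include <optional>
#include <string>
#include <vector>

std::optional<std::string> classify(const std::vector<bool>& tlogFailed,
                                    const std::vector<bool>& backupFailed) {
    for (bool f : tlogFailed)
        if (f) return "master_tlog_failed"; // quorum(failed, 1) satisfied
    for (bool f : backupFailed)
        if (f) return "master_backup_worker_failed";
    return std::nullopt; // nothing failed; keep waiting on change triggers
}

int main() {
    assert(classify({ false, true }, { false }) == "master_tlog_failed");
    assert(!classify({ false }, { false }).has_value());
    return 0;
}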
@@ -2300,7 +2300,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
auto reply = transformErrors(
throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed());
+ master_recovery_failed());
logRouterInitializationReplies.back().push_back(reply);
allReplies.push_back(reply);
nextRouter = (nextRouter + 1) % workers.size();
@@ -2349,7 +2349,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
auto reply = transformErrors(
throwErrorOr(workers[nextRouter].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed());
+ master_recovery_failed());
logRouterInitializationReplies.back().push_back(reply);
allReplies.push_back(reply);
nextRouter = (nextRouter + 1) % workers.size();
@@ -2410,7 +2410,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::recruitOldLogRouters(TagPartitionedL
if (!forRemote) {
self->logSystemConfigChanged.trigger();
- wait(failed.size() ? tagError<Void>(quorum(failed, 1), tlog_failed()) : Future<Void>(Never()));
+ wait(failed.size() ? tagError<Void>(quorum(failed, 1), master_tlog_failed()) : Future<Void>(Never()));
throw internal_error();
}
return Void();
@@ -2509,7 +2509,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
throwErrorOr(
remoteWorkers.logRouters[i % remoteWorkers.logRouters.size()].logRouter.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed()));
+ master_recovery_failed()));
}
std::vector<Tag> localTags = TagPartitionedLogSystem::getLocalTags(remoteLocality, allTags);
@@ -2587,7 +2587,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
remoteTLogInitializationReplies.push_back(transformErrors(
throwErrorOr(remoteWorkers.remoteTLogs[i].tLog.getReplyUnlessFailedFor(
remoteTLogReqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed()));
+ master_recovery_failed()));
TraceEvent("RemoteLogRecruitment_InitializingRemoteLogs")
.detail("StartVersion", logSet->startVersion)
@@ -2616,7 +2616,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
TLogRecoveryFinishedRequest(),
SERVER_KNOBS->TLOG_TIMEOUT,
SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed()));
+ master_recovery_failed()));
self->remoteRecoveryComplete = waitForAll(recoveryComplete);
self->tLogs.push_back(logSet);
@@ -2857,7 +2857,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
initializationReplies.push_back(transformErrors(
throwErrorOr(recr.tLogs[i].tLog.getReplyUnlessFailedFor(
reqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed()));
+ master_recovery_failed()));
state std::vector<Future<Void>> recoveryComplete;
@@ -2924,7 +2924,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
satelliteInitializationReplies.push_back(transformErrors(
throwErrorOr(recr.satelliteTLogs[i].tLog.getReplyUnlessFailedFor(
sreqs[i], SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed()));
+ master_recovery_failed()));
wait(waitForAll(satelliteInitializationReplies) || oldRouterRecruitment);
@@ -2940,7 +2940,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
TLogRecoveryFinishedRequest(),
SERVER_KNOBS->TLOG_TIMEOUT,
SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed()));
+ master_recovery_failed()));
}
wait(waitForAll(initializationReplies) || oldRouterRecruitment);
@@ -2955,7 +2955,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
// Don't force failure of recovery if it took us a long time to recover. This avoids multiple long running
// recoveries causing tests to timeout
if (BUGGIFY && now() - startTime < 300 && g_network->isSimulated() && g_simulator.speedUpSimulation)
- throw cluster_recovery_failed();
+ throw master_recovery_failed();
for (int i = 0; i < logSystem->tLogs[0]->logServers.size(); i++)
recoveryComplete.push_back(transformErrors(
@@ -2963,7 +2963,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
TLogRecoveryFinishedRequest(),
SERVER_KNOBS->TLOG_TIMEOUT,
SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
- cluster_recovery_failed()));
+ master_recovery_failed()));
logSystem->recoveryComplete = waitForAll(recoveryComplete);
if (configuration.usableRegions > 1) {
@@ -3057,7 +3057,7 @@ ACTOR Future<TLogLockResult> TagPartitionedLogSystem::lockTLog(
UID myID,
Reference<AsyncVar<OptionalInterface<TLogInterface>>> tlog) {
- TraceEvent("TLogLockStarted", myID).detail("TLog", tlog->get().id()).detail("InfPresent", tlog->get().present());
+ TraceEvent("TLogLockStarted", myID).detail("TLog", tlog->get().id());
loop {
choose {
when(TLogLockResult data = wait(


@@ -163,25 +163,17 @@ struct ClusterControllerFullInterface {
RequestStream<struct GetServerDBInfoRequest>
getServerDBInfo; // only used by testers; the cluster controller will send the serverDBInfo to workers
RequestStream<struct UpdateWorkerHealthRequest> updateWorkerHealth;
- RequestStream<struct TLogRejoinRequest>
- tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new controller
- RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
- RequestStream<struct ChangeCoordinatorsRequest> changeCoordinators;
UID id() const { return clientInterface.id(); }
bool operator==(ClusterControllerFullInterface const& r) const { return id() == r.id(); }
bool operator!=(ClusterControllerFullInterface const& r) const { return id() != r.id(); }
NetworkAddress address() const { return clientInterface.address(); }
bool hasMessage() const {
return clientInterface.hasMessage() || recruitFromConfiguration.getFuture().isReady() ||
recruitRemoteFromConfiguration.getFuture().isReady() || recruitStorage.getFuture().isReady() ||
recruitBlobWorker.getFuture().isReady() || registerWorker.getFuture().isReady() ||
getWorkers.getFuture().isReady() || registerMaster.getFuture().isReady() ||
- getServerDBInfo.getFuture().isReady() || updateWorkerHealth.getFuture().isReady() ||
- tlogRejoin.getFuture().isReady() || notifyBackupWorkerDone.getFuture().isReady() ||
- changeCoordinators.getFuture().isReady();
+ getServerDBInfo.getFuture().isReady() || updateWorkerHealth.getFuture().isReady();
}
void initEndpoints() {
@@ -195,9 +187,6 @@ struct ClusterControllerFullInterface {
registerMaster.getEndpoint(TaskPriority::ClusterControllerRegister);
getServerDBInfo.getEndpoint(TaskPriority::ClusterController);
updateWorkerHealth.getEndpoint(TaskPriority::ClusterController);
- tlogRejoin.getEndpoint(TaskPriority::MasterTLogRejoin);
- notifyBackupWorkerDone.getEndpoint(TaskPriority::ClusterController);
- changeCoordinators.getEndpoint(TaskPriority::DefaultEndpoint);
}
template <class Ar>
@@ -215,10 +204,7 @@ struct ClusterControllerFullInterface {
getWorkers,
registerMaster,
getServerDBInfo,
- updateWorkerHealth,
- tlogRejoin,
- notifyBackupWorkerDone,
- changeCoordinators);
+ updateWorkerHealth);
}
};
@@ -337,11 +323,10 @@ struct RecruitRemoteFromConfigurationReply {
constexpr static FileIdentifier file_identifier = 9091392;
std::vector<WorkerInterface> remoteTLogs;
std::vector<WorkerInterface> logRouters;
- Optional<UID> dbgId;
template <class Ar>
void serialize(Ar& ar) {
- serializer(ar, remoteTLogs, logRouters, dbgId);
+ serializer(ar, remoteTLogs, logRouters);
}
};
@@ -351,7 +336,6 @@ struct RecruitRemoteFromConfigurationRequest {
Optional<Key> dcId;
int logRouterCount;
std::vector<UID> exclusionWorkerIds;
- Optional<UID> dbgId;
ReplyPromise<RecruitRemoteFromConfigurationReply> reply;
RecruitRemoteFromConfigurationRequest() {}
@@ -364,7 +348,7 @@ struct RecruitRemoteFromConfigurationRequest {
template <class Ar>
void serialize(Ar& ar) {
- serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, dbgId, reply);
+ serializer(ar, configuration, dcId, logRouterCount, exclusionWorkerIds, reply);
}
};
@@ -502,49 +486,6 @@ struct UpdateWorkerHealthRequest {
}
};
- struct TLogRejoinReply {
- constexpr static FileIdentifier file_identifier = 11;
- // false means someone else registered, so we should re-register. true means this master is recovered, so don't
- // send again to the same master.
- bool masterIsRecovered;
- TLogRejoinReply() = default;
- explicit TLogRejoinReply(bool masterIsRecovered) : masterIsRecovered(masterIsRecovered) {}
- template <class Ar>
- void serialize(Ar& ar) {
- serializer(ar, masterIsRecovered);
- }
- };
- struct TLogRejoinRequest {
- constexpr static FileIdentifier file_identifier = 15692200;
- TLogInterface myInterface;
- ReplyPromise<TLogRejoinReply> reply;
- TLogRejoinRequest() {}
- explicit TLogRejoinRequest(const TLogInterface& interf) : myInterface(interf) {}
- template <class Ar>
- void serialize(Ar& ar) {
- serializer(ar, myInterface, reply);
- }
- };
- struct BackupWorkerDoneRequest {
- constexpr static FileIdentifier file_identifier = 8736351;
- UID workerUID;
- LogEpoch backupEpoch;
- ReplyPromise<Void> reply;
- BackupWorkerDoneRequest() : workerUID(), backupEpoch(-1) {}
- BackupWorkerDoneRequest(UID id, LogEpoch epoch) : workerUID(id), backupEpoch(epoch) {}
- template <class Ar>
- void serialize(Ar& ar) {
- serializer(ar, workerUID, backupEpoch, reply);
- }
- };
struct InitializeTLogRequest {
constexpr static FileIdentifier file_identifier = 15604392;
UID recruitmentID;
@@ -664,7 +605,6 @@ struct RecruitMasterRequest {
struct InitializeCommitProxyRequest {
constexpr static FileIdentifier file_identifier = 10344153;
MasterInterface master;
- LifetimeToken masterLifetime;
uint64_t recoveryCount;
Version recoveryTransactionVersion;
bool firstProxy;
@@ -672,20 +612,19 @@ struct InitializeCommitProxyRequest {
template <class Ar>
void serialize(Ar& ar) {
- serializer(ar, master, masterLifetime, recoveryCount, recoveryTransactionVersion, firstProxy, reply);
+ serializer(ar, master, recoveryCount, recoveryTransactionVersion, firstProxy, reply);
}
};
struct InitializeGrvProxyRequest {
constexpr static FileIdentifier file_identifier = 8265613;
MasterInterface master;
- LifetimeToken masterLifetime;
uint64_t recoveryCount;
ReplyPromise<GrvProxyInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
- serializer(ar, master, masterLifetime, recoveryCount, reply);
+ serializer(ar, master, recoveryCount, reply);
}
};

File diff suppressed because it is too large.


@@ -700,8 +700,8 @@ TEST_CASE("/fdbserver/worker/addressInDbAndPrimaryDc") {
// Manually set up a master address.
NetworkAddress testAddress(IPAddress(0x13131313), 1);
- testDbInfo.master.getCommitVersion =
- RequestStream<struct GetCommitVersionRequest>(Endpoint({ testAddress }, UID(1, 2)));
+ testDbInfo.master.changeCoordinators =
+ RequestStream<struct ChangeCoordinatorsRequest>(Endpoint({ testAddress }, UID(1, 2)));
// First, create an empty TLogInterface, and check that it shouldn't be considered as in primary DC.
testDbInfo.logSystemConfig.tLogs.push_back(TLogSet());
@@ -1772,10 +1772,12 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
startRole(Role::MASTER, recruited.id(), interf.id());
DUMPTOKEN(recruited.waitFailure);
+ DUMPTOKEN(recruited.tlogRejoin);
+ DUMPTOKEN(recruited.changeCoordinators);
DUMPTOKEN(recruited.getCommitVersion);
DUMPTOKEN(recruited.getLiveCommittedVersion);
DUMPTOKEN(recruited.reportLiveCommittedVersion);
- DUMPTOKEN(recruited.updateRecoveryData);
+ DUMPTOKEN(recruited.notifyBackupWorkerDone);
// printf("Recruited as masterServer\n");
Future<Void> masterProcess = masterServer(


@@ -93,22 +93,21 @@ ERROR( connection_leaked, 1103, "Connection object leaked" )
ERROR( recruitment_failed, 1200, "Recruitment of a server failed" ) // Be careful, catching this will delete the data of a storage server or tlog permanently
ERROR( move_to_removed_server, 1201, "Attempt to move keys to a storage server that was removed" )
ERROR( worker_removed, 1202, "Normal worker shut down" ) // Be careful, catching this will delete the data of a storage server or tlog permanently
- ERROR( cluster_recovery_failed, 1203, "Cluster recovery failed")
+ ERROR( master_recovery_failed, 1203, "Master recovery failed")
ERROR( master_max_versions_in_flight, 1204, "Master hit maximum number of versions in flight" )
- ERROR( tlog_failed, 1205, "Cluster recovery terminating because a TLog failed" ) // similar to tlog_stopped, but the tlog has actually died
+ ERROR( master_tlog_failed, 1205, "Master terminating because a TLog failed" ) // similar to tlog_stopped, but the tlog has actually died
ERROR( worker_recovery_failed, 1206, "Recovery of a worker process failed" )
ERROR( please_reboot, 1207, "Reboot of server process requested" )
ERROR( please_reboot_delete, 1208, "Reboot of server process requested, with deletion of state" )
ERROR( commit_proxy_failed, 1209, "Master terminating because a CommitProxy failed" )
- ERROR( resolver_failed, 1210, "Cluster recovery terminating because a Resolver failed" )
+ ERROR( master_resolver_failed, 1210, "Master terminating because a Resolver failed" )
ERROR( server_overloaded, 1211, "Server is under too much load and cannot respond" )
- ERROR( backup_worker_failed, 1212, "Cluster recovery terminating because a backup worker failed")
+ ERROR( master_backup_worker_failed, 1212, "Master terminating because a backup worker failed")
ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
- ERROR( grv_proxy_failed, 1214, "Cluster recovery terminating because a GRVProxy failed" )
+ ERROR( grv_proxy_failed, 1214, "Master terminating because a GRVProxy failed" )
ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress" )
ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different cluster ID" )
- ERROR( restart_cluster_controller, 1218, "Restart cluster controller process" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )


@@ -84,7 +84,6 @@ enum class TaskPriority {
GetConsistentReadVersion = 8500,
GetLiveCommittedVersionReply = 8490,
GetLiveCommittedVersion = 8480,
- UpdateRecoveryTransactionVersion = 8470,
DefaultPromiseEndpoint = 8000,
DefaultOnMainThread = 7500,
DefaultDelay = 7010,