Refactor: ClusterController driving cluster-recovery state machine

diff-1: Address Jingyu's review comments
 diff-2: Introduce ClusterRecovery actor to seperate out
         cluster recovery code

At present, cluster recovery process consists of following steps:
1. ClusterController clusterWatchDatabase actor recruits
   master/sequencer process.
2. Sequencer process implements the cluster recovery state machine,
   responsible to recruit all other processes as well restore the
   cluster state.

Patch proposes a scheme where the cluster recovery state machine
is implemented and driven by the ClusterController process instead
of the Sequencer process.

Advantages of the scheme could be:
1. Simplified design where ClusterController recruits "sequencer"
   process like other worker processes compared to current scheme
   where "sequencer" process gets special treatment. In newer scheme
   sequencer is responsible for maintaining/providing
   "committed version" (as expected).
2. ClusterController is responsible for worker processes recruitment,
   the sequencer though orchestrating the recovery state machine, it
   need to reachout to the ClusterController for recruiting worker
   processes etc.

NOTE:
Patch has moved the recovery state machine code from
'sequencer' -> 'cluster-controller' process, however, necessary
updates were done for both functionality as well as performance
improvement reasons.

Next Steps:
Cluster recovery documentation will be updated in near future.
This commit is contained in:
Ata E Husain Bohra 2021-12-20 16:22:39 -06:00 committed by Aaron Molitor
parent abd2959702
commit 1520390bc5
5 changed files with 5415 additions and 5280 deletions

View File

@ -8,7 +8,10 @@ set(FDBSERVER_SRCS
BlobManager.actor.cpp
BlobManagerInterface.h
BlobWorker.actor.cpp
ClusterController.actor.h
ClusterController.actor.cpp
ClusterRecovery.actor.h
ClusterRecovery.actor.cpp
ConfigBroadcaster.actor.cpp
ConfigBroadcaster.h
ConfigDatabaseUnitTests.actor.cpp

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,289 @@
/*
* ClusterRecovery.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#include <utility>
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_CLUSTERRECOVERY_ACTOR_G_H)
#define FDBSERVER_CLUSTERRECOVERY_ACTOR_G_H
#include "fdbserver/ClusterRecovery.actor.g.h"
#elif !defined(FDBSERVER_CLUSTERRECOVERY_ACTOR_H)
#define FDBSERVER_CLUSTERRECOVERY_ACTOR_H
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
#include "fdbserver/ClusterController.actor.h"
#include "fdbserver/DBCoreState.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemConfig.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/Error.h"
#include "flow/SystemMonitor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<Void> recoveryTerminateOnConflict(UID dbgid,
Promise<Void> fullyRecovered,
Future<Void> onConflict,
Future<Void> switchedState);
class ReusableCoordinatedState : NonCopyable {
public:
Promise<Void> fullyRecovered;
DBCoreState prevDBState;
DBCoreState myDBState;
bool finalWriteStarted;
Future<Void> previousWrite;
ReusableCoordinatedState(ServerCoordinators const& coordinators,
PromiseStream<Future<Void>> const& addActor,
UID const& dbgid)
: finalWriteStarted(false), previousWrite(Void()), cstate(coordinators), coordinators(coordinators),
addActor(addActor), dbgid(dbgid) {}
Future<Void> read() { return _read(this); }
Future<Void> write(DBCoreState newState, bool finalWrite = false) {
previousWrite = _write(this, newState, finalWrite);
return previousWrite;
}
Future<Void> move(ClusterConnectionString const& nc) { return cstate.move(nc); }
private:
MovableCoordinatedState cstate;
ServerCoordinators coordinators;
PromiseStream<Future<Void>> addActor;
Promise<Void> switchedState;
UID dbgid;
ACTOR Future<Void> _read(ReusableCoordinatedState* self) {
Value prevDBStateRaw = wait(self->cstate.read());
Future<Void> onConflict = recoveryTerminateOnConflict(
self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture());
if (onConflict.isReady() && onConflict.isError()) {
throw onConflict.getError();
}
self->addActor.send(onConflict);
if (prevDBStateRaw.size()) {
self->prevDBState = BinaryReader::fromStringRef<DBCoreState>(prevDBStateRaw, IncludeVersion());
self->myDBState = self->prevDBState;
}
return Void();
}
ACTOR Future<Void> _write(ReusableCoordinatedState* self, DBCoreState newState, bool finalWrite) {
if (self->finalWriteStarted) {
wait(Future<Void>(Never()));
}
if (finalWrite) {
self->finalWriteStarted = true;
}
try {
wait(self->cstate.setExclusive(
BinaryWriter::toValue(newState, IncludeVersion(ProtocolVersion::withDBCoreState()))));
} catch (Error& e) {
TEST(true); // Master displaced during writeMasterState
throw;
}
self->myDBState = newState;
if (!finalWrite) {
self->switchedState.send(Void());
self->cstate = MovableCoordinatedState(self->coordinators);
Value rereadDBStateRaw = wait(self->cstate.read());
DBCoreState readState;
if (rereadDBStateRaw.size())
readState = BinaryReader::fromStringRef<DBCoreState>(rereadDBStateRaw, IncludeVersion());
if (readState != newState) {
TraceEvent("RecoveryTerminated", self->dbgid).detail("Reason", "CStateChanged");
TEST(true); // Coordinated state changed between writing and reading, recovery restarting
throw worker_removed();
}
self->switchedState = Promise<Void>();
self->addActor.send(recoveryTerminateOnConflict(
self->dbgid, self->fullyRecovered, self->cstate.onConflict(), self->switchedState.getFuture()));
} else {
self->fullyRecovered.send(Void());
}
return Void();
}
};
struct ClusterRecoveryData : NonCopyable, ReferenceCounted<ClusterRecoveryData> {
ClusterControllerData* controllerData;
UID dbgid;
AsyncTrigger registrationTrigger;
Version lastEpochEnd, // The last version in the old epoch not (to be) rolled back in this recovery
recoveryTransactionVersion; // The first version in this epoch
double lastCommitTime;
Version liveCommittedVersion; // The largest live committed version reported by commit proxies.
bool databaseLocked;
Optional<Value> proxyMetadataVersion;
Version minKnownCommittedVersion;
DatabaseConfiguration originalConfiguration;
DatabaseConfiguration configuration;
std::vector<Optional<Key>> primaryDcId;
std::vector<Optional<Key>> remoteDcIds;
bool hasConfiguration;
ServerCoordinators coordinators;
Reference<ILogSystem> logSystem;
Version version; // The last version assigned to a proxy by getVersion()
double lastVersionTime;
LogSystemDiskQueueAdapter* txnStateLogAdapter;
IKeyValueStore* txnStateStore;
int64_t memoryLimit;
std::map<Optional<Value>, int8_t> dcId_locality;
std::vector<Tag> allTags;
int8_t getNextLocality() {
int8_t maxLocality = -1;
for (auto it : dcId_locality) {
maxLocality = std::max(maxLocality, it.second);
}
return maxLocality + 1;
}
std::vector<CommitProxyInterface> commitProxies;
std::vector<CommitProxyInterface> provisionalCommitProxies;
std::vector<GrvProxyInterface> grvProxies;
std::vector<GrvProxyInterface> provisionalGrvProxies;
std::vector<ResolverInterface> resolvers;
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
UID clusterId;
Standalone<StringRef> dbId;
MasterInterface masterInterface;
LifetimeToken masterLifetime;
const ClusterControllerFullInterface
clusterController; // If the cluster controller changes, this master will die, so this is immutable.
ReusableCoordinatedState cstate;
Promise<Void> recoveryReadyForCommits;
Promise<Void> cstateUpdated;
Reference<AsyncVar<ServerDBInfo> const> dbInfo;
int64_t registrationCount; // Number of different MasterRegistrationRequests sent to clusterController
RecoveryState recoveryState;
AsyncVar<Standalone<VectorRef<ResolverMoveRef>>> resolverChanges;
Version resolverChangesVersion;
std::set<UID> resolverNeedingChanges;
PromiseStream<Future<Void>> addActor;
Reference<AsyncVar<bool>> recruitmentStalled;
bool forceRecovery;
bool neverCreated;
int8_t safeLocality;
int8_t primaryLocality;
std::vector<WorkerInterface> backupWorkers; // Recruited backup workers from cluster controller.
CounterCollection cc;
Counter changeCoordinatorsRequests;
Counter getCommitVersionRequests;
Counter backupWorkerDoneRequests;
Counter getLiveCommittedVersionRequests;
Counter reportLiveCommittedVersionRequests;
Future<Void> logger;
Reference<EventCacheHolder> clusterRecoveryStateEventHolder;
Reference<EventCacheHolder> clusterRecoveryGenerationsEventHolder;
Reference<EventCacheHolder> clusterRecoveryDurationEventHolder;
Reference<EventCacheHolder> clusterRecoveryAvailableEventHolder;
Reference<EventCacheHolder> recoveredConfigEventHolder;
ClusterRecoveryData(ClusterControllerData* controllerData,
Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
MasterInterface const& masterInterface,
LifetimeToken const& masterLifetimeToken,
ServerCoordinators const& coordinators,
ClusterControllerFullInterface const& clusterController,
Standalone<StringRef> const& dbId,
PromiseStream<Future<Void>> const& addActor,
bool forceRecovery)
: controllerData(controllerData), dbgid(masterInterface.id()), lastEpochEnd(invalidVersion),
recoveryTransactionVersion(invalidVersion), lastCommitTime(0), liveCommittedVersion(invalidVersion),
databaseLocked(false), minKnownCommittedVersion(invalidVersion), hasConfiguration(false),
coordinators(coordinators), version(invalidVersion), lastVersionTime(0), txnStateStore(nullptr),
memoryLimit(2e9), dbId(dbId), masterInterface(masterInterface), masterLifetime(masterLifetimeToken),
clusterController(clusterController), cstate(coordinators, addActor, dbgid), dbInfo(dbInfo),
registrationCount(0), addActor(addActor), recruitmentStalled(makeReference<AsyncVar<bool>>(false)),
forceRecovery(forceRecovery), neverCreated(false), safeLocality(tagLocalityInvalid),
primaryLocality(tagLocalityInvalid), cc("Master", dbgid.toString()),
changeCoordinatorsRequests("ChangeCoordinatorsRequests", cc),
getCommitVersionRequests("GetCommitVersionRequests", cc),
backupWorkerDoneRequests("BackupWorkerDoneRequests", cc),
getLiveCommittedVersionRequests("GetLiveCommittedVersionRequests", cc),
reportLiveCommittedVersionRequests("ReportLiveCommittedVersionRequests", cc),
clusterRecoveryStateEventHolder(makeReference<EventCacheHolder>("ClusterRecoveryState")),
clusterRecoveryGenerationsEventHolder(makeReference<EventCacheHolder>("ClusterRecoveryGenerations")),
clusterRecoveryDurationEventHolder(makeReference<EventCacheHolder>("ClusterRecoveryDuration")),
clusterRecoveryAvailableEventHolder(makeReference<EventCacheHolder>("ClusterRecoveryAvailable")),
recoveredConfigEventHolder(makeReference<EventCacheHolder>("RecoveredConfig")) {
logger = traceCounters(
"ClusterRecoveryMetrics", dbgid, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ClusterRecoveryMetrics");
if (forceRecovery && !controllerData->clusterControllerDcId.present()) {
TraceEvent(SevError, "ForcedRecoveryRequiresDcID").log();
forceRecovery = false;
}
}
~ClusterRecoveryData() {
if (txnStateStore)
txnStateStore->close();
}
};
ACTOR Future<Void> recruitNewMaster(ClusterControllerData* cluster,
ClusterControllerData::DBInfo* db,
MasterInterface* newMaster);
ACTOR Future<Void> cleanupRecoveryActorCollection(Reference<ClusterRecoveryData> self, bool exThrown);
ACTOR Future<Void> clusterRecoveryCore(Reference<ClusterRecoveryData> self);
bool isNormalClusterRecoveryError(const Error&);
#include "flow/unactorcompiler.h"
#endif