Merge pull request #5637 from sfc-gh-ljoswiak/features/data-loss-prevention

Data loss protection when joining new cluster
Evan Tschannen 2021-11-15 15:26:32 -08:00 committed by GitHub
commit 964d0209ca
15 changed files with 314 additions and 56 deletions
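
At a high level, this change gives each cluster a durable identity: the master generates a UID and stores it at `\xff/clusterId` during recovery, tlogs and storage servers persist the same ID locally, and a process that later rejoins with a mismatched ID refuses to wipe its data. A minimal sketch of the guard (an illustrative paraphrase, not code from this diff; the free-standing helper is hypothetical):

// UID and invalid_cluster_id() are real FDB types/errors; this helper is not.
void checkClusterMembership(UID durableClusterId, UID currentClusterId) {
    if (durableClusterId.isValid() && currentClusterId.isValid() &&
        durableClusterId != currentClusterId) {
        // The on-disk data belongs to a different cluster: keep it and bail out.
        throw invalid_cluster_id();
    }
}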

View File

@ -213,6 +213,8 @@ const KeyRangeRef writeConflictRangeKeysRange =
KeyRangeRef(LiteralStringRef("\xff\xff/transaction/write_conflict_range/"),
LiteralStringRef("\xff\xff/transaction/write_conflict_range/\xff\xff"));
const KeyRef clusterIdKey = LiteralStringRef("\xff/clusterId");
// "\xff/cacheServer/[[UID]] := StorageServerInterface"
const KeyRangeRef storageCacheServerKeys(LiteralStringRef("\xff/cacheServer/"), LiteralStringRef("\xff/cacheServer0"));
const KeyRef storageCacheServersPrefix = storageCacheServerKeys.begin;
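
The value stored at `clusterIdKey` is a UID serialized without a protocol version; every reader added by this PR decodes it the same way. A round-trip sketch using the exact calls from the diff:

UID id = deterministicRandom()->randomUniqueID();
Value v = BinaryWriter::toValue(id, Unversioned());               // encode, as the master does
UID decoded = BinaryReader::fromStringRef<UID>(v, Unversioned()); // decode, as tlogs/SSes do
ASSERT(decoded == id);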

View File

@ -67,6 +67,8 @@ void decodeKeyServersValue(std::map<Tag, UID> const& tag_uid,
std::vector<UID>& src,
std::vector<UID>& dest);
extern const KeyRef clusterIdKey;
// "\xff/storageCacheServer/[[UID]] := StorageServerInterface"
// This will be added by the cache server on initialization and removed by DD
// TODO[mpilman]: We will need a way to map uint16_t ids to UIDs in a future

View File

@ -543,7 +543,7 @@ private:
m.param1.startsWith(applyMutationsAddPrefixRange.begin) ||
m.param1.startsWith(applyMutationsRemovePrefixRange.begin) || m.param1.startsWith(tagLocalityListPrefix) ||
m.param1.startsWith(serverTagHistoryPrefix) ||
m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin)) {
m.param1.startsWith(testOnlyTxnStateStorePrefixRange.begin) || m.param1 == clusterIdKey) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
}
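
Whitelisting `clusterIdKey` in this predicate is what makes the ID visible in the txnStateStore at all: mutations not matched here are never mirrored into it. The master then reads it back during recovery, as in this sketch (mirroring `recoverFrom()` later in this diff):

Optional<Value> clusterId = self->txnStateStore->readValue(clusterIdKey).get();
if (clusterId.present()) {
    // Existing cluster: restore its ID.
} else {
    // First recovery with this feature: a fresh ID will be generated.
}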

View File

@ -3438,7 +3438,7 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster, ClusterC
TEST(true); // clusterWatchDatabase() master failed
TraceEvent(SevWarn, "DetectedFailedMaster", cluster->id).detail("OldMaster", iMaster.id());
} else {
TEST(true); // clusterWatchDatabas() !newMaster.present()
TEST(true); // clusterWatchDatabase() !newMaster.present()
wait(delay(SERVER_KNOBS->MASTER_SPIN_DELAY));
}
} catch (Error& e) {
@ -4065,7 +4065,8 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
.detail("GrvProxies", req.grvProxies.size())
.detail("RecoveryCount", req.recoveryCount)
.detail("Stalled", req.recoveryStalled)
.detail("OldestBackupEpoch", req.logSystemConfig.oldestBackupEpoch);
.detail("OldestBackupEpoch", req.logSystemConfig.oldestBackupEpoch)
.detail("ClusterId", req.clusterId);
// make sure the request comes from an active database
auto db = &self->db;
@ -4149,6 +4150,11 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
dbInfo.recoveryCount = req.recoveryCount;
}
if (dbInfo.clusterId != req.clusterId) {
isChanged = true;
dbInfo.clusterId = req.clusterId;
}
if (isChanged) {
dbInfo.id = deterministicRandom()->randomUniqueID();
dbInfo.infoGeneration = ++self->db.dbInfoCount;
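
Because `clusterRegisterMaster()` refreshes `dbInfo.id` and bumps `infoGeneration` whenever the cluster ID changes, any role holding the usual `AsyncVar<ServerDBInfo>` observes the new ID on its next wakeup. A hypothetical observer actor (sketch only; `logClusterId` is not part of this PR):

ACTOR Future<Void> logClusterId(Reference<AsyncVar<ServerDBInfo> const> db) {
    loop {
        if (db->get().clusterId.isValid()) {
            TraceEvent("ObservedClusterId").detail("ClusterId", db->get().clusterId);
        }
        wait(db->onChange());
    }
}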

View File

@ -5267,6 +5267,21 @@ struct TSSPairState : ReferenceCounted<TSSPairState>, NonCopyable {
Future<Void> waitComplete() { return complete.getFuture(); }
};
ACTOR Future<UID> getClusterId(DDTeamCollection* self) {
state ReadYourWritesTransaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
ASSERT(clusterId.present());
return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
RecruitStorageReply candidateWorker,
const DDEnabledState* ddEnabledState,
@ -5284,12 +5299,15 @@ ACTOR Future<Void> initializeStorage(DDTeamCollection* self,
// Ask the candidateWorker to initialize a SS only if the worker does not have a pending request
state UID interfaceId = deterministicRandom()->randomUniqueID();
UID clusterId = wait(getClusterId(self));
state InitializeStorageRequest isr;
isr.storeType =
recruitTss ? self->configuration.testingStorageServerStoreType : self->configuration.storageServerStoreType;
isr.seedTag = invalidTag;
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = interfaceId;
isr.clusterId = clusterId;
self->recruitingIds.insert(interfaceId);
self->recruitingLocalities.insert(candidateWorker.worker.stableAddress());
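
`getClusterId()` above is one of three nearly identical helpers added by this PR (data distribution, tlog, storage server); all use the standard retry-loop transaction idiom. A standalone variant that also tolerates a missing key (hypothetical, for clusters that predate the feature):

ACTOR Future<UID> fetchClusterId(Database cx) {
    state ReadYourWritesTransaction tr(cx);
    loop {
        try {
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            Optional<Value> clusterId = wait(tr.get(clusterIdKey));
            if (!clusterId.present())
                return UID(); // invalid UID: cluster has no ID yet
            return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
        } catch (Error& e) {
            wait(tr.onError(e));
        }
    }
}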

View File

@ -637,6 +637,7 @@ struct ILogSystem {
virtual Future<Reference<ILogSystem>> newEpoch(
RecruitFromConfigurationReply const& recr,
Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
UID clusterId,
DatabaseConfiguration const& config,
LogEpoch recoveryCount,
int8_t primaryLocality,

View File

@ -64,6 +64,7 @@ struct ServerDBInfo {
// which need to stay alive in case this recovery fails
Optional<LatencyBandConfig> latencyBandConfig;
int64_t infoGeneration;
UID clusterId;
ServerDBInfo()
: recoveryCount(0), recoveryState(RecoveryState::UNINITIALIZED), logSystemConfig(0), infoGeneration(0) {}
@ -88,7 +89,8 @@ struct ServerDBInfo {
logSystemConfig,
priorCommittedLogServers,
latencyBandConfig,
infoGeneration);
infoGeneration,
clusterId);
}
};

View File

@ -26,6 +26,7 @@
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/SpanContextMessage.h"
@ -225,6 +226,8 @@ static const KeyRange persistTagMessagesKeys = prefixRange(LiteralStringRef("Tag
static const KeyRange persistTagMessageRefsKeys = prefixRange(LiteralStringRef("TagMsgRef/"));
static const KeyRange persistTagPoppedKeys = prefixRange(LiteralStringRef("TagPop/"));
static const KeyRef persistClusterIdKey = LiteralStringRef("clusterId");
static Key persistTagMessagesKey(UID id, Tag tag, Version version) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(persistTagMessagesKeys.begin);
@ -312,6 +315,13 @@ struct TLogData : NonCopyable {
Deque<UID> spillOrder;
std::map<UID, Reference<struct LogData>> id_data;
// The durable cluster ID identifies the cluster that the tlog's persistent
// data was written from. This value is restored from disk when the tlog
// restarts.
UID durableClusterId;
// The master cluster ID caches the cluster ID read from the txnStateStore.
UID masterClusterId;
UID dbgid;
UID workerID;
@ -2214,6 +2224,24 @@ ACTOR Future<Void> initPersistentState(TLogData* self, Reference<LogData> logDat
return Void();
}
ACTOR Future<UID> getClusterId(TLogData* self) {
state ReadYourWritesTransaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
if (clusterId.present()) {
return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
} else {
return UID();
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<Void> rejoinMasters(TLogData* self,
TLogInterface tli,
DBRecoveryCount recoveryCount,
@ -2234,14 +2262,21 @@ ACTOR Future<Void> rejoinMasters(TLogData* self,
}
isDisplaced = isDisplaced && !inf.logSystemConfig.hasTLog(tli.id());
if (isDisplaced) {
TraceEvent("TLogDisplaced", tli.id())
.detail("Reason", "DBInfoDoesNotContain")
state TraceEvent ev("TLogDisplaced", tli.id());
ev.detail("Reason", "DBInfoDoesNotContain")
.detail("RecoveryCount", recoveryCount)
.detail("InfRecoveryCount", inf.recoveryCount)
.detail("RecoveryState", (int)inf.recoveryState)
.detail("LogSysConf", describe(inf.logSystemConfig.tLogs))
.detail("PriorLogs", describe(inf.priorCommittedLogServers))
.detail("OldLogGens", inf.logSystemConfig.oldTLogs.size());
// Read and cache the cluster ID before displacing this tlog. We want
// to avoid removing the tlog's data if it has joined a new cluster
// with a different cluster ID.
state UID clusterId = wait(getClusterId(self));
ASSERT(clusterId.isValid());
self->masterClusterId = clusterId;
ev.detail("ClusterId", clusterId).detail("SelfClusterId", self->durableClusterId);
if (BUGGIFY)
wait(delay(SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01()));
throw worker_removed();
@ -2460,6 +2495,17 @@ ACTOR Future<Void> serveTLogInterface(TLogData* self,
} else {
logData->logSystem->set(Reference<ILogSystem>());
}
// Persist the cluster ID once the cluster has recovered.
auto masterClusterId = self->dbInfo->get().clusterId;
if (self->dbInfo->get().recoveryState == RecoveryState::FULLY_RECOVERED &&
!self->durableClusterId.isValid()) {
ASSERT(masterClusterId.isValid());
self->durableClusterId = masterClusterId;
self->persistentData->set(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(masterClusterId, Unversioned())));
wait(self->persistentData->commit());
}
}
when(TLogPeekStreamRequest req = waitNext(tli.peekStreamMessages.getFuture())) {
TraceEvent(SevDebug, "TLogPeekStream", logData->logId)
@ -2807,6 +2853,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
wait(storage->init());
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fRecoveryLocation = storage->readValue(persistRecoveryLocationKey);
state Future<Optional<Value>> fClusterId = storage->readValue(persistClusterIdKey);
state Future<RangeResult> fVers = storage->readRange(persistCurrentVersionKeys);
state Future<RangeResult> fKnownCommitted = storage->readRange(persistKnownCommittedVersionKeys);
state Future<RangeResult> fLocality = storage->readRange(persistLocalityKeys);
@ -2818,7 +2865,7 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
// FIXME: metadata in queue?
wait(waitForAll(std::vector{ fFormat, fRecoveryLocation }));
wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fClusterId }));
wait(waitForAll(std::vector{ fVers,
fKnownCommitted,
fLocality,
@ -2828,6 +2875,10 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
fProtocolVersions,
fTLogSpillTypes }));
if (fClusterId.get().present()) {
self->durableClusterId = BinaryReader::fromStringRef<UID>(fClusterId.get().get(), Unversioned());
}
if (fFormat.get().present() && !persistFormatReadableRange.contains(fFormat.get().get())) {
// FIXME: remove when we no longer need to test upgrades from 4.X releases
if (g_network->isSimulated()) {
@ -3089,7 +3140,7 @@ bool tlogTerminated(TLogData* self, IKeyValueStore* persistentData, TLogQueue* p
}
if (e.code() == error_code_worker_removed || e.code() == error_code_recruitment_failed ||
e.code() == error_code_file_not_found) {
e.code() == error_code_file_not_found || e.code() == error_code_invalid_cluster_id) {
TraceEvent("TLogTerminated", self->dbgid).error(e, true);
return true;
} else
@ -3353,48 +3404,91 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData,
TraceEvent("SharedTlog", tlogId).log();
try {
if (restoreFromDisk) {
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
} else {
wait(ioTimeoutError(checkEmptyQueue(&self) && checkRecovered(&self),
SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
}
try {
if (restoreFromDisk) {
wait(restorePersistentState(&self, locality, oldLog, recovered, tlogRequests));
} else {
wait(ioTimeoutError(checkEmptyQueue(&self) && checkRecovered(&self),
SERVER_KNOBS->TLOG_MAX_CREATE_DURATION));
}
// Disk errors need a chance to kill this actor.
wait(delay(0.000001));
// Disk errors need a chance to kill this actor.
wait(delay(0.000001));
if (recovered.canBeSet())
recovered.send(Void());
if (recovered.canBeSet())
recovered.send(Void());
self.sharedActors.send(commitQueue(&self));
self.sharedActors.send(updateStorageLoop(&self));
self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));
state Future<Void> activeSharedChange = Void();
self.sharedActors.send(commitQueue(&self));
self.sharedActors.send(updateStorageLoop(&self));
self.sharedActors.send(traceRole(Role::SHARED_TRANSACTION_LOG, tlogId));
state Future<Void> activeSharedChange = Void();
loop {
choose {
when(InitializeTLogRequest req = waitNext(tlogRequests.getFuture())) {
if (!self.tlogCache.exists(req.recruitmentID)) {
self.tlogCache.set(req.recruitmentID, req.reply.getFuture());
self.sharedActors.send(
self.tlogCache.removeOnReady(req.recruitmentID, tLogStart(&self, req, locality)));
} else {
forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID));
loop {
choose {
when(state InitializeTLogRequest req = waitNext(tlogRequests.getFuture())) {
ASSERT(req.clusterId.isValid());
// Durably persist the cluster ID if it is not already
// durable and the cluster has progressed far enough
// through recovery. To prevent different partitions from
// persisting different cluster IDs, we need to wait
// until a single cluster ID has been persisted in the
// txnStateStore before finally writing it to disk.
auto recoveryState = self.dbInfo->get().recoveryState;
if (!self.durableClusterId.isValid() && recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
self.durableClusterId = req.clusterId;
// Will let commit loop durably write the cluster ID.
self.persistentData->set(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(req.clusterId, Unversioned())));
}
if (!self.tlogCache.exists(req.recruitmentID)) {
self.tlogCache.set(req.recruitmentID, req.reply.getFuture());
self.sharedActors.send(
self.tlogCache.removeOnReady(req.recruitmentID, tLogStart(&self, req, locality)));
} else {
forwardPromise(req.reply, self.tlogCache.get(req.recruitmentID));
}
}
}
when(wait(error)) { throw internal_error(); }
when(wait(activeSharedChange)) {
if (activeSharedTLog->get() == tlogId) {
TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
stopAllTLogs(&self, tlogId);
TraceEvent("SharedTLogQueueSpilling", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.sharedActors.send(startSpillingInTenSeconds(&self, tlogId, activeSharedTLog));
when(wait(error)) { throw internal_error(); }
when(wait(activeSharedChange)) {
if (activeSharedTLog->get() == tlogId) {
TraceEvent("SharedTLogNowActive", self.dbgid).detail("NowActive", activeSharedTLog->get());
self.targetVolatileBytes = SERVER_KNOBS->TLOG_SPILL_THRESHOLD;
} else {
stopAllTLogs(&self, tlogId);
TraceEvent("SharedTLogQueueSpilling", self.dbgid)
.detail("NowActive", activeSharedTLog->get());
self.sharedActors.send(startSpillingInTenSeconds(&self, tlogId, activeSharedTLog));
}
activeSharedChange = activeSharedTLog->onChange();
}
activeSharedChange = activeSharedTLog->onChange();
}
}
} catch (Error& e) {
if (e.code() != error_code_worker_removed) {
throw;
}
// Don't need to worry about deleting data if there is no durable
// cluster ID.
if (!self.durableClusterId.isValid()) {
throw;
}
// When a tlog joins a new cluster and has data for an old cluster,
// it should automatically exclude itself to avoid being used in
// the new cluster.
auto recoveryState = self.dbInfo->get().recoveryState;
if (recoveryState == RecoveryState::FULLY_RECOVERED && self.masterClusterId.isValid() &&
self.durableClusterId.isValid() && self.masterClusterId != self.durableClusterId) {
state NetworkAddress address = g_network->getLocalAddress();
wait(excludeServers(self.cx, { AddressExclusion{ address.ip, address.port } }));
TraceEvent(SevWarnAlways, "TLogBelongsToExistingCluster")
.detail("ClusterId", self.durableClusterId)
.detail("NewClusterId", self.masterClusterId);
}
// If the tlog has a valid durable cluster ID, we don't want it to
// wipe its data! Throw this error to signal to `tlogTerminated` to
// close the persistent data store instead of deleting it.
throw invalid_cluster_id();
}
} catch (Error& e) {
self.terminated.send(Void());
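
The catch block above reduces to a three-way decision on `worker_removed`. Condensed paraphrase (hypothetical helper; the shipped code inlines this logic and also performs the exclusion):

Error onTLogRemoved(UID durableId, UID masterId, RecoveryState rs) {
    if (!durableId.isValid())
        return worker_removed(); // nothing durable to protect; data may be deleted
    if (rs == RecoveryState::FULLY_RECOVERED && masterId.isValid() && masterId != durableId) {
        // Joined a different cluster: the real code also calls excludeServers()
        // on its own address so the new cluster stops recruiting this process.
    }
    // Any valid durable ID: tell tlogTerminated() to close, not delete, the store.
    return invalid_cluster_id();
}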

View File

@ -1574,6 +1574,7 @@ Future<Void> TagPartitionedLogSystem::endEpoch() {
Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
RecruitFromConfigurationReply const& recr,
Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
UID clusterId,
DatabaseConfiguration const& config,
LogEpoch recoveryCount,
int8_t primaryLocality,
@ -1583,6 +1584,7 @@ Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
return newEpoch(Reference<TagPartitionedLogSystem>::addRef(this),
recr,
fRemoteWorkers,
clusterId,
config,
recoveryCount,
primaryLocality,
@ -2437,6 +2439,7 @@ std::vector<Tag> TagPartitionedLogSystem::getLocalTags(int8_t locality, const st
ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSystem* self,
Reference<TagPartitionedLogSystem> oldLogSystem,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
int8_t remoteLocality,
@ -2576,6 +2579,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
req.startVersion = logSet->startVersion;
req.logRouterTags = 0;
req.txsTags = self->txsTags;
req.clusterId = clusterId;
}
remoteTLogInitializationReplies.reserve(remoteWorkers.remoteTLogs.size());
@ -2624,6 +2628,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
Reference<TagPartitionedLogSystem> oldLogSystem,
RecruitFromConfigurationReply recr,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
int8_t primaryLocality,
@ -2844,6 +2849,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
req.startVersion = logSystem->tLogs[0]->startVersion;
req.logRouterTags = logSystem->logRouterTags;
req.txsTags = logSystem->txsTags;
req.clusterId = clusterId;
}
initializationReplies.reserve(recr.tLogs.size());
@ -2910,6 +2916,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
req.startVersion = oldLogSystem->knownCommittedVersion + 1;
req.logRouterTags = logSystem->logRouterTags;
req.txsTags = logSystem->txsTags;
req.clusterId = clusterId;
}
satelliteInitializationReplies.reserve(recr.satelliteTLogs.size());
@ -2961,8 +2968,14 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
if (configuration.usableRegions > 1) {
logSystem->hasRemoteServers = true;
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(
logSystem.getPtr(), oldLogSystem, fRemoteWorkers, configuration, recoveryCount, remoteLocality, allTags);
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(),
oldLogSystem,
fRemoteWorkers,
clusterId,
configuration,
recoveryCount,
remoteLocality,
allTags);
if (oldLogSystem->tLogs.size() > 0 && oldLogSystem->tLogs[0]->locality == tagLocalitySpecial) {
// The wait is required so that we know both primary logs and remote logs have copied the data between
// the known committed version and the recovery version.

View File

@ -263,6 +263,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
// The new epoch is only provisional until the caller updates the coordinated DBCoreState.
Future<Reference<ILogSystem>> newEpoch(RecruitFromConfigurationReply const& recr,
Future<RecruitRemoteFromConfigurationReply> const& fRemoteWorkers,
UID clusterId,
DatabaseConfiguration const& config,
LogEpoch recoveryCount,
int8_t primaryLocality,
@ -342,6 +343,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
ACTOR static Future<Void> newRemoteEpoch(TagPartitionedLogSystem* self,
Reference<TagPartitionedLogSystem> oldLogSystem,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
int8_t remoteLocality,
@ -350,6 +352,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
ACTOR static Future<Reference<ILogSystem>> newEpoch(Reference<TagPartitionedLogSystem> oldLogSystem,
RecruitFromConfigurationReply recr,
Future<RecruitRemoteFromConfigurationReply> fRemoteWorkers,
UID clusterId,
DatabaseConfiguration configuration,
LogEpoch recoveryCount,
int8_t primaryLocality,
@ -380,4 +383,4 @@ std::vector<T> TagPartitionedLogSystem::getReadyNonError(std::vector<Future<T>>
}
#include "flow/unactorcompiler.h"
#endif // FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_H
#endif // FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_H

View File

@ -238,6 +238,7 @@ struct RegisterMasterRequest {
std::vector<UID> priorCommittedLogServers;
RecoveryState recoveryState;
bool recoveryStalled;
UID clusterId;
ReplyPromise<Void> reply;
@ -261,6 +262,7 @@ struct RegisterMasterRequest {
priorCommittedLogServers,
recoveryState,
recoveryStalled,
clusterId,
reply);
}
};
@ -499,6 +501,7 @@ struct InitializeTLogRequest {
Version startVersion;
int logRouterTags;
int txsTags;
UID clusterId;
ReplyPromise<struct TLogInterface> reply;
@ -523,7 +526,8 @@ struct InitializeTLogRequest {
reply,
logVersion,
spillType,
txsTags);
txsTags,
clusterId);
}
};
@ -693,11 +697,12 @@ struct InitializeStorageRequest {
KeyValueStoreType storeType;
Optional<std::pair<UID, Version>>
tssPairIDAndVersion; // Only set if recruiting a tss. Will be the UID and Version of its SS pair.
UID clusterId; // Unique cluster identifier. Only needed at recruitment; will be read from the txnStateStore on recovery
ReplyPromise<InitializeStorageReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion);
serializer(ar, seedTag, reqId, interfaceId, storeType, reply, tssPairIDAndVersion, clusterId);
}
};
@ -992,6 +997,7 @@ class IDiskQueue;
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
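
Both recruitment requests now carry the cluster ID, and `workerServer()` asserts it is valid on receipt (see worker.actor.cpp below), so every recruiter must populate the field. Sketch using the names from this header:

InitializeStorageRequest isr;
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = deterministicRandom()->randomUniqueID();
isr.clusterId = clusterId; // e.g. fetched via getClusterId() in DDTeamCollection
ASSERT(isr.clusterId.isValid()); // mirrors the assertion in workerServer()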

View File

@ -216,6 +216,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
std::map<UID, CommitProxyVersionReplies> lastCommitProxyVersionReplies;
UID clusterId;
Standalone<StringRef> dbId;
MasterInterface myInterface;
@ -402,6 +403,7 @@ ACTOR Future<Void> newTLogServers(Reference<MasterData> self,
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait(oldLogSystem->newEpoch(recr,
fRemoteWorkers,
self->clusterId,
self->configuration,
self->cstate.myDBState.recoveryCount + 1,
self->primaryLocality,
@ -414,6 +416,7 @@ ACTOR Future<Void> newTLogServers(Reference<MasterData> self,
self->logSystem = Reference<ILogSystem>(); // Cancels the actors in the previous log system.
Reference<ILogSystem> newLogSystem = wait(oldLogSystem->newEpoch(recr,
Never(),
self->clusterId,
self->configuration,
self->cstate.myDBState.recoveryCount + 1,
self->primaryLocality,
@ -447,6 +450,7 @@ ACTOR Future<Void> newSeedServers(Reference<MasterData> self,
isr.storeType = self->configuration.storageServerStoreType;
isr.reqId = deterministicRandom()->randomUniqueID();
isr.interfaceId = deterministicRandom()->randomUniqueID();
isr.clusterId = self->clusterId;
ErrorOr<InitializeStorageReply> newServer = wait(recruits.storageServers[idx].storage.tryGetReply(isr));
@ -582,6 +586,7 @@ Future<Void> sendMasterRegistration(MasterData* self,
masterReq.priorCommittedLogServers = priorCommittedLogServers;
masterReq.recoveryState = self->recoveryState;
masterReq.recoveryStalled = self->recruitmentStalled->get();
masterReq.clusterId = self->clusterId;
return brokenPromiseToNever(self->clusterController.registerMaster.getReply(masterReq));
}
@ -1053,7 +1058,8 @@ ACTOR Future<Void> recoverFrom(Reference<MasterData> self,
Reference<ILogSystem> oldLogSystem,
std::vector<StorageServerInterface>* seedServers,
std::vector<Standalone<CommitTransactionRef>>* initialConfChanges,
Future<Version> poppedTxsVersion) {
Future<Version> poppedTxsVersion,
bool* clusterIdExists) {
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::reading_transaction_system_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_transaction_system_state])
@ -1077,6 +1083,16 @@ ACTOR Future<Void> recoverFrom(Reference<MasterData> self,
debug_checkMaxRestoredVersion(UID(), self->lastEpochEnd, "DBRecovery");
// Generate a cluster ID to uniquely identify the cluster, if one doesn't
// already exist in the txnStateStore.
Optional<Value> clusterId = self->txnStateStore->readValue(clusterIdKey).get();
*clusterIdExists = clusterId.present();
if (!clusterId.present()) {
self->clusterId = deterministicRandom()->randomUniqueID();
} else {
self->clusterId = BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
}
// Ordinarily we pass through this loop once and recover. We go around the loop if recovery stalls for more than a
// second, a provisional master is initialized, and an "emergency transaction" is submitted that might change the
// configuration so that we can finish recovery.
@ -1490,6 +1506,7 @@ ACTOR Future<Void> trackTlogRecovery(Reference<MasterData> self,
.detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.detail("FullyRecoveredAtVersion", self->version)
.detail("ClusterId", self->clusterId)
.trackLatest(self->masterRecoveryStateEventHolder->trackingKey);
TraceEvent("MasterRecoveryGenerations", self->dbgid)
@ -1757,6 +1774,7 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
state Future<Void> logChanges;
state Future<Void> minRecoveryDuration;
state Future<Version> poppedTxsVersion;
state bool clusterIdExists = false;
loop {
Reference<ILogSystem> oldLogSystem = oldLogSystems->get();
@ -1772,9 +1790,13 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
self->registrationTrigger.trigger();
choose {
when(wait(oldLogSystem
? recoverFrom(self, oldLogSystem, &seedServers, &initialConfChanges, poppedTxsVersion)
: Never())) {
when(wait(oldLogSystem ? recoverFrom(self,
oldLogSystem,
&seedServers,
&initialConfChanges,
poppedTxsVersion,
std::addressof(clusterIdExists))
: Never())) {
reg.cancel();
break;
}
@ -1803,6 +1825,7 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
.detail("Status", RecoveryStatus::names[RecoveryStatus::recovery_transaction])
.detail("PrimaryLocality", self->primaryLocality)
.detail("DcId", self->myInterface.locality.dcId())
.detail("ClusterId", self->clusterId)
.trackLatest(self->masterRecoveryStateEventHolder->trackingKey);
// Recovery transaction
@ -1883,6 +1906,11 @@ ACTOR Future<Void> masterCore(Reference<MasterData> self) {
}
}
// Write cluster ID into txnStateStore if it is missing.
if (!clusterIdExists) {
tr.set(recoveryCommitRequest.arena, clusterIdKey, BinaryWriter::toValue(self->clusterId, Unversioned()));
}
applyMetadataMutations(SpanID(),
self->dbgid,
recoveryCommitRequest.arena,
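
Putting the master-side pieces together: each recovery restores the ID from the txnStateStore if present, generates one otherwise, and (re)writes it through the recovery transaction so `applyMetadataMutations()` mirrors it back into the txnStateStore for the next generation. Condensed paraphrase of the flow above (hypothetical helper):

UID restoreOrCreateClusterId(IKeyValueStore* txnStateStore, bool* clusterIdExists) {
    Optional<Value> v = txnStateStore->readValue(clusterIdKey).get();
    *clusterIdExists = v.present();
    return v.present() ? BinaryReader::fromStringRef<UID>(v.get(), Unversioned())
                       : deterministicRandom()->randomUniqueID();
}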

View File

@ -653,6 +653,7 @@ public:
Reference<ILogSystem> logSystem;
Reference<ILogSystem::IPeekCursor> logCursor;
Promise<UID> clusterId;
UID thisServerID;
Optional<UID> tssPairID; // if this server is a tss, this is the id of its (ss) pair
Optional<UID> ssPairID; // if this server is an ss, this is the id of its (tss) pair
@ -3811,6 +3812,7 @@ static const KeyRangeRef persistFormatReadableRange(LiteralStringRef("Foundation
static const KeyRef persistID = LiteralStringRef(PERSIST_PREFIX "ID");
static const KeyRef persistTssPairID = LiteralStringRef(PERSIST_PREFIX "tssPairID");
static const KeyRef persistTssQuarantine = LiteralStringRef(PERSIST_PREFIX "tssQ");
static const KeyRef persistClusterIdKey = LiteralStringRef(PERSIST_PREFIX "clusterId");
// (Potentially) change with the durable version or when fetchKeys completes
static const KeyRef persistVersion = LiteralStringRef(PERSIST_PREFIX "Version");
@ -5534,6 +5536,9 @@ void StorageServerDisk::makeNewStorageServerDurable() {
if (data->tssPairID.present()) {
storage->set(KeyValueRef(persistTssPairID, BinaryWriter::toValue(data->tssPairID.get(), Unversioned())));
}
ASSERT(data->clusterId.getFuture().isReady() && data->clusterId.getFuture().get().isValid());
storage->set(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(data->clusterId.getFuture().get(), Unversioned())));
storage->set(KeyValueRef(persistVersion, BinaryWriter::toValue(data->version.get(), Unversioned())));
storage->set(KeyValueRef(persistShardAssignedKeys.begin.toString(), LiteralStringRef("0")));
storage->set(KeyValueRef(persistShardAvailableKeys.begin.toString(), LiteralStringRef("0")));
@ -5763,9 +5768,52 @@ ACTOR Future<Void> restoreByteSample(StorageServer* data,
return Void();
}
// Reads the cluster ID from the transaction state store.
ACTOR Future<UID> getClusterId(StorageServer* self) {
state ReadYourWritesTransaction tr(self->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
ASSERT(clusterId.present());
return BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Read the cluster ID from the transaction state store and persist it to local
// storage. This function should only be necessary during an upgrade when the
// prior FDB version did not support cluster IDs. The normal path for storage
// server recruitment will include the cluster ID in the initial recruitment
// message.
ACTOR Future<Void> persistClusterId(StorageServer* self) {
state Transaction tr(self->cx);
loop {
try {
Optional<Value> clusterId = wait(tr.get(clusterIdKey));
if (clusterId.present()) {
auto uid = BinaryReader::fromStringRef<UID>(clusterId.get(), Unversioned());
self->storage.writeKeyValue(
KeyValueRef(persistClusterIdKey, BinaryWriter::toValue(uid, Unversioned())));
// Purposely not calling commit here; the recurring
// commit loop will save this value to disk.
self->clusterId.send(uid);
}
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* storage) {
state Future<Optional<Value>> fFormat = storage->readValue(persistFormat.key);
state Future<Optional<Value>> fID = storage->readValue(persistID);
state Future<Optional<Value>> fClusterID = storage->readValue(persistClusterIdKey);
state Future<Optional<Value>> ftssPairID = storage->readValue(persistTssPairID);
state Future<Optional<Value>> fTssQuarantine = storage->readValue(persistTssQuarantine);
state Future<Optional<Value>> fVersion = storage->readValue(persistVersion);
@ -5781,7 +5829,8 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
restoreByteSample(data, storage, byteSampleSampleRecovered, startByteSampleRestore.getFuture());
TraceEvent("ReadingDurableState", data->thisServerID).log();
wait(waitForAll(std::vector{ fFormat, fID, ftssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(
std::vector{ fFormat, fID, fClusterID, ftssPairID, fTssQuarantine, fVersion, fLogProtocol, fPrimaryLocality }));
wait(waitForAll(std::vector{ fShardAssigned, fShardAvailable, fChangeFeeds }));
wait(byteSampleSampleRecovered.getFuture());
TraceEvent("RestoringDurableState", data->thisServerID).log();
@ -5805,6 +5854,13 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
data->setTssPair(BinaryReader::fromStringRef<UID>(ftssPairID.get().get(), Unversioned()));
}
if (fClusterID.get().present()) {
data->clusterId.send(BinaryReader::fromStringRef<UID>(fClusterID.get().get(), Unversioned()));
} else {
TEST(true); // storage server upgraded to version supporting cluster IDs
data->actors.add(persistClusterId(data));
}
// It's a bit sketchy to rely on an untrusted storage engine to persist its quarantine state when the quarantine
// state means the storage engine already had a durability or correctness error, but it should get re-quarantined
// very quickly because of a mismatch if it starts trying to do things again
@ -6666,11 +6722,13 @@ ACTOR Future<Void> memoryStoreRecover(IKeyValueStore* store, Reference<IClusterC
ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
StorageServerInterface ssi,
Tag seedTag,
UID clusterId,
Version tssSeedVersion,
ReplyPromise<InitializeStorageReply> recruitReply,
Reference<AsyncVar<ServerDBInfo> const> db,
std::string folder) {
state StorageServer self(persistentData, db, ssi);
self.clusterId.send(clusterId);
if (ssi.isTss()) {
self.setTssPair(ssi.tssPairID.get());
ASSERT(self.isTss());
@ -6913,10 +6971,32 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
if (recovered.canBeSet())
recovered.send(Void());
if (self.isTss()) {
wait(replaceTSSInterface(&self, ssi));
} else {
wait(replaceInterface(&self, ssi));
try {
if (self.isTss()) {
wait(replaceTSSInterface(&self, ssi));
} else {
wait(replaceInterface(&self, ssi));
}
} catch (Error& e) {
if (e.code() != error_code_worker_removed) {
throw;
}
state UID clusterId = wait(getClusterId(&self));
ASSERT(self.clusterId.isValid());
UID durableClusterId = wait(self.clusterId.getFuture());
ASSERT(durableClusterId.isValid());
if (clusterId == durableClusterId) {
throw worker_removed();
}
// By default, when a storage server connects to a new cluster, it deletes
// its old data and creates a new, empty data file for the new cluster.
// To avoid accidental data loss, we instead require a manual removal of
// the storage server's old data before it can be assigned to a new
// cluster.
TraceEvent(SevError, "StorageServerBelongsToExistingCluster")
.detail("ClusterID", durableClusterId)
.detail("NewClusterID", clusterId);
wait(Future<Void>(Never()));
}
TraceEvent("StorageServerStartingCore", self.thisServerID).detail("TimeTaken", now() - start);

View File

@ -1907,6 +1907,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
runningStorages.end(),
[&req](const auto& p) { return p.second != req.storeType; }) ||
req.seedTag != invalidTag)) {
ASSERT(req.clusterId.isValid());
LocalLineage _;
getCurrentLineage()->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
bool isTss = req.tssPairIDAndVersion.present();
@ -1951,6 +1952,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Future<Void> s = storageServer(data,
recruited,
req.seedTag,
req.clusterId,
isTss ? req.tssPairIDAndVersion.get().second : 0,
storageReady,
dbInfo,

View File

@ -106,6 +106,7 @@ ERROR( tag_throttled, 1213, "Transaction tag is being throttled" )
ERROR( grv_proxy_failed, 1214, "Master terminating because a GRVProxy failed" )
ERROR( dd_tracker_cancelled, 1215, "The data distribution tracker has been cancelled" )
ERROR( failed_to_progress, 1216, "Process has failed to make sufficient progress" )
ERROR( invalid_cluster_id, 1217, "Attempted to join cluster with a different cluster ID" )
// 15xx Platform errors
ERROR( platform_error, 1500, "Platform error" )
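
The new error code completes the mechanism: `tlogTerminated()` treats it like `worker_removed` for shutdown purposes but closes the persistent store instead of deleting it. Minimal usage sketch:

try {
    throw invalid_cluster_id();
} catch (Error& e) {
    ASSERT(e.code() == error_code_invalid_cluster_id); // 1217
}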