/*
 * TagPartitionedLogSystem.actor.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Include-guard layout: when NO_INTELLISENSE is defined and the generated-header guard is not yet set, this file
// only pulls in the generated "TagPartitionedLogSystem.actor.g.h". Otherwise, on first inclusion under the plain
// guard, the declarations below are compiled.
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_G_H)
#define FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_G_H
#include "fdbserver/TagPartitionedLogSystem.actor.g.h"
#elif !defined(FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_H)
#define FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_H

#pragma once

#include "fdbclient/SystemData.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/DBCoreState.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // This must be the last #include.

// NOTE(review): several declarations in this header appear to have lost their template argument lists
// (for example `std::vector> tLogs`, `template explicit OldLogData(...)`, `std::numeric_limits::max()`).
// The code tokens are left exactly as found; restore the arguments from the authoritative copy of this header
// before attempting to compile.

// TagPartitionedLogSystem info in old epoch
struct OldLogData {
    std::vector> tLogs;
    int32_t logRouterTags;
    int32_t txsTags; // The number of txsTags, which may change across generations.
    Version epochBegin, epochEnd;
    std::set pseudoLocalities;
    LogEpoch epoch;

    // Default state: tag counts, epoch bounds and epoch are all 0; tLogs and pseudoLocalities are
    // default-constructed.
    OldLogData() : logRouterTags(0), txsTags(0), epochBegin(0), epochEnd(0), epoch(0) {}

    // Constructor for T of OldTLogConf and OldTLogCoreData
    // Copies the scalar fields and pseudoLocalities from `conf`, then fills tLogs with one entry per element of
    // conf.tLogs, each produced by makeReference(conf.tLogs[j]).
    // NOTE(review): the loop compares an `int` index against `conf.tLogs.size()`; if that size is unsigned this
    // is a signed/unsigned comparison.
    template explicit OldLogData(const T& conf)
      : logRouterTags(conf.logRouterTags), txsTags(conf.txsTags), epochBegin(conf.epochBegin),
        epochEnd(conf.epochEnd), pseudoLocalities(conf.pseudoLocalities), epoch(conf.epoch) {
        tLogs.resize(conf.tLogs.size());
        for (int j = 0; j < conf.tLogs.size(); j++) {
            auto logSet = makeReference(conf.tLogs[j]);
            tLogs[j] = logSet;
        }
    }
};

// Bundle passed by value to getDurableVersion() and getDurableVersionChanged() below: a log set, the replies
// collected for it, and the associated epoch end.
// NOTE(review): presumably `replies` holds one lock reply per tLog in `logSet` — confirm against the callers.
struct LogLockInfo {
    Version epochEnd;
    bool isCurrent;
    Reference logSet;
    std::vector> replies;

    // Default state: epochEnd is the numeric_limits maximum and isCurrent is false.
    LogLockInfo() : epochEnd(std::numeric_limits::max()), isCurrent(false) {}
};

// Log system declared as a final class deriving from ILogSystem and ReferenceCounted.
// Member functions marked `final` below are not declared `virtual` here, so they must be overriding virtual
// functions from a base class (presumably ILogSystem). Functions prefixed with ACTOR are only declared in this
// header; they are presumably expanded by the Flow actor compiler (see the actorcompiler.h / unactorcompiler.h
// includes) and defined elsewhere.
struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted {
    const UID dbgid;
    LogSystemType logSystemType;
    std::vector> tLogs; // LogSets in different locations: primary, satellite, or remote
    int expectedLogSets;
    int logRouterTags;
    int txsTags;
    UID recruitmentID;
    int repopulateRegionAntiQuorum;
    bool stopped;
    std::set pseudoLocalities; // Represent special localities that will be mapped to tagLocalityLogRouter
    const LogEpoch epoch;
    LogEpoch oldestBackupEpoch;

    // new members
    std::map pseudoLocalityPopVersion;
    Future rejoins;
    Future recoveryComplete;
    Future remoteRecovery;
    Future remoteRecoveryComplete;
    std::vector lockResults;
    AsyncVar recoveryCompleteWrittenToCoreState;
    bool remoteLogsWrittenToCoreState;
    bool hasRemoteServers;
    AsyncTrigger backupWorkerChanged;
    std::set removedBackupWorkers; // Workers that are removed before setting them.

    Optional recoverAt;
    Optional recoveredAt;
    // NOTE(review): knownCommittedVersion has neither an in-class initializer nor an entry in the constructor's
    // initializer list below — confirm it is always assigned before it is read.
    Version knownCommittedVersion;
    Version backupStartVersion = invalidVersion; // max(tLogs[0].startVersion, previous epochEnd).
    std::map rvLogs; // recovery versions per tlog
    LocalityData locality;
    // For each currently running popFromLog actor, outstandingPops is
    // (logID, tag)->(max popped version, durableKnownCommittedVersion).
    // Why do we need durableKnownCommittedVersion? knownCommittedVersion gives the lower bound of what data
    // will need to be copied into the next generation to restore the replication factor.
    // Guess: It probably serves as a minimum version of what data should be on a TLog in the next generation and
    // sending a pop for anything less than durableKnownCommittedVersion for the TLog will be absurd.
    std::map, std::pair> outstandingPops;

    Optional>> addActor;
    ActorCollection popActors;
    std::vector oldLogData; // each element has the log info for one old epoch.
    AsyncTrigger logSystemConfigChanged;

    // Builds a log system for epoch `e` with no log sets configured yet: logSystemType is LogSystemType::empty,
    // expectedLogSets / logRouterTags / txsTags / repopulateRegionAntiQuorum / oldestBackupEpoch are 0, and the
    // stopped, recoveryCompleteWrittenToCoreState, remoteLogsWrittenToCoreState and hasRemoteServers flags are
    // false. `addActor` is optional and defaults to a default-constructed Optional.
    TagPartitionedLogSystem(UID dbgid,
                            LocalityData locality,
                            LogEpoch e,
                            Optional>> addActor = Optional>>())
      : dbgid(dbgid), logSystemType(LogSystemType::empty), expectedLogSets(0), logRouterTags(0), txsTags(0),
        repopulateRegionAntiQuorum(0), stopped(false), epoch(e), oldestBackupEpoch(0),
        recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), hasRemoteServers(false),
        locality(locality), addActor(addActor), popActors(false) {}

    void stopRejoins() final;

    void addref() final;

    void delref() final;

    std::string describe() const final;

    UID getDebugID() const final;

    void addPseudoLocality(int8_t locality);

    Tag getPseudoPopTag(Tag tag, ProcessClass::ClassType type) const final;

    bool hasPseudoLocality(int8_t locality) const final;

    // Return the min version of all pseudoLocalities, i.e., logRouter and backupTag
    Version popPseudoLocalityTag(Tag tag, Version upTo) final;

    // Static entry point taking the same argument list as the epochEnd actor declared further down.
    // NOTE(review): presumably a thin wrapper around epochEnd — confirm in the implementation file.
    static Future recoverAndEndEpoch(Reference>> const& outLogSystem,
                                     UID const& dbgid,
                                     DBCoreState const& oldState,
                                     FutureStream const& rejoins,
                                     LocalityData const& locality,
                                     bool* forceRecovery);

    static Reference fromLogSystemConfig(UID const& dbgid,
                                         LocalityData const& locality,
                                         LogSystemConfig const& lsConf,
                                         bool excludeRemote,
                                         bool useRecoveredAt,
                                         Optional>> addActor);

    static Reference fromOldLogSystemConfig(UID const& dbgid,
                                            LocalityData const& locality,
                                            LogSystemConfig const& lsConf);

    // Convert TagPartitionedLogSystem to DBCoreState and override input newState as return value
    void toCoreState(DBCoreState& newState) final;

    bool remoteStorageRecovered() final;

    Future onCoreStateChanged() final;

    void coreStateWritten(DBCoreState const& newState) final;

    Future onError() final;

    ACTOR static Future onError_internal(TagPartitionedLogSystem* self);

    ACTOR static Future pushResetChecker(Reference self, NetworkAddress addr);

    ACTOR static Future recordPushMetrics(Reference self, Reference dist, NetworkAddress addr, Future in);

    Future push(Version prevVersion,
                Version version,
                Version knownCommittedVersion,
                Version minKnownCommittedVersion,
                LogPushData& data,
                SpanID const& spanContext,
                Optional debugID,
                Optional> tpcvMap) final;

    // Peek entry points. Only the overloads marked `final` override a base-class virtual; peekAll, peekRemote
    // and peekLocal are additional members of this class.
    Reference peekAll(UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore);

    Reference peekRemote(UID dbgid, Version begin, Optional end, Tag tag, bool parallelGetMore);

    Reference peek(UID dbgid, Version begin, Optional end, Tag tag, bool parallelGetMore) final;

    Reference peek(UID dbgid, Version begin, Optional end, std::vector tags, bool parallelGetMore) final;

    // peekLocality defaults to tagLocalityInvalid when the caller does not supply one.
    Reference peekLocal(UID dbgid,
                        Tag tag,
                        Version begin,
                        Version end,
                        bool useMergePeekCursors,
                        int8_t peekLocality = tagLocalityInvalid);

    Reference peekTxs(UID dbgid, Version begin, int8_t peekLocality, Version localEnd, bool canDiscardPopped) final;

    Reference peekSingle(UID dbgid, Version begin, Tag tag, std::vector> history) final;

    // LogRouter or BackupWorker use this function to obtain a cursor for peeking tlogs of a generation (i.e., epoch).
    // Specifically, the epoch is determined by looking up "dbgid" in tlog sets of generations.
    // The returned cursor can peek data at the "tag" from the given "begin" version to that epoch's end version or
    // the recovery version for the latest old epoch. For the current epoch, the cursor has no end version.
    Reference peekLogRouter(UID dbgid, Version begin, Tag tag) final;

    Version getKnownCommittedVersion() final;

    Future onKnownCommittedVersionChange() final;

    void popLogRouter(Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality);

    void popTxs(Version upTo, int8_t popLocality) final;

    // pop 'tag.locality' type data up to the 'upTo' version
    void pop(Version upTo, Tag tag, Version durableKnownCommittedVersion, int8_t popLocality) final;

    // pop tag from log up to the version defined in self->outstandingPops[].first
    ACTOR static Future popFromLog(TagPartitionedLogSystem* self, Reference>> log, Tag tag, double time);

    ACTOR static Future getPoppedFromTLog(Reference>> log, Tag tag);

    ACTOR static Future getPoppedTxs(TagPartitionedLogSystem* self);

    Future getTxsPoppedVersion() final;

    ACTOR static Future confirmEpochLive_internal(Reference logSet, Optional debugID);

    // Returns success after confirming that pushes in the current epoch are still possible
    Future confirmEpochLive(Optional debugID) final;

    Future endEpoch() final;

    // Call only after endEpoch() has successfully completed. Returns a new epoch immediately following this one.
    // The new epoch is only provisional until the caller updates the coordinated DBCoreState.
    Future> newEpoch(RecruitFromConfigurationReply const& recr,
                     Future const& fRemoteWorkers,
                     UID clusterId,
                     DatabaseConfiguration const& config,
                     LogEpoch recoveryCount,
                     int8_t primaryLocality,
                     int8_t remoteLocality,
                     std::vector const& allTags,
                     Reference> const& recruitmentStalled) final;

    LogSystemConfig getLogSystemConfig() const final;

    Standalone getLogsValue() const final;

    Future onLogSystemConfigChange() final;

    Version getEnd() const final;

    Version getPeekEnd() const;

    // `locations` is a non-const reference, so it is the output of this call.
    void getPushLocations(VectorRef tags, std::vector& locations, bool allLocations) const final;

    bool hasRemoteLogs() const final;

    Tag getRandomRouterTag() const final;

    Tag getRandomTxsTag() const final;

    TLogVersion getTLogVersion() const final;

    int getLogRouterTags() const final;

    Version getBackupStartVersion() const final;

    std::map getOldEpochTagsVersionsInfo() const final;

    inline Reference getEpochLogSet(LogEpoch epoch) const;

    void setBackupWorkers(const std::vector& replies) final;

    bool removeBackupWorker(const BackupWorkerDoneRequest& req) final;

    LogEpoch getOldestBackupEpoch() const final;

    void setOldestBackupEpoch(LogEpoch epoch) final;

    ACTOR static Future monitorLog(Reference>> logServer, Reference> failed);

    // returns the log group's knownCommittedVersion, DV, and a vector of TLogLockResults for each tLog in the group.
    // NOTE(review): "DV" presumably stands for the durable version named in the function — confirm.
    // The result is wrapped in an Optional; `failed` and `lastEnd` default to empty values when omitted.
    Optional>> static getDurableVersion(
        UID dbgid,
        LogLockInfo lockInfo,
        std::vector>> failed = std::vector>>(),
        Optional lastEnd = Optional());

    ACTOR static Future getDurableVersionChanged(
        LogLockInfo lockInfo,
        std::vector>> failed = std::vector>>());

    ACTOR static Future epochEnd(Reference>> outLogSystem,
                                 UID dbgid,
                                 DBCoreState prevState,
                                 FutureStream rejoinRequests,
                                 LocalityData locality,
                                 bool* forceRecovery);

    ACTOR static Future recruitOldLogRouters(TagPartitionedLogSystem* self,
                                             std::vector workers,
                                             LogEpoch recoveryCount,
                                             int8_t locality,
                                             Version startVersion,
                                             std::vector tLogLocalities,
                                             Reference tLogPolicy,
                                             bool forRemote);

    static Version getMaxLocalStartVersion(const std::vector>& tLogs);

    static std::vector getLocalTags(int8_t locality, const std::vector& allTags);

    ACTOR static Future newRemoteEpoch(TagPartitionedLogSystem* self,
                                       Reference oldLogSystem,
                                       Future fRemoteWorkers,
                                       UID clusterId,
                                       DatabaseConfiguration configuration,
                                       LogEpoch recoveryCount,
                                       int8_t remoteLocality,
                                       std::vector allTags);

    // Actor counterpart of the newEpoch() member above: same arguments taken by value, plus the old log system
    // as an explicit first parameter.
    ACTOR static Future> newEpoch(Reference oldLogSystem,
                                  RecruitFromConfigurationReply recr,
                                  Future fRemoteWorkers,
                                  UID clusterId,
                                  DatabaseConfiguration configuration,
                                  LogEpoch recoveryCount,
                                  int8_t primaryLocality,
                                  int8_t remoteLocality,
                                  std::vector allTags,
                                  Reference> recruitmentStalled);

    ACTOR static Future trackRejoins(
        UID dbgid,
        std::vector>>, Reference>> logServers,
        FutureStream rejoinRequests);

    ACTOR static Future lockTLog(UID myID, Reference>> tlog);

    // Defined after the class body, later in this header.
    template static std::vector getReadyNonError(std::vector> const& futures);
};

// Collects, in input order, the value of every element of `futures` that is ready and is not an error.
// Elements that are not ready, or that are ready with an error, are skipped; `futures` itself is not modified.
template std::vector TagPartitionedLogSystem::getReadyNonError(std::vector> const& futures) {
    // Return the values of those futures which have (non-error) values ready
    std::vector result;
    for (auto& f : futures)
        if (f.isReady() && !f.isError())
            result.push_back(f.get());
    return result;
}

#include "flow/unactorcompiler.h"
#endif // FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_H