/*
 * DataDistribution.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <set>
#include <string>

#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include "fdbrpc/Replication.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/DDTeamCollection.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/TenantCache.h"
#include "fdbserver/TLogInterface.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Arena.h"
#include "flow/BooleanParam.h"
#include "flow/genericactors.actor.h"
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"

#include "flow/actorcompiler.h" // This must be the last #include.
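// Checks that a shard entry read from the system keyspace is consistent with this in-memory DataMove: the range must
// be covered by the move's metadata, the shard must carry a destination, the destination data-move id must match, and
// the shard's destination teams must be subsets of the move's destination teams. Any mismatch is traced as a
// DataMoveValidationError and, except for the missing-data-move case, marks the move as cancelled.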
void DataMove::validateShard(const DDShardInfo& shard, KeyRangeRef range, int priority) {
	if (!valid) {
		if (shard.hasDest && shard.destId != anonymousShardId) {
			TraceEvent(SevError, "DataMoveValidationError")
			    .detail("Range", range)
			    .detail("Reason", "DataMoveMissing")
			    .detail("ShardPrimaryDest", describe(shard.primaryDest))
			    .detail("ShardRemoteDest", describe(shard.remoteDest));
		}
		return;
	}

	ASSERT(this->meta.range.contains(range));

	if (!shard.hasDest) {
		TraceEvent(SevError, "DataMoveValidationError")
		    .detail("Range", range)
		    .detail("Reason", "ShardMissingDest")
		    .detail("DataMoveMetaData", this->meta.toString())
		    .detail("DataMovePrimaryDest", describe(this->primaryDest))
		    .detail("DataMoveRemoteDest", describe(this->remoteDest));
		cancelled = true;
		return;
	}

	if (shard.destId != this->meta.id) {
		TraceEvent(SevError, "DataMoveValidationError")
		    .detail("Range", range)
		    .detail("Reason", "DataMoveIDMissMatch")
		    .detail("DataMoveMetaData", this->meta.toString())
		    .detail("ShardMoveID", shard.destId);
		cancelled = true;
		return;
	}

	if (!std::includes(
	        this->primaryDest.begin(), this->primaryDest.end(), shard.primaryDest.begin(), shard.primaryDest.end()) ||
	    !std::includes(
	        this->remoteDest.begin(), this->remoteDest.end(), shard.remoteDest.begin(), shard.remoteDest.end())) {
		TraceEvent(SevError, "DataMoveValidationError")
		    .detail("Range", range)
		    .detail("Reason", "DataMoveDestMissMatch")
		    .detail("DataMoveMetaData", this->meta.toString())
		    .detail("DataMovePrimaryDest", describe(this->primaryDest))
		    .detail("DataMoveRemoteDest", describe(this->remoteDest))
		    .detail("ShardPrimaryDest", describe(shard.primaryDest))
		    .detail("ShardRemoteDest", describe(shard.remoteDest));
		cancelled = true;
	}
}

Future<Void> StorageWiggler::onCheck() const {
	return delay(MIN_ON_CHECK_DELAY_SEC);
}

// add server to wiggling queue
void StorageWiggler::addServer(const UID& serverId, const StorageMetadataType& metadata) {
	// std::cout << "size: " << pq_handles.size() << " add " << serverId.toString() << " DC: "
	//           << teamCollection->isPrimary() << std::endl;
	ASSERT(!pq_handles.count(serverId));
	pq_handles[serverId] = wiggle_pq.emplace(metadata, serverId);
}

void StorageWiggler::removeServer(const UID& serverId) {
	// std::cout << "size: " << pq_handles.size() << " remove " << serverId.toString() << " DC: "
	//           << teamCollection->isPrimary() << std::endl;
	if (contains(serverId)) { // server hasn't been popped
		auto handle = pq_handles.at(serverId);
		pq_handles.erase(serverId);
		wiggle_pq.erase(handle);
	}
}

void StorageWiggler::updateMetadata(const UID& serverId, const StorageMetadataType& metadata) {
	// std::cout << "size: " << pq_handles.size() << " update " << serverId.toString()
	//           << " DC: " << teamCollection->isPrimary() << std::endl;
	auto handle = pq_handles.at(serverId);
	if ((*handle).first == metadata) {
		return;
	}
	wiggle_pq.update(handle, std::make_pair(metadata, serverId));
}

bool StorageWiggler::necessary(const UID& serverId, const StorageMetadataType& metadata) const {
	return metadata.wrongConfigured || (now() - metadata.createdTime > SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC);
}

Optional<UID> StorageWiggler::getNextServerId(bool necessaryOnly) {
	if (!wiggle_pq.empty()) {
		auto [metadata, id] = wiggle_pq.top();
		if (necessaryOnly && !necessary(id, metadata)) {
			return {};
		}
		wiggle_pq.pop();
		pq_handles.erase(id);
		return Optional<UID>(id);
	}
	return Optional<UID>();
}
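// The following helpers persist StorageWiggleMetrics for this wiggler (keyed by whether the team collection is
// primary) so that wiggle progress and smoothed durations survive a data distributor restart: resetStats() keeps only
// the smoothed duration estimates, restoreStats() reloads previously stored metrics, and startWiggle()/finishWiggle()
// update per-wiggle and per-round bookkeeping before writing the metrics back.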
Future<Void> StorageWiggler::resetStats() {
	auto newMetrics = StorageWiggleMetrics();
	newMetrics.smoothed_round_duration = metrics.smoothed_round_duration;
	newMetrics.smoothed_wiggle_duration = metrics.smoothed_wiggle_duration;
	return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->isPrimary(), newMetrics);
}

Future<Void> StorageWiggler::restoreStats() {
	auto& metricsRef = metrics;
	auto assignFunc = [&metricsRef](Optional<Value> v) {
		if (v.present()) {
			metricsRef = BinaryReader::fromStringRef<StorageWiggleMetrics>(v.get(), IncludeVersion());
		}
		return Void();
	};
	auto readFuture = StorageWiggleMetrics::runGetTransaction(teamCollection->cx, teamCollection->isPrimary());
	return map(readFuture, assignFunc);
}

Future<Void> StorageWiggler::startWiggle() {
	metrics.last_wiggle_start = StorageMetadataType::currentTime();
	if (shouldStartNewRound()) {
		metrics.last_round_start = metrics.last_wiggle_start;
	}
	return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->isPrimary(), metrics);
}

Future<Void> StorageWiggler::finishWiggle() {
	metrics.last_wiggle_finish = StorageMetadataType::currentTime();
	metrics.finished_wiggle += 1;
	auto duration = metrics.last_wiggle_finish - metrics.last_wiggle_start;
	metrics.smoothed_wiggle_duration.setTotal((double)duration);

	if (shouldFinishRound()) {
		metrics.last_round_finish = metrics.last_wiggle_finish;
		metrics.finished_round += 1;
		duration = metrics.last_round_finish - metrics.last_round_start;
		metrics.smoothed_round_duration.setTotal((double)duration);
	}
	return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->isPrimary(), metrics);
}

ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
	TraceEvent("DDTrackerStarting").log();
	while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) {
		TraceEvent("DDTrackerStarting").detail("RecoveryState", (int)db->get().recoveryState);
		wait(db->onChange());
	}
	return Void();
}

ACTOR Future<bool> isDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
	state Transaction tr(cx);
	loop {
		try {
			Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
			if (!mode.present() && ddEnabledState->isDDEnabled())
				return true;
			if (mode.present()) {
				BinaryReader rd(mode.get(), Unversioned());
				int m;
				rd >> m;
				if (m && ddEnabledState->isDDEnabled()) {
					TraceEvent(SevDebug, "IsDDEnabledSucceeded")
					    .detail("Mode", m)
					    .detail("IsDDEnabled", ddEnabledState->isDDEnabled());
					return true;
				}
			}
			// SOMEDAY: Write a wrapper in MoveKeys.actor.h
			Optional<Value> readVal = wait(tr.get(moveKeysLockOwnerKey));
			UID currentOwner =
			    readVal.present() ? BinaryReader::fromStringRef<UID>(readVal.get(), Unversioned()) : UID();
			if (ddEnabledState->isDDEnabled() && (currentOwner != dataDistributionModeLock)) {
				TraceEvent(SevDebug, "IsDDEnabledSucceeded")
				    .detail("CurrentOwner", currentOwner)
				    .detail("DDModeLock", dataDistributionModeLock)
				    .detail("IsDDEnabled", ddEnabledState->isDDEnabled());
				return true;
			}
			TraceEvent(SevDebug, "IsDDEnabledFailed")
			    .detail("CurrentOwner", currentOwner)
			    .detail("DDModeLock", dataDistributionModeLock)
			    .detail("IsDDEnabled", ddEnabledState->isDDEnabled());
			return false;
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}

// Ensures that the serverKeys key space is properly coalesced
// This method is only used for testing and is not implemented in a manner that is safe for large databases
ACTOR Future<Void> debugCheckCoalescing(Database cx) {
	state Transaction tr(cx);
	loop {
		try {
			state RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
			ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);

			state int i;
			for (i = 0; i < serverList.size(); i++) {
				state UID id = decodeServerListValue(serverList[i].value).id();
				RangeResult ranges = wait(krmGetRanges(&tr, serverKeysPrefixFor(id), allKeys));
				ASSERT(ranges.end()[-1].key == allKeys.end);

				for (int j = 0; j < ranges.size() - 2; j++)
					if (ranges[j].value == ranges[j + 1].value)
						TraceEvent(SevError, "UncoalescedValues", id)
						    .detail("Key1", ranges[j].key)
						    .detail("Key2", ranges[j + 1].key)
						    .detail("Value", ranges[j].value);
			}

			TraceEvent("DoneCheckingCoalescing").log();
			return Void();
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}

static std::set<int> const& normalDDQueueErrors() {
	static std::set<int> s;
	if (s.empty()) {
		s.insert(error_code_movekeys_conflict);
		s.insert(error_code_broken_promise);
		s.insert(error_code_data_move_cancelled);
		s.insert(error_code_data_move_dest_team_not_found);
	}
	return s;
}

ACTOR Future<Void> pollMoveKeysLock(Database cx, MoveKeysLock lock, const DDEnabledState* ddEnabledState) {
	loop {
		wait(delay(SERVER_KNOBS->MOVEKEYS_LOCK_POLLING_DELAY));
		state Transaction tr(cx);
		loop {
			try {
				wait(checkMoveKeysLockReadOnly(&tr, lock, ddEnabledState));
				break;
			} catch (Error& e) {
				wait(tr.onError(e));
			}
		}
	}
}
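// DataDistributor holds the state shared by the data distribution subsystems (tracker, queue, team collections): the
// move-keys lock, database configuration, region info, the initial data distribution snapshot, trace-event holders,
// and the relocation request streams connecting producers to the DDQueue consumer.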
struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
public:
	Reference<AsyncVar<ServerDBInfo> const> dbInfo;
	UID ddId;
	PromiseStream<Future<Void>> addActor;

	// State initialized when bootstrap
	std::shared_ptr<IDDTxnProcessor> txnProcessor;
	MoveKeysLock lock;
	DatabaseConfiguration configuration;
	std::vector<Optional<Key>> primaryDcId;
	std::vector<Optional<Key>> remoteDcIds;
	Reference<InitialDataDistribution> initData;

	Reference<EventCacheHolder> initialDDEventHolder;
	Reference<EventCacheHolder> movingDataEventHolder;
	Reference<EventCacheHolder> totalDataInFlightEventHolder;
	Reference<EventCacheHolder> totalDataInFlightRemoteEventHolder;

	// Optional components that can be set after ::init(). They're optional for testing, but required for DD to be
	// fully functional.
	DDTeamCollection* teamCollection;
	Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure;
	// consumer is a yield stream from producer. The RelocateShard is pushed into relocationProducer and popped from
	// relocationConsumer (by DDQueue)
	PromiseStream<RelocateShard> relocationProducer, relocationConsumer;

	DataDistributor(Reference<AsyncVar<ServerDBInfo> const> const& db, UID id)
	  : dbInfo(db), ddId(id), txnProcessor(nullptr),
	    initialDDEventHolder(makeReference<EventCacheHolder>("InitialDD")),
	    movingDataEventHolder(makeReference<EventCacheHolder>("MovingData")),
	    totalDataInFlightEventHolder(makeReference<EventCacheHolder>("TotalDataInFlight")),
	    totalDataInFlightRemoteEventHolder(makeReference<EventCacheHolder>("TotalDataInFlightRemote")),
	    teamCollection(nullptr) {}

	// bootstrap steps
	Future<Void> takeMoveKeysLock() { return store(lock, txnProcessor->takeMoveKeysLock(ddId)); }

	Future<Void> loadDatabaseConfiguration() { return store(configuration, txnProcessor->getDatabaseConfiguration()); }

	Future<Void> updateReplicaKeys() { return txnProcessor->updateReplicaKeys(primaryDcId, remoteDcIds, configuration); }

	Future<Void> loadInitialDataDistribution(const DDEnabledState* ddEnabledState) {
		return store(initData,
		             txnProcessor->getInitialDataDistribution(
		                 ddId,
		                 lock,
		                 configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(),
		                 ddEnabledState));
	}

	void initDcInfo() {
		primaryDcId.clear();
		remoteDcIds.clear();
		const std::vector<RegionInfo>& regions = configuration.regions;
		if (configuration.regions.size() > 0) {
			primaryDcId.push_back(regions[0].dcId);
		}
		if (configuration.regions.size() > 1) {
			remoteDcIds.push_back(regions[1].dcId);
		}
	}

	Future<Void> waitDataDistributorEnabled(const DDEnabledState* ddEnabledState) const {
		return txnProcessor->waitForDataDistributionEnabled(ddEnabledState);
	}

	// Initialize the required internal states of DataDistributor. This is necessary before DataDistributor starts
	// working. Doesn't include initialization of optional components, like TenantCache, DDQueue, Tracker,
	// TeamCollection. Those components should call their own ::init methods.
	ACTOR static Future<Void> init(Reference<DataDistributor> self, const DDEnabledState* ddEnabledState) {
		loop {
			TraceEvent("DDInitTakingMoveKeysLock", self->ddId).log();
			wait(self->takeMoveKeysLock());
			TraceEvent("DDInitTookMoveKeysLock", self->ddId).log();

			wait(self->loadDatabaseConfiguration());
			self->initDcInfo();
			TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", self->configuration.toString());

			wait(self->updateReplicaKeys());
			TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log();

			wait(self->loadInitialDataDistribution(ddEnabledState));

			if (self->initData->shards.size() > 1) {
				TraceEvent("DDInitGotInitialDD", self->ddId)
				    .detail("B", self->initData->shards.end()[-2].key)
				    .detail("E", self->initData->shards.end()[-1].key)
				    .detail("Src", describe(self->initData->shards.end()[-2].primarySrc))
				    .detail("Dest", describe(self->initData->shards.end()[-2].primaryDest))
				    .trackLatest(self->initialDDEventHolder->trackingKey);
			} else {
				TraceEvent("DDInitGotInitialDD", self->ddId)
				    .detail("B", "")
				    .detail("E", "")
				    .detail("Src", "[no items]")
				    .detail("Dest", "[no items]")
				    .trackLatest(self->initialDDEventHolder->trackingKey);
			}

			if (self->initData->mode && ddEnabledState->isDDEnabled()) {
				// mode may be set true by system operator using fdbcli and isDDEnabled() set to true
				break;
			}

			TraceEvent("DataDistributionDisabled", self->ddId).log();

			TraceEvent("MovingData", self->ddId)
			    .detail("InFlight", 0)
			    .detail("InQueue", 0)
			    .detail("AverageShardSize", -1)
			    .detail("UnhealthyRelocations", 0)
			    .detail("HighestPriority", 0)
			    .detail("BytesWritten", 0)
			    .detail("PriorityRecoverMove", 0)
			    .detail("PriorityRebalanceUnderutilizedTeam", 0)
			    .detail("PriorityRebalannceOverutilizedTeam", 0)
			    .detail("PriorityTeamHealthy", 0)
			    .detail("PriorityTeamContainsUndesiredServer", 0)
			    .detail("PriorityTeamRedundant", 0)
			    .detail("PriorityMergeShard", 0)
			    .detail("PriorityTeamUnhealthy", 0)
			    .detail("PriorityTeam2Left", 0)
			    .detail("PriorityTeam1Left", 0)
			    .detail("PriorityTeam0Left", 0)
			    .detail("PrioritySplitShard", 0)
			    .trackLatest(self->movingDataEventHolder->trackingKey);

			TraceEvent("TotalDataInFlight", self->ddId)
			    .detail("Primary", true)
			    .detail("TotalBytes", 0)
			    .detail("UnhealthyServers", 0)
			    .detail("HighestPriority", 0)
			    .trackLatest(self->totalDataInFlightEventHolder->trackingKey);
			TraceEvent("TotalDataInFlight", self->ddId)
			    .detail("Primary", false)
			    .detail("TotalBytes", 0)
			    .detail("UnhealthyServers", 0)
			    .detail("HighestPriority", self->configuration.usableRegions > 1 ? 0 : -1)
			    .trackLatest(self->totalDataInFlightRemoteEventHolder->trackingKey);

			wait(self->waitDataDistributorEnabled(ddEnabledState));
			TraceEvent("DataDistributionEnabled").log();
		}
		return Void();
	}
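	// Rebuilds the shard-to-team mapping in shardsAffectedByTeamFailure from the initial data distribution snapshot,
	// and re-schedules a (low priority) relocation for any shard that was already in flight to an anonymous
	// destination when the previous data distributor stopped.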
	ACTOR static Future<Void> resumeFromShards(Reference<DataDistributor> self, bool traceShard) {
		state int shard = 0;
		for (; shard < self->initData->shards.size() - 1; shard++) {
			const DDShardInfo& iShard = self->initData->shards[shard];
			KeyRangeRef keys = KeyRangeRef(iShard.key, self->initData->shards[shard + 1].key);

			self->shardsAffectedByTeamFailure->defineShard(keys);
			std::vector<ShardsAffectedByTeamFailure::Team> teams;
			teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.primarySrc, true));
			if (self->configuration.usableRegions > 1) {
				teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false));
			}
			if (traceShard) {
				TraceEvent(SevDebug, "DDInitShard")
				    .detail("Keys", keys)
				    .detail("PrimarySrc", describe(iShard.primarySrc))
				    .detail("RemoteSrc", describe(iShard.remoteSrc))
				    .detail("PrimaryDest", describe(iShard.primaryDest))
				    .detail("RemoteDest", describe(iShard.remoteDest))
				    .detail("SrcID", iShard.srcId)
				    .detail("DestID", iShard.destId);
			}

			self->shardsAffectedByTeamFailure->moveShard(keys, teams);
			if (iShard.hasDest && iShard.destId == anonymousShardId) {
				// This shard is already in flight. Ideally we should use dest in ShardsAffectedByTeamFailure and
				// generate a dataDistributionRelocator directly in DataDistributionQueue to track it, but it's
				// easier to just (with low priority) schedule it for movement.
				bool unhealthy = iShard.primarySrc.size() != self->configuration.storageTeamSize;
				if (!unhealthy && self->configuration.usableRegions > 1) {
					unhealthy = iShard.remoteSrc.size() != self->configuration.storageTeamSize;
				}
				self->relocationProducer.send(RelocateShard(
				    keys,
				    unhealthy ? DataMovementReason::TEAM_UNHEALTHY : DataMovementReason::RECOVER_MOVE,
				    RelocateReason::OTHER));
			}

			wait(yield(TaskPriority::DataDistribution));
		}
		return Void();
	}
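	// Replays the data moves persisted by the previous data distributor: cancelled (or no-longer-supported) moves are
	// re-sent as cancellation relocations, while valid moves are re-registered with the shard tracker and
	// shardsAffectedByTeamFailure, so that destination-team failures are still detected, and then restarted.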
	// TODO: unit test needed
	ACTOR static Future<Void> resumeFromDataMoves(Reference<DataDistributor> self, Future<Void> readyToStart) {
		state KeyRangeMap<std::shared_ptr<DataMove>>::iterator it = self->initData->dataMoveMap.ranges().begin();

		wait(readyToStart);

		for (; it != self->initData->dataMoveMap.ranges().end(); ++it) {
			const DataMoveMetaData& meta = it.value()->meta;
			if (it.value()->isCancelled() || (it.value()->valid && !SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA)) {
				RelocateShard rs(meta.range, DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
				rs.dataMoveId = meta.id;
				rs.cancelled = true;
				self->relocationProducer.send(rs);
				TraceEvent("DDInitScheduledCancelDataMove", self->ddId).detail("DataMove", meta.toString());
			} else if (it.value()->valid) {
				TraceEvent(SevDebug, "DDInitFoundDataMove", self->ddId).detail("DataMove", meta.toString());
				ASSERT(meta.range == it.range());
				// TODO: Persist priority in DataMoveMetaData.
				RelocateShard rs(meta.range, DataMovementReason::RECOVER_MOVE, RelocateReason::OTHER);
				rs.dataMoveId = meta.id;
				rs.dataMove = it.value();
				std::vector<ShardsAffectedByTeamFailure::Team> teams;
				teams.push_back(ShardsAffectedByTeamFailure::Team(rs.dataMove->primaryDest, true));
				if (!rs.dataMove->remoteDest.empty()) {
					teams.push_back(ShardsAffectedByTeamFailure::Team(rs.dataMove->remoteDest, false));
				}

				// Since a DataMove could cover more than one keyrange, e.g., during merge, we need to define
				// the target shard and restart the shard tracker.
				self->shardsAffectedByTeamFailure->restartShardTracker.send(rs.keys);
				self->shardsAffectedByTeamFailure->defineShard(rs.keys);

				// When restoring a DataMove, the destination team is determined, and hence we need to register
				// the data move now, so that team failures can be captured.
				self->shardsAffectedByTeamFailure->moveShard(rs.keys, teams);
				self->relocationProducer.send(rs);
				wait(yield(TaskPriority::DataDistribution));
			}
		}
		return Void();
	}

	// Resume inflight relocations from the previous DD
	// TODO: The initialDataDistribution is unused once resumeRelocations and
	// DataDistributionTracker::trackInitialShards are done. In the future, we can release the object to save memory
	// usage if it turns out to be a problem.
	Future<Void> resumeRelocations() {
		ASSERT(shardsAffectedByTeamFailure); // has to be allocated
		Future<Void> shardsReady =
		    resumeFromShards(Reference<DataDistributor>::addRef(this), g_network->isSimulated());
		return resumeFromDataMoves(Reference<DataDistributor>::addRef(this), shardsReady);
	}
};

// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
                                    PromiseStream<GetMetricsListRequest> getShardMetricsList,
                                    const DDEnabledState* ddEnabledState) {
	state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DataDistributionLaunch, LockAware::True);
	cx->locationCacheSize = SERVER_KNOBS->DD_LOCATION_CACHE_SIZE;
	self->txnProcessor = std::shared_ptr<IDDTxnProcessor>(new DDTxnProcessor(cx));

	// cx->setOption( FDBDatabaseOptions::LOCATION_CACHE_SIZE, StringRef((uint8_t*)
	// &SERVER_KNOBS->DD_LOCATION_CACHE_SIZE, 8) ); ASSERT( cx->locationCacheSize ==
	// SERVER_KNOBS->DD_LOCATION_CACHE_SIZE
	// );

	// wait(debugCheckCoalescing(cx));
	// FIXME: wrap the bootstrap process into class DataDistributor
	state Reference<DDTeamCollection> primaryTeamCollection;
	state Reference<DDTeamCollection> remoteTeamCollection;
	state bool trackerCancelled;
	state bool ddIsTenantAware = SERVER_KNOBS->DD_TENANT_AWARENESS_ENABLED;
	loop {
		trackerCancelled = false;
		// Stored outside of data distribution tracker to avoid slow tasks
		// when tracker is cancelled
		state KeyRangeMap<ShardTrackedData> shards;
		state Promise<UID> removeFailedServer;
		try {
			wait(DataDistributor::init(self, ddEnabledState));

			state Reference<TenantCache> ddTenantCache;
			if (ddIsTenantAware) {
				ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
				wait(ddTenantCache->build(cx));
			}

			// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
			ASSERT(self->configuration.storageTeamSize > 0);

			state PromiseStream<Promise<int64_t>> getAverageShardBytes;
			state PromiseStream<Promise<int>> getUnhealthyRelocationCount;
			state PromiseStream<GetMetricsRequest> getShardMetrics;
			state PromiseStream<GetTopKMetricsRequest> getTopKShardMetrics;
			state Reference<AsyncVar<bool>> processingUnhealthy(new AsyncVar<bool>(false));
			state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
			state Promise<Void> readyToStart;

			self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
			wait(self->resumeRelocations());

			std::vector<TeamCollectionInterface> tcis; // primary and remote region interface
			Reference<AsyncVar<bool>> anyZeroHealthyTeams; // true if primary or remote has zero healthy team
			std::vector<Reference<AsyncVar<bool>>> zeroHealthyTeams; // primary and remote

			tcis.push_back(TeamCollectionInterface());
			zeroHealthyTeams.push_back(makeReference<AsyncVar<bool>>(true));
			int storageTeamSize = self->configuration.storageTeamSize;

			std::vector<Future<Void>> actors; // the container of ACTORs
			if (self->configuration.usableRegions > 1) {
				tcis.push_back(TeamCollectionInterface());
				storageTeamSize = 2 * self->configuration.storageTeamSize;

				zeroHealthyTeams.push_back(makeReference<AsyncVar<bool>>(true));
				anyZeroHealthyTeams = makeReference<AsyncVar<bool>>(true);
				actors.push_back(anyTrue(zeroHealthyTeams, anyZeroHealthyTeams));
			} else {
				anyZeroHealthyTeams = zeroHealthyTeams[0];
			}

			if (ddIsTenantAware) {
				actors.push_back(reportErrorsExcept(
				    ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
			}

			actors.push_back(pollMoveKeysLock(cx, self->lock, ddEnabledState));
			actors.push_back(reportErrorsExcept(
			    dataDistributionTracker(self->initData, cx, self->relocationProducer,
			                            self->shardsAffectedByTeamFailure, getShardMetrics,
			                            getTopKShardMetrics.getFuture(), getShardMetricsList,
			                            getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams,
			                            self->ddId, &shards, &trackerCancelled),
			    "DDTracker", self->ddId, &normalDDQueueErrors()));
			actors.push_back(reportErrorsExcept(
			    dataDistributionQueue(cx, self->relocationProducer, self->relocationConsumer.getFuture(),
			                          getShardMetrics, getTopKShardMetrics, processingUnhealthy, processingWiggle,
			                          tcis, self->shardsAffectedByTeamFailure, self->lock, getAverageShardBytes,
			                          getUnhealthyRelocationCount.getFuture(), self->ddId, storageTeamSize,
			                          self->configuration.storageTeamSize, ddEnabledState),
			    "DDQueue", self->ddId, &normalDDQueueErrors()));

			std::vector<DDTeamCollection*> teamCollectionsPtrs;
			primaryTeamCollection = makeReference<DDTeamCollection>(
			    cx, self->ddId, self->lock, self->relocationProducer, self->shardsAffectedByTeamFailure,
			    self->configuration, self->primaryDcId,
			    self->configuration.usableRegions > 1 ? self->remoteDcIds : std::vector<Optional<Key>>(),
			    readyToStart.getFuture(), zeroHealthyTeams[0], IsPrimary::True, processingUnhealthy, processingWiggle,
			    getShardMetrics, removeFailedServer, getUnhealthyRelocationCount);
			teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
			auto recruitStorage = IAsyncListener<RequestStream<RecruitStorageRequest>>::create(
			    self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
			if (self->configuration.usableRegions > 1) {
				remoteTeamCollection = makeReference<DDTeamCollection>(
				    cx, self->ddId, self->lock, self->relocationProducer, self->shardsAffectedByTeamFailure,
				    self->configuration, self->remoteDcIds, Optional<std::vector<Optional<Key>>>(),
				    readyToStart.getFuture() && remoteRecovered(self->dbInfo), zeroHealthyTeams[1], IsPrimary::False,
				    processingUnhealthy, processingWiggle, getShardMetrics, removeFailedServer,
				    getUnhealthyRelocationCount);
				teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
				remoteTeamCollection->teamCollections = teamCollectionsPtrs;
				actors.push_back(reportErrorsExcept(
				    DDTeamCollection::run(
				        remoteTeamCollection, self->initData, tcis[1], recruitStorage, *ddEnabledState),
				    "DDTeamCollectionSecondary", self->ddId, &normalDDQueueErrors()));
				actors.push_back(DDTeamCollection::printSnapshotTeamsInfo(remoteTeamCollection));
			}
			primaryTeamCollection->teamCollections = teamCollectionsPtrs;
			self->teamCollection = primaryTeamCollection.getPtr();
			actors.push_back(reportErrorsExcept(
			    DDTeamCollection::run(primaryTeamCollection, self->initData, tcis[0], recruitStorage, *ddEnabledState),
			    "DDTeamCollectionPrimary", self->ddId, &normalDDQueueErrors()));

			actors.push_back(DDTeamCollection::printSnapshotTeamsInfo(primaryTeamCollection));
			actors.push_back(yieldPromiseStream(self->relocationProducer.getFuture(), self->relocationConsumer));

			wait(waitForAll(actors));
			return Void();
		} catch (Error& e) {
			trackerCancelled = true;
			state Error err = e;
			TraceEvent("DataDistributorDestroyTeamCollections").error(e);
			state std::vector<UID> teamForDroppedRange;
			if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
				// Choose a random healthy team to host the to-be-dropped range.
				const UID serverID = removeFailedServer.getFuture().get();
				std::vector<UID> pTeam = primaryTeamCollection->getRandomHealthyTeam(serverID);
				teamForDroppedRange.insert(teamForDroppedRange.end(), pTeam.begin(), pTeam.end());
				if (self->configuration.usableRegions > 1) {
					std::vector<UID> rTeam = remoteTeamCollection->getRandomHealthyTeam(serverID);
					teamForDroppedRange.insert(teamForDroppedRange.end(), rTeam.begin(), rTeam.end());
				}
			}
			self->teamCollection = nullptr;
			primaryTeamCollection = Reference<DDTeamCollection>();
			remoteTeamCollection = Reference<DDTeamCollection>();
			if (err.code() == error_code_actor_cancelled) {
				// When cancelled, we cannot clear asynchronously because
				// this will result in invalid memory access. This should only
				// be an issue in simulation.
				if (!g_network->isSimulated()) {
					TraceEvent(SevWarnAlways, "DataDistributorCancelled");
				}
				shards.clear();
				throw e;
			} else {
				wait(shards.clearAsync());
			}
			TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err);
			if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
				TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
				wait(removeKeysFromFailedServer(
				    cx, removeFailedServer.getFuture().get(), teamForDroppedRange, self->lock, ddEnabledState));
				Optional<UID> tssPairID;
				wait(removeStorageServer(
				    cx, removeFailedServer.getFuture().get(), tssPairID, self->lock, ddEnabledState));
			} else {
				if (err.code() != error_code_movekeys_conflict) {
					throw err;
				}

				bool ddEnabled = wait(isDataDistributionEnabled(cx, ddEnabledState));
				TraceEvent("DataDistributionMoveKeysConflict").error(err).detail("DataDistributionEnabled", ddEnabled);
				if (ddEnabled) {
					throw err;
				}
			}
		}
	}
}

static std::set<int> const& normalDataDistributorErrors() {
	static std::set<int> s;
	if (s.empty()) {
		s.insert(error_code_worker_removed);
		s.insert(error_code_broken_promise);
		s.insert(error_code_actor_cancelled);
		s.insert(error_code_please_reboot);
		s.insert(error_code_movekeys_conflict);
		s.insert(error_code_data_move_cancelled);
		s.insert(error_code_data_move_dest_team_not_found);
	}
	return s;
}

ACTOR template <class Req>
Future<Void> sendSnapReq(RequestStream<Req> stream, Req req, Error e) {
	ErrorOr<REPLY_TYPE(Req)> reply = wait(stream.tryGetReply(req));
	if (reply.isError()) {
		TraceEvent("SnapDataDistributor_ReqError")
		    .errorUnsuppressed(reply.getError())
		    .detail("ConvertedErrorType", e.what())
		    .detail("Peer", stream.getEndpoint().getPrimaryAddress());
		throw e;
	}
	return Void();
}

ACTOR Future<ErrorOr<Void>> trySendSnapReq(RequestStream<WorkerSnapRequest> stream, WorkerSnapRequest req) {
	state int snapReqRetry = 0;
	state double snapRetryBackoff = FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY;
	loop {
		ErrorOr<Void> reply = wait(stream.tryGetReply(req));
		if (reply.isError()) {
			TraceEvent("SnapDataDistributor_ReqError")
			    .errorUnsuppressed(reply.getError())
			    .detail("Peer", stream.getEndpoint().getPrimaryAddress())
			    .detail("Retry", snapReqRetry);
			if (reply.getError().code() != error_code_request_maybe_delivered ||
			    ++snapReqRetry > SERVER_KNOBS->SNAP_NETWORK_FAILURE_RETRY_LIMIT)
				return ErrorOr<Void>(reply.getError());
			else {
				// retry for network failures with the same snap UID to avoid snapshotting twice
				req = WorkerSnapRequest(req.snapPayload, req.snapUID, req.role);
				wait(delay(snapRetryBackoff));
				snapRetryBackoff = snapRetryBackoff * 2;
			}
		} else
			break;
	}
	return ErrorOr<Void>(Void());
}
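// Builds the set of stateful processes that need to participate in a snapshot: storage servers (restricted to the
// master's datacenter when more than one region is usable), tlogs, coordinators, and, if
// SNAPSHOT_ALL_STATEFUL_PROCESSES is set, any unrecruited storage/transaction-class processes. Also computes how many
// storage snapshot failures can be tolerated.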
ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>> getStatefulWorkers(
    Database cx,
    Reference<AsyncVar<ServerDBInfo> const> dbInfo,
    std::vector<TLogInterface>* tlogs,
    int* storageFaultTolerance) {
	state std::map<NetworkAddress, std::pair<WorkerInterface, std::string>> result;
	state std::map<NetworkAddress, WorkerInterface> workersMap;
	state Transaction tr(cx);
	state DatabaseConfiguration configuration;
	loop {
		try {
			// necessary options
			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
			tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);

			// get database configuration
			DatabaseConfiguration _configuration = wait(getDatabaseConfiguration(&tr));
			configuration = _configuration;

			// get storages
			RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
			ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
			state std::vector<StorageServerInterface> storageServers;
			storageServers.reserve(serverList.size());
			for (int i = 0; i < serverList.size(); i++)
				storageServers.push_back(decodeServerListValue(serverList[i].value));

			// get workers
			state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
			for (const auto& worker : workers) {
				workersMap[worker.interf.address()] = worker.interf;
			}

			Optional<Value> regionsValue =
			    wait(tr.get(LiteralStringRef("usable_regions").withPrefix(configKeysPrefix)));
			int usableRegions = 1;
			if (regionsValue.present()) {
				usableRegions = atoi(regionsValue.get().toString().c_str());
			}
			auto masterDcId = dbInfo->get().master.locality.dcId();
			int storageFailures = 0;
			for (const auto& server : storageServers) {
				TraceEvent(SevDebug, "StorageServerDcIdInfo")
				    .detail("Address", server.address().toString())
				    .detail("ServerLocalityID", server.locality.dcId())
				    .detail("MasterDcID", masterDcId);
				if (usableRegions == 1 || server.locality.dcId() == masterDcId) {
					auto itr = workersMap.find(server.address());
					if (itr == workersMap.end()) {
						TraceEvent(SevWarn, "GetStorageWorkers")
						    .detail("Reason", "Could not find worker for storage server")
						    .detail("SS", server.id());
						++storageFailures;
					} else {
						if (result.count(server.address())) {
							ASSERT(itr->second.id() == result[server.address()].first.id());
							if (result[server.address()].second.find("storage") == std::string::npos)
								result[server.address()].second.append(",storage");
						} else {
							result[server.address()] = std::make_pair(itr->second, "storage");
						}
					}
				}
			}
			// calculate fault tolerance
			*storageFaultTolerance = std::min(static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE),
			                                  configuration.storageTeamSize - 1) -
			                         storageFailures;
			if (*storageFaultTolerance < 0) {
				CODE_PROBE(true, "Too many failed storage servers to complete snapshot");
				throw snap_storage_failed();
			}
			// tlogs
			for (const auto& tlog : *tlogs) {
				TraceEvent(SevDebug, "GetStatefulWorkersTlog").detail("Addr", tlog.address());
				if (workersMap.find(tlog.address()) == workersMap.end()) {
					TraceEvent(SevError, "MissingTlogWorkerInterface").detail("TlogAddress", tlog.address());
					throw snap_tlog_failed();
				}
				if (result.count(tlog.address())) {
					ASSERT(workersMap[tlog.address()].id() == result[tlog.address()].first.id());
					result[tlog.address()].second.append(",tlog");
				} else {
					result[tlog.address()] = std::make_pair(workersMap[tlog.address()], "tlog");
				}
			}

			// get coordinators
			Optional<Value> coordinators = wait(tr.get(coordinatorsKey));
			if (!coordinators.present()) {
				CODE_PROBE(true, "Failed to read the coordinatorsKey");
				throw operation_failed();
			}
			ClusterConnectionString ccs(coordinators.get().toString());
			std::vector<NetworkAddress> coordinatorsAddr = wait(ccs.tryResolveHostnames());
			std::set<NetworkAddress> coordinatorsAddrSet(coordinatorsAddr.begin(), coordinatorsAddr.end());
			for (const auto& worker : workers) {
				// Note : only considers second address for coordinators,
				// as we use primary addresses from storage and tlog interfaces above
				NetworkAddress primary = worker.interf.address();
				Optional<NetworkAddress> secondary = worker.interf.tLog.getEndpoint().addresses.secondaryAddress;
				if (coordinatorsAddrSet.find(primary) != coordinatorsAddrSet.end() ||
				    (secondary.present() && (coordinatorsAddrSet.find(secondary.get()) != coordinatorsAddrSet.end()))) {
					if (result.count(primary)) {
						ASSERT(workersMap[primary].id() == result[primary].first.id());
						result[primary].second.append(",coord");
					} else {
						result[primary] = std::make_pair(workersMap[primary], "coord");
					}
				}
			}
			if (SERVER_KNOBS->SNAPSHOT_ALL_STATEFUL_PROCESSES) {
				for (const auto& worker : workers) {
					const auto& processAddress = worker.interf.address();
					// skip processes that are already included
					if (result.count(processAddress))
						continue;
					const auto& processClassType = worker.processClass.classType();
					// coordinators are always configured to be recruited
					if (processClassType == ProcessClass::StorageClass) {
						result[processAddress] = std::make_pair(worker.interf, "storage");
						TraceEvent(SevInfo, "SnapUnRecruitedStorageProcess").detail("ProcessAddress", processAddress);
					} else if (processClassType == ProcessClass::TransactionClass ||
					           processClassType == ProcessClass::LogClass) {
						result[processAddress] = std::make_pair(worker.interf, "tlog");
						TraceEvent(SevInfo, "SnapUnRecruitedLogProcess").detail("ProcessAddress", processAddress);
					}
				}
			}
			return result;
		} catch (Error& e) {
			wait(tr.onError(e));
			result.clear();
		}
	}
}

ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
	state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);

	state ReadYourWritesTransaction tr(cx);
	loop {
		try {
			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
			TraceEvent("SnapDataDistributor_WriteFlagAttempt")
			    .detail("SnapPayload", snapReq.snapPayload)
			    .detail("SnapUID", snapReq.snapUID);
			tr.set(writeRecoveryKey, writeRecoveryKeyTrue);
			wait(tr.commit());
			break;
		} catch (Error& e) {
			TraceEvent("SnapDataDistributor_WriteFlagError").error(e);
			wait(tr.onError(e));
		}
	}
	TraceEvent("SnapDataDistributor_SnapReqEnter")
	    .detail("SnapPayload", snapReq.snapPayload)
	    .detail("SnapUID", snapReq.snapUID);
	try {
		// disable tlog pop on local tlog nodes
		state std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs(false);
		std::vector<Future<Void>> disablePops;
		disablePops.reserve(tlogs.size());
		for (const auto& tlog : tlogs) {
			disablePops.push_back(sendSnapReq(
			    tlog.disablePopRequest, TLogDisablePopRequest{ snapReq.snapUID }, snap_disable_tlog_pop_failed()));
		}
		wait(waitForAll(disablePops));

		TraceEvent("SnapDataDistributor_AfterDisableTLogPop")
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID);

		state int storageFaultTolerance;
		// snap stateful nodes
		state std::map<NetworkAddress, std::pair<WorkerInterface, std::string>> statefulWorkers =
		    wait(transformErrors(getStatefulWorkers(cx, db, &tlogs, &storageFaultTolerance), snap_storage_failed()));

		TraceEvent("SnapDataDistributor_GotStatefulWorkers")
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID)
		    .detail("StorageFaultTolerance", storageFaultTolerance);

		// we need to snapshot storage nodes before snapshotting any tlogs
		std::vector<Future<ErrorOr<Void>>> storageSnapReqs;
		for (const auto& [addr, entry] : statefulWorkers) {
			auto& [interf, role] = entry;
			if (role.find("storage") != std::string::npos)
				storageSnapReqs.push_back(trySendSnapReq(
				    interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "storage"_sr)));
		}
		wait(waitForMost(storageSnapReqs, storageFaultTolerance, snap_storage_failed()));
		TraceEvent("SnapDataDistributor_AfterSnapStorage")
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID);

		std::vector<Future<ErrorOr<Void>>> tLogSnapReqs;
		tLogSnapReqs.reserve(tlogs.size());
		for (const auto& [addr, entry] : statefulWorkers) {
			auto& [interf, role] = entry;
			if (role.find("tlog") != std::string::npos)
				tLogSnapReqs.push_back(trySendSnapReq(
				    interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "tlog"_sr)));
		}
		wait(waitForMost(tLogSnapReqs, 0, snap_tlog_failed()));

		TraceEvent("SnapDataDistributor_AfterTLogStorage")
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID);

		// enable tlog pop on local tlog nodes
		std::vector<Future<Void>> enablePops;
		enablePops.reserve(tlogs.size());
		for (const auto& tlog : tlogs) {
			enablePops.push_back(sendSnapReq(
			    tlog.enablePopRequest, TLogEnablePopRequest{ snapReq.snapUID }, snap_enable_tlog_pop_failed()));
		}
		wait(waitForAll(enablePops));

		TraceEvent("SnapDataDistributor_AfterEnableTLogPops")
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID);

		std::vector<Future<ErrorOr<Void>>> coordSnapReqs;
		for (const auto& [addr, entry] : statefulWorkers) {
			auto& [interf, role] = entry;
			if (role.find("coord") != std::string::npos)
				coordSnapReqs.push_back(trySendSnapReq(
				    interf.workerSnapReq, WorkerSnapRequest(snapReq.snapPayload, snapReq.snapUID, "coord"_sr)));
		}
		auto const coordFaultTolerance = std::min<int>(std::max<int>(0, coordSnapReqs.size() / 2 - 1),
		                                               SERVER_KNOBS->MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE);
		wait(waitForMost(coordSnapReqs, coordFaultTolerance, snap_coord_failed()));

		TraceEvent("SnapDataDistributor_AfterSnapCoords")
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID);
		tr.reset();
		loop {
			try {
				tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
				tr.setOption(FDBTransactionOptions::LOCK_AWARE);
				TraceEvent("SnapDataDistributor_ClearFlagAttempt")
				    .detail("SnapPayload", snapReq.snapPayload)
				    .detail("SnapUID", snapReq.snapUID);
				tr.clear(writeRecoveryKey);
				wait(tr.commit());
				break;
			} catch (Error& e) {
				TraceEvent("SnapDataDistributor_ClearFlagError").error(e);
				wait(tr.onError(e));
			}
		}
	} catch (Error& err) {
		state Error e = err;
		TraceEvent("SnapDataDistributor_SnapReqExit")
		    .errorUnsuppressed(e)
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID);
		if (e.code() == error_code_snap_storage_failed || e.code() == error_code_snap_tlog_failed ||
		    e.code() == error_code_operation_cancelled || e.code() == error_code_snap_disable_tlog_pop_failed) {
			// enable tlog pop on local tlog nodes
			std::vector<TLogInterface> tlogs = db->get().logSystemConfig.allLocalLogs(false);
			try {
				std::vector<Future<Void>> enablePops;
				enablePops.reserve(tlogs.size());
				for (const auto& tlog : tlogs) {
					enablePops.push_back(transformErrors(
					    throwErrorOr(tlog.enablePopRequest.tryGetReply(TLogEnablePopRequest(snapReq.snapUID))),
					    snap_enable_tlog_pop_failed()));
				}
				wait(waitForAll(enablePops));
			} catch (Error& error) {
				TraceEvent(SevDebug, "IgnoreEnableTLogPopFailure").log();
			}
		}
		throw e;
	}
	return Void();
}
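// Entry point for a single DistributorSnapRequest: disables data distribution for the duration of the snapshot, runs
// ddSnapCreateCore guarded by a dbInfo change (recovery) and a timeout, replies to the request, records the outcome
// in ddSnapMap / ddSnapResultMap so duplicate requests can be answered, and re-enables data distribution.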
ACTOR Future<Void> ddSnapCreate(
    DistributorSnapRequest snapReq,
    Reference<AsyncVar<ServerDBInfo> const> db,
    DDEnabledState* ddEnabledState,
    std::map<UID, DistributorSnapRequest>* ddSnapMap /* ongoing snapshot requests */,
    std::map<UID, ErrorOr<Void>>*
        ddSnapResultMap /* finished snapshot requests, expired in SNAP_MINIMUM_TIME_GAP seconds */) {
	state Future<Void> dbInfoChange = db->onChange();
	if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
		// disable DD before doing snapCreate; if a previous snap req has already disabled DD then this operation
		// fails here
		TraceEvent("SnapDDSetDDEnabledFailedInMemoryCheck").detail("SnapUID", snapReq.snapUID);
		ddSnapMap->at(snapReq.snapUID).reply.sendError(operation_failed());
		ddSnapMap->erase(snapReq.snapUID);
		(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(operation_failed());
		return Void();
	}
	try {
		choose {
			when(wait(dbInfoChange)) {
				TraceEvent("SnapDDCreateDBInfoChanged")
				    .detail("SnapPayload", snapReq.snapPayload)
				    .detail("SnapUID", snapReq.snapUID);
				ddSnapMap->at(snapReq.snapUID).reply.sendError(snap_with_recovery_unsupported());
				ddSnapMap->erase(snapReq.snapUID);
				(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(snap_with_recovery_unsupported());
			}
			when(wait(ddSnapCreateCore(snapReq, db))) {
				TraceEvent("SnapDDCreateSuccess")
				    .detail("SnapPayload", snapReq.snapPayload)
				    .detail("SnapUID", snapReq.snapUID);
				ddSnapMap->at(snapReq.snapUID).reply.send(Void());
				ddSnapMap->erase(snapReq.snapUID);
				(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(Void());
			}
			when(wait(delay(SERVER_KNOBS->SNAP_CREATE_MAX_TIMEOUT))) {
				TraceEvent("SnapDDCreateTimedOut")
				    .detail("SnapPayload", snapReq.snapPayload)
				    .detail("SnapUID", snapReq.snapUID);
				ddSnapMap->at(snapReq.snapUID).reply.sendError(timed_out());
				ddSnapMap->erase(snapReq.snapUID);
				(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(timed_out());
			}
		}
	} catch (Error& e) {
		TraceEvent("SnapDDCreateError")
		    .errorUnsuppressed(e)
		    .detail("SnapPayload", snapReq.snapPayload)
		    .detail("SnapUID", snapReq.snapUID);
		if (e.code() != error_code_operation_cancelled) {
			ddSnapMap->at(snapReq.snapUID).reply.sendError(e);
			ddSnapMap->erase(snapReq.snapUID);
			(*ddSnapResultMap)[snapReq.snapUID] = ErrorOr<Void>(e);
		} else {
			// re-enabling DD should always succeed
			bool success = ddEnabledState->setDDEnabled(true, snapReq.snapUID);
			ASSERT(success);
			throw e;
		}
	}
	// re-enabling DD should always succeed
	bool success = ddEnabledState->setDDEnabled(true, snapReq.snapUID);
	ASSERT(success);
	return Void();
}

ACTOR Future<Void> ddExclusionSafetyCheck(DistributorExclusionSafetyCheckRequest req,
                                          Reference<DataDistributor> self,
                                          Database cx) {
	TraceEvent("DDExclusionSafetyCheckBegin", self->ddId).log();
	std::vector<StorageServerInterface> ssis = wait(getStorageServers(cx));
	DistributorExclusionSafetyCheckReply reply(true);
	if (!self->teamCollection) {
		TraceEvent("DDExclusionSafetyCheckTeamCollectionInvalid", self->ddId).log();
		reply.safe = false;
		req.reply.send(reply);
		return Void();
	}
	// If there is only 1 team, unsafe to mark failed: team building can get stuck due to lack of servers left
	if (self->teamCollection->teams.size() <= 1) {
		TraceEvent("DDExclusionSafetyCheckNotEnoughTeams", self->ddId).log();
		reply.safe = false;
		req.reply.send(reply);
		return Void();
	}
	std::vector<UID> excludeServerIDs;
	// Go through storage server interfaces and translate Address -> server ID (UID)
	for (const AddressExclusion& excl : req.exclusions) {
		for (const auto& ssi : ssis) {
			if (excl.excludes(ssi.address()) ||
			    (ssi.secondaryAddress().present() && excl.excludes(ssi.secondaryAddress().get()))) {
				excludeServerIDs.push_back(ssi.id());
			}
		}
	}
	reply.safe = self->teamCollection->exclusionSafetyCheck(excludeServerIDs);
	TraceEvent("DDExclusionSafetyCheckFinish", self->ddId).log();
	req.reply.send(reply);
	return Void();
}

ACTOR Future<Void> waitFailCacheServer(Database* db, StorageServerInterface ssi) {
	state Transaction tr(*db);
	state Key key = storageCacheServerKey(ssi.id());
	wait(waitFailureClient(ssi.waitFailure));
	loop {
		tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		try {
			tr.addReadConflictRange(storageCacheServerKeys);
			tr.clear(key);
			wait(tr.commit());
			break;
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
	return Void();
}

ACTOR Future<Void> cacheServerWatcher(Database* db) {
	state Transaction tr(*db);
	state ActorCollection actors(false);
	state std::set<UID> knownCaches;
	loop {
		tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		try {
			RangeResult range = wait(tr.getRange(storageCacheServerKeys, CLIENT_KNOBS->TOO_MANY));
			ASSERT(!range.more);
			std::set<UID> caches;
			for (auto& kv : range) {
				UID id;
				BinaryReader reader{ kv.key.removePrefix(storageCacheServersPrefix), Unversioned() };
				reader >> id;
				caches.insert(id);
				if (knownCaches.find(id) == knownCaches.end()) {
					StorageServerInterface ssi;
					BinaryReader reader{ kv.value, IncludeVersion() };
					reader >> ssi;
					actors.add(waitFailCacheServer(db, ssi));
				}
			}
			knownCaches = std::move(caches);
			tr.reset();
			wait(delay(5.0) || actors.getResult());
			ASSERT(!actors.getResult().isReady());
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}

static int64_t getMedianShardSize(VectorRef<DDMetricsRef> metricVec) {
	std::nth_element(metricVec.begin(),
	                 metricVec.begin() + metricVec.size() / 2,
	                 metricVec.end(),
	                 [](const DDMetricsRef& d1, const DDMetricsRef& d2) { return d1.shardBytes < d2.shardBytes; });
	return metricVec[metricVec.size() / 2].shardBytes;
}

GetStorageWigglerStateReply getStorageWigglerStates(Reference<DataDistributor> self) {
	GetStorageWigglerStateReply reply;
	if (self->teamCollection) {
		std::tie(reply.primary, reply.lastStateChangePrimary) = self->teamCollection->getStorageWigglerState();
		if (self->teamCollection->teamCollections.size() > 1) {
			std::tie(reply.remote, reply.lastStateChangeRemote) =
			    self->teamCollection->teamCollections[1]->getStorageWigglerState();
		}
	}
	return reply;
}

ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
                                PromiseStream<GetMetricsListRequest> getShardMetricsList) {
	ErrorOr<Standalone<VectorRef<DDMetricsRef>>> result = wait(
	    errorOr(brokenPromiseToNever(getShardMetricsList.getReply(GetMetricsListRequest(req.keys, req.shardLimit)))));

	if (result.isError()) {
		req.reply.sendError(result.getError());
	} else {
		GetDataDistributorMetricsReply rep;
		if (!req.midOnly) {
			rep.storageMetricsList = result.get();
		} else {
			auto& metricVec = result.get();
			if (metricVec.empty())
				rep.midShardSize = 0;
			else {
				rep.midShardSize = getMedianShardSize(metricVec.contents());
			}
		}
		req.reply.send(rep);
	}

	return Void();
}

ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncVar<ServerDBInfo> const> db) {
	state Reference<DataDistributor> self(new DataDistributor(db, di.id()));
	state Future<Void> collection = actorCollection(self->addActor.getFuture());
	state PromiseStream<GetMetricsListRequest> getShardMetricsList;
	state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
	state ActorCollection actors(false);
	state DDEnabledState ddEnabledState;
	state std::map<UID, DistributorSnapRequest> ddSnapReqMap;
	state std::map<UID, ErrorOr<Void>> ddSnapReqResultMap;
	self->addActor.send(actors.getResult());
	self->addActor.send(traceRole(Role::DATA_DISTRIBUTOR, di.id()));

	try {
		TraceEvent("DataDistributorRunning", di.id());
		self->addActor.send(waitFailureServer(di.waitFailure.getFuture()));
		self->addActor.send(cacheServerWatcher(&cx));
		state Future<Void> distributor =
		    reportErrorsExcept(dataDistribution(self, getShardMetricsList, &ddEnabledState),
		                       "DataDistribution",
		                       di.id(),
		                       &normalDataDistributorErrors());

		loop choose {
			when(wait(distributor || collection)) {
				ASSERT(false);
				throw internal_error();
			}
			when(HaltDataDistributorRequest req = waitNext(di.haltDataDistributor.getFuture())) {
				req.reply.send(Void());
				TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
				break;
			}
			when(GetDataDistributorMetricsRequest req = waitNext(di.dataDistributorMetrics.getFuture())) {
				actors.add(ddGetMetrics(req, getShardMetricsList));
			}
			when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
				auto& snapUID = snapReq.snapUID;
				if (ddSnapReqResultMap.count(snapUID)) {
finished snapshot request"); auto result = ddSnapReqResultMap[snapUID]; result.isError() ? snapReq.reply.sendError(result.getError()) : snapReq.reply.send(result.get()); TraceEvent("RetryFinishedDistributorSnapRequest") .detail("SnapUID", snapUID) .detail("Result", result.isError() ? result.getError().code() : 0); } else if (ddSnapReqMap.count(snapReq.snapUID)) { CODE_PROBE(true, "Data distributor received a duplicate ongoing snapshot request"); TraceEvent("RetryOngoingDistributorSnapRequest").detail("SnapUID", snapUID); ASSERT(snapReq.snapPayload == ddSnapReqMap[snapUID].snapPayload); ddSnapReqMap[snapUID] = snapReq; } else { ddSnapReqMap[snapUID] = snapReq; actors.add(ddSnapCreate(snapReq, db, &ddEnabledState, &ddSnapReqMap, &ddSnapReqResultMap)); auto* ddSnapReqResultMapPtr = &ddSnapReqResultMap; actors.add(fmap( [ddSnapReqResultMapPtr, snapUID](Void _) { ddSnapReqResultMapPtr->erase(snapUID); return Void(); }, delay(SERVER_KNOBS->SNAP_MINIMUM_TIME_GAP))); } } when(DistributorExclusionSafetyCheckRequest exclCheckReq = waitNext(di.distributorExclCheckReq.getFuture())) { actors.add(ddExclusionSafetyCheck(exclCheckReq, self, cx)); } when(GetStorageWigglerStateRequest req = waitNext(di.storageWigglerState.getFuture())) { req.reply.send(getStorageWigglerStates(self)); } } } catch (Error& err) { if (normalDataDistributorErrors().count(err.code()) == 0) { TraceEvent("DataDistributorError", di.id()).errorUnsuppressed(err); throw err; } TraceEvent("DataDistributorDied", di.id()).errorUnsuppressed(err); } return Void(); } namespace data_distribution_test { inline DDShardInfo doubleToNoLocationShardInfo(double d, bool hasDest) { DDShardInfo res(doubleToTestKey(d), anonymousShardId, anonymousShardId); res.primarySrc.emplace_back((uint64_t)d, 0); if (hasDest) { res.primaryDest.emplace_back((uint64_t)d + 1, 0); res.hasDest = true; } return res; } inline int getRandomShardCount() { #if defined(USE_SANITIZER) return deterministicRandom()->randomInt(1000, 24000); // 24000 * MAX_SHARD_SIZE = 12TB #else return deterministicRandom()->randomInt(1000, CLIENT_KNOBS->TOO_MANY); // 2000000000; OOM #endif } } // namespace data_distribution_test TEST_CASE("/DataDistribution/StorageWiggler/Order") { StorageWiggler wiggler(nullptr); double startTime = now() - SERVER_KNOBS->DD_STORAGE_WIGGLE_MIN_SS_AGE_SEC - 0.4; wiggler.addServer(UID(1, 0), StorageMetadataType(startTime, KeyValueStoreType::SSD_BTREE_V2)); wiggler.addServer(UID(2, 0), StorageMetadataType(startTime + 0.1, KeyValueStoreType::MEMORY, true)); wiggler.addServer(UID(3, 0), StorageMetadataType(startTime + 0.2, KeyValueStoreType::SSD_ROCKSDB_V1, true)); wiggler.addServer(UID(4, 0), StorageMetadataType(startTime + 0.3, KeyValueStoreType::SSD_BTREE_V2)); std::vector correctOrder{ UID(2, 0), UID(3, 0), UID(1, 0), UID(4, 0) }; for (int i = 0; i < correctOrder.size(); ++i) { auto id = wiggler.getNextServerId(); std::cout << "Get " << id.get().shortString() << "\n"; ASSERT(id == correctOrder[i]); } ASSERT(!wiggler.getNextServerId().present()); return Void(); } TEST_CASE("/DataDistribution/Initialization/ResumeFromShard") { state Reference const> dbInfo; state Reference self(new DataDistributor(dbInfo, UID())); self->shardsAffectedByTeamFailure = makeReference(); self->initData = makeReference(); self->configuration.usableRegions = 1; self->configuration.storageTeamSize = 1; // add DDShardInfo self->shardsAffectedByTeamFailure->setCheckMode( ShardsAffectedByTeamFailure::CheckMode::ForceNoCheck); // skip check when build int shardNum = 
TEST_CASE("/DataDistribution/Initialization/ResumeFromShard") {
	state Reference<AsyncVar<ServerDBInfo> const> dbInfo;
	state Reference<DataDistributor> self(new DataDistributor(dbInfo, UID()));

	self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
	self->initData = makeReference<InitialDataDistribution>();
	self->configuration.usableRegions = 1;
	self->configuration.storageTeamSize = 1;

	// add DDShardInfo
	self->shardsAffectedByTeamFailure->setCheckMode(
	    ShardsAffectedByTeamFailure::CheckMode::ForceNoCheck); // skip check when build
	int shardNum = data_distribution_test::getRandomShardCount();
	std::cout << "generating " << shardNum << " shards...\n";
	for (int i = 1; i <= SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) {
		self->initData->shards.emplace_back(data_distribution_test::doubleToNoLocationShardInfo(i, true));
	}
	for (int i = SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM + 1; i <= shardNum; ++i) {
		self->initData->shards.emplace_back(data_distribution_test::doubleToNoLocationShardInfo(i, false));
	}
	self->initData->shards.emplace_back(DDShardInfo(allKeys.end));

	std::cout << "Start resuming...\n";
	wait(DataDistributor::resumeFromShards(self, false));
	std::cout << "Start validation...\n";
	auto relocateFuture = self->relocationProducer.getFuture();
	for (int i = 0; i < SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM; ++i) {
		ASSERT(relocateFuture.isReady());
		auto rs = relocateFuture.pop();
		ASSERT(rs.isRestore() == false);
		ASSERT(rs.cancelled == false);
		ASSERT(rs.dataMoveId == anonymousShardId);
		ASSERT(rs.priority == SERVER_KNOBS->PRIORITY_RECOVER_MOVE);
		// std::cout << rs.keys.begin.toString() << " " << self->initData->shards[i].key.toString() << " \n";
		ASSERT(rs.keys.begin.compare(self->initData->shards[i].key) == 0);
		ASSERT(rs.keys.end == self->initData->shards[i + 1].key);
	}
	self->shardsAffectedByTeamFailure->setCheckMode(ShardsAffectedByTeamFailure::CheckMode::ForceCheck);
	self->shardsAffectedByTeamFailure->check();
	return Void();
}