diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index b32b65f64d..109ed3318c 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -516,7 +516,6 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo> cons
 // Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
 ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
                                     PromiseStream<GetMetricsListRequest> getShardMetricsList,
-                                    FutureStream<Promise<Optional<int>>> getMinReplicasRemaining,
                                     const DDEnabledState* ddEnabledState) {
 	state double lastLimited = 0;
 	self->addActor.send(monitorBatchLimitedTime(self->dbInfo, &lastLimited));
@@ -755,7 +754,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
 			                                           lock,
 			                                           getAverageShardBytes,
 			                                           getUnhealthyRelocationCount.getFuture(),
-			                                           getMinReplicasRemaining,
 			                                           self->ddId,
 			                                           storageTeamSize,
 			                                           configuration.storageTeamSize,
@@ -915,24 +913,6 @@ Future<ErrorOr<Void>> trySendSnapReq(RequestStream<Req> stream, Req req) {
 	return ErrorOr<Void>(Void());
 }
 
-// Returns the number of storage snapshot failures we can ignore while still successfully snapshotting storage
-// servers
-ACTOR static Future<int> getTolerableFailedStorageSnapshots(
-    Database cx,
-    PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining) {
-	Promise<Optional<int>> minReplicasRemainingPromise;
-	getMinReplicasRemaining.send(minReplicasRemainingPromise);
-	state Optional<int> minReplicasRemaining = wait(minReplicasRemainingPromise.getFuture());
-	DatabaseConfiguration configuration = wait(getDatabaseConfiguration(cx));
-	auto faultTolerance =
-	    std::min<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE, configuration.storageTeamSize);
-	if (minReplicasRemaining.present()) {
-		TEST(minReplicasRemaining.get() == 0); // Some data has 0 replicas across all storage servers
-		return std::min(faultTolerance, std::max(minReplicasRemaining.get() - 1, 0));
-	}
-	return faultTolerance;
-}
-
 ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures,
                                       int faultTolerance,
                                       Error e,
@@ -951,9 +931,7 @@ ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures
 	return Void();
 }
 
-ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq,
-                                    Reference<AsyncVar<ServerDBInfo> const> db,
-                                    PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining) {
+ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
 	state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
 
 	state ReadYourWritesTransaction tr(cx);
@@ -990,9 +968,16 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq,
 		    .detail("SnapPayload", snapReq.snapPayload)
 		    .detail("SnapUID", snapReq.snapUID);
 		// snap local storage nodes
-		state int storageFaultTolerance = wait(getTolerableFailedStorageSnapshots(cx, getMinReplicasRemaining));
-		std::vector<WorkerInterface> storageWorkers =
+		state DatabaseConfiguration configuration = wait(getDatabaseConfiguration(cx));
+		std::pair<std::vector<WorkerInterface>, int> storageWorkersAndFailures =
 		    wait(transformErrors(getStorageWorkers(cx, db, true /* localOnly */), snap_storage_failed()));
+		const auto& [storageWorkers, storageFailures] = storageWorkersAndFailures;
+		auto const storageFaultTolerance =
+		    static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE) - storageFailures;
+		if (storageFaultTolerance < 0) {
+			TEST(true); // Too many failed storage servers to complete snapshot
+			throw snap_storage_failed();
+		}
 		TraceEvent("SnapDataDistributor_GotStorageWorkers")
 		    .detail("SnapPayload", snapReq.snapPayload)
 		    .detail("SnapUID", snapReq.snapUID);
@@ -1095,7 +1080,6 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq,
 
 ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
                                 Reference<AsyncVar<ServerDBInfo> const> db,
-                                PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining,
                                 DDEnabledState* ddEnabledState) {
 	state Future<Void> dbInfoChange = db->onChange();
 	if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
@@ -1114,7 +1098,7 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
 			    .detail("SnapUID", snapReq.snapUID);
 			snapReq.reply.sendError(snap_with_recovery_unsupported());
 		}
-		when(wait(ddSnapCreateCore(snapReq, db, getMinReplicasRemaining))) {
+		when(wait(ddSnapCreateCore(snapReq, db))) {
 			TraceEvent("SnapDDCreateSuccess")
 			    .detail("SnapPayload", snapReq.snapPayload)
 			    .detail("SnapUID", snapReq.snapUID);
@@ -1269,7 +1253,6 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
 	state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
 	state Future<Void> collection = actorCollection(self->addActor.getFuture());
 	state PromiseStream<GetMetricsListRequest> getShardMetricsList;
-	state PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining;
 	state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
 	state ActorCollection actors(false);
 	state DDEnabledState ddEnabledState;
@@ -1280,11 +1263,11 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
 	self->addActor.send(waitFailureServer(di.waitFailure.getFuture()));
 	self->addActor.send(cacheServerWatcher(&cx));
 
-	state Future<Void> distributor = reportErrorsExcept(
-	    dataDistribution(self, getShardMetricsList, getMinReplicasRemaining.getFuture(), &ddEnabledState),
-	    "DataDistribution",
-	    di.id(),
-	    &normalDataDistributorErrors());
+	state Future<Void> distributor =
+	    reportErrorsExcept(dataDistribution(self, getShardMetricsList, &ddEnabledState),
+	                       "DataDistribution",
+	                       di.id(),
+	                       &normalDataDistributorErrors());
 
 	loop choose {
 		when(wait(distributor || collection)) {
@@ -1300,7 +1283,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
 			TraceEvent("DataDistributorHalted", di.id()).detail("ReqID", req.requesterID);
 			break;
 		}
 		when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
-			actors.add(ddSnapCreate(snapReq, db, getMinReplicasRemaining, &ddEnabledState));
+			actors.add(ddSnapCreate(snapReq, db, &ddEnabledState));
 		}
 		when(DistributorExclusionSafetyCheckRequest exclCheckReq =
 		         waitNext(di.distributorExclCheckReq.getFuture())) {
diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h
--- a/fdbserver/DataDistribution.actor.h
+++ b/fdbserver/DataDistribution.actor.h
@@ -?,7 +?,6 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
                                          MoveKeysLock lock,
                                          PromiseStream<Promise<int64_t>> getAverageShardBytes,
                                          FutureStream<Promise<int>> getUnhealthyRelocationCount,
-                                         FutureStream<Promise<Optional<int>>> getMinRemainingReplicas,
                                          UID distributorId,
                                          int teamSize,
                                          int singleRegionTeamSize,
diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp
index 755e096a52..49eab48d50 100644
--- a/fdbserver/DataDistributionQueue.actor.cpp
+++ b/fdbserver/DataDistributionQueue.actor.cpp
@@ -1037,21 +1037,6 @@ struct DDQueueData {
 		}
 		return highestPriority;
 	}
-
-	// Returns the minimum number of replicas remaining on any team.
-	// If no replicas are missing, return Optional<int>{}, indicating that
-	// we can fall back to DatabaseConfiguration::storageTeamSize
-	Optional<int> getMinReplicasRemaining() const {
-		auto const highestPriority = getHighestPriorityRelocation();
-		if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) {
-			return 0;
-		} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_1_LEFT) {
-			return 1;
-		} else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) {
-			return 2;
-		}
-		return {};
-	}
 };
 
 static std::string destServersString(std::vector<std::pair<Reference<IDataDistributionTeam>, bool>> const& bestTeams) {
@@ -1723,7 +1708,6 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
                                          MoveKeysLock lock,
                                          PromiseStream<Promise<int64_t>> getAverageShardBytes,
                                          FutureStream<Promise<int>> getUnhealthyRelocationCount,
-                                         FutureStream<Promise<Optional<int>>> getMinReplicasRemaining,
                                          UID distributorId,
                                          int teamSize,
                                          int singleRegionTeamSize,
@@ -1851,12 +1835,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
 					// DataDistributorData::movingDataEventHolder. The track latest key
 					// we use here must match the key used in the holder.
 				}
-				when(Promise<Optional<int>> r = waitNext(getMinReplicasRemaining)) {
-					r.send(self.getMinReplicasRemaining());
-				}
-				when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
 				when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
 				when(wait(waitForAll(balancingFutures))) {}
+				when(Promise<int> r = waitNext(getUnhealthyRelocationCount)) { r.send(self.unhealthyRelocations); }
 			}
 		}
 	} catch (Error& e) {
diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp
index 68cccaa05c..b87ade6fa1 100644
--- a/fdbserver/QuietDatabase.actor.cpp
+++ b/fdbserver/QuietDatabase.actor.cpp
@@ -277,9 +277,8 @@ ACTOR Future<std::vector<StorageServerInterface>> getStorageServers(Database cx,
 	}
 }
 
-ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
-                                                             Reference<AsyncVar<ServerDBInfo> const> dbInfo,
-                                                             bool localOnly) {
+ACTOR Future<std::pair<std::vector<WorkerInterface>, int>>
+getStorageWorkers(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo, bool localOnly) {
 	state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
 	state std::map<NetworkAddress, WorkerInterface> workersMap;
 	std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
@@ -299,7 +298,9 @@ ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
 	}
 
 	auto masterDcId = dbInfo->get().master.locality.dcId();
-	std::vector<WorkerInterface> result;
+	std::pair<std::vector<WorkerInterface>, int> result;
+	auto& [workerInterfaces, failures] = result;
+	failures = 0;
 	for (const auto& server : servers) {
 		TraceEvent(SevDebug, "DcIdInfo")
 		    .detail("ServerLocalityID", server.locality.dcId())
@@ -310,9 +311,9 @@ ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
 				TraceEvent(SevWarn, "GetStorageWorkers")
 				    .detail("Reason", "Could not find worker for storage server")
 				    .detail("SS", server.id());
-				throw operation_failed();
+				++failures;
 			}
-			workerInterfaces.push_back(itr->second);
+			workerInterfaces.push_back(itr->second);
 		}
 	}
 	return result;
diff --git a/fdbserver/QuietDatabase.h b/fdbserver/QuietDatabase.h
index 3cd406fe49..5265608b4c 100644
--- a/fdbserver/QuietDatabase.h
+++ b/fdbserver/QuietDatabase.h
@@ -46,9 +46,11 @@ Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<S
 Future<Void> repairDeadDatacenter(Database const& cx,
                                   Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
                                   std::string const& context);
-Future<std::vector<WorkerInterface>> getStorageWorkers(Database const& cx,
-                                                       Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
-                                                       bool const& localOnly);
+
+// Returns list of worker interfaces for available storage servers and the number of unavailable
+// storage servers
+Future<std::pair<std::vector<WorkerInterface>, int>>
+getStorageWorkers(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo, bool const& localOnly);
 
 Future<std::vector<WorkerInterface>> getCoordWorkers(Database const& cx,
                                                      Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
diff --git a/fdbserver/workloads/DiskFailureInjection.actor.cpp b/fdbserver/workloads/DiskFailureInjection.actor.cpp
index 8f3338eb28..098d3f141a 100644
--- a/fdbserver/workloads/DiskFailureInjection.actor.cpp
+++ b/fdbserver/workloads/DiskFailureInjection.actor.cpp
@@ -154,9 +154,13 @@ struct DiskFailureInjectionWorkload : TestWorkload {
 		loop {
 			wait(poisson(&lastTime, 1));
 			try {
-				wait(store(machines, getStorageWorkers(cx, self->dbInfo, false)));
+				std::pair<std::vector<WorkerInterface>, int> m = wait(getStorageWorkers(cx, self->dbInfo, false));
+				if (m.second > 0) {
+					throw operation_failed();
+				}
+				machines = std::move(m.first);
 			} catch (Error& e) {
-				// If we failed to get a list of storage servers, we can't inject failure events
+				// If we failed to get a complete list of storage servers, we can't inject failure events
 				// But don't throw the error in that case
 				continue;
 			}
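
Note for reviewers: the net effect of the fault-tolerance change is that the tolerable number of failed storage snapshots no longer depends on the DD queue's min-replicas-remaining estimate. The budget is now simply the MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE knob minus the number of storage servers whose worker interface could not be resolved, and a negative budget aborts the snapshot in ddSnapCreateCore() before any snap requests are sent; whatever remains is handed to waitForMost(). A minimal standalone sketch of that arithmetic (the helper name and the main() harness are illustrative, not part of the patch):

    #include <cassert>

    // Unresolved storage workers (storageFailures) consume the knob's budget up
    // front; whatever remains is the failure budget given to the snapshot
    // requests themselves.
    int remainingSnapshotFaultBudget(int maxStorageSnapshotFaultTolerance, int storageFailures) {
        return maxStorageSnapshotFaultTolerance - storageFailures; // negative => snap_storage_failed
    }

    int main() {
        // With a knob value of 1, one unresolved storage worker exhausts the
        // budget, and a second aborts the snapshot before any requests go out.
        assert(remainingSnapshotFaultBudget(1, 0) == 1);
        assert(remainingSnapshotFaultBudget(1, 1) == 0);
        assert(remainingSnapshotFaultBudget(1, 2) < 0);
        return 0;
    }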
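
Because getStorageWorkers() now reports missing workers in the second member of the returned pair instead of throwing operation_failed(), each caller has to decide what a partial result means; the DiskFailureInjection hunk above restores the old all-or-nothing behavior by hand. A hedged sketch of that calling pattern in plain C++ (FakeWorker and getWorkersWithFailures() are stand-ins for the FDB types and API, introduced only so the sketch compiles on its own):

    #include <cassert>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for WorkerInterface.
    struct FakeWorker {
        std::string id;
    };

    // Analog of the new contract: the workers that were found, plus a count of
    // storage servers whose worker interface could not be resolved.
    std::pair<std::vector<FakeWorker>, int> getWorkersWithFailures() {
        return { { { "worker-a" }, { "worker-b" } }, 1 };
    }

    int main() {
        auto [workers, failures] = getWorkersWithFailures();
        // A caller that wants the old all-or-nothing semantics treats any
        // failure as fatal, exactly as the workload above does.
        bool usable = (failures == 0);
        assert(!usable && workers.size() == 2);
        return 0;
    }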