From 91930b8040922cab24d25b0a0f91c1f8c5bed892 Mon Sep 17 00:00:00 2001
From: sfc-gh-tclinkenbeard
Date: Thu, 7 Apr 2022 23:23:23 -0700
Subject: [PATCH] Remove getMinReplicasRemaining PromiseStream.

In order to enforce the maximum fault tolerance for snapshots, update
getStorageWorkers to return the number of unavailable storage servers
instead of throwing an error when unavailable storage servers exist.
---
 fdbserver/DataDistribution.actor.cpp          | 51 +++++++-------
 fdbserver/DataDistribution.actor.h            |  1 -
 fdbserver/DataDistributionQueue.actor.cpp     | 21 +-------
 fdbserver/QuietDatabase.actor.cpp             | 14 ++---
 fdbserver/QuietDatabase.h                     |  8 +--
 .../workloads/DiskFailureInjection.actor.cpp  |  8 ++-
 6 files changed, 38 insertions(+), 66 deletions(-)

diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp
index b32b65f64d..109ed3318c 100644
--- a/fdbserver/DataDistribution.actor.cpp
+++ b/fdbserver/DataDistribution.actor.cpp
@@ -516,7 +516,6 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo> cons
 // Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
 ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
                                     PromiseStream<GetMetricsListRequest> getShardMetricsList,
-                                    FutureStream<Promise<Optional<int>>> getMinReplicasRemaining,
                                     const DDEnabledState* ddEnabledState) {
     state double lastLimited = 0;
     self->addActor.send(monitorBatchLimitedTime(self->dbInfo, &lastLimited));
@@ -755,7 +754,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
                                    lock,
                                    getAverageShardBytes,
                                    getUnhealthyRelocationCount.getFuture(),
-                                   getMinReplicasRemaining,
                                    self->ddId,
                                    storageTeamSize,
                                    configuration.storageTeamSize,
@@ -915,24 +913,6 @@ Future<ErrorOr<Void>> trySendSnapReq(RequestStream<Req> stream, Req req) {
     return ErrorOr<Void>(Void());
 }
 
-// Returns the number of storage snapshot failures we can ignore while still successfully snapshotting storage
-// servers
-ACTOR static Future<int> getTolerableFailedStorageSnapshots(
-    Database cx,
-    PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining) {
-    Promise<Optional<int>> minReplicasRemainingPromise;
-    getMinReplicasRemaining.send(minReplicasRemainingPromise);
-    state Optional<int> minReplicasRemaining = wait(minReplicasRemainingPromise.getFuture());
-    DatabaseConfiguration configuration = wait(getDatabaseConfiguration(cx));
-    auto faultTolerance =
-        std::min<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE, configuration.storageTeamSize);
-    if (minReplicasRemaining.present()) {
-        TEST(minReplicasRemaining.get() == 0); // Some data has 0 replicas across all storage servers
-        return std::min(faultTolerance, std::max(minReplicasRemaining.get() - 1, 0));
-    }
-    return faultTolerance;
-}
-
 ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures,
                                       int faultTolerance,
                                       Error e,
@@ -951,9 +931,7 @@ ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures
     return Void();
 }
 
-ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq,
-                                    Reference<AsyncVar<ServerDBInfo> const> db,
-                                    PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining) {
+ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<ServerDBInfo> const> db) {
     state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
 
     state ReadYourWritesTransaction tr(cx);
@@ -990,9 +968,17 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq,
         .detail("SnapPayload", snapReq.snapPayload)
         .detail("SnapUID", snapReq.snapUID);
     // snap local storage nodes
-    state int storageFaultTolerance = wait(getTolerableFailedStorageSnapshots(cx, getMinReplicasRemaining));
-    std::vector<WorkerInterface> storageWorkers =
+    state DatabaseConfiguration configuration = wait(getDatabaseConfiguration(cx));
+    std::pair<std::vector<WorkerInterface>, int> storageWorkersAndFailures =
         wait(transformErrors(getStorageWorkers(cx, db, true /* localOnly */), snap_storage_failed()));
+    const auto& [storageWorkers, storageFailures] = storageWorkersAndFailures;
+    auto const storageFaultTolerance =
+        std::min(static_cast<int>(SERVER_KNOBS->MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE),
+                 configuration.storageTeamSize - 1) - storageFailures;
+    if (storageFaultTolerance < 0) {
+        TEST(true); // Too many failed storage servers to complete snapshot
+        throw snap_storage_failed();
+    }
     TraceEvent("SnapDataDistributor_GotStorageWorkers")
         .detail("SnapPayload", snapReq.snapPayload)
         .detail("SnapUID", snapReq.snapUID);
@@ -1095,7 +1080,6 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq,
 
 ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
                                 Reference<AsyncVar<ServerDBInfo> const> db,
-                                PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining,
                                 DDEnabledState* ddEnabledState) {
     state Future<Void> dbInfoChange = db->onChange();
     if (!ddEnabledState->setDDEnabled(false, snapReq.snapUID)) {
@@ -1114,7 +1098,7 @@ ACTOR Future<Void> ddSnapCreate(DistributorSnapRequest snapReq,
                 .detail("SnapUID", snapReq.snapUID);
             snapReq.reply.sendError(snap_with_recovery_unsupported());
         }
-        when(wait(ddSnapCreateCore(snapReq, db, getMinReplicasRemaining))) {
+        when(wait(ddSnapCreateCore(snapReq, db))) {
             TraceEvent("SnapDDCreateSuccess")
                 .detail("SnapPayload", snapReq.snapPayload)
                 .detail("SnapUID", snapReq.snapUID);
@@ -1269,7 +1253,6 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
     state Reference<DataDistributorData> self(new DataDistributorData(db, di.id()));
     state Future<Void> collection = actorCollection(self->addActor.getFuture());
     state PromiseStream<GetMetricsListRequest> getShardMetricsList;
-    state PromiseStream<Promise<Optional<int>>> getMinReplicasRemaining;
     state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, LockAware::True);
     state ActorCollection actors(false);
     state DDEnabledState ddEnabledState;
@@ -1280,11 +1263,11 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
         self->addActor.send(waitFailureServer(di.waitFailure.getFuture()));
         self->addActor.send(cacheServerWatcher(&cx));
 
-        state Future<Void> distributor = reportErrorsExcept(
-            dataDistribution(self, getShardMetricsList, getMinReplicasRemaining.getFuture(), &ddEnabledState),
-            "DataDistribution",
-            di.id(),
-            &normalDataDistributorErrors());
+        state Future<Void> distributor =
+            reportErrorsExcept(dataDistribution(self, getShardMetricsList, &ddEnabledState),
+                               "DataDistribution",
+                               di.id(),
+                               &normalDataDistributorErrors());
 
         loop choose {
             when(wait(distributor || collection)) {
@@ -1300,7 +1283,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
             when(DistributorSnapRequest snapReq = waitNext(di.distributorSnapReq.getFuture())) {
-                actors.add(ddSnapCreate(snapReq, db, getMinReplicasRemaining, &ddEnabledState));
+                actors.add(ddSnapCreate(snapReq, db, &ddEnabledState));
             }
diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h
--- a/fdbserver/DataDistribution.actor.h
+++ b/fdbserver/DataDistribution.actor.h
@@ ... @@
 ACTOR Future<Void> dataDistributionQueue(Database cx,
                                          MoveKeysLock lock,
                                          PromiseStream<Promise<int64_t>> getAverageShardBytes,
                                          FutureStream<Promise<int>> getUnhealthyRelocationCount,
-                                         FutureStream<Promise<Optional<int>>> getMinRemainingReplicas,
                                          UID distributorId,
                                          int teamSize,
                                          int singleRegionTeamSize,
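Note on the new fault-tolerance arithmetic in ddSnapCreateCore above: the budget is the knob value capped at storageTeamSize - 1 (losing an entire replication team can never be tolerated), minus the storage servers that getStorageWorkers already reported as unavailable; a negative budget aborts the snapshot. A minimal standalone sketch of that policy, with illustrative names that are not FDB identifiers:

#include <algorithm>
#include <iostream>
#include <optional>

// Sketch of the snapshot fault-tolerance policy: returns how many further
// storage-snapshot failures may be tolerated, or std::nullopt when the
// snapshot should fail immediately (the `throw snap_storage_failed()` path).
// All names here are illustrative stand-ins, not FDB identifiers.
std::optional<int> tolerableSnapshotFailures(int knobMaxFaultTolerance,
                                             int storageTeamSize,
                                             int unavailableStorageServers) {
    // Losing a whole team can never be tolerated, so cap the knob at
    // teamSize - 1, then charge already-unavailable servers against the budget.
    int budget = std::min(knobMaxFaultTolerance, storageTeamSize - 1) - unavailableStorageServers;
    if (budget < 0)
        return std::nullopt;
    return budget;
}

int main() {
    // Triple replication, knob allows 1 failure, one server already down:
    // budget = min(1, 2) - 1 = 0, i.e. proceed but tolerate nothing further.
    std::cout << tolerableSnapshotFailures(1, 3, 1).value_or(-1) << "\n"; // prints 0
}

The waitForMost helper visible in the surrounding context, which takes a faultTolerance parameter, suggests how such a budget is consumed when the snapshot requests fan out.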
diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp
index 755e096a52..49eab48d50 100644
--- a/fdbserver/DataDistributionQueue.actor.cpp
+++ b/fdbserver/DataDistributionQueue.actor.cpp
@@ -1037,21 +1037,6 @@ struct DDQueueData {
         }
         return highestPriority;
     }
-
-    // Returns the minimum number of replicas remaining on any team.
-    // If no replicas are missing, return Optional<int>{}, indicating that
-    // we can fall back to DatabaseConfiguration::storageTeamSize
-    Optional<int> getMinReplicasRemaining() const {
-        auto const highestPriority = getHighestPriorityRelocation();
-        if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) {
-            return 0;
-        } else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_1_LEFT) {
-            return 1;
-        } else if (highestPriority >= SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) {
-            return 2;
-        }
-        return {};
-    }
 };
 
 static std::string destServersString(std::vector<std::pair<Reference<IDataDistributionTeam>, bool>> const& bestTeams) {
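For context on the machinery being removed here: getMinReplicasRemaining was a one-shot query over a PromiseStream — the requester sent a fresh Promise into the stream, and the queue's choose loop answered it from waitNext. A rough single-threaded model of that request/response handshake in standard C++, with std::promise standing in for flow's Promise (illustrative only, not FDB's actual flow API):

#include <future>
#include <iostream>
#include <optional>
#include <queue>
#include <utility>

// Rough model of the handshake this patch removes: the requester enqueues a
// promise, the responder fulfills it. FDB's PromiseStream/Promise are
// cooperative single-threaded flow primitives; std::promise is used here only
// to show the shape of the protocol.
int main() {
    std::queue<std::promise<std::optional<int>>> requests; // stand-in for the PromiseStream

    // Requester side (was getTolerableFailedStorageSnapshots):
    std::promise<std::optional<int>> req;
    std::future<std::optional<int>> reply = req.get_future();
    requests.push(std::move(req));

    // Responder side (was the waitNext(getMinReplicasRemaining) branch in the
    // data distribution queue's choose loop):
    requests.front().set_value(1); // e.g. some team is down to one replica
    requests.pop();

    std::cout << "minReplicasRemaining = " << reply.get().value_or(-1) << "\n"; // prints 1
}

The round trip can be removed entirely because the snapshot path no longer needs the queue's replica estimate; availability is now measured directly where the storage workers are enumerated.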
diff --git a/fdbserver/QuietDatabase.actor.cpp b/fdbserver/QuietDatabase.actor.cpp
index 68cccaa05c..b87ade6fa1 100644
--- a/fdbserver/QuietDatabase.actor.cpp
+++ b/fdbserver/QuietDatabase.actor.cpp
@@ -277,9 +277,8 @@ ACTOR Future<std::vector<StorageServerInterface>> getStorageServers(Database cx,
     }
 }
 
-ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
-                                                             Reference<AsyncVar<ServerDBInfo> const> dbInfo,
-                                                             bool localOnly) {
+ACTOR Future<std::pair<std::vector<WorkerInterface>, int>>
+getStorageWorkers(Database cx, Reference<AsyncVar<ServerDBInfo> const> dbInfo, bool localOnly) {
     state std::vector<StorageServerInterface> servers = wait(getStorageServers(cx));
     state std::map<NetworkAddress, WorkerInterface> workersMap;
     std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
@@ -299,7 +298,9 @@ ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
     }
     auto masterDcId = dbInfo->get().master.locality.dcId();
-    std::vector<WorkerInterface> result;
+    std::pair<std::vector<WorkerInterface>, int> result;
+    auto& [workerInterfaces, failures] = result;
+    failures = 0;
     for (const auto& server : servers) {
         TraceEvent(SevDebug, "DcIdInfo")
             .detail("ServerLocalityID", server.locality.dcId())
             .detail("MasterDcID", masterDcId);
@@ -310,9 +311,10 @@ ACTOR Future<std::vector<WorkerInterface>> getStorageWorkers(Database cx,
             TraceEvent(SevWarn, "GetStorageWorkers")
                 .detail("Reason", "Could not find worker for storage server")
                 .detail("SS", server.id());
-            throw operation_failed();
+            ++failures;
+            continue;
         }
-        result.push_back(itr->second);
+        workerInterfaces.push_back(itr->second);
         }
     }
     return result;
diff --git a/fdbserver/QuietDatabase.h b/fdbserver/QuietDatabase.h
index 3cd406fe49..5265608b4c 100644
--- a/fdbserver/QuietDatabase.h
+++ b/fdbserver/QuietDatabase.h
@@ -46,9 +46,11 @@ Future<WorkerInterface> getMasterWorker(Database const& cx, Reference<AsyncVar<S
 
 Future<Void> repairDeadDatacenter(Database const& cx,
                                   Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
                                   std::string const& context);
-Future<std::vector<WorkerInterface>> getStorageWorkers(Database const& cx,
-                                                       Reference<AsyncVar<ServerDBInfo> const> const& dbInfo,
-                                                       bool const& localOnly);
+
+// Returns list of worker interfaces for available storage servers and the number of unavailable
+// storage servers
+Future<std::pair<std::vector<WorkerInterface>, int>>
+getStorageWorkers(Database const& cx, Reference<AsyncVar<ServerDBInfo> const> const& dbInfo, bool const& localOnly);
 Future<std::vector<WorkerInterface>> getCoordWorkers(Database const& cx,
                                                      Reference<AsyncVar<ServerDBInfo> const> const& dbInfo);
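The rewritten getStorageWorkers accumulates both outputs into a single std::pair so that one `return result` carries the interfaces and the failure count; the structured bindings are just reference aliases into that pair, and the `continue` keeps the end() iterator from being dereferenced on the failure path. A condensed, self-contained illustration of the idiom — `lookupWorkers` is a hypothetical name, with std::string standing in for the FDB interface types:

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Condensed model of the accumulate-into-a-pair idiom in the new
// getStorageWorkers: bind references into the pair, fill them, return the pair.
std::pair<std::vector<std::string>, int> lookupWorkers(const std::vector<std::string>& servers,
                                                       const std::map<std::string, std::string>& workersMap) {
    std::pair<std::vector<std::string>, int> result;
    auto& [workerInterfaces, failures] = result; // aliases into `result`
    failures = 0;
    for (const auto& server : servers) {
        auto itr = workersMap.find(server);
        if (itr == workersMap.end()) {
            ++failures; // count the miss; `continue` avoids dereferencing end()
            continue;
        }
        workerInterfaces.push_back(itr->second);
    }
    return result;
}

int main() {
    std::map<std::string, std::string> workersMap{ { "ss1", "worker1" } };
    auto [workers, failures] = lookupWorkers({ "ss1", "ss2" }, workersMap);
    std::cout << workers.size() << " found, " << failures << " missing\n"; // 1 found, 1 missing
}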
diff --git a/fdbserver/workloads/DiskFailureInjection.actor.cpp b/fdbserver/workloads/DiskFailureInjection.actor.cpp
index 8f3338eb28..098d3f141a 100644
--- a/fdbserver/workloads/DiskFailureInjection.actor.cpp
+++ b/fdbserver/workloads/DiskFailureInjection.actor.cpp
@@ -154,9 +154,13 @@ struct DiskFailureInjectionWorkload : TestWorkload {
         loop {
             wait(poisson(&lastTime, 1));
             try {
-                wait(store(machines, getStorageWorkers(cx, self->dbInfo, false)));
+                std::pair<std::vector<WorkerInterface>, int> m = wait(getStorageWorkers(cx, self->dbInfo, false));
+                if (m.second > 0) {
+                    throw operation_failed();
+                }
+                machines = std::move(m.first);
             } catch (Error& e) {
-                // If we failed to get a list of storage servers, we can't inject failure events
+                // If we failed to get a complete list of storage servers, we can't inject failure events
                 // But don't throw the error in that case
                 continue;
             }
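With the throw gone from getStorageWorkers, each caller now picks its own policy: ddSnapCreateCore charges failures against its fault-tolerance budget, while this workload insists on a complete worker list and skips the round otherwise. A compact sketch of those two caller-side policies (illustrative names; std::runtime_error stands in for FDB's typed errors):

#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using WorkersAndFailures = std::pair<std::vector<std::string>, int>;

// Snapshot path: tolerate failures up to a precomputed budget
// (min(knob, teamSize - 1) in the patch above).
std::vector<std::string> forSnapshot(WorkersAndFailures w, int faultToleranceBudget) {
    if (w.second > faultToleranceBudget)
        throw std::runtime_error("snap_storage_failed"); // stand-in for the FDB error
    return std::move(w.first);
}

// Disk-failure-injection workload: require a complete list, skip this round otherwise.
std::vector<std::string> forFailureInjection(WorkersAndFailures w) {
    if (w.second > 0)
        throw std::runtime_error("operation_failed"); // incomplete list
    return std::move(w.first);
}

int main() {
    WorkersAndFailures partial{ { "worker1", "worker2" }, 1 };
    std::cout << forSnapshot(partial, 2).size() << "\n"; // prints 2: within budget
    try {
        forFailureInjection(partial);
    } catch (const std::exception& e) {
        std::cout << e.what() << "\n"; // prints operation_failed: list incomplete
    }
}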