adding implementation and check for blob worker exclusion (#9700)
commit c7c41bc9db
parent adda32db46
@@ -50,6 +50,7 @@
 #include "fdbserver/Knobs.h"
 #include "fdbserver/BlobGranuleValidation.actor.h"
 #include "fdbserver/BlobGranuleServerCommon.actor.h"
+#include "fdbserver/ExclusionTracker.actor.h"
 #include "fdbserver/QuietDatabase.h"
 #include "fdbserver/WaitFailure.h"
 #include "fdbserver/WorkerInterface.actor.h"
@@ -407,6 +408,8 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
     Debouncer restartRecruiting;
     std::set<NetworkAddress> recruitingLocalities; // the addrs of the workers being recruited on
     AsyncVar<int> recruitingStream;
+    ExclusionTracker exclusionTracker;
+
     Promise<Void> foundBlobWorkers;
     Promise<Void> doneRecovering;
     Promise<Void> loadedClientRanges;
@@ -430,7 +433,8 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
         mergeCandidates(MergeCandidateInfo(MergeCandidateUnknown), normalKeys.end),
         activeGranuleMerges(invalidVersion, normalKeys.end), forcePurgingRanges(false, normalKeys.end),
         concurrentMergeChecks(SERVER_KNOBS->BLOB_MANAGER_CONCURRENT_MERGE_CHECKS),
-        restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0), epoch(epoch) {}
+        restartRecruiting(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY), recruitingStream(0), exclusionTracker(db),
+        epoch(epoch) {}

     // only initialize blob store if actually needed
     void initBStore() {
@@ -3101,20 +3105,36 @@ ACTOR Future<Void> monitorBlobWorker(Reference<BlobManagerData> bmData, BlobWork
     try {
         state Future<Void> waitFailure = waitFailureClient(bwInterf.waitFailure, SERVER_KNOBS->BLOB_WORKER_TIMEOUT);
         state Future<Void> monitorStatus = monitorBlobWorkerStatus(bmData, bwInterf);
+        // set to already run future so we check this first loop
+        state Future<Void> exclusionsChanged = Future<Void>(Void());

-        choose {
-            when(wait(waitFailure)) {
-                if (SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED) {
-                    wait(delay(SERVER_KNOBS->BLOB_WORKER_REJOIN_TIME));
-                }
-                if (BM_DEBUG) {
-                    fmt::print("BM {0} detected BW {1} is dead\n", bmData->epoch, bwInterf.id().toString());
-                }
-                TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id());
-            }
-            when(wait(monitorStatus)) {
-                // should only return when manager got replaced
-                ASSERT(!bmData->iAmReplaced.canBeSet());
+        loop {
+            choose {
+                when(wait(waitFailure)) {
+                    if (SERVER_KNOBS->BLOB_WORKER_DISK_ENABLED) {
+                        wait(delay(SERVER_KNOBS->BLOB_WORKER_REJOIN_TIME));
+                    }
+                    if (BM_DEBUG) {
+                        fmt::print("BM {0} detected BW {1} is dead\n", bmData->epoch, bwInterf.id().toString());
+                    }
+                    TraceEvent("BlobWorkerFailed", bmData->id).detail("BlobWorkerID", bwInterf.id());
+                    break;
+                }
+                when(wait(monitorStatus)) {
+                    // should only return when manager got replaced
+                    ASSERT(!bmData->iAmReplaced.canBeSet());
+                    break;
+                }
+                when(wait(exclusionsChanged)) {
+                    // check to see if we were just excluded
+                    if (bmData->exclusionTracker.isFailedOrExcluded(bwInterf.stableAddress())) {
+                        TraceEvent("BlobWorkerExcluded", bmData->id)
+                            .detail("BlobWorkerID", bwInterf.id())
+                            .detail("Addr", bwInterf.stableAddress());
+                        break;
+                    }
+                    exclusionsChanged = bmData->exclusionTracker.changed.onTrigger();
+                }
             }
         }
     } catch (Error& e) {
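Note on the loop above: exclusionsChanged is seeded with an already-fired Future<Void>(Void()), so the very first iteration checks whether the worker is already excluded before waiting on anything; the branch then re-arms itself from the tracker's trigger. A minimal sketch of this pre-fired/re-arm idiom, assuming FDB's flow primitives (loop/choose/when, AsyncTrigger) inside some actor body; shouldStop() is a hypothetical placeholder:

    state Future<Void> changed = Future<Void>(Void()); // already ready: branch runs on first iteration
    loop {
        choose {
            when(wait(changed)) {
                if (shouldStop()) { // hypothetical exit condition
                    break;
                }
                changed = tracker.changed.onTrigger(); // re-arm for the next change
            }
            // ... other when() branches, e.g. failure and status monitoring ...
        }
    }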
@@ -3175,14 +3195,19 @@ ACTOR Future<Void> checkBlobWorkerList(Reference<BlobManagerData> bmData, Promis
     bool foundAnyNew = false;
     for (auto& worker : blobWorkers) {
         if (!bmData->deadWorkers.count(worker.id())) {
+            bool isFailedOrExcluded = bmData->exclusionTracker.isFailedOrExcluded(worker.stableAddress());
             if (!bmData->workerAddresses.count(worker.stableAddress()) &&
-                worker.locality.dcId() == bmData->dcId) {
+                worker.locality.dcId() == bmData->dcId && !isFailedOrExcluded) {
                 bmData->workerAddresses.insert(worker.stableAddress());
                 bmData->workersById[worker.id()] = worker;
                 bmData->workerStats[worker.id()] = BlobWorkerInfo();
                 bmData->addActor.send(monitorBlobWorker(bmData, worker));
                 foundAnyNew = true;
             } else if (!bmData->workersById.count(worker.id())) {
+                TraceEvent("KillingExtraneousBlobWorker", bmData->id)
+                    .detail("WorkerId", worker.id())
+                    .detail("Addr", worker.stableAddress())
+                    .detail("FailedOrExcluded", isFailedOrExcluded);
                 bmData->addActor.send(killBlobWorker(bmData, worker, false));
             }
         }
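The worker-list check above now makes a three-way decision per reporting worker: track it, kill it, or ignore it. A condensed, self-contained sketch of that decision in plain C++ (hypothetical helper; names are illustrative, not part of the patch):

    #include <cassert>

    enum class WorkerAction { Track, Kill, Ignore };

    // Mirrors the branch structure above: track a new worker only if it is in
    // the manager's DC and not failed/excluded; kill workers unknown by id.
    WorkerAction classifyWorker(bool knownAddress, bool knownId, bool sameDc, bool failedOrExcluded) {
        if (!knownAddress && sameDc && !failedOrExcluded) {
            return WorkerAction::Track;
        }
        if (!knownId) {
            return WorkerAction::Kill;
        }
        return WorkerAction::Ignore;
    }

    int main() {
        // An excluded worker at an unknown address is never tracked; being
        // unknown by id as well, it is killed.
        assert(classifyWorker(false, false, true, true) == WorkerAction::Kill);
        assert(classifyWorker(false, false, true, false) == WorkerAction::Track);
        return 0;
    }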
@@ -4066,7 +4091,22 @@ ACTOR Future<Void> blobWorkerRecruiter(
                 recruitReq.excludeAddresses.emplace_back(AddressExclusion(addr.ip, addr.port));
             }

-            TraceEvent("BMRecruiting", self->id).detail("Epoch", self->epoch).detail("State", "Sending request to CC");
+            // don't recruit on excluded or failed addresses
+            CODE_PROBE(!self->exclusionTracker.excluded.empty(), "ignoring excluded hosts in BM recruitment");
+            CODE_PROBE(!self->exclusionTracker.failed.empty(), "ignoring failed hosts in BM recruitment");
+
+            for (auto addr : self->exclusionTracker.excluded) {
+                recruitReq.excludeAddresses.push_back(addr);
+            }
+
+            for (auto addr : self->exclusionTracker.failed) {
+                recruitReq.excludeAddresses.push_back(addr);
+            }
+
+            TraceEvent("BMRecruiting", self->id)
+                .detail("Epoch", self->epoch)
+                .detail("ExcludedCount", recruitReq.excludeAddresses.size())
+                .detail("State", "Sending request to CC");

             if (!fCandidateWorker.isValid() || fCandidateWorker.isReady() ||
                 recruitReq.excludeAddresses != lastRequest.excludeAddresses) {
@@ -4090,6 +4130,9 @@ ACTOR Future<Void> blobWorkerRecruiter(

             // signal used to restart the loop and try to recruit the next blob worker
             when(wait(self->restartRecruiting.onTrigger())) {}
+
+            // signal used to restart the loop and update request to CC with new exclusions
+            when(wait(self->exclusionTracker.changed.onTrigger())) {}
         }
         wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::BlobManager));
     } catch (Error& e) {
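Taken together, the two recruiter hunks make the request's exclusion list the union of the addresses currently being recruited on and everything the tracker reports as excluded or failed, and they add a second wake-up source so the request is rebuilt when exclusions change; since the recruiter only re-sends when excludeAddresses differs from lastRequest, that wake-up is what makes a new exclusion take effect promptly. A self-contained sketch of the list assembly in plain C++ (AddrExclusion is a stand-in for FDB's AddressExclusion):

    #include <set>
    #include <string>
    #include <vector>

    using AddrExclusion = std::string; // stand-in for FDB's AddressExclusion

    // Union of in-flight recruitment addresses and the tracker's two sets, in
    // the same order the code above fills recruitReq.excludeAddresses.
    std::vector<AddrExclusion> buildExcludeList(const std::vector<AddrExclusion>& recruiting,
                                                const std::set<AddrExclusion>& excluded,
                                                const std::set<AddrExclusion>& failed) {
        std::vector<AddrExclusion> out(recruiting.begin(), recruiting.end());
        out.insert(out.end(), excluded.begin(), excluded.end());
        out.insert(out.end(), failed.begin(), failed.end());
        return out;
    }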
@@ -46,6 +46,11 @@ struct ExclusionTracker {
     ExclusionTracker() {}
     ExclusionTracker(Database db) : db(db) { trackerFuture = tracker(this); }

+    bool isFailedOrExcluded(NetworkAddress addr) {
+        AddressExclusion addrExclusion(addr.ip, addr.port);
+        return excluded.count(addrExclusion) || failed.count(addrExclusion);
+    }
+
     ACTOR static Future<Void> tracker(ExclusionTracker* self) {
         // Fetch the list of excluded servers
         state ReadYourWritesTransaction tr(self->db);
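isFailedOrExcluded builds the ip:port form of the exclusion key and tests membership in both sets the tracker maintains. A self-contained sketch of the same check in plain C++ (AddrKey is a hypothetical stand-in for AddressExclusion):

    #include <set>
    #include <string>
    #include <tuple>

    struct AddrKey {
        std::string ip;
        int port;
        bool operator<(const AddrKey& o) const { return std::tie(ip, port) < std::tie(o.ip, o.port); }
    };

    // Same shape as ExclusionTracker::isFailedOrExcluded: one key, two lookups.
    bool isFailedOrExcluded(const std::set<AddrKey>& excluded,
                            const std::set<AddrKey>& failed,
                            const AddrKey& addr) {
        return excluded.count(addr) > 0 || failed.count(addr) > 0;
    }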
@@ -1243,6 +1243,9 @@ struct ConsistencyCheckWorkload : TestWorkload {
                         return false;
                     }
                 }
+            } else if (blobWorkersByAddr[addr] > 0) {
+                TraceEvent("ConsistencyCheck_BWOnExcludedAddr").detail("Address", addr);
+                return false;
             }
         }
         return numBlobWorkerProcesses > 0;
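The consistency check now fails whenever an excluded address still hosts blob workers. A condensed, self-contained sketch of the invariant in plain C++ (hypothetical names; the real workload walks per-process data):

    #include <map>
    #include <set>
    #include <string>

    // Fail if an excluded address still hosts blob workers; otherwise require
    // at least one blob worker process overall, as the return above does.
    bool blobWorkerPlacementOk(const std::map<std::string, int>& blobWorkersByAddr,
                               const std::set<std::string>& excludedAddrs,
                               int numBlobWorkerProcesses) {
        for (const auto& kv : blobWorkersByAddr) {
            if (excludedAddrs.count(kv.first) && kv.second > 0) {
                return false;
            }
        }
        return numBlobWorkerProcesses > 0;
    }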