From 0f9e88572a262e73ee745f05fa1c1a1bc5fa5698 Mon Sep 17 00:00:00 2001
From: Josh Slocum
Date: Thu, 17 Mar 2022 14:40:34 -0500
Subject: [PATCH] Cleaning up debugging and fixing race in blob manager recruitment

---
 fdbserver/BlobManager.actor.cpp       |  7 +--
 fdbserver/ClusterController.actor.cpp |  1 +
 fdbserver/worker.actor.cpp            | 74 +++++++++++++++++----------
 3 files changed, 52 insertions(+), 30 deletions(-)

diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp
index 28fcab7856..2b6a4da2bf 100644
--- a/fdbserver/BlobManager.actor.cpp
+++ b/fdbserver/BlobManager.actor.cpp
@@ -2735,17 +2735,18 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
             if (BM_DEBUG) {
                 fmt::print("BM {} exiting because it is replaced\n", self->epoch);
             }
+            TraceEvent("BlobManagerReplaced", bmInterf.id()).detail("Epoch", epoch);
             break;
         }
         when(HaltBlobManagerRequest req = waitNext(bmInterf.haltBlobManager.getFuture())) {
             req.reply.send(Void());
-            TraceEvent("BlobManagerHalted", bmInterf.id()).detail("ReqID", req.requesterID);
+            TraceEvent("BlobManagerHalted", bmInterf.id()).detail("Epoch", epoch).detail("ReqID", req.requesterID);
             break;
         }
         when(state HaltBlobGranulesRequest req = waitNext(bmInterf.haltBlobGranules.getFuture())) {
             wait(haltBlobGranules(self));
             req.reply.send(Void());
-            TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("ReqID", req.requesterID);
+            TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("Epoch", epoch).detail("ReqID", req.requesterID);
             break;
         }
         when(BlobManagerExclusionSafetyCheckRequest exclCheckReq =
@@ -2753,7 +2754,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
             blobManagerExclusionSafetyCheck(self, exclCheckReq);
         }
         when(wait(collection)) {
-            TraceEvent("BlobManagerActorCollectionError");
+            TraceEvent(SevError, "BlobManagerActorCollectionError");
             ASSERT(false);
             throw internal_error();
         }
diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp
index 899522f229..a7169cc259 100644
--- a/fdbserver/ClusterController.actor.cpp
+++ b/fdbserver/ClusterController.actor.cpp
@@ -2123,6 +2123,7 @@ ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
 
             tr->set(blobManagerEpochKey, blobManagerEpochValueFor(newEpoch));
             wait(tr->commit());
+            TraceEvent(SevDebug, "CCNextBlobManagerEpoch", self->id).detail("Epoch", newEpoch);
             return newEpoch;
         } catch (Error& e) {
             wait(tr->onError(e));
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index ff6adde754..913ed139fb 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -524,20 +524,21 @@ std::vector<DiskStore> getDiskStores(std::string folder) {
 
 // Register the worker interf to cluster controller (cc) and
 // re-register the worker when key roles interface, e.g., cc, dd, ratekeeper, change.
-ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
-                                      WorkerInterface interf,
-                                      Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
-                                      ProcessClass initialClass,
-                                      Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
-                                      Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
-                                      Reference<AsyncVar<Optional<BlobManagerInterface>> const> bmInterf,
-                                      Reference<AsyncVar<Optional<EncryptKeyProxyInterface>> const> ekpInterf,
-                                      Reference<AsyncVar<bool> const> degraded,
-                                      Reference<IClusterConnectionRecord> connRecord,
-                                      Reference<AsyncVar<std::set<std::string>> const> issues,
-                                      Reference<ConfigNode> configNode,
-                                      Reference<LocalConfiguration> localConfig,
-                                      Reference<AsyncVar<ServerDBInfo>> dbInfo) {
+ACTOR Future<Void> registrationClient(
+    Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
+    WorkerInterface interf,
+    Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
+    ProcessClass initialClass,
+    Reference<AsyncVar<Optional<DataDistributorInterface>> const> ddInterf,
+    Reference<AsyncVar<Optional<RatekeeperInterface>> const> rkInterf,
+    Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>> const> bmInterf,
+    Reference<AsyncVar<Optional<EncryptKeyProxyInterface>> const> ekpInterf,
+    Reference<AsyncVar<bool> const> degraded,
+    Reference<IClusterConnectionRecord> connRecord,
+    Reference<AsyncVar<std::set<std::string>> const> issues,
+    Reference<ConfigNode> configNode,
+    Reference<LocalConfiguration> localConfig,
+    Reference<AsyncVar<ServerDBInfo>> dbInfo) {
     // Keeps the cluster controller (as it may be re-elected) informed that this worker exists
     // The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
     // (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@@ -567,7 +568,8 @@ ACTOR Future<Void> registrationClient(Reference<AsyncVar<Optional<ClusterContro
                                       ddInterf->get(),
                                       rkInterf->get(),
-                                      bmInterf->get(),
+                                      bmInterf->get().present() ? bmInterf->get().get().second
+                                                                : Optional<BlobManagerInterface>(),
                                       ekpInterf->get(),
                                       degraded->get(),
                                       localConfig->lastSeenVersion(),
@@ -1374,6 +1376,24 @@ ACTOR Future<Void> chaosMetricsLogger() {
     }
 }
 
+// like genericactors setWhenDoneOrError, but we need to take into account the bm epoch. We don't want to reset it if
+// this manager was replaced by a later manager (with a higher epoch) on this worker
+ACTOR Future<Void> resetBlobManagerWhenDoneOrError(
+    Future<Void> blobManagerProcess,
+    Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>> var,
+    int64_t epoch) {
+    try {
+        wait(blobManagerProcess);
+    } catch (Error& e) {
+        if (e.code() == error_code_actor_cancelled)
+            throw;
+    }
+    if (var->get().present() && var->get().get().first == epoch) {
+        var->set(Optional<std::pair<int64_t, BlobManagerInterface>>());
+    }
+    return Void();
+}
+
 ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                                 Reference<AsyncVar<Optional<ClusterControllerFullInterface>> const> ccInterface,
                                 LocalityData locality,
@@ -1395,8 +1415,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
     state Reference<AsyncVar<Optional<DataDistributorInterface>>> ddInterf(
         new AsyncVar<Optional<DataDistributorInterface>>());
     state Reference<AsyncVar<Optional<RatekeeperInterface>>> rkInterf(new AsyncVar<Optional<RatekeeperInterface>>());
-    state Reference<AsyncVar<Optional<BlobManagerInterface>>> bmInterf(new AsyncVar<Optional<BlobManagerInterface>>());
-    state int64_t myBMEpoch = -1;
+    state Reference<AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>> bmEpochAndInterf(
+        new AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>>());
     state Reference<AsyncVar<Optional<EncryptKeyProxyInterface>>> ekpInterf(
         new AsyncVar<Optional<EncryptKeyProxyInterface>>());
     state Future<Void> handleErrors = workerHandleErrors(errors.getFuture()); // Needs to be stopped last
@@ -1672,7 +1692,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                                                initialClass,
                                                ddInterf,
                                                rkInterf,
-                                               bmInterf,
+                                               bmEpochAndInterf,
                                                ekpInterf,
                                                degraded,
                                                connRecord,
@@ -1874,8 +1894,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                 BlobManagerInterface recruited(locality, req.reqId);
                 recruited.initEndpoints();
 
-                if (bmInterf->get().present() && myBMEpoch == req.epoch) {
-                    recruited = bmInterf->get().get();
+                if (bmEpochAndInterf->get().present() && bmEpochAndInterf->get().get().first == req.epoch) {
+                    recruited = bmEpochAndInterf->get().get().second;
 
                     TEST(true); // Recruited while already a blob manager.
                 } else {
@@ -1884,19 +1904,19 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
                     // Also, not halting lets us handle the case here where the last BM had a higher
                     // epoch and somehow the epochs got out of order by a delayed initialize request. The one we start
                     // here will just halt on the lock check.
-                    myBMEpoch = req.epoch;
                     startRole(Role::BLOB_MANAGER, recruited.id(), interf.id());
                     DUMPTOKEN(recruited.waitFailure);
                     DUMPTOKEN(recruited.haltBlobManager);
                     DUMPTOKEN(recruited.haltBlobGranules);
                     DUMPTOKEN(recruited.blobManagerExclCheckReq);
                     Future<Void> blobManagerProcess = blobManager(recruited, dbInfo, req.epoch);
-                    errorForwarders.add(forwardError(
-                        errors,
-                        Role::BLOB_MANAGER,
-                        recruited.id(),
-                        setWhenDoneOrError(blobManagerProcess, bmInterf, Optional<BlobManagerInterface>())));
-                    bmInterf->set(Optional<BlobManagerInterface>(recruited));
+                    errorForwarders.add(
+                        forwardError(errors,
+                                     Role::BLOB_MANAGER,
+                                     recruited.id(),
+                                     resetBlobManagerWhenDoneOrError(blobManagerProcess, bmEpochAndInterf, req.epoch)));
+                    bmEpochAndInterf->set(
+                        Optional<std::pair<int64_t, BlobManagerInterface>>(std::pair(req.epoch, recruited)));
                 }
                 TraceEvent("BlobManagerReceived", req.reqId).detail("BlobManagerId", recruited.id());
                 req.reply.send(recruited);
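
Reviewer note: the sketch below is a minimal, single-threaded C++17 model of the two guards this patch adds in worker.actor.cpp (reuse the running manager when the requested epoch matches, and only clear the registration slot when the finishing manager still owns it, as resetBlobManagerWhenDoneOrError does). It is not Flow/actor code and does not use the real FoundationDB types; BMInterface, BMSlot, resetIfSameEpoch and recruit are hypothetical stand-ins for BlobManagerInterface and the AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>> slot.

// Minimal standalone sketch (not Flow/actor code): models the epoch-guarded
// reset and the epoch-keyed recruitment check with plain C++17. All names
// here are hypothetical stand-ins, not the actual FDB types.
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <utility>

struct BMInterface {
    std::string id; // stand-in for BlobManagerInterface's UID
};

// Stand-in for the AsyncVar<Optional<std::pair<int64_t, BlobManagerInterface>>> slot.
using BMSlot = std::optional<std::pair<int64_t, BMInterface>>;

// Mirrors the fix: only clear the slot if it still holds the manager that
// finished (same epoch). A stale manager finishing must not wipe a newer one.
void resetIfSameEpoch(BMSlot& slot, int64_t finishedEpoch) {
    if (slot && slot->first == finishedEpoch) {
        slot.reset();
    }
}

// Mirrors the recruitment path: reuse the running manager when the requested
// epoch matches, otherwise record the newly started manager under its epoch.
BMInterface recruit(BMSlot& slot, int64_t reqEpoch, const std::string& newId) {
    if (slot && slot->first == reqEpoch) {
        return slot->second; // already running this epoch's manager
    }
    BMInterface started{ newId };
    slot = std::make_pair(reqEpoch, started);
    return started;
}

int main() {
    BMSlot slot;
    recruit(slot, 3, "bm-epoch-3"); // first recruitment
    recruit(slot, 5, "bm-epoch-5"); // re-recruited with a newer epoch

    // The epoch-3 manager now exits (e.g. halted on the lock check). With an
    // unconditional reset this would clear the epoch-5 registration; with the
    // epoch check it is a no-op.
    resetIfSameEpoch(slot, 3);
    std::cout << "after epoch-3 exit: "
              << (slot ? slot->second.id : std::string("<empty>")) << "\n";

    resetIfSameEpoch(slot, 5); // the current manager exiting does clear the slot
    std::cout << "after epoch-5 exit: "
              << (slot ? slot->second.id : std::string("<empty>")) << "\n";
    return 0;
}

Compiled with a C++17 compiler, the sketch prints "bm-epoch-5" after the stale epoch-3 manager exits and "<empty>" only after the current epoch-5 manager exits, which is the behavior the patch establishes.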
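Why the epoch has to travel with the interface: before this change, setWhenDoneOrError cleared bmInterf unconditionally whenever a blobManager actor returned, so a stale manager exiting (for example after halting on the lock check because a successor with a higher epoch was recruited on the same worker) could wipe the newer manager's registration, and registrationClient would then re-register the worker with an empty blob manager slot even though a manager was still running. Keying the AsyncVar by epoch makes the reset conditional, so only the manager that still owns the slot can clear it; this is presumably the race referred to in the subject line.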