Fixing BGVerifyBalance test killing issues

This commit is contained in:
Josh Slocum 2022-02-25 11:30:21 -06:00
parent 623db663dc
commit bc7cc984b0
4 changed files with 78 additions and 7 deletions

View File

@ -2759,6 +2759,38 @@ ACTOR Future<Void> doLockChecks(Reference<BlobManagerData> bmData) {
}
}
// Answers an exclusion safety check on behalf of the blob manager.
//
// The exclusion is reported as safe only if at least one blob worker known to
// this manager (self->workersById) is not covered by any of the requested
// address exclusions. If the manager currently knows of no workers at all, the
// exclusion is reported as unsafe.
//
// self: blob manager state; only workersById and id are read here.
// req:  carries the proposed exclusions and the promise the reply is sent on.
ACTOR Future<Void> blobManagerExclusionSafetyCheck(Reference<BlobManagerData> self,
                                                   BlobManagerExclusionSafetyCheckRequest req) {
	// TODO: the trace events in this actor were marked by the author for later removal.
	TraceEvent("BMExclusionSafetyCheckBegin", self->id).log();

	// Stays false unless a surviving worker is found below.
	bool exclusionIsSafe = false;

	if (!self->workersById.empty()) {
		// Collect every worker whose address is not hit by any exclusion.
		std::set<UID> survivors;
		for (auto& entry : self->workersById) {
			bool excluded = false;
			for (const AddressExclusion& exclusion : req.exclusions) {
				if (exclusion.excludes(entry.second.address())) {
					excluded = true;
					break;
				}
			}
			if (!excluded) {
				survivors.insert(entry.first);
			}
		}
		TraceEvent("BMExclusionSafetyChecked", self->id).detail("RemainingWorkers", survivors.size()).log();
		exclusionIsSafe = !survivors.empty();
	} else {
		// Nothing to keep alive, so no exclusion can be declared safe.
		TraceEvent("BMExclusionSafetyCheckNoWorkers", self->id).log();
	}

	TraceEvent("BMExclusionSafetyCheckEnd", self->id).log();
	req.reply.send(BlobManagerExclusionSafetyCheckReply(exclusionIsSafe));
	return Void();
}
ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
int64_t epoch) {
@ -2814,6 +2846,10 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
TraceEvent("BlobGranulesHalted", bmInterf.id()).detail("ReqID", req.requesterID);
break;
}
when(BlobManagerExclusionSafetyCheckRequest exclCheckReq =
waitNext(bmInterf.blobManagerExclCheckReq.getFuture())) {
self->addActor.send(blobManagerExclusionSafetyCheck(self, exclCheckReq));
}
when(wait(collection)) {
TraceEvent("BlobManagerActorCollectionError");
ASSERT(false);

View File

@ -31,6 +31,7 @@ struct BlobManagerInterface {
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<struct HaltBlobManagerRequest> haltBlobManager;
RequestStream<struct HaltBlobGranulesRequest> haltBlobGranules;
RequestStream<struct BlobManagerExclusionSafetyCheckRequest> blobManagerExclCheckReq;
struct LocalityData locality;
UID myId;
@ -45,7 +46,7 @@ struct BlobManagerInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, waitFailure, haltBlobManager, haltBlobGranules, locality, myId);
serializer(ar, waitFailure, haltBlobManager, haltBlobGranules, blobManagerExclCheckReq, locality, myId);
}
};
@ -77,4 +78,32 @@ struct HaltBlobGranulesRequest {
}
};
// Reply to a BlobManagerExclusionSafetyCheckRequest.
struct BlobManagerExclusionSafetyCheckReply {
	constexpr static FileIdentifier file_identifier = 8068627;

	// Verdict of the check; a default-constructed reply reports "not safe".
	bool safe = false;

	BlobManagerExclusionSafetyCheckReply() = default;
	explicit BlobManagerExclusionSafetyCheckReply(bool safe) : safe(safe) {}

	// Serializes the single `safe` field.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, safe);
	}
};
// Request carrying a set of proposed address exclusions, answered with a
// BlobManagerExclusionSafetyCheckReply on `reply`.
struct BlobManagerExclusionSafetyCheckRequest {
	constexpr static FileIdentifier file_identifier = 1996387;

	// Addresses proposed for exclusion.
	std::vector<AddressExclusion> exclusions;
	// Promise on which the reply is delivered.
	ReplyPromise<BlobManagerExclusionSafetyCheckReply> reply;

	BlobManagerExclusionSafetyCheckRequest() = default;
	explicit BlobManagerExclusionSafetyCheckRequest(std::vector<AddressExclusion> exclusions)
	  : exclusions(exclusions) {}

	// Serializes the exclusion list followed by the reply promise.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, exclusions, reply);
	}
};
#endif

View File

@ -1802,11 +1802,21 @@ ACTOR Future<Void> proxyCheckSafeExclusion(Reference<AsyncVar<ServerDBInfo> cons
return Void();
}
try {
state Future<ErrorOr<DistributorExclusionSafetyCheckReply>> safeFuture =
state Future<ErrorOr<DistributorExclusionSafetyCheckReply>> ddSafeFuture =
db->get().distributor.get().distributorExclCheckReq.tryGetReply(
DistributorExclusionSafetyCheckRequest(req.exclusions));
DistributorExclusionSafetyCheckReply _reply = wait(throwErrorOr(safeFuture));
DistributorExclusionSafetyCheckReply _reply = wait(throwErrorOr(ddSafeFuture));
reply.safe = _reply.safe;
if (db->get().blobManager.present()) {
TraceEvent("SafetyCheckCommitProxyBM").detail("BMID", db->get().blobManager.get().id());
state Future<ErrorOr<BlobManagerExclusionSafetyCheckReply>> bmSafeFuture =
db->get().blobManager.get().blobManagerExclCheckReq.tryGetReply(
BlobManagerExclusionSafetyCheckRequest(req.exclusions));
BlobManagerExclusionSafetyCheckReply _reply = wait(throwErrorOr(bmSafeFuture));
reply.safe &= _reply.safe;
} else {
TraceEvent("SafetyCheckCommitProxyNoBM");
}
} catch (Error& e) {
TraceEvent("SafetyCheckCommitProxyResponseError").error(e);
if (e.code() != error_code_operation_cancelled) {

View File

@ -29,10 +29,6 @@ testTitle = 'BlobGranuleVerifyBalance'
testDuration = 120.0
meanDelay = 10.0
[[test.workload]]
testName = 'Attrition'
testDuration = 120.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10