Fix DD (data distribution) storage audit race between serving a new audit request and resuming persisted audits

This commit is contained in:
Zhe Wang 2023-03-02 21:47:06 -08:00
parent 3d9f37d1d1
commit 0b6fa91056
2 changed files with 60 additions and 32 deletions

View File

@ -305,7 +305,7 @@ public:
Promise<Void> initialized;
std::unordered_map<AuditType, std::unordered_map<UID, std::shared_ptr<DDAudit>>> audits;
Future<Void> auditInitialized;
Promise<Void> auditInitialized;
Optional<Reference<TenantCache>> ddTenantCache;
@ -578,6 +578,26 @@ ACTOR Future<Void> doAuditOnStorageServer(Reference<DataDistributor> self,
StorageServerInterface ssi,
AuditStorageRequest req);
// Resume every unfinished storage audit recorded in self->initData->auditStates,
// wait for all resumptions to be dispatched, and only then fire
// self->auditInitialized. New audit requests block on auditInitialized (see
// auditStorage()), so they cannot race with resumption — the race this commit
// fixes. Returns Void once the signal has been sent.
ACTOR Future<Void> resumeStorageAudits(Reference<DataDistributor> self) {
state std::vector<Future<Void>> fs;
for (const auto& auditState : self->initData->auditStates) {
// If an audit with this type and id is already tracked in memory, the
// persisted state is stale — a live audit supersedes the resume.
if (self->audits.contains(auditState.getType())) {
if (self->audits[auditState.getType()].contains(auditState.id)) {
continue; // ignore outdated resume
}
}
// Audits that already reached a terminal phase need no resumption.
if (auditState.getPhase() == AuditPhase::Complete || auditState.getPhase() == AuditPhase::Error) {
continue;
}
TraceEvent("ResumingAuditStorage", self->ddId).detail("AuditID", auditState.id);
fs.push_back(resumeAuditStorage(self, auditState));
}
// Wait for all resume actors before signaling, so callers gated on
// auditInitialized observe a fully rebuilt self->audits map.
wait(waitForAll(fs));
self->auditInitialized.send(Void());
TraceEvent("ResumingAuditStorageDone", self->ddId);
return Void();
}
// Periodically check and log the physicalShard status; clean up empty physicalShard;
ACTOR Future<Void> monitorPhysicalShardStatus(Reference<PhysicalShardCollection> self) {
ASSERT(SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA);
@ -609,6 +629,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
loop {
trackerCancelled = false;
self->initialized = Promise<Void>();
self->auditInitialized = Promise<Void>();
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
@ -620,11 +641,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
ASSERT(self->configuration.storageTeamSize > 0);
for (const auto& auditState : self->initData->auditStates) {
TraceEvent("ResumingAuditStorage", self->ddId).detail("AuditID", auditState.id);
self->addActor.send(resumeAuditStorage(self, auditState));
}
state PromiseStream<Promise<int64_t>> getAverageShardBytes;
state PromiseStream<Promise<int>> getUnhealthyRelocationCount;
state PromiseStream<GetMetricsRequest> getShardMetrics;
@ -661,6 +677,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
actors.push_back(resumeStorageAudits(self));
actors.push_back(self->pollMoveKeysLock());
actors.push_back(reportErrorsExcept(dataDistributionTracker(self->initData,
self->txnProcessor,
@ -1378,47 +1396,54 @@ ACTOR Future<Void> ddGetMetrics(GetDataDistributorMetricsRequest req,
}
ACTOR Future<Void> resumeAuditStorage(Reference<DataDistributor> self, AuditStorageState auditState) {
if (auditState.getPhase() == AuditPhase::Complete) {
return Void();
}
state std::shared_ptr<DDAudit> audit =
std::make_shared<DDAudit>(auditState.id, auditState.range, auditState.getType());
self->audits[auditState.getType()][audit->id] = audit;
audit->actors.add(loadAndDispatchAuditRange(self, audit, auditState.range));
TraceEvent(SevDebug, "DDResumAuditStorageBegin", self->ddId)
TraceEvent(SevDebug, "DDResumeAuditStorageBegin", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
try {
wait(audit->actors.getResult());
TraceEvent(SevDebug, "DDResumeAuditStorageEnd", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
auditState.setPhase(AuditPhase::Complete);
wait(persistAuditState(self->txnProcessor->context(), auditState));
} catch (Error& e) {
TraceEvent(SevInfo, "DDResumeAuditStorageOperationError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
if (e.code() == error_code_audit_storage_error) {
auditState.setPhase(AuditPhase::Error);
state int retryTime = 0;
loop {
try {
wait(audit->actors.getResult());
TraceEvent(SevDebug, "DDResumeAuditStorageEnd", self->ddId)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
auditState.setPhase(AuditPhase::Complete);
wait(persistAuditState(self->txnProcessor->context(), auditState));
} else if (e.code() != error_code_actor_cancelled) {
wait(delay(30));
self->addActor.send(resumeAuditStorage(self, auditState));
} catch (Error& e) {
TraceEvent(SevInfo, "DDResumeAuditStorageOperationError", self->ddId)
.errorUnsuppressed(e)
.detail("AuditID", audit->id)
.detail("Range", auditState.range)
.detail("AuditType", auditState.type);
if (e.code() == error_code_audit_storage_error) {
auditState.setPhase(AuditPhase::Error);
wait(persistAuditState(self->txnProcessor->context(), auditState));
} else if (retryTime > 10) {
auditState.setPhase(AuditPhase::Failed);
wait(persistAuditState(self->txnProcessor->context(), auditState));
} else if (e.code() != error_code_actor_cancelled) {
wait(delay(30));
retryTime++;
continue;
}
}
break;
}
self->audits[auditState.getType()].erase(audit->id);
return Void();
}
ACTOR Future<Void> auditStorage(Reference<DataDistributor> self, TriggerAuditRequest req) {
std::vector<Future<Void>> fs;
fs.push_back(self->auditInitialized.getFuture());
fs.push_back(self->initialized.getFuture());
wait(waitForAll(fs));
state std::shared_ptr<DDAudit> audit;
// TODO: store AuditStorageState in DDAudit.
state AuditStorageState auditState;

View File

@ -125,6 +125,7 @@ struct ValidateStorage : TestWorkload {
throw audit_storage_failed();
}
ASSERT(auditId_ != auditId);
TraceEvent("TestValidateEnd").detail("AuditID", auditId_);
break;
} catch (Error& e) {
TraceEvent(SevWarn, "StartAuditStorageError").errorUnsuppressed(e);
@ -132,6 +133,8 @@ struct ValidateStorage : TestWorkload {
}
}
TraceEvent("TestValidateEndAll");
return Void();
}