FastRestore:Fix uninitialized variable

This commit is contained in:
Meng Xu 2020-08-18 11:58:57 -07:00
parent 046260b9d7
commit 9b2f667bbe
3 changed files with 11 additions and 6 deletions

View File

@ -111,6 +111,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
Reference<RestoreApplierData> self) { Reference<RestoreApplierData> self) {
state Reference<ApplierBatchData> batchData = self->batch[req.batchIndex]; state Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
state bool printTrace = false; state bool printTrace = false;
state NotifiedVersion* curMsgIndex = nullptr;
ASSERT(batchData.isValid()); ASSERT(batchData.isValid());
ASSERT(self->finishedBatch.get() < req.batchIndex); ASSERT(self->finishedBatch.get() < req.batchIndex);
@ -134,14 +135,15 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
wait(isSchedulable(self, req.batchIndex, __FUNCTION__)); wait(isSchedulable(self, req.batchIndex, __FUNCTION__));
ASSERT(batchData.isValid()); ASSERT(batchData.isValid());
ASSERT(req.batchIndex > self->finishedBatch.get());
// Assume: processedFileState[req.asset] will not be erased while the actor is active. // Assume: processedFileState[req.asset] will not be erased while the actor is active.
// Note: Insert new items into processedFileState will not invalidate the reference. // Note: Insert new items into processedFileState will not invalidate the reference.
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset]; curMsgIndex = &batchData->processedFileState[req.asset];
wait(curMsgIndex.whenAtLeast(req.msgIndex - 1)); wait(curMsgIndex->whenAtLeast(req.msgIndex - 1));
batchData->vbState = ApplierVersionBatchState::RECEIVE_MUTATIONS; batchData->vbState = ApplierVersionBatchState::RECEIVE_MUTATIONS;
state bool isDuplicated = true; state bool isDuplicated = true;
if (curMsgIndex.get() == req.msgIndex - 1) { if (curMsgIndex->get() == req.msgIndex - 1) {
isDuplicated = false; isDuplicated = false;
for (int mIndex = 0; mIndex < req.versionedMutations.size(); mIndex++) { for (int mIndex = 0; mIndex < req.versionedMutations.size(); mIndex++) {
@ -169,14 +171,14 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
ASSERT(versionedMutation.mutation.type != MutationRef::SetVersionstampedKey && ASSERT(versionedMutation.mutation.type != MutationRef::SetVersionstampedKey &&
versionedMutation.mutation.type != MutationRef::SetVersionstampedValue); versionedMutation.mutation.type != MutationRef::SetVersionstampedValue);
} }
curMsgIndex.set(req.msgIndex); curMsgIndex->set(req.msgIndex);
} }
req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id()) TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
.detail("BatchIndex", req.batchIndex) .detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString()) .detail("RestoreAsset", req.asset.toString())
.detail("ProcessedMessageIndex", curMsgIndex.get()) .detail("ProcessedMessageIndex", curMsgIndex->get())
.detail("Request", req.toString()); .detail("Request", req.toString());
return Void(); return Void();
} }

View File

@ -476,6 +476,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
state bool isDuplicated = true; state bool isDuplicated = true;
state bool printTrace = false; state bool printTrace = false;
ASSERT(batchData.isValid()); ASSERT(batchData.isValid());
ASSERT(req.batchIndex > self->finishedBatch.get());
bool paramExist = batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end(); bool paramExist = batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end();
bool isReady = paramExist ? batchData->processedFileParams[req.param].isReady() : false; bool isReady = paramExist ? batchData->processedFileParams[req.param].isReady() : false;
@ -563,6 +564,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
state bool isDuplicated = true; state bool isDuplicated = true;
ASSERT(batchData.isValid() && batchStatus.isValid()); ASSERT(batchData.isValid() && batchStatus.isValid());
ASSERT(req.batchIndex > self->finishedBatch.get());
TraceEvent("FastRestoreLoaderPhaseSendMutations", self->id()) TraceEvent("FastRestoreLoaderPhaseSendMutations", self->id())
.detail("BatchIndex", req.batchIndex) .detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile) .detail("UseRangeFile", req.useRangeFile)

View File

@ -93,7 +93,8 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
oldLogMutations("OldLogMutations", cc) {} oldLogMutations("OldLogMutations", cc) {}
} counters; } counters;
explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex), vbState(LoaderVersionBatchState::NOT_INIT) { explicit LoaderBatchData(UID nodeID, int batchIndex)
: counters(this, nodeID, batchIndex), vbState(LoaderVersionBatchState::NOT_INIT), loadFileReqs(0) {
pollMetrics = traceCounters(format("FastRestoreLoaderMetrics%d", batchIndex), nodeID, pollMetrics = traceCounters(format("FastRestoreLoaderMetrics%d", batchIndex), nodeID,
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex)); nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex));