FastRestore:Applier:Avoid extra copy in getAndComputeStagingKeys

This commit is contained in:
Meng Xu 2020-04-08 12:21:53 -07:00
parent f500353368
commit 2325ab209f
3 changed files with 13 additions and 6 deletions

View File

@ -320,8 +320,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
// Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value
std::vector<Future<Void>> fGetAndComputeKeys;
std::vector<std::map<Key, std::map<Key, StagingKey>::iterator>> incompleteStagingKeysBuf(1);
std::map<Key, std::map<Key, StagingKey>::iterator>& incompleteStagingKeys = incompleteStagingKeysBuf.back();
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys;
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
int numKeysInBatch = 0;
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
@ -333,8 +332,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID));
numKeysInBatch = 0;
incompleteStagingKeysBuf.push_back(std::map<Key, std::map<Key, StagingKey>::iterator>());
incompleteStagingKeys = incompleteStagingKeysBuf.back();
incompleteStagingKeys.clear();
}
}
if (numKeysInBatch > 0) {

View File

@ -279,10 +279,18 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
state std::vector<Future<REPLY_TYPE(Request)>> ongoingReplies;
state std::vector<int> ongoingRepliesIndex;
state int loopCount = 0;
loop {
ongoingReplies.clear();
ongoingRepliesIndex.clear();
for (int i = 0; i < cmdReplies.size(); ++i) {
TraceEvent(SevInfo, "FastRestoreGetBatchReplies")
.detail("Requests", requests.size())
.detail("OutstandingReplies", oustandingReplies)
.detail("ReplyIndex", i)
.detail("ReplyReady", cmdReplies[i].isReady())
.detail("RequestNode", requests[i].first)
.detail("Request", requests[i].second.toString());
if (!cmdReplies[i].isReady()) { // still wait for reply
ongoingReplies.push_back(cmdReplies[i]);
ongoingRepliesIndex.push_back(i);

View File

@ -518,10 +518,11 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
batchIndex, asset, prevVersion, commitVersion.version, isRangeFile,
applierMutationsBuffer[applierID], applierSubsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestore_SendMutationToApplier")
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
.detail("PrevVersion", prevVersion)
.detail("CommitVersion", commitVersion.toString())
.detail("RestoreAsset", asset.toString());
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
ASSERT(prevVersion < commitVersion.version);
prevVersion = commitVersion.version;
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,