FastRestore:Loader:Merge MutationsVec and LogMessageVersionVec into VersionedMutationsVec

Remove the actor that sends one mutation message batch in the previous commit,
because that actor no longer reduces the code complexity.
This commit is contained in:
Meng Xu 2020-04-21 22:04:42 -07:00
parent 9ade63a685
commit d21da5065a
4 changed files with 71 additions and 106 deletions

View File

@ -466,29 +466,27 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest {
Version msgIndex; // Monitonically increasing index of mutation messages
bool isRangeFile;
MutationsVec mutations; // Mutations that may be at different versions parsed by one loader
LogMessageVersionVec mVersions; // (version, subversion) of each mutation in mutations field
VersionedMutationsVec
versionedMutations; // Versioned mutations that may be at different versions parsed by one loader
ReplyPromise<RestoreCommonReply> reply;
RestoreSendVersionedMutationsRequest() = default;
explicit RestoreSendVersionedMutationsRequest(int batchIndex, const RestoreAsset& asset, Version msgIndex,
bool isRangeFile, MutationsVec mutations,
LogMessageVersionVec mVersions)
: batchIndex(batchIndex), asset(asset), msgIndex(msgIndex), isRangeFile(isRangeFile), mutations(mutations),
mVersions(mVersions) {}
bool isRangeFile, VersionedMutationsVec versionedMutations)
: batchIndex(batchIndex), asset(asset), msgIndex(msgIndex), isRangeFile(isRangeFile),
versionedMutations(versionedMutations) {}
std::string toString() {
std::stringstream ss;
ss << "VersionBatchIndex:" << batchIndex << "RestoreAsset:" << asset.toString() << " msgIndex:" << msgIndex
<< " isRangeFile:" << isRangeFile << " mutations.size:" << mutations.size()
<< " mVersions.size:" << mVersions.size();
<< " isRangeFile:" << isRangeFile << " versionedMutations.size:" << versionedMutations.size();
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, batchIndex, asset, msgIndex, isRangeFile, mutations, mVersions, reply);
serializer(ar, batchIndex, asset, msgIndex, isRangeFile, versionedMutations, reply);
}
};

View File

@ -128,29 +128,29 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
state bool isDuplicated = true;
if (curMsgIndex.get() == req.msgIndex - 1) {
isDuplicated = false;
ASSERT(req.mutations.size() == req.mVersions.size());
for (int mIndex = 0; mIndex < req.mutations.size(); mIndex++) {
const MutationRef& mutation = req.mutations[mIndex];
const LogMessageVersion mutationVersion(req.mVersions[mIndex]);
for (int mIndex = 0; mIndex < req.versionedMutations.size(); mIndex++) {
const VersionedMutation& versionedMutation = req.versionedMutations[mIndex];
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("RestoreAsset", req.asset.toString())
.detail("Version", mutationVersion.toString())
.detail("Version", versionedMutation.version.toString())
.detail("Index", mIndex)
.detail("MutationReceived", mutation.toString());
batchData->counters.receivedBytes += mutation.totalSize();
batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified
.detail("MutationReceived", versionedMutation.mutation.toString());
batchData->counters.receivedBytes += versionedMutation.mutation.totalSize();
batchData->counters.receivedWeightedBytes +=
versionedMutation.mutation.weightedTotalSize(); // atomicOp will be amplified
batchData->counters.receivedMutations += 1;
batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0;
batchData->counters.receivedAtomicOps +=
isAtomicOp((MutationRef::Type)versionedMutation.mutation.type) ? 1 : 0;
// Sanity check
ASSERT_WE_THINK(req.asset.isInVersionRange(mutationVersion.version));
ASSERT_WE_THINK(req.asset.isInKeyRange(mutation));
ASSERT_WE_THINK(req.asset.isInVersionRange(versionedMutation.version.version));
ASSERT_WE_THINK(req.asset.isInKeyRange(versionedMutation.mutation));
// Note: Log and range mutations may be delivered out of order. Can we handle it?
batchData->addMutation(mutation, mutationVersion);
batchData->addMutation(versionedMutation.mutation, versionedMutation.version);
ASSERT(mutation.type != MutationRef::SetVersionstampedKey &&
mutation.type != MutationRef::SetVersionstampedValue);
ASSERT(versionedMutation.mutation.type != MutationRef::SetVersionstampedKey &&
versionedMutation.mutation.type != MutationRef::SetVersionstampedValue);
}
curMsgIndex.set(req.msgIndex);
}

View File

@ -443,36 +443,6 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
}
// TODO: Enable sending mutation batches out of order;
// TODO: Consolidate the three maps
ACTOR Future<Void> sendOneMutationBatchToAppliers(std::vector<UID>* applierIDs, int* batchIndex, RestoreAsset* asset,
Version* msgIndex, bool isRangeFile,
std::map<UID, MutationsVec>* applierMutationsBuffer,
std::map<UID, LogMessageVersionVec>* applierVersionsBuffer,
std::map<UID, double>* applierMutationsSize,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces) {
std::vector<std::pair<UID, RestoreSendVersionedMutationsRequest>> requests;
for (const UID& applierID : *applierIDs) {
requests.emplace_back(applierID,
RestoreSendVersionedMutationsRequest(*batchIndex, *asset, *msgIndex, isRangeFile,
applierMutationsBuffer->at(applierID),
applierVersionsBuffer->at(applierID)));
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", *msgIndex)
.detail("RestoreAsset", asset->toString())
.detail("Requests", requests.size());
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
++(*msgIndex);
for (auto& applierID : *applierIDs) {
applierMutationsBuffer->at(applierID) = MutationsVec();
applierVersionsBuffer->at(applierID) = LogMessageVersionVec();
applierMutationsSize->at(applierID) = 0.0;
}
return Void();
}
// Assume: kvOps data are from the same RestoreAsset.
// Input: pkvOps: versioned kv mutation for the asset in the version batch (batchIndex)
// isRangeFile: is pkvOps from range file? Let receiver (applier) know if the mutation is log mutation;
@ -510,17 +480,12 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
splitMutationIndex = 0;
kvCount = 0;
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
state std::map<UID, MutationsVec> applierMutationsBuffer;
state std::map<UID, LogMessageVersionVec> applierVersionsBuffer;
state std::map<UID, double> applierMutationsSize;
// applierVersionedMutationsBuffer is the mutation-and-its-version vector to be sent to each applier
state std::map<UID, VersionedMutationsVec> applierVersionedMutationsBuffer;
state int mIndex = 0;
state LogMessageVersion commitVersion;
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec();
applierVersionsBuffer[applierID] = LogMessageVersionVec();
applierMutationsSize[applierID] = 0.0;
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
commitVersion = kvOp->first;
@ -555,9 +520,8 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("Version", commitVersion.toString())
.detail("Mutation", mutation.toString());
}
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
applierVersionsBuffer[applierID].push_back(applierVersionsBuffer[applierID].arena(), commitVersion);
applierMutationsSize[applierID] += mutation.expectedSize();
applierVersionedMutationsBuffer[applierID].push_back(
applierVersionedMutationsBuffer[applierID].arena(), VersionedMutation(mutation, commitVersion));
msgSize += mutation.expectedSize();
kvCount++;
@ -575,39 +539,31 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("Version", commitVersion.toString())
.detail("Mutation", kvm.toString());
}
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), kvm);
applierVersionsBuffer[applierID].push_back(applierVersionsBuffer[applierID].arena(), commitVersion);
applierMutationsSize[applierID] += kvm.expectedSize();
applierVersionedMutationsBuffer[applierID].push_back(applierVersionedMutationsBuffer[applierID].arena(),
VersionedMutation(kvm, commitVersion));
msgSize += kvm.expectedSize();
}
// Batch mutations at multiple versions up to FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES size
// to improve bandwidth from a loader to appliers
if (msgSize >= SERVER_KNOBS->FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES) {
wait(sendOneMutationBatchToAppliers(&applierIDs, &batchIndex, &asset, &msgIndex, isRangeFile,
&applierMutationsBuffer, &applierVersionsBuffer,
&applierMutationsSize, pApplierInterfaces));
msgSize = 0; // Reset msgSize for next message
// for (const UID& applierID : applierIDs) {
// requests.emplace_back(applierID,
// RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
// applierMutationsBuffer[applierID],
// applierVersionsBuffer[applierID]));
// }
// TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
// .detail("MessageIndex", msgIndex)
// .detail("RestoreAsset", asset.toString())
// .detail("Requests", requests.size());
// wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
// TaskPriority::RestoreLoaderSendMutations));
// msgIndex++;
// msgSize = 0;
// requests.clear();
// for (auto& applierID : applierIDs) {
// applierMutationsBuffer[applierID] = MutationsVec();
// applierVersionsBuffer[applierID] = LogMessageVersionVec();
// applierMutationsSize[applierID] = 0.0;
// }
for (const UID& applierID : applierIDs) {
requests.emplace_back(
applierID, RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
applierVersionedMutationsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
msgIndex++;
msgSize = 0;
requests.clear();
for (auto& applierID : applierIDs) {
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
}
} // Mutations at the same LogMessageVersion
} // all versions of mutations in the same file
@ -615,21 +571,17 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
// Send the remaining mutations in the applierMutationsBuffer
if (msgSize > 0) {
// TODO: Sanity check each asset has been received exactly once!
wait(sendOneMutationBatchToAppliers(&applierIDs, &batchIndex, &asset, &msgIndex, isRangeFile,
&applierMutationsBuffer, &applierVersionsBuffer, &applierMutationsSize,
pApplierInterfaces));
msgSize = 0;
// for (const UID& applierID : applierIDs) {
// requests.emplace_back(applierID, RestoreSendVersionedMutationsRequest(
// batchIndex, asset, msgIndex, isRangeFile,
// applierMutationsBuffer[applierID], applierVersionsBuffer[applierID]));
// }
// TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
// .detail("MessageIndex", msgIndex)
// .detail("RestoreAsset", asset.toString())
// .detail("Requests", requests.size());
// wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
// TaskPriority::RestoreLoaderSendMutations));
for (const UID& applierID : applierIDs) {
requests.emplace_back(applierID,
RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
applierVersionedMutationsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
}
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);

View File

@ -38,8 +38,23 @@
//#define SevFRMutationInfo SevVerbose
#define SevFRMutationInfo SevInfo
struct VersionedMutation {
MutationRef mutation;
LogMessageVersion version;
VersionedMutation() = default;
explicit VersionedMutation(MutationRef mutation, LogMessageVersion version)
: mutation(mutation), version(version) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mutation, version);
}
};
using MutationsVec = Standalone<VectorRef<MutationRef>>;
using LogMessageVersionVec = Standalone<VectorRef<LogMessageVersion>>;
using VersionedMutationsVec = Standalone<VectorRef<VersionedMutation>>;
enum class RestoreRole { Invalid = 0, Master = 1, Loader, Applier };
BINARY_SERIALIZABLE(RestoreRole);