FastRestore:Applier:Intro StagingKey struct

This commit is contained in:
Meng Xu 2020-02-12 13:57:08 -08:00
parent 3e6bbe9e5b
commit c0f75d77b1
3 changed files with 54 additions and 3 deletions

View File

@ -134,7 +134,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
}
for (int mIndex = 0; mIndex < mutations.size(); mIndex++) {
MutationRef mutation = mutations[mIndex];
TraceEvent(SevFRMutationInfo, "FastRestore")
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations")
.detail("ApplierNode", self->id())
.detail("RestoreAsset", req.asset.toString())
.detail("Version", commitVersion)

View File

@ -40,11 +40,64 @@
#include "flow/actorcompiler.h" // has to be last include
struct StagingKey {
Key key; // TODO: Maybe not needed?
Value val;
MutationRef::Type type; // set or clear
Version version; // largest version of set or clear for the key
std::map<Version, MutationsVec> pendingMutations; // mutations not set or clear type
// bool operator < (const StagingKey& rhs) const {
// return std::tie(key, version, type, value)
// }
explicit StagingKey() : version(0) {}
explicit StagingKey(MutationRef m, Version version)
: key(m.param1), val(m.param2), type(m.type), version(versoin) {}
void add(const MutationRef& m, Version newVersion) {
ASSERT(version > 0); // Only add mutation
if (version < newVersion) {
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
key = m.param1;
val = m.param2;
type = m.type;
version = newVersion;
} else {
if (pendingMutations.find(newVersion) == pendingMutations.end()) {
pendingMutations.emplace(newVersion, MutationsVec());
}
// TODO: Do we really need deep copy?
pendingMutations[newVersion].push_back_deep(pendingMutations.arena(), m);
}
} else if (version == newVersion) {
TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion")
.detail("Version", newVersion)
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type]);
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled")
.detail("Version", newVersion)
.detail("NewMutation", m.toString())
.detail("ExistingKeyType", typeString[type]);
}
} // else input mutation is old and can be ignored
return;
}
}
struct StagingKeyRange {
KeyRange range;
MutationRef::Type type; // clearrange
Version version;
}
struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
// processedFileState: key: RestoreAsset; value: largest version of mutation received on the applier
std::map<RestoreAsset, NotifiedVersion> processedFileState;
Optional<Future<Void>> dbApplier;
VersionedMutationsMap kvOps; // Mutations at each version
std::map<Key, StagingKey> stagingKeys;
std::set<StagingKeyRange> stagingKeyRanges;
Future<Void> pollMetrics;

View File

@ -292,8 +292,6 @@ std::string RestoreConfigFR::toString() {
return ss.str();
}
//typedef RestoreConfigFR::RestoreFile RestoreFileFR;
// parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied
// The implementation of parallelFileRestore is copied from FileBackupAgent.actor.cpp
// parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied