mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-02 03:12:12 +08:00
Merge pull request #3450 from sfc-gh-tclinkenbeard/6.3-use-vector-mutations
Replace VersionUpdateRef with VerUpdateRef
This commit is contained in:
commit
c6a2043d53
@ -48,9 +48,13 @@ inline bool canReplyWith(Error e) {
|
||||
};
|
||||
}
|
||||
|
||||
const int VERSION_OVERHEAD = 64 + sizeof(Version) + sizeof(Standalone<VersionUpdateRef>) + //mutationLog, 64b overhead for map
|
||||
2 * (64 + sizeof(Version) + sizeof(Reference<VersionedMap<KeyRef,
|
||||
ValueOrClearToRef>::PTreeT>)); //versioned map [ x2 for createNewVersion(version+1) ], 64b overhead for map
|
||||
const int VERSION_OVERHEAD =
|
||||
64 + sizeof(Version) + sizeof(Standalone<VerUpdateRef>) + // mutationLog, 64b overhead for map
|
||||
2 * (64 + sizeof(Version) +
|
||||
sizeof(
|
||||
Reference<VersionedMap<KeyRef,
|
||||
ValueOrClearToRef>::PTreeT>)); // versioned map [ x2 for createNewVersion(version+1)
|
||||
// ], 64b overhead for map
|
||||
static int mvccStorageBytes( MutationRef const& m ) { return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2; }
|
||||
|
||||
struct StorageCacheData {
|
||||
@ -60,7 +64,7 @@ private:
|
||||
VersionedData versionedData;
|
||||
// in-memory mutationLog that the versionedData contains references to
|
||||
// TODO change it to a deque, already contains mutations in version order
|
||||
std::map<Version, Standalone<VersionUpdateRef>> mutationLog; // versions (durableVersion, version]
|
||||
std::map<Version, Standalone<VerUpdateRef>> mutationLog; // versions (durableVersion, version]
|
||||
|
||||
public:
|
||||
UID thisServerID; // unique id
|
||||
@ -147,12 +151,12 @@ public:
|
||||
}
|
||||
|
||||
Arena lastArena;
|
||||
std::map<Version, Standalone<VersionUpdateRef>> const & getMutationLog() { return mutationLog; }
|
||||
std::map<Version, Standalone<VersionUpdateRef>>& getMutableMutationLog() { return mutationLog; }
|
||||
std::map<Version, Standalone<VerUpdateRef>> const& getMutationLog() { return mutationLog; }
|
||||
std::map<Version, Standalone<VerUpdateRef>>& getMutableMutationLog() { return mutationLog; }
|
||||
VersionedData const& data() const { return versionedData; }
|
||||
VersionedData& mutableData() { return versionedData; }
|
||||
|
||||
Standalone<VersionUpdateRef>& addVersionToMutationLog(Version v) {
|
||||
Standalone<VerUpdateRef>& addVersionToMutationLog(Version v) {
|
||||
// return existing version...
|
||||
auto m = mutationLog.find(v);
|
||||
if (m != mutationLog.end())
|
||||
@ -167,13 +171,12 @@ public:
|
||||
return u;
|
||||
}
|
||||
|
||||
MutationRef addMutationToMutationLog(Standalone<VersionUpdateRef> &mLV, MutationRef const& m){
|
||||
MutationRef addMutationToMutationLog(Standalone<VerUpdateRef>& mLV, MutationRef const& m) {
|
||||
//TODO find out more
|
||||
//byteSampleApplyMutation(m, mLV.version);
|
||||
counters.bytesInput += mvccStorageBytes(m);
|
||||
return mLV.mutations.push_back_deep( mLV.arena(), m );
|
||||
return mLV.push_back_deep(mLV.arena(), m);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
///////////////////////////////////// Queries /////////////////////////////////
|
||||
|
@ -135,22 +135,6 @@ struct TLogConfirmRunningRequest {
|
||||
}
|
||||
};
|
||||
|
||||
struct VersionUpdateRef {
|
||||
Version version;
|
||||
MutationListRef mutations;
|
||||
bool isPrivateData;
|
||||
|
||||
VersionUpdateRef() : isPrivateData(false), version(invalidVersion) {}
|
||||
VersionUpdateRef( Arena& to, const VersionUpdateRef& from ) : version(from.version), mutations( to, from.mutations ), isPrivateData( from.isPrivateData ) {}
|
||||
int totalSize() const { return mutations.totalSize(); }
|
||||
int expectedSize() const { return mutations.expectedSize(); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, version, mutations, isPrivateData);
|
||||
}
|
||||
};
|
||||
|
||||
struct VerUpdateRef {
|
||||
Version version;
|
||||
VectorRef<MutationRef> mutations;
|
||||
@ -160,6 +144,11 @@ struct VerUpdateRef {
|
||||
VerUpdateRef( Arena& to, const VerUpdateRef& from ) : version(from.version), mutations( to, from.mutations ), isPrivateData( from.isPrivateData ) {}
|
||||
int expectedSize() const { return mutations.expectedSize(); }
|
||||
|
||||
MutationRef push_back_deep(Arena& arena, const MutationRef& m) {
|
||||
mutations.push_back_deep(arena, m);
|
||||
return mutations.back();
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
void serialize( Ar& ar ) {
|
||||
serializer(ar, version, mutations, isPrivateData);
|
||||
|
@ -178,7 +178,7 @@ private:
|
||||
struct StorageServer* data;
|
||||
IKeyValueStore* storage;
|
||||
|
||||
void writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext );
|
||||
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
|
||||
|
||||
ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
|
||||
Standalone<RangeResultRef> r = wait( storage->readRange( range, 1 ) );
|
||||
@ -242,8 +242,12 @@ struct UpdateEagerReadInfo {
|
||||
}
|
||||
};
|
||||
|
||||
const int VERSION_OVERHEAD = 64 + sizeof(Version) + sizeof(Standalone<VersionUpdateRef>) + //mutationLog, 64b overhead for map
|
||||
2 * (64 + sizeof(Version) + sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); //versioned map [ x2 for createNewVersion(version+1) ], 64b overhead for map
|
||||
const int VERSION_OVERHEAD =
|
||||
64 + sizeof(Version) + sizeof(Standalone<VerUpdateRef>) + // mutationLog, 64b overhead for map
|
||||
2 * (64 + sizeof(Version) +
|
||||
sizeof(Reference<VersionedMap<KeyRef, ValueOrClearToRef>::PTreeT>)); // versioned map [ x2 for
|
||||
// createNewVersion(version+1) ], 64b
|
||||
// overhead for map
|
||||
static int mvccStorageBytes( MutationRef const& m ) { return VersionedMap<KeyRef, ValueOrClearToRef>::overheadPerItem * 2 + (MutationRef::OVERHEAD_BYTES + m.param1.size() + m.param2.size()) * 2; }
|
||||
|
||||
struct FetchInjectionInfo {
|
||||
@ -277,7 +281,7 @@ private:
|
||||
// at older versions may contain older items which are also in storage (this is OK because of idempotency)
|
||||
|
||||
VersionedData versionedData;
|
||||
std::map<Version, Standalone<VersionUpdateRef>> mutationLog; // versions (durableVersion, version]
|
||||
std::map<Version, Standalone<VerUpdateRef>> mutationLog; // versions (durableVersion, version]
|
||||
|
||||
public:
|
||||
Tag tag;
|
||||
@ -289,8 +293,8 @@ public:
|
||||
double cpuUsage;
|
||||
double diskUsage;
|
||||
|
||||
std::map<Version, Standalone<VersionUpdateRef>> const & getMutationLog() { return mutationLog; }
|
||||
std::map<Version, Standalone<VersionUpdateRef>>& getMutableMutationLog() { return mutationLog; }
|
||||
std::map<Version, Standalone<VerUpdateRef>> const& getMutationLog() const { return mutationLog; }
|
||||
std::map<Version, Standalone<VerUpdateRef>>& getMutableMutationLog() { return mutationLog; }
|
||||
VersionedData const& data() const { return versionedData; }
|
||||
VersionedData& mutableData() { return versionedData; }
|
||||
|
||||
@ -348,7 +352,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
Standalone<VersionUpdateRef>& addVersionToMutationLog(Version v) {
|
||||
Standalone<VerUpdateRef>& addVersionToMutationLog(Version v) {
|
||||
// return existing version...
|
||||
auto m = mutationLog.find(v);
|
||||
if (m != mutationLog.end())
|
||||
@ -363,10 +367,10 @@ public:
|
||||
return u;
|
||||
}
|
||||
|
||||
MutationRef addMutationToMutationLog(Standalone<VersionUpdateRef> &mLV, MutationRef const& m){
|
||||
MutationRef addMutationToMutationLog(Standalone<VerUpdateRef>& mLV, MutationRef const& m) {
|
||||
byteSampleApplyMutation(m, mLV.version);
|
||||
counters.bytesInput += mvccStorageBytes(m);
|
||||
return mLV.mutations.push_back_deep( mLV.arena(), m );
|
||||
return mLV.push_back_deep(mLV.arena(), m);
|
||||
}
|
||||
|
||||
StorageServerDisk storage;
|
||||
@ -1746,18 +1750,18 @@ bool changeDurableVersion( StorageServer* data, Version desiredDurableVersion )
|
||||
verData.createNewVersion( data->version.get()+1 );
|
||||
|
||||
int64_t bytesDurable = VERSION_OVERHEAD;
|
||||
for(auto m = v.mutations.begin(); m; ++m) {
|
||||
bytesDurable += mvccStorageBytes(*m);
|
||||
auto i = verData.atLatest().find(m->param1);
|
||||
for (const auto& m : v.mutations) {
|
||||
bytesDurable += mvccStorageBytes(m);
|
||||
auto i = verData.atLatest().find(m.param1);
|
||||
if (i) {
|
||||
ASSERT( i.key() == m->param1 );
|
||||
ASSERT(i.key() == m.param1);
|
||||
ASSERT( i.insertVersion() >= nextDurableVersion );
|
||||
if (i.insertVersion() == nextDurableVersion)
|
||||
verData.erase(i);
|
||||
}
|
||||
if (m->type == MutationRef::SetValue) {
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
// A set can split a clear, so there might be another entry immediately after this one that should also be cleaned up
|
||||
i = verData.atLatest().upper_bound(m->param1);
|
||||
i = verData.atLatest().upper_bound(m.param1);
|
||||
if (i) {
|
||||
ASSERT( i.insertVersion() >= nextDurableVersion );
|
||||
if (i.insertVersion() == nextDurableVersion)
|
||||
@ -1929,7 +1933,8 @@ void applyMutation( StorageServer *self, MutationRef const& m, Arena& arena, Sto
|
||||
|
||||
}
|
||||
|
||||
void removeDataRange( StorageServer *ss, Standalone<VersionUpdateRef> &mLV, KeyRangeMap<Reference<ShardInfo>>& shards, KeyRangeRef range ) {
|
||||
void removeDataRange(StorageServer* ss, Standalone<VerUpdateRef>& mLV, KeyRangeMap<Reference<ShardInfo>>& shards,
|
||||
KeyRangeRef range) {
|
||||
// modify the latest version of data to remove all sets and trim all clears to exclude range.
|
||||
// Add a clear to mLV (mutationLog[data.getLatestVersion()]) that ensures all keys in range are removed from the disk when this latest version becomes durable
|
||||
// mLV is also modified if necessary to ensure that split clears can be forgotten
|
||||
@ -2582,7 +2587,7 @@ void StorageServer::addMutation(Version version, MutationRef const& mutation, Ke
|
||||
}
|
||||
|
||||
struct OrderByVersion {
|
||||
bool operator()( const VersionUpdateRef& a, const VersionUpdateRef& b ) {
|
||||
bool operator()(const VerUpdateRef& a, const VerUpdateRef& b) {
|
||||
if (a.version != b.version) return a.version < b.version;
|
||||
if (a.isPrivateData != b.isPrivateData) return a.isPrivateData;
|
||||
return false;
|
||||
@ -3181,13 +3186,14 @@ void StorageServerDisk::writeMutation( MutationRef mutation ) {
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
void StorageServerDisk::writeMutations( MutationListRef mutations, Version debugVersion, const char* debugContext ) {
|
||||
for(auto m = mutations.begin(); m; ++m) {
|
||||
debugMutation(debugContext, debugVersion, *m);
|
||||
if (m->type == MutationRef::SetValue) {
|
||||
storage->set( KeyValueRef(m->param1, m->param2) );
|
||||
} else if (m->type == MutationRef::ClearRange) {
|
||||
storage->clear( KeyRangeRef(m->param1, m->param2) );
|
||||
void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion,
|
||||
const char* debugContext) {
|
||||
for (const auto& m : mutations) {
|
||||
debugMutation(debugContext, debugVersion, m);
|
||||
if (m.type == MutationRef::SetValue) {
|
||||
storage->set(KeyValueRef(m.param1, m.param2));
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
storage->clear(KeyRangeRef(m.param1, m.param2));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3198,12 +3204,11 @@ bool StorageServerDisk::makeVersionMutationsDurable( Version& prevStorageVersion
|
||||
// Apply mutations from the mutationLog
|
||||
auto u = data->getMutationLog().upper_bound(prevStorageVersion);
|
||||
if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
|
||||
VersionUpdateRef const& v = u->second;
|
||||
VerUpdateRef const& v = u->second;
|
||||
ASSERT( v.version > prevStorageVersion && v.version <= newStorageVersion );
|
||||
debugKeyRange("makeVersionMutationsDurable", v.version, allKeys);
|
||||
writeMutations(v.mutations, v.version, "makeVersionDurable");
|
||||
for(auto m=v.mutations.begin(); m; ++m)
|
||||
bytesLeft -= mvccStorageBytes(*m);
|
||||
for (const auto& m : v.mutations) bytesLeft -= mvccStorageBytes(m);
|
||||
prevStorageVersion = v.version;
|
||||
return false;
|
||||
} else {
|
||||
|
Loading…
x
Reference in New Issue
Block a user