mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 01:42:37 +08:00
- Encode version vector before sending it over the wire.
Encoding methods used: - Tag localities: Run length encoding - Tag ids: Compact representation - Commit versions: delta encoding. If "n" is the number of entries in the version vector, with the tags spread over "m" data centers, these techniques will reduce the number of bytes to represent the version vector from "(11 * n)" bytes to "(3 * m + 2 * n)" / "(3 * m + 3 * n)" bytes (depending on the max tag id value, and ignoring some constants) in the best case.
This commit is contained in:
parent
96181feab6
commit
cb3add17b8
@ -151,6 +151,7 @@ set(FDBCLIENT_SRCS
|
||||
VersionedMap.h
|
||||
VersionedMap.cpp
|
||||
VersionVector.h
|
||||
VersionVector.cpp
|
||||
WellKnownEndpoints.h
|
||||
WriteMap.h
|
||||
json_spirit/json_spirit_error_position.h
|
||||
|
380
fdbclient/VersionVector.cpp
Normal file
380
fdbclient/VersionVector.cpp
Normal file
@ -0,0 +1,380 @@
|
||||
/*
|
||||
* VersionVector.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "fdbclient/VersionVector.h"
|
||||
|
||||
namespace unit_tests {
|
||||
|
||||
struct TestContextArena {
|
||||
Arena& _arena;
|
||||
Arena& arena() { return _arena; }
|
||||
ProtocolVersion protocolVersion() const { return g_network->protocolVersion(); }
|
||||
uint8_t* allocate(size_t size) { return new (_arena) uint8_t[size]; }
|
||||
};
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/emptyVV") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
{
|
||||
VersionVector serializedVV; // an empty version vector
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
}
|
||||
|
||||
{
|
||||
VersionVector serializedVV(133200164); // "VersionVector::maxVersion" is set, empty otherwise
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/simpleVV") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
serializedVV.setVersion(Tag(-1, 2), 3619339);
|
||||
serializedVV.setVersion(Tag(0, 13), 13292611);
|
||||
|
||||
std::set<Tag> tags;
|
||||
tags.emplace(0, 2);
|
||||
tags.emplace(0, 1);
|
||||
tags.emplace(0, 0);
|
||||
serializedVV.setVersion(tags, 13391141);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Populates version vector (with randomly generated tag localities, ids, and commit versions)
|
||||
// based on the given specifications.
|
||||
// @param vv Version vector
|
||||
// @param tagCount total number of storage servers in the cluster
|
||||
// @param localityCount total number of localities/regions in the cluster
|
||||
// @param maxTagId maximum value of any tag id in the cluster
|
||||
// @param maxCommitVersionDelta maximum difference between commit versions in the version vector
|
||||
// @note assumes each locality contains the same number of tags
|
||||
// @note picks locality values randomly from range [tagLocalityInvalid+1, INT8_MAX)
|
||||
void populateVersionVector(VersionVector& vv,
|
||||
int tagCount,
|
||||
int localityCount,
|
||||
int maxTagId,
|
||||
const uint64_t maxCommitVersionDelta) {
|
||||
std::vector<uint16_t> ids;
|
||||
std::vector<int8_t> localities;
|
||||
Version minVersion;
|
||||
std::vector<Version> versions;
|
||||
int tagsPerLocality = tagCount / localityCount;
|
||||
|
||||
// Populate localities.
|
||||
for (int i = 0; localities.size() < (size_t)localityCount; i++) {
|
||||
int8_t locality = deterministicRandom()->randomInt(tagLocalityInvalid + 1, INT8_MAX);
|
||||
if (std::find(localities.begin(), localities.end(), locality) == localities.end()) {
|
||||
localities.push_back(locality);
|
||||
}
|
||||
}
|
||||
|
||||
// Populate ids.
|
||||
for (int i = 0; i < tagCount; i++) {
|
||||
// Some of the ids could be duplicates, that's fine.
|
||||
ids.push_back(deterministicRandom()->randomInt(0, maxTagId));
|
||||
}
|
||||
|
||||
// Choose a value for minVersion. (Choose a value in such a way that
|
||||
// "minVersion + maxCommitVersionDelta" does not exceed INT64_MAX.)
|
||||
if (maxCommitVersionDelta <= UINT16_MAX) {
|
||||
minVersion = deterministicRandom()->randomUInt32();
|
||||
} else if (maxCommitVersionDelta <= UINT32_MAX) {
|
||||
minVersion = deterministicRandom()->randomInt(0, UINT16_MAX);
|
||||
} else {
|
||||
minVersion = 0;
|
||||
}
|
||||
|
||||
// Populate versions.
|
||||
Version versionDelta;
|
||||
for (int i = 0; i < tagCount; i++) {
|
||||
if (maxCommitVersionDelta <= UINT8_MAX) {
|
||||
versionDelta = deterministicRandom()->randomInt(0, UINT8_MAX);
|
||||
} else if (maxCommitVersionDelta <= UINT16_MAX) {
|
||||
versionDelta = deterministicRandom()->randomInt(0, UINT16_MAX);
|
||||
} else if (maxCommitVersionDelta <= UINT32_MAX) {
|
||||
versionDelta = deterministicRandom()->randomUInt32();
|
||||
} else {
|
||||
versionDelta = deterministicRandom()->randomInt64(0, INT64_MAX);
|
||||
}
|
||||
// Some of the versions could be duplicates, that's fine.
|
||||
versions.push_back(minVersion + versionDelta);
|
||||
}
|
||||
|
||||
// Sort versions.
|
||||
std::sort(versions.begin(), versions.end());
|
||||
|
||||
// Populate the version vector.
|
||||
std::set<Tag> tags;
|
||||
int tagIndex = 0;
|
||||
for (int i = 0; i < localities.size() && tagIndex < tagCount; i++) {
|
||||
for (int j = 0; j < tagsPerLocality && tagIndex < tagCount; j++, tagIndex++) {
|
||||
if (Tag(localities[i], ids[tagIndex]) == invalidTag) {
|
||||
continue; // skip this tag (this version also gets skipped, that's fine)
|
||||
}
|
||||
if (versions[tagIndex] == vv.getMaxVersion()) {
|
||||
tags.emplace(localities[i], ids[tagIndex]);
|
||||
continue; // skip this version; this tag will get the next higher version
|
||||
}
|
||||
if (tags.empty()) {
|
||||
vv.setVersion(Tag(localities[i], ids[tagIndex]), versions[tagIndex]);
|
||||
} else {
|
||||
vv.setVersion(tags, versions[tagIndex]);
|
||||
tags.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
ASSERT(tagIndex == tagCount);
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testA") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 80 storage servers spread over 2 regions, maxTagId < INT8_MAX, and
|
||||
// maxCommitVersionDelta < UINT8_MAX.
|
||||
populateVersionVector(serializedVV, 80, 2, INT8_MAX, UINT8_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testB") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT8_MAX.
|
||||
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT8_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testC") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT16_MAX.
|
||||
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT16_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testD") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT32_MAX.
|
||||
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT32_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testE") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 800 storage servers spread over 2 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT64_MAX.
|
||||
populateVersionVector(serializedVV, 800, 2, INT16_MAX, UINT64_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testF") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 1600 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT8_MAX.
|
||||
populateVersionVector(serializedVV, 1600, 4, INT16_MAX, UINT8_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testG") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 1600 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT16_MAX.
|
||||
populateVersionVector(serializedVV, 1600, 4, INT16_MAX, UINT16_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testH") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 3200 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT32_MAX.
|
||||
populateVersionVector(serializedVV, 3200, 4, INT16_MAX, UINT32_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/VersionVector/testI") {
|
||||
Arena arena;
|
||||
TestContextArena context{ arena };
|
||||
|
||||
VersionVector serializedVV;
|
||||
// 3200 storage servers spread over 4 regions, maxTagId < INT16_MAX, and
|
||||
// maxCommitVersionDelta < UINT64_MAX.
|
||||
populateVersionVector(serializedVV, 3200, 4, INT16_MAX, UINT64_MAX);
|
||||
|
||||
size_t size = dynamic_size_traits<VersionVector>::size(serializedVV, context);
|
||||
|
||||
uint8_t* buf = context.allocate(size);
|
||||
dynamic_size_traits<VersionVector>::save(buf, serializedVV, context);
|
||||
|
||||
VersionVector deserializedVV;
|
||||
dynamic_size_traits<VersionVector>::load(buf, size, deserializedVV, context);
|
||||
|
||||
ASSERT(serializedVV.compare(deserializedVV));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
} // namespace unit_tests
|
||||
|
||||
void forceLinkVersionVectorTests() {}
|
@ -29,39 +29,65 @@
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
|
||||
static const int InvalidEncodedSize = 0;
|
||||
|
||||
struct VersionVector {
|
||||
boost::container::flat_map<Tag, Version> versions;
|
||||
friend struct serializable_traits<VersionVector>;
|
||||
boost::container::flat_map<Tag, Version> versions; // An ordered map. (Note:
|
||||
// changing this to an unordered
|
||||
// map will break the
|
||||
// serialization code below.)
|
||||
Version maxVersion; // Specifies the max version in this version vector. (Note:
|
||||
// there may or may not be a corresponding entry for this
|
||||
// version in the "versions" map.)
|
||||
|
||||
VersionVector() : maxVersion(invalidVersion) {}
|
||||
VersionVector(Version version) : maxVersion(version) {}
|
||||
VersionVector() : maxVersion(invalidVersion), cachedEncodedSize(InvalidEncodedSize) {}
|
||||
VersionVector(Version version) : maxVersion(version), cachedEncodedSize(InvalidEncodedSize) {}
|
||||
|
||||
private:
|
||||
// Only invoked by getDelta() and applyDelta(), where tag has been validated
|
||||
// and version is guaranteed to be larger than the existing value.
|
||||
inline void setVersionNoCheck(const Tag& tag, Version version) { versions[tag] = version; }
|
||||
inline void setVersionNoCheck(const Tag& tag, Version version) {
|
||||
versions[tag] = version;
|
||||
invalidateCachedEncodedSize();
|
||||
}
|
||||
|
||||
inline void invalidateCachedEncodedSize() { cachedEncodedSize = InvalidEncodedSize; }
|
||||
|
||||
// Encoded version vector size. Introduced to help speed up serialization.
|
||||
// @note This encoded size is not meant to be kept in sync with the updates
|
||||
// that happen to the version vector.
|
||||
// @note A value of 0 (= InvalidEncodedSize) indicates that the encoded version
|
||||
// vector size is not cached.
|
||||
size_t cachedEncodedSize;
|
||||
|
||||
public:
|
||||
Version getMaxVersion() const { return maxVersion; }
|
||||
|
||||
void setMaxVersion(Version version) { maxVersion = version; }
|
||||
|
||||
int size() const { return versions.size(); }
|
||||
|
||||
bool empty() const { return versions.empty(); }
|
||||
|
||||
void setVersion(const Tag& tag, Version version) {
|
||||
ASSERT(tag != invalidTag);
|
||||
ASSERT(tag.locality > tagLocalityInvalid);
|
||||
ASSERT(version > maxVersion);
|
||||
versions[tag] = version;
|
||||
maxVersion = version;
|
||||
invalidateCachedEncodedSize();
|
||||
}
|
||||
|
||||
void setVersion(const std::set<Tag>& tags, Version version) {
|
||||
ASSERT(version > maxVersion);
|
||||
for (auto& tag : tags) {
|
||||
ASSERT(tag != invalidTag);
|
||||
ASSERT(tag.locality > tagLocalityInvalid);
|
||||
versions[tag] = version;
|
||||
}
|
||||
maxVersion = version;
|
||||
invalidateCachedEncodedSize();
|
||||
}
|
||||
|
||||
bool hasVersion(const Tag& tag) const {
|
||||
@ -80,6 +106,7 @@ public:
|
||||
void clear() {
|
||||
versions.clear();
|
||||
maxVersion = invalidVersion;
|
||||
invalidateCachedEncodedSize();
|
||||
}
|
||||
|
||||
// @note this method, together with method applyDelta(), helps minimize
|
||||
@ -144,9 +171,386 @@ public:
|
||||
bool operator!=(const VersionVector& vv) const { return maxVersion != vv.maxVersion; }
|
||||
bool operator<(const VersionVector& vv) const { return maxVersion < vv.maxVersion; }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, versions, maxVersion);
|
||||
bool compare(const VersionVector& vv) {
|
||||
if (maxVersion != vv.maxVersion) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (versions.size() != vv.versions.size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto iterA = versions.begin();
|
||||
auto iterB = vv.versions.begin();
|
||||
for (; iterA != versions.end(); iterA++, iterB++) {
|
||||
if (iterA->first != iterB->first || iterA->second != iterB->second) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
ASSERT(iterB == vv.versions.end());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
//
|
||||
// Methods to set/get/check cached encoded version vector size.
|
||||
//
|
||||
void setCachedEncodedSize(size_t size) { cachedEncodedSize = size; }
|
||||
|
||||
bool isEncodedSizeCached() const { return cachedEncodedSize != InvalidEncodedSize; }
|
||||
|
||||
size_t getCachedEncodedSize() const {
|
||||
ASSERT(isEncodedSizeCached());
|
||||
return cachedEncodedSize;
|
||||
}
|
||||
|
||||
//
|
||||
// Methods to copy an encoded version vector into the serialization buffer.
|
||||
//
|
||||
// Encoding methods used:
|
||||
//
|
||||
// - Tag localities: Run-length encoding
|
||||
// - Tag ids: Compact representation (depending on the max tag id value)
|
||||
// - Commit versions: Delta encoding
|
||||
//
|
||||
|
||||
// Extracts information about tag ids, tag localities, and commit versions that are
|
||||
// captured in the version vector. This will avoid the need to make multiple iterations
|
||||
// over the contents of the version vector while (encoding and) serializing it.
|
||||
void getTagAndCommitVersionInfo(size_t& utlCount,
|
||||
uint16_t& maxTagId,
|
||||
Version& minCommitVersion,
|
||||
Version& maxCommitVersion) const {
|
||||
// Initialization
|
||||
utlCount = 0; // unique tag locality count
|
||||
maxTagId = 0; // the highest tag id in the version vector
|
||||
minCommitVersion = MAX_VERSION; // the lowest commit version in "VersionVector::versions"
|
||||
maxCommitVersion = invalidVersion; // the highest commit version in "VersionVector::versions"
|
||||
|
||||
// Population
|
||||
int8_t locality = tagLocalityInvalid;
|
||||
for (const auto& [tag, version] : versions) {
|
||||
if (locality != tag.locality) {
|
||||
locality = tag.locality;
|
||||
utlCount++;
|
||||
}
|
||||
|
||||
maxTagId = std::max(maxTagId, tag.id);
|
||||
minCommitVersion = std::min(minCommitVersion, version);
|
||||
maxCommitVersion = std::max(maxCommitVersion, version);
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate size of the encoded version vector.
|
||||
size_t getEncodedSize() const {
|
||||
size_t utlCount; // unique tag locality count
|
||||
uint16_t maxTagId; // the highest tag id in the version vector
|
||||
Version minVersion; // the lowest commit version in the version vector
|
||||
Version maxVersion; // the highest commit version in the version vector
|
||||
getTagAndCommitVersionInfo(utlCount, maxTagId, minVersion, maxVersion);
|
||||
|
||||
// Is the version vector empty?
|
||||
if (utlCount == 0) {
|
||||
return sizeof(size_t) + /* captures unique tag locality count (= 0, in this case) */
|
||||
sizeof(Version); /* captures VersionVector::maxVersion */
|
||||
}
|
||||
|
||||
size_t tagIdSize = 0; // number of bytes needed to serialize an individual (potentially compacted) tag id
|
||||
tagIdSize = (maxTagId <= UINT8_MAX) ? sizeof(uint8_t) : sizeof(uint16_t);
|
||||
|
||||
size_t commitVersionSize = 0; // number of bytes needed to serialize an individual commit version
|
||||
if ((maxVersion - minVersion) <= UINT8_MAX) {
|
||||
commitVersionSize = sizeof(uint8_t);
|
||||
} else if ((maxVersion - minVersion) <= UINT16_MAX) {
|
||||
commitVersionSize = sizeof(uint16_t);
|
||||
} else if ((maxVersion - minVersion) <= UINT32_MAX) {
|
||||
commitVersionSize = sizeof(uint32_t);
|
||||
} else {
|
||||
commitVersionSize = sizeof(uint64_t);
|
||||
}
|
||||
|
||||
return sizeof(size_t) + /* unique tag locality count */
|
||||
utlCount * (sizeof(int8_t) + sizeof(uint16_t)) + // unique tag locality values and their run lengths
|
||||
sizeof(uint8_t) + /* number of bytes needed to serialize an individual (potentially compacted) tag id */
|
||||
sizeof(uint8_t) + /* number of bytes needed to serialize an individual commit version */
|
||||
sizeof(Version) + /* the lowest commit version in the version vector */
|
||||
sizeof(size_t) + /* number of <tagid, version> pairs */
|
||||
this->size() * (tagIdSize + commitVersionSize) + /* encoded <tagid, version> pairs */
|
||||
sizeof(Version); /* VersionVector::maxVersion */
|
||||
}
|
||||
|
||||
// Copy "value" into the serialization buffer.
|
||||
template <typename T>
|
||||
void serialize(uint8_t*& out, T value) const {
|
||||
memcpy(out, &value, sizeof(T));
|
||||
out += sizeof(T);
|
||||
}
|
||||
|
||||
// Copy RLE encoded tag locality values into the serialization buffer.
|
||||
void serializeTagLocalities(size_t utlCount, uint8_t*& out) const {
|
||||
serialize<size_t>(out, utlCount); // unique tag locality count
|
||||
|
||||
// Is the version vector empty?
|
||||
if (utlCount == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int8_t locality = tagLocalityInvalid;
|
||||
uint16_t localityCount = 0;
|
||||
for (const auto& [tag, version] : versions) {
|
||||
if (locality != tag.locality) {
|
||||
if (locality != tagLocalityInvalid) {
|
||||
serialize<int8_t>(out, locality); // tag locality value
|
||||
serialize<uint16_t>(out, localityCount); // run length of the locality value
|
||||
}
|
||||
locality = tag.locality;
|
||||
localityCount = 1;
|
||||
} else {
|
||||
localityCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (locality != tagLocalityInvalid) {
|
||||
serialize<int8_t>(out, locality); // tag locality value
|
||||
serialize<uint16_t>(out, localityCount); // run length of the locality value
|
||||
}
|
||||
}
|
||||
|
||||
// Copy encoded tag id and commit version values into the serialization buffer.
|
||||
// T: Type to be used to serialize tag ids (uint8_t/uint16_t)
|
||||
// V: Type to be used to serialize commit version deltas (uint8_t/uint16_t/uint32_t/uint64_t)
|
||||
template <typename T, typename V>
|
||||
void serializeSizedTagIdsAndSizedCommitVersions(Version minCommitVersion, uint8_t*& out) const {
|
||||
// Number of bytes that will be used to serialize an individual tag id.
|
||||
serialize<uint8_t>(out, (uint8_t)sizeof(T));
|
||||
// Number of bytes that will be used to serialize an individual commit version delta value.
|
||||
serialize<uint8_t>(out, (uint8_t)sizeof(V));
|
||||
// The lowest commit version in the version vector.
|
||||
serialize<Version>(out, minCommitVersion);
|
||||
// The number of <tagId, commitVersion> pairs.
|
||||
serialize<size_t>(out, (this->size()));
|
||||
|
||||
for (const auto& [tag, version] : versions) {
|
||||
// Serialize tag id.
|
||||
serialize<T>(out, (T)tag.id);
|
||||
|
||||
// Serialize commit version delta.
|
||||
serialize<V>(out, (V)(version - minCommitVersion));
|
||||
}
|
||||
}
|
||||
|
||||
// Figure out the type to be used to serialize delta encoded commit version values,
|
||||
// and call the above method to do the serialization.
|
||||
// T: Type to be used to serialize tag ids (uint8_t/uint16_t)
|
||||
template <typename T>
|
||||
void serializeSizedTagIdsAndCommitVersions(Version minVersion, Version maxVersion, uint8_t*& out) const {
|
||||
if ((maxVersion - minVersion) <= UINT8_MAX) {
|
||||
serializeSizedTagIdsAndSizedCommitVersions<T, uint8_t>(minVersion, out);
|
||||
} else if ((maxVersion - minVersion) <= UINT16_MAX) {
|
||||
serializeSizedTagIdsAndSizedCommitVersions<T, uint16_t>(minVersion, out);
|
||||
} else if ((maxVersion - minVersion) <= UINT32_MAX) {
|
||||
serializeSizedTagIdsAndSizedCommitVersions<T, uint32_t>(minVersion, out);
|
||||
} else {
|
||||
serializeSizedTagIdsAndSizedCommitVersions<T, uint64_t>(minVersion, out);
|
||||
}
|
||||
}
|
||||
|
||||
// Figure out the types to be used to serialize (potentially compacted) tag ids and delta
|
||||
// encoded commit version values, and call the above methods to do the serialization.
|
||||
void serializeTagIdsAndCommitVersions(uint16_t maxTagId,
|
||||
Version minVersion,
|
||||
Version maxVersion,
|
||||
uint8_t*& out) const {
|
||||
ASSERT(!this->empty());
|
||||
if (maxTagId <= UINT8_MAX) {
|
||||
serializeSizedTagIdsAndCommitVersions<uint8_t>(minVersion, maxVersion, out);
|
||||
} else {
|
||||
serializeSizedTagIdsAndCommitVersions<uint16_t>(minVersion, maxVersion, out);
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Methods to load (an encoded) version vector from the serialization buffer.
|
||||
//
|
||||
|
||||
// Extract "value" from the serialization buffer.
|
||||
template <typename T>
|
||||
void deserialize(const uint8_t*& data, T& value) const {
|
||||
memcpy(&value, data, sizeof(T));
|
||||
data += sizeof(T);
|
||||
}
|
||||
|
||||
// Deserialize RLE encoded tag locality values.
|
||||
void deserializeLocalities(const uint8_t*& data,
|
||||
size_t& utlCount,
|
||||
std::vector<int8_t>& localities,
|
||||
std::vector<uint16_t>& localityCounts) {
|
||||
// Initialization
|
||||
localities.clear();
|
||||
localityCounts.clear();
|
||||
|
||||
// Extract unique tag locality count from the buffer.
|
||||
deserialize<size_t>(data, utlCount);
|
||||
|
||||
if (utlCount == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int8_t locality;
|
||||
uint16_t localityCount;
|
||||
localities.reserve(utlCount);
|
||||
localityCounts.reserve(utlCount);
|
||||
for (size_t i = 0; i < utlCount; i++) {
|
||||
deserialize<int8_t>(data, locality);
|
||||
localities.push_back(locality);
|
||||
|
||||
deserialize<uint16_t>(data, localityCount);
|
||||
localityCounts.push_back(localityCount);
|
||||
}
|
||||
}
|
||||
|
||||
// Deserialize tag ids and commit version values.
|
||||
// T: Type that was used to serialize tag ids (uint8_t/uint16_t)
|
||||
// V: Type that was used to serialize commit version deltas (uint8_t/uint16_t/uint32_t/uint64_t)
|
||||
template <typename T, typename V>
|
||||
void deserializeSizedTagIdsAndSizedCommitVersions(const uint8_t*& data,
|
||||
std::vector<int8_t>& localities,
|
||||
std::vector<uint16_t>& localityCounts) {
|
||||
Version minCommitVersion;
|
||||
deserialize<Version>(data, minCommitVersion);
|
||||
|
||||
size_t pairCount; // number of serialized <tag id, commit version> pairs
|
||||
deserialize<size_t>(data, pairCount);
|
||||
|
||||
T tagId;
|
||||
V versionDelta;
|
||||
for (size_t i = 0; i < localities.size(); i++) {
|
||||
for (size_t j = 0; j < localityCounts[i]; j++) {
|
||||
// Deserialize tag id.
|
||||
deserialize<T>(data, tagId);
|
||||
|
||||
// Deserialize commit version delta.
|
||||
deserialize<V>(data, versionDelta);
|
||||
|
||||
Tag tag(localities[i], tagId);
|
||||
setVersionNoCheck(tag, minCommitVersion + versionDelta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Figrue out the type that was used to serialize commit version deltas and call the above
|
||||
// method to do the deserialization.
|
||||
// T: Type that was used to serialize tag ids (uint8_t/uint16_t)
|
||||
template <typename T>
|
||||
void deserializeSizedTagIdsAndCommitVersions(const uint8_t*& data,
|
||||
std::vector<int8_t>& localities,
|
||||
std::vector<uint16_t>& localityCounts) {
|
||||
uint8_t commitVersionDeltaSize; // number of bytes that were used to serialize an individual commit version
|
||||
// delta value
|
||||
deserialize<uint8_t>(data, commitVersionDeltaSize);
|
||||
|
||||
if (commitVersionDeltaSize == sizeof(uint8_t)) {
|
||||
deserializeSizedTagIdsAndSizedCommitVersions<T, uint8_t>(data, localities, localityCounts);
|
||||
} else if (commitVersionDeltaSize == sizeof(uint16_t)) {
|
||||
deserializeSizedTagIdsAndSizedCommitVersions<T, uint16_t>(data, localities, localityCounts);
|
||||
} else if (commitVersionDeltaSize == sizeof(uint32_t)) {
|
||||
deserializeSizedTagIdsAndSizedCommitVersions<T, uint32_t>(data, localities, localityCounts);
|
||||
} else {
|
||||
ASSERT(commitVersionDeltaSize == sizeof(uint64_t));
|
||||
deserializeSizedTagIdsAndSizedCommitVersions<T, uint64_t>(data, localities, localityCounts);
|
||||
}
|
||||
}
|
||||
|
||||
// Figure out the types that were used to serialize tag ids and commit version deltas, and
|
||||
// call the above methods to do the deserialization.
|
||||
void deserializeTagIdsAndCommitVersions(const uint8_t*& data,
|
||||
std::vector<int8_t>& localities,
|
||||
std::vector<uint16_t>& localityCounts) {
|
||||
uint8_t tagIdSize; // number of bytes that were used to serialize an individual tag id
|
||||
deserialize<uint8_t>(data, tagIdSize);
|
||||
|
||||
if (tagIdSize == sizeof(uint8_t)) {
|
||||
deserializeSizedTagIdsAndCommitVersions<uint8_t>(data, localities, localityCounts);
|
||||
} else {
|
||||
ASSERT(tagIdSize == sizeof(uint16_t));
|
||||
deserializeSizedTagIdsAndCommitVersions<uint16_t>(data, localities, localityCounts);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// @note Enabling/Disabling version vector encoding during serialization (and
|
||||
// de-serialization):
|
||||
// - To enable version vector encoding during serialization/de-serialization:
|
||||
// derive "struct dynamic_size_traits<VersionVector>" from "std::true_type" and
|
||||
// derive "struct serializable_traits<VersionVector>" from "std::false_type".
|
||||
//
|
||||
// - To disable version vector encoding during serialization/de-serialization::
|
||||
// derive "struct dynamic_size_traits<VersionVector>" from "std::false_type" and
|
||||
// derive "struct serializable_traits<VersionVector>" from "std::true_type".
|
||||
template <>
|
||||
struct serializable_traits<VersionVector> : std::false_type {
|
||||
template <class Archiver>
|
||||
static void serialize(Archiver& ar, VersionVector& vv) {
|
||||
serializer(ar, vv.versions, vv.maxVersion);
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct dynamic_size_traits<VersionVector> : std::true_type {
|
||||
template <class Context>
|
||||
static size_t size(const VersionVector& vv, Context&) {
|
||||
size_t encodedSize;
|
||||
if (vv.isEncodedSizeCached()) {
|
||||
encodedSize = vv.getCachedEncodedSize();
|
||||
// @todo remove this assert before doing performance tests
|
||||
ASSERT(encodedSize == vv.getEncodedSize());
|
||||
} else {
|
||||
encodedSize = vv.getEncodedSize();
|
||||
const_cast<VersionVector&>(vv).setCachedEncodedSize(encodedSize);
|
||||
}
|
||||
return encodedSize;
|
||||
}
|
||||
|
||||
template <class Context>
|
||||
static void save(uint8_t* out, const VersionVector& vv, Context&) {
|
||||
auto* begin = out;
|
||||
|
||||
size_t utlCount; // unique tag locality count
|
||||
uint16_t maxTagId; // the highest tag id in the version vector
|
||||
Version minCommitVersion; // the lowest commit version in the version vector (in "VersionVector::versions")
|
||||
Version maxCommitVersion; // the highest commit version in the version vector (in "VersionVector::versions")
|
||||
vv.getTagAndCommitVersionInfo(utlCount, maxTagId, minCommitVersion, maxCommitVersion);
|
||||
|
||||
vv.serializeTagLocalities(utlCount, out);
|
||||
if (!vv.empty()) {
|
||||
vv.serializeTagIdsAndCommitVersions(maxTagId, minCommitVersion, maxCommitVersion, out);
|
||||
}
|
||||
|
||||
// Serialize vv::maxVersion.
|
||||
vv.serialize<Version>(out, (vv.getMaxVersion()));
|
||||
|
||||
// @todo remove this assert before doing performance tests
|
||||
ASSERT(out - begin == vv.getEncodedSize());
|
||||
}
|
||||
|
||||
template <class Context>
|
||||
static void load(const uint8_t* data, size_t size, VersionVector& vv, Context& context) {
|
||||
auto* p = data;
|
||||
|
||||
size_t utlCount;
|
||||
std::vector<int8_t> localities;
|
||||
std::vector<uint16_t> localityCounts;
|
||||
vv.deserializeLocalities(data, utlCount, localities, localityCounts);
|
||||
if (utlCount > 0) {
|
||||
vv.deserializeTagIdsAndCommitVersions(data, localities, localityCounts);
|
||||
}
|
||||
|
||||
// Deserialize VersionVector::maxVersion.
|
||||
Version maxVersion;
|
||||
vv.deserialize<Version>(data, maxVersion);
|
||||
vv.setMaxVersion(maxVersion);
|
||||
|
||||
ASSERT(data - p == size);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -37,6 +37,7 @@ void forceLinkSimExternalConnectionTests();
|
||||
void forceLinkMutationLogReaderTests();
|
||||
void forceLinkSimEncryptVaultProxyTests();
|
||||
void forceLinkIThreadPoolTests();
|
||||
void forceLinkVersionVectorTests();
|
||||
|
||||
struct UnitTestWorkload : TestWorkload {
|
||||
bool enabled;
|
||||
@ -84,6 +85,7 @@ struct UnitTestWorkload : TestWorkload {
|
||||
forceLinkMutationLogReaderTests();
|
||||
forceLinkSimEncryptVaultProxyTests();
|
||||
forceLinkIThreadPoolTests();
|
||||
forceLinkVersionVectorTests();
|
||||
}
|
||||
|
||||
std::string description() const override { return "UnitTests"; }
|
||||
|
Loading…
x
Reference in New Issue
Block a user