mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-03 03:41:53 +08:00
Merge pull request #3903 from jzhou77/refactor
Add more comments and refactor for SkipList
This commit is contained in:
commit
a145601a37
@ -56,6 +56,7 @@ private:
|
||||
std::vector<std::pair<StringRef, StringRef>> combinedWriteConflictRanges;
|
||||
std::vector<struct ReadConflictRange> combinedReadConflictRanges;
|
||||
bool* transactionConflictStatus;
|
||||
// Stores the map: a transaction -> conflicted transactions' indices
|
||||
std::map<int, VectorRef<int>>* conflictingKeyRangeMap;
|
||||
Arena* resolveBatchReplyArena;
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/ResolverInterface.h"
|
||||
#include "fdbserver/MasterInterface.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
@ -27,9 +28,9 @@
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/Orderer.actor.h"
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/StorageMetrics.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
|
@ -32,7 +32,7 @@
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
|
||||
using std::max;
|
||||
using std::min;
|
||||
@ -46,16 +46,9 @@ static inline int skfastrand() {
|
||||
return g_seed;
|
||||
}
|
||||
|
||||
void setAffinity(int proc);
|
||||
|
||||
PerfDoubleCounter g_buildTest("Build", skc), g_add("Add", skc), g_add_sort("A.Sort", skc),
|
||||
g_detectConflicts("Detect", skc), g_sort("D.Sort", skc), g_combine("D.Combine", skc),
|
||||
g_checkRead("D.CheckRead", skc), g_checkBatch("D.CheckIntraBatch", skc), g_merge("D.MergeWrite", skc),
|
||||
g_merge_launch("D.Merge.Launch", skc), g_merge_fork("D.Merge.Fork", skc),
|
||||
g_merge_start_var("D.Merge.StartVariance", skc), g_merge_end_var("D.Merge.EndVariance", skc),
|
||||
g_merge_run_var("D.Merge.RunVariance", skc), g_merge_run_shortest("D.Merge.ShortestRun", skc),
|
||||
g_merge_run_longest("D.Merge.LongestRun", skc), g_merge_run_total("D.Merge.TotalRun", skc),
|
||||
g_merge_join("D.Merge.Join", skc), g_removeBefore("D.RemoveBefore", skc);
|
||||
PerfDoubleCounter g_buildTest("Build", skc), g_add("Add", skc), g_detectConflicts("Detect", skc), g_sort("D.Sort", skc),
|
||||
g_combine("D.Combine", skc), g_checkRead("D.CheckRead", skc), g_checkBatch("D.CheckIntraBatch", skc),
|
||||
g_merge("D.MergeWrite", skc), g_removeBefore("D.RemoveBefore", skc);
|
||||
|
||||
static force_inline int compare(const StringRef& a, const StringRef& b) {
|
||||
int c = memcmp(a.begin(), b.begin(), min(a.size(), b.size()));
|
||||
@ -88,7 +81,7 @@ struct KeyInfo {
|
||||
bool write;
|
||||
int transaction;
|
||||
|
||||
KeyInfo(){};
|
||||
KeyInfo() = default;
|
||||
KeyInfo(StringRef key, bool begin, bool write, int transaction, int* pIndex)
|
||||
: key(key), begin(begin), write(write), transaction(transaction), pIndex(pIndex) {}
|
||||
};
|
||||
@ -229,17 +222,25 @@ private:
|
||||
return level;
|
||||
}
|
||||
|
||||
// Represent a node in the SkipList. The node has multiple (i.e., level) pointers to
|
||||
// other nodes, and keeps a record of the max versions for each level.
|
||||
struct Node {
|
||||
int level() { return nPointers - 1; }
|
||||
uint8_t* value() { return end() + nPointers * (sizeof(Node*) + sizeof(Version)); }
|
||||
int length() { return valueLength; }
|
||||
Node* getNext(int i) { return *((Node**)end() + i); }
|
||||
void setNext(int i, Node* n) { *((Node**)end() + i) = n; }
|
||||
|
||||
// Returns the next node pointer at the given level.
|
||||
Node* getNext(int level) { return *((Node**)end() + level); }
|
||||
// Sets the next node pointer at the given level.
|
||||
void setNext(int level, Node* n) { *((Node**)end() + level) = n; }
|
||||
|
||||
// Returns the max version at the given level.
|
||||
Version getMaxVersion(int i) { return ((Version*)(end() + nPointers * sizeof(Node*)))[i]; }
|
||||
// Sets the max version at the given level.
|
||||
void setMaxVersion(int i, Version v) { ((Version*)(end() + nPointers * sizeof(Node*)))[i] = v; }
|
||||
|
||||
// Return a node with initialized value but uninitialized pointers
|
||||
// Memory layout: *this, (level+1) Node*, (level+1) Version, value
|
||||
static Node* create(const StringRef& value, int level) {
|
||||
int nodeSize = sizeof(Node) + value.size() + (level + 1) * (sizeof(Node*) + sizeof(Version));
|
||||
|
||||
@ -289,6 +290,7 @@ private:
|
||||
|
||||
private:
|
||||
int getNodeSize() { return sizeof(Node) + valueLength + nPointers * (sizeof(Node*) + sizeof(Version)); }
|
||||
// Returns the first Node* pointer
|
||||
uint8_t* end() { return (uint8_t*)(this + 1); }
|
||||
int nPointers, valueLength;
|
||||
};
|
||||
@ -311,16 +313,19 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
// Points the location (i.e., Node*) that value would appear in the SkipList.
|
||||
// If the "value" is in the list, then finger[0] points to that exact node;
|
||||
// otherwise, the finger points to Nodes that the value should be inserted before.
|
||||
// Note the SkipList organizes all nodes at level 0, higher levels contain jump pointers.
|
||||
struct Finger {
|
||||
Node* finger[MaxLevels]; // valid for levels >= level
|
||||
int level;
|
||||
Node* x;
|
||||
Node* alreadyChecked;
|
||||
int level = MaxLevels;
|
||||
Node* x = nullptr;
|
||||
Node* alreadyChecked = nullptr;
|
||||
StringRef value;
|
||||
|
||||
Finger() : level(MaxLevels), x(nullptr), alreadyChecked(nullptr) {}
|
||||
|
||||
Finger(Node* header, const StringRef& ptr) : value(ptr), level(MaxLevels), alreadyChecked(nullptr), x(header) {}
|
||||
Finger() = default;
|
||||
Finger(Node* header, const StringRef& ptr) : value(ptr), x(header) {}
|
||||
|
||||
void init(const StringRef& value, Node* header) {
|
||||
this->value = value;
|
||||
@ -337,6 +342,8 @@ public:
|
||||
}
|
||||
|
||||
// pre: !finished()
|
||||
// Advances the pointer at the current level to a Node that's >= finger's value
|
||||
// if possible; or move to the next level (i.e., level--).
|
||||
// Returns true if we have advanced to the next level
|
||||
force_inline bool advance() {
|
||||
Node* next = x->getNext(level - 1);
|
||||
@ -360,6 +367,7 @@ public:
|
||||
|
||||
force_inline bool finished() { return level == 0; }
|
||||
|
||||
// Returns if the finger value is found in the SkipList.
|
||||
force_inline Node* found() const {
|
||||
// valid after finished returns true
|
||||
Node* n = finger[0]->getNext(0); // or alreadyChecked, but that is more easily invalidated
|
||||
@ -375,7 +383,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
int count() {
|
||||
// Returns the total number of nodes in the list.
|
||||
int count() const {
|
||||
int count = 0;
|
||||
Node* x = header->getNext(0);
|
||||
while (x) {
|
||||
@ -453,6 +462,7 @@ public:
|
||||
// partitions. In between, operations on each partition must not touch any keys outside
|
||||
// the partition. Specifically, the partition to the left of 'key' must not have a range
|
||||
// [...,key) inserted, since that would insert an entry at 'key'.
|
||||
// Note this function is not used.
|
||||
void partition(StringRef* begin, int splitCount, SkipList* output) {
|
||||
for (int i = splitCount - 1; i >= 0; i--) {
|
||||
Finger f(header, begin[i]);
|
||||
@ -462,6 +472,8 @@ public:
|
||||
swap(output[0]);
|
||||
}
|
||||
|
||||
// Concatenates multiple SkipList objects into one and stores in input[0].
|
||||
// Note this function is not used.
|
||||
void concatenate(SkipList* input, int count) {
|
||||
std::vector<Finger> ends(count - 1);
|
||||
for (int i = 0; i < ends.size(); i++) input[i].getEnd(ends[i]);
|
||||
@ -692,6 +704,7 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
// Splits the SkipLists so that those after finger is moved to "right".
|
||||
void split(const Finger& f, SkipList& right) {
|
||||
ASSERT(!right.header->getNext(0)); // right must be empty
|
||||
right.header->setMaxVersion(0, f.finger[0]->getMaxVersion(0));
|
||||
@ -701,6 +714,7 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
// Sets end's finger to the last nodes at all levels.
|
||||
void getEnd(Finger& end) {
|
||||
Node* node = header;
|
||||
for (int l = MaxLevels - 1; l >= 0; l--) {
|
||||
@ -712,21 +726,6 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
StringRef setK(Arena& arena, int i) {
|
||||
char t[sizeof(i)];
|
||||
*(int*)t = i;
|
||||
|
||||
const int keySize = 16;
|
||||
|
||||
char* ss = new (arena) char[keySize];
|
||||
for (int c = 0; c < keySize - sizeof(i); c++) ss[c] = '.';
|
||||
for (int c = 0; c < sizeof(i); c++) ss[c + keySize - sizeof(i)] = t[sizeof(i) - 1 - c];
|
||||
|
||||
return StringRef((const uint8_t*)ss, keySize);
|
||||
}
|
||||
|
||||
#include "fdbserver/ConflictSet.h"
|
||||
|
||||
struct ConflictSet {
|
||||
ConflictSet() : oldestVersion(0), removalKey(makeString(0)) {}
|
||||
~ConflictSet() {}
|
||||
@ -761,7 +760,7 @@ struct TransactionInfo {
|
||||
};
|
||||
|
||||
void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
|
||||
int t = transactionCount++;
|
||||
const int t = transactionCount++;
|
||||
|
||||
Arena& arena = transactionInfo.arena();
|
||||
TransactionInfo* info = new (arena) TransactionInfo;
|
||||
@ -774,7 +773,6 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
|
||||
info->readRanges.resize(arena, tr.read_conflict_ranges.size());
|
||||
info->writeRanges.resize(arena, tr.write_conflict_ranges.size());
|
||||
|
||||
std::vector<KeyInfo>& points = this->points;
|
||||
for (int r = 0; r < tr.read_conflict_ranges.size(); r++) {
|
||||
const KeyRangeRef& range = tr.read_conflict_ranges[r];
|
||||
points.emplace_back(range.begin, true, false, t, &info->readRanges[r].first);
|
||||
@ -791,7 +789,7 @@ void ConflictBatch::addTransaction(const CommitTransactionRef& tr) {
|
||||
}
|
||||
}
|
||||
|
||||
this->transactionInfo.push_back(arena, info);
|
||||
transactionInfo.push_back(arena, info);
|
||||
}
|
||||
|
||||
// SOMEDAY: This should probably be replaced with a roaring bitmap.
|
||||
@ -891,7 +889,7 @@ void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, std::
|
||||
}
|
||||
|
||||
void ConflictBatch::checkReadConflictRanges() {
|
||||
if (!combinedReadConflictRanges.size()) return;
|
||||
if (combinedReadConflictRanges.empty()) return;
|
||||
|
||||
cs->versionHistory.detectConflicts(&combinedReadConflictRanges[0], combinedReadConflictRanges.size(),
|
||||
transactionConflictStatus);
|
||||
@ -899,35 +897,34 @@ void ConflictBatch::checkReadConflictRanges() {
|
||||
|
||||
void ConflictBatch::addConflictRanges(Version now, std::vector<std::pair<StringRef, StringRef>>::iterator begin,
|
||||
std::vector<std::pair<StringRef, StringRef>>::iterator end, SkipList* part) {
|
||||
int count = end - begin;
|
||||
static_assert(sizeof(begin[0]) == sizeof(StringRef) * 2,
|
||||
const int count = end - begin;
|
||||
static_assert(sizeof(*begin) == sizeof(StringRef) * 2,
|
||||
"Write Conflict Range type not convertible to two StringPtrs");
|
||||
const StringRef* strings = reinterpret_cast<const StringRef*>(&*begin);
|
||||
int stringCount = count * 2;
|
||||
const int stringCount = count * 2;
|
||||
|
||||
static const int stripeSize = 16;
|
||||
const int stripeSize = 16;
|
||||
SkipList::Finger fingers[stripeSize];
|
||||
int temp[stripeSize];
|
||||
int stripes = (stringCount + stripeSize - 1) / stripeSize;
|
||||
|
||||
int ss = stringCount - (stripes - 1) * stripeSize;
|
||||
for (int s = stripes - 1; s >= 0; s--) {
|
||||
part->find(&strings[s * stripeSize], &fingers[0], temp, ss);
|
||||
part->addConflictRanges(&fingers[0], ss / 2, now);
|
||||
part->find(&strings[s * stripeSize], fingers, temp, ss);
|
||||
part->addConflictRanges(fingers, ss / 2, now);
|
||||
ss = stripeSize;
|
||||
}
|
||||
}
|
||||
|
||||
void ConflictBatch::mergeWriteConflictRanges(Version now) {
|
||||
if (!combinedWriteConflictRanges.size()) return;
|
||||
if (combinedWriteConflictRanges.empty()) return;
|
||||
|
||||
addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory);
|
||||
}
|
||||
|
||||
void ConflictBatch::combineWriteConflictRanges() {
|
||||
int activeWriteCount = 0;
|
||||
for (int i = 0; i < points.size(); i++) {
|
||||
KeyInfo& point = points[i];
|
||||
for (const KeyInfo& point : points) {
|
||||
if (point.write && !transactionConflictStatus[point.transaction]) {
|
||||
if (point.begin) {
|
||||
activeWriteCount++;
|
||||
@ -940,6 +937,20 @@ void ConflictBatch::combineWriteConflictRanges() {
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
StringRef setK(Arena& arena, int i) {
|
||||
char t[sizeof(i)];
|
||||
*(int*)t = i;
|
||||
|
||||
const int keySize = 16;
|
||||
|
||||
char* ss = new (arena) char[keySize];
|
||||
for (int c = 0; c < keySize - sizeof(i); c++) ss[c] = '.';
|
||||
for (int c = 0; c < sizeof(i); c++) ss[c + keySize - sizeof(i)] = t[sizeof(i) - 1 - c];
|
||||
|
||||
return StringRef((const uint8_t*)ss, keySize);
|
||||
}
|
||||
|
||||
void miniConflictSetTest() {
|
||||
for (int i = 0; i < 2000000; i++) {
|
||||
int size = 64 * 5; // Also run 64*64*5 to test multiple words of andValues and orValues
|
||||
@ -991,6 +1002,7 @@ void operatorLessThanTest() {
|
||||
ASSERT(!(a == b));
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
void skipListTest() {
|
||||
printf("Skip list test\n");
|
||||
@ -1007,21 +1019,22 @@ void skipListTest() {
|
||||
|
||||
Arena testDataArena;
|
||||
VectorRef<VectorRef<KeyRangeRef>> testData;
|
||||
testData.resize(testDataArena, 500);
|
||||
std::vector<std::vector<uint8_t>> success(testData.size());
|
||||
std::vector<std::vector<uint8_t>> success2(testData.size());
|
||||
for (int i = 0; i < testData.size(); i++) {
|
||||
testData[i].resize(testDataArena, 5000);
|
||||
success[i].assign(testData[i].size(), false);
|
||||
success2[i].assign(testData[i].size(), false);
|
||||
for (int j = 0; j < testData[i].size(); j++) {
|
||||
const int batches = 500; // deterministicRandom()->randomInt(500, 5000);
|
||||
const int data_per_batch = 5000;
|
||||
testData.resize(testDataArena, batches);
|
||||
std::vector<std::vector<uint8_t>> success(batches);
|
||||
std::vector<std::vector<uint8_t>> success2(batches);
|
||||
for (int i = 0; i < batches; i++) {
|
||||
testData[i].resize(testDataArena, data_per_batch);
|
||||
success[i].assign(data_per_batch, false);
|
||||
success2[i].assign(data_per_batch, false);
|
||||
for (int j = 0; j < data_per_batch; j++) {
|
||||
int key = deterministicRandom()->randomInt(0, 20000000);
|
||||
int key2 = key + 1 + deterministicRandom()->randomInt(0, 10);
|
||||
testData[i][j] = KeyRangeRef(setK(testDataArena, key), setK(testDataArena, key2));
|
||||
}
|
||||
}
|
||||
printf("Test data generated (%d)\n", deterministicRandom()->randomInt(0, 100000));
|
||||
printf(" %d batches, %d/batch\n", testData.size(), testData[0].size());
|
||||
printf("Test data generated: %d batches, %d/batch\n", batches, data_per_batch);
|
||||
|
||||
printf("Running\n");
|
||||
|
||||
@ -1029,23 +1042,24 @@ void skipListTest() {
|
||||
int cranges = 0, tcount = 0;
|
||||
|
||||
start = timer();
|
||||
std::vector<std::vector<int>> nonConflict(testData.size());
|
||||
for (int i = 0; i < testData.size(); i++) {
|
||||
std::vector<std::vector<int>> nonConflict(batches);
|
||||
Version version = 0;
|
||||
for (const auto& data : testData) {
|
||||
Arena buf;
|
||||
std::vector<CommitTransactionRef> trs;
|
||||
double t = timer();
|
||||
for (int j = 0; j + readCount + writeCount <= testData[i].size(); j += readCount + writeCount) {
|
||||
for (int j = 0; j + readCount + writeCount <= data.size(); j += readCount + writeCount) {
|
||||
CommitTransactionRef tr;
|
||||
for (int k = 0; k < readCount; k++) {
|
||||
KeyRangeRef r(buf, testData[i][j + k]);
|
||||
KeyRangeRef r(buf, data[j + k]);
|
||||
tr.read_conflict_ranges.push_back(buf, r);
|
||||
}
|
||||
for (int k = 0; k < writeCount; k++) {
|
||||
KeyRangeRef r(buf, testData[i][j + readCount + k]);
|
||||
KeyRangeRef r(buf, data[j + readCount + k]);
|
||||
tr.write_conflict_ranges.push_back(buf, r);
|
||||
}
|
||||
cranges += tr.read_conflict_ranges.size() + tr.write_conflict_ranges.size();
|
||||
tr.read_snapshot = i;
|
||||
tr.read_snapshot = version;
|
||||
trs.push_back(tr);
|
||||
}
|
||||
tcount += trs.size();
|
||||
@ -1053,12 +1067,16 @@ void skipListTest() {
|
||||
|
||||
t = timer();
|
||||
ConflictBatch batch(cs);
|
||||
for (int j = 0; j < trs.size(); j++) batch.addTransaction(trs[j]);
|
||||
for (const auto& tr : trs) {
|
||||
batch.addTransaction(tr);
|
||||
}
|
||||
g_add += timer() - t;
|
||||
|
||||
t = timer();
|
||||
batch.detectConflicts(i + 50, i, nonConflict[i]);
|
||||
batch.detectConflicts(version + 50, version, nonConflict[version]);
|
||||
g_detectConflicts += timer() - t;
|
||||
|
||||
version++;
|
||||
}
|
||||
double elapsed = timer() - start;
|
||||
printf("New conflict set: %0.3f sec\n", elapsed);
|
||||
@ -1076,8 +1094,8 @@ void skipListTest() {
|
||||
printf(" %0.3f Mkeys/sec\n", cranges * 2 / elapsed / 1e6);
|
||||
|
||||
printf("Performance counters:\n");
|
||||
for (int c = 0; c < skc.size(); c++) {
|
||||
printf("%20s: %s\n", skc[c]->getMetric().name().c_str(), skc[c]->getMetric().formatted().c_str());
|
||||
for (const auto& counter : skc) {
|
||||
printf("%20s: %s\n", counter->getMetric().name().c_str(), counter->getMetric().formatted().c_str());
|
||||
}
|
||||
|
||||
printf("%d entries in version history\n", cs->versionHistory.count());
|
||||
|
Loading…
x
Reference in New Issue
Block a user