/* * SkipList.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include #include "flow/Platform.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/PerfMetric.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/KeyRangeMap.h" #include "fdbclient/SystemData.h" #include "fdbserver/Knobs.h" using std::max; using std::min; static std::vector skc; static thread_local uint32_t g_seed = 0; static inline int skfastrand() { g_seed = g_seed * 1664525L + 1013904223L; return g_seed; } void setAffinity(int proc); class SlowConflictSet { public: bool is_conflict(const VectorRef& readRanges, Version read_snapshot); void add(const VectorRef& clearRanges, const VectorRef& setValues, Version now); void clear(Version now); private: KeyRangeMap age; }; bool SlowConflictSet::is_conflict(const VectorRef& readRanges, Version read_snapshot) { for (auto range = readRanges.begin(); range != readRanges.end(); ++range) { auto intersecting = age.intersectingRanges(*range); for (auto it = intersecting.begin(); it != intersecting.end(); ++it) if (it.value() > read_snapshot) return true; } return false; } void SlowConflictSet::clear(Version now) { age.insert(allKeys, now); } void SlowConflictSet::add(const VectorRef& clearRanges, const VectorRef& setValues, Version now) { for (auto c = clearRanges.begin(); c != clearRanges.end(); ++c) age.insert(*c, now); for (auto s = setValues.begin(); s != setValues.end(); ++s) age.insert(s->key, now); } PerfDoubleCounter g_buildTest("Build", skc), g_add("Add", skc), g_add_sort("A.Sort", skc), g_detectConflicts("Detect", skc), g_sort("D.Sort", skc), g_combine("D.Combine", skc), g_checkRead("D.CheckRead", skc), g_checkBatch("D.CheckIntraBatch", skc), g_merge("D.MergeWrite", skc), g_merge_launch("D.Merge.Launch", skc), g_merge_fork("D.Merge.Fork", skc), g_merge_start_var("D.Merge.StartVariance", skc), g_merge_end_var("D.Merge.EndVariance", skc), g_merge_run_var("D.Merge.RunVariance", skc), g_merge_run_shortest("D.Merge.ShortestRun", skc), g_merge_run_longest("D.Merge.LongestRun", skc), g_merge_run_total("D.Merge.TotalRun", skc), g_merge_join("D.Merge.Join", skc), g_removeBefore("D.RemoveBefore", skc); static force_inline int compare(const StringRef& a, const StringRef& b) { int c = memcmp(a.begin(), b.begin(), min(a.size(), b.size())); if (c < 0) return -1; if (c > 0) return +1; if (a.size() < b.size()) return -1; if (a.size() == b.size()) return 0; return +1; } struct ReadConflictRange { StringRef begin, end; Version version; int transaction; ReadConflictRange(StringRef begin, StringRef end, Version version, int transaction) : begin(begin), end(end), version(version), transaction(transaction) {} bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin) < 0; } }; struct KeyInfo { StringRef key; int* pIndex; bool nextKey; bool begin; bool write; int transaction; KeyInfo(){}; KeyInfo(StringRef key, bool nextKey, bool begin, bool write, int transaction, int* pIndex) : key(key), nextKey(nextKey), begin(begin), write(write), transaction(transaction), pIndex(pIndex) {} }; // returns true if done with string force_inline bool getCharacter(const KeyInfo& ki, int character, int& outputCharacter) { // normal case if (character < ki.key.size()) { outputCharacter = 5 + ki.key.begin()[character]; return false; } // nextKey append a zero if (ki.nextKey && character >= ki.key.size()) { if (character == ki.key.size()) { outputCharacter = 5; // extra '0' character return false; } character--; } // termination if (character == ki.key.size()) { outputCharacter = 0; return false; } if (character == ki.key.size() + 1) { // end/begin+read/write relative sorting outputCharacter = ki.begin * 2 + (ki.write ^ ki.begin); return false; } outputCharacter = 0; return true; } bool operator<(const KeyInfo& lhs, const KeyInfo& rhs) { int i = min(lhs.key.size(), rhs.key.size()); int c = memcmp(lhs.key.begin(), rhs.key.begin(), i); if (c != 0) return c < 0; // SOMEDAY: This is probably not very fast. Slows D.Sort by ~20% relative to previous (incorrect) version. bool lDone, rDone; int lc, rc; while (true) { lDone = getCharacter(lhs, i, lc); rDone = getCharacter(rhs, i, rc); if (lDone && rDone) return false; // equality if (lc < rc) return true; if (lc > rc) return false; i++; } } bool operator==(const KeyInfo& lhs, const KeyInfo& rhs) { return !(lhs < rhs || rhs < lhs); } void swapSort(std::vector& points, int a, int b) { if (points[b] < points[a]) { KeyInfo temp; temp = points[a]; points[a] = points[b]; points[b] = temp; } } void smallSort(std::vector& points, int start, int N) { for (int i = 1; i < N; i++) for (int j = i; j > 0; j -= 2) swapSort(points, start + j - 1, start + j); for (int i = N - 2; i > 0; i--) for (int j = i; j > 0; j -= 2) swapSort(points, start + j - 1, start + j); } struct SortTask { int begin; int size; int character; SortTask(int begin, int size, int character) : begin(begin), size(size), character(character) {} }; void sortPoints(std::vector& points) { std::vector tasks; std::vector newPoints; std::vector counts; tasks.emplace_back(0, points.size(), 0); while (tasks.size()) { SortTask st = tasks.back(); tasks.pop_back(); if (st.size < 10) { // smallSort(points, st.begin, st.size); std::sort(points.begin() + st.begin, points.begin() + st.begin + st.size); continue; } newPoints.resize(st.size); counts.assign(256 + 5, 0); // get counts int c; bool allDone = true; for (int i = st.begin; i < st.begin + st.size; i++) { allDone &= getCharacter(points[i], st.character, c); counts[c]++; } if (allDone) continue; // calculate offsets from counts and build next level of tasks int total = 0; for (int i = 0; i < counts.size(); i++) { int temp = counts[i]; if (temp > 1) tasks.emplace_back(st.begin + total, temp, st.character + 1); counts[i] = total; total += temp; } // put in their places for (int i = st.begin; i < st.begin + st.size; i++) { getCharacter(points[i], st.character, c); newPoints[counts[c]++] = points[i]; } // copy back into original points array for (int i = 0; i < st.size; i++) points[st.begin + i] = newPoints[i]; } // cout << endl << "Radix sort done" << endl; } class SkipList : NonCopyable { private: static const int MaxLevels = 26; int randomLevel() { uint32_t i = uint32_t(skfastrand()) >> (32 - (MaxLevels - 1)); int level = 0; while (i & 1) { i >>= 1; level++; } ASSERT(level < MaxLevels); return level; } struct Node { int level() { return nPointers - 1; } uint8_t* value() { return end() + nPointers * (sizeof(Node*) + sizeof(Version)); } int length() { return valueLength; } Node* getNext(int i) { return *((Node**)end() + i); } void setNext(int i, Node* n) { *((Node**)end() + i) = n; } Version getMaxVersion(int i) { return ((Version*)(end() + nPointers * sizeof(Node*)))[i]; } void setMaxVersion(int i, Version v) { ((Version*)(end() + nPointers * sizeof(Node*)))[i] = v; } // Return a node with initialized value but uninitialized pointers static Node* create(const StringRef& value, int level) { int nodeSize = sizeof(Node) + value.size() + (level + 1) * (sizeof(Node*) + sizeof(Version)); Node* n; if (nodeSize <= 64) { n = (Node*)FastAllocator<64>::allocate(); INSTRUMENT_ALLOCATE("SkipListNode64"); } else if (nodeSize <= 128) { n = (Node*)FastAllocator<128>::allocate(); INSTRUMENT_ALLOCATE("SkipListNode128"); } else { n = (Node*)new char[nodeSize]; INSTRUMENT_ALLOCATE("SkipListNodeLarge"); } n->nPointers = level + 1; n->valueLength = value.size(); if (value.size() > 0) { memcpy(n->value(), value.begin(), value.size()); } return n; } // pre: level>0, all lower level nodes between this and getNext(level) have correct maxversions void calcVersionForLevel(int level) { Node* end = getNext(level); Version v = getMaxVersion(level - 1); for (Node* x = getNext(level - 1); x != end; x = x->getNext(level - 1)) v = max(v, x->getMaxVersion(level - 1)); setMaxVersion(level, v); } void destroy() { int nodeSize = getNodeSize(); if (nodeSize <= 64) { FastAllocator<64>::release(this); INSTRUMENT_RELEASE("SkipListNode64"); } else if (nodeSize <= 128) { FastAllocator<128>::release(this); INSTRUMENT_RELEASE("SkipListNode128"); } else { delete[](char*) this; INSTRUMENT_RELEASE("SkipListNodeLarge"); } } private: int getNodeSize() { return sizeof(Node) + valueLength + nPointers * (sizeof(Node*) + sizeof(Version)); } uint8_t* end() { return (uint8_t*)(this + 1); } int nPointers, valueLength; }; static force_inline bool less(const uint8_t* a, int aLen, const uint8_t* b, int bLen) { int len = min(aLen, bLen); for (int i = 0; i < len; i++) if (a[i] < b[i]) return true; else if (a[i] > b[i]) return false; return aLen < bLen; } Node* header; void destroy() { Node *next, *x; for (x = header; x; x = next) { next = x->getNext(0); x->destroy(); } } public: struct Finger { Node* finger[MaxLevels]; // valid for levels >= level int level; Node* x; Node* alreadyChecked; StringRef value; Finger() : level(MaxLevels), x(NULL), alreadyChecked(NULL) {} Finger(Node* header, const StringRef& ptr) : value(ptr), level(MaxLevels), alreadyChecked(NULL), x(header) {} void init(const StringRef& value, Node* header) { this->value = value; x = header; alreadyChecked = NULL; level = MaxLevels; } // pre: !finished() force_inline void prefetch() { Node* next = x->getNext(level - 1); _mm_prefetch((const char*)next, _MM_HINT_T0); _mm_prefetch((const char*)next + 64, _MM_HINT_T0); } // pre: !finished() // Returns true if we have advanced to the next level force_inline bool advance() { Node* next = x->getNext(level - 1); if (next == alreadyChecked || !less(next->value(), next->length(), value.begin(), value.size())) { alreadyChecked = next; level--; finger[level] = x; return true; } else { x = next; return false; } } // pre: !finished() force_inline void nextLevel() { while (!advance()) ; } force_inline bool finished() { return level == 0; } force_inline Node* found() const { // valid after finished returns true Node* n = finger[0]->getNext(0); // or alreadyChecked, but that is more easily invalidated if (n && n->length() == value.size() && !memcmp(n->value(), value.begin(), value.size())) return n; else return NULL; } StringRef getValue() const { Node* n = finger[0]->getNext(0); return n ? StringRef(n->value(), n->length()) : StringRef(); } }; int count() { int count = 0; Node* x = header->getNext(0); while (x) { x = x->getNext(0); count++; } return count; } explicit SkipList(Version version = 0) { header = Node::create(StringRef(), MaxLevels - 1); for (int l = 0; l < MaxLevels; l++) { header->setNext(l, NULL); header->setMaxVersion(l, version); } } ~SkipList() { destroy(); } SkipList(SkipList&& other) BOOST_NOEXCEPT : header(other.header) { other.header = NULL; } void operator=(SkipList&& other) BOOST_NOEXCEPT { destroy(); header = other.header; other.header = NULL; } void swap(SkipList& other) { std::swap(header, other.header); } void addConflictRanges(const Finger* fingers, int rangeCount, Version version) { for (int r = rangeCount - 1; r >= 0; r--) { const Finger& startF = fingers[r * 2]; const Finger& endF = fingers[r * 2 + 1]; if (endF.found() == NULL) insert(endF, endF.finger[0]->getMaxVersion(0)); remove(startF, endF); insert(startF, version); } } void detectConflicts(ReadConflictRange* ranges, int count, bool* transactionConflictStatus) { const int M = 16; int nextJob[M]; CheckMax inProgress[M]; if (!count) return; int started = min(M, count); for (int i = 0; i < started; i++) { inProgress[i].init(ranges[i], header, transactionConflictStatus); nextJob[i] = i + 1; } nextJob[started - 1] = 0; int prevJob = started - 1; int job = 0; // vtune: 340 parts while (true) { if (inProgress[job].advance()) { if (started == count) { if (prevJob == job) break; nextJob[prevJob] = nextJob[job]; job = prevJob; } else inProgress[job].init(ranges[started++], header, transactionConflictStatus); } prevJob = job; job = nextJob[job]; } } // Splits the version history represented by this skiplist into separate key ranges // delimited by the given array of keys. This SkipList is left empty. this->partition // is intended to be followed by a call to this->concatenate() recombining the same // partitions. In between, operations on each partition must not touch any keys outside // the partition. Specifically, the partition to the left of 'key' must not have a range // [...,key) inserted, since that would insert an entry at 'key'. void partition(StringRef* begin, int splitCount, SkipList* output) { for (int i = splitCount - 1; i >= 0; i--) { Finger f(header, begin[i]); while (!f.finished()) f.nextLevel(); split(f, output[i + 1]); } swap(output[0]); } void concatenate(SkipList* input, int count) { std::vector ends(count - 1); for (int i = 0; i < ends.size(); i++) input[i].getEnd(ends[i]); for (int l = 0; l < MaxLevels; l++) { for (int i = ends.size() - 1; i >= 0; i--) { ends[i].finger[l]->setNext(l, input[i + 1].header->getNext(l)); if (l && (!i || ends[i].finger[l] != input[i].header)) ends[i].finger[l]->calcVersionForLevel(l); input[i + 1].header->setNext(l, NULL); } } swap(input[0]); } void find(const StringRef* values, Finger* results, int* temp, int count) { // Relying on the ordering of values, descend until the values aren't all in the // same part of the tree // vtune: 11 parts results[0].init(values[0], header); const StringRef& endValue = values[count - 1]; while (results[0].level > 1) { results[0].nextLevel(); Node* ac = results[0].alreadyChecked; if (ac && less(ac->value(), ac->length(), endValue.begin(), endValue.size())) break; } // Init all the other fingers to start descending where we stopped // the first one // SOMEDAY: this loop showed up on vtune, could be faster? // vtune: 8 parts int startLevel = results[0].level + 1; Node* x = startLevel < MaxLevels ? results[0].finger[startLevel] : header; for (int i = 1; i < count; i++) { results[i].level = startLevel; results[i].x = x; results[i].alreadyChecked = NULL; results[i].value = values[i]; for (int j = startLevel; j < MaxLevels; j++) results[i].finger[j] = results[0].finger[j]; } int* nextJob = temp; for (int i = 0; i < count - 1; i++) nextJob[i] = i + 1; nextJob[count - 1] = 0; int prevJob = count - 1; int job = 0; // vtune: 225 parts while (true) { Finger* f = &results[job]; f->advance(); if (f->finished()) { if (prevJob == job) break; nextJob[prevJob] = nextJob[job]; } else { f->prefetch(); prevJob = job; } job = nextJob[job]; } } int removeBefore(Version v, Finger& f, int nodeCount) { // f.x, f.alreadyChecked? int removedCount = 0; bool wasAbove = true; while (nodeCount--) { Node* x = f.finger[0]->getNext(0); if (!x) break; // double prefetch gives +25% speed (single threaded) Node* next = x->getNext(0); _mm_prefetch((const char*)next, _MM_HINT_T0); next = x->getNext(1); _mm_prefetch((const char*)next, _MM_HINT_T0); bool isAbove = x->getMaxVersion(0) >= v; if (isAbove || wasAbove) { // f.nextItem for (int l = 0; l <= x->level(); l++) f.finger[l] = x; } else { // f.eraseItem removedCount++; for (int l = 0; l <= x->level(); l++) f.finger[l]->setNext(l, x->getNext(l)); for (int i = 1; i <= x->level(); i++) f.finger[i]->setMaxVersion(i, max(f.finger[i]->getMaxVersion(i), x->getMaxVersion(i))); x->destroy(); } wasAbove = isAbove; } return removedCount; } private: void remove(const Finger& start, const Finger& end) { if (start.finger[0] == end.finger[0]) return; Node* x = start.finger[0]->getNext(0); // vtune says: this loop is the expensive parts (6 parts) for (int i = 0; i < MaxLevels; i++) if (start.finger[i] != end.finger[i]) start.finger[i]->setNext(i, end.finger[i]->getNext(i)); while (true) { Node* next = x->getNext(0); x->destroy(); if (x == end.finger[0]) break; x = next; } } void insert(const Finger& f, Version version) { int level = randomLevel(); // cout << std::string((const char*)value,length) << " level: " << level << endl; Node* x = Node::create(f.value, level); x->setMaxVersion(0, version); for (int i = 0; i <= level; i++) { x->setNext(i, f.finger[i]->getNext(i)); f.finger[i]->setNext(i, x); } // vtune says: this loop is the costly part of this function for (int i = 1; i <= level; i++) { f.finger[i]->calcVersionForLevel(i); x->calcVersionForLevel(i); } for (int i = level + 1; i < MaxLevels; i++) { Version v = f.finger[i]->getMaxVersion(i); if (v >= version) break; f.finger[i]->setMaxVersion(i, version); } } void insert(const StringRef& value, Version version) { Finger f(header, value); while (!f.finished()) f.nextLevel(); // SOMEDAY: equality? insert(f, version); } struct CheckMax { Finger start, end; Version version; bool* result; int state; void init(const ReadConflictRange& r, Node* header, bool* tCS) { this->start.init(r.begin, header); this->end.init(r.end, header); this->version = r.version; result = &tCS[r.transaction]; this->state = 0; } bool noConflict() { return true; } bool conflict() { *result = true; return true; } // Return true if finished force_inline bool advance() { switch (state) { case 0: // find where start and end fingers diverge while (true) { if (!start.advance()) { start.prefetch(); return false; } end.x = start.x; while (!end.advance()) ; int l = start.level; if (start.finger[l] != end.finger[l]) break; // accept if the range spans the check range, but does not have a greater version if (start.finger[l]->getMaxVersion(l) <= version) return noConflict(); if (l == 0) return conflict(); } state = 1; case 1: { // check the end side of the pyramid Node* e = end.finger[end.level]; while (e->getMaxVersion(end.level) > version) { if (end.finished()) return conflict(); end.nextLevel(); Node* f = end.finger[end.level]; while (e != f) { if (e->getMaxVersion(end.level) > version) return conflict(); e = e->getNext(end.level); } } // check the start side of the pyramid Node* s = end.finger[start.level]; while (true) { Node* nextS = start.finger[start.level]->getNext(start.level); Node* p = nextS; while (p != s) { if (p->getMaxVersion(start.level) > version) return conflict(); p = p->getNext(start.level); } if (start.finger[start.level]->getMaxVersion(start.level) <= version) return noConflict(); s = nextS; if (start.finished()) { if (nextS->length() == start.value.size() && !memcmp(nextS->value(), start.value.begin(), start.value.size())) return noConflict(); else return conflict(); } start.nextLevel(); } } default: __assume(false); } } }; void split(const Finger& f, SkipList& right) { ASSERT(!right.header->getNext(0)); // right must be empty right.header->setMaxVersion(0, f.finger[0]->getMaxVersion(0)); for (int l = 0; l < MaxLevels; l++) { right.header->setNext(l, f.finger[l]->getNext(l)); f.finger[l]->setNext(l, NULL); /*if (l) { // SOMEDAY: Do we actually need these? right.header->calcVersionForLevel(l); f.finger[l]->calcVersionForLevel(l); }*/ } } void getEnd(Finger& end) { Node* node = header; for (int l = MaxLevels - 1; l >= 0; l--) { Node* next; while ((next = node->getNext(l)) != NULL) node = next; end.finger[l] = node; } end.level = 0; } }; struct Action { virtual void operator()() = 0; // self-destructs }; typedef Action* PAction; template PAction action(F&& f) { struct FAction : Action, F, FastAllocated { FAction(F&& f) : F(std::move(f)) {} virtual void operator()() { F::operator()(); delete this; } }; return new FAction(std::move(f)); }; StringRef setK(Arena& arena, int i) { char t[sizeof(i)]; *(int*)t = i; const int keySize = 16; char* ss = new (arena) char[keySize]; for (int c = 0; c < keySize - sizeof(i); c++) ss[c] = '.'; for (int c = 0; c < sizeof(i); c++) ss[c + keySize - sizeof(i)] = t[sizeof(i) - 1 - c]; return StringRef((const uint8_t*)ss, keySize); } #include "fdbserver/ConflictSet.h" struct ConflictSet { ConflictSet() : oldestVersion(0) {} ~ConflictSet() {} SkipList versionHistory; Key removalKey; Version oldestVersion; std::vector worker_nextAction; std::vector worker_ready; std::vector worker_finished; }; ConflictSet* newConflictSet() { return new ConflictSet; } void clearConflictSet(ConflictSet* cs, Version v) { SkipList(v).swap(cs->versionHistory); } void destroyConflictSet(ConflictSet* cs) { delete cs; } ConflictBatch::ConflictBatch(ConflictSet* cs) : cs(cs), transactionCount(0) {} ConflictBatch::~ConflictBatch() {} struct TransactionInfo { VectorRef> readRanges; VectorRef> writeRanges; bool tooOld; }; void ConflictBatch::addTransaction(const CommitTransactionRef& tr) { int t = transactionCount++; Arena& arena = transactionInfo.arena(); TransactionInfo* info = new (arena) TransactionInfo; if (tr.read_snapshot < cs->oldestVersion && tr.read_conflict_ranges.size()) { info->tooOld = true; } else { info->tooOld = false; info->readRanges.resize(arena, tr.read_conflict_ranges.size()); info->writeRanges.resize(arena, tr.write_conflict_ranges.size()); std::vector& points = this->points; for (int r = 0; r < tr.read_conflict_ranges.size(); r++) { const KeyRangeRef& range = tr.read_conflict_ranges[r]; points.emplace_back(range.begin, false, true, false, t, &info->readRanges[r].first); points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second); combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t); } for (int r = 0; r < tr.write_conflict_ranges.size(); r++) { const KeyRangeRef& range = tr.write_conflict_ranges[r]; points.emplace_back(range.begin, false, true, true, t, &info->writeRanges[r].first); points.emplace_back(range.end, false, false, true, t, &info->writeRanges[r].second); } } this->transactionInfo.push_back(arena, info); } class MiniConflictSet2 : NonCopyable { std::vector values; public: explicit MiniConflictSet2(int size) { values.assign(size, false); } void set(int begin, int end) { for (int i = begin; i < end; i++) values[i] = true; } bool any(int begin, int end) { for (int i = begin; i < end; i++) if (values[i]) return true; return false; } }; class MiniConflictSet : NonCopyable { typedef uint64_t wordType; enum { bucketShift = 6, bucketMask = sizeof(wordType) * 8 - 1 }; std::vector values; // undefined when andValues is true for a range of values std::vector orValues; std::vector andValues; MiniConflictSet2 debug; // SOMEDAY: Test on big ranges, eliminate this wordType bitMask(unsigned int bit) { // computes results for bit%word return (((wordType)1) << (bit & bucketMask)); } void setNthBit(std::vector& v, const unsigned int bit) { v[bit >> bucketShift] |= bitMask(bit); } void clearNthBit(std::vector& v, const unsigned int bit) { v[bit >> bucketShift] &= ~(bitMask(bit)); } bool getNthBit(const std::vector& v, const unsigned int bit) { return (v[bit >> bucketShift] & bitMask(bit)) != 0; } int wordsForNBits(unsigned int bits) { return (bits + ((1 << bucketShift) - 1)) >> bucketShift; } wordType highBits(int b) { // bits (b&bucketMask) and higher are 1 #pragma warning(disable : 4146) return -bitMask(b); #pragma warning(default : 4146) } wordType lowBits(int b) { // bits lower than (b&bucketMask) are 1 return bitMask(b) - 1; } wordType lowBits2(int b) { return (b & bucketMask) ? lowBits(b) : -1; } void setBits(std::vector& v, int bitBegin, int bitEnd, bool fillMiddle) { if (bitBegin >= bitEnd) return; int beginWord = bitBegin >> bucketShift; int lastWord = ((bitEnd + bucketMask) >> bucketShift) - 1; if (beginWord == lastWord) { v[beginWord] |= highBits(bitBegin) & lowBits2(bitEnd); } else { v[beginWord] |= highBits(bitBegin); if (fillMiddle) for (int w = beginWord + 1; w < lastWord; w++) v[w] = wordType(-1); v[lastWord] |= lowBits2(bitEnd); } } bool orBits(std::vector& v, int bitBegin, int bitEnd, bool getMiddle) { if (bitBegin >= bitEnd) return false; int beginWord = bitBegin >> bucketShift; int lastWord = ((bitEnd + bucketMask) >> bucketShift) - 1; if (beginWord == lastWord) return (v[beginWord] & highBits(bitBegin) & lowBits2(bitEnd)) != 0; else { if (getMiddle) for (int w = beginWord + 1; w < lastWord; w++) if (v[w]) return true; return ((v[beginWord] & highBits(bitBegin)) | (v[lastWord] & lowBits2(bitEnd))) != 0; } } bool orImpl(int begin, int end) { if (begin == end) return false; int beginWord = begin >> bucketShift; int lastWord = ((end + bucketMask) >> bucketShift) - 1; return orBits(orValues, beginWord + 1, lastWord, true) || getNthBit(andValues, beginWord) || getNthBit(andValues, lastWord) || orBits(values, begin, end, false); } public: explicit MiniConflictSet(int size) : debug(size) { static_assert((1 << bucketShift) == sizeof(wordType) * 8, "BucketShift incorrect"); values.assign(wordsForNBits(size), false); orValues.assign(wordsForNBits(wordsForNBits(size)), false); andValues.assign(wordsForNBits(wordsForNBits(size)), false); } void set(int begin, int end) { debug.set(begin, end); if (begin == end) return; int beginWord = begin >> bucketShift; int lastWord = ((end + bucketMask) >> bucketShift) - 1; setBits(values, begin, end, false); setBits(andValues, beginWord + 1, lastWord, true); setBits(orValues, beginWord, lastWord + 1, true); } bool any(int begin, int end) { bool a = orImpl(begin, end); bool b = debug.any(begin, end); ASSERT(a == b); return b; } }; void ConflictBatch::checkIntraBatchConflicts() { int index = 0; for (int p = 0; p < points.size(); p++) *points[p].pIndex = index++; MiniConflictSet mcs(index); for (int t = 0; t < transactionInfo.size(); t++) { const TransactionInfo& tr = *transactionInfo[t]; if (transactionConflictStatus[t]) continue; bool conflict = tr.tooOld; for (int i = 0; i < tr.readRanges.size(); i++) if (mcs.any(tr.readRanges[i].first, tr.readRanges[i].second)) { conflict = true; break; } transactionConflictStatus[t] = conflict; if (!conflict) for (int i = 0; i < tr.writeRanges.size(); i++) mcs.set(tr.writeRanges[i].first, tr.writeRanges[i].second); } } void ConflictBatch::GetTooOldTransactions(std::vector& tooOldTransactions) { for (int i = 0; i < transactionInfo.size(); i++) { if (transactionInfo[i]->tooOld) { tooOldTransactions.push_back(i); } } } void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, std::vector& nonConflicting, std::vector* tooOldTransactions) { double t = timer(); sortPoints(points); g_sort += timer() - t; transactionConflictStatus = new bool[transactionCount]; memset(transactionConflictStatus, 0, transactionCount * sizeof(bool)); t = timer(); checkReadConflictRanges(); g_checkRead += timer() - t; t = timer(); checkIntraBatchConflicts(); g_checkBatch += timer() - t; t = timer(); combineWriteConflictRanges(); g_combine += timer() - t; t = timer(); mergeWriteConflictRanges(now); g_merge += timer() - t; for (int i = 0; i < transactionCount; i++) { if (!transactionConflictStatus[i]) nonConflicting.push_back(i); if (tooOldTransactions && transactionInfo[i]->tooOld) tooOldTransactions->push_back(i); } delete[] transactionConflictStatus; t = timer(); if (newOldestVersion > cs->oldestVersion) { cs->oldestVersion = newOldestVersion; SkipList::Finger finger; int temp; cs->versionHistory.find(&cs->removalKey, &finger, &temp, 1); cs->versionHistory.removeBefore(cs->oldestVersion, finger, combinedWriteConflictRanges.size() * 3 + 10); cs->removalKey = finger.getValue(); } g_removeBefore += timer() - t; } void ConflictBatch::checkReadConflictRanges() { if (!combinedReadConflictRanges.size()) return; if (PARALLEL_THREAD_COUNT) { Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1]; for (int t = 0; t < PARALLEL_THREAD_COUNT; t++) { cs->worker_nextAction[t] = action([&, t] { #pragma GCC diagnostic push DISABLE_ZERO_DIVISION_FLAG auto begin = &combinedReadConflictRanges[0] + t * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT; auto end = &combinedReadConflictRanges[0] + (t + 1) * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT; #pragma GCC diagnostic pop cs->versionHistory.detectConflicts(begin, end - begin, transactionConflictStatus); done[t].set(); }); cs->worker_ready[t]->set(); } for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block(); } else { cs->versionHistory.detectConflicts(&combinedReadConflictRanges[0], combinedReadConflictRanges.size(), transactionConflictStatus); } } void ConflictBatch::addConflictRanges(Version now, std::vector>::iterator begin, std::vector>::iterator end, SkipList* part) { int count = end - begin; static_assert(sizeof(begin[0]) == sizeof(StringRef) * 2, "Write Conflict Range type not convertible to two StringPtrs"); const StringRef* strings = reinterpret_cast(&*begin); int stringCount = count * 2; static const int stripeSize = 16; SkipList::Finger fingers[stripeSize]; int temp[stripeSize]; int stripes = (stringCount + stripeSize - 1) / stripeSize; int ss = stringCount - (stripes - 1) * stripeSize; for (int s = stripes - 1; s >= 0; s--) { part->find(&strings[s * stripeSize], &fingers[0], temp, ss); part->addConflictRanges(&fingers[0], ss / 2, now); ss = stripeSize; } } void ConflictBatch::mergeWriteConflictRanges(Version now) { if (!combinedWriteConflictRanges.size()) return; if (PARALLEL_THREAD_COUNT) { std::vector parts; for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) parts.emplace_back(); std::vector splits(parts.size() - 1); for (int s = 0; s < splits.size(); s++) splits[s] = combinedWriteConflictRanges[(s + 1) * combinedWriteConflictRanges.size() / parts.size()].first; cs->versionHistory.partition(splits.size() ? &splits[0] : NULL, splits.size(), &parts[0]); std::vector tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT); Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1]; double before = timer(); for (int t = 0; t < parts.size(); t++) { cs->worker_nextAction[t] = action([&, t] { tstart[t] = timer(); auto begin = combinedWriteConflictRanges.begin() + (t * combinedWriteConflictRanges.size() / parts.size()); auto end = combinedWriteConflictRanges.begin() + ((t + 1) * combinedWriteConflictRanges.size() / parts.size()); addConflictRanges(now, begin, end, &parts[t]); tend[t] = timer(); done[t].set(); }); cs->worker_ready[t]->set(); } double launch = timer(); for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block(); double after = timer(); g_merge_launch += launch - before; g_merge_fork += *std::min_element(tstart.begin(), tstart.end()) - before; g_merge_start_var += *std::max_element(tstart.begin(), tstart.end()) - *std::min_element(tstart.begin(), tstart.end()); g_merge_end_var += *std::max_element(tend.begin(), tend.end()) - *std::min_element(tend.begin(), tend.end()); g_merge_join += after - *std::max_element(tend.begin(), tend.end()); double run_max = 0, run_min = 1e9; for (int i = 0; i < tend.size(); i++) { run_max = max(run_max, tend[i] - tstart[i]); run_min = min(run_min, tend[i] - tstart[i]); } g_merge_run_var += run_max - run_min; g_merge_run_shortest += run_min; g_merge_run_longest += run_max; g_merge_run_total += std::accumulate(tend.begin(), tend.end(), 0.0) - std::accumulate(tstart.begin(), tstart.end(), 0.0); cs->versionHistory.concatenate(&parts[0], parts.size()); } else { addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory); } // for(auto w = combinedWriteConflictRanges.begin(); w != combinedWriteConflictRanges.end(); ++w) // versionHistory.addConflictRange( w->first.begin(), w->first.size(), w->second.begin(), w->second.size(), now ); } void ConflictBatch::combineWriteConflictRanges() { int activeWriteCount = 0; for (int i = 0; i < points.size(); i++) { KeyInfo& point = points[i]; if (point.write && !transactionConflictStatus[point.transaction]) { if (point.begin) { activeWriteCount++; if (activeWriteCount == 1) combinedWriteConflictRanges.emplace_back(point.key, KeyRef()); } else /*if (point.end)*/ { activeWriteCount--; if (activeWriteCount == 0) combinedWriteConflictRanges.back().second = point.key; } } } } void miniConflictSetTest() { for (int i = 0; i < 2000000; i++) { int size = 64 * 5; // Also run 64*64*5 to test multiple words of andValues and orValues MiniConflictSet mini(size); for (int j = 0; j < 2; j++) { int a = deterministicRandom()->randomInt(0, size); int b = deterministicRandom()->randomInt(a, size); mini.set(a, b); } for (int j = 0; j < 4; j++) { int a = deterministicRandom()->randomInt(0, size); int b = deterministicRandom()->randomInt(a, size); mini.any(a, b); // Tests correctness internally } } printf("miniConflictSetTest complete\n"); } void skipListTest() { printf("Skip list test\n"); // A test case that breaks the old operator< // KeyInfo a( LiteralStringRef("hello"), true, false, true, -1 ); // KeyInfo b( LiteralStringRef("hello\0"), false, false, false, 0 ); miniConflictSetTest(); setAffinity(0); double start; ConflictSet* cs = newConflictSet(); Arena testDataArena; VectorRef> testData; testData.resize(testDataArena, 500); std::vector> success(testData.size()); std::vector> success2(testData.size()); for (int i = 0; i < testData.size(); i++) { testData[i].resize(testDataArena, 5000); success[i].assign(testData[i].size(), false); success2[i].assign(testData[i].size(), false); for (int j = 0; j < testData[i].size(); j++) { int key = deterministicRandom()->randomInt(0, 20000000); int key2 = key + 1 + deterministicRandom()->randomInt(0, 10); testData[i][j] = KeyRangeRef(setK(testDataArena, key), setK(testDataArena, key2)); } } printf("Test data generated (%d)\n", deterministicRandom()->randomInt(0, 100000)); printf(" %d threads, %d batches, %d/batch\n", PARALLEL_THREAD_COUNT, testData.size(), testData[0].size()); printf("Running\n"); int readCount = 1, writeCount = 1; int cranges = 0, tcount = 0; start = timer(); std::vector> nonConflict(testData.size()); for (int i = 0; i < testData.size(); i++) { Arena buf; std::vector trs; double t = timer(); for (int j = 0; j + readCount + writeCount <= testData[i].size(); j += readCount + writeCount) { CommitTransactionRef tr; for (int k = 0; k < readCount; k++) { KeyRangeRef r(buf, testData[i][j + k]); tr.read_conflict_ranges.push_back(buf, r); } for (int k = 0; k < writeCount; k++) { KeyRangeRef r(buf, testData[i][j + readCount + k]); tr.write_conflict_ranges.push_back(buf, r); } cranges += tr.read_conflict_ranges.size() + tr.write_conflict_ranges.size(); tr.read_snapshot = i; trs.push_back(tr); } tcount += trs.size(); g_buildTest += timer() - t; t = timer(); ConflictBatch batch(cs); for (int j = 0; j < trs.size(); j++) batch.addTransaction(trs[j]); g_add += timer() - t; t = timer(); batch.detectConflicts(i + 50, i, nonConflict[i]); g_detectConflicts += timer() - t; } double elapsed = timer() - start; printf("New conflict set: %0.3f sec\n", elapsed); printf(" %0.3f Mtransactions/sec\n", tcount / elapsed / 1e6); printf(" %0.3f Mkeys/sec\n", cranges * 2 / elapsed / 1e6); elapsed = g_detectConflicts.getValue(); printf("Detect only: %0.3f sec\n", elapsed); printf(" %0.3f Mtransactions/sec\n", tcount / elapsed / 1e6); printf(" %0.3f Mkeys/sec\n", cranges * 2 / elapsed / 1e6); elapsed = g_checkRead.getValue() + g_merge.getValue(); printf("Skiplist only: %0.3f sec\n", elapsed); printf(" %0.3f Mtransactions/sec\n", tcount / elapsed / 1e6); printf(" %0.3f Mkeys/sec\n", cranges * 2 / elapsed / 1e6); printf("Performance counters:\n"); for (int c = 0; c < skc.size(); c++) { printf("%20s: %s\n", skc[c]->getMetric().name().c_str(), skc[c]->getMetric().formatted().c_str()); } printf("%d entries in version history\n", cs->versionHistory.count()); }