/* * SkipList.cpp * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include #include /* #ifdef __GNUG__ #include #endif */ #include "flow/Platform.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/PerfMetric.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/KeyRangeMap.h" #include "fdbclient/SystemData.h" #include "fdbserver/Knobs.h" #define PARALLEL_THREAD_COUNT 0 // FIXME: When >1, program execution (e.g. random numbers) is/was nondeterministic. Why? using std::min; using std::max; static std::vector skc; static thread_local uint32_t g_seed = 0; static inline int skfastrand() { g_seed = g_seed * 1664525L + 1013904223L; return g_seed; } void setAffinity(int proc); class SlowConflictSet { public: bool is_conflict( const VectorRef& readRanges, Version read_snapshot ); void add( const VectorRef& clearRanges, const VectorRef& setValues, Version now ); void clear( Version now ); private: KeyRangeMap age; }; bool SlowConflictSet::is_conflict( const VectorRef& readRanges, Version read_snapshot ) { for(auto range = readRanges.begin(); range != readRanges.end(); ++range) { auto intersecting = age.intersectingRanges( *range ); for(auto it = intersecting.begin(); it != intersecting.end(); ++it) if ( it.value() > read_snapshot ) return true; } return false; } void SlowConflictSet::clear( Version now ) { age.insert(allKeys, now); } void SlowConflictSet::add( const VectorRef& clearRanges, const VectorRef& setValues, Version now ) { for(auto c = clearRanges.begin(); c != clearRanges.end(); ++c) age.insert( *c, now ); for(auto s = setValues.begin(); s != setValues.end(); ++s) age.insert( s->key, now ); } PerfDoubleCounter g_buildTest("Build", skc), g_add("Add", skc), g_add_sort("A.Sort", skc), g_detectConflicts("Detect", skc), g_sort("D.Sort", skc), g_combine("D.Combine", skc), g_checkRead("D.CheckRead", skc), g_checkBatch("D.CheckIntraBatch", skc), g_merge("D.MergeWrite", skc), g_merge_launch("D.Merge.Launch", skc), g_merge_fork("D.Merge.Fork", skc), g_merge_start_var("D.Merge.StartVariance", skc), g_merge_end_var("D.Merge.EndVariance", skc), g_merge_run_var("D.Merge.RunVariance", skc), g_merge_run_shortest("D.Merge.ShortestRun", skc), g_merge_run_longest("D.Merge.LongestRun", skc), g_merge_run_total("D.Merge.TotalRun", skc), g_merge_join("D.Merge.Join", skc), g_removeBefore("D.RemoveBefore", skc) ; static force_inline int compare( const StringRef& a, const StringRef& b ) { int c = memcmp( a.begin(), b.begin(), min(a.size(), b.size()) ); if (c<0) return -1; if (c>0) return +1; if (a.size() < b.size()) return -1; if (a.size() == b.size()) return 0; return +1; } struct ReadConflictRange { StringRef begin, end; Version version; int transaction; std::vector* conflictingKeyRange; int indexInTx; ReadConflictRange( StringRef begin, StringRef end, Version version, int transaction, int indexInTx, std::vector * cKR = nullptr ) : begin(begin), end(end), version(version), transaction(transaction), indexInTx(indexInTx), conflictingKeyRange(cKR) { } bool operator<(const ReadConflictRange& rhs) const { return compare(begin, rhs.begin)<0; } }; struct KeyInfo { StringRef key; int* pIndex; bool nextKey; bool begin; bool write; int transaction; KeyInfo() {}; KeyInfo( StringRef key, bool nextKey, bool begin, bool write, int transaction, int* pIndex ) : key(key), nextKey(nextKey), begin(begin), write(write), transaction(transaction), pIndex(pIndex) {} }; // returns true if done with string force_inline bool getCharacter(const KeyInfo& ki, int character, int &outputCharacter){ // normal case if (character < ki.key.size()){ outputCharacter = 5 + ki.key.begin()[character]; return false; } // nextKey append a zero if (ki.nextKey && character >= ki.key.size()){ if (character == ki.key.size()){ outputCharacter = 5; // extra '0' character return false; } character--; } // termination if (character == ki.key.size()){ outputCharacter = 0; return false; } if (character == ki.key.size()+1) { // end/begin+read/write relative sorting outputCharacter = ki.begin*2 + (ki.write ^ ki.begin); return false; } outputCharacter = 0; return true; } bool operator < ( const KeyInfo& lhs, const KeyInfo& rhs ) { int i = min(lhs.key.size(), rhs.key.size()); int c = memcmp( lhs.key.begin(), rhs.key.begin(), i ); if (c!=0) return c<0; // SOMEDAY: This is probably not very fast. Slows D.Sort by ~20% relative to previous (incorrect) version. bool lDone, rDone; int lc, rc; while (true) { lDone = getCharacter(lhs, i, lc); rDone = getCharacter(rhs, i, rc); if (lDone && rDone) return false; // equality if (lc < rc) return true; if (lc > rc) return false; i++; } } bool operator == (const KeyInfo& lhs, const KeyInfo& rhs ) { return !(lhs& points, int a, int b){ if (points[b] < points[a]){ KeyInfo temp; temp = points[a]; points[a] = points[b]; points[b] = temp; } } void smallSort(std::vector& points, int start, int N){ for (int i=1;i0;j-=2) swapSort(points, start+j-1, start+j); for (int i=N-2;i>0;i--) for (int j=i;j>0;j-=2) swapSort(points, start+j-1, start+j); } struct SortTask { int begin; int size; int character; SortTask(int begin, int size, int character) : begin(begin), size(size), character(character) {} }; void sortPoints(std::vector& points){ std::vector tasks; std::vector newPoints; std::vector counts; tasks.emplace_back(0, points.size(), 0); while (tasks.size()){ SortTask st = tasks.back(); tasks.pop_back(); if (st.size < 10){ //smallSort(points, st.begin, st.size); std::sort(points.begin() + st.begin, points.begin() + st.begin + st.size ); continue; } newPoints.resize(st.size); counts.assign(256+5, 0); // get counts int c; bool allDone = true; for (int i=st.begin; i 1) tasks.emplace_back(st.begin+total, temp, st.character+1); counts[i] = total; total += temp; } // put in their places for (int i=st.begin; irandom01() < 0.5 && l < MaxLevels-1) l++; return l; */ //deterministicRandom()->randomInt(0, 1<<(MaxLevels-1)); uint32_t i = uint32_t(skfastrand()) >> (32-(MaxLevels-1)); int level = 0; while (i&1) { i>>=1; level++; } ASSERT( level < MaxLevels ); return level; } /* struct Node { int nPointers, valueLength; Node *pointers[nPointers]; Version maxVersions[nPointers]; char value[valueLength]; }; */ struct Node { int level() { return nPointers-1; } uint8_t* value() { return end() + nPointers*(sizeof(Node*)+sizeof(Version)); } int length() { return valueLength; } Node* getNext(int i) { return *((Node**)end() + i); } void setNext(int i, Node* n) { *((Node**)end() + i) = n; #if defined(_DEBUG) || 1 /*if (n && n->level() < i) *(volatile int*)0 = 0;*/ #endif } Version getMaxVersion(int i) { return ((Version*)(end() + nPointers*sizeof(Node*)))[i]; } void setMaxVersion(int i, Version v) { ((Version*)(end() + nPointers*sizeof(Node*)))[i] = v; } // Return a node with initialized value but uninitialized pointers static Node* create( const StringRef& value, int level ) { int nodeSize = sizeof(Node) + value.size() + (level+1)*(sizeof(Node*)+sizeof(Version)); Node* n; if (nodeSize <= 64) { n = (Node*)FastAllocator<64>::allocate(); INSTRUMENT_ALLOCATE("SkipListNode64"); } else if (nodeSize <= 128) { n = (Node*)FastAllocator<128>::allocate(); INSTRUMENT_ALLOCATE("SkipListNode128"); } else { n = (Node*)new char[ nodeSize ]; INSTRUMENT_ALLOCATE("SkipListNodeLarge"); } n->nPointers = level+1; n->valueLength = value.size(); memcpy(n->value(), value.begin(), value.size()); return n; } // pre: level>0, all lower level nodes between this and getNext(level) have correct maxversions void calcVersionForLevel(int level){ Node *end = getNext(level); Version v = getMaxVersion(level-1); for(Node *x = getNext(level-1); x != end; x = x->getNext(level-1)) v = max(v, x->getMaxVersion(level-1)); setMaxVersion(level, v); } void destroy() { int nodeSize = getNodeSize(); if (nodeSize <= 64) { FastAllocator<64>::release(this); INSTRUMENT_RELEASE("SkipListNode64"); } else if (nodeSize <= 128) { FastAllocator<128>::release(this); INSTRUMENT_RELEASE("SkipListNode128"); } else { delete[] (char*)this; INSTRUMENT_RELEASE("SkipListNodeLarge"); } } private: int getNodeSize() { return sizeof(Node) + valueLength + nPointers*(sizeof(Node*)+sizeof(Version)); } uint8_t* end() { return (uint8_t*)(this+1); } int nPointers, valueLength; }; static force_inline bool less( const uint8_t* a, int aLen, const uint8_t* b, int bLen ) { int len = min(aLen, bLen); for(int i=0; i b[i]) return false; /*int c = memcmp(a,b,min(aLen,bLen)); if (c<0) return true; if (c>0) return false;*/ return aLen < bLen; } Node *header; void destroy() { Node *next, *x; for(x = header; x; x = next) { next = x->getNext(0); x->destroy(); } } public: struct Finger{ Node *finger[MaxLevels]; // valid for levels >= level int level; Node* x; Node *alreadyChecked; StringRef value; Finger() : level(MaxLevels), x(NULL), alreadyChecked(NULL) {} Finger( Node* header, const StringRef& ptr ) : value(ptr), level(MaxLevels), alreadyChecked(NULL), x(header) { } void init(const StringRef& value, Node *header){ this->value = value; x = header; alreadyChecked = NULL; level = MaxLevels; } // pre: !finished() force_inline void prefetch() { Node* next = x->getNext(level-1); _mm_prefetch( (const char*)next, _MM_HINT_T0 ); //if ( (((intptr_t)next) & 64) == 0 ) _mm_prefetch( (const char*)next+64, _MM_HINT_T0 ); //_mm_prefetch( (const char*)next+128, _MM_HINT_T0 ); //_mm_prefetch( (const char*)next+192, _MM_HINT_T0 ); //_mm_prefetch( (const char*)next+256, _MM_HINT_T0 ); //_mm_prefetch( (const char*)next+320, _MM_HINT_T0 ); } // pre: !finished() // Returns true if we have advanced to the next level force_inline bool advance() { Node* next = x->getNext(level-1); if (next == alreadyChecked || !less(next->value(), next->length(), value.begin(), value.size())) { alreadyChecked = next; level--; finger[level] = x; return true; } else { x = next; return false; } } // pre: !finished() force_inline void nextLevel() { while (!advance()); } force_inline bool finished(){ return level == 0; } force_inline Node* found() const { // valid after finished returns true Node *n = finger[0]->getNext(0); // or alreadyChecked, but that is more easily invalidated if (n && n->length() == value.size() && !memcmp(n->value(), value.begin(), value.size())) return n; else return NULL; } StringRef getValue() const { Node* n = finger[0]->getNext(0); return n ? StringRef( n->value(), n->length() ) : StringRef(); } }; int count() { int count = 0; Node* x = header->getNext(0); while (x) { x = x->getNext(0); count++; } return count; } explicit SkipList( Version version = 0 ) { header = Node::create(StringRef(), MaxLevels-1); for(int l=0; lsetNext(l, NULL); header->setMaxVersion(l, version); } } ~SkipList() { destroy(); } SkipList(SkipList&& other) BOOST_NOEXCEPT : header(other.header) { other.header = NULL; } void operator=(SkipList&& other) BOOST_NOEXCEPT { destroy(); header = other.header; other.header = NULL; } void swap( SkipList& other ) { std::swap(header, other.header); } void addConflictRanges( const Finger* fingers, int rangeCount, Version version ) { for(int r=rangeCount-1; r>=0; r--) { const Finger& startF = fingers[r*2]; const Finger& endF = fingers[r*2+1]; if (endF.found()==NULL) insert(endF, endF.finger[0]->getMaxVersion(0)); remove( startF, endF ); insert( startF, version ); } } void detectConflicts( ReadConflictRange* ranges, int count, bool* transactionConflictStatus ) { const int M = 16; int nextJob[M]; CheckMax inProgress[ M ]; if (!count) return; int started = min(M,count); for(int i=0; ipartition // is intended to be followed by a call to this->concatenate() recombining the same // partitions. In between, operations on each partition must not touch any keys outside // the partition. Specifically, the partition to the left of 'key' must not have a range // [...,key) inserted, since that would insert an entry at 'key'. void partition( StringRef* begin, int splitCount, SkipList* output ) { for(int i=splitCount-1; i>=0; i--) { Finger f( header, begin[i] ); while (!f.finished()) f.nextLevel(); split(f, output[i+1]); } swap(output[0]); } void concatenate( SkipList* input, int count ) { std::vector ends( count-1 ); for(int i=0; i=0; i--) { ends[i].finger[l]->setNext( l, input[i+1].header->getNext(l) ); if (l && (!i || ends[i].finger[l] != input[i].header)) ends[i].finger[l]->calcVersionForLevel(l); input[i+1].header->setNext( l, NULL ); } } swap(input[0]); } void find( const StringRef* values, Finger* results, int* temp, int count ) { // Relying on the ordering of values, descend until the values aren't all in the // same part of the tree // vtune: 11 parts results[0].init( values[0], header ); const StringRef& endValue = values[count-1]; while ( results[0].level > 1 ) { results[0].nextLevel(); Node* ac = results[0].alreadyChecked; if (ac && less(ac->value(), ac->length(), endValue.begin(), endValue.size())) break; } // Init all the other fingers to start descending where we stopped // the first one // SOMEDAY: this loop showed up on vtune, could be faster? // vtune: 8 parts int startLevel = results[0].level+1; Node *x = startLeveladvance(); if (f->finished()) { if (prevJob == job) break; nextJob[prevJob] = nextJob[job]; } else { f->prefetch(); prevJob = job; } job = nextJob[job]; } } /*Finger randomFinger() { // Written, not exactly uniform, not tested Finger f( header, StringRef() ); Node* begin = header, *end = 0; for(int lev = MaxLevels-1; lev>=0; lev--) { int length = 0; for( Node* x = begin; x != end; x=x->getNext(lev) ) length++; if (length == 1) { // forced down f.finger[lev] = begin; } else { int c = deterministicRandom()->randomInt(0, length); for( Node* x = begin; x != end; x=x->getNext(lev) ) if (!c--) { f.finger[lev] = begin = x; end = x->getNext(lev); break; } } } f.level = 0; return f; }*/ int removeBefore( Version v, Finger& f, int nodeCount ) { /*Finger f( header, StringRef() ); for(int i=0; igetNext(0); if (!x) break; // double prefetch gives +25% speed (single threaded) Node* next = x->getNext(0); _mm_prefetch( (const char*)next, _MM_HINT_T0 ); //_mm_prefetch( (const char*)next+64, _MM_HINT_T0 ); next = x->getNext(1); _mm_prefetch( (const char*)next, _MM_HINT_T0 ); //_mm_prefetch( (const char*)next+64, _MM_HINT_T0 ); bool isAbove = x->getMaxVersion(0) >= v; if (isAbove || wasAbove) { // f.nextItem for(int l=0; l<=x->level(); l++) f.finger[l] = x; } else { // f.eraseItem removedCount++; for(int l=0; l<=x->level(); l++) f.finger[l]->setNext(l, x->getNext(l)); for(int i=1; i<=x->level(); i++) f.finger[i]->setMaxVersion( i, max(f.finger[i]->getMaxVersion(i), x->getMaxVersion(i)) ); x->destroy(); } wasAbove = isAbove; } return removedCount; } private: void remove( const Finger& start, const Finger& end ) { if (start.finger[0] == end.finger[0]) return; Node *x = start.finger[0]->getNext(0); // vtune says: this loop is the expensive parts (6 parts) for(int i=0; isetNext(i, end.finger[i]->getNext(i)); while (true) { Node* next = x->getNext(0); x->destroy(); if (x == end.finger[0]) break; x = next; } } //void insert( const std::string& v, Version version ) { insert(StringRef(v), version); } void insert( const Finger& f, Version version ) { int level = randomLevel(); //cout << std::string((const char*)value,length) << " level: " << level << endl; Node *x = Node::create( f.value, level ); x->setMaxVersion(0, version); for(int i=0; i<=level; i++) { x->setNext(i, f.finger[i]->getNext(i)); f.finger[i]->setNext(i, x); } // vtune says: this loop is the costly part of this function for(int i=1; i<=level; i++) { f.finger[i]->calcVersionForLevel(i); x->calcVersionForLevel(i); } for(int i=level+1; igetMaxVersion(i); if (v >= version) break; f.finger[i]->setMaxVersion(i, version); } } void insert( const StringRef& value, Version version ) { Finger f(header, value); while (!f.finished()) f.nextLevel(); // SOMEDAY: equality? insert( f, version ); } struct CheckMax { Finger start, end; Version version; bool *result; int state; int indexInTx; std::vector* conflictingKeyRange; // null if report_conflicting_keys is not enabled. void init( const ReadConflictRange& r, Node* header, bool* tCS, int indexInTx, std::vector* cKR) { this->start.init( r.begin, header ); this->end.init( r.end, header ); this->version = r.version; this->indexInTx = indexInTx; result = &tCS[ r.transaction ]; conflictingKeyRange = cKR; this->state = 0; } bool noConflict() { return true; } bool conflict() { *result = true; if(conflictingKeyRange != nullptr) conflictingKeyRange->push_back(indexInTx); return true; } // Return true if finished force_inline bool advance() { switch (state) { case 0: // find where start and end fingers diverge while (true) { if (!start.advance()) { start.prefetch(); return false; } end.x = start.x; while (!end.advance()); int l = start.level; if (start.finger[l] != end.finger[l]) break; // accept if the range spans the check range, but does not have a greater version if (start.finger[l]->getMaxVersion(l) <= version) return noConflict(); if (l==0) return conflict(); // The whole read_conflict_range is conflicting with other transactions with higher version number } state = 1; case 1: { // check the end side of the pyramid Node *e = end.finger[end.level]; while (e->getMaxVersion(end.level) > version) { if (end.finished()) return conflict(); end.nextLevel(); Node *f = end.finger[end.level]; while (e != f){ if (e->getMaxVersion(end.level) > version) return conflict(); e = e->getNext(end.level); } } // check the start side of the pyramid Node *s = end.finger[start.level]; while (true){ Node *nextS = start.finger[start.level]->getNext(start.level); Node *p = nextS; while (p != s){ if (p->getMaxVersion(start.level) > version) return conflict(); p = p->getNext(start.level); } if (start.finger[start.level]->getMaxVersion(start.level) <= version) return noConflict(); s = nextS; if (start.finished()) { if (nextS->length() == start.value.size() && !memcmp(nextS->value(), start.value.begin(), start.value.size())) return noConflict(); else return conflict(); } start.nextLevel(); } } default: __assume(false); } } }; void split( const Finger& f, SkipList& right ) { ASSERT( !right.header->getNext(0) ); // right must be empty right.header->setMaxVersion(0, f.finger[0]->getMaxVersion(0)); for(int l=0; lsetNext(l, f.finger[l]->getNext(l)); f.finger[l]->setNext(l, NULL); /*if (l) { // SOMEDAY: Do we actually need these? right.header->calcVersionForLevel(l); f.finger[l]->calcVersionForLevel(l); }*/ } } void getEnd( Finger& end ) { Node* node = header; for(int l=MaxLevels-1; l>=0; l--) { Node* next; while ( (next=node->getNext(l)) != NULL ) node = next; end.finger[l] = node; } end.level = 0; // SOMEDAY: end.x? end.alreadyChecked? /*end = Finger(header, (const uint8_t*)"\xff\xff\xff\xff\xff\xff", 6); while (!end.finished()) end.nextLevel();*/ } }; struct Action { virtual void operator()() = 0; // self-destructs }; typedef Action* PAction; template PAction action( F && f ) { struct FAction : Action, F, FastAllocated { FAction( F&& f ) : F(std::move(f)) {} virtual void operator()() { F::operator()(); delete this; } }; return new FAction( std::move(f) ); }; void workerThread( PAction* nextAction, Event* nextActionReady, int index, Event* whenFinished ) { ASSERT(false); /* inThread( [nextAction,nextActionReady,index,whenFinished]()->Void { g_seed = index*123; fastrand(); setAffinity( index ); while (true) { try { nextActionReady->block(); // auto-reset Action* action = *nextAction; *nextAction = 0; if (!action) break; (*action)(); } catch (Error& e) { fprintf(stderr, "Error in worker thread: %s\n", e.what()); } catch (...) { fprintf(stderr, "Error in worker thread: %s\n", unknown_error().what()); } } //cout << "Worker thread finished" << endl; whenFinished->set(); return Void(); });*/ } StringRef setK( Arena& arena, int i ) { char t[ sizeof(i) ]; *(int*)t = i; const int keySize = 16; char* ss = new (arena) char[ keySize ]; for(int c=0; cset(); } // Wait for workers to terminate; otherwise can get crashes at shutdown time for(int i=0; iblock(); } SkipList versionHistory; Key removalKey; Version oldestVersion; std::vector worker_nextAction; std::vector worker_ready; std::vector worker_finished; }; ConflictSet* newConflictSet() { return new ConflictSet; } void clearConflictSet( ConflictSet* cs, Version v ) { SkipList(v).swap( cs->versionHistory ); } void destroyConflictSet(ConflictSet* cs) { delete cs; } ConflictBatch::ConflictBatch( ConflictSet* cs, std::map< int, std::vector< int > >* conflictingKeyRangeMap ) : cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap) { } ConflictBatch::~ConflictBatch() { } struct TransactionInfo { VectorRef< std::pair > readRanges; VectorRef< std::pair > writeRanges; bool tooOld; bool reportConflictingKeys; }; void ConflictBatch::addTransaction( const CommitTransactionRef& tr ) { int t = transactionCount++; Arena& arena = transactionInfo.arena(); TransactionInfo* info = new (arena) TransactionInfo; info->reportConflictingKeys = tr.report_conflicting_keys; if (tr.read_snapshot < cs->oldestVersion && tr.read_conflict_ranges.size()) { info->tooOld = true; } else { info->tooOld = false; info->readRanges.resize( arena, tr.read_conflict_ranges.size() ); info->writeRanges.resize( arena, tr.write_conflict_ranges.size() ); std::vector& points = this->points; for(int r=0; rreadRanges[r].first); //points.back().keyEnd = StringRef(buf,range.second); points.emplace_back(range.end, false, false, false, t, &info->readRanges[r].second); combinedReadConflictRanges.emplace_back(range.begin, range.end, tr.read_snapshot, t, r, tr.report_conflicting_keys ? &(*conflictingKeyRangeMap)[t] : nullptr); } for(int r=0; rwriteRanges[r].first); points.emplace_back(range.end, false, false, true, t, &info->writeRanges[r].second); } } this->transactionInfo.push_back( arena, info ); } class MiniConflictSet2 : NonCopyable { std::vector values; public: explicit MiniConflictSet2( int size ) { values.assign( size, false ); } void set( int begin, int end ) { for(int i=begin; i values; // undefined when andValues is true for a range of values std::vector orValues; std::vector andValues; MiniConflictSet2 debug; // SOMEDAY: Test on big ranges, eliminate this uint64_t bitMask(unsigned int bit){ // computes results for bit%word return (((wordType)1) << ( bit & bucketMask )); // '&' unnecesary? } void setNthBit(std::vector& v, const unsigned int bit){ v[bit>>bucketShift] |= bitMask(bit); } void clearNthBit(std::vector& v, const unsigned int bit){ v[bit>>bucketShift] &= ~(bitMask(bit)); } bool getNthBit(const std::vector& v, const unsigned int bit){ return (v[bit>>bucketShift] & bitMask(bit)) != 0; } int wordsForNBits(unsigned int bits){ return (bits+((1<>bucketShift; } wordType highBits(int b){ // bits (b&bucketMask) and higher are 1 #pragma warning(disable: 4146) return -(wordType(1) << b); #pragma warning(default: 4146) } wordType lowBits(int b){ // bits lower than b are 1 return (wordType(1)<& v, int bitBegin, int bitEnd, bool fillMiddle){ if (bitBegin >= bitEnd) return; int beginWord = bitBegin>>bucketShift; int lastWord = ((bitEnd+bucketMask) >> bucketShift) - 1; if (beginWord == lastWord){ v[beginWord] |= highBits(bitBegin) & lowBits2(bitEnd); } else { v[beginWord] |= highBits(bitBegin); if (fillMiddle) for(int w=beginWord+1;w& v, int bitBegin, int bitEnd, bool getMiddle) { if (bitBegin >= bitEnd) return false; int beginWord = bitBegin >> bucketShift; int lastWord = ((bitEnd+bucketMask) >> bucketShift) - 1; if (beginWord == lastWord) return (v[beginWord] & highBits(bitBegin) & lowBits2(bitEnd)) != 0; else { if (getMiddle) for(int w=beginWord+1; w>bucketShift; int lastWord = ((end+bucketMask) >> bucketShift) - 1; setBits(values, begin, end, false); setBits(andValues, beginWord+1, lastWord, true); setBits(orValues, beginWord, lastWord+1, true); } bool any(int begin, int end) { bool a = orImpl(begin,end); bool b = debug.any(begin,end); ASSERT( a == b ); return b; } bool orImpl( int begin, int end ) { if (begin == end) return false; int beginWord = begin>>bucketShift; int lastWord = ((end+bucketMask) >> bucketShift) - 1; return orBits( orValues, beginWord+1, lastWord, true ) || getNthBit( andValues, beginWord ) || getNthBit( andValues, lastWord ) || orBits( values, begin, end, false ); } }; void ConflictBatch::checkIntraBatchConflicts() { int index = 0; for(int p=0; p& tooOldTransactions) { for (int i = 0; itooOld) { tooOldTransactions.push_back(i); } } } void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, std::vector& nonConflicting, std::vector* tooOldTransactions) { double t = timer(); sortPoints( points ); //std::sort( combinedReadConflictRanges.begin(), combinedReadConflictRanges.end() ); g_sort += timer()-t; transactionConflictStatus = new bool[ transactionCount ]; memset(transactionConflictStatus, 0, transactionCount*sizeof(bool)); t = timer(); checkReadConflictRanges(); g_checkRead += timer()-t; t = timer(); checkIntraBatchConflicts(); g_checkBatch += timer()-t; t = timer(); combineWriteConflictRanges(); g_combine += timer()-t; t = timer(); mergeWriteConflictRanges(now); g_merge += timer()-t; for (int i = 0; i < transactionCount; i++) { if (!transactionConflictStatus[i]) nonConflicting.push_back( i ); if (tooOldTransactions && transactionInfo[i]->tooOld) tooOldTransactions->push_back(i); } delete[] transactionConflictStatus; t = timer(); if (newOldestVersion > cs->oldestVersion) { cs->oldestVersion = newOldestVersion; SkipList::Finger finger; int temp; cs->versionHistory.find( &cs->removalKey, &finger, &temp, 1 ); cs->versionHistory.removeBefore( cs->oldestVersion, finger, combinedWriteConflictRanges.size()*3 + 10 ); cs->removalKey = finger.getValue(); } g_removeBefore += timer()-t; } void ConflictBatch::checkReadConflictRanges() { if (!combinedReadConflictRanges.size()) return; if (PARALLEL_THREAD_COUNT) { Event done[PARALLEL_THREAD_COUNT?PARALLEL_THREAD_COUNT:1]; for(int t=0; tworker_nextAction[t] = action( [&,t] { #pragma GCC diagnostic push DISABLE_ZERO_DIVISION_FLAG auto begin = &combinedReadConflictRanges[0] + t*combinedReadConflictRanges.size()/PARALLEL_THREAD_COUNT; auto end = &combinedReadConflictRanges[0] + (t+1)*combinedReadConflictRanges.size()/PARALLEL_THREAD_COUNT; #pragma GCC diagnostic pop cs->versionHistory.detectConflicts( begin, end-begin, transactionConflictStatus ); done[t].set(); }); cs->worker_ready[t]->set(); } for(int i=0; iversionHistory.detectConflicts( &combinedReadConflictRanges[0], combinedReadConflictRanges.size(), transactionConflictStatus ); } } void ConflictBatch::addConflictRanges(Version now, std::vector< std::pair >::iterator begin, std::vector< std::pair >::iterator end,SkipList* part) { int count = end-begin; #if 0 //for(auto w = begin; w != end; ++w) for(auto w = end-1; w != begin-1; --w) part->addConflictRange( w->first, w->second, now ); #else static_assert( sizeof( begin[0] ) == sizeof(StringRef)*2, "Write Conflict Range type not convertible to two StringPtrs" ); const StringRef* strings = reinterpret_cast( &*begin ); int stringCount = count*2; static const int stripeSize = 16; SkipList::Finger fingers[ stripeSize ]; int temp[ stripeSize ]; int stripes = (stringCount+stripeSize-1)/stripeSize; int ss = stringCount - (stripes-1)*stripeSize; for(int s=stripes-1; s>=0; s--) { part->find( &strings[s * stripeSize], &fingers[0], temp, ss ); part->addConflictRanges( &fingers[0], ss/2, now ); ss = stripeSize; } #endif } void ConflictBatch::mergeWriteConflictRanges(Version now) { if (!combinedWriteConflictRanges.size()) return; if (PARALLEL_THREAD_COUNT) { std::vector parts; for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) parts.emplace_back(); std::vector splits( parts.size()-1 ); for(int s=0; sversionHistory.partition( splits.size() ? &splits[0] : NULL, splits.size(), &parts[0] ); std::vector tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT); Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1]; double before = timer(); for(int t=0; tworker_nextAction[t] = action( [&,t] { tstart[t] = timer(); auto begin = combinedWriteConflictRanges.begin() + (t*combinedWriteConflictRanges.size()/parts.size()); auto end = combinedWriteConflictRanges.begin() + ((t+1)*combinedWriteConflictRanges.size()/parts.size()); addConflictRanges(now, begin, end, &parts[t]); tend[t] = timer(); done[t].set(); }); cs->worker_ready[t]->set(); } double launch = timer(); for(int i=0; iversionHistory.concatenate( &parts[0], parts.size() ); } else { addConflictRanges( now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory ); } //for(auto w = combinedWriteConflictRanges.begin(); w != combinedWriteConflictRanges.end(); ++w) // versionHistory.addConflictRange( w->first.begin(), w->first.size(), w->second.begin(), w->second.size(), now ); } void ConflictBatch::combineWriteConflictRanges() { int activeWriteCount = 0; for(int i=0; i= %s\n", a, b); } void sse4Test(){ tless("hello", "world"); tless("a", "a"); tless("world", "hello"); tless("world", "worry"); tless("worry", "world"); tless("hello", "hello1"); tless("hello1", "hello"); tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld"); tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello"); tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworry"); tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworry", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaworld"); tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello1"); tless("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaahello"); char *a = "hello1worldthisisalonglonglongstring"; char *b = "hello"; __m128i aa = *(__m128i*)a; __m128i bb = *(__m128i*)a; int res = _mm_cmpestri(aa, 2, bb, 2, _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_EACH | _SIDD_NEGATIVE_POLARITY | _SIDD_LEAST_SIGNIFICANT ); cout << res << endl; } */ void miniConflictSetTest() { for(int i=0; i<2000000; i++) { int size = 64*5; // Also run 64*64*5 to test multiple words of andValues and orValues MiniConflictSet mini(size); for(int j=0; j<2; j++) { int a = deterministicRandom()->randomInt(0, size); int b = deterministicRandom()->randomInt(a, size); mini.set( a, b ); } for(int j=0; j<4; j++) { int a = deterministicRandom()->randomInt(0, size); int b = deterministicRandom()->randomInt(a, size); mini.any( a, b ); // Tests correctness internally } } printf("miniConflictSetTest complete\n"); } void skipListTest() { printf("Skip list test\n"); //sse4Test(); //A test case that breaks the old operator< //KeyInfo a( LiteralStringRef("hello"), true, false, true, -1 ); //KeyInfo b( LiteralStringRef("hello\0"), false, false, false, 0 ); miniConflictSetTest(); setAffinity(0); //showNumaStatus(); double start; ConflictSet* cs = newConflictSet(); Arena testDataArena; VectorRef< VectorRef > testData; testData.resize(testDataArena, 500); std::vector> success( testData.size() ); std::vector> success2( testData.size() ); for(int i=0; irandomInt(0, 20000000); int key2 = key + 1 + deterministicRandom()->randomInt(0, 10); testData[i][j] = KeyRangeRef( setK( testDataArena, key ), setK( testDataArena, key2 ) ); } } printf("Test data generated (%d)\n", deterministicRandom()->randomInt(0,100000)); printf(" %d threads, %d batches, %d/batch\n", PARALLEL_THREAD_COUNT, testData.size(), testData[0].size()); printf("Running\n"); int readCount = 1, writeCount = 1; int cranges = 0, tcount = 0; start = timer(); std::vector> nonConflict( testData.size() ); for(int i=0; i trs; double t = timer(); for(int j=0; j+readCount+writeCount<=testData[i].size(); j+=readCount+writeCount) { CommitTransactionRef tr; for(int k=0; kgetMetric().name().c_str(), skc[c]->getMetric().formatted().c_str()); } //showNumaStatus(); printf("%d entries in version history\n", cs->versionHistory.count()); /*start = timer(); vector> nonConflict2( testData.size() ); SlowConflictSet scs; Standalone> ranges; ranges.resize( ranges.arena(), 1 ); for(int i=0; i(), i + 50 ); } } } printf("Old conflict set: %0.3f sec\n", timer()-start); int aminusb=0, bminusa=0, atotal=0; for(int i=0; i a( testData[i].size() ), b( testData[i].size() ); for(int j=0; j