diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp index f75b60c465..eb93e0d2e3 100644 --- a/fdbserver/SkipList.cpp +++ b/fdbserver/SkipList.cpp @@ -48,35 +48,6 @@ static inline int skfastrand() { void setAffinity(int proc); -class SlowConflictSet { -public: - bool is_conflict(const VectorRef& readRanges, Version read_snapshot); - void add(const VectorRef& clearRanges, const VectorRef& setValues, Version now); - void clear(Version now); - -private: - KeyRangeMap age; -}; - -bool SlowConflictSet::is_conflict(const VectorRef& readRanges, Version read_snapshot) { - for (auto range = readRanges.begin(); range != readRanges.end(); ++range) { - auto intersecting = age.intersectingRanges(*range); - for (auto it = intersecting.begin(); it != intersecting.end(); ++it) - if (it.value() > read_snapshot) return true; - } - return false; -} - -void SlowConflictSet::clear(Version now) { - age.insert(allKeys, now); -} - -void SlowConflictSet::add(const VectorRef& clearRanges, const VectorRef& setValues, - Version now) { - for (auto c = clearRanges.begin(); c != clearRanges.end(); ++c) age.insert(*c, now); - for (auto s = setValues.begin(); s != setValues.end(); ++s) age.insert(s->key, now); -} - PerfDoubleCounter g_buildTest("Build", skc), g_add("Add", skc), g_add_sort("A.Sort", skc), g_detectConflicts("Detect", skc), g_sort("D.Sort", skc), g_combine("D.Combine", skc), g_checkRead("D.CheckRead", skc), g_checkBatch("D.CheckIntraBatch", skc), g_merge("D.MergeWrite", skc), @@ -243,8 +214,6 @@ void sortPoints(std::vector& points) { // copy back into original points array for (int i = 0; i < st.size; i++) points[st.begin + i] = newPoints[i]; } - - // cout << endl << "Radix sort done" << endl; } class SkipList : NonCopyable { @@ -267,9 +236,7 @@ private: uint8_t* value() { return end() + nPointers * (sizeof(Node*) + sizeof(Version)); } int length() { return valueLength; } Node* getNext(int i) { return *((Node**)end() + i); } - void setNext(int i, Node* n) { - *((Node**)end() + i) = n; - } + void setNext(int i, Node* n) { *((Node**)end() + i) = n; } Version getMaxVersion(int i) { return ((Version*)(end() + nPointers * sizeof(Node*)))[i]; } void setMaxVersion(int i, Version v) { ((Version*)(end() + nPointers * sizeof(Node*)))[i] = v; } @@ -725,11 +692,6 @@ private: for (int l = 0; l < MaxLevels; l++) { right.header->setNext(l, f.finger[l]->getNext(l)); f.finger[l]->setNext(l, NULL); - /*if (l) { - // SOMEDAY: Do we actually need these? - right.header->calcVersionForLevel(l); - f.finger[l]->calcVersionForLevel(l); - }*/ } } @@ -744,23 +706,6 @@ private: } }; -struct Action { - virtual void operator()() = 0; // self-destructs -}; -typedef Action* PAction; - -template -PAction action(F&& f) { - struct FAction : Action, F, FastAllocated { - FAction(F&& f) : F(std::move(f)) {} - virtual void operator()() { - F::operator()(); - delete this; - } - }; - return new FAction(std::move(f)); -}; - StringRef setK(Arena& arena, int i) { char t[sizeof(i)]; *(int*)t = i; @@ -783,9 +728,6 @@ struct ConflictSet { SkipList versionHistory; Key removalKey; Version oldestVersion; - std::vector worker_nextAction; - std::vector worker_ready; - std::vector worker_finished; }; ConflictSet* newConflictSet() { @@ -1021,27 +963,8 @@ void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, std:: void ConflictBatch::checkReadConflictRanges() { if (!combinedReadConflictRanges.size()) return; - if (PARALLEL_THREAD_COUNT) { - Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1]; - for (int t = 0; t < PARALLEL_THREAD_COUNT; t++) { - cs->worker_nextAction[t] = action([&, t] { -#pragma GCC diagnostic push - DISABLE_ZERO_DIVISION_FLAG - auto begin = - &combinedReadConflictRanges[0] + t * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT; - auto end = &combinedReadConflictRanges[0] + - (t + 1) * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT; -#pragma GCC diagnostic pop - cs->versionHistory.detectConflicts(begin, end - begin, transactionConflictStatus); - done[t].set(); - }); - cs->worker_ready[t]->set(); - } - for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block(); - } else { - cs->versionHistory.detectConflicts(&combinedReadConflictRanges[0], combinedReadConflictRanges.size(), - transactionConflictStatus); - } + cs->versionHistory.detectConflicts(&combinedReadConflictRanges[0], combinedReadConflictRanges.size(), + transactionConflictStatus); } void ConflictBatch::addConflictRanges(Version now, std::vector>::iterator begin, @@ -1068,62 +991,7 @@ void ConflictBatch::addConflictRanges(Version now, std::vector parts; - for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) parts.emplace_back(); - - std::vector splits(parts.size() - 1); - for (int s = 0; s < splits.size(); s++) - splits[s] = combinedWriteConflictRanges[(s + 1) * combinedWriteConflictRanges.size() / parts.size()].first; - - cs->versionHistory.partition(splits.size() ? &splits[0] : NULL, splits.size(), &parts[0]); - std::vector tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT); - Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1]; - double before = timer(); - for (int t = 0; t < parts.size(); t++) { - cs->worker_nextAction[t] = action([&, t] { - tstart[t] = timer(); - auto begin = - combinedWriteConflictRanges.begin() + (t * combinedWriteConflictRanges.size() / parts.size()); - auto end = - combinedWriteConflictRanges.begin() + ((t + 1) * combinedWriteConflictRanges.size() / parts.size()); - - addConflictRanges(now, begin, end, &parts[t]); - - tend[t] = timer(); - done[t].set(); - }); - cs->worker_ready[t]->set(); - } - double launch = timer(); - for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block(); - double after = timer(); - - g_merge_launch += launch - before; - g_merge_fork += *std::min_element(tstart.begin(), tstart.end()) - before; - g_merge_start_var += - *std::max_element(tstart.begin(), tstart.end()) - *std::min_element(tstart.begin(), tstart.end()); - g_merge_end_var += *std::max_element(tend.begin(), tend.end()) - *std::min_element(tend.begin(), tend.end()); - g_merge_join += after - *std::max_element(tend.begin(), tend.end()); - double run_max = 0, run_min = 1e9; - for (int i = 0; i < tend.size(); i++) { - run_max = max(run_max, tend[i] - tstart[i]); - run_min = min(run_min, tend[i] - tstart[i]); - } - g_merge_run_var += run_max - run_min; - g_merge_run_shortest += run_min; - g_merge_run_longest += run_max; - g_merge_run_total += - std::accumulate(tend.begin(), tend.end(), 0.0) - std::accumulate(tstart.begin(), tstart.end(), 0.0); - - cs->versionHistory.concatenate(&parts[0], parts.size()); - } else { - addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), - &cs->versionHistory); - } - - // for(auto w = combinedWriteConflictRanges.begin(); w != combinedWriteConflictRanges.end(); ++w) - // versionHistory.addConflictRange( w->first.begin(), w->first.size(), w->second.begin(), w->second.size(), now ); + addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory); } void ConflictBatch::combineWriteConflictRanges() { @@ -1191,7 +1059,7 @@ void skipListTest() { } } printf("Test data generated (%d)\n", deterministicRandom()->randomInt(0, 100000)); - printf(" %d threads, %d batches, %d/batch\n", PARALLEL_THREAD_COUNT, testData.size(), testData[0].size()); + printf(" %d batches, %d/batch\n", testData.size(), testData[0].size()); printf("Running\n");