1
0
mirror of https://github.com/apple/foundationdb.git synced 2025-05-31 18:19:35 +08:00

Delete a bunch more code

This commit is contained in:
Daniel Smith 2020-01-31 12:57:43 -05:00
parent 3c64e77179
commit 3f6cacb477

@ -48,35 +48,6 @@ static inline int skfastrand() {
void setAffinity(int proc);
class SlowConflictSet {
public:
bool is_conflict(const VectorRef<KeyRangeRef>& readRanges, Version read_snapshot);
void add(const VectorRef<KeyRangeRef>& clearRanges, const VectorRef<KeyValueRef>& setValues, Version now);
void clear(Version now);
private:
KeyRangeMap<Version> age;
};
bool SlowConflictSet::is_conflict(const VectorRef<KeyRangeRef>& readRanges, Version read_snapshot) {
for (auto range = readRanges.begin(); range != readRanges.end(); ++range) {
auto intersecting = age.intersectingRanges(*range);
for (auto it = intersecting.begin(); it != intersecting.end(); ++it)
if (it.value() > read_snapshot) return true;
}
return false;
}
void SlowConflictSet::clear(Version now) {
age.insert(allKeys, now);
}
void SlowConflictSet::add(const VectorRef<KeyRangeRef>& clearRanges, const VectorRef<KeyValueRef>& setValues,
Version now) {
for (auto c = clearRanges.begin(); c != clearRanges.end(); ++c) age.insert(*c, now);
for (auto s = setValues.begin(); s != setValues.end(); ++s) age.insert(s->key, now);
}
PerfDoubleCounter g_buildTest("Build", skc), g_add("Add", skc), g_add_sort("A.Sort", skc),
g_detectConflicts("Detect", skc), g_sort("D.Sort", skc), g_combine("D.Combine", skc),
g_checkRead("D.CheckRead", skc), g_checkBatch("D.CheckIntraBatch", skc), g_merge("D.MergeWrite", skc),
@ -243,8 +214,6 @@ void sortPoints(std::vector<KeyInfo>& points) {
// copy back into original points array
for (int i = 0; i < st.size; i++) points[st.begin + i] = newPoints[i];
}
// cout << endl << "Radix sort done" << endl;
}
class SkipList : NonCopyable {
@ -267,9 +236,7 @@ private:
uint8_t* value() { return end() + nPointers * (sizeof(Node*) + sizeof(Version)); }
int length() { return valueLength; }
Node* getNext(int i) { return *((Node**)end() + i); }
void setNext(int i, Node* n) {
*((Node**)end() + i) = n;
}
void setNext(int i, Node* n) { *((Node**)end() + i) = n; }
Version getMaxVersion(int i) { return ((Version*)(end() + nPointers * sizeof(Node*)))[i]; }
void setMaxVersion(int i, Version v) { ((Version*)(end() + nPointers * sizeof(Node*)))[i] = v; }
@ -725,11 +692,6 @@ private:
for (int l = 0; l < MaxLevels; l++) {
right.header->setNext(l, f.finger[l]->getNext(l));
f.finger[l]->setNext(l, NULL);
/*if (l) {
// SOMEDAY: Do we actually need these?
right.header->calcVersionForLevel(l);
f.finger[l]->calcVersionForLevel(l);
}*/
}
}
@ -744,23 +706,6 @@ private:
}
};
struct Action {
virtual void operator()() = 0; // self-destructs
};
typedef Action* PAction;
template <class F>
PAction action(F&& f) {
struct FAction : Action, F, FastAllocated<FAction> {
FAction(F&& f) : F(std::move(f)) {}
virtual void operator()() {
F::operator()();
delete this;
}
};
return new FAction(std::move(f));
};
StringRef setK(Arena& arena, int i) {
char t[sizeof(i)];
*(int*)t = i;
@ -783,9 +728,6 @@ struct ConflictSet {
SkipList versionHistory;
Key removalKey;
Version oldestVersion;
std::vector<PAction> worker_nextAction;
std::vector<Event*> worker_ready;
std::vector<Event*> worker_finished;
};
ConflictSet* newConflictSet() {
@ -1021,27 +963,8 @@ void ConflictBatch::detectConflicts(Version now, Version newOldestVersion, std::
void ConflictBatch::checkReadConflictRanges() {
if (!combinedReadConflictRanges.size()) return;
if (PARALLEL_THREAD_COUNT) {
Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1];
for (int t = 0; t < PARALLEL_THREAD_COUNT; t++) {
cs->worker_nextAction[t] = action([&, t] {
#pragma GCC diagnostic push
DISABLE_ZERO_DIVISION_FLAG
auto begin =
&combinedReadConflictRanges[0] + t * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT;
auto end = &combinedReadConflictRanges[0] +
(t + 1) * combinedReadConflictRanges.size() / PARALLEL_THREAD_COUNT;
#pragma GCC diagnostic pop
cs->versionHistory.detectConflicts(begin, end - begin, transactionConflictStatus);
done[t].set();
});
cs->worker_ready[t]->set();
}
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block();
} else {
cs->versionHistory.detectConflicts(&combinedReadConflictRanges[0], combinedReadConflictRanges.size(),
transactionConflictStatus);
}
cs->versionHistory.detectConflicts(&combinedReadConflictRanges[0], combinedReadConflictRanges.size(),
transactionConflictStatus);
}
void ConflictBatch::addConflictRanges(Version now, std::vector<std::pair<StringRef, StringRef>>::iterator begin,
@ -1068,62 +991,7 @@ void ConflictBatch::addConflictRanges(Version now, std::vector<std::pair<StringR
void ConflictBatch::mergeWriteConflictRanges(Version now) {
if (!combinedWriteConflictRanges.size()) return;
if (PARALLEL_THREAD_COUNT) {
std::vector<SkipList> parts;
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) parts.emplace_back();
std::vector<StringRef> splits(parts.size() - 1);
for (int s = 0; s < splits.size(); s++)
splits[s] = combinedWriteConflictRanges[(s + 1) * combinedWriteConflictRanges.size() / parts.size()].first;
cs->versionHistory.partition(splits.size() ? &splits[0] : NULL, splits.size(), &parts[0]);
std::vector<double> tstart(PARALLEL_THREAD_COUNT), tend(PARALLEL_THREAD_COUNT);
Event done[PARALLEL_THREAD_COUNT ? PARALLEL_THREAD_COUNT : 1];
double before = timer();
for (int t = 0; t < parts.size(); t++) {
cs->worker_nextAction[t] = action([&, t] {
tstart[t] = timer();
auto begin =
combinedWriteConflictRanges.begin() + (t * combinedWriteConflictRanges.size() / parts.size());
auto end =
combinedWriteConflictRanges.begin() + ((t + 1) * combinedWriteConflictRanges.size() / parts.size());
addConflictRanges(now, begin, end, &parts[t]);
tend[t] = timer();
done[t].set();
});
cs->worker_ready[t]->set();
}
double launch = timer();
for (int i = 0; i < PARALLEL_THREAD_COUNT; i++) done[i].block();
double after = timer();
g_merge_launch += launch - before;
g_merge_fork += *std::min_element(tstart.begin(), tstart.end()) - before;
g_merge_start_var +=
*std::max_element(tstart.begin(), tstart.end()) - *std::min_element(tstart.begin(), tstart.end());
g_merge_end_var += *std::max_element(tend.begin(), tend.end()) - *std::min_element(tend.begin(), tend.end());
g_merge_join += after - *std::max_element(tend.begin(), tend.end());
double run_max = 0, run_min = 1e9;
for (int i = 0; i < tend.size(); i++) {
run_max = max(run_max, tend[i] - tstart[i]);
run_min = min(run_min, tend[i] - tstart[i]);
}
g_merge_run_var += run_max - run_min;
g_merge_run_shortest += run_min;
g_merge_run_longest += run_max;
g_merge_run_total +=
std::accumulate(tend.begin(), tend.end(), 0.0) - std::accumulate(tstart.begin(), tstart.end(), 0.0);
cs->versionHistory.concatenate(&parts[0], parts.size());
} else {
addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(),
&cs->versionHistory);
}
// for(auto w = combinedWriteConflictRanges.begin(); w != combinedWriteConflictRanges.end(); ++w)
// versionHistory.addConflictRange( w->first.begin(), w->first.size(), w->second.begin(), w->second.size(), now );
addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory);
}
void ConflictBatch::combineWriteConflictRanges() {
@ -1191,7 +1059,7 @@ void skipListTest() {
}
}
printf("Test data generated (%d)\n", deterministicRandom()->randomInt(0, 100000));
printf(" %d threads, %d batches, %d/batch\n", PARALLEL_THREAD_COUNT, testData.size(), testData[0].size());
printf(" %d batches, %d/batch\n", testData.size(), testData[0].size());
printf("Running\n");