SysTester: use boost io_context for scheduling

This commit is contained in:
Vaidas Gasiunas 2022-02-23 15:43:01 +01:00
parent e50c4320f0
commit f5d722b65b

View File

@ -20,127 +20,45 @@
#include "SysTestScheduler.h"
#include "flow/Arena.h"
#include "flow/ThreadPrimitives.h"
#include "flow/ThreadSafeQueue.h"
#include "flow/IRandom.h"
#include <iostream>
#include <thread>
#include <cassert>
#include <boost/asio.hpp>
using namespace boost::asio;
namespace FDBSystemTester {
class SingleThreadedScheduler : public IScheduler {
class AsioScheduler : public IScheduler {
public:
SingleThreadedScheduler() : stopRequested(false), sleeping(false), thr(nullptr) {}
~SingleThreadedScheduler() override {
if (thr) {
delete thr;
}
}
AsioScheduler(int numThreads) : numThreads(numThreads) {}
void start() override {
assert(thr == nullptr);
assert(!stop_);
thr = new std::thread([this]() { this->threadMain(); });
}
void schedule(TTaskFct task) override {
taskQueue.push(task);
wake();
}
void stop() override {
if (stopRequested.exchange(true) == false) {
if (thr) {
wake();
}
}
}
void join() override {
assert(thr);
thr->join();
}
private:
void threadMain() {
while (!stopRequested) {
Optional<TTaskFct> t = taskQueue.pop();
if (t.present()) {
t.get()();
continue;
}
sleeping = true;
wakeEvent.block();
sleeping = false;
continue;
}
}
void wake() {
while (sleeping) {
wakeEvent.set();
}
}
ThreadSafeQueue<TTaskFct> taskQueue;
std::atomic<bool> stopRequested;
std::atomic<bool> sleeping;
Event wakeEvent;
std::thread* thr;
};
class MultiThreadedScheduler : public IScheduler {
public:
MultiThreadedScheduler(int numThreads) : numThreads(numThreads) {
work = require(io_ctx.get_executor(), execution::outstanding_work.tracked);
for (int i = 0; i < numThreads; i++) {
schedulers.push_back(new SingleThreadedScheduler());
threads.emplace_back([this]() { io_ctx.run(); });
}
}
~MultiThreadedScheduler() override {
for (auto sch : schedulers) {
delete sch;
}
}
void schedule(TTaskFct task) override { post(io_ctx, task); }
void start() override {
for (auto sch : schedulers) {
sch->start();
}
}
void schedule(TTaskFct task) override {
int idx = deterministicRandom()->randomInt(0, numThreads);
schedulers[idx]->schedule(task);
}
void stop() override {
for (auto sch : schedulers) {
sch->stop();
}
}
void stop() override { work = any_io_executor(); }
void join() override {
for (auto sch : schedulers) {
sch->join();
for (auto& th : threads) {
th.join();
}
}
private:
std::vector<IScheduler*> schedulers;
int numThreads;
std::vector<std::thread> threads;
io_context io_ctx;
any_io_executor work;
};
IScheduler* createScheduler(int numThreads) {
assert(numThreads > 0 && numThreads <= 1000);
if (numThreads == 1) {
return new SingleThreadedScheduler();
} else {
return new MultiThreadedScheduler(numThreads);
}
return new AsioScheduler(numThreads);
}
} // namespace FDBSystemTester