Switched to a separate test ThreadPoolReceiver

This commit is contained in:
helium 2021-08-11 10:47:51 -07:00
parent 751fc59d54
commit f8edf6e1f2

View File

@ -11,10 +11,9 @@
void forceLinkIThreadPoolTests() {} void forceLinkIThreadPoolTests() {}
static ThreadReturnPromiseStream<std::string> notifications;
struct ThreadNameReceiver final : IThreadPoolReceiver { struct ThreadNameReceiver final : IThreadPoolReceiver {
ThreadNameReceiver(const bool stream_signal = false) : stream_signal(stream_signal) {} ThreadNameReceiver(){};
void init() override {} void init() override {}
@ -32,21 +31,14 @@ struct ThreadNameReceiver final : IThreadPoolReceiver {
if (err != 0) { if (err != 0) {
std::cout << "Get name failed with error code: " << err << std::endl; std::cout << "Get name failed with error code: " << err << std::endl;
a.name.sendError(platform_error()); a.name.sendError(platform_error());
if (stream_signal) {
notifications.sendError(platform_error());
}
return; return;
} }
std::string s = name; std::string s = name;
if (stream_signal) {
notifications.send(s);
}
a.name.send(std::move(s)); a.name.send(std::move(s));
} }
bool stream_signal;
}; };
TEST_CASE("/flow/IThreadPool/NamedThread") { TEST_CASE("/flow/IThreadPool/NamedThread") {
noUnseed = true; noUnseed = true;
@ -72,11 +64,41 @@ TEST_CASE("/flow/IThreadPool/NamedThread") {
return Void(); return Void();
} }
struct ThreadSafePromiseStreamSender final : IThreadPoolReceiver {
ThreadSafePromiseStreamSender(
ThreadReturnPromiseStream<std::string>* notifications)
: notifications(notifications) {}
void init() override {}
struct GetNameAction final : TypedAction<ThreadSafePromiseStreamSender, GetNameAction> {
double getTimeEstimate() const override { return 3.; }
};
void action(GetNameAction& a) {
pthread_t t = pthread_self();
const size_t arrayLen = 16;
char name[arrayLen];
int err = pthread_getname_np(t, name, arrayLen);
if (err != 0) {
std::cout << "Get name failed with error code: " << err << std::endl;
notifications->sendError(platform_error());
return;
}
notifications->send(std::move(name));
}
private:
ThreadReturnPromiseStream<std::string>* notifications;
};
TEST_CASE("/flow/IThreadPool/ThreadReturnPromiseStream") { TEST_CASE("/flow/IThreadPool/ThreadReturnPromiseStream") {
noUnseed = true; noUnseed = true;
state std::unique_ptr<ThreadReturnPromiseStream<std::string>>
notifications(new ThreadReturnPromiseStream<std::string>());
state Reference<IThreadPool> pool = createGenericThreadPool(); state Reference<IThreadPool> pool = createGenericThreadPool();
pool->addThread(new ThreadNameReceiver(/*stream_signal=*/true), "thread-foo"); pool->addThread(new ThreadSafePromiseStreamSender(notifications.get()), "thread-foo");
// Warning: this action is a little racy with the call to `pthread_setname_np`. In practice, // Warning: this action is a little racy with the call to `pthread_setname_np`. In practice,
// ~nothing should depend on the thread name being set instantaneously. If this test ever // ~nothing should depend on the thread name being set instantaneously. If this test ever
@ -84,11 +106,11 @@ TEST_CASE("/flow/IThreadPool/ThreadReturnPromiseStream") {
// the actions. // the actions.
state int num = 3; state int num = 3;
for (int i = 0; i < num; ++i) { for (int i = 0; i < num; ++i) {
auto* a = new ThreadNameReceiver::GetNameAction(); auto* a = new ThreadSafePromiseStreamSender::GetNameAction();
pool->post(a); pool->post(a);
} }
state FutureStream<std::string> futs = notifications.getFuture(); state FutureStream<std::string> futs = notifications->getFuture();
state int n = 0; state int n = 0;
while (n < num) { while (n < num) {