From 179dea5a1b7a3c5eea3b0dc25c9f46faf2455e96 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 3 Nov 2020 20:29:32 +0000 Subject: [PATCH] Name the RocksDB background threads --- fdbserver/CoroFlow.actor.cpp | 16 ++++++-------- fdbserver/KeyValueStoreRocksDB.actor.cpp | 4 ++-- flow/IThreadPool.cpp | 4 ++-- flow/IThreadPool.h | 6 +++-- flow/Platform.cpp | 28 +++++++++++++++++------- flow/Platform.h | 4 ++-- flow/Trace.cpp | 2 +- 7 files changed, 38 insertions(+), 26 deletions(-) diff --git a/fdbserver/CoroFlow.actor.cpp b/fdbserver/CoroFlow.actor.cpp index 751e89855c..b5012f9803 100644 --- a/fdbserver/CoroFlow.actor.cpp +++ b/fdbserver/CoroFlow.actor.cpp @@ -58,8 +58,7 @@ struct Coroutine /*: IThreadlike*/ { void start() { int result = Coro_startCoro_( swapCoro(coro), coro, this, &entry ); - if (result == ENOMEM) - platform::outOfMemory(); + if (result == ENOMEM) platform::outOfMemory(); } void unblock() { @@ -136,7 +135,7 @@ class WorkPool : public IThreadPool, public ReferenceCountedinit(); - + while (!stop) { pool->queueLock.enter(); if (pool->work.empty()) { @@ -178,8 +177,8 @@ class WorkPool : public IThreadPool, public ReferenceCounted stopOnError( WorkPool* w ) { - try { - wait( w->getError() ); + try { + wait(w->getError()); ASSERT(false); } catch (Error& e) { w->stop(e); @@ -200,7 +199,7 @@ public: } virtual Future getError() { return pool->anyError.getResult(); } - virtual void addThread( IThreadPoolReceiver* userData ) { + virtual void addThread(IThreadPoolReceiver* userData, const char*) { checkError(); auto w = new Worker(pool.getPtr(), userData); @@ -245,7 +244,7 @@ public: for(int i=0; iworkers.size(); i++) pool->workers[i]->stop = true; - std::vector idle; + std::vector idle; std::swap(idle, pool->idle); pool->queueLock.leave(); @@ -294,8 +293,7 @@ void CoroThreadPool::init() { if (!current_coro) { current_coro = main_coro = Coro_new(); - if (main_coro == NULL) - platform::outOfMemory(); + if (main_coro == NULL) platform::outOfMemory(); Coro_initializeMainCoro(main_coro); //printf("Main thread: %d bytes stack presumed available\n", Coro_bytesLeftOnStack(current_coro)); diff --git a/fdbserver/KeyValueStoreRocksDB.actor.cpp b/fdbserver/KeyValueStoreRocksDB.actor.cpp index 6a8edf1f7f..8eea7aacc2 100644 --- a/fdbserver/KeyValueStoreRocksDB.actor.cpp +++ b/fdbserver/KeyValueStoreRocksDB.actor.cpp @@ -359,9 +359,9 @@ struct RocksDBKeyValueStore : IKeyValueStore { { writeThread = createGenericThreadPool(); readThreads = createGenericThreadPool(); - writeThread->addThread(new Writer(db, id)); + writeThread->addThread(new Writer(db, id), "fdb-rocksdb-wr"); for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) { - readThreads->addThread(new Reader(db)); + readThreads->addThread(new Reader(db), "fdb-rocksdb-re"); } } diff --git a/flow/IThreadPool.cpp b/flow/IThreadPool.cpp index 3e375048f0..9c85f8d2cd 100644 --- a/flow/IThreadPool.cpp +++ b/flow/IThreadPool.cpp @@ -95,9 +95,9 @@ public: virtual Future getError() { return Never(); } // FIXME virtual void addref() { ReferenceCounted::addref(); } virtual void delref() { if (ReferenceCounted::delref_no_destroy()) stop(); } - void addThread( IThreadPoolReceiver* userData ) { + void addThread(IThreadPoolReceiver* userData, const char* name) { threads.push_back(new Thread(this, userData)); - startThread(start, threads.back(), stackSize); + startThread(start, threads.back(), stackSize, name); } void post( PThreadAction action ) { ios.post( ActionWrapper( action ) ); diff --git a/flow/IThreadPool.h b/flow/IThreadPool.h index 651d7c5f5a..4f94d7a5a7 100644 --- a/flow/IThreadPool.h +++ b/flow/IThreadPool.h @@ -22,6 +22,8 @@ #define FLOW_ITHREADPOOL_H #pragma once +#include + #include "flow/flow.h" // The IThreadPool interface represents a thread pool suitable for doing blocking disk-intensive work @@ -47,7 +49,7 @@ public: virtual void init() = 0; }; -struct ThreadAction { +struct ThreadAction { virtual void operator()(IThreadPoolReceiver*) = 0; // self-destructs virtual void cancel() = 0; virtual double getTimeEstimate() = 0; // for simulation @@ -58,7 +60,7 @@ class IThreadPool { public: virtual ~IThreadPool() {} virtual Future getError() = 0; // asynchronously throws an error if there is an internal error - virtual void addThread( IThreadPoolReceiver* userData ) = 0; + virtual void addThread(IThreadPoolReceiver* userData, const char* name = nullptr) = 0; virtual void post( PThreadAction action ) = 0; virtual Future stop(Error const& e = success()) = 0; virtual bool isCoro() const { return false; } diff --git a/flow/Platform.cpp b/flow/Platform.cpp index c43ec4439b..6cea07c096 100644 --- a/flow/Platform.cpp +++ b/flow/Platform.cpp @@ -1047,9 +1047,8 @@ void getDiskStatistics(std::string const& directory, uint64_t& currentIOs, uint6 reads = total_transfers_read; writes = total_transfers_write; writeSectors = total_blocks_read; - readSectors = total_blocks_write; + readSectors = total_blocks_write; } - } dev_t getDeviceId(std::string path) { @@ -2519,11 +2518,19 @@ void setCloseOnExec( int fd ) { } // namespace platform #ifdef _WIN32 -THREAD_HANDLE startThread(void (*func) (void *), void *arg, int stackSize) { - return (void *)_beginthread(func, stackSize, arg); +THREAD_HANDLE startThread(void (*func)(void*), void* arg, int stackSize, const char* name) { + // Convert `const char*` to `const wchar*` because Windows. + size_t newsize = strlen(orig) + 1; + wchar_t* wcstring = new wchar_t[newsize]; + size_t convertedChars = 0; + mbstowcs_s(&convertedChars, wcstring, newsize, name, _TRUNCATE); + auto handle = _beginthread(func, stackSize, arg); + SetThreadDescription(handle, wcstring); + delete[] wcstring; + return (void*)handle; } #elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__)) -THREAD_HANDLE startThread(void *(*func) (void *), void *arg, int stackSize) { +THREAD_HANDLE startThread(void* (*func)(void*), void* arg, int stackSize, const char* name) { pthread_t t; pthread_attr_t attr; @@ -2542,6 +2549,11 @@ THREAD_HANDLE startThread(void *(*func) (void *), void *arg, int stackSize) { pthread_create(&t, &attr, func, arg); pthread_attr_destroy(&attr); + if (name != nullptr) { + // TODO: Should this just truncate? + ASSERT_EQ(pthread_setname_np(t, name), 0); + } + return t; } #else @@ -3273,7 +3285,7 @@ int64_t getNumProfilesCaptured() { void profileHandler(int sig) { #ifdef __linux__ - if(!profileThread) { + if (!profileThread) { return; } @@ -3311,7 +3323,7 @@ void profileHandler(int sig) { #endif } -void setProfilingEnabled(int enabled) { +void setProfilingEnabled(int enabled) { #ifdef __linux__ if(profileThread && enabled && !profilingEnabled && profileRequested) { profilingEnabled = true; @@ -3323,7 +3335,7 @@ void setProfilingEnabled(int enabled) { } #else // No profiling for other platforms! -#endif +#endif } void* checkThread(void *arg) { diff --git a/flow/Platform.h b/flow/Platform.h index 776f560dde..55dd6f3d0e 100644 --- a/flow/Platform.h +++ b/flow/Platform.h @@ -153,13 +153,13 @@ inline static T& makeDependent(T& value) { return value; } #define THREAD_FUNC static void __cdecl #define THREAD_FUNC_RETURN void #define THREAD_HANDLE void * -THREAD_HANDLE startThread(void (func) (void *), void *arg, int stackSize = 0); +THREAD_HANDLE startThread(void(func)(void*), void* arg, int stackSize = 0, const char* name = nullptr); #define THREAD_RETURN return #elif defined(__unixish__) #define THREAD_FUNC static void * #define THREAD_FUNC_RETURN void * #define THREAD_HANDLE pthread_t -THREAD_HANDLE startThread(void *(func) (void *), void *arg, int stackSize = 0); +THREAD_HANDLE startThread(void*(func)(void*), void* arg, int stackSize = 0, const char* name = nullptr); #define THREAD_RETURN return NULL #else #error How do I start a new thread on this platform? diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 2ee97b3996..3a5da85a25 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -61,7 +61,7 @@ public: Future getError() { return errors.getFuture(); } - void addThread( IThreadPoolReceiver* userData ) { + void addThread(IThreadPoolReceiver* userData, const char*) { ASSERT( !thread ); thread = userData; }