IMO stopping the ThreadPool before we stop the libevent dispatch loop is the wrong order. Ideally we would do something like the following patch:
<details><summary>Patch for HTTP shutdown order</summary>
http: Correct shutdown order of libevent vs ThreadPool
g_threadpool_http is initialized before the libevent dispatch loop in g_thread_http,
so it should be deinitialized in the reverse order. Exit the event loop first so
that we stop trying to Submit() new tasks to the pool before stopping it. (We will
still interrupt the pool earlier on, so submission attempts can still fail.)
Also:
* Delete the accept sockets before exiting the loop.
* Free eventHTTP from our own thread as events are no longer processed (this also fixes a frequent memory leak where eventHTTP would not be freed).
diff --git a/src/httpserver.cpp b/src/httpserver.cpp
index 578c4ec693..ec821dd0b0 100644
--- a/src/httpserver.cpp
+++ b/src/httpserver.cpp
@@ -457,15 +457,20 @@ void StopHTTPServer()
{
LogDebug(BCLog::HTTP, "Stopping HTTP server\n");
- LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
- g_threadpool_http.Stop();
-
- // Unlisten sockets, these are what make the event loop running, which means
- // that after this and all connections are closed the event loop will quit.
+ // Unlisten sockets
for (evhttp_bound_socket *socket : boundSockets) {
evhttp_del_accept_socket(eventHTTP, socket);
}
boundSockets.clear();
+
+ if (eventBase) {
+ LogDebug(BCLog::HTTP, "Finishing processing of event loop callbacks\n");
+ event_base_loopexit(eventBase, nullptr);
+ }
+
+ LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
+ g_threadpool_http.Stop();
+
{
if (const auto n_connections{g_requests.CountActiveConnections()}; n_connections != 0) {
LogDebug(BCLog::HTTP, "Waiting for %d connections to stop HTTP server\n", n_connections);
@@ -473,13 +478,8 @@ void StopHTTPServer()
g_requests.WaitUntilEmpty();
}
if (eventHTTP) {
- // Schedule a callback to call evhttp_free in the event base thread, so
- // that evhttp_free does not need to be called again after the handling
- // of unfinished request connections that follows.
- event_base_once(eventBase, -1, EV_TIMEOUT, [](evutil_socket_t, short, void*) {
- evhttp_free(eventHTTP);
- eventHTTP = nullptr;
- }, nullptr, nullptr);
+ evhttp_free(eventHTTP);
+ eventHTTP = nullptr;
}
if (eventBase) {
LogDebug(BCLog::HTTP, "Waiting for HTTP event thread to exit\n");
</details>
Doing the above allows merging the ThreadPool's constructor with Start() and the destructor with Stop().
- This follows from disallowing Start() from being called twice, which is implicit in a constructor. The new assert in Start() becomes unnecessary.
- Having Stop() logic happen only during destruction also implies that only a single thread should have access to the ThreadPool instance at that time. This would prevent the potential race condition I found here: #34562 (review)
<details><summary>Patch to merge ThreadPool ctor with Start() and dtor with Stop()</summary>
util: Merge ThreadPool ctor with Start(), and ThreadPool dtor with Stop().
HTTP now wraps the pool in std::optional.
diff --git a/src/httpserver.cpp b/src/httpserver.cpp
index ec821dd0b0..8ca5b7771e 100644
--- a/src/httpserver.cpp
+++ b/src/httpserver.cpp
@@ -75,7 +75,7 @@ static std::vector<HTTPPathHandler> pathHandlers GUARDED_BY(g_httppathhandlers_m
//! Bound listening sockets
static std::vector<evhttp_bound_socket *> boundSockets;
//! Http thread pool - future: encapsulate in HttpContext
-static ThreadPool g_threadpool_http("http");
+static std::optional<ThreadPool> g_threadpool_http;
static int g_max_queue_depth{100};
/**
@@ -252,7 +252,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
// Dispatch to worker thread
if (i != iend) {
- if (static_cast<int>(g_threadpool_http.WorkQueueSize()) >= g_max_queue_depth) {
+ if (static_cast<int>(g_threadpool_http->WorkQueueSize()) >= g_max_queue_depth) {
LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting");
hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
return;
@@ -276,7 +276,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg);
};
- if (auto res = g_threadpool_http.Submit(std::move(item)); !res.has_value()) {
+ if (auto res = g_threadpool_http->Submit(std::move(item)); !res.has_value()) {
// Both SubmitError::Inactive and SubmitError::Interrupted mean shutdown
LogWarning("HTTP request rejected during server shutdown: '%s'", SubmitErrorString(res.error()));
hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Request rejected during server shutdown");
@@ -438,7 +438,7 @@ void StartHTTPServer()
{
int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads);
- g_threadpool_http.Start(rpcThreads);
+ g_threadpool_http.emplace("http", rpcThreads);
g_thread_http = std::thread(ThreadHTTP, eventBase);
}
@@ -450,7 +450,7 @@ void InterruptHTTPServer()
evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
}
// Interrupt pool after disabling requests
- g_threadpool_http.Interrupt();
+ g_threadpool_http->Interrupt();
}
void StopHTTPServer()
@@ -469,7 +469,7 @@ void StopHTTPServer()
}
LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
- g_threadpool_http.Stop();
+ g_threadpool_http.reset();
{
if (const auto n_connections{g_requests.CountActiveConnections()}; n_connections != 0) {
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
index e1f2d86ed0..7a3d77e453 100644
--- a/src/util/threadpool.h
+++ b/src/util/threadpool.h
@@ -87,13 +87,6 @@ private:
}
public:
- explicit ThreadPool(const std::string& name) : m_name(name) {}
-
- ~ThreadPool()
- {
- Stop(); // In case it hasn't been stopped.
- }
-
/**
 * @brief Start worker threads.
*
@@ -104,12 +97,11 @@ public:
* initialization and idle shutdown patterns, callers must provide their
* own synchronization.
*/
- void Start(const int num_workers) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+ explicit ThreadPool(const std::string& name, const int num_workers) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+ : m_name(name)
{
assert(num_workers > 0);
LOCK(m_mutex);
- assert(m_workers.empty());
- m_interrupt = false; // Reset
// Create workers
m_workers.reserve(num_workers);
@@ -126,7 +118,7 @@ public:
*
* Must be called from a controller (non-worker) thread.
*/
- void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+ ~ThreadPool() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
// Notify workers and join them
std::vector<std::thread> threads_to_join;
</details>
Curious to hear what others think. This is not blocking for me within this PR. I haven't thoroughly tested the libevent change beyond running functional tests and I can understand not wanting to touch libevent behavior. Maybe the change to ThreadPool should wait until #32061 lands.