IMO stopping the ThreadPool before we stop the libevent dispatch loop is the wrong order. Would ideally do something like the following patch:
http: Correct shutdown order of libevent vs ThreadPool

g_threadpool_http is initialized before the libevent dispatch loop in g_thread_http,
so it should be deinitialized in the reverse order. Exit the event loop first so
that we stop trying to Submit() new tasks to the pool before stopping it. (We will
still interrupt the pool earlier on, so submission attempts can still fail.)

Also:
* Delete the accept sockets before exiting the loop.
* Free eventHTTP from our own thread as events are no longer processed (this also fixes a frequent memory leak where eventHTTP would not be freed).

diff --git a/src/httpserver.cpp b/src/httpserver.cpp
index 578c4ec693..ec821dd0b0 100644
--- a/src/httpserver.cpp
+++ b/src/httpserver.cpp
@@ -457,15 +457,20 @@ void StopHTTPServer()
16 {
17 LogDebug(BCLog::HTTP, "Stopping HTTP server\n");
18
19- LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
20- g_threadpool_http.Stop();
21-
22- // Unlisten sockets, these are what make the event loop running, which means
23- // that after this and all connections are closed the event loop will quit.
24+ // Unlisten sockets
25 for (evhttp_bound_socket *socket : boundSockets) {
26 evhttp_del_accept_socket(eventHTTP, socket);
27 }
28 boundSockets.clear();
29+
30+ if (eventBase) {
31+ LogDebug(BCLog::HTTP, "Finishing processing of event loop callbacks\n");
32+ event_base_loopexit(eventBase, nullptr);
33+ }
34+
35+ LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
36+ g_threadpool_http.Stop();
37+
38 {
39 if (const auto n_connections{g_requests.CountActiveConnections()}; n_connections != 0) {
40 LogDebug(BCLog::HTTP, "Waiting for %d connections to stop HTTP server\n", n_connections);
41@@ -473,13 +478,8 @@ void StopHTTPServer()
42 g_requests.WaitUntilEmpty();
43 }
44 if (eventHTTP) {
45- // Schedule a callback to call evhttp_free in the event base thread, so
46- // that evhttp_free does not need to be called again after the handling
47- // of unfinished request connections that follows.
48- event_base_once(eventBase, -1, EV_TIMEOUT, [](evutil_socket_t, short, void*) {
49- evhttp_free(eventHTTP);
50- eventHTTP = nullptr;
51- }, nullptr, nullptr);
52+ evhttp_free(eventHTTP);
53+ eventHTTP = nullptr;
54 }
55 if (eventBase) {
56 LogDebug(BCLog::HTTP, "Waiting for HTTP event thread to exit\n");
Doing the above allows for merging the ThreadPool's constructor with Start() and merging the destructor with Stop().
- This follows from not allowing Start() to be called twice, which is implicit in a constructor. The new assert in Start() becomes unnecessary.
- Having the Stop() logic happen only during destruction also implies that only a single thread should have access to the ThreadPool instance at that time. This would prevent the potential race condition I found here: #34562 (review)
util: Merge ThreadPool ctor with Start(), and ThreadPool dtor with Stop()

HTTP now wraps the pool in std::optional.

diff --git a/src/httpserver.cpp b/src/httpserver.cpp
index ec821dd0b0..8ca5b7771e 100644
--- a/src/httpserver.cpp
+++ b/src/httpserver.cpp
@@ -75,7 +75,7 @@ static std::vector<HTTPPathHandler> pathHandlers GUARDED_BY(g_httppathhandlers_m
 //! Bound listening sockets
 static std::vector<evhttp_bound_socket *> boundSockets;
 //! Http thread pool - future: encapsulate in HttpContext
-static ThreadPool g_threadpool_http("http");
+static std::optional<ThreadPool> g_threadpool_http;
 static int g_max_queue_depth{100};
 
 /**
@@ -252,7 +252,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
 
     // Dispatch to worker thread
     if (i != iend) {
-        if (static_cast<int>(g_threadpool_http.WorkQueueSize()) >= g_max_queue_depth) {
+        if (static_cast<int>(g_threadpool_http->WorkQueueSize()) >= g_max_queue_depth) {
             LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting");
             hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
             return;
@@ -276,7 +276,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
             req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg);
         };
 
-        if (auto res = g_threadpool_http.Submit(std::move(item)); !res.has_value()) {
+        if (auto res = g_threadpool_http->Submit(std::move(item)); !res.has_value()) {
             // Both SubmitError::Inactive and SubmitError::Interrupted mean shutdown
             LogWarning("HTTP request rejected during server shutdown: '%s'", SubmitErrorString(res.error()));
             hreq->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Request rejected during server shutdown");
@@ -438,7 +438,7 @@ void StartHTTPServer()
 {
     int rpcThreads = std::max((long)gArgs.GetIntArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
     LogInfo("Starting HTTP server with %d worker threads\n", rpcThreads);
-    g_threadpool_http.Start(rpcThreads);
+    g_threadpool_http.emplace("http", rpcThreads);
     g_thread_http = std::thread(ThreadHTTP, eventBase);
 }
 
@@ -450,7 +450,7 @@ void InterruptHTTPServer()
         evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
     }
     // Interrupt pool after disabling requests
-    g_threadpool_http.Interrupt();
+    g_threadpool_http->Interrupt();
 }
 
 void StopHTTPServer()
@@ -469,7 +469,7 @@ void StopHTTPServer()
     }
 
     LogDebug(BCLog::HTTP, "Waiting for HTTP worker threads to exit\n");
-    g_threadpool_http.Stop();
+    g_threadpool_http.reset();
 
     {
         if (const auto n_connections{g_requests.CountActiveConnections()}; n_connections != 0) {
diff --git a/src/util/threadpool.h b/src/util/threadpool.h
index e1f2d86ed0..7a3d77e453 100644
--- a/src/util/threadpool.h
+++ b/src/util/threadpool.h
@@ -87,13 +87,6 @@ private:
     }
 
 public:
-    explicit ThreadPool(const std::string& name) : m_name(name) {}
-
-    ~ThreadPool()
-    {
-        Stop(); // In case it hasn't been stopped.
-    }
-
     /**
      * @brief Start worker threads.
      *
@@ -104,12 +97,11 @@ public:
      * initialization and idle shutdown patterns, callers must provide their
      * own synchronization.
      */
-    void Start(const int num_workers) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+    explicit ThreadPool(const std::string& name, const int num_workers) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+        : m_name(name)
     {
         assert(num_workers > 0);
         LOCK(m_mutex);
-        assert(m_workers.empty());
-        m_interrupt = false; // Reset
 
         // Create workers
         m_workers.reserve(num_workers);
@@ -126,7 +118,7 @@ public:
      *
      * Must be called from a controller (non-worker) thread.
      */
-    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
+    ~ThreadPool() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     {
         // Notify workers and join them
         std::vector<std::thread> threads_to_join;
Curious to hear what others think. This is not blocking for me within this PR. I haven’t thoroughly tested the libevent change beyond running functional tests and I can understand not wanting to touch libevent behavior. Maybe the change to ThreadPool should wait until #32061 lands.