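There’s already a race here on master: after we release our lock on m_mutex above, another thread may call Start(). Start() resets m_interrupt to false, and Submit() calls from that new “session” may then push onto m_work_queue — all while the moved-out worker threads that we join() here are still reading that very same m_interrupt and m_work_queue. An old worker that has not yet observed the interrupt may therefore go back to waiting (so our join() hangs until the next Stop()), or it may pick up tasks submitted to the new session.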
As far as I know, the HTTP code never calls Start() a second time, so this isn’t an issue in practice, but it would still be nice to fix.
0diff --git a/src/util/threadpool.h b/src/util/threadpool.h
1index b75a94157e..9cafbd87d8 100644
2--- a/src/util/threadpool.h
3+++ b/src/util/threadpool.h
4@@ -12,8 +12,8 @@
5
6 #include <algorithm>
7 #include <condition_variable>
8-#include <functional>
9 #include <future>
10+#include <memory>
11 #include <queue>
12 #include <stdexcept>
13 #include <thread>
14@@ -46,39 +46,41 @@ class ThreadPool
15 {
16 private:
17 std::string m_name;
18- Mutex m_mutex;
19- std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
20- std::condition_variable m_cv;
21- // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
22- // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
23- // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
24- bool m_interrupt GUARDED_BY(m_mutex){false};
25- std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
26-
27- void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
28+ struct Batch {
29+ Mutex m_mutex;
30+ std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
31+ std::condition_variable m_cv;
32+ // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
33+ // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
34+ // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
35+ bool m_interrupt GUARDED_BY(m_mutex){false};
36+ std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
37+ };
38+ Mutex m_batch_mutex;
39+ std::unique_ptr<Batch> m_batch GUARDED_BY(m_batch_mutex);
40+
41+ // The function is executed within the context of a Batch which may no
42+ // longer be stored in m_batch while stopping.
43+ void WorkerThread(Batch& batch) EXCLUSIVE_LOCKS_REQUIRED(!batch.m_mutex)
44 {
45- WAIT_LOCK(m_mutex, wait_lock);
46+ WAIT_LOCK(batch.m_mutex, wait_lock);
47 for (;;) {
48- std::packaged_task<void()> task;
49- {
50- // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
51- if (!m_interrupt && m_work_queue.empty()) {
52- // Block until the pool is interrupted or a task is available.
53- m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
54- }
55-
56- // If stopped and no work left, exit worker
57- if (m_interrupt && m_work_queue.empty()) {
58- return;
59- }
60-
61- task = std::move(m_work_queue.front());
62- m_work_queue.pop();
63+ // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
64+ if (!batch.m_interrupt && batch.m_work_queue.empty()) {
65+ // Block until the pool is interrupted or a task is available.
66+ batch.m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(batch.m_mutex) { return batch.m_interrupt || !batch.m_work_queue.empty(); });
67+ }
68+
69+ // If stopped and no work left, exit worker
70+ if (batch.m_interrupt && batch.m_work_queue.empty()) {
71+ return;
72 }
73
74+ std::packaged_task<void()> task{std::move(batch.m_work_queue.front())};
75+ batch.m_work_queue.pop();
76 {
77 // Execute the task without the lock
78- REVERSE_LOCK(wait_lock, m_mutex);
79+ REVERSE_LOCK(wait_lock, batch.m_mutex);
80 task();
81 }
82 }
83@@ -100,17 +102,20 @@ public:
84 *
85 * Must be called from a controller (non-worker) thread.
86 */
87- void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
88+ void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
89 {
90 assert(num_workers > 0);
91- LOCK(m_mutex);
92- if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
93- m_interrupt = false; // Reset
94+ LOCK(m_batch_mutex);
95+ if (m_batch) throw std::runtime_error("Thread pool already started");
96+
97+ m_batch = std::make_unique<Batch>();
98+ LOCK(m_batch->m_mutex);
99+ m_batch->m_interrupt = false; // Reset
100
101 // Create workers
102- m_workers.reserve(num_workers);
103+ m_batch->m_workers.reserve(num_workers);
104 for (int i = 0; i < num_workers; i++) {
105- m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
106+ m_batch->m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this, batch = m_batch.get()] { this->WorkerThread(*batch); });
107 }
108 }
109
110@@ -122,24 +127,30 @@ public:
111 *
112 * Must be called from a controller (non-worker) thread.
113 */
114- void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
115+ void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
116 {
117 // Notify workers and join them
118- std::vector<std::thread> threads_to_join;
119+ std::unique_ptr<Batch> local_batch;
120 {
121- LOCK(m_mutex);
122+ LOCK(m_batch_mutex);
123+ if (!m_batch) return;
124+
125+ LOCK(m_batch->m_mutex);
126 // Ensure Stop() is not called from a worker thread while workers are still registered,
127 // otherwise a self-join deadlock would occur.
128- auto id = std::this_thread::get_id();
129- for (const auto& worker : m_workers) assert(worker.get_id() != id);
130+ const auto id = std::this_thread::get_id();
131+ for (const auto& worker : m_batch->m_workers) assert(worker.get_id() != id);
132 // Early shutdown to return right away on any concurrent Submit() call
133- m_interrupt = true;
134- threads_to_join.swap(m_workers);
135+ m_batch->m_interrupt = true;
136+ local_batch.swap(m_batch);
137 }
138- m_cv.notify_all();
139+ local_batch->m_cv.notify_all();
140+ std::vector<std::thread> threads_to_join;
141+ WITH_LOCK(local_batch->m_mutex, threads_to_join.swap(local_batch->m_workers));
142 for (auto& worker : threads_to_join) worker.join();
143 // Since we currently wait for tasks completion, sanity-check empty queue
144- WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
145+ //WITH_LOCK(m_mutex, Assume(m_work_queue.empty())); // Racy! Another thread might have Start() + Submit() after the last join!
146+ WITH_LOCK(local_batch->m_mutex, Assume(local_batch->m_work_queue.empty()));
147 // Note: m_interrupt is left true until next Start()
148 }
149
150@@ -151,19 +162,22 @@ public:
151 * Note: Ignoring the returned future requires guarding the task against
152 * uncaught exceptions, as they would otherwise be silently discarded.
153 */
154- template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
155- auto Submit(F&& fn)
156+ template <class F>
157+ [[nodiscard]] auto Submit(F&& fn) EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
158 {
159 std::packaged_task task{std::forward<F>(fn)};
160 auto future{task.get_future()};
161+ LOCK(m_batch_mutex);
162+ if (!m_batch) throw std::runtime_error("No active workers; cannot accept new tasks");
163 {
164- LOCK(m_mutex);
165- if (m_interrupt || m_workers.empty()) {
166+ LOCK(m_batch->m_mutex);
167+ assert(!m_batch->m_workers.empty());
168+ if (m_batch->m_interrupt) {
169 throw std::runtime_error("No active workers; cannot accept new tasks");
170 }
171- m_work_queue.emplace(std::move(task));
172+ m_batch->m_work_queue.emplace(std::move(task));
173 }
174- m_cv.notify_one();
175+ m_batch->m_cv.notify_one();
176 return future;
177 }
178
179@@ -171,16 +185,18 @@ public:
180 * @brief Execute a single queued task synchronously.
181 * Removes one task from the queue and executes it on the calling thread.
182 */
183- void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
184+ void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
185 {
186 std::packaged_task<void()> task;
187 {
188- LOCK(m_mutex);
189- if (m_work_queue.empty()) return;
190+ LOCK(m_batch_mutex);
191+ if (!m_batch) return;
192+ LOCK(m_batch->m_mutex);
193+ if (m_batch->m_work_queue.empty()) return;
194
195 // Pop the task
196- task = std::move(m_work_queue.front());
197- m_work_queue.pop();
198+ task = std::move(m_batch->m_work_queue.front());
199+ m_batch->m_work_queue.pop();
200 }
201 task();
202 }
203@@ -191,20 +207,31 @@ public:
204 * Wakes all worker threads so they can drain the queue and exit.
205 * Unlike Stop(), this function does not wait for threads to finish.
206 */
207- void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
208+ void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
209 {
210- WITH_LOCK(m_mutex, m_interrupt = true);
211- m_cv.notify_all();
212+ LOCK(m_batch_mutex);
213+ if (!m_batch) return;
214+ {
215+ LOCK(m_batch->m_mutex);
216+ m_batch->m_interrupt = true;
217+ }
218+ m_batch->m_cv.notify_all();
219 }
220
221- size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
222+ size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
223 {
224- return WITH_LOCK(m_mutex, return m_work_queue.size());
225+ LOCK(m_batch_mutex);
226+ if (!m_batch) return 0;
227+ LOCK(m_batch->m_mutex);
228+ return m_batch->m_work_queue.size();
229 }
230
231- size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
232+ size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
233 {
234- return WITH_LOCK(m_mutex, return m_workers.size());
235+ LOCK(m_batch_mutex);
236+ if (!m_batch) return 0;
237+ LOCK(m_batch->m_mutex);
238+ return m_batch->m_workers.size();
239 }
240 };
241