ThreadPool follow-ups, proactive shutdown and HasReason dependency cleanup #34562

furszy wants to merge 4 commits into bitcoin:master from furszy:2026_threadpool_follow_ups, changing 20 files (+65 −49)
  1. furszy commented at 6:00 pm on February 11, 2026: member

    A few follow-ups to #33689, includes:

    1. ThreadPool active-wait during shutdown: Instead of just waiting for workers to finish processing tasks, Stop() now helps them actively. This speeds up the JSON-RPC and REST server shutdown, resulting in a faster node shutdown when many requests remain unhandled. This wasn’t included in the original PR due to the behavior change this introduces.

    2. Decouple HasReason from the unit test framework machinery: This avoids providing the entire unit test framework dependency to low-level tests that only require access to the HasReason utility class. Examples are: reverlock_tests.cpp, sync_tests.cpp, util_check_tests.cpp, util_string_tests.cpp, script_parse_tests.cpp and threadpool_tests.cpp. These tests no longer gain access to unnecessary components like the chainstate, node context, caches, etc. It includes @l0rinc’s threadpool_tests.cpp HasReason changes.

    3. Include response body in non-JSON HTTP error messages: Taken straight from @pinheadmz’s comment; it makes debugging CI issues easier.

  2. threadpool: active-wait during shutdown
    Instead of waiting for the workers to finish processing tasks, help
    them actively inside Stop().
    
    This speeds up the JSON-RPC and REST server shutdown procedure,
    and results in a faster node shutdown when many requests remain unhandled.
    a6f5658b05
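The active-wait idea in this commit can be sketched with a small standalone pool. This is a hypothetical MiniPool built only on the C++ standard library, not Bitcoin Core's util::ThreadPool (which uses its own Mutex/LOCK wrappers and thread-safety annotations). The assumption is simply that Stop() marks the pool as interrupted, wakes the workers, and then pops and runs the remaining tasks on the calling thread before joining.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool (not Bitcoin Core's util::ThreadPool) illustrating
// an "active-wait" Stop(): the caller drains the queue alongside the workers.
class MiniPool
{
public:
    explicit MiniPool(int num_workers)
    {
        assert(num_workers > 0);
        for (int i = 0; i < num_workers; ++i) m_workers.emplace_back([this] { WorkerThread(); });
    }
    ~MiniPool() { Stop(); }

    [[nodiscard]] std::future<void> Submit(std::function<void()> fn)
    {
        std::packaged_task<void()> task{std::move(fn)};
        auto future{task.get_future()};
        {
            std::lock_guard lock{m_mutex};
            if (m_interrupt) throw std::runtime_error("pool is stopping; cannot accept new tasks");
            m_queue.push(std::move(task));
        }
        m_cv.notify_one();
        return future;
    }

    // Pops one task and runs it on the calling thread; returns false if none was queued.
    bool ProcessTask()
    {
        std::packaged_task<void()> task;
        {
            std::lock_guard lock{m_mutex};
            if (m_queue.empty()) return false;
            task = std::move(m_queue.front());
            m_queue.pop();
        }
        task();
        return true;
    }

    // Must be called from a controller (non-worker) thread.
    void Stop()
    {
        {
            std::lock_guard lock{m_mutex};
            if (m_interrupt) return; // already stopped
            m_interrupt = true;
        }
        m_cv.notify_all();
        // Active wait: help the workers drain the queue instead of only blocking in join().
        while (ProcessTask()) {}
        for (auto& worker : m_workers) worker.join();
        m_workers.clear();
    }

private:
    void WorkerThread()
    {
        std::unique_lock lock{m_mutex};
        for (;;) {
            m_cv.wait(lock, [this] { return m_interrupt || !m_queue.empty(); });
            if (m_queue.empty()) return; // interrupted and nothing left to run
            auto task{std::move(m_queue.front())};
            m_queue.pop();
            lock.unlock(); // run the task without holding the lock
            task();
            lock.lock();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::packaged_task<void()>> m_queue;
    bool m_interrupt{false};
    std::vector<std::thread> m_workers;
};
```

In this sketch the calling thread only helps until the queue is empty and then joins, so a task that is still running on a worker at that point is simply waited for.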
  3. test: refactor, decouple HasReason from test framework machinery
    Avoid providing the entire unit test framework dependency to tests that only
    require access to the HasReason utility class.
    
    E.g. reverlock_tests.cpp, sync_tests.cpp, util_check_tests.cpp, util_string_tests.cpp,
    and script_parse_tests.cpp only require access to HasReason and nothing else.
    79c303ef67
  4. test: cleanup, use HasReason in threadpool_tests.cpp d2f118251b
  5. test: include response body in non-JSON HTTP error msg
    Useful for debugging issues.
    ddf227371e
  6. DrahtBot commented at 6:00 pm on February 11, 2026: contributor

    The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

    Reviews

    See the guideline for information on the review process. A summary of reviews will appear here.

    Conflicts

    Reviewers, this pull request conflicts with the following ones:

    • #34577 (http: fix submission during shutdown race by furszy)
    • #34349 (util: Remove brittle and confusing sp::Popen(std::string) by maflcko)
    • #33929 (test: Remove system_tests/run_command runtime dependencies by hebasto)
    • #33421 (node: add BlockTemplateCache by ismaelsadeeq)
    • #32061 (Replace libevent with our own HTTP and socket-handling implementation by pinheadmz)

    If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.

  7. fanquake added this to the milestone 31.0 on Feb 11, 2026
  8. in src/util/threadpool.h:146 in ddf227371e


    hodlinator commented at 8:00 pm on February 12, 2026:

    There’s a race here already on master: another thread may run Start() after we release our lock on m_mutex above. Start() will set m_interrupt = false, and the m_work_queue could later be added to from this new “session”, while the moved-out worker threads that we join() on here are still reading the very same data.

    The HTTP logic doesn’t call Start() a second time AFAIK, so it’s not an issue in practice, but it would still be nice to fix.

    diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    index b75a94157e..9cafbd87d8 100644
    --- a/src/util/threadpool.h
    +++ b/src/util/threadpool.h
    @@ -12,8 +12,8 @@
     
     #include <algorithm>
     #include <condition_variable>
    -#include <functional>
     #include <future>
    +#include <memory>
     #include <queue>
     #include <stdexcept>
     #include <thread>
    @@ -46,39 +46,41 @@ class ThreadPool
     {
     private:
         std::string m_name;
    -    Mutex m_mutex;
    -    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    -    std::condition_variable m_cv;
    -    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    -    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
    -    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
    -    bool m_interrupt GUARDED_BY(m_mutex){false};
    -    std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
    -
    -    void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    struct Batch {
    +        Mutex m_mutex;
    +        std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    +        std::condition_variable m_cv;
    +        // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    +        // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
    +        // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
    +        bool m_interrupt GUARDED_BY(m_mutex){false};
    +        std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
    +    };
    +    Mutex m_batch_mutex;
    +    std::unique_ptr<Batch> m_batch GUARDED_BY(m_batch_mutex);
    +
    +    // The function is executed within the context of a Batch which may no
    +    // longer be stored in m_batch while stopping.
    +    void WorkerThread(Batch& batch) EXCLUSIVE_LOCKS_REQUIRED(!batch.m_mutex)
         {
    -        WAIT_LOCK(m_mutex, wait_lock);
    +        WAIT_LOCK(batch.m_mutex, wait_lock);
             for (;;) {
    -            std::packaged_task<void()> task;
    -            {
    -                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    -                if (!m_interrupt && m_work_queue.empty()) {
    -                    // Block until the pool is interrupted or a task is available.
    -                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    -                }
    -
    -                // If stopped and no work left, exit worker
    -                if (m_interrupt && m_work_queue.empty()) {
    -                    return;
    -                }
    -
    -                task = std::move(m_work_queue.front());
    -                m_work_queue.pop();
    +            // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    +            if (!batch.m_interrupt && batch.m_work_queue.empty()) {
    +                // Block until the pool is interrupted or a task is available.
    +                batch.m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(batch.m_mutex) { return batch.m_interrupt || !batch.m_work_queue.empty(); });
    +            }
    +
    +            // If stopped and no work left, exit worker
    +            if (batch.m_interrupt && batch.m_work_queue.empty()) {
    +                return;
                 }
     
    +            std::packaged_task<void()> task{std::move(batch.m_work_queue.front())};
    +            batch.m_work_queue.pop();
                 {
                     // Execute the task without the lock
    -                REVERSE_LOCK(wait_lock, m_mutex);
    +                REVERSE_LOCK(wait_lock, batch.m_mutex);
                     task();
                 }
             }
    @@ -100,17 +102,20 @@ public:
          *
          * Must be called from a controller (non-worker) thread.
          */
    -    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
         {
             assert(num_workers > 0);
    -        LOCK(m_mutex);
    -        if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
    -        m_interrupt = false; // Reset
    +        LOCK(m_batch_mutex);
    +        if (m_batch) throw std::runtime_error("Thread pool already started");
    +
    +        m_batch = std::make_unique<Batch>();
    +        LOCK(m_batch->m_mutex);
    +        m_batch->m_interrupt = false; // Reset
     
             // Create workers
    -        m_workers.reserve(num_workers);
    +        m_batch->m_workers.reserve(num_workers);
             for (int i = 0; i < num_workers; i++) {
    -            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    +            m_batch->m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this, batch = m_batch.get()] { this->WorkerThread(*batch); });
             }
         }
     
    @@ -122,24 +127,30 @@ public:
          *
          * Must be called from a controller (non-worker) thread.
          */
    -    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
         {
             // Notify workers and join them
    -        std::vector<std::thread> threads_to_join;
    +        std::unique_ptr<Batch> local_batch;
             {
    -            LOCK(m_mutex);
    +            LOCK(m_batch_mutex);
    +            if (!m_batch) return;
    +
    +            LOCK(m_batch->m_mutex);
                 // Ensure Stop() is not called from a worker thread while workers are still registered,
                 // otherwise a self-join deadlock would occur.
    -            auto id = std::this_thread::get_id();
    -            for (const auto& worker : m_workers) assert(worker.get_id() != id);
    +            const auto id = std::this_thread::get_id();
    +            for (const auto& worker : m_batch->m_workers) assert(worker.get_id() != id);
                 // Early shutdown to return right away on any concurrent Submit() call
    -            m_interrupt = true;
    -            threads_to_join.swap(m_workers);
    +            m_batch->m_interrupt = true;
    +            local_batch.swap(m_batch);
             }
    -        m_cv.notify_all();
    +        local_batch->m_cv.notify_all();
    +        std::vector<std::thread> threads_to_join;
    +        WITH_LOCK(local_batch->m_mutex, threads_to_join.swap(local_batch->m_workers));
             for (auto& worker : threads_to_join) worker.join();
             // Since we currently wait for tasks completion, sanity-check empty queue
    -        WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    +        //WITH_LOCK(m_mutex, Assume(m_work_queue.empty())); // Racy! Another thread might have Start() + Submit() after the last join!
    +        WITH_LOCK(local_batch->m_mutex, Assume(local_batch->m_work_queue.empty()));
             // Note: m_interrupt is left true until next Start()
         }
     
    @@ -151,19 +162,22 @@ public:
          * Note: Ignoring the returned future requires guarding the task against
          * uncaught exceptions, as they would otherwise be silently discarded.
          */
    -    template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    -    auto Submit(F&& fn)
    +    template <class F>
    +    [[nodiscard]] auto Submit(F&& fn) EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
         {
             std::packaged_task task{std::forward<F>(fn)};
             auto future{task.get_future()};
    +        LOCK(m_batch_mutex);
    +        if (!m_batch) throw std::runtime_error("No active workers; cannot accept new tasks");
             {
    -            LOCK(m_mutex);
    -            if (m_interrupt || m_workers.empty()) {
    +            LOCK(m_batch->m_mutex);
    +            assert(!m_batch->m_workers.empty());
    +            if (m_batch->m_interrupt) {
                     throw std::runtime_error("No active workers; cannot accept new tasks");
                 }
    -            m_work_queue.emplace(std::move(task));
    +            m_batch->m_work_queue.emplace(std::move(task));
             }
    -        m_cv.notify_one();
    +        m_batch->m_cv.notify_one();
             return future;
         }
     
    @@ -171,16 +185,18 @@ public:
          * @brief Execute a single queued task synchronously.
          * Removes one task from the queue and executes it on the calling thread.
          */
    -    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
         {
             std::packaged_task<void()> task;
             {
    -            LOCK(m_mutex);
    -            if (m_work_queue.empty()) return;
    +            LOCK(m_batch_mutex);
    +            if (!m_batch) return;
    +            LOCK(m_batch->m_mutex);
    +            if (m_batch->m_work_queue.empty()) return;
     
                 // Pop the task
    -            task = std::move(m_work_queue.front());
    -            m_work_queue.pop();
    +            task = std::move(m_batch->m_work_queue.front());
    +            m_batch->m_work_queue.pop();
             }
             task();
         }
    @@ -191,20 +207,31 @@ public:
          * Wakes all worker threads so they can drain the queue and exit.
          * Unlike Stop(), this function does not wait for threads to finish.
          */
    -    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
         {
    -        WITH_LOCK(m_mutex, m_interrupt = true);
    -        m_cv.notify_all();
    +        LOCK(m_batch_mutex);
    +        if (!m_batch) return;
    +        {
    +            LOCK(m_batch->m_mutex);
    +            m_batch->m_interrupt = true;
    +        }
    +        m_batch->m_cv.notify_all();
         }
     
    -    size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
         {
    -        return WITH_LOCK(m_mutex, return m_work_queue.size());
    +        LOCK(m_batch_mutex);
    +        if (!m_batch) return 0;
    +        LOCK(m_batch->m_mutex);
    +        return m_batch->m_work_queue.size();
         }
     
    -    size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_batch_mutex, !m_batch->m_mutex)
         {
    -        return WITH_LOCK(m_mutex, return m_workers.size());
    +        LOCK(m_batch_mutex);
    +        if (!m_batch) return 0;
    +        LOCK(m_batch->m_mutex);
    +        return m_batch->m_workers.size();
         }
     };
     
    

    maflcko commented at 9:45 am on February 13, 2026:

    style nit: could use a C++11 range-based for loop here:

    diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
    index 46acf1d67d..41882345a2 100644
    --- a/src/test/threadpool_tests.cpp
    +++ b/src/test/threadpool_tests.cpp
    @@ -56,8 +56,7 @@ std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, const std::s
     
         // Fill all workers with blocking tasks
         std::vector<std::future<void>> blocking_tasks;
    -    for (int i = 0; i < num_of_threads_to_block; i++) {
    -        std::promise<void>& ready = ready_promises[i];
    +    for (auto& ready : ready_promises) {
             blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
                 ready.set_value();
                 blocker_future.wait();
    

    maflcko commented at 10:25 am on February 13, 2026:
    Also, there seems to be missing unit test coverage for what happens to queued tasks after an interrupt (they should be checked to still execute and not be dropped).
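A test for this case could block every worker, queue extra tasks, call Interrupt(), release the workers, and then check that every queued task ran. The sketch below does this against a hypothetical standalone DrainingPool whose worker loop follows the rule in the quoted WorkerThread() (exit only once interrupted and the queue is empty); a real test would instead target util::ThreadPool through the Boost test framework.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical pool whose workers exit only when interrupted AND the queue is empty.
class DrainingPool
{
public:
    explicit DrainingPool(int num_workers)
    {
        for (int i = 0; i < num_workers; ++i) m_workers.emplace_back([this] { WorkerThread(); });
    }
    ~DrainingPool()
    {
        Interrupt();
        Join();
    }

    void Submit(std::function<void()> fn)
    {
        {
            std::lock_guard lock{m_mutex};
            m_queue.push(std::move(fn));
        }
        m_cv.notify_one();
    }

    // Wakes all workers so they can drain the queue and exit; does not wait.
    void Interrupt()
    {
        {
            std::lock_guard lock{m_mutex};
            m_interrupt = true;
        }
        m_cv.notify_all();
    }

    void Join()
    {
        for (auto& worker : m_workers) {
            if (worker.joinable()) worker.join();
        }
    }

private:
    void WorkerThread()
    {
        std::unique_lock lock{m_mutex};
        for (;;) {
            m_cv.wait(lock, [this] { return m_interrupt || !m_queue.empty(); });
            if (m_queue.empty()) return; // interrupted and fully drained
            auto fn{std::move(m_queue.front())};
            m_queue.pop();
            lock.unlock();
            fn();
            lock.lock();
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_queue;
    bool m_interrupt{false};
    std::vector<std::thread> m_workers;
};

// Returns true if every task queued before Interrupt() still executed.
bool QueuedTasksRunAfterInterrupt(int num_workers, int num_tasks)
{
    std::promise<void> gate;
    const std::shared_future<void> opened{gate.get_future().share()};
    std::vector<int> ran(num_tasks, 0);
    DrainingPool pool{num_workers};
    // Occupy every worker so that the following tasks stay in the queue.
    for (int i = 0; i < num_workers; ++i) pool.Submit([opened] { opened.wait(); });
    for (int i = 0; i < num_tasks; ++i) pool.Submit([&ran, i] { ran[i] = 1; });
    pool.Interrupt();
    gate.set_value(); // unblock the workers only after the interrupt
    pool.Join();      // joining makes the workers' writes to ran visible here
    for (int value : ran) {
        if (value != 1) return false;
    }
    return true;
}
```

Releasing the gate only after Interrupt() is the important ordering: since each worker is stuck on one blocker, none of the queued tasks can start before the interrupt flag is set.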
  9. in test/functional/test_framework/authproxy.py:198 in ddf227371e
    194@@ -195,7 +195,7 @@ def _get_response(self):
    195         content_type = http_response.getheader('Content-Type')
    196         if content_type != 'application/json':
    197             raise JSONRPCException(
    198-                {'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server' % (http_response.status, http_response.reason)},
    199+                {'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server: %s' % (http_response.status, http_response.reason, http_response.read().decode())},
    


    maflcko commented at 8:12 pm on February 12, 2026:
    nit: Would be nice to use format() or an f-string instead. (It seems a bit odd to use Python, where types are not declared, but then use %-style format strings that could throw a TypeError.)
  10. in src/test/util/common.h:6 in ddf227371e
    0@@ -0,0 +1,26 @@
    1+// Copyright (c) The Bitcoin Core developers
    2+// Distributed under the MIT software license, see the accompanying
    3+// file COPYING or https://www.opensource.org/licenses/mit-license.php.
    4+
    5+#ifndef BITCOIN_TEST_UTIL_COMMON_H
    6+#define BITCOIN_TEST_UTIL_COMMON_H
    


    maflcko commented at 8:18 pm on February 12, 2026:
    nit: Would be nice to call this checkers.h (or so). It is not really “common” if only a few tests use it.

github-metadata-mirror

This is a metadata mirror of the GitHub repository bitcoin/bitcoin. This site is not affiliated with GitHub. Content is generated from a GitHub metadata backup.
generated: 2026-02-17 06:13 UTC
