http: replace WorkQueue and single threads handling for ThreadPool #33689

pull furszy wants to merge 3 commits into bitcoin:master from furszy:2025_threadpool_http_server changing 7 files +630 −119
  1. furszy commented at 2:36 pm on October 23, 2025: member

    This was a recent discovery: the general thread pool class created for #26966 integrates cleanly into the HTTP server. It simplifies the init, shutdown, and request execution logic, replacing code that was never unit tested with code that is properly unit and fuzz tested. (Our functional test framework does exercise this RPC interface extensively; that’s how we’ve been ensuring its correct behavior so far, which is not ideal.)

    This clearly separates the responsibilities: The HTTP server now focuses solely on receiving and dispatching requests, while ThreadPool handles concurrency, queuing, and execution.

    This will also allow us to experiment with further performance improvements at the task queuing and execution level, such as a lock-free queue, task prioritization, or other implementation details like coroutines in the future, without having to deal with HTTP code that lives on a different layer.

    Note: The rationale for introducing the ThreadPool first is to make it easy to cherry-pick across different working paths. Among those that benefit from it are #26966 for the parallelization of the indexes’ initial sync, #31132 for the parallelization of the inputs-fetching procedure, #32061 for the libevent replacement, and the kernel API #30595 (https://github.com/bitcoin/bitcoin/pull/30595#discussion_r2413702370) to avoid blocking validation, among other use cases not publicly available.

    Note 2: I could have created a wrapper around the existing code and replaced the WorkQueue in a subsequent commit, but it didn’t seem worth the extra commits and review effort. The ThreadPool implements essentially the same functionality in a cleaner, more modern way.
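
    For illustration, the submit-and-wait pattern described above can be sketched with the standard library alone. This is a simplified standalone sketch, not the PR’s actual ThreadPool: `SimpleThreadPool` and its members are illustrative names, and Bitcoin Core’s annotations (`Mutex`, `GUARDED_BY`, `util::TraceThread`) are replaced with plain `std::` primitives.

    ```cpp
    // Minimal sketch of a thread pool whose Submit() returns a std::future,
    // letting callers dispatch work and wait for the result.
    #include <cassert>
    #include <condition_variable>
    #include <future>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <utility>
    #include <vector>

    class SimpleThreadPool
    {
        std::mutex m_mutex;
        std::condition_variable m_cv;
        std::queue<std::packaged_task<void()>> m_work_queue;
        bool m_interrupt{false};
        std::vector<std::thread> m_workers;

    public:
        explicit SimpleThreadPool(int num_workers)
        {
            for (int i = 0; i < num_workers; ++i) {
                m_workers.emplace_back([this] {
                    for (;;) {
                        std::packaged_task<void()> task;
                        {
                            std::unique_lock lock{m_mutex};
                            m_cv.wait(lock, [this] { return m_interrupt || !m_work_queue.empty(); });
                            // Drain any remaining tasks even after an interrupt.
                            if (m_work_queue.empty()) return;
                            task = std::move(m_work_queue.front());
                            m_work_queue.pop();
                        }
                        task(); // run outside the lock
                    }
                });
            }
        }

        // Submit any zero-argument callable; the returned future carries its result.
        template <class F>
        auto Submit(F&& fn)
        {
            std::packaged_task task{std::forward<F>(fn)}; // deduces packaged_task<R()>
            auto future{task.get_future()};
            {
                std::lock_guard lock{m_mutex};
                // A packaged_task<R()> is itself callable as void(), so it can be
                // type-erased into the packaged_task<void()> queue.
                m_work_queue.emplace(std::move(task));
            }
            m_cv.notify_one();
            return future;
        }

        ~SimpleThreadPool()
        {
            {
                std::lock_guard lock{m_mutex}; // flag shares the waiters' mutex
                m_interrupt = true;
            }
            m_cv.notify_all();
            for (auto& w : m_workers) w.join();
        }
    };

    int main()
    {
        SimpleThreadPool pool{4};
        auto f1{pool.Submit([] { return 2 + 2; })};
        auto f2{pool.Submit([] { return 40 + 2; })};
        const int total{f1.get() + f2.get()};
        assert(total == 46);
        std::cout << total << std::endl;
    }
    ```

    The type-erasure trick (queueing a `packaged_task<R()>` inside a `packaged_task<void()>`) keeps `Submit` generic while the queue itself stays monomorphic.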

  2. DrahtBot added the label RPC/REST/ZMQ on Oct 23, 2025
  3. DrahtBot commented at 2:36 pm on October 23, 2025: contributor

    The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

    Code Coverage & Benchmarks

    For details see: https://corecheck.dev/bitcoin/bitcoin/pulls/33689.

    Reviews

    See the guideline for information on the review process.

    Type Reviewers
    Concept ACK ismaelsadeeq, Eunovo
    Stale ACK pinheadmz, TheCharlatan

    If your review is incorrectly listed, please react with 👎 to this comment and the bot will ignore it on the next update.

    Conflicts

    Reviewers, this pull request conflicts with the following ones:

    • #32061 (Replace libevent with our own HTTP and socket-handling implementation by pinheadmz)
    • #29641 (scripted-diff: Use LogInfo over LogPrintf [WIP, NOMERGE, DRAFT] by maflcko)
    • #27731 (Prevent file descriptor exhaustion from too many RPC calls by fjahr)

    If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.

    LLM Linter (✨ experimental)

    Possible typos and grammar issues:

    • catch must be done in the consumer side -> the catch must be done on the consumer side [preposition correction for clarity]


  4. pinheadmz commented at 2:38 pm on October 23, 2025: member
    concept ACK :-) will be reviewing this
  5. laanwj requested review from laanwj on Oct 23, 2025
  6. laanwj commented at 2:50 pm on October 23, 2025: member
    Adding myself as I wrote the original shitty code.
  7. Raimo33 commented at 9:54 am on October 24, 2025: contributor
    Can’t we use an already existing open source library instead of reinventing the wheel?
  8. in src/test/fuzz/threadpool.cpp:51 in bb71b0e3bf outdated
    48+// Global to verify we always have the same number of threads.
    49+size_t g_num_workers = 3;
    50+
    51+static void setup_threadpool_test()
    52+{
    53+    // Disable logging entirely. It seems to cause memory leaks.
    


    l0rinc commented at 4:31 pm on October 24, 2025:
    Can we investigate that?

    furszy commented at 8:22 pm on October 30, 2025:
    It is a known issue. The pcp fuzz test does it too for the same reason (in an undocumented manner). I only documented it properly so we don’t forget it exists. Feel free to investigate it in a separate issue/PR; I’m not planning to. It is not an issue with the code introduced in this PR.

    l0rinc commented at 1:22 pm on October 31, 2025:
    k, thanks, please resolve the comment
  9. furszy commented at 7:17 pm on October 24, 2025: member

    Can’t we use an already existing open source library instead of reinventing the wheel?

    That’s a good question. It’s usually where we all start. Generally, the project consensus is to avoid introducing new external dependencies (unless they’re maintained by us) to minimize potential attack vectors. This doesn’t mean we should reinvent everything, just that we need to be very careful about what we decide to include.

    That being said, for the changes introduced in this PR, one can argue that we’re encapsulating, documenting, and unit- and fuzz-testing code that wasn’t covered before, while also improving the separation of responsibilities. We’re not adding anything more complex than, or radically different in behavior from, what we currently have. The nice property of this PR is that it will let us experiment with more complex approaches in the future without having to deal with application-specific code (like the HTTP server code). This also includes learning from other open source libraries, for sure.

  10. in src/test/threadpool_tests.cpp:1 in c219b93c3b outdated
    0@@ -0,0 +1,267 @@
    1+// Copyright (c) 2024-present The Bitcoin Core developers
    


    pinheadmz commented at 7:55 pm on October 27, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    nit: don’t bother with the year any more… (applies to all new files)


    furszy commented at 3:55 pm on October 29, 2025:
    Sure. Done as suggested.
  11. TheCharlatan commented at 10:17 pm on October 27, 2025: contributor
    Approach ACK
  12. in src/util/threadpool.h:53 in c219b93c3b outdated
    48+private:
    49+    std::string m_name;
    50+    Mutex m_mutex;
    51+    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    52+    std::condition_variable m_cv;
    53+    // Note: m_interrupt must be modified while holding the same mutex used by threads waiting on the condition variable.
    


    andrewtoth commented at 2:38 pm on October 28, 2025:
    Isn’t this comment already implied by GUARDED_BY(m_mutex)? Also it’s written by the control thread and read by worker threads, so of course it must only be modified while holding the mutex. This comment doesn’t add anything IMO.

    furszy commented at 3:18 pm on October 28, 2025:

    Isn’t this comment already implied by GUARDED_BY(m_mutex)? Also it’s written by the control thread and read by worker threads, so of course it must only be modified while holding the mutex. This comment doesn’t add anything IMO.

    What I meant there is that the variable shouldn’t be a standalone atomic bool (in case someone proposes changing it in the future). It must share the same mutex as the condition variable; otherwise, workers might not see the update. This is stated in the std::condition_variable reference. I mention it because it is very tempting to avoid locking the mutex during task submission in this way, and it might seem like a harmless optimization, but it turns out to cause very subtle issues.

    Maybe you have a better way to state this? Happy to change it if it isn’t being understood as intended.

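    To make the hazard discussed above concrete, here is a minimal standalone sketch of the safe pattern: the interrupt flag is only written while holding the same mutex the condition-variable waiters use. (`SetInterrupt` and the globals are illustrative names, not the PR’s API; with a bare `std::atomic<bool>` instead, the write could land between a worker’s predicate check and its sleep, and the notification would be lost.)

    ```cpp
    // Safe pattern: flag writes share the waiters' mutex, so no "lost wakeup".
    #include <cassert>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    std::mutex g_mutex;
    std::condition_variable g_cv;
    bool g_interrupt{false}; // conceptually GUARDED_BY(g_mutex); not a plain atomic

    void SetInterrupt()
    {
        {
            std::lock_guard lock{g_mutex}; // same mutex the waiter holds between
                                           // its predicate check and the sleep
            g_interrupt = true;
        }
        g_cv.notify_all();
    }

    int main()
    {
        std::thread waiter{[] {
            std::unique_lock lock{g_mutex};
            // wait() re-checks the predicate while holding g_mutex, so the write
            // in SetInterrupt() cannot slip in between the check and the sleep.
            g_cv.wait(lock, [] { return g_interrupt; });
        }};
        SetInterrupt();
        waiter.join(); // returns promptly; the wakeup cannot be missed
        assert(g_interrupt);
        std::cout << "done" << std::endl;
    }
    ```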

    andrewtoth commented at 4:04 pm on October 28, 2025:

    I see. Then perhaps more directly:

    0    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    

    furszy commented at 7:07 pm on October 28, 2025:
    sounds good, taken. thanks!
  13. in src/util/threadpool.h:189 in c219b93c3b outdated
    177+            m_work_queue.pop();
    178+        }
    179+        task();
    180+    }
    181+
    182+    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    


    andrewtoth commented at 2:51 pm on October 28, 2025:
    I’m unclear what the purpose of this function is. It interrupts the workers, like Stop, but doesn’t join them or wait for them to finish. If the idea is to clear the workqueue and park the threads without being able to add more tasks, it does not accomplish that since the threads will exit their loops as well and not be parked on the cv.

    furszy commented at 3:35 pm on October 28, 2025:

    I’m unclear what the purpose of this function is. It interrupts the workers, like Stop, but doesn’t join them or wait for them to finish. If the idea is to clear the workqueue and park the threads without being able to add more tasks, it does not accomplish that since the threads will exit their loops as well and not be parked on the cv.

    Actually, the goal is to keep the current shutdown semantics unchanged. The process already follows two stages: first, we stop accepting new requests and events (e.g. unregistering the libevent callback to stop incoming requests); then, once no new work can be queued, we call Stop() and wait for the remaining tasks to finish before tearing down the objects.

    Interrupt() belongs to the first stage. It basically blocks new task submissions during shutdown to avoid queue growth. It’s not intended to clear the queue or park the threads; it is there just to avoid piling up new tasks during shutdown.


    andrewtoth commented at 3:52 pm on October 28, 2025:
    I see, so the goal is to stop the thread pool but in a non-blocking way. Could the same be done via a bool parameter to Stop to skip joining? Then Stop would be called without that bool set in the destructor to join the threads, if this behavior is desired (like it is in the http server). Also, could we modify Start in that case to not assert on the number of threads if m_interrupt is true, and instead join them if we still have outstanding threads?

    furszy commented at 7:01 pm on October 28, 2025:

    Could the same be done via a bool parameter to Stop to skip joining?

    If we do that, the threads wouldn’t remain in the m_workers vector anymore; they’d be swapped out on the first call. That means we wouldn’t be able to wait for them to finish later on, which would be quite bad since we’d lose track of when all requests are fulfilled before destroying the backend objects (that’s basically what joining the threads means for us right now). And this could lead to requests accessing null pointers if we proceed with the shutdown without waiting for the threads to join, etc.

    Also, could we modify Start in that case to not assert on number of threads, and instead join them if m_interrupt is true and we still have outstanding threads?

    We could also help them process tasks by calling ProcessTask() if something like that happens. But I’m not sure that’s the best design. We would be integrating Stop() inside Start(), which would make callers less careful about when to call Stop(), which should be part of their code design.

    Still, I don’t think we should worry nor overthink this now. The current code starts and stops the thread pool only once during the entire app’s lifecycle.


    andrewtoth commented at 7:09 pm on October 28, 2025:

    I was thinking like this

     0diff --git a/src/util/threadpool.h b/src/util/threadpool.h
     1index 94409facd5..dc1a218abd 100644
     2--- a/src/util/threadpool.h
     3+++ b/src/util/threadpool.h
     4@@ -123,19 +123,19 @@ public:
     5      *
     6      * Must be called from a controller (non-worker) thread.
     7      */
     8-    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     9+    void Stop(bool join_threads = true) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    10     {
    11         // Notify workers and join them.
    12         std::vector<std::thread> threads_to_join;
    13         {
    14             LOCK(m_mutex);
    15             m_interrupt = true;
    16-            threads_to_join.swap(m_workers);
    17+            if (join_threads) threads_to_join.swap(m_workers);
    18         }
    19         m_cv.notify_all();
    20         for (auto& worker : threads_to_join) worker.join();
    21         // Since we currently wait for tasks completion, sanity-check empty queue
    22-        WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    23+         if (join_threads) WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    24         // Note: m_interrupt is left true until next Start()
    25     }
    26 
    27@@ -179,12 +179,6 @@ public:
    28         task();
    29     }
    30 
    31-    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    32-    {
    33-        WITH_LOCK(m_mutex, m_interrupt = true);
    34-        m_cv.notify_all();
    35-    }
    36-
    37     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    38     {
    39         return WITH_LOCK(m_mutex, return m_work_queue.size());
    

    Still, I don’t think we should worry nor overthink this now. The current code starts and stops the thread pool only once during the entire app’s lifecycle.

    Err, then why not have threads start during construction? Otherwise there’s no need for Start.


    furszy commented at 7:15 pm on October 28, 2025:
    You know, we could call ProcessTask() during Stop() too, which would be a strict improvement over the current locking-wait behavior in master (since we would shut down faster by actively processing pending requests on the shutdown thread as well). This is something that hasn’t been possible until now. Still, this is material for a follow-up; I’d say it is better to keep this PR as contained as possible so all the other PRs and working paths that depend on this structure can actually make use of it.

    andrewtoth commented at 7:22 pm on October 28, 2025:
    That is a good idea, but not related to my suggestion here :). Yes, that would be a good followup. I don’t think we should have a method that would cause the node to crash if Start is called more than once. I disagree and think just calling Stop at the beginning of Start would be safer than the current implementation.

    furszy commented at 7:37 pm on October 28, 2025:

    hehe, it seems we sent a message at a similar time and missed yours.

    re Interrupt() removal suggestion:

    I find it harder to reason about the current two-stage shutdown procedure if we have to call the same Stop() method twice. At that point it would be simpler to just wait for all tasks to finish on the first call and remove the second one. But that seems suboptimal, as there is no rush to wait for them at that point of the shutdown sequence.

    Also, a benefit of keeping Interrupt() is that we can build on top of it and have a faster, non-locking way of checking whether the thread pool is enabled. Callers would be able to call something like pool.IsRunning() to see if they can submit tasks without having to worry about locking the main mutex and slowing down the workers’ processing. Something like:

     0diff --git a/src/util/threadpool.h b/src/util/threadpool.h
     1--- a/src/util/threadpool.h	(revision e0ec3232daf2c311471a1da149821bed18853fcc)
     2+++ b/src/util/threadpool.h	(date 1761158485938)
     3@@ -55,6 +55,9 @@
     4     // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
     5     bool m_interrupt GUARDED_BY(m_mutex){false};
     6     std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
     7+    // Enabled only after Start and disabled early on Stop/Interrupt.
     8+    // This lets us do non-blocking 'IsRunning' checks.
     9+    std::atomic<bool> m_running{false};
    10 
    11     void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    12     {
    13@@ -109,9 +112,11 @@
    14         m_interrupt = false; // Reset
    15 
    16         // Create workers
    17         m_workers.reserve(num_workers);
    18         for (int i = 0; i < num_workers; i++) {
    19             m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    20         }
    21+        m_running.store(true, std::memory_order_release);
    22     }
    23 
    24     /**
    25@@ -124,7 +129,9 @@
    26      */
    27     void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    28     {
    29-        // Notify workers and join them.
    30+        // Mark as no longer accepting new tasks
    31+        m_running.store(false, std::memory_order_release);
    32+        // Notify workers and join them
    33         std::vector<std::thread> threads_to_join;
    34         {
    35             LOCK(m_mutex);
    36@@ -147,11 +154,16 @@
    37     template<class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    38     auto Submit(F&& fn)
    39     {
    40+        // Quick rejection based on Start/Stop/Interrupt before locking
    41+        if (!m_running.load(std::memory_order_acquire)) {
    42+            throw std::runtime_error("ThreadPool not running");
    43+        }
    44+
    45         std::packaged_task task{std::forward<F>(fn)};
    46         auto future{task.get_future()};
    47         {
    48             LOCK(m_mutex);
    49             if (m_workers.empty() || m_interrupt) {
    50                 throw std::runtime_error("No active workers; cannot accept new tasks");
    51             }
    52             m_work_queue.emplace(std::move(task));
    53@@ -180,7 +192,9 @@
    54 
    55     void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    56     {
    57+        m_running.store(false, std::memory_order_release);
    58         WITH_LOCK(m_mutex, m_interrupt = true);
    59         m_cv.notify_all();
    60     }
    61 
    62     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    63@@ -192,6 +206,11 @@
    64     {
    65         return WITH_LOCK(m_mutex, return m_workers.size());
    66     }
    67+
    68+    bool IsRunning() const noexcept
    69+    {
    70+        return m_running.load(std::memory_order_acquire);
    71+    }
    72 };
    73 
    74 #endif // BITCOIN_UTIL_THREADPOOL_H
    

    (this is not needed here because we are disconnecting the http callback prior to interrupting the pool, but it seems useful in general).


    furszy commented at 7:46 pm on October 28, 2025:

    I don’t think we should have a method that would cause the node to crash if Start is called more than once. I disagree and think just calling Stop at the beginning of Start would be safer than the current implementation.

    Wouldn’t you agree that if your code calls Start() twice, you have a design issue? Thread pools are typically started once, maintained for the entire app’s lifecycle, and reused across modules, mainly because creating and destroying threads isn’t cheap. I think it’s fair to expect an exception in that case, as it would indicate poorly structured code.

    I also don’t think anyone would expect Start() to wait for all existing threads to shut down and then spawn new ones. That would be a surprising behavior to me. Why would you spawn new ones if you already have threads available?


    andrewtoth commented at 7:48 pm on October 28, 2025:
    So why have Start and Stop then? Just start threads in constructor and join in destructor? Use Interrupt to stop new tasks from being added. This follows RAII principles.

    furszy commented at 8:11 pm on October 28, 2025:

    So why have Start and Stop then? Just start threads in constructor and join in destructor? Use Interrupt to stop new tasks from being added. This follows RAII principles.

    Good question. We could easily build an RAII wrapper on top of this; the approaches aren’t conflicting, it would just be an additional layer over the current code. Yet I think the current design integrates smoothly with our existing workflows. We already use the start, interrupt, and stop terms throughout the codebase (check init.cpp and all our components), so following that general convention makes it easier to reason about these components globally. Even without deep knowledge of the http server workflow, anyone familiar with this convention can quickly hook up new functionality.

    Also, I suspect we’ll find opportunities for improvements (or smarter shutdown behaviors) along the way that might require customizing Stop() with some inputs, so might be better not to enforce RAII too early.


    furszy commented at 9:29 pm on October 28, 2025:
    Follow-up to this convo after some great back-and-forth via DM with Andrew. Usually, RAII works best when we don’t care exactly when an object gets destroyed. In this case, though, we actually do care about that moment (or, in our terms, when it’s stopped), since it marks the point where there are no pending tasks and it’s safe to tear down the backend handlers. If we don’t enforce that explicitly, the shutdown procedure could continue while workers are still active, leading them to access null pointers, etc. This is something we can’t guarantee if we just let the object be destroyed whenever the program releases it via RAII. So even with RAII, we would be forced to destroy the object manually anyway, which kind of defeats the purpose.

    l0rinc commented at 3:37 pm on October 30, 2025:
    I was also wondering what this was meant to do, especially since there isn’t any test coverage for it.
  14. in src/test/threadpool_tests.cpp:111 in c219b93c3b outdated
    106+        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT-1, /*context=*/"test2 blocking tasks enabled");
    107+
    108+        // Now execute tasks on the single available worker
    109+        // and check that all the tasks are executed.
    110+        int num_tasks = 15;
    111+        std::atomic<int> counter = 0;
    


    pinheadmz commented at 2:58 pm on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    I was wondering if there’s any way to assert that the unblocked tasks are all executing on one single remaining worker… would it be insane to use a non-atomic int here?


    furszy commented at 3:43 pm on October 29, 2025:

    c219b93

    I was wondering if there’s any way to assert that the unblocked tasks are all executing on one single remaining worker… would it be insane to use a non-atomic int here?

    hmm, even if the non-atomic int works, that doesn’t really guarantee that the increment was done on the same thread. If we want to be 100% correct, we should store the ids of the threads that processed the tasks in a synchronized structure, then check that only one id is there.


    andrewtoth commented at 3:57 pm on October 29, 2025:
    thread sanitizer would likely pick up on it though if you made it non-atomic.

    furszy commented at 6:37 pm on October 29, 2025:
    true. Closing this.

    andrewtoth commented at 6:44 pm on October 29, 2025:
    I mean pick up that it’s being written by multiple threads if something breaks the assumptions in this test. It seems like an ok idea. Or am I wrong?

    furszy commented at 7:05 pm on October 29, 2025:

    I mean pick up that it’s being written by multiple threads if something breaks the assumptions in this test. It seems like an ok idea. Or am I wrong?

    I don’t think there is any broken assumption. BlockWorkers() ensures that all threads except one are blocked. We could go further and check that all tasks submitted after the BlockWorkers call are executed by the same thread too, but that seems like overkill to me. We are basically ensuring that in another way.


    andrewtoth commented at 7:46 pm on October 29, 2025:
    Sorry for being unclear. I don’t think there is any broken assumption either. However, if we made this a non-atomic as suggested, then if something in the future were to break this assumption and run these tasks on multiple threads the unit test would break. Thus it would act as a regression test.

    furszy commented at 8:28 pm on October 29, 2025:

    Sorry for being unclear. I don’t think there is any broken assumption either. However, if we made this a non-atomic as suggested, then if something in the future were to break this assumption and run these tasks on multiple threads the unit test would break. Thus it would act as a regression test.

    But as you said, the thread sanitizer would likely complain about that; we’re accessing the same variable from different threads (main and worker). The main issue, however, will probably be thread visibility: changes made in one thread aren’t guaranteed to be visible to another. So that approach seems fragile to me.

    Also, going back to my initial comment: even if we switch to a non-atomic variable, the test could still pass. If we want it to be fully deterministic, we should store the IDs of the threads that processed the tasks in a synchronized structure, and then check that only one ID is present at the end. That seems like the most robust approach to me.

    I could push this if I have to re-touch. But it doesn’t seem to be a blocking feature (at least to me). We do ensure all threads except one are blocked in the test, and we fail if not.
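
    The synchronized thread-id check described above can be sketched as follows (plain std::thread instead of the pool, purely to illustrate the technique; the names are illustrative, not the test’s actual code):

    ```cpp
    // Record the id of the thread that ran each task in a mutex-protected set,
    // then assert that exactly one id is present.
    #include <cassert>
    #include <iostream>
    #include <mutex>
    #include <set>
    #include <thread>

    int main()
    {
        std::mutex ids_mutex;
        std::set<std::thread::id> processing_ids; // synchronized structure

        // Each "task" records the id of the thread executing it.
        auto record = [&] {
            std::lock_guard lock{ids_mutex};
            processing_ids.insert(std::this_thread::get_id());
        };

        // Simulate 15 tasks all drained by a single worker thread.
        std::thread worker{[&] {
            for (int i = 0; i < 15; ++i) record();
        }};
        worker.join();

        // Exactly one id present => every task ran on the same thread.
        assert(processing_ids.size() == 1);
        std::cout << processing_ids.size() << std::endl;
    }
    ```

    Unlike the non-atomic-counter approach, this check is deterministic: it fails regardless of whether a sanitizer happens to observe the race.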


    andrewtoth commented at 10:16 pm on October 29, 2025:

    But as you said, the thread sanitizer would likely complain about that. We’re accessing the same variable from different threads (main and worker).

    It won’t complain if we synchronize access to the same non-atomic variable, which we do here because we lock when calling Submit.

    The main issue, however, will probably be related to thread visibility; changes made in one thread aren’t guaranteed to be visible to another.

    I don’t think this is the main issue. If a future change breaks the ThreadPool such that in this test more than 1 thread will write to this non-atomic variable, then the thread sanitizer will cause the test to fail due to a data race.


    furszy commented at 1:23 am on October 30, 2025:
    Pushed.
  15. in src/test/threadpool_tests.cpp:205 in c219b93c3b outdated
    197+        for (int i = 0; i < num_tasks; i++) {
    198+            threadPool.Submit([&counter]() {
    199+                counter.fetch_add(1);
    200+            });
    201+        }
    202+        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    


    pinheadmz commented at 3:11 pm on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    As mentioned before, not a blocker but i feel like sleeps like this are test-flakiness waiting to happen …


    furszy commented at 3:49 pm on October 29, 2025:

    c219b93

    As mentioned before, not a blocker but i feel like sleeps like this are test-flakiness waiting to happen …

    This is one of those “wait and see if something happens” scenarios (checking whether any task gets processed). We expect no activity here since all workers are busy. I’m not sure there is another way of doing this, but if it fails (even non-deterministically), that’s still a useful signal because it means something unexpected happened.

  16. in src/test/threadpool_tests.cpp:212 in c219b93c3b outdated
    204+
    205+        // Now process manually
    206+        for (int i = 0; i < num_tasks; i++) {
    207+            threadPool.ProcessTask();
    208+        }
    209+        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    


    pinheadmz commented at 3:13 pm on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    belt-and-suspenders, could BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); I just like the symmetry in a test since you assert the value is 20 before taking action.


    furszy commented at 3:55 pm on October 29, 2025:
    Sounds good. Done as suggested. Thanks
  17. in src/test/fuzz/threadpool.cpp:28 in bb71b0e3bf
    23+};
    24+
    25+struct CounterTask {
    26+    std::atomic_uint32_t& m_counter;
    27+    explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {}
    28+    void operator()() const { m_counter.fetch_add(1); }
    


    andrewtoth commented at 5:26 pm on October 28, 2025:

    Maybe we should relax this atomic operation so that it doesn’t hide any issues by synchronizing them for us.

    0    void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); }
    

    furszy commented at 7:09 pm on October 28, 2025:
    Sure, pushed. Thanks!
  18. furszy force-pushed on Oct 28, 2025
  19. ismaelsadeeq commented at 7:29 pm on October 28, 2025: member
    Concept ACK, thanks for the separation from the initial index PR.
  20. pinheadmz approved
  21. pinheadmz commented at 3:28 pm on October 29, 2025: member

    ACK 195a96258f970c384ce180d57e73616904ef5fa1

    Built and tested on macos/arm64 and debian/x86. Reviewed each commit and left a few comments.

    I also tested the branch against other popular software that consumes the HTTP interface by running their CI:

    The http worker threads are clearly labeled and visible in htop!

     0-----BEGIN PGP SIGNED MESSAGE-----
     1Hash: SHA256
     2
     3ACK 195a96258f970c384ce180d57e73616904ef5fa1
     4-----BEGIN PGP SIGNATURE-----
     5
     6iQIzBAEBCAAdFiEE5hdzzW4BBA4vG9eM5+KYS2KJyToFAmkCMdwACgkQ5+KYS2KJ
     7yTriXA//RRnzezUHdmzRlKmoSDA+ZBHz0RY3z2LGk63izb/YdYaJY9JBZL2Y9BA8
     8K2nyexdSDC/DFOm4H56ddEe6ChlB7w+uZc92SgFSLSvavInpZ80KEJRk07vgoIL7
     9hwuyyevWyOOU32iz1NE3q316TMaJmzVsPhRGwbdmTXNwJLtUX6g4czfh28ajW1DC
    10Y9ULKwT36rFHRcKwC1YZYuBJUNBZWQgVBcydcmS1UEykY4mBnCW9knrATwn/29b7
    112AYPV+yuaiy9OpDEOJ8iKtZOPGR36NrIUleMUqruq4Sy2/TnJtm3AKNK0336/Fxu
    12MqVyPKSyusg7kBA6f2h/2+NHpbyLoboYhjZew+HvED/aFfi+Jla+nxybkUYXfciL
    13pzbND22TTuRGB1fKU7AwPD0TO9JwOTU385iEdpoGq8rbT3EpgPr31N4TeDQDJx5t
    14jPzzWZYj43JMuIc3bm/K5S2HYSdFZUEDQC81kbND+jOLF6YkJwS9794anLO38tvi
    15fip/eLK8Nw4pmWnW63/9lc+Y/1gLpgLDMxxhA1NKJyytk7z2IRo7vJKck6TqAjcZ
    16nU8Wv9/ful5ndDJfLIKuYT9jqRk9ORohVwv+P+ppuO8jhFjhuswxPlFJKkMXTeZn
    17hGi8QCrAUuibvVuLfLKVExJqSmmeUAkTVKp5ZTWLwNB7IkZrSoo=
    18=hZYm
    19-----END PGP SIGNATURE-----
    

    pinheadmz’s public key is on openpgp.org

  22. DrahtBot requested review from ismaelsadeeq on Oct 29, 2025
  23. DrahtBot requested review from TheCharlatan on Oct 29, 2025
  24. furszy force-pushed on Oct 29, 2025
  25. furszy commented at 3:56 pm on October 29, 2025: member
    Oh, awesome extensive test and review @pinheadmz! you rock!
  26. pinheadmz approved
  27. pinheadmz commented at 6:20 pm on October 29, 2025: member

    ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6

    minor changes since last ack

     0-----BEGIN PGP SIGNED MESSAGE-----
     1Hash: SHA256
     2
     3ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6
     4-----BEGIN PGP SIGNATURE-----
     5
     6iQIzBAEBCAAdFiEE5hdzzW4BBA4vG9eM5+KYS2KJyToFAmkCWuYACgkQ5+KYS2KJ
     7yTpb9RAAgxNlDQlqkVfnIK2Xt8BQmRsiG+1KHQGJA2d3RHoxjFg2129fj47y5i+t
     8GlatlcsXEcibI2D0F22sKSzY1u0LWy4GYLI1d12JAIewA99n/lRr1ktDk3v64pp8
     9kldvYpd8UBs7DCHSJhPs4HbOPgwIILPASdSbQTb+6m48X5jg+cu+yMBuANVq3sbB
    109I8rUlbUgRva5voy3EGRkXsGTuwcaCoLDNHnnjrqQkJMYTym47rMF/xTZJJ11isF
    11QrpzFu+P2tFy1oj0bGd4e5EhqNk++qfRCuw9sGLgLL6YEHzC/ihVH/L5NAxoeJX4
    12L0yX09BG5wDrbUQvZn4w1uaQz39Uor5mb8+tZyDUyRmO9MrG5AHomUtfdbiYLwSO
    13007bLD+WX4Hxode47xCdhUhqflXqbHD2mhkf5ESyUgGu3smSHSQQosuzIJiVmTn9
    14b2UJG6see/tckFvart6YLn1AIu9uCyUMmB+hZIcSasZv95oEHv1YB3E5dcKUmq0d
    15A1cUHIhNvU4i/hNy2xgijgSjDdpVQMFbLYUt9y4ZnzpJXFd7mnbpHVUVM1P+Onvp
    16ScAtgvosXfbd5PjnAQWsMWgSmVHUtSc4t9GPjGlRYaVpHFkEbqWpWeKrYqMkU3id
    179YiFP9BueWNbNV7OBlB0LcfCpwO2WbvfJOW93/jxDovc0tgohbU=
    18=c8aD
    19-----END PGP SIGNATURE-----
    

    pinheadmz’s public key is on openpgp.org

  28. TheCharlatan approved
  29. TheCharlatan commented at 9:26 pm on October 29, 2025: contributor

    ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6

    I have fuzzed the thread pool for some time again and did not find any new issues. The memory leak previously observed during fuzzing also no longer appears. The changes to the http workers look sane to me.

    Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

  30. furszy force-pushed on Oct 30, 2025
  31. furszy commented at 1:22 am on October 30, 2025: member

    Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

    Pushed.

  32. in src/test/threadpool_tests.cpp:12 in e1eb4cd3a5 outdated
     7+
     8+#include <boost/test/unit_test.hpp>
     9+
    10+BOOST_AUTO_TEST_SUITE(threadpool_tests)
    11+
    12+constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
    


    l0rinc commented at 10:17 am on October 30, 2025:

    nit: we don’t need to include the type here:

    0constexpr auto TIMEOUT{std::chrono::seconds(120)};
    

    #26966 (review)


    furszy commented at 2:58 pm on November 17, 2025:
    renamed.
  33. in src/util/threadpool.h:114 in e1eb4cd3a5 outdated
    109+        m_interrupt = false; // Reset
    110+
    111+        // Create workers
    112+        m_workers.reserve(num_workers);
    113+        for (int i = 0; i < num_workers; i++) {
    114+            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    


    l0rinc commented at 10:29 am on October 30, 2025:

    nit: we could use strprintf in a few more places:

    0            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    

    furszy commented at 3:02 pm on November 17, 2025:
    I’m not sure about including extra dependencies on a low-level class for tiny readability improvements. But I’m not really opposed to this one, so done as suggested.
  34. in src/util/threadpool.h:140 in e1eb4cd3a5 outdated
    130+        {
    131+            LOCK(m_mutex);
    132+            m_interrupt = true;
    133+            threads_to_join.swap(m_workers);
    134+        }
    135+        m_cv.notify_all();
    


    l0rinc commented at 10:33 am on October 30, 2025:
    What’s the reason for locking and notifying manually instead of using higher-level primitives? Have you tried C++20 primitives such as std::barrier (https://en.cppreference.com/w/cpp/thread/barrier.html) instead?

    furszy commented at 8:43 pm on October 30, 2025:
    That is part of the possible future experiments mentioned in the PR description. The goal here is to keep the same synchronization mechanisms we had in the HTTP server code.

    l0rinc commented at 1:22 pm on October 31, 2025:

    Goal is to keep the same synchronization mechanisms we had in the http server code

    What is the reason for that, isn’t the goal to have a reusable ThreadPool?

  35. in src/util/threadpool.h:130 in e1eb4cd3a5 outdated
    124+     * Must be called from a controller (non-worker) thread.
    125+     */
    126+    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    127+    {
    128+        // Notify workers and join them.
    129+        std::vector<std::thread> threads_to_join;
    


    l0rinc commented at 10:33 am on October 30, 2025:
    I’m not sure this is necessary.
  36. in src/util/threadpool.h:67 in e1eb4cd3a5 outdated
    61+        WAIT_LOCK(m_mutex, wait_lock);
    62+        for (;;) {
    63+            std::packaged_task<void()> task;
    64+            {
    65+                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    66+                if (!m_interrupt && m_work_queue.empty()) {
    


    l0rinc commented at 10:41 am on October 30, 2025:
    do we really need this extra complexity here?

    furszy commented at 8:31 pm on October 30, 2025:
    Yes, we do. The reason is explained there: all threads are busy, a new task arrives, and the notification goes unnoticed; the threads then finish their current work and go to sleep, losing the new task’s wake-up notification, so the task doesn’t run until another one is submitted. There is a test exercising this exact behavior.

    l0rinc commented at 1:31 pm on October 31, 2025:
    Can you point me to the test that fails, please? I removed that check, ran the tests added in this PR, and they all passed.
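
    [Editor's note] The behavior being debated can be checked in isolation. Per the standard, the predicate overload of std::condition_variable::wait evaluates the predicate before blocking, so work queued while the worker was busy is seen on the next loop iteration even if the matching notify fired earlier. A hypothetical standalone sketch (illustrative names, not the PR's ThreadPool):

    ```cpp
    #include <cassert>
    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <thread>

    // A worker loop using only the predicate form of wait(): a non-empty
    // queue is noticed before sleeping, without needing a notification.
    int run_demo()
    {
        std::mutex m;
        std::condition_variable cv;
        std::queue<int> work;
        bool stop = false;
        int processed = 0;

        std::thread worker([&] {
            std::unique_lock lock(m);
            for (;;) {
                // Blocks only while the predicate is false.
                cv.wait(lock, [&] { return stop || !work.empty(); });
                if (work.empty()) break; // stop requested and queue drained
                work.pop();
                ++processed;
            }
        });

        {
            std::lock_guard lock(m);
            for (int i = 0; i < 5; ++i) work.push(i);
            stop = true; // ask the worker to exit once the queue is empty
        }
        cv.notify_one();
        worker.join();
        return processed;
    }

    int main()
    {
        assert(run_demo() == 5);
        return 0;
    }
    ```

    Whether an additional `if (!m_interrupt && m_work_queue.empty())` guard before the wait changes observable behavior, rather than just skipping a predicate evaluation, is exactly the question this thread is working through.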
  37. in src/util/threadpool.h:72 in e1eb4cd3a5 outdated
    66+                if (!m_interrupt && m_work_queue.empty()) {
    67+                    // Block until the pool is interrupted or a task is available.
    68+                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    69+                }
    70+
    71+                // If stopped and no work left, exit worker
    


    l0rinc commented at 10:41 am on October 30, 2025:
    the comment mirrors the code, it doesn’t really add anything that the code doesn’t already say (especially since the comment above already stated the same)
  38. in src/test/threadpool_tests.cpp:89 in e1eb4cd3a5 outdated
    84+
    85+        // Store futures to ensure completion before checking counter.
    86+        std::vector<std::future<void>> futures;
    87+        futures.reserve(num_tasks);
    88+
    89+        for (int i = 1; i <= num_tasks; i++) {
    


    l0rinc commented at 10:48 am on October 30, 2025:

    nit: for consistency

    0        for (size_t i{1}; i <= num_tasks; ++i) {
    

    furszy commented at 8:49 pm on October 30, 2025:
    num_tasks is an int. That would require casting it to size_t.

    l0rinc commented at 1:32 pm on October 31, 2025:
    Please see my suggestions above, it’s a size_t there
  39. in src/test/threadpool_tests.cpp:206 in e1eb4cd3a5 outdated
    201+            threadPool.Submit([&counter]() {
    202+                counter.fetch_add(1);
    203+            });
    204+        }
    205+        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    206+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
    


    l0rinc commented at 10:51 am on October 30, 2025:
    0        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    
  40. in src/test/threadpool_tests.cpp:75 in e1eb4cd3a5 outdated
    70+            threadPool.Submit([]() { return false; });
    71+        } catch (const std::runtime_error&) {
    72+            err = true;
    73+        }
    74+        BOOST_CHECK(err);
    75+    }
    


    l0rinc commented at 10:58 am on October 30, 2025:

    I like how thoroughly we’re testing the functionality here, that helps a lot with being able to trust this complex code.

    I wanted to debug these one-by-one and I don’t want to run all other tests before - and it’s a bit awkward to have so many tests in a single place when they’re admittedly separate. Could you please split them out into separate units, such as:

    0BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
    1{
    2    ThreadPool threadPool{"not_started"};
    3    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, HasReason{"No active workers"});
    4}
    

    furszy commented at 3:03 pm on November 17, 2025:
    Done as suggested.
  41. in src/test/threadpool_tests.cpp:51 in e1eb4cd3a5 outdated
    46+    return blocking_tasks;
    47+}
    48+
    49+BOOST_AUTO_TEST_CASE(threadpool_basic)
    50+{
    51+    // Test Cases
    


    l0rinc commented at 11:01 am on October 30, 2025:

    As mentioned before, we’re having a single big-bang test case here (called “threadpool_basic”, but it contains everything we have), announcing the parts in comments followed by duplicating each comment. Can we split them out to focused, self-documenting test cases, something like:

    0BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
    1BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
    2BOOST_AUTO_TEST_CASE(single_available_worker_processes_all_tasks)
    3BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
    4BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    5BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
    6BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
    7BOOST_AUTO_TEST_CASE(recursive_task_submission)
    8BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    

    That would help reviewers experiment and get exact test failures; we could replace a lot of dead comments with code and give the testing effort better structure.


    furszy commented at 3:03 pm on November 17, 2025:
    Done.
  42. in src/test/threadpool_tests.cpp:83 in e1eb4cd3a5 outdated
    78+    {
    79+        int num_tasks = 50;
    80+
    81+        ThreadPool threadPool(POOL_NAME);
    82+        threadPool.Start(NUM_WORKERS_DEFAULT);
    83+        std::atomic<int> counter = 0;
    


    l0rinc commented at 11:10 am on October 30, 2025:

    nits: std::atomic_int and its brothers seem more focused (and brace init helps with many narrowing problems) - we’re using these in the fuzzing commit as well:

    0        std::atomic_size_t counter{0};
    
  43. in src/test/threadpool_tests.cpp:90 in e1eb4cd3a5 outdated
    85+        // Store futures to ensure completion before checking counter.
    86+        std::vector<std::future<void>> futures;
    87+        futures.reserve(num_tasks);
    88+
    89+        for (int i = 1; i <= num_tasks; i++) {
    90+            futures.emplace_back(threadPool.Submit([&counter, i]() {
    


    l0rinc commented at 11:11 am on October 30, 2025:

    nit:

    0            futures.emplace_back(threadPool.Submit([&counter, i] {
    
  44. in src/test/threadpool_tests.cpp:96 in e1eb4cd3a5 outdated
    91+                counter.fetch_add(i);
    92+            }));
    93+        }
    94+
    95+        // Wait for all tasks to finish
    96+        WaitFor(futures, /*context=*/"test1 task");
    


    l0rinc commented at 11:13 am on October 30, 2025:
    I’m not yet sure I understand why the test framework needs a wait method; that sounds like something the thread pool should provide. This is why I mentioned in #26966 (review) that the problem has to come before the solution: reviewers have to check out the follow-up comments to see whether this is representative of actual usage.
  45. in src/test/threadpool_tests.cpp:95 in e1eb4cd3a5 outdated
    90+            futures.emplace_back(threadPool.Submit([&counter, i]() {
    91+                counter.fetch_add(i);
    92+            }));
    93+        }
    94+
    95+        // Wait for all tasks to finish
    


    l0rinc commented at 11:13 am on October 30, 2025:
    comment is redundant, the method name and context already state all of that
  46. in src/util/threadpool.h:107 in e1eb4cd3a5 outdated
    101+     *
    102+     * Must be called from a controller (non-worker) thread.
    103+     */
    104+    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    105+    {
    106+        assert(num_workers > 0);
    


    l0rinc commented at 11:17 am on October 30, 2025:

    Can we narrow the type in that case to at least disallow negative values?

    0    void Start(size_t num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    1    {
    2        assert(num_workers > 0);
    

    furszy commented at 9:01 pm on October 30, 2025:
    This decision was on purpose: it lets us assert and abort the program if a negative number is provided. A silent negative-integer-to-size_t conversion would be bad.

    l0rinc commented at 1:34 pm on October 31, 2025:

    Where would we be getting negative numbers from? If we care about the value being in a certain range, we could validate that instead:

    0        assert(num_workers > 0 && num_workers <= MAX_WORKER_COUNT);
    
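
    [Editor's note] The narrowing concern can be demonstrated in isolation (a hypothetical sketch, not PR code): implicitly converting a negative int to size_t wraps to a huge value, which is why keeping the parameter signed lets the assert catch a buggy caller instead of silently spawning an enormous worker count.

    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <limits>

    int main()
    {
        int num_workers = -1; // hypothetical buggy caller
        // Implicit conversion to an unsigned parameter would wrap:
        std::size_t as_unsigned = static_cast<std::size_t>(num_workers);
        assert(as_unsigned == std::numeric_limits<std::size_t>::max());
        // With a signed parameter the precondition stays checkable;
        // the real code would assert(num_workers > 0) and abort here.
        assert(num_workers <= 0);
        return 0;
    }
    ```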
  47. in src/test/threadpool_tests.cpp:91 in e1eb4cd3a5 outdated
    86+        std::vector<std::future<void>> futures;
    87+        futures.reserve(num_tasks);
    88+
    89+        for (int i = 1; i <= num_tasks; i++) {
    90+            futures.emplace_back(threadPool.Submit([&counter, i]() {
    91+                counter.fetch_add(i);
    


    l0rinc commented at 11:23 am on October 30, 2025:

    Since this isn’t a synchronization operation, the default memory_order_seq_cst seems too strong here, and it might hide a data race by providing unintended synchronization:

    0                counter.fetch_add(i, std::memory_order_relaxed);
    

    I have created a godbolt reproducer to understand it better, see: https://godbolt.org/z/o87dsq1fG I couldn’t find any real difference on x86 CPUs, but ARM ones do show a difference (__aarch64_ldadd4_acq_rel vs __aarch64_ldadd4_relax)


    furszy commented at 3:04 pm on November 17, 2025:
    Done as suggested.

    l0rinc commented at 4:03 pm on November 17, 2025:
    Can you please apply this to the other cases as well?

    furszy commented at 9:29 pm on November 18, 2025:
    Done.

    l0rinc commented at 12:06 pm on November 19, 2025:
    Thanks. Currently all fetch_add calls use memory_order_relaxed, which makes sense since we don’t want the counter itself to act as a barrier for other data, but the corresponding loads still use the default std::memory_order_seq_cst. Since external synchronization (e.g. via WAIT_FOR calls) already provides the happens-before relationships, the loads don’t need to be sequentially consistent either. I don’t think it’s a bug, but it would be more consistent to use the same memory_order_relaxed on the loads as well. To avoid accidental synchronization, can we use memory_order_relaxed throughout (for fetch_add and load)? What do you think?
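
    [Editor's note] The relaxed-counter pattern under discussion can be exercised on its own. A hedged standalone sketch (illustrative names, not PR code): when the counter is only read after the threads are joined, the join supplies the happens-before edge, so relaxed ordering on both the increments and the final load is sufficient.

    ```cpp
    #include <atomic>
    #include <cassert>
    #include <thread>
    #include <vector>

    // Increment a shared counter from several threads with relaxed
    // ordering; thread::join() orders the final load after all stores.
    int relaxed_count(int threads, int per_thread)
    {
        std::atomic<int> counter{0};
        std::vector<std::thread> pool;
        for (int t = 0; t < threads; ++t) {
            pool.emplace_back([&] {
                for (int i = 0; i < per_thread; ++i) {
                    counter.fetch_add(1, std::memory_order_relaxed);
                }
            });
        }
        for (auto& th : pool) th.join();
        return counter.load(std::memory_order_relaxed);
    }

    int main()
    {
        assert(relaxed_count(4, 10000) == 40000);
        return 0;
    }
    ```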
  48. in src/test/threadpool_tests.cpp:77 in e1eb4cd3a5 outdated
    72+            err = true;
    73+        }
    74+        BOOST_CHECK(err);
    75+    }
    76+
    77+    // Test case 1, submit tasks and verify completion.
    


    l0rinc commented at 12:12 pm on October 30, 2025:

    I’m usually coding along while reviewing, to simplify applying the suggestions that you like, I’m posting the final results as well:

     0BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
     1{
     2    ThreadPool threadPool{"completion"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4
     5    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     6    std::atomic_size_t counter{0};
     7
     8    std::vector<std::future<void>> futures(num_tasks);
     9    for (size_t i{0}; i < num_tasks; ++i) {
    10        futures[i] = threadPool.Submit([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
    11    }
    12
    13    WaitFor(futures);
    14    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
    15    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    16}
    
  49. in src/test/threadpool_tests.cpp:79 in e1eb4cd3a5 outdated
    74+        BOOST_CHECK(err);
    75+    }
    76+
    77+    // Test case 1, submit tasks and verify completion.
    78+    {
    79+        int num_tasks = 50;
    


    l0rinc commented at 12:13 pm on October 30, 2025:

    We’re iterating forward; it seems simpler to use an unsigned type here:

    0        constexpr size_t num_tasks{50};
    

    and to add some variance to the tests we could vary these randomly:

    0        const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    

    note that this will need:

    0BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
    
  50. in src/test/threadpool_tests.cpp:43 in e1eb4cd3a5 outdated
    21+    }
    22+}
    23+
    24+// Block a number of worker threads by submitting tasks that wait on `blocker_future`.
    25+// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
    26+std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context)
    


    l0rinc commented at 12:32 pm on October 30, 2025:

    ThreadPool::WorkersCount() returns size_t, if we restricted num_of_threads_to_block to size_t as well, we could assert here:

    0    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
    
  51. in src/test/threadpool_tests.cpp:39 in e1eb4cd3a5 outdated
    17+    for (size_t i = 0; i < futures.size(); ++i) {
    18+        if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
    19+            throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
    20+        }
    21+    }
    22+}
    


    l0rinc commented at 12:55 pm on October 30, 2025:

    As mentioned before, I think these could be simplified with std::span, std::string_view and strprintf. And if we extract each test into a separate unit (as suggested below), we won’t need the context parameter anymore. The task index also doesn’t really help much in case of a failure, which lets us simplify further. And lastly, since BlockWorkers already returns a std::vector<std::future<void>>, we don’t actually need the template here:

    0void WaitFor(std::span<const std::future<void>> futures)
    1{
    2    for (const auto& f : futures) {
    3        BOOST_REQUIRE(f.wait_for(TIMEOUT) == std::future_status::ready);
    4    }
    5}
    

    furszy commented at 3:09 pm on November 17, 2025:
    We would probably still need the context string to differentiate the initial BlockWorkers() calls from the final WaitFor on the results. I should re-check whether BOOST_REQUIRE prints the stack trace; most likely it doesn’t, and that is the reason my past self added the context string.

    l0rinc commented at 4:25 pm on November 17, 2025:

    The original failure looks like this:

    unknown location:0: fatal error: in “threadpool_tests/submit_tasks_complete_successfully”: std::runtime_error: Timeout waiting for: test1 task, task index 0

    test/threadpool_tests.cpp:77: last checkpoint: “submit_tasks_complete_successfully” test entry

    If you want the extra method to not mask the failure stack, we can make it a macro, like we did with e.g. https://github.com/bitcoin/bitcoin/blob/cc5dda1de333cf7aa10e2237ee2c9221f705dbd9/src/test/headers_sync_chainwork_tests.cpp#L22-L42

    Which would look like:

    0#define WAIT_FOR(futures)                                                         \
    1    do {                                                                          \
    2        for (const auto& f : futures) {                                           \
    3            BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
    4        }                                                                         \
    5    } while (0)
    

    and a failure stack would be more helpful:

    test/threadpool_tests.cpp:96: fatal error: in “threadpool_tests/submit_tasks_complete_successfully”: critical check f.wait_for(WAIT_TIMEOUT) != std::future_status::ready has failed

    which would point to the exact call site instead of the helper method: https://github.com/bitcoin/bitcoin/blob/41c99a2e3ef4c1cddaaa313a454009a141a6a782/src/test/threadpool_tests.cpp#L96


    furszy commented at 9:30 pm on November 18, 2025:
    Done as suggested.
  52. in src/test/threadpool_tests.cpp:102 in e1eb4cd3a5 outdated
     97+        int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
     98+        BOOST_CHECK_EQUAL(counter.load(), expected_value);
     99+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    100+    }
    101+
    102+    // Test case 2, maintain all threads busy except one.
    


    l0rinc commented at 1:05 pm on October 30, 2025:

    we could generalize this by counting from 1 to <NUM_WORKERS_DEFAULT, something like:

     0BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_task)
     1{
     2    ThreadPool threadPool{"block_counts"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     5
     6    for (size_t free{1}; free < NUM_WORKERS_DEFAULT; ++free) {
     7        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
     8        std::counting_semaphore sem{0};
     9        const auto blocking_tasks{BlockWorkers(threadPool, sem, free)};
    10
    11        size_t counter{0};
    12        std::vector<std::future<void>> futures(num_tasks);
    13        for (auto& f : futures) f = threadPool.Submit([&counter] { ++counter; });
    14
    15        WaitFor(futures);
    16        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    17
    18        if (free == 1) {
    19            BOOST_CHECK_EQUAL(counter, num_tasks);
    20        } else {
    21            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
    22        }
    23
    24        sem.release(free);
    25        WaitFor(blocking_tasks);
    26    }
    27
    28    threadPool.Stop();
    29}
    
  53. in src/test/threadpool_tests.cpp:62 in e1eb4cd3a5 outdated
    57+    // 5) The task throws an exception, catch must be done in the consumer side.
    58+    // 6) Busy workers, help them by processing tasks from outside.
    59+    // 7) Recursive submission of tasks.
    60+    // 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
    61+
    62+    const int NUM_WORKERS_DEFAULT = 3;
    


    l0rinc commented at 1:37 pm on October 30, 2025:

    I needed at least nproc+1 workers to be able to reliably reproduce concurrency issues (such as the non-atomic counter update)

    0    const size_t NUM_WORKERS_DEFAULT{size_t(GetNumCores()) + 1}; // we need to make sure there's *some* contention
    

    furszy commented at 3:11 pm on November 17, 2025:
    Instead of doing it for all the test cases, added another case that exercises the contention.
  54. in src/test/threadpool_tests.cpp:123 in e1eb4cd3a5 outdated
    118+        futures.reserve(num_tasks);
    119+        for (int i = 0; i < num_tasks; i++) {
    120+            futures.emplace_back(threadPool.Submit([&counter]() {
    121+                counter += 1;
    122+            }));
    123+        }
    


    l0rinc commented at 1:48 pm on October 30, 2025:

    nit: these setup parts are longer than they need to be, this is just glue code, I find it distracting from the actual test logic:

    0        std::vector<std::future<void>> futures(num_tasks);
    1        for (auto& f : futures) f = threadPool.Submit([&counter]{ ++counter; });
    

    furszy commented at 3:12 pm on November 17, 2025:
    Done as suggested.

    l0rinc commented at 4:27 pm on November 17, 2025:
    I still see init + reserve + loop + emplace in most tests; could we apply this to the rest as well?
  55. in src/test/threadpool_tests.cpp:143 in e1eb4cd3a5 outdated
    138+        std::atomic<bool> flag = false;
    139+        std::future<void> future = threadPool.Submit([&flag]() {
    140+            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    141+            flag.store(true);
    142+        });
    143+        future.wait();
    


    l0rinc commented at 2:08 pm on October 30, 2025:
    hmm, isn’t this why WaitFor was added with a timeout in the first place?
  56. in src/test/threadpool_tests.cpp:140 in e1eb4cd3a5 outdated
    135+    {
    136+        ThreadPool threadPool(POOL_NAME);
    137+        threadPool.Start(NUM_WORKERS_DEFAULT);
    138+        std::atomic<bool> flag = false;
    139+        std::future<void> future = threadPool.Submit([&flag]() {
    140+            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    


    l0rinc commented at 2:10 pm on October 30, 2025:

    Maybe we can test more here: each task waits a different amount (e.g. its work index in milliseconds), and we could assert at the end that completion takes at least NUM_WORKERS_DEFAULT milliseconds.

     0BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
     1{
     2    ThreadPool threadPool{"wait_test"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     5
     6    const auto start{steady_clock::now()};
     7    std::vector<std::future<void>> futures(num_tasks + 1);
     8    for (size_t i{0}; i <= num_tasks; ++i) {
     9        futures[i] = threadPool.Submit([i] { UninterruptibleSleep(milliseconds{i}); });
    10    }
    11    WaitFor(futures);
    12    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
    13    BOOST_CHECK(elapsed_ms >= num_tasks);
    14}
    
  57. in src/test/threadpool_tests.cpp:252 in e1eb4cd3a5 outdated
    247+        // At this point, all workers are blocked, and the extra task is queued
    248+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    249+
    250+        // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    251+        std::thread thread_unblocker([&blocker]() {
    252+            std::this_thread::sleep_for(std::chrono::milliseconds{300});
    


    l0rinc commented at 2:13 pm on October 30, 2025:
    We could likely use UninterruptibleSleep here instead

    furszy commented at 3:12 pm on November 17, 2025:
    Done as suggested.
  58. in src/test/threadpool_tests.cpp:64 in e1eb4cd3a5 outdated
    42+    }
    43+
    44+    // Wait until all threads are actually blocked
    45+    WaitFor(ready_futures, context);
    46+    return blocking_tasks;
    47+}
    


    l0rinc commented at 2:41 pm on October 30, 2025:

    What is the reason for the heavy use of promises? Does it have any advantage compared to C++20 concurrency primitives? This seems to me like a barebones implementation of std::latch. Can we use that instead?

    This initially seemed like a std::binary_semaphore to me, but I couldn’t make it work with that.

    But we can still use a simple std::counting_semaphore which would avoid the std::promise + std::shared_future + std::vector<std::future<void>>, something like:

     0std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
     1{
     2    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
     3    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
     4
     5    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
     6    for (auto& f : blocking_tasks) f = threadPool.Submit([&] {
     7        ready.count_down();
     8        release_sem.acquire();
     9    });
    10
    11    ready.wait();
    12    return blocking_tasks;
    13}
    

    furszy commented at 3:16 pm on November 17, 2025:

    What is the reason for the heavy use of promises? Does it have any advantage compared to C++20 concurrency primitives?

    I wrote this code 4 years ago, when the project was using C++11 or C++14 (I don’t recall which).

    This seems to me like a barebones implementation for an std::latch. Can we use that instead?

    Probably. But I don’t think it matters much in the end. Will check it. Thanks.
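
    [Editor's note] The latch-plus-semaphore pattern suggested above can be exercised on its own. A minimal C++20 sketch with illustrative names (not the PR's BlockWorkers helper): the latch proves every worker has started, and the semaphore keeps them blocked until released.

    ```cpp
    #include <atomic>
    #include <cassert>
    #include <latch>
    #include <semaphore>
    #include <thread>
    #include <vector>

    // Start num_workers threads, prove they are all running and blocked,
    // then release them and count how many finished.
    int run_blocked_workers(int num_workers)
    {
        std::latch ready{num_workers};
        std::counting_semaphore<> release{0};
        std::atomic<int> finished{0};

        std::vector<std::thread> workers;
        for (int i = 0; i < num_workers; ++i) {
            workers.emplace_back([&] {
                ready.count_down(); // signal "I have started"
                release.acquire();  // block until released
                finished.fetch_add(1, std::memory_order_relaxed);
            });
        }

        ready.wait(); // all workers are now started and blocked
        assert(finished.load(std::memory_order_relaxed) == 0);

        release.release(num_workers); // unblock everyone
        for (auto& t : workers) t.join();
        return finished.load(std::memory_order_relaxed);
    }

    int main()
    {
        assert(run_blocked_workers(3) == 3);
        return 0;
    }
    ```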

  59. in src/test/threadpool_tests.cpp:147 in e1eb4cd3a5 outdated
    142+        });
    143+        future.wait();
    144+        BOOST_CHECK(flag.load());
    145+    }
    146+
    147+    // Test case 4, obtain result object.
    


    l0rinc commented at 2:47 pm on October 30, 2025:

    we could extract this to something like:

    0BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    1{
    2    ThreadPool threadPool{"result_test"};
    3    threadPool.Start(NUM_WORKERS_DEFAULT);
    4    
    5    BOOST_CHECK_EQUAL(threadPool.Submit([] { return true; }).get(), true);
    6    BOOST_CHECK_EQUAL(threadPool.Submit([] { return 42; }).get(), 42);
    7    BOOST_CHECK_EQUAL(threadPool.Submit([] { return std::string{"true"}; }).get(), "true");
    8}
    
  60. in src/test/threadpool_tests.cpp:168 in e1eb4cd3a5 outdated
    163+    // Test case 5, throw exception and catch it on the consumer side.
    164+    {
    165+        ThreadPool threadPool(POOL_NAME);
    166+        threadPool.Start(NUM_WORKERS_DEFAULT);
    167+
    168+        int ROUNDS = 5;
    


    l0rinc commented at 2:48 pm on October 30, 2025:
    in other cases we called this num_tasks

    furszy commented at 3:16 pm on November 17, 2025:
    Done as suggested.
  61. in src/test/threadpool_tests.cpp:170 in e1eb4cd3a5 outdated
    165+        ThreadPool threadPool(POOL_NAME);
    166+        threadPool.Start(NUM_WORKERS_DEFAULT);
    167+
    168+        int ROUNDS = 5;
    169+        std::string err_msg{"something wrong happened"};
    170+        std::vector<std::future<void>> futures;
    


    l0rinc commented at 2:52 pm on October 30, 2025:

    Do we need a vector here, or can we just consume whatever we just submitted?

     0BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
     1{
     2    ThreadPool threadPool{"exception_test"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4
     5    const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
     6
     7    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     8    for (size_t i{0}; i < num_tasks; ++i) {
     9        BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(make_err(i)); }).get(), std::runtime_error, HasReason{make_err(i)});
    10    }
    11}
    

    furszy commented at 3:18 pm on November 17, 2025:

    do we need a vector here of can we just consume whatever we just submitted?

    Consuming right away would wait for the task to be executed; we want to exercise some concurrency too.


    l0rinc commented at 4:46 pm on November 17, 2025:

    we want to exercise some concurrency too

    You’re right, that’s better indeed. I updated my suggestion to keep the vector:

     0// Test 5, throw exceptions and catch it on the consumer side
     1BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
     2{
     3    ThreadPool threadPool("exception_test");
     4    threadPool.Start(NUM_WORKERS_DEFAULT);
     5
     6    const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
     7
     8    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     9    std::vector<std::future<void>> futures(num_tasks);
    10    for (size_t i{0}; i < num_tasks; ++i) {
    11        futures[i] = threadPool.Submit([&make_err, i] { throw std::runtime_error(make_err(i)); });
    12    }
    13
    14    for (size_t i{0}; i < num_tasks; ++i) {
    15        BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
    16    }
    17}
    
  62. in src/test/threadpool_tests.cpp:188 in e1eb4cd3a5 outdated
    183+                BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
    184+            }
    185+        }
    186+    }
    187+
    188+    // Test case 6, all workers are busy, help them by processing tasks from outside.
    


    l0rinc commented at 3:34 pm on October 30, 2025:

    We can split out the remaining 3 tests as well:

     0
     1BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
     2{
     3    ThreadPool threadPool{"manual_process"};
     4    threadPool.Start(NUM_WORKERS_DEFAULT);
     5    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     6
     7    std::counting_semaphore sem{0};
     8    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
     9
    10    std::atomic_size_t counter{0};
    11    std::vector<std::future<void>> futures(num_tasks);
    12    for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    13
    14    UninterruptibleSleep(milliseconds{100});
    15    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    16
    17    for (size_t i{0}; i < num_tasks; ++i) {
    18        threadPool.ProcessTask();
    19    }
    20
    21    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    22    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    23
    24    sem.release(NUM_WORKERS_DEFAULT);
    25    WaitFor(blocking_tasks);
    26}
    27
    28BOOST_AUTO_TEST_CASE(recursive_task_submission)
    29{
    30    ThreadPool threadPool{"recursive"};
    31    threadPool.Start(NUM_WORKERS_DEFAULT);
    32
    33    std::promise<void> signal;
    34    threadPool.Submit([&threadPool, &signal] {
    35        threadPool.Submit([&signal] {
    36            signal.set_value();
    37        });
    38    });
    39
    40    signal.get_future().wait();
    41}
    42
    43BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    44{
    45    ThreadPool threadPool{"graceful_stop"};
    46    threadPool.Start(NUM_WORKERS_DEFAULT);
    47
    48    std::counting_semaphore sem{0};
    49    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    50
    51    auto future{threadPool.Submit([] { return true; })};
    52    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    53
    54    std::thread thread_unblocker{[&sem] {
    55        std::this_thread::sleep_for(milliseconds{300});
    56        sem.release(NUM_WORKERS_DEFAULT);
    57    }};
    58
    59    threadPool.Stop();
    60
    61    BOOST_CHECK(future.get());
    62    thread_unblocker.join();
    63    WaitFor(blocking_tasks);
    64    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    65}
    

    furszy commented at 3:18 pm on November 17, 2025:
    Split.
  63. in src/httpserver.cpp:53 in 435e8b5a55 outdated
    49@@ -49,83 +50,6 @@ using common::InvalidPortErrMsg;
    50 /** Maximum size of http request (request line + headers) */
    51 static const size_t MAX_HEADERS_SIZE = 8192;
    52 
    53-/** HTTP request work item */
    54-class HTTPWorkItem final : public HTTPClosure
    


    l0rinc commented at 3:40 pm on October 30, 2025:
    Is there a way to do this final threadpool migration in smaller steps?

    furszy commented at 3:20 pm on November 17, 2025:
    I guess you wrote this first and then answered yourself in your final comment, which is basically a long version of the PR’s “Note 2” description.

    l0rinc commented at 4:55 pm on November 17, 2025:
    Not exactly sure what you mean by that - Note 2 does seem like a good idea to me and it’s probably similar to what I meant. It would help the review process to gain more confidence in the implementation if we migrated away in smaller steps, documenting how the tests we add for the old implementation also pass when we switch over to the new implementation.

    furszy commented at 8:54 pm on November 17, 2025:

    I imagine the problem is that you haven’t compared the current http code with the ThreadPool code, which is pretty much the same but properly documented and test-covered. That’s one of the reasons for not modernizing the underlying sync mechanism in this PR, and why I added the “Note 2” paragraph as well as wrote #33689 (comment).

    Look at the current WorkQueue:

     0void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs)
     1    {
     2        while (true) {
     3            std::unique_ptr<WorkItem> i;
     4            {
     5                WAIT_LOCK(cs, lock);
     6                while (running && queue.empty())
     7                    cond.wait(lock);
     8                if (!running && queue.empty())
     9                    break;
    10                i = std::move(queue.front());
    11                queue.pop_front();
    12            }
    13            (*i)();
    14        }
    15    }
    

    And this is the ThreadPool (with all comments stripped):

     0
     1void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     2    {
     3        WAIT_LOCK(m_mutex, wait_lock);
     4        for (;;) {
     5            std::packaged_task<void()> task;
     6            {
     7                if (!m_interrupt && m_work_queue.empty()) {
     8                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
     9                }
    10
    11                if (m_interrupt && m_work_queue.empty()) return;
    12               
    13                task = std::move(m_work_queue.front());
    14                m_work_queue.pop();
    15            }
    16
    17            REVERSE_LOCK(wait_lock, m_mutex);
    18            task();
    19        }
    20    }
    

    In any case, since most reviewers seem ok to proceed with the current approach, I think it’s more productive to keep working on it rather than continue circling around this topic, even if it’s not to your taste.


    l0rinc commented at 5:08 pm on November 19, 2025:
    This is a big change, other reviewers can just review the unified view if they don’t want smaller commits, but I currently cannot view this in small steps, so I insist that we should split it into smaller changes. I want to help, that’s why I’m spending so much time with the details, I think this is a really risky change, I want to make sure it’s correct. Let me know how I can help.
  64. l0rinc changes_requested
  65. l0rinc commented at 4:05 pm on October 30, 2025: contributor

    I have started reviewing this PR but have only finished the first commit (e1eb4cd3a5eb192cd6d9ee5d255688c06ab2089a).

    The depth of tests gives me hope that this transition will be smooth: the tests cover most of the functionality, even a few corner cases I hadn’t thought of!

    It seemed, however, that I had more to say about that than anticipated. I didn’t even get to reviewing the threadpool properly, just most of the tests.

    I know receiving this amount of feedback can be daunting, but we both want this to succeed. I have spent a lot of time meticulously going through the details. I hope you will take it as I meant it: to make absolutely sure this won’t cause any problems and that our test suite is rock solid. I will continue the review, but wanted to make sure you’re aware of the progress and thought we can synchronize more often this way (pun intended).

    In general I think adding a thread pool like this can untangle complicated code, but we have to find a balance between IO-bound and CPU-bound tasks. I found that oversubscription is usually a smaller problem, so we can likely solve the IO/CPU contention problem by creating dedicated ThreadPools for each major task (http, script verification, input fetcher, compaction, etc.). This way it won’t be a general resource guardian (which can be a ThreadPool’s main purpose), just a tool to avoid the overhead of starting/stopping threads.

    As hinted in the original thread, I find the current structure to be harder to follow, I would appreciate if you would consider doing the extraction in smaller steps:

    • first step would be to use the new ThreadPool’s structure, but extract the old logic into it;
    • add the tests (unit and fuzz, no need to split it) against the old impl in a separate commit;
    • in a last step swap it out with the new logic, keeping the same structure and tests.

    This way we could debug and understand the old code before we jump into the new one - and the tests would guide us in guaranteeing that the behavior stayed basically the same. I understand that’s extra work, but I’m of course willing to help in whatever you need to be able to do a less risky transition.

    I glanced quickly at the actual ThreadPool implementation; it looks okay, but as part of my next wave of reviews I want to investigate why we need so much locking, why we’re relying on old primitives instead of the C++20 concurrency tools (such as latches and barriers), and whether we can create and destroy the pool via RAII instead of manual start/stops.

    Note that I have done an IBD before and after this change to see if everything was still working and I didn’t see any regression!

      0diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
      1index e8200533cd..052784db37 100644
      2--- a/src/test/threadpool_tests.cpp
      3+++ b/src/test/threadpool_tests.cpp
      4@@ -2,270 +2,208 @@
      5 // Distributed under the MIT software license, see the accompanying
      6 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
      7 
      8+#include <common/system.h>
      9+#include <test/util/setup_common.h>
     10 #include <util/string.h>
     11 #include <util/threadpool.h>
     12+#include <util/time.h>
     13 
     14 #include <boost/test/unit_test.hpp>
     15 
     16-BOOST_AUTO_TEST_SUITE(threadpool_tests)
     17+#include <latch>
     18+#include <semaphore>
     19 
     20-constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
     21+using namespace std::chrono;
     22+constexpr auto TIMEOUT = seconds(120);
     23 
     24-template <typename T>
     25-void WaitFor(const std::vector<std::future<T>>& futures, const std::string& context)
     26+void WaitFor(std::span<const std::future<void>> futures)
     27 {
     28-    for (size_t i = 0; i < futures.size(); ++i) {
     29-        if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
     30-            throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
     31-        }
     32+    for (const auto& f : futures) {
     33+        BOOST_REQUIRE(f.wait_for(TIMEOUT) == std::future_status::ready);
     34     }
     35 }
     36 
     37 // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
     38 // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
     39-std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context)
     40+std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
     41 {
     42-    // Per-thread ready promises to ensure all workers are actually blocked
     43-    std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
     44-    std::vector<std::future<void>> ready_futures;
     45-    ready_futures.reserve(num_of_threads_to_block);
     46-    for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
     47-
     48-    // Fill all workers with blocking tasks
     49-    std::vector<std::future<void>> blocking_tasks;
     50-    for (int i = 0; i < num_of_threads_to_block; i++) {
     51-        std::promise<void>& ready = ready_promises[i];
     52-        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
     53-            ready.set_value();
     54-            blocker_future.wait();
     55-        }));
     56-    }
     57+    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
     58+    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
     59 
     60-    // Wait until all threads are actually blocked
     61-    WaitFor(ready_futures, context);
     62+    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
     63+    for (auto& f : blocking_tasks) f = threadPool.Submit([&] {
     64+        ready.count_down();
     65+        release_sem.acquire();
     66+    });
     67+
     68+    ready.wait();
     69     return blocking_tasks;
     70 }
     71 
     72-BOOST_AUTO_TEST_CASE(threadpool_basic)
     73+BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
     74+
     75+const size_t NUM_WORKERS_DEFAULT{size_t(GetNumCores()) + 1}; // we need to make sure there's *some* contention
     76+
     77+BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
     78 {
     79-    // Test Cases
     80-    // 0) Submit task to a non-started pool.
     81-    // 1) Submit tasks and verify completion.
     82-    // 2) Maintain all threads busy except one.
     83-    // 3) Wait for work to finish.
     84-    // 4) Wait for result object.
     85-    // 5) The task throws an exception, catch must be done in the consumer side.
     86-    // 6) Busy workers, help them by processing tasks from outside.
     87-    // 7) Recursive submission of tasks.
     88-    // 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
     89-
     90-    const int NUM_WORKERS_DEFAULT = 3;
     91-    const std::string POOL_NAME = "test";
     92-
     93-    // Test case 0, submit task to a non-started pool
     94-    {
     95-        ThreadPool threadPool(POOL_NAME);
     96-        bool err = false;
     97-        try {
     98-            threadPool.Submit([]() { return false; });
     99-        } catch (const std::runtime_error&) {
    100-            err = true;
    101-        }
    102-        BOOST_CHECK(err);
    103+    ThreadPool threadPool{"not_started"};
    104+    BOOST_CHECK_EXCEPTION(threadPool.Submit([] { return 0; }), std::runtime_error, HasReason{"No active workers"});
    105+}
    106+
    107+BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
    108+{
    109+    ThreadPool threadPool{"completion"};
    110+    threadPool.Start(NUM_WORKERS_DEFAULT);
    111+
    112+    const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    113+    std::atomic_size_t counter{0};
    114+
    115+    std::vector<std::future<void>> futures(num_tasks);
    116+    for (size_t i{0}; i < num_tasks; ++i) {
    117+        futures[i] = threadPool.Submit([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
    118     }
    119 
    120-    // Test case 1, submit tasks and verify completion.
    121-    {
    122-        int num_tasks = 50;
    123+    WaitFor(futures);
    124+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
    125+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    126+}
    127 
    128-        ThreadPool threadPool(POOL_NAME);
    129-        threadPool.Start(NUM_WORKERS_DEFAULT);
    130-        std::atomic<int> counter = 0;
    131+BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_task)
    132+{
    133+    ThreadPool threadPool{"block_counts"};
    134+    threadPool.Start(NUM_WORKERS_DEFAULT);
    135+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    136 
    137-        // Store futures to ensure completion before checking counter.
    138-        std::vector<std::future<void>> futures;
    139-        futures.reserve(num_tasks);
    140+    for (size_t free{1}; free < NUM_WORKERS_DEFAULT; ++free) {
    141+        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
    142+        std::counting_semaphore sem{0};
    143+        const auto blocking_tasks{BlockWorkers(threadPool, sem, free)};
    144 
    145-        for (int i = 1; i <= num_tasks; i++) {
    146-            futures.emplace_back(threadPool.Submit([&counter, i]() {
    147-                counter.fetch_add(i);
    148-            }));
    149-        }
    150+        size_t counter{0};
    151+        std::vector<std::future<void>> futures(num_tasks);
    152+        for (auto& f : futures) f = threadPool.Submit([&counter] { ++counter; });
    153 
    154-        // Wait for all tasks to finish
    155-        WaitFor(futures, /*context=*/"test1 task");
    156-        int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
    157-        BOOST_CHECK_EQUAL(counter.load(), expected_value);
    158+        WaitFor(futures);
    159         BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    160-    }
    161 
    162-    // Test case 2, maintain all threads busy except one.
    163-    {
    164-        ThreadPool threadPool(POOL_NAME);
    165-        threadPool.Start(NUM_WORKERS_DEFAULT);
    166-        // Single blocking future for all threads
    167-        std::promise<void> blocker;
    168-        std::shared_future<void> blocker_future(blocker.get_future());
    169-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1, /*context=*/"test2 blocking tasks enabled");
    170-
    171-        // Now execute tasks on the single available worker
    172-        // and check that all the tasks are executed.
    173-        int num_tasks = 15;
    174-        int counter = 0;
    175-
    176-        // Store futures to wait on
    177-        std::vector<std::future<void>> futures;
    178-        futures.reserve(num_tasks);
    179-        for (int i = 0; i < num_tasks; i++) {
    180-            futures.emplace_back(threadPool.Submit([&counter]() {
    181-                counter += 1;
    182-            }));
    183+        if (free == 1) {
    184+            BOOST_CHECK_EQUAL(counter, num_tasks);
    185+        } else {
    186+            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
    187         }
    188 
    189-        WaitFor(futures, /*context=*/"test2 tasks");
    190-        BOOST_CHECK_EQUAL(counter, num_tasks);
    191-
    192-        blocker.set_value();
    193-        WaitFor(blocking_tasks, /*context=*/"test2 blocking tasks disabled");
    194-        threadPool.Stop();
    195-        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    196+        sem.release(free);
    197+        WaitFor(blocking_tasks);
    198     }
    199 
    200-    // Test case 3, wait for work to finish.
    201-    {
    202-        ThreadPool threadPool(POOL_NAME);
    203-        threadPool.Start(NUM_WORKERS_DEFAULT);
    204-        std::atomic<bool> flag = false;
    205-        std::future<void> future = threadPool.Submit([&flag]() {
    206-            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    207-            flag.store(true);
    208-        });
    209-        future.wait();
    210-        BOOST_CHECK(flag.load());
    211-    }
    212+    threadPool.Stop();
    213+}
    214 
    215-    // Test case 4, obtain result object.
    216-    {
    217-        ThreadPool threadPool(POOL_NAME);
    218-        threadPool.Start(NUM_WORKERS_DEFAULT);
    219-        std::future<bool> future_bool = threadPool.Submit([]() {
    220-            return true;
    221-        });
    222-        BOOST_CHECK(future_bool.get());
    223+BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
    224+{
    225+    ThreadPool threadPool{"wait_test"};
    226+    threadPool.Start(NUM_WORKERS_DEFAULT);
    227+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    228 
    229-        std::future<std::string> future_str = threadPool.Submit([]() {
    230-            return std::string("true");
    231-        });
    232-        std::string result = future_str.get();
    233-        BOOST_CHECK_EQUAL(result, "true");
    234+    const auto start{steady_clock::now()};
    235+
    236+    std::vector<std::future<void>> futures(num_tasks + 1);
    237+    for (size_t i{0}; i <= num_tasks; ++i) {
    238+        futures[i] = threadPool.Submit([i] { UninterruptibleSleep(milliseconds{i}); });
    239     }
    240+    WaitFor(futures);
    241 
    242-    // Test case 5, throw exception and catch it on the consumer side.
    243-    {
    244-        ThreadPool threadPool(POOL_NAME);
    245-        threadPool.Start(NUM_WORKERS_DEFAULT);
    246-
    247-        int ROUNDS = 5;
    248-        std::string err_msg{"something wrong happened"};
    249-        std::vector<std::future<void>> futures;
    250-        futures.reserve(ROUNDS);
    251-        for (int i = 0; i < ROUNDS; i++) {
    252-            futures.emplace_back(threadPool.Submit([err_msg, i]() {
    253-                throw std::runtime_error(err_msg + util::ToString(i));
    254-            }));
    255-        }
    256+    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
    257+    BOOST_CHECK(elapsed_ms >= num_tasks);
    258+}
    259 
    260-        for (int i = 0; i < ROUNDS; i++) {
    261-            try {
    262-                futures.at(i).get();
    263-                BOOST_FAIL("Expected exception not thrown");
    264-            } catch (const std::runtime_error& e) {
    265-                BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
    266-            }
    267-        }
    268-    }
    269+BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    270+{
    271+    ThreadPool threadPool{"result_test"};
    272+    threadPool.Start(NUM_WORKERS_DEFAULT);
    273 
    274-    // Test case 6, all workers are busy, help them by processing tasks from outside.
    275-    {
    276-        ThreadPool threadPool(POOL_NAME);
    277-        threadPool.Start(NUM_WORKERS_DEFAULT);
    278-
    279-        std::promise<void> blocker;
    280-        std::shared_future<void> blocker_future(blocker.get_future());
    281-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test6 blocking tasks enabled");
    282-
    283-        // Now submit tasks and check that none of them are executed.
    284-        int num_tasks = 20;
    285-        std::atomic<int> counter = 0;
    286-        for (int i = 0; i < num_tasks; i++) {
    287-            threadPool.Submit([&counter]() {
    288-                counter.fetch_add(1);
    289-            });
    290-        }
    291-        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    292-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
    293+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return true; }).get(), true);
    294+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return 42; }).get(), 42);
    295+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return std::string{"true"}; }).get(), "true");
    296+}
    297 
    298-        // Now process manually
    299-        for (int i = 0; i < num_tasks; i++) {
    300-            threadPool.ProcessTask();
    301-        }
    302-        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    303-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    304-        blocker.set_value();
    305-        threadPool.Stop();
    306-        WaitFor(blocking_tasks, "Failure waiting for test6 blocking task futures");
    307+BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
    308+{
    309+    ThreadPool threadPool{"exception_test"};
    310+    threadPool.Start(NUM_WORKERS_DEFAULT);
    311+
    312+    const auto err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
    313+
    314+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    315+    for (size_t i{0}; i < num_tasks; ++i) {
    316+        BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(err(i)); }).get(), std::runtime_error, HasReason{err(i)});
    317     }
    318+}
    319+
    320+BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
    321+{
    322+    ThreadPool threadPool{"manual_process"};
    323+    threadPool.Start(NUM_WORKERS_DEFAULT);
    324+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    325 
    326-    // Test case 7, recursive submission of tasks.
    327-    {
    328-        ThreadPool threadPool(POOL_NAME);
    329-        threadPool.Start(NUM_WORKERS_DEFAULT);
    330+    std::counting_semaphore sem{0};
    331+    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    332 
    333-        std::promise<void> signal;
    334-        threadPool.Submit([&]() {
    335-            threadPool.Submit([&]() {
    336-                signal.set_value();
    337-            });
    338-        });
    339+    std::atomic_size_t counter{0};
    340+    std::vector<std::future<void>> futures(num_tasks);
    341+    for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    342 
    343-        signal.get_future().wait();
    344-        threadPool.Stop();
    345-    }
    346+    UninterruptibleSleep(milliseconds{100});
    347+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    348 
    349-    // Test case 8, submit a task when all threads are busy and then stop the pool.
    350-    {
    351-        ThreadPool threadPool(POOL_NAME);
    352-        threadPool.Start(NUM_WORKERS_DEFAULT);
    353+    for (size_t i{0}; i < num_tasks; ++i) {
    354+        threadPool.ProcessTask();
    355+    }
    356 
    357-        std::promise<void> blocker;
    358-        std::shared_future<void> blocker_future(blocker.get_future());
    359-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test8 blocking tasks enabled");
    360+    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    361+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    362 
    363-        // Submit an extra task that should execute once a worker is free
    364-        std::future<bool> future = threadPool.Submit([]() { return true; });
    365+    sem.release(NUM_WORKERS_DEFAULT);
    366+    WaitFor(blocking_tasks);
    367+}
    368 
    369-        // At this point, all workers are blocked, and the extra task is queued
    370-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    371+BOOST_AUTO_TEST_CASE(recursive_task_submission)
    372+{
    373+    ThreadPool threadPool{"recursive"};
    374+    threadPool.Start(NUM_WORKERS_DEFAULT);
    375 
    376-        // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    377-        std::thread thread_unblocker([&blocker]() {
    378-            std::this_thread::sleep_for(std::chrono::milliseconds{300});
    379-            blocker.set_value();
    380+    std::promise<void> signal;
    381+    threadPool.Submit([&threadPool, &signal] {
    382+        threadPool.Submit([&signal] {
    383+            signal.set_value();
    384         });
    385+    });
    386 
    387-        // Stop the pool while the workers are still blocked
    388-        threadPool.Stop();
    389+    signal.get_future().wait();
    390+}
    391 
    392-        // Expect the submitted task to complete
    393-        BOOST_CHECK(future.get());
    394-        thread_unblocker.join();
    395+BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    396+{
    397+    ThreadPool threadPool{"graceful_stop"};
    398+    threadPool.Start(NUM_WORKERS_DEFAULT);
    399 
    400-        // Obviously all the previously blocking tasks should be completed at this point too
    401-        WaitFor(blocking_tasks, "Failure waiting for test8 blocking task futures");
    402+    std::counting_semaphore sem{0};
    403+    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    404 
    405-        // Pool should be stopped and no workers remaining
    406-        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    407-    }
    408+    auto future{threadPool.Submit([] { return true; })};
    409+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    410+
    411+    std::thread thread_unblocker{[&sem] {
    412+        std::this_thread::sleep_for(milliseconds{300});
    413+        sem.release(NUM_WORKERS_DEFAULT);
    414+    }};
    415+
    416+    threadPool.Stop();
    417+
    418+    BOOST_CHECK(future.get());
    419+    thread_unblocker.join();
    420+    WaitFor(blocking_tasks);
    421+    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    422 }
    423 
    424 BOOST_AUTO_TEST_SUITE_END()
    425diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    426index 5d9884086e..c89fda37c2 100644
    427--- a/src/util/threadpool.h
    428+++ b/src/util/threadpool.h
    429@@ -24,6 +24,8 @@
    430 #include <utility>
    431 #include <vector>
    432 
    433+#include <tinyformat.h>
    434+
    435 /**
     436  * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
    437  *
    438@@ -62,16 +64,9 @@ private:
    439         for (;;) {
    440             std::packaged_task<void()> task;
    441             {
    442-                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    443-                if (!m_interrupt && m_work_queue.empty()) {
    444-                    // Block until the pool is interrupted or a task is available.
    445-                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    446-                }
    447-
    448-                // If stopped and no work left, exit worker
    449-                if (m_interrupt && m_work_queue.empty()) {
    450-                    return;
    451-                }
    452+                // Block until the pool is interrupted or a task is available.
    453+                m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    454+                if (m_interrupt && m_work_queue.empty()) return;
    455 
    456                 task = std::move(m_work_queue.front());
    457                 m_work_queue.pop();
    458@@ -101,17 +96,16 @@ public:
    459      *
    460      * Must be called from a controller (non-worker) thread.
    461      */
    462-    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    463+    void Start(size_t num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    464     {
    465         assert(num_workers > 0);
    466         LOCK(m_mutex);
    467         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
    468         m_interrupt = false; // Reset
    469 
    470-        // Create workers
    471         m_workers.reserve(num_workers);
    472-        for (int i = 0; i < num_workers; i++) {
    473-            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    474+        for (size_t i{0}; i < num_workers; i++) {
    475+            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    476         }
    477     }
    478 
    479@@ -179,12 +173,6 @@ public:
    480         task();
    481     }
    482 
    483-    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    484-    {
    485-        WITH_LOCK(m_mutex, m_interrupt = true);
    486-        m_cv.notify_all();
    487-    }
    488-
    489     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    490     {
    491         return WITH_LOCK(m_mutex, return m_work_queue.size());
    
  66. marcofleon commented at 7:03 pm on October 30, 2025: contributor

    I’ve been playing around with the fuzz test and wanted to update here, even if it’s still a bit inconclusive.

    I’m running into non-reproducible timeouts when using fork with libFuzzer. I’m oversubscribing my cores by a lot (fork=25, plus the 3 workers each), and I’m not yet sure whether these timeouts would eventually happen with fewer cores.

    I have “fixed” this by changing notify_one in Submit to notify_all, although I can’t say I’m exactly sure why this works. It almost seems like some of the workers aren’t being woken up at all when only one is being notified. Maybe the same one somehow ends up being reused over and over, so it gets stuck waiting? But then with notify_all, all idle workers are woken up with every task, so the thread pool is able to handle the CPU overload more effectively.

    edit: This should be reproducible in the sense that if you run with fork close to the number of total cores on your machine and set -timeout=30 you should get timeouts in less than an hour of fuzzing.
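For context on the notify_one/notify_all distinction being discussed, here is a self-contained sketch of the submit/wakeup pattern (illustrative names, not the PR's actual implementation): one `notify_one` per enqueued task is the textbook scheme, since each task needs at most one worker, while shutdown must always use `notify_all`. Under heavy oversubscription a notified worker may be slow to reacquire the lock and run, a latency that switching Submit to `notify_all` can mask at the cost of thundering-herd wakeups.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal pool sketch: Submit issues one notify_one per task; the
// destructor drains the queue, then stops and joins the workers.
struct TinyPool {
    std::mutex m;
    std::condition_variable cv;
    std::queue<std::function<void()>> q;
    bool stop{false};
    std::vector<std::thread> workers;

    explicit TinyPool(unsigned n)
    {
        for (unsigned i = 0; i < n; ++i) workers.emplace_back([this] {
            for (;;) {
                std::function<void()> task;
                {
                    std::unique_lock lk{m};
                    cv.wait(lk, [this] { return stop || !q.empty(); });
                    if (stop && q.empty()) return; // drained: exit worker
                    task = std::move(q.front());
                    q.pop();
                }
                task();
            }
        });
    }
    void Submit(std::function<void()> f)
    {
        { std::lock_guard lk{m}; q.push(std::move(f)); }
        cv.notify_one(); // the call under discussion (vs. notify_all)
    }
    ~TinyPool()
    {
        { std::lock_guard lk{m}; stop = true; }
        cv.notify_all(); // shutdown must wake every idle worker
        for (auto& t : workers) t.join();
    }
};

int run_tiny_pool(int num_tasks)
{
    std::atomic<int> done{0};
    {
        TinyPool pool{3};
        for (int i = 0; i < num_tasks; ++i) pool.Submit([&done] { ++done; });
    } // destructor drains the queue before joining
    return done.load();
}
```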

  67. in src/util/threadpool.h:103 in e1eb4cd3a5 outdated
     97+     * @brief Start worker threads.
     98+     *
     99+     * Creates and launches `num_workers` threads that begin executing tasks
    100+     * from the queue. If the pool is already started, throws.
    101+     *
    102+     * Must be called from a controller (non-worker) thread.
    


    Eunovo commented at 10:55 am on October 31, 2025:

    laanwj commented at 8:37 am on November 11, 2025:
    i remember that in the past we had tons of issues with thread-local variables. They’ve been a pain in the ass on some platforms, and decided to not use them except for one case. Not sure if this is still the case, but if not, this needs to be updated: https://github.com/bitcoin/bitcoin/blob/master/src/util/threadnames.cpp#L39

    fanquake commented at 9:35 am on November 11, 2025:
    There’s also our thread_local clang-tidy plugin: https://github.com/bitcoin/bitcoin/tree/master/contrib/devtools/bitcoin-tidy.

    Eunovo commented at 1:38 pm on November 11, 2025:

    There’s also our thread_local clang-tidy plugin: https://github.com/bitcoin/bitcoin/tree/master/contrib/devtools/bitcoin-tidy.

    I don’t see why we would need a thread-local variable with a non-trivial destructor to implement this.

    i remember that in the past we had tons of issues with thread-local variables. They’ve been a pain in the ass on some platforms, and decided to not use them except for one case. Not sure if this is still the case, but if not, this needs to be updated: https://github.com/bitcoin/bitcoin/blob/master/src/util/threadnames.cpp#L39

    I think it would be fine in this case because the use is limited. A simple thread-local boolean and assertion should be enough.

    I don’t have a strong opinion here; I just think it would be better to programmatically enforce the rule that certain functions should not be called from Worker Threads.


    furszy commented at 10:45 pm on November 13, 2025:

    Can we enforce this rule using a thread-local variable?

    As m_workers is guarded, we could enforce it in a simple way:

    0for (const auto& worker : m_workers) assert(worker.get_id() != std::this_thread::get_id());
    
  68. Eunovo commented at 12:48 pm on October 31, 2025: contributor

    Concept ACK https://github.com/bitcoin/bitcoin/pull/33689/commits/435e8b5a55033fbfc6428a612b9826713b3cf57a

    The PR looks good already, but I think we can block users from calling Threadpool::Start() and Threadpool::Stop() inside Worker threads; We can use a thread local variable to identify worker threads and reject the operation.

  69. furszy force-pushed on Nov 17, 2025
  70. in src/test/threadpool_tests.cpp:70 in adb891184e outdated
    68+{
    69+    ThreadPool threadPool(POOL_NAME);
    70+    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
    71+        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    72+        return true;
    73+    });
    


    l0rinc commented at 4:01 pm on November 17, 2025:

    Continuing the thread from #33689 (review):

    0#include <common/system.h>
    1+#include <test/util/setup_common.h>
    2#include <util/string.h>
    
    0    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, HasReason{"No active workers; cannot accept new tasks"});
    

    furszy commented at 6:23 pm on November 17, 2025:

    Sorry, I missed fully answering your previous comment related to this change. I skipped it to avoid including setup_common.h, as it comes with way more dependencies than just HasReason().

    The idea is to try to reduce large Bitcoin-Core repo dependencies in this low-level class, mainly when they do not add much value (this one just lets us remove two lines of code with no behavior change), so this class and its tests can be easily used elsewhere. We could move HasReason() to another test utils file too, but I don’t think it’s worth the effort.


    l0rinc commented at 7:21 pm on November 17, 2025:
    This is needed in multiple methods here; it would simplify the diff and provide a higher-level primitive instead of repeating and reimplementing what we have already built a helper for. I’m not sure why we’d want to reduce dependencies here, what’s the advantage of that? I value clean code a lot more, especially for a test that doesn’t even have performance requirements. We can of course move HasReason in a separate PR, but I think we can use it here before that as well.

    furszy commented at 8:03 pm on November 17, 2025:

    Not sure why we’d want to reduce dependencies here, what’s the advantage of that?

    I wrote it above; the idea is to be able to use this low-level class and its tests elsewhere, outside the project, just by pulling a few files without dragging in all our unit test framework machinery, which has tons of other dependencies. If we were talking about a substantial improvement, I’d agree with you, but here it’s just a 2-line diff with no behavior change. And for me, that makes the rationale for including it not very convincing.


    l0rinc commented at 12:17 pm on November 19, 2025:

    the idea is to be able to use this low-level class and tests elsewhere

    I haven’t heard that argument before, why would we care about other projects wanting to copy-paste our code? Let them refactor, but we should write the best code for our project.

    but here it’s just a 2-line diff

    It’s not, please see my remaining suggestions many of which haven’t been applied yet.


    Also, since we’re ignoring the return value here, we should likely cast to void here:

    0    BOOST_CHECK_EXCEPTION((void)threadPool.Submit([] { return false; }), std::runtime_error, HasReason{"No active workers; cannot accept new tasks"});
    
  71. in src/test/threadpool_tests.cpp:27 in adb891184e outdated
    22+// 9) Congestion test; create more workers than available cores.
    23+// 10) Ensure Interrupt() prevents further submissions.
    24+BOOST_AUTO_TEST_SUITE(threadpool_tests)
    25+
    26+// General test values
    27+constexpr int NUM_WORKERS_DEFAULT = 3;
    


    l0rinc commented at 4:06 pm on November 17, 2025:

    Continuing the discussion in #33689 (review), could we randomize this on the call-site, so that we can exercise cases when we have other than 3 workers (to make sure we don’t introduce an accidental bias), i.e. in the test it would be something like:

    0    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    

    furszy commented at 7:09 pm on November 17, 2025:

    While I like the extra coverage, I’m not fully convinced about adding this bit of non-determinism. That’s why I didn’t include it in the last push. It could make reproducing failures trickier, since we’d be adding another dimension to all tests; core counts behave differently across environments.

    Probably a good middle ground could be adding a separate test that picks a random worker count and prints the available core count on failure. That gives us some extra coverage without making the other tests harder to debug, and it makes it clear what’s needed to reproduce those specific failures.


    l0rinc commented at 7:25 pm on November 17, 2025:

    It’s not about coverage, but rather avoiding bias that we may introduce by hard-coding arbitrary values.

    I prefer a situation that is “hard to reproduce” over something that we won’t find because of hard-coded values.

    core counts behave differently across environments

    Exactly, let’s exercise those differences, that’s exactly what we want from our tests, to ruthlessly try to break the code, right?


    furszy commented at 8:31 pm on November 17, 2025:

    Exactly, let’s exercise those differences, that’s exactly what we want from our tests, to ruthlessly try to break the code, right?

    We’re going to side-track the discussion a bit, but I’m not sure I completely agree. Generally, I see unit tests as being meant for correctness, not stress testing. For example, we want to ensure certain behavior is always retained — it’s not about trying to break the code in a non-deterministic way. That’s what fuzz tests are for, where we randomize inputs. I’d also argue that we don’t yet have a proper heavy-load testing framework either.

    Still, IIRC, our current fuzzing framework has some limitations; the engine runs the function faster than the OS can release the threads, which causes memory usage to increase non-stop (at least that’s what I remember from a good number of experiments we did a few months ago).

    In any case, this is just a general software development sidetrack… sorry, couldn’t contain myself. We could still have a specific test case for this or dynamically adapt the number of workers if others are happy with that too.


    l0rinc commented at 12:14 pm on November 19, 2025:

    We could still have a specific test case for this or dynamically adapt the number of workers

    How would that solve the problem I highlighted, namely that 3 is a special value that introduces needless bias? We could theoretically have a bug that only manifests when the worker count equals (or exceeds) the CPU count (but maybe only happens for exceptions), which would never happen with 3 workers, but would sometimes fail correctly if we randomize instead of hard-code magic values.

  72. l0rinc changes_requested
  73. l0rinc commented at 5:00 pm on November 17, 2025: contributor
    Left a few more comments to help with moving this forward. I like the new structure more and after the tests and the restructuring are finished, I would like to spend some more time reviewing the ThreadPool as well - but I want to make sure we have a solid foundation first.
  74. util: introduce general purpose thread pool 258518d880
  75. fuzz: add test case for threadpool
    Co-authored-by: furszy <matiasfurszyfer@protonmail.com>
    49f4e91bdd
  76. http: replace WorkQueue and threads handling for ThreadPool
    Replace the HTTP server's WorkQueue implementation and manual thread
    handling code with ThreadPool for processing HTTP requests. The
    ThreadPool class encapsulates all of this functionality in a reusable
    class that is properly unit and fuzz tested (the previous code was
    neither unit nor fuzz tested).
    
    This cleanly separates responsibilities:
    The HTTP server now focuses solely on receiving and dispatching requests,
    while ThreadPool handles concurrency, queuing, and execution.
    It simplifies init, shutdown, and request tracking.
    
    This also allows us to experiment with further performance improvements at
    the task queuing and execution level, such as a lock-free structure, task
    prioritization or any other performance improvement in the future, without
    having to deal with HTTP code that lives on a different layer.
    2de0ce5cd8
  77. furszy force-pushed on Nov 18, 2025
  78. furszy commented at 8:04 pm on November 18, 2025: member

    I’ve been playing around with the fuzz test and wanted to update here, even if it’s still a bit inconclusive.

    I’m running into non-reproducible timeouts when using fork with libFuzzer. I’m overusing my cores by a lot (fork=25 plus the 3 workers each), and I’m not yet sure if these timeouts would happen eventually when using fewer cores.

    I have “fixed” this by changing notify_one in Submit to notify_all, although I can’t say I’m exactly sure why this works. It almost seems like some of the workers aren’t being woken up at all when only one is being notified. Maybe the same one somehow ends up being reused over and over, so it gets stuck waiting? But then with notify_all all idle workers are woken up with every task, so the thread pool is able to handle the cpu overload more effectively.

    edit: This should be reproducible in the sense that if you run with fork close to the number of total cores on your machine and set -timeout=30 you should get timeouts in less than an hour of fuzzing.

    @marcofleon I don’t think this is an issue. You’re just massively oversubscribing the CPU and lowering the timeout to the point where all the context switching triggers it. Switching to notify_all() only forces all workers awake on every submission, which masks the OS scheduler starvation you get in this kind of extreme setup.

  79. in src/test/threadpool_tests.cpp:23 in 258518d880 outdated
    18+// 5) The task throws an exception, catch must be done in the consumer side.
    19+// 6) Busy workers, help them by processing tasks externally.
    20+// 7) Recursive submission of tasks.
    21+// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
    22+// 9) Congestion test; create more workers than available cores.
    23+// 10) Ensure Interrupt() prevents further submissions.
    


    l0rinc commented at 12:16 pm on November 19, 2025:
    we already have individual test cases, there’s no need to duplicate them here
  80. in src/util/threadpool.h:150 in 2de0ce5cd8
    145+    }
    146+
    147+    /**
    148+     * @brief Enqueues a new task for asynchronous execution.
    149+     *
    150+     * Returns a `std::future` that provides the task’s result or propagates
    


    l0rinc commented at 12:27 pm on November 19, 2025:

    nit: for consistency with other spelling, e.g. https://github.com/bitcoin/bitcoin/blob/2de0ce5cd85e1b99e318883964df318ffb615fe4/src/util/threadpool.h#L94

    0     * Returns a `std::future` that provides the task's result or propagates
    
  81. in src/util/threadpool.h:13 in 2de0ce5cd8
     8+#include <sync.h>
     9+#include <tinyformat.h>
    10+#include <util/check.h>
    11+#include <util/string.h>
    12+#include <util/thread.h>
    13+#include <util/threadinterrupt.h>
    


    l0rinc commented at 12:39 pm on November 19, 2025:

    some of these includes seem unused, can you please check?

     0 python3 /opt/homebrew/bin/iwyu_tool.py -p build-iwyu src/test/threadpool_tests.cpp -- -Xiwyu --cxx17ns -Xiwyu --mapping_file=$PWD/contrib/devtools/iwyu/bitcoin.core.i
     1mp -Xiwyu --max_line_length=160 -Xiwyu --check_also=$PWD/src/util/threadpool.h
     2
     3/Users/lorinc/IdeaProjects/bitcoin/src/util/threadpool.h should add these lines:
     4#include <__vector/vector.h>   // for vector
     5
     6/Users/lorinc/IdeaProjects/bitcoin/src/util/threadpool.h should remove these lines:
     7- #include <util/threadinterrupt.h>  // lines 13-13
     8- #include <atomic>  // lines 16-16
     9- #include <functional>  // lines 19-19
    10- #include <memory>  // lines 21-21
    11- #include <vector>  // lines 26-26
    12
    13The full include-list for /Users/lorinc/IdeaProjects/bitcoin/src/util/threadpool.h:
    14#include <__vector/vector.h>   // for vector
    15#include <sync.h>              // for UniqueLock, EXCLUSIVE_LOCKS_REQUIRED, LOCK, WITH_LOCK, GUARDED_BY, Mutex, REVERSE_LOCK, WAIT_LOCK
    16#include <tinyformat.h>        // for format, formatTruncated, formatValue, makeFormatList, strprintf
    17#include <util/check.h>        // for assert, inline_assertion_check, Assume
    18#include <util/string.h>       // for string, basic_string, basic_string_view
    19#include <util/thread.h>       // for TraceThread
    20#include <algorithm>           // for move
    21#include <condition_variable>  // for condition_variable
    22#include <cstddef>             // for size_t
    23#include <future>              // for packaged_task
    24#include <queue>               // for queue
    25#include <stdexcept>           // for runtime_error
    26#include <thread>              // for thread, get_id, operator==, __thread_id
    27#include <utility>             // for move, forward
    28---
    29
    30/Users/lorinc/IdeaProjects/bitcoin/src/test/threadpool_tests.cpp should add these lines:
    31#include <__vector/vector.h>                                        // for vector
    32#include <algorithm>                                                // for max
    33#include <atomic>                                                   // for atomic, memory_order_relaxed, memory_order_acquire, memory_order_release
    34#include <boost/preprocessor/arithmetic/limits/dec_256.hpp>         // for BOOST_PP_DEC_1, BOOST_PP_DEC_2, BOOST_PP_DEC_128, BOOST_PP_DEC_16, BOOST_PP_DEC_3
    35#include <boost/preprocessor/comparison/limits/not_equal_256.hpp>   // for BOOST_PP_NOT_EQUAL_1, BOOST_PP_NOT_EQUAL_CHECK_BOOST_PP_NOT_EQUAL_1
    36#include <boost/preprocessor/control/expr_iif.hpp>                  // for BOOST_PP_EXPR_IIF_1
    37#include <boost/preprocessor/control/iif.hpp>                       // for BOOST_PP_IIF_1, BOOST_PP_IIF_0
    38#include <boost/preprocessor/detail/limits/auto_rec_256.hpp>        // for BOOST_PP_NODE_ENTRY_256
    39#include <boost/preprocessor/logical/compl.hpp>                     // for BOOST_PP_COMPL_0
    40#include <boost/preprocessor/logical/limits/bool_256.hpp>           // for BOOST_PP_BOOL_0, BOOST_PP_BOOL_1, BOOST_PP_BOOL_2
    41#include <boost/preprocessor/repetition/detail/limits/for_256.hpp>  // for BOOST_PP_FOR_0, BOOST_PP_FOR_1, BOOST_PP_FOR_127, BOOST_PP_FOR_15, BOOST_PP_FOR_3
    42#include <boost/preprocessor/repetition/for.hpp>                    // for BOOST_PP_FOR_CHECK_BOOST_PP_NIL
    43#include <boost/preprocessor/seq/limits/elem_256.hpp>               // for BOOST_PP_SEQ_ELEM_0
    44#include <boost/preprocessor/seq/limits/size_256.hpp>               // for BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_2, BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_3
    45#include <boost/preprocessor/tuple/elem.hpp>                        // for BOOST_PP_TUPLE_ELEM_O_3
    46#include <boost/preprocessor/variadic/limits/elem_64.hpp>           // for BOOST_PP_VARIADIC_ELEM_3
    47#include <boost/test/tools/assertion_result.hpp>                    // for assertion_result
    48#include <boost/test/tools/old/interface.hpp>                       // for BOOST_TEST_TOOL_PASS_ARGS2, BOOST_TEST_TOOL_PASS_PRED2, BOOST_CHECK_EQUAL, BOOST_...
    49#include <boost/test/tree/auto_registration.hpp>                    // for auto_test_unit_registrar
    50#include <boost/test/unit_test_suite.hpp>                           // for BOOST_AUTO_TEST_CASE, BOOST_AUTO_TEST_CASE_FIXTURE, BOOST_AUTO_TEST_SUITE, BOOST_...
    51#include <boost/test/utils/basic_cstring/basic_cstring.hpp>         // for basic_cstring
    52#include <boost/test/utils/lazy_ostream.hpp>                        // for operator<<
    53#include <future>                                                   // for future, promise, future_status, shared_future
    54#include <stdexcept>                                                // for runtime_error
    55#include <thread>                                                   // for thread
    56
    57/Users/lorinc/IdeaProjects/bitcoin/src/test/threadpool_tests.cpp should remove these lines:
    58- #include <boost/test/unit_test.hpp>  // lines 10-10
    59
    60The full include-list for /Users/lorinc/IdeaProjects/bitcoin/src/test/threadpool_tests.cpp:
    61#include <__vector/vector.h>                                        // for vector
    62#include <common/system.h>                                          // for GetNumCores
    63#include <util/string.h>                                            // for basic_string, allocator, char_traits, string, ToString, operator+
    64#include <util/threadpool.h>                                        // for ThreadPool
    65#include <util/time.h>                                              // for UninterruptibleSleep, milliseconds, operator""s
    66#include <algorithm>                                                // for max
    67#include <atomic>                                                   // for atomic, memory_order_relaxed, memory_order_acquire, memory_order_release
    68#include <boost/preprocessor/arithmetic/limits/dec_256.hpp>         // for BOOST_PP_DEC_1, BOOST_PP_DEC_2, BOOST_PP_DEC_128, BOOST_PP_DEC_16, BOOST_PP_DEC_3
    69#include <boost/preprocessor/comparison/limits/not_equal_256.hpp>   // for BOOST_PP_NOT_EQUAL_1, BOOST_PP_NOT_EQUAL_CHECK_BOOST_PP_NOT_EQUAL_1
    70#include <boost/preprocessor/control/expr_iif.hpp>                  // for BOOST_PP_EXPR_IIF_1
    71#include <boost/preprocessor/control/iif.hpp>                       // for BOOST_PP_IIF_1, BOOST_PP_IIF_0
    72#include <boost/preprocessor/detail/limits/auto_rec_256.hpp>        // for BOOST_PP_NODE_ENTRY_256
    73#include <boost/preprocessor/logical/compl.hpp>                     // for BOOST_PP_COMPL_0
    74#include <boost/preprocessor/logical/limits/bool_256.hpp>           // for BOOST_PP_BOOL_0, BOOST_PP_BOOL_1, BOOST_PP_BOOL_2
    75#include <boost/preprocessor/repetition/detail/limits/for_256.hpp>  // for BOOST_PP_FOR_0, BOOST_PP_FOR_1, BOOST_PP_FOR_127, BOOST_PP_FOR_15, BOOST_PP_FOR_3
    76#include <boost/preprocessor/repetition/for.hpp>                    // for BOOST_PP_FOR_CHECK_BOOST_PP_NIL
    77#include <boost/preprocessor/seq/limits/elem_256.hpp>               // for BOOST_PP_SEQ_ELEM_0
    78#include <boost/preprocessor/seq/limits/size_256.hpp>               // for BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_2, BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_3
    79#include <boost/preprocessor/tuple/elem.hpp>                        // for BOOST_PP_TUPLE_ELEM_O_3
    80#include <boost/preprocessor/variadic/limits/elem_64.hpp>           // for BOOST_PP_VARIADIC_ELEM_3
    81#include <boost/test/tools/assertion_result.hpp>                    // for assertion_result
    82#include <boost/test/tools/old/interface.hpp>                       // for BOOST_TEST_TOOL_PASS_ARGS2, BOOST_TEST_TOOL_PASS_PRED2, BOOST_CHECK_EQUAL, BOOST_...
    83#include <boost/test/tree/auto_registration.hpp>                    // for auto_test_unit_registrar
    84#include <boost/test/unit_test_suite.hpp>                           // for BOOST_AUTO_TEST_CASE, BOOST_AUTO_TEST_CASE_FIXTURE, BOOST_AUTO_TEST_SUITE, BOOST_...
    85#include <boost/test/utils/basic_cstring/basic_cstring.hpp>         // for basic_cstring
    86#include <boost/test/utils/lazy_ostream.hpp>                        // for operator<<
    87#include <future>                                                   // for future, promise, future_status, shared_future
    88#include <stdexcept>                                                // for runtime_error
    89#include <thread>                                                   // for thread
    90---
    
  82. in src/util/threadpool.h:141 in 2de0ce5cd8
    136+            // Early shutdown to return right away on any concurrent 'Submit()' call
    137+            m_interrupt = true;
    138+            threads_to_join.swap(m_workers);
    139+        }
    140+        m_cv.notify_all();
    141+        for (auto& worker : threads_to_join) worker.join();
    


    l0rinc commented at 1:26 pm on November 19, 2025:
    https://corecheck.dev/bitcoin/bitcoin/pulls/33689 suggests using std::jthread instead, which would eliminate the need for manual joins - can you check if our infrastructure would support that?
  83. in src/util/threadpool.h:94 in 2de0ce5cd8
    89+public:
    90+    explicit ThreadPool(const std::string& name) : m_name(name) {}
    91+
    92+    ~ThreadPool()
    93+    {
    94+        Stop(); // In case it hasn't been stopped.
    


    l0rinc commented at 1:28 pm on November 19, 2025:
    nit: comment is redundant, the code already explains it. There are few other comments that don’t provide value, can you please check them?
  84. in src/util/threadpool.h:156 in 2de0ce5cd8
    151+     * any exception it throws.
    152+     * Note: Ignoring the returned future requires guarding the task against
    153+     * uncaught exceptions, as they would otherwise be silently discarded.
    154+     */
    155+    template <class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    156+    auto Submit(F&& fn)
    


    l0rinc commented at 1:38 pm on November 19, 2025:

    nit: we could mark this as [[nodiscard]] and reformat with clang-format:

    0    template <class F>
    1    [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) auto Enqueue(F&& fn)
    

    Note that this reveals that we often ignore the return value and should make it explicit whether that’s deliberate or not. I also think that calling it Enqueue could describe its behavior better.

  85. in src/httpserver.cpp:256 in 2de0ce5cd8
    251@@ -327,13 +252,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
    252 
    253     // Dispatch to worker thread
    254     if (i != iend) {
    255-        std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
    256-        assert(g_work_queue);
    257-        if (g_work_queue->Enqueue(item.get())) {
    258-            item.release(); /* if true, queue took ownership */
    259+        if ((int)g_threadpool_http.WorkQueueSize() < g_max_queue_depth) {
    260+            g_threadpool_http.Submit([req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
    


    l0rinc commented at 1:44 pm on November 19, 2025:

    as far as I can tell, we’re ignoring the return value here and silently ignoring any thrown thread exception - which we have tested so carefully.

    Since my understanding is that this is an asynchronous fire-and-forget task, we can’t just .get() the future here, but we can likely handle the potential exceptions inside and void the method to signal that we’re deliberately discarding the future (not forgetting it).

  86. in src/test/threadpool_tests.cpp:133 in 2de0ce5cd8
    128+{
    129+    ThreadPool threadPool(POOL_NAME);
    130+    threadPool.Start(NUM_WORKERS_DEFAULT);
    131+    std::atomic<bool> flag = false;
    132+    std::future<void> future = threadPool.Submit([&flag]() {
    133+        UninterruptibleSleep(std::chrono::milliseconds{200});
    


    l0rinc commented at 2:13 pm on November 19, 2025:
    0        UninterruptibleSleep(200ms);
    
  87. in src/test/threadpool_tests.cpp:136 in 2de0ce5cd8
    131+    std::atomic<bool> flag = false;
    132+    std::future<void> future = threadPool.Submit([&flag]() {
    133+        UninterruptibleSleep(std::chrono::milliseconds{200});
    134+        flag.store(true, std::memory_order_release);
    135+    });
    136+    BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
    


    l0rinc commented at 2:13 pm on November 19, 2025:
    0    BOOST_CHECK_EQUAL(future.wait_for(WAIT_TIMEOUT), std::future_status::ready);
    
  88. in src/httpserver.cpp:433 in 2de0ce5cd8
    428@@ -516,21 +429,17 @@ void InterruptHTTPServer()
    429         // Reject requests on current connections
    430         evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
    431     }
    432-    if (g_work_queue) {
    433-        g_work_queue->Interrupt();
    434-    }
    435+    // Interrupt pool after disabling requests
    436+    g_threadpool_http.Interrupt();
    


    l0rinc commented at 3:07 pm on November 19, 2025:
    still not sure I fully understand the difference between stopping and interrupting - we should probably merge Interrupt() into Stop() and document it better
  89. in src/util/threadpool.h:78 in 2de0ce5cd8
    73+                if (m_interrupt && m_work_queue.empty()) {
    74+                    return;
    75+                }
    76+
    77+                task = std::move(m_work_queue.front());
    78+                m_work_queue.pop();
    


    l0rinc commented at 3:56 pm on November 19, 2025:

    we could make it a bit more specific by making

    0std::deque<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    

    which would allow

    0                m_work_queue.pop_front();
    

    for symmetry with m_work_queue.front() and to clarify that

    0            m_work_queue.emplace(std::move(task));
    

    actually inserts at the back

    0            m_work_queue.emplace_back(std::move(task));
    
  90. in src/util/threadpool.h:175 in 2de0ce5cd8
    170+
    171+    /**
    172+     * @brief Execute a single queued task synchronously.
    173+     * Removes one task from the queue and executes it on the calling thread.
    174+     */
    175+    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    


    l0rinc commented at 4:28 pm on November 19, 2025:

    It seems to me this is a convenience method to expose the inner workings of the pool - but the current usages don’t seem to require it.

    // Test 6, all workers are busy, help them by processing tasks from outside

    It’s not obvious why that would be necessary, doesn’t this mean that we have misjudged the number of threads the pool should have?

    I would prefer having the simplest pool we can to get the job done and extend it when we actually need the functionality instead. Or if we do need this for some reason, we should be able to deduplicate since this is basically the exact same implementation and what the WorkerThread does in a loop:

     0std::counting_semaphore<> m_sem{0};
     1
     2void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     3{
     4    for (;;) {
     5        m_sem.acquire();
     6        if (!ProcessTask()) return;
     7    }
     8}
     9
    10...
    11
    12/**
    13 * @brief Execute a single queued task synchronously.
    14 * Removes one task from the queue and executes it on the calling thread.
    15 * @return true if a task was executed, false if queue was empty
    16 */
    17bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    18{
    19    std::packaged_task<void()> task;
    20    {
    21        LOCK(m_mutex);
    22        if (m_work_queue.empty()) return false;
    23        task = std::move(m_work_queue.front());
    24        m_work_queue.pop_front();
    25    }
    26    task();
    27    return true;
    28}
    
  91. in src/util/threadpool.h:56 in 2de0ce5cd8
    51+    Mutex m_mutex;
    52+    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    53+    std::condition_variable m_cv;
    54+    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    55+    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
    56+    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
    


    l0rinc commented at 4:42 pm on November 19, 2025:
    The condition_variable (since C++11) with its locks, reverse locks, waits and notifies can be modernized to a std::counting_semaphore<> m_sem{0}; (since C++20): we release it when we enqueue an element in Submit, release all permits in Interrupt() and Stop(), and in WorkerThread we m_sem.acquire(); before calling ProcessTask.
  92. in src/util/threadpool.h:135 in 2de0ce5cd8
    130+        std::vector<std::thread> threads_to_join;
    131+        {
    132+            LOCK(m_mutex);
    133+            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
    134+            auto id = std::this_thread::get_id();
    135+            for (const auto& worker : m_workers) assert(worker.get_id() != id);
    


    l0rinc commented at 4:45 pm on November 19, 2025:
    tests are passing without these lines - can we cover them?
  93. in src/util/threadpool.h:144 in 2de0ce5cd8
    139+        }
    140+        m_cv.notify_all();
    141+        for (auto& worker : threads_to_join) worker.join();
    142+        // Since we currently wait for tasks completion, sanity-check empty queue
    143+        WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    144+        // Note: m_interrupt is left true until next Start()
    


    l0rinc commented at 4:45 pm on November 19, 2025:
    I also find it weird that we can restart a ThreadPool - can we add a Start() → Stop() → Start() unit test if that something we want indeed? I understand it may not be trivial, but I would rather we restructure for RAII or some better lifecycle instead.
  94. in src/util/threadpool.h:133 in 2de0ce5cd8
    128+    {
    129+        // Notify workers and join them
    130+        std::vector<std::thread> threads_to_join;
    131+        {
    132+            LOCK(m_mutex);
    133+            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
    


    l0rinc commented at 4:46 pm on November 19, 2025:

    nit: we’ve used backticks in other similar cases

    0            // Ensure `Stop()` isn't called from any worker thread to avoid deadlocks
    
  95. in src/util/threadpool.h:136 in 2de0ce5cd8
    131+        {
    132+            LOCK(m_mutex);
    133+            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
    134+            auto id = std::this_thread::get_id();
    135+            for (const auto& worker : m_workers) assert(worker.get_id() != id);
    136+            // Early shutdown to return right away on any concurrent 'Submit()' call
    


    l0rinc commented at 4:46 pm on November 19, 2025:

    same:

    0            // Early shutdown to return right away on any concurrent `Submit()` call
    
  96. l0rinc changes_requested
  97. l0rinc commented at 6:03 pm on November 19, 2025: contributor

    After the tests, I have now reviewed the ThreadPool as well. This is a first iteration; I have a few questions and expect a few more rounds of review. I left some modernization suggestions and fine-grained commit requests to make the change easier to review.

    I still share many of @andrewtoth’s concerns regarding RAII vs Start/Stop/Interrupt, and I think we can modernize the pool from std::condition_variable with locks to C++20 std::counting_semaphore. Besides my suggestions below, I think we should merge Stop/Interrupt, maybe via a bool join = true parameter.

    There’s also some duplication between WorkerThread and ProcessTask that we can likely avoid (the tests are passing, so we can either simplify or improve our testing). There are also a few remaining inconsistencies (I have mentioned a few, repeating some of them here for visibility). Similarly to before, I have tracked my suggestions locally; here’s the patch to simplify checking my suggestions against your local copy:

      0diff --git a/src/httpserver.cpp b/src/httpserver.cpp
      1index 6069062abd..c5033462ac 100644
      2--- a/src/httpserver.cpp
      3+++ b/src/httpserver.cpp
      4@@ -253,7 +253,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
      5     // Dispatch to worker thread
      6     if (i != iend) {
      7         if ((int)g_threadpool_http.WorkQueueSize() < g_max_queue_depth) {
      8-            g_threadpool_http.Submit([req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
      9+            (void)g_threadpool_http.Enqueue([req = std::move(hreq), in_path = std::move(path), fn = i->handler] {
     10                 fn(req.get(), in_path);
     11             });
     12         } else {
     13diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
     14index 8cbaf89ddf..56e4fc27e0 100644
     15--- a/src/test/threadpool_tests.cpp
     16+++ b/src/test/threadpool_tests.cpp
     17@@ -3,30 +3,18 @@
     18 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
     19 
     20 #include <common/system.h>
     21-#include <util/string.h>
     22+#include <test/util/setup_common.h>
     23 #include <util/threadpool.h>
     24 #include <util/time.h>
     25 
     26 #include <boost/test/unit_test.hpp>
     27 
     28-// Test Cases Overview
     29-// 0) Submit task to a non-started pool.
     30-// 1) Submit tasks and verify completion.
     31-// 2) Maintain all threads busy except one.
     32-// 3) Wait for work to finish.
     33-// 4) Wait for result object.
     34-// 5) The task throws an exception, catch must be done in the consumer side.
     35-// 6) Busy workers, help them by processing tasks externally.
     36-// 7) Recursive submission of tasks.
     37-// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
     38-// 9) Congestion test; create more workers than available cores.
     39-// 10) Ensure Interrupt() prevents further submissions.
     40-BOOST_AUTO_TEST_SUITE(threadpool_tests)
     41-
     42-// General test values
     43-constexpr int NUM_WORKERS_DEFAULT = 3;
     44-constexpr char POOL_NAME[] = "test";
     45-constexpr auto WAIT_TIMEOUT = 120s;
     46+#include <latch>
     47+#include <semaphore>
     48+
     49+using namespace std::chrono;
     50+
     51+constexpr auto WAIT_TIMEOUT{120s};
     52 
     53 #define WAIT_FOR(futures)                                                         \
     54     do {                                                                          \
     55@@ -37,259 +25,233 @@ constexpr auto WAIT_TIMEOUT = 120s;
     56 
     57 // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
     58 // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
     59-std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block)
     60+std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
     61 {
     62-    // Per-thread ready promises to ensure all workers are actually blocked
     63-    std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
     64-    std::vector<std::future<void>> ready_futures;
     65-    ready_futures.reserve(num_of_threads_to_block);
     66-    for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
     67-
     68-    // Fill all workers with blocking tasks
     69-    std::vector<std::future<void>> blocking_tasks;
     70-    for (int i = 0; i < num_of_threads_to_block; i++) {
     71-        std::promise<void>& ready = ready_promises[i];
     72-        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
     73-            ready.set_value();
     74-            blocker_future.wait();
     75-        }));
     76-    }
     77-
     78-    // Wait until all threads are actually blocked
     79-    WAIT_FOR(ready_futures);
     80+    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
     81+    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
     82+    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
     83+    for (auto& f : blocking_tasks) f = threadPool.Enqueue([&] {
     84+        ready.count_down();
     85+        release_sem.acquire();
     86+    });
     87+    ready.wait();
     88     return blocking_tasks;
     89 }
     90 
     91-// Test 0, submit task to a non-started pool
     92+BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
     93+
     94+// Submit task to a non-started pool
     95 BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
     96 {
     97-    ThreadPool threadPool(POOL_NAME);
     98-    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
     99-        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    100-        return true;
    101-    });
    102+    ThreadPool threadPool{"not_started"};
    103+    BOOST_CHECK_EXCEPTION((void)threadPool.Enqueue([] { return 0; }), std::runtime_error, HasReason{"No active workers"});
    104 }
    105 
    106-// Test 1, submit tasks and verify completion
    107+// Submit tasks and verify completion
    108 BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
    109 {
    110-    int num_tasks = 50;
    111-
    112-    ThreadPool threadPool(POOL_NAME);
    113-    threadPool.Start(NUM_WORKERS_DEFAULT);
    114-    std::atomic<int> counter = 0;
    115-
    116-    // Store futures to ensure completion before checking counter.
    117-    std::vector<std::future<void>> futures;
    118-    futures.reserve(num_tasks);
    119-    for (int i = 1; i <= num_tasks; i++) {
    120-        futures.emplace_back(threadPool.Submit([&counter, i]() {
    121-            counter.fetch_add(i, std::memory_order_relaxed);
    122-        }));
    123+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    124+    ThreadPool threadPool{"completion"};
    125+    threadPool.Start(num_workers);
    126+
    127+    const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    128+    std::atomic_size_t counter{0};
    129+
    130+    std::vector<std::future<void>> futures(num_tasks);
    131+    for (size_t i{0}; i < num_tasks; ++i) {
    132+        futures[i] = threadPool.Enqueue([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
    133     }
    134 
    135-    // Wait for all tasks to finish
    136     WAIT_FOR(futures);
    137-    int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
    138-    BOOST_CHECK_EQUAL(counter.load(), expected_value);
    139+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
    140     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    141 }
    142 
    143-// Test 2, maintain all threads busy except one
    144-BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
    145+// Block varying numbers of workers and verify remaining workers process all tasks
    146+BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_tasks)
    147 {
    148-    ThreadPool threadPool(POOL_NAME);
    149-    threadPool.Start(NUM_WORKERS_DEFAULT);
    150-    // Single blocking future for all threads
    151-    std::promise<void> blocker;
    152-    std::shared_future<void> blocker_future(blocker.get_future());
    153-    const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1);
    154-
    155-    // Now execute tasks on the single available worker
    156-    // and check that all the tasks are executed.
    157-    int num_tasks = 15;
    158-    int counter = 0;
    159-
    160-    // Store futures to wait on
    161-    std::vector<std::future<void>> futures(num_tasks);
    162-    for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; });
    163+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    164+    ThreadPool threadPool{"limited_workers"};
    165+    threadPool.Start(num_workers);
    166 
    167-    WAIT_FOR(futures);
    168-    BOOST_CHECK_EQUAL(counter, num_tasks);
    169+    const auto num_tasks{5 + m_rng.randrange<size_t>(20)};
    170+
    171+    for (size_t free{1}; free < num_workers; ++free) {
    172+        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
    173+        std::counting_semaphore sem{0};
    174+        const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers - free)};
    175+
    176+        size_t counter{0};
    177+        std::vector<std::future<void>> futures(num_tasks);
    178+        for (auto& f : futures) f = threadPool.Enqueue([&counter] { ++counter; });
    179+
    180+        WAIT_FOR(futures);
    181+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    182+
    183+        if (free == 1) {
    184+            BOOST_CHECK_EQUAL(counter, num_tasks);
    185+        } else {
    186+            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
    187+        }
    188+
    189+        sem.release(num_workers - free);
    190+        WAIT_FOR(blocking_tasks);
    191+    }
    192 
    193-    blocker.set_value();
    194-    WAIT_FOR(blocking_tasks);
    195     threadPool.Stop();
    196     BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    197 }
    198 
    199-// Test 3, wait for work to finish
    200+// Wait for work to finish
    201 BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
    202 {
    203-    ThreadPool threadPool(POOL_NAME);
    204-    threadPool.Start(NUM_WORKERS_DEFAULT);
    205-    std::atomic<bool> flag = false;
    206-    std::future<void> future = threadPool.Submit([&flag]() {
    207-        UninterruptibleSleep(std::chrono::milliseconds{200});
    208-        flag.store(true, std::memory_order_release);
    209-    });
    210-    BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
    211-    BOOST_CHECK(flag.load(std::memory_order_acquire));
    212+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    213+    ThreadPool threadPool{"wait_test"};
    214+    threadPool.Start(num_workers);
    215+
    216+    const auto num_tasks{1 + m_rng.randrange<size_t>(10)};
    217+    const auto start{SteadyClock::now()};
    218+
    219+    std::vector<std::future<void>> futures(num_tasks + 1);
    220+    for (size_t i{0}; i <= num_tasks; ++i) {
    221+        futures[i] = threadPool.Enqueue([i] { UninterruptibleSleep(milliseconds{i}); });
    222+    }
    223+
    224+    WAIT_FOR(futures);
    225+    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
    226+    BOOST_CHECK(elapsed_ms >= num_tasks);
    227 }
    228 
    229-// Test 4, obtain result object
    230+// Obtain result object
    231 BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
    232 {
    233-    ThreadPool threadPool(POOL_NAME);
    234-    threadPool.Start(NUM_WORKERS_DEFAULT);
    235-    std::future<bool> future_bool = threadPool.Submit([]() { return true; });
    236-    BOOST_CHECK(future_bool.get());
    237-
    238-    std::future<std::string> future_str = threadPool.Submit([]() { return std::string("true"); });
    239-    std::string result = future_str.get();
    240-    BOOST_CHECK_EQUAL(result, "true");
    241+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    242+    ThreadPool threadPool{"result_test"};
    243+    threadPool.Start(num_workers);
    244+
    245+    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return true; }).get(), true);
    246+    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return 42; }).get(), 42);
    247+    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return std::string{"true"}; }).get(), "true");
    248 }
    249 
    250-// Test 5, throw exception and catch it on the consumer side
    251+// Throw exception and catch it on the consumer side
    252 BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
    253 {
    254-    ThreadPool threadPool(POOL_NAME);
    255-    threadPool.Start(NUM_WORKERS_DEFAULT);
    256-
    257-    int num_tasks = 5;
    258-    std::string err_msg{"something wrong happened"};
    259-    std::vector<std::future<void>> futures;
    260-    futures.reserve(num_tasks);
    261-    for (int i = 0; i < num_tasks; i++) {
    262-        futures.emplace_back(threadPool.Submit([err_msg, i]() {
    263-            throw std::runtime_error(err_msg + util::ToString(i));
    264-        }));
    265+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    266+    ThreadPool threadPool{"exception_test"};
    267+    threadPool.Start(num_workers);
    268+
    269+    const auto make_err{[](size_t n) { return strprintf("error on thread #%s", n); }};
    270+    const auto num_tasks{5 + m_rng.randrange<size_t>(15)};
    271+
    272+    std::vector<std::future<void>> futures(num_tasks);
    273+    for (size_t i{0}; i < num_tasks; ++i) {
    274+        futures[i] = threadPool.Enqueue([&make_err, i] { throw std::runtime_error(make_err(i)); });
    275     }
    276 
    277-    for (int i = 0; i < num_tasks; i++) {
    278-        BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](const std::runtime_error& e) {
    279-            BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
    280-            return true;
    281-        });
    282+    for (size_t i{0}; i < num_tasks; ++i) {
    283+        BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
    284     }
    285 }
    286 
    287-// Test 6, all workers are busy, help them by processing tasks from outside
    288+// All workers are busy, help them by processing tasks from outside
    289 BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
    290 {
    291-    ThreadPool threadPool(POOL_NAME);
    292-    threadPool.Start(NUM_WORKERS_DEFAULT);
    293-
    294-    std::promise<void> blocker;
    295-    std::shared_future<void> blocker_future(blocker.get_future());
    296-    const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
    297-
    298-    // Now submit tasks and check that none of them are executed.
    299-    int num_tasks = 20;
    300-    std::atomic<int> counter = 0;
    301-    for (int i = 0; i < num_tasks; i++) {
    302-        threadPool.Submit([&counter]() {
    303-            counter.fetch_add(1, std::memory_order_relaxed);
    304-        });
    305-    }
    306-    UninterruptibleSleep(std::chrono::milliseconds{100});
    307+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    308+    ThreadPool threadPool{"manual_process"};
    309+    threadPool.Start(num_workers);
    310+
    311+    std::counting_semaphore sem{0};
    312+    const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers)};
    313+
    314+    const auto num_tasks{10 + m_rng.randrange<size_t>(30)};
    315+    std::atomic_size_t counter{0};
    316+
    317+    std::vector<std::future<void>> futures(num_tasks);
    318+    for (auto& f : futures) f = threadPool.Enqueue([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    319+
    320+    UninterruptibleSleep(100ms);
    321     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    322 
    323-    // Now process manually
    324-    for (int i = 0; i < num_tasks; i++) {
    325-        threadPool.ProcessTask();
    326-    }
    327-    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    328+    for (size_t i{0}; i < num_tasks; ++i) threadPool.ProcessTask();
    329+
    330+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), num_tasks);
    331     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    332-    blocker.set_value();
    333+
    334+    WAIT_FOR(futures);
    335+
    336+    sem.release(num_workers);
    337     threadPool.Stop();
    338     WAIT_FOR(blocking_tasks);
    339 }
    340 
    341-// Test 7, submit tasks from other tasks
    342+// Submit tasks from other tasks
    343 BOOST_AUTO_TEST_CASE(recursive_task_submission)
    344 {
    345-    ThreadPool threadPool(POOL_NAME);
    346-    threadPool.Start(NUM_WORKERS_DEFAULT);
    347+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    348+    ThreadPool threadPool{"recursive"};
    349+    threadPool.Start(num_workers);
    350 
    351     std::promise<void> signal;
    352-    threadPool.Submit([&]() {
    353-        threadPool.Submit([&]() {
    354-            signal.set_value();
    355-        });
    356-    });
    357+    (void)threadPool.Enqueue([&] { (void)threadPool.Enqueue([&] { signal.set_value(); }); });
    358 
    359     signal.get_future().wait();
    360     threadPool.Stop();
    361 }
    362 
    363-// Test 8, submit task when all threads are busy and then stop the pool
    364+// Submit task when all threads are busy and then stop the pool
    365 BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
    366 {
    367-    ThreadPool threadPool(POOL_NAME);
    368-    threadPool.Start(NUM_WORKERS_DEFAULT);
    369-
    370-    std::promise<void> blocker;
    371-    std::shared_future<void> blocker_future(blocker.get_future());
    372-    const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
    373+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    374+    ThreadPool threadPool{"graceful_stop"};
    375+    threadPool.Start(num_workers);
    376 
    377-    // Submit an extra task that should execute once a worker is free
    378-    std::future<bool> future = threadPool.Submit([]() { return true; });
    379+    std::counting_semaphore sem{0};
    380+    const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers)};
    381 
    382-    // At this point, all workers are blocked, and the extra task is queued
    383+    std::future<bool> future{threadPool.Enqueue([] { return true; })};
    384     BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    385 
    386-    // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    387-    std::thread thread_unblocker([&blocker]() {
    388-        UninterruptibleSleep(std::chrono::milliseconds{300});
    389-        blocker.set_value();
    390-    });
    391+    std::thread thread_unblocker{[&sem, num_workers] {
    392+        UninterruptibleSleep(300ms);
    393+        sem.release(num_workers);
    394+    }};
    395 
    396-    // Stop the pool while the workers are still blocked
    397     threadPool.Stop();
    398 
    399-    // Expect the submitted task to complete
    400     BOOST_CHECK(future.get());
    401     thread_unblocker.join();
    402-
    403-    // Obviously all the previously blocking tasks should be completed at this point too
    404     WAIT_FOR(blocking_tasks);
    405-
    406-    // Pool should be stopped and no workers remaining
    407     BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    408 }
    409 
    410-// Test 9, more workers than available cores (congestion test)
    411+// More workers than available cores (congestion test)
    412 BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
    413 {
    414-    ThreadPool threadPool(POOL_NAME);
    415-    threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
    416-
    417-    int num_tasks = 200;
    418-    std::atomic<int> counter{0};
    419-
    420-    std::vector<std::future<void>> futures;
    421-    futures.reserve(num_tasks);
    422-    for (int i = 0; i < num_tasks; i++) {
    423-        futures.emplace_back(threadPool.Submit([&counter] {
    424-            counter.fetch_add(1, std::memory_order_relaxed);
    425-        }));
    426+    const auto oversubscribe_factor{2 + m_rng.randrange<int>(3)};
    427+    ThreadPool threadPool{"congestion"};
    428+    threadPool.Start(std::max(1, GetNumCores() * oversubscribe_factor));
    429+
    430+    const auto num_tasks{100 + m_rng.randrange<size_t>(200)};
    431+    std::atomic_size_t counter{0};
    432+
    433+    std::vector<std::future<void>> futures(num_tasks);
    434+    for (auto& f : futures) {
    435+        f = threadPool.Enqueue([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    436     }
    437 
    438     WAIT_FOR(futures);
    439-    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    440+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), num_tasks);
    441 }
    442 
    443-// Test 10, Interrupt() prevents further submissions
    444+// Interrupt() prevents further submissions
    445 BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
    446 {
    447-    ThreadPool threadPool(POOL_NAME);
    448-    threadPool.Start(NUM_WORKERS_DEFAULT);
    449+    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    450+    ThreadPool threadPool{"interrupt"};
    451+    threadPool.Start(num_workers);
    452     threadPool.Interrupt();
    453-    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) {
    454-        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    455-        return true;
    456-    });
    457+    BOOST_CHECK_EXCEPTION((void)threadPool.Enqueue([] {}), std::runtime_error, HasReason{"No active workers"});
    458 }
    459 
    460 BOOST_AUTO_TEST_SUITE_END()
    461diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    462index b489e34c2f..f224c81b55 100644
    463--- a/src/util/threadpool.h
    464+++ b/src/util/threadpool.h
    465@@ -8,19 +8,13 @@
    466 #include <sync.h>
    467 #include <tinyformat.h>
    468 #include <util/check.h>
    469-#include <util/string.h>
    470 #include <util/thread.h>
    471-#include <util/threadinterrupt.h>
    472 
    473-#include <algorithm>
    474-#include <atomic>
    475-#include <condition_variable>
    476-#include <cstddef>
    477-#include <functional>
    478+#include <deque>
    479 #include <future>
    480-#include <memory>
    481-#include <queue>
    482+#include <semaphore>
    483 #include <stdexcept>
    484+#include <string>
    485 #include <thread>
    486 #include <utility>
    487 #include <vector>
    488@@ -46,53 +40,26 @@
    489  */
    490 class ThreadPool
    491 {
    492-private:
    493     std::string m_name;
    494     Mutex m_mutex;
    495-    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    496-    std::condition_variable m_cv;
    497-    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    498-    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
    499-    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
    500+    std::deque<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    501     bool m_interrupt GUARDED_BY(m_mutex){false};
    502     std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
    503 
    504+    std::counting_semaphore<> m_sem{0};
    505+
    506     void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    507     {
    508-        WAIT_LOCK(m_mutex, wait_lock);
    509         for (;;) {
    510-            std::packaged_task<void()> task;
    511-            {
    512-                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    513-                if (!m_interrupt && m_work_queue.empty()) {
    514-                    // Block until the pool is interrupted or a task is available.
    515-                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    516-                }
    517-
    518-                // If stopped and no work left, exit worker
    519-                if (m_interrupt && m_work_queue.empty()) {
    520-                    return;
    521-                }
    522-
    523-                task = std::move(m_work_queue.front());
    524-                m_work_queue.pop();
    525-            }
    526-
    527-            {
    528-                // Execute the task without the lock
    529-                REVERSE_LOCK(wait_lock, m_mutex);
    530-                task();
    531-            }
    532+            m_sem.acquire();
    533+            if (!ProcessTask()) return;
    534         }
    535     }
    536 
    537 public:
    538     explicit ThreadPool(const std::string& name) : m_name(name) {}
    539 
    540-    ~ThreadPool()
    541-    {
    542-        Stop(); // In case it hasn't been stopped.
    543-    }
    544+    ~ThreadPool() { Stop(); }
    545 
    546     /**
    547      * [@brief](/bitcoin-bitcoin/contributor/brief/) Start worker threads.
    548@@ -109,7 +76,6 @@ public:
    549         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
    550         m_interrupt = false; // Reset
    551 
    552-        // Create workers
    553         m_workers.reserve(num_workers);
    554         for (int i = 0; i < num_workers; i++) {
    555             m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    556@@ -130,14 +96,15 @@ public:
    557         std::vector<std::thread> threads_to_join;
    558         {
    559             LOCK(m_mutex);
    560-            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
    561-            auto id = std::this_thread::get_id();
    562+            // Ensure `Stop()` isn't called from any worker thread to avoid deadlocks
    563+            const auto id{std::this_thread::get_id()};
    564             for (const auto& worker : m_workers) assert(worker.get_id() != id);
    565-            // Early shutdown to return right away on any concurrent 'Submit()' call
    566+            // Early shutdown to return right away on any concurrent `Submit()` call
    567             m_interrupt = true;
    568             threads_to_join.swap(m_workers);
    569         }
    570-        m_cv.notify_all();
    571+        m_sem.release(threads_to_join.size());
    572+
    573         for (auto& worker : threads_to_join) worker.join();
    574         // Since we currently wait for tasks completion, sanity-check empty queue
    575         WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    576@@ -147,13 +114,13 @@ public:
    577     /**
    578      * [@brief](/bitcoin-bitcoin/contributor/brief/) Enqueues a new task for asynchronous execution.
    579      *
    580-     * Returns a `std::future` that provides the task’s result or propagates
    581+     * Returns a `std::future` that provides the task's result or propagates
    582      * any exception it throws.
    583      * Note: Ignoring the returned future requires guarding the task against
    584      * uncaught exceptions, as they would otherwise be silently discarded.
    585      */
    586-    template <class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    587-    auto Submit(F&& fn)
    588+    template <class F>
    589+    [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) auto Enqueue(F&& fn)
    590     {
    591         std::packaged_task task{std::forward<F>(fn)};
    592         auto future{task.get_future()};
    593@@ -162,34 +129,34 @@ public:
    594             if (m_interrupt || m_workers.empty()) {
    595                 throw std::runtime_error("No active workers; cannot accept new tasks");
    596             }
    597-            m_work_queue.emplace(std::move(task));
    598+            m_work_queue.emplace_back(std::move(task));
    599         }
    600-        m_cv.notify_one();
    601+        m_sem.release();
    602         return future;
    603     }
    604 
    605     /**
    606      * [@brief](/bitcoin-bitcoin/contributor/brief/) Execute a single queued task synchronously.
    607      * Removes one task from the queue and executes it on the calling thread.
    608+     * [@return](/bitcoin-bitcoin/contributor/return/) true if a task was executed, false if queue was empty
    609      */
    610-    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    611+    bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    612     {
    613         std::packaged_task<void()> task;
    614         {
    615             LOCK(m_mutex);
    616-            if (m_work_queue.empty()) return;
    617-
    618-            // Pop the task
    619+            if (m_work_queue.empty()) return false;
    620             task = std::move(m_work_queue.front());
    621-            m_work_queue.pop();
    622+            m_work_queue.pop_front();
    623         }
    624         task();
    625+        return true;
    626     }
    627 
    628     void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    629     {
    630         WITH_LOCK(m_mutex, m_interrupt = true);
    631-        m_cv.notify_all();
    632+        m_sem.release(WorkersCount());
    633     }
    634 
    635     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    
