http: replace WorkQueue and single threads handling for ThreadPool #33689

pull furszy wants to merge 5 commits into bitcoin:master from furszy:2025_threadpool_http_server changing 8 files +694 −120
  1. furszy commented at 2:36 PM on October 23, 2025: member

    This has been a recent discovery; the general thread pool class created for #26966, cleanly integrates into the HTTP server. It simplifies init, shutdown and requests execution logic. Replacing code that was never unit tested for code that is properly unit and fuzz tested. Although our functional test framework extensively uses this RPC interface (that’s how we’ve been ensuring its correct behavior so far - which is not the best).

    This clearly separates the responsibilities: The HTTP server now focuses solely on receiving and dispatching requests, while ThreadPool handles concurrency, queuing, and execution.

    This will also allows us to experiment with further performance improvements at the task queuing and execution level, such as a lock-free structure or task prioritization or any other implementation detail like coroutines in the future, without having to deal with HTTP code that lives on a different layer.

    Note: The rationale behind introducing the ThreadPool first is to be able to easily cherry-pick it across different working paths. Some of the ones that are benefited from it are #26966 for the parallelization of the indexes initial sync, #31132 for the parallelization of the inputs fetching procedure, #32061 for the libevent replacement, the kernel API #30595 (https://github.com/bitcoin/bitcoin/pull/30595#discussion_r2413702370) to avoid blocking validation among others use cases not publicly available.

    Note 2: I could have created a wrapper around the existing code and replaced the WorkQueue in a subsequent commit, but it didn’t seem worth the extra commits and review effort. The ThreadPool implements essentially the same functionality in a more modern and cleaner way.

  2. DrahtBot added the label RPC/REST/ZMQ on Oct 23, 2025
  3. DrahtBot commented at 2:36 PM on October 23, 2025: contributor

    <!--e57a25ab6845829454e8d69fc972939a-->

    The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

    <!--006a51241073e994b41acfe9ec718e94-->

    Code Coverage & Benchmarks

    For details see: https://corecheck.dev/bitcoin/bitcoin/pulls/33689.

    <!--021abf342d371248e50ceaed478a90ca-->

    Reviews

    See the guideline for information on the review process.

    Type Reviewers
    ACK Eunovo, sedited, pinheadmz
    Stale ACK ismaelsadeeq

    If your review is incorrectly listed, please copy-paste <code>&lt;!--meta-tag:bot-skip--&gt;</code> into the comment that the bot should ignore.

    <!--174a7506f384e20aa4161008e828411d-->

    Conflicts

    Reviewers, this pull request conflicts with the following ones:

    • #34411 ([POC] Full Libevent removal by fanquake)
    • #34400 (wallet: parallel fast rescan (approx 5x speed up with 16 threads) by Eunovo)
    • #32061 (Replace libevent with our own HTTP and socket-handling implementation by pinheadmz)
    • #26966 (index: initial sync speedup, parallelize process by furszy)

    If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.

    <!--5faf32d7da4f0f540f40219e4f7537a3-->

    LLM Linter (✨ experimental)

    Possible typos and grammar issues:

    • servers responses -> server responses or servers' responses [possessive is missing; "servers responses" is ungrammatical]

    <sup>2026-01-30 21:18:09</sup>

  4. pinheadmz commented at 2:38 PM on October 23, 2025: member

    concept ACK :-) will be reviewing this

  5. laanwj requested review from laanwj on Oct 23, 2025
  6. laanwj commented at 2:50 PM on October 23, 2025: member

    Adding myself as i wrote the original shitty code.

  7. Raimo33 commented at 9:54 AM on October 24, 2025: contributor

    Can't we use an already existing open source library instead of reinventing the wheel?

  8. in src/test/fuzz/threadpool.cpp:53 in bb71b0e3bf outdated
      48 | +// Global to verify we always have the same number of threads.
      49 | +size_t g_num_workers = 3;
      50 | +
      51 | +static void setup_threadpool_test()
      52 | +{
      53 | +    // Disable logging entirely. It seems to cause memory leaks.
    


    l0rinc commented at 4:31 PM on October 24, 2025:

    Can we investigate that?


    furszy commented at 8:22 PM on October 30, 2025:

    It is a known issue. The pcp fuzz test does it too for the same reason (in an undocumented manner). I only document it properly so we don't forget this exists. Feel free to investigate it in a separate issue/PR. I'm not planning to do it. It is not an issue of the code introduced in this PR.


    l0rinc commented at 1:22 PM on October 31, 2025:

    k, thanks, please resolve the comment

  9. furszy commented at 7:17 PM on October 24, 2025: member

    Can't we use an already existing open source library instead of reinventing the wheel?

    That's a good question. It's usually where we all start. Generally, the project consensus is to avoid introducing new external dependencies (unless they’re maintained by us) to minimize potential attack vectors. This doesn’t mean we should reinvent everything, just that we need to be very careful about what we decide to include.

    That being said, for the changes introduced in this PR, can argue that we’re encapsulating, documenting, and unit + fuzz testing code that wasn’t covered before, while also improving separation of responsibilities. We’re not adding anything more complex or that behaves radically differently from what we currently have. The nice property of this PR is that it will let us experiment with more complex approaches in the future without having to deal with application-specific code (like the HTTP server code). This also includes learning from other open source libraries for sure.

  10. in src/test/threadpool_tests.cpp:1 in c219b93c3b outdated
       0 | @@ -0,0 +1,267 @@
       1 | +// Copyright (c) 2024-present The Bitcoin Core developers
    


    pinheadmz commented at 7:55 PM on October 27, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    nit: don't bother with the year any more... (applies to all new files)


    furszy commented at 3:55 PM on October 29, 2025:

    Sure. Done as suggested.

  11. sedited commented at 10:17 PM on October 27, 2025: contributor

    Approach ACK

  12. in src/util/threadpool.h:53 in c219b93c3b outdated
      48 | +private:
      49 | +    std::string m_name;
      50 | +    Mutex m_mutex;
      51 | +    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
      52 | +    std::condition_variable m_cv;
      53 | +    // Note: m_interrupt must be modified while holding the same mutex used by threads waiting on the condition variable.
    


    andrewtoth commented at 2:38 PM on October 28, 2025:

    Isn't this comment already implied by GUARDED_BY(m_mutex)? Also it's written by the control thread and read by worker threads, so of course it must only be modified while holding the mutex. This comment doesn't add anything IMO.


    furszy commented at 3:18 PM on October 28, 2025:

    Isn't this comment already implied by GUARDED_BY(m_mutex)? Also it's written by the control thread and read by worker threads, so of course it must only be modified while holding the mutex. This comment doesn't add anything IMO.

    What I meant there is that the variable shouldn’t be an atomic bool alone (in case someone propose to change it in the future). It must share the same mutex as the condition variable; otherwise, workers might not see the update. This is stated in the std::condition_variable ref. I mention it because it is very tempting to avoid locking the mutex during task submission in this way, and might seen like a harmless optimization, but it turns out to cause very subtle issues.

    Maybe you have a better way to state this? Happy to change it if it is not being understood as supposed.


    andrewtoth commented at 4:04 PM on October 28, 2025:

    I see. Then perhaps more directly:

        // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    

    furszy commented at 7:07 PM on October 28, 2025:

    sounds good, taken. thanks!

  13. in src/util/threadpool.h:182 in c219b93c3b outdated
     177 | +            m_work_queue.pop();
     178 | +        }
     179 | +        task();
     180 | +    }
     181 | +
     182 | +    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    


    andrewtoth commented at 2:51 PM on October 28, 2025:

    I'm unclear what the purpose of this function is. It interrupts the workers, like Stop, but doesn't join them or wait for them to finish. If the idea is to clear the workqueue and park the threads without being able to add more tasks, it does not accomplish that since the threads will exit their loops as well and not be parked on the cv.


    furszy commented at 3:35 PM on October 28, 2025:

    I'm unclear what the purpose of this function is. It interrupts the workers, like Stop, but doesn't join them or wait for them to finish. If the idea is to clear the workqueue and park the threads without being able to add more tasks, it does not accomplish that since the threads will exit their loops as well and not be parked on the cv.

    Actually, the goal is to keep the current shutdown semantics unchanged. The process already follows two stages: first, we stop accepting new requests and events (e.g. unregistering the libevent callback to stop incoming requests); then, once no new work can be queued, we call Stop() and wait for the remaining tasks to finish before tearing down the objects.

    Interrupt() belongs to the first stage. It basically block new task submissions during shutdown to avoid queue growth. It’s not intended to clear the queue or park the threads. It is there just to avoid pilling up new tasks during shutdown.


    andrewtoth commented at 3:52 PM on October 28, 2025:

    I see, so the goal is to stop the thread pool but in a non-blocking way. Could the same be done via a bool parameter to Stop to skip joining? Then Stop will be called without that bool set in the destructor to join the threads, if this behavior is desired (like it is in the http server). Also, could we modify Start in that case to not assert on number of threads if m_interrupt is true, and instead join them if we stilll have outstanding threads?


    furszy commented at 7:01 PM on October 28, 2025:

    Could the same be done via a bool parameter to Stop to skip joining?

    If we do that, the threads wouldn't remain in the m_workers vector anymore. They'd be swapped out on the first call. That means we wouldn't be able to wait for them to finish later on, which would be quite bad since we’d lose track of when all requests are fulfilled before destroying the backend objects (that's basically what joining the threads mean for us right now). And this could lead to requests accessing null pointers if we proceed with the shutdown without waiting on the threads to join, etc.

    Also, could we modify Start in that case to not assert on number of threads, and instead join them if m_interrupt is true and we still have outstanding threads?

    We could also help them process task by calling ProcessTask() if something like that happens. But I'm not sure that's the best design. We would be integrating Stop() inside Start(), which would make callers less careful on when to call Stop(), which should be part of their code design.

    Still, I don't think we should worry nor overthink this now. The current code starts and stops the thread pool only once during the entire app's lifecycle.


    andrewtoth commented at 7:09 PM on October 28, 2025:

    I was thinking like this

    diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    index 94409facd5..dc1a218abd 100644
    --- a/src/util/threadpool.h
    +++ b/src/util/threadpool.h
    @@ -123,19 +123,19 @@ public:
          *
          * Must be called from a controller (non-worker) thread.
          */
    -    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    void Stop(bool join_threads = true) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
             // Notify workers and join them.
             std::vector<std::thread> threads_to_join;
             {
                 LOCK(m_mutex);
                 m_interrupt = true;
    -            threads_to_join.swap(m_workers);
    +            if (join_threads) threads_to_join.swap(m_workers);
             }
             m_cv.notify_all();
             for (auto& worker : threads_to_join) worker.join();
             // Since we currently wait for tasks completion, sanity-check empty queue
    -        WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    +         if (join_threads) WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
             // Note: m_interrupt is left true until next Start()
         }
     
    @@ -179,12 +179,6 @@ public:
             task();
         }
     
    -    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    -    {
    -        WITH_LOCK(m_mutex, m_interrupt = true);
    -        m_cv.notify_all();
    -    }
    -
         size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
             return WITH_LOCK(m_mutex, return m_work_queue.size());
    

    Still, I don't think we should worry nor overthink this now. The current code starts and stops the thread pool only once during the entire app's lifecycle.

    Err, then why not have threads start during construction? Otherwise there's no need for Start.


    furszy commented at 7:15 PM on October 28, 2025:

    You know, we could call ProcessTask() during Stop() too, which would be a strict improvement over the current locking-wait behavior in master (since we would shut down faster by actively processing pending requests on the shutdown thread as well). This is something that hasn’t been possible until now. Still, this is material for a follow-up; would say that it is better to keep this PR as contained as possible so all other PRs and working paths that depend on this structure can actually make use of it.


    andrewtoth commented at 7:22 PM on October 28, 2025:

    That is a good idea, but not related to my suggestion here :). Yes, that would be a good followup. I don't think we should have a method that would cause the node to crash if Start is called more than once. I disagree and think just calling Stop at the beginning of Start would be safer than the current implementation.


    furszy commented at 7:37 PM on October 28, 2025:

    hehe, it seems we sent a message at a similar time and missed yours.

    re Interrupt() removal suggestion:

    I find it harder to reason about the current two stages shutdown procedure if we have to call to the same Stop() method twice. At that point it would be simpler to just wait for all tasks to finish on the first call and remove the second one. But that seems suboptimal as there is no rush to wait for them at that point of the shutdown sequence.

    Also, a benefit of keeping Interrupt() is that we can build on top of it and have a faster and non-locking way of checking if the thread pool is enabled. So callers would be able to call something like pool.IsRunning() to see if they can submit tasks without have to worry locking the main mutex and slowing down the workers processing. Something like:

    diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    --- a/src/util/threadpool.h	(revision e0ec3232daf2c311471a1da149821bed18853fcc)
    +++ b/src/util/threadpool.h	(date 1761158485938)
    @@ -55,6 +55,9 @@
         // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
         bool m_interrupt GUARDED_BY(m_mutex){false};
         std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
    +    // Enabled only after Start and disabled early on Stop/Interrupt.
    +    // This lets us do non-blocking 'IsRunning' checks.
    +    std::atomic<bool> m_running{false};
     
         void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
    @@ -109,9 +112,11 @@
             m_interrupt = false; // Reset
     
             // Create workers
             m_workers.reserve(num_workers);
             for (int i = 0; i < num_workers; i++) {
                 m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
             }
    +        m_running.store(true, std::memory_order_release);
         }
     
         /**
    @@ -124,7 +129,9 @@
          */
         void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
    -        // Notify workers and join them.
    +        // Mark as no longer accepting new tasks
    +        m_running.store(false, std::memory_order_release);
    +        // Notify workers and join them
             std::vector<std::thread> threads_to_join;
             {
                 LOCK(m_mutex);
    @@ -147,11 +154,16 @@
         template<class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         auto Submit(F&& fn)
         {
    +        // Quick rejection based on Start/Stop/Interrupt before locking
    +        if (!m_running.load(std::memory_order_acquire)) {
    +            throw std::runtime_error("ThreadPool not running");
    +        }
    +
             std::packaged_task task{std::forward<F>(fn)};
             auto future{task.get_future()};
             {
                 LOCK(m_mutex);
                 if (m_workers.empty() || m_interrupt) {
                     throw std::runtime_error("No active workers; cannot accept new tasks");
                 }
                 m_work_queue.emplace(std::move(task));
    @@ -180,7 +192,9 @@
     
         void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
    +        m_running.store(false, std::memory_order_release);
             WITH_LOCK(m_mutex, m_interrupt = true);
             m_cv.notify_all();
         }
     
         size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    @@ -192,6 +206,11 @@
         {
             return WITH_LOCK(m_mutex, return m_workers.size());
         }
    +
    +    bool IsRunning() const noexcept
    +    {
    +        return m_running.load(std::memory_order_acquire);
    +    }
     };
     
     #endif // BITCOIN_UTIL_THREADPOOL_H
    

    (this is not needed here because we are disconnecting the http callback prior to interrupting the pool but it seems useful in general).


    furszy commented at 7:46 PM on October 28, 2025:

    I don't think we should have a method that would cause the node to crash if Start is called more than once. I disagree and think just calling Stop at the beginning of Start would be safer than the current implementation.

    Wouldn't you agree that if your code calls Start() twice, you have a design issue? Thread pools are typically started once, maintained for the entire app’s lifecycle, and reused across modules. Mainly because creating and destroying threads isn’t cheap. I think it’s fair to expect an exception in that case, as it would indicate poorly structured code.

    I also don’t think anyone would expect Start() to wait for all existing threads to shut down and then spawn new ones. That would be a surprising behavior to me. Why would you spawn new ones if you already have threads available?


    andrewtoth commented at 7:48 PM on October 28, 2025:

    So why have Start and Stop then? Just start threads in constructor and join in destructor? Use Interrupt to stop new tasks from being added. This follows RAII principles.


    furszy commented at 8:11 PM on October 28, 2025:

    So why have Start and Stop then? Just start threads in constructor and join in destructor? Use Interrupt to stop new tasks from being added. This follows RAII principles.

    Good question. We could easily build an RAII wrapper on top of this. The approaches aren’t conflicting; it would just be an additional layer over the current code. Yet, I think the current design smoothly integrates with our existing workflows. We already use the start, interrupt, and stop terms throughout the codebase (check init.cpp and all our components), so following that general convention makes it easier to reason about these components globally. Even without deep knowledge of the http server workflow, anyone familiar with this convention can quickly hook up new functionality.

    Also, I suspect we’ll find opportunities for improvements (or smarter shutdown behaviors) along the way that might require customizing Stop() with some inputs, so might be better not to enforce RAII too early.


    furszy commented at 9:29 PM on October 28, 2025:

    Follow-up to this convo after some great back-and-forth via DM with Andrew. Usually, RAII works best when we don’t care exactly when an object gets destroyed. In this case, though, we actually do care about that moment (or, in our terms, when it’s stopped), since it marks the point where there are no pending tasks and it’s safe to tear down the backend handlers. If we don’t enforce that explicitly, the shutdown procedure could continue while workers are still active, leading them to access null pointers, etc. This is something we can’t guarantee if we just let the object be destroyed whenever the program releases it using RAII. So even using RAII, we would be forced to destroy the object manually anyway. Which kinda defeats the purpose of it.


    l0rinc commented at 3:37 PM on October 30, 2025:

    I was also wondering what this was meant to so, especially since there isn't any test coverage for it.

  14. in src/test/threadpool_tests.cpp:111 in c219b93c3b outdated
     106 | +        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT-1, /*context=*/"test2 blocking tasks enabled");
     107 | +
     108 | +        // Now execute tasks on the single available worker
     109 | +        // and check that all the tasks are executed.
     110 | +        int num_tasks = 15;
     111 | +        std::atomic<int> counter = 0;
    


    pinheadmz commented at 2:58 PM on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    I was wondering if there's any way to assert that the unblocked tasks are all executing on one single remaining worker... would it be insane to use a non-atomic int here?


    furszy commented at 3:43 PM on October 29, 2025:

    c219b93

    I was wondering if there's any way to assert that the unblocked tasks are all executing on one single remaining worker... would it be insane to use a non-atomic int here?

    hmm, even if the non-atomic int works, that doesn't really guarantee that the increment was done in the same thread. If we want to be 100% correct, we should store the ids of the threads that processed the tasks on a synchronized structure. Then check that only one id is there.


    andrewtoth commented at 3:57 PM on October 29, 2025:

    thread sanitizer would likely pick up on it though if you made it non-atomic.


    furszy commented at 6:37 PM on October 29, 2025:

    true. Closing this.


    andrewtoth commented at 6:44 PM on October 29, 2025:

    I mean pick up that it's being written by multiple threads if something breaks the assumptions in this test. It seems like an ok idea. Or am I wrong?


    furszy commented at 7:05 PM on October 29, 2025:

    I mean pick up that it's being written by multiple threads if something breaks the assumptions in this test. It seems like an ok idea. Or am I wrong?

    I don't think there is any broken assumption. BlockWorkers() ensure that all threads except one are blocked. We could go further and check that all tasks submitted post the BlockWorkers call are executed by the same thread too, but that seems to be an overkill to me. We are basically ensuring that in another way.


    andrewtoth commented at 7:46 PM on October 29, 2025:

    Sorry for being unclear. I don't think there is any broken assumption either. However, if we made this a non-atomic as suggested, then if something in the future were to break this assumption and run these tasks on multiple threads the unit test would break. Thus it would act as a regression test.


    furszy commented at 8:28 PM on October 29, 2025:

    Sorry for being unclear. I don't think there is any broken assumption either. However, if we made this a non-atomic as suggested, then if something in the future were to break this assumption and run these tasks on multiple threads the unit test would break. Thus it would act as a regression test.

    But as you said, the thread sanitizer would likely complain about that. We’re accessing the same variable from different threads (main and worker). The main issue, however, will probably be related to thread visibility; changes made in one thread aren’t guaranteed to be visible to another. So that approach seems fragile to me.

    Also, going back to my initial comment: even if we switch to a non-atomic variable, the test could still pass. If we want it to be fully deterministic, we should store the IDs of the threads that processed the tasks in a synchronized structure, and then check that only one ID is present at the end. That seems to be the most secure approach to me.

    I could push this if have to re-touch. But it doesn't seem to be a blocking feature (at least to me). We do ensure all threads except one are blocked in the test, and we fail if not.


    andrewtoth commented at 10:16 PM on October 29, 2025:

    But as you said, the thread sanitizer would likely complain about that. We’re accessing the same variable from different threads (main and worker).

    It won't complain if we synchronize access to the same non-atomic variable, which we do here because we lock when calling Submit.

    The main issue, however, will probably be related to thread visibility; changes made in one thread aren’t guaranteed to be visible to another.

    I don't think this is the main issue. If a future change breaks the ThreadPool such that in this test more than 1 thread will write to this non-atomic variable, then the thread sanitizer will cause the test to fail due to a data race.


    furszy commented at 1:23 AM on October 30, 2025:

    Pushed.

  15. in src/test/threadpool_tests.cpp:202 in c219b93c3b outdated
     197 | +        for (int i = 0; i < num_tasks; i++) {
     198 | +            threadPool.Submit([&counter]() {
     199 | +                counter.fetch_add(1);
     200 | +            });
     201 | +        }
     202 | +        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    


    pinheadmz commented at 3:11 PM on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    As mentioned before, not a blocker but i feel like sleeps like this are test-flakiness waiting to happen ...


    furszy commented at 3:49 PM on October 29, 2025:

    c219b93

    As mentioned before, not a blocker but i feel like sleeps like this are test-flakiness waiting to happen ...

    This is one of those "wait and see if something happens" scenarios (if any task gets processed). We expect no activity here since all workers are busy. I'm not sure there is another way of doing this, but if it fails (even if it is not deterministic), that’s still a useful signal because it means something unexpected happened.

  16. in src/test/threadpool_tests.cpp:209 in c219b93c3b outdated
     204 | +
     205 | +        // Now process manually
     206 | +        for (int i = 0; i < num_tasks; i++) {
     207 | +            threadPool.ProcessTask();
     208 | +        }
     209 | +        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    


    pinheadmz commented at 3:13 PM on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    belt-and-suspenders, could BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); I just like the symmetry in a test since you assert the value is 20 before taking action.


    furszy commented at 3:55 PM on October 29, 2025:

    Sounds good. Done as suggested. Thanks

  17. in src/test/fuzz/threadpool.cpp:28 in bb71b0e3bf
      23 | +};
      24 | +
      25 | +struct CounterTask {
      26 | +    std::atomic_uint32_t& m_counter;
      27 | +    explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {}
      28 | +    void operator()() const { m_counter.fetch_add(1); }
    


    andrewtoth commented at 5:26 PM on October 28, 2025:

    Maybe we should relax this atomic operation so that it doesn't hide any issues by synchronizing them for us.

        void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); }
    

    furszy commented at 7:09 PM on October 28, 2025:

    Sure, pushed. Thanks!

  18. furszy force-pushed on Oct 28, 2025
  19. ismaelsadeeq commented at 7:29 PM on October 28, 2025: member

    Concept ACK, thanks for the seperation from the initial index PR.

  20. pinheadmz approved
  21. pinheadmz commented at 3:28 PM on October 29, 2025: member

    ACK 195a96258f970c384ce180d57e73616904ef5fa1

    Built and tested on macos/arm64 and debian/x86. Reviewed each commit and left a few comments.

    I also tested the branch against other popular software that consumes the HTTP interface by running their CI:

    The http worker threads are clearly labeled and visible in htop!

    <img width="559" height="304" alt="Image" src="https://github.com/user-attachments/assets/d27aedd8-0ac1-4659-8fa9-6389e49b0b8f" />

    <details><summary>Show Signature</summary>

    -----BEGIN PGP SIGNED MESSAGE-----
    Hash: SHA256
    
    ACK 195a96258f970c384ce180d57e73616904ef5fa1
    -----BEGIN PGP SIGNATURE-----
    
    iQIzBAEBCAAdFiEE5hdzzW4BBA4vG9eM5+KYS2KJyToFAmkCMdwACgkQ5+KYS2KJ
    yTriXA//RRnzezUHdmzRlKmoSDA+ZBHz0RY3z2LGk63izb/YdYaJY9JBZL2Y9BA8
    K2nyexdSDC/DFOm4H56ddEe6ChlB7w+uZc92SgFSLSvavInpZ80KEJRk07vgoIL7
    hwuyyevWyOOU32iz1NE3q316TMaJmzVsPhRGwbdmTXNwJLtUX6g4czfh28ajW1DC
    Y9ULKwT36rFHRcKwC1YZYuBJUNBZWQgVBcydcmS1UEykY4mBnCW9knrATwn/29b7
    2AYPV+yuaiy9OpDEOJ8iKtZOPGR36NrIUleMUqruq4Sy2/TnJtm3AKNK0336/Fxu
    MqVyPKSyusg7kBA6f2h/2+NHpbyLoboYhjZew+HvED/aFfi+Jla+nxybkUYXfciL
    pzbND22TTuRGB1fKU7AwPD0TO9JwOTU385iEdpoGq8rbT3EpgPr31N4TeDQDJx5t
    jPzzWZYj43JMuIc3bm/K5S2HYSdFZUEDQC81kbND+jOLF6YkJwS9794anLO38tvi
    fip/eLK8Nw4pmWnW63/9lc+Y/1gLpgLDMxxhA1NKJyytk7z2IRo7vJKck6TqAjcZ
    nU8Wv9/ful5ndDJfLIKuYT9jqRk9ORohVwv+P+ppuO8jhFjhuswxPlFJKkMXTeZn
    hGi8QCrAUuibvVuLfLKVExJqSmmeUAkTVKp5ZTWLwNB7IkZrSoo=
    =hZYm
    -----END PGP SIGNATURE-----
    

    pinheadmz's public key is on openpgp.org

    </details>

  22. DrahtBot requested review from ismaelsadeeq on Oct 29, 2025
  23. DrahtBot requested review from sedited on Oct 29, 2025
  24. furszy force-pushed on Oct 29, 2025
  25. furszy commented at 3:56 PM on October 29, 2025: member

    Oh, awesome extensive test and review @pinheadmz! you rock!

  26. pinheadmz approved
  27. pinheadmz commented at 6:20 PM on October 29, 2025: member

    ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6

    minor changes since last ack

    <details><summary>Show Signature</summary>

    -----BEGIN PGP SIGNED MESSAGE-----
    Hash: SHA256
    
    ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6
    -----BEGIN PGP SIGNATURE-----
    
    iQIzBAEBCAAdFiEE5hdzzW4BBA4vG9eM5+KYS2KJyToFAmkCWuYACgkQ5+KYS2KJ
    yTpb9RAAgxNlDQlqkVfnIK2Xt8BQmRsiG+1KHQGJA2d3RHoxjFg2129fj47y5i+t
    GlatlcsXEcibI2D0F22sKSzY1u0LWy4GYLI1d12JAIewA99n/lRr1ktDk3v64pp8
    kldvYpd8UBs7DCHSJhPs4HbOPgwIILPASdSbQTb+6m48X5jg+cu+yMBuANVq3sbB
    9I8rUlbUgRva5voy3EGRkXsGTuwcaCoLDNHnnjrqQkJMYTym47rMF/xTZJJ11isF
    QrpzFu+P2tFy1oj0bGd4e5EhqNk++qfRCuw9sGLgLL6YEHzC/ihVH/L5NAxoeJX4
    L0yX09BG5wDrbUQvZn4w1uaQz39Uor5mb8+tZyDUyRmO9MrG5AHomUtfdbiYLwSO
    007bLD+WX4Hxode47xCdhUhqflXqbHD2mhkf5ESyUgGu3smSHSQQosuzIJiVmTn9
    b2UJG6see/tckFvart6YLn1AIu9uCyUMmB+hZIcSasZv95oEHv1YB3E5dcKUmq0d
    A1cUHIhNvU4i/hNy2xgijgSjDdpVQMFbLYUt9y4ZnzpJXFd7mnbpHVUVM1P+Onvp
    ScAtgvosXfbd5PjnAQWsMWgSmVHUtSc4t9GPjGlRYaVpHFkEbqWpWeKrYqMkU3id
    9YiFP9BueWNbNV7OBlB0LcfCpwO2WbvfJOW93/jxDovc0tgohbU=
    =c8aD
    -----END PGP SIGNATURE-----
    

    pinheadmz's public key is on openpgp.org

    </details>

  28. sedited approved
  29. sedited commented at 9:26 PM on October 29, 2025: contributor

    ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6

    I have fuzzed the thread pool for some time again, and did not find any new issues. Also the previously observed memory leak during fuzzing was not observed anymore. The changes to the http workers look sane to me.

    Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

  30. furszy force-pushed on Oct 30, 2025
  31. furszy commented at 1:22 AM on October 30, 2025: member

    Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

    Pushed.

  32. in src/test/threadpool_tests.cpp:12 in e1eb4cd3a5 outdated
       7 | +
       8 | +#include <boost/test/unit_test.hpp>
       9 | +
      10 | +BOOST_AUTO_TEST_SUITE(threadpool_tests)
      11 | +
      12 | +constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
    


    l0rinc commented at 10:17 AM on October 30, 2025:

    nit: we don't need to include the type here:

    constexpr auto TIMEOUT{std::chrono::seconds(120)};
    

    #26966 (review)


    furszy commented at 2:58 PM on November 17, 2025:

    renamed.

  33. in src/util/threadpool.h:114 in e1eb4cd3a5 outdated
     109 | +        m_interrupt = false; // Reset
     110 | +
     111 | +        // Create workers
     112 | +        m_workers.reserve(num_workers);
     113 | +        for (int i = 0; i < num_workers; i++) {
     114 | +            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    


    l0rinc commented at 10:29 AM on October 30, 2025:

    nit: we could use strprintf in a few more places:

                m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    

    furszy commented at 3:02 PM on November 17, 2025:

    I'm not sure about including extra dependencies on a low-level class for tiny readability improvements. But I'm not really opposed to this one, so done as suggested.

  34. in src/util/threadpool.h:135 in e1eb4cd3a5 outdated
     130 | +        {
     131 | +            LOCK(m_mutex);
     132 | +            m_interrupt = true;
     133 | +            threads_to_join.swap(m_workers);
     134 | +        }
     135 | +        m_cv.notify_all();
    


    l0rinc commented at 10:33 AM on October 30, 2025:

    What's the reason for locking and notifying manually instead of using higher-level primitives - have you tried these with C++20 https://en.cppreference.com/w/cpp/thread/barrier.html instead?


    furszy commented at 8:43 PM on October 30, 2025:

    That is part of the possible future experimentations written in the PR description. Goal is to keep the same synchronization mechanisms we had in the http server code.


    l0rinc commented at 1:22 PM on October 31, 2025:

    Goal is to keep the same synchronization mechanisms we had in the http server code

    What is the reason for that, isn't the goal to have a reusable ThreadPool?

  35. in src/util/threadpool.h:129 in e1eb4cd3a5 outdated
     124 | +     * Must be called from a controller (non-worker) thread.
     125 | +     */
     126 | +    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     127 | +    {
     128 | +        // Notify workers and join them.
     129 | +        std::vector<std::thread> threads_to_join;
    


    l0rinc commented at 10:33 AM on October 30, 2025:

    I'm not sure this is necessarry

  36. in src/util/threadpool.h:66 in e1eb4cd3a5 outdated
      61 | +        WAIT_LOCK(m_mutex, wait_lock);
      62 | +        for (;;) {
      63 | +            std::packaged_task<void()> task;
      64 | +            {
      65 | +                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
      66 | +                if (!m_interrupt && m_work_queue.empty()) {
    


    l0rinc commented at 10:41 AM on October 30, 2025:

    do we really need this extra complexity here?


    furszy commented at 8:31 PM on October 30, 2025:

    Yes, we do. The reason is explained there. All thread busy, a new task arrives so notifications goes unnoticed, then threads finish processing and sleep. Loosing the new task wake up notification, not running the new task and sleeping until a new one gets submitted. There is a test exercising this exact behavior.


    l0rinc commented at 1:31 PM on October 31, 2025:

    Can you point me to the test that fails please? I have removed it, ran the tests added in this PR and they all passed.

  37. in src/util/threadpool.h:71 in e1eb4cd3a5 outdated
      66 | +                if (!m_interrupt && m_work_queue.empty()) {
      67 | +                    // Block until the pool is interrupted or a task is available.
      68 | +                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
      69 | +                }
      70 | +
      71 | +                // If stopped and no work left, exit worker
    


    l0rinc commented at 10:41 AM on October 30, 2025:

    the comment mirrors the code, it doesn't really add anything that the code doesn't already say (especially since the comment above already stated the same)

  38. in src/test/threadpool_tests.cpp:89 in e1eb4cd3a5 outdated
      84 | +
      85 | +        // Store futures to ensure completion before checking counter.
      86 | +        std::vector<std::future<void>> futures;
      87 | +        futures.reserve(num_tasks);
      88 | +
      89 | +        for (int i = 1; i <= num_tasks; i++) {
    


    l0rinc commented at 10:48 AM on October 30, 2025:

    nit: for consistency

            for (size_t i{1}; i <= num_tasks; ++i) {
    

    furszy commented at 8:49 PM on October 30, 2025:

    num_tasks is an int. That would require casting it to size_t.


    l0rinc commented at 1:32 PM on October 31, 2025:

    Please see my suggestions above, it's a size_t there

  39. in src/test/threadpool_tests.cpp:206 in e1eb4cd3a5 outdated
     201 | +            threadPool.Submit([&counter]() {
     202 | +                counter.fetch_add(1);
     203 | +            });
     204 | +        }
     205 | +        std::this_thread::sleep_for(std::chrono::milliseconds{100});
     206 | +        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
    


    l0rinc commented at 10:51 AM on October 30, 2025:
            BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    
  40. in src/test/threadpool_tests.cpp:75 in e1eb4cd3a5 outdated
      70 | +            threadPool.Submit([]() { return false; });
      71 | +        } catch (const std::runtime_error&) {
      72 | +            err = true;
      73 | +        }
      74 | +        BOOST_CHECK(err);
      75 | +    }
    


    l0rinc commented at 10:58 AM on October 30, 2025:

    I like how thoroughly we're testing the functionality here, that helps a lot with being able to trust this complex code.

    I wanted to debug these one-by-one and I don't want to run all other tests before - and it's a bit awkward to have so many tests in a single place when they're admittedly separate. Could you please split them out into separate units, such as:

    BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
    {
        ThreadPool threadPool{"not_started"};
        BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, HasReason{"No active workers"});
    }
    

    furszy commented at 3:03 PM on November 17, 2025:

    Done as suggested.

  41. in src/test/threadpool_tests.cpp:51 in e1eb4cd3a5 outdated
      46 | +    return blocking_tasks;
      47 | +}
      48 | +
      49 | +BOOST_AUTO_TEST_CASE(threadpool_basic)
      50 | +{
      51 | +    // Test Cases
    


    l0rinc commented at 11:01 AM on October 30, 2025:

    As mentioned before, we're having a single big-bang test case here (called "threadpool_basic", but it contains everything we have), announcing the parts in comments followed by duplicating each comment. Can we split them out to focused, self-documenting test cases, something like:

    BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
    BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
    BOOST_AUTO_TEST_CASE(single_available_worker_processes_all_tasks)
    BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
    BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
    BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
    BOOST_AUTO_TEST_CASE(recursive_task_submission)
    BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    

    That would help reviewers experiment and have exact test faulures, we could replace a lot of dead comments with code and provide better structure for the testing efforts.


    furszy commented at 3:03 PM on November 17, 2025:

    Done.

  42. in src/test/threadpool_tests.cpp:83 in e1eb4cd3a5 outdated
      78 | +    {
      79 | +        int num_tasks = 50;
      80 | +
      81 | +        ThreadPool threadPool(POOL_NAME);
      82 | +        threadPool.Start(NUM_WORKERS_DEFAULT);
      83 | +        std::atomic<int> counter = 0;
    


    l0rinc commented at 11:10 AM on October 30, 2025:

    nits: std::atomic_int and its brothers seem more focused (and brace init helps with many narrowing problems) - we're using these in the fuzzing commit as well:

            std::atomic_size_t counter{0};
    
  43. in src/test/threadpool_tests.cpp:90 in e1eb4cd3a5 outdated
      85 | +        // Store futures to ensure completion before checking counter.
      86 | +        std::vector<std::future<void>> futures;
      87 | +        futures.reserve(num_tasks);
      88 | +
      89 | +        for (int i = 1; i <= num_tasks; i++) {
      90 | +            futures.emplace_back(threadPool.Submit([&counter, i]() {
    


    l0rinc commented at 11:11 AM on October 30, 2025:

    nit:

                futures.emplace_back(threadPool.Submit([&counter, i] {
    
  44. in src/test/threadpool_tests.cpp:96 in e1eb4cd3a5 outdated
      91 | +                counter.fetch_add(i);
      92 | +            }));
      93 | +        }
      94 | +
      95 | +        // Wait for all tasks to finish
      96 | +        WaitFor(futures, /*context=*/"test1 task");
    


    l0rinc commented at 11:13 AM on October 30, 2025:

    I'm not yet sure I understand why the test framework needs a wait method, that sounds like something the thread pool should provide. This is why I mentioned in #26966 (review) that the problem has to come before the solution, because the reviewers have to check out the follow-up comments to see if this is representative of actual usage or not.

  45. in src/test/threadpool_tests.cpp:95 in e1eb4cd3a5 outdated
      90 | +            futures.emplace_back(threadPool.Submit([&counter, i]() {
      91 | +                counter.fetch_add(i);
      92 | +            }));
      93 | +        }
      94 | +
      95 | +        // Wait for all tasks to finish
    


    l0rinc commented at 11:13 AM on October 30, 2025:

    comment is redundant, the method name and context already state all of that

  46. in src/util/threadpool.h:106 in e1eb4cd3a5 outdated
     101 | +     *
     102 | +     * Must be called from a controller (non-worker) thread.
     103 | +     */
     104 | +    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     105 | +    {
     106 | +        assert(num_workers > 0);
    


    l0rinc commented at 11:17 AM on October 30, 2025:

    Can we narrow the type in that case to at least disallow negative values?

        void Start(size_t num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
        {
            assert(num_workers > 0);
    

    furszy commented at 9:01 PM on October 30, 2025:

    This decision was on purpose. To be able to assert and abort the program if a negative number is provided. A negative integer to size_t conversion would be bad.


    l0rinc commented at 1:34 PM on October 31, 2025:

    Where would we be getting negative numbers from? If we care about the value being in a certain range, we could validate that instead:

            assert(num_workers > 0 && num_workers <= MAX_WORKER_COUNT);
    
  47. in src/test/threadpool_tests.cpp:91 in e1eb4cd3a5 outdated
      86 | +        std::vector<std::future<void>> futures;
      87 | +        futures.reserve(num_tasks);
      88 | +
      89 | +        for (int i = 1; i <= num_tasks; i++) {
      90 | +            futures.emplace_back(threadPool.Submit([&counter, i]() {
      91 | +                counter.fetch_add(i);
    


    l0rinc commented at 11:23 AM on October 30, 2025:

    Since this isn't a synchronization operations, the default is memory_order_seq_cst seems too strong here and it might hide the data race by providing unintended synchronization:

                    counter.fetch_add(i, std::memory_order_relaxed);
    

    I have created a godbolt reproducer to understand it better, see: https://godbolt.org/z/o87dsq1fG I couldn't find any real difference on x86 cpus, but ARM ones show the difference (__aarch64_ldadd4_acq_rel vs __aarch64_ldadd4_relax)


    furszy commented at 3:04 PM on November 17, 2025:

    Done as suggested.


    l0rinc commented at 4:03 PM on November 17, 2025:

    Can you please apply this to the other cases as well?


    furszy commented at 9:29 PM on November 18, 2025:

    Done.


    l0rinc commented at 12:06 PM on November 19, 2025:

    Thanks. Currently all fetch_add calls use memory_order_relaxed, which makes sense since we don't want the counter itself to act as a barrier for other data, but the corresponding loads still use the default std::memory_order_seq_cst. Since external synchronization (e.g. via WAIT_FOR calls) already provide happens-before relationships, I don't think the memory ordering of load operations should be sequential. I don't think it's a bug, but would likely be more consistent if we used the same memory_order_relaxed on the loads as well. To avoid accidental synchronization, can we use memory_order_relaxed throughout (for fetch_add & load)? What do you think?

  48. in src/test/threadpool_tests.cpp:77 in e1eb4cd3a5 outdated
      72 | +            err = true;
      73 | +        }
      74 | +        BOOST_CHECK(err);
      75 | +    }
      76 | +
      77 | +    // Test case 1, submit tasks and verify completion.
    


    l0rinc commented at 12:12 PM on October 30, 2025:

    I'm usually coding along while reviewing, to simplify applying the suggestions that you like, I'm posting the final results as well:

    BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
    {
        ThreadPool threadPool{"completion"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
    
        const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
        std::atomic_size_t counter{0};
    
        std::vector<std::future<void>> futures(num_tasks);
        for (size_t i{0}; i < num_tasks; ++i) {
            futures[i] = threadPool.Submit([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
        }
    
        WaitFor(futures);
        BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    }
    
  49. in src/test/threadpool_tests.cpp:79 in e1eb4cd3a5 outdated
      74 | +        BOOST_CHECK(err);
      75 | +    }
      76 | +
      77 | +    // Test case 1, submit tasks and verify completion.
      78 | +    {
      79 | +        int num_tasks = 50;
    


    l0rinc commented at 12:13 PM on October 30, 2025:

    we're iterating forward, seems simpler to use an unsigned here

            constexpr size_t num_tasks{50};
    

    and to add some variance to the tests we could vary these randomly:

            const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    

    note that this will need:

    BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
    
  50. in src/test/threadpool_tests.cpp:26 in e1eb4cd3a5 outdated
      21 | +    }
      22 | +}
      23 | +
      24 | +// Block a number of worker threads by submitting tasks that wait on `blocker_future`.
      25 | +// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
      26 | +std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context)
    


    l0rinc commented at 12:32 PM on October 30, 2025:

    ThreadPool::WorkersCount() returns size_t, if we restricted num_of_threads_to_block to size_t as well, we could assert here:

        assert(threadPool.WorkersCount() >= num_of_threads_to_block);
    
  51. in src/test/threadpool_tests.cpp:22 in e1eb4cd3a5 outdated
      17 | +    for (size_t i = 0; i < futures.size(); ++i) {
      18 | +        if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
      19 | +            throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
      20 | +        }
      21 | +    }
      22 | +}
    


    l0rinc commented at 12:55 PM on October 30, 2025:

    As mentioned before, I think these could be simplified to std::span, std::string_view and strprintf. And if we extract each test to a separate unit (as suggested below), we won't need the context parameter anymore. The thread index also doesn't really help in case of the failure, which would enable us to simplify this. And lastly, since BlockWorkers is already a std::vector<std::future<void>> we don't actually need the template here:

    void WaitFor(std::span<const std::future<void>> futures)
    {
        for (const auto& f : futures) {
            BOOST_REQUIRE(f.wait_for(TIMEOUT) == std::future_status::ready);
        }
    }
    

    furszy commented at 3:09 PM on November 17, 2025:

    we would probably still need the context string to differentiate between the initial BlockWorkers() calls vs the final result WaitFor. Should re-check if BOOST_REQUIRE prints the stack trace; most likely it doesn't and that was the reason behind my past-self added the context string.


    l0rinc commented at 4:25 PM on November 17, 2025:

    The original failure looks like this:

    unknown location:0: fatal error: in "threadpool_tests/submit_tasks_complete_successfully": std::runtime_error: Timeout waiting for: test1 task, task index 0

    test/threadpool_tests.cpp:77: last checkpoint: "submit_tasks_complete_successfully" test entry

    If you want the extra method to not mask the failure stack, we can make it a macro, like we did with e.g. https://github.com/bitcoin/bitcoin/blob/cc5dda1de333cf7aa10e2237ee2c9221f705dbd9/src/test/headers_sync_chainwork_tests.cpp#L22-L42

    Which would look like:

    #define WAIT_FOR(futures)                                                         \
        do {                                                                          \
            for (const auto& f : futures) {                                           \
                BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
            }                                                                         \
        } while (0)
    

    and a failure stack would be more helpful:

    test/threadpool_tests.cpp:96: fatal error: in "threadpool_tests/submit_tasks_complete_successfully": critical check f.wait_for(WAIT_TIMEOUT) != std::future_status::ready has failed

    which would point to the exact call site instead of the helper method: https://github.com/bitcoin/bitcoin/blob/41c99a2e3ef4c1cddaaa313a454009a141a6a782/src/test/threadpool_tests.cpp#L96


    furszy commented at 9:30 PM on November 18, 2025:

    Done as suggested.

  52. in src/test/threadpool_tests.cpp:102 in e1eb4cd3a5 outdated
      97 | +        int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
      98 | +        BOOST_CHECK_EQUAL(counter.load(), expected_value);
      99 | +        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
     100 | +    }
     101 | +
     102 | +    // Test case 2, maintain all threads busy except one.
    


    l0rinc commented at 1:05 PM on October 30, 2025:

    we could generalize this by counting from 1 to <NUM_WORKERS_DEFAULT, something like:

    BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_task)
    {
        ThreadPool threadPool{"block_counts"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
        const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    
        for (size_t free{1}; free < NUM_WORKERS_DEFAULT; ++free) {
            BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
            std::counting_semaphore sem{0};
            const auto blocking_tasks{BlockWorkers(threadPool, sem, free)};
    
            size_t counter{0};
            std::vector<std::future<void>> futures(num_tasks);
            for (auto& f : futures) f = threadPool.Submit([&counter] { ++counter; });
    
            WaitFor(futures);
            BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    
            if (free == 1) {
                BOOST_CHECK_EQUAL(counter, num_tasks);
            } else {
                BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
            }
    
            sem.release(free);
            WaitFor(blocking_tasks);
        }
    
        threadPool.Stop();
    }
    
  53. in src/test/threadpool_tests.cpp:62 in e1eb4cd3a5 outdated
      57 | +    // 5) The task throws an exception, catch must be done in the consumer side.
      58 | +    // 6) Busy workers, help them by processing tasks from outside.
      59 | +    // 7) Recursive submission of tasks.
      60 | +    // 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
      61 | +
      62 | +    const int NUM_WORKERS_DEFAULT = 3;
    


    l0rinc commented at 1:37 PM on October 30, 2025:

    I needed at least nproc+1 workers to be able to reliably reproduce concurrency issues (such as the non-atomic counter update)

        const size_t NUM_WORKERS_DEFAULT{size_t(GetNumCores()) + 1}; // we need to make sure there's *some* contention
    

    furszy commented at 3:11 PM on November 17, 2025:

    Instead of doing it for all the test cases, added another case that exercises the contention.

  54. in src/test/threadpool_tests.cpp:123 in e1eb4cd3a5 outdated
     118 | +        futures.reserve(num_tasks);
     119 | +        for (int i = 0; i < num_tasks; i++) {
     120 | +            futures.emplace_back(threadPool.Submit([&counter]() {
     121 | +                counter += 1;
     122 | +            }));
     123 | +        }
    


    l0rinc commented at 1:48 PM on October 30, 2025:

    nit: these setup parts are longer than they need to be, this is just glue code, I find it distracting from the actual test logic:

            std::vector<std::future<void>> futures(num_tasks);
            for (auto& f : futures) f = threadPool.Submit([&counter]{ ++counter; });
    

    furszy commented at 3:12 PM on November 17, 2025:

    Done as suggested.


    l0rinc commented at 4:27 PM on November 17, 2025:

    I still see init + reserve + loop + emplace in most tests, could we apply to the rest as well?

  55. in src/test/threadpool_tests.cpp:143 in e1eb4cd3a5 outdated
     138 | +        std::atomic<bool> flag = false;
     139 | +        std::future<void> future = threadPool.Submit([&flag]() {
     140 | +            std::this_thread::sleep_for(std::chrono::milliseconds{200});
     141 | +            flag.store(true);
     142 | +        });
     143 | +        future.wait();
    


    l0rinc commented at 2:08 PM on October 30, 2025:

    hmm, isn't this why WaitFor was added with a timeout in the first place?

  56. in src/test/threadpool_tests.cpp:140 in e1eb4cd3a5 outdated
     135 | +    {
     136 | +        ThreadPool threadPool(POOL_NAME);
     137 | +        threadPool.Start(NUM_WORKERS_DEFAULT);
     138 | +        std::atomic<bool> flag = false;
     139 | +        std::future<void> future = threadPool.Submit([&flag]() {
     140 | +            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    


    l0rinc commented at 2:10 PM on October 30, 2025:

    maybe we can test more here, each test waiting a different amount (e.g. work index milliseconds) and we could assert at the end that complection takes at least NUM_WORKERS_DEFAULT milliseconds.

    BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
    {
        ThreadPool threadPool{"wait_test"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
        const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    
        const auto start{steady_clock::now()};
        std::vector<std::future<void>> futures(num_tasks + 1);
        for (size_t i{0}; i <= num_tasks; ++i) {
            futures[i] = threadPool.Submit([i] { UninterruptibleSleep(milliseconds{i}); });
        }
        WaitFor(futures);
        const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
        BOOST_CHECK(elapsed_ms >= num_tasks);
    }
    
  57. in src/test/threadpool_tests.cpp:252 in e1eb4cd3a5 outdated
     247 | +        // At this point, all workers are blocked, and the extra task is queued
     248 | +        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
     249 | +
     250 | +        // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
     251 | +        std::thread thread_unblocker([&blocker]() {
     252 | +            std::this_thread::sleep_for(std::chrono::milliseconds{300});
    


    l0rinc commented at 2:13 PM on October 30, 2025:

    We could likely use UninterruptibleSleep here instead


    furszy commented at 3:12 PM on November 17, 2025:

    Done as suggested.

  58. in src/test/threadpool_tests.cpp:47 in e1eb4cd3a5 outdated
      42 | +    }
      43 | +
      44 | +    // Wait until all threads are actually blocked
      45 | +    WaitFor(ready_futures, context);
      46 | +    return blocking_tasks;
      47 | +}
    


    l0rinc commented at 2:41 PM on October 30, 2025:

    What is the reason for the heavy use of promises? Does it have any advantage compared to C++20 concurrency primitives? This seems to me like a barebones implementation for an std::latch. Can we use that instead?

    This seemed like a std::binary_semaphore to me initially but couldn't make it work with that.

    But we can still use a simple std::counting_semaphore which would avoid the std::promise + std::shared_future + std::vector<std::future<void>>, something like:

    std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
    {
        assert(threadPool.WorkersCount() >= num_of_threads_to_block);
        std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
    
        std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
        for (auto& f : blocking_tasks) f = threadPool.Submit([&] {
            ready.count_down();
            release_sem.acquire();
        });
    
        ready.wait();
        return blocking_tasks;
    }
    

    furszy commented at 3:16 PM on November 17, 2025:

    What is the reason for the heavy use of promises? Does it have any advantage compared to C++20 concurrency primitives?

    I wrote this code 4 years ago. When the project was using C++11 or C++14 (I don't recall which one was).

    This seems to me like a barebones implementation for an std::latch. Can we use that instead?

    Probably. But I don't think it matters much at the end. Will check it. Thanks.

  59. in src/test/threadpool_tests.cpp:147 in e1eb4cd3a5 outdated
     142 | +        });
     143 | +        future.wait();
     144 | +        BOOST_CHECK(flag.load());
     145 | +    }
     146 | +
     147 | +    // Test case 4, obtain result object.
    


    l0rinc commented at 2:47 PM on October 30, 2025:

    we could extract this to something like:

    BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    {
        ThreadPool threadPool{"result_test"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
        
        BOOST_CHECK_EQUAL(threadPool.Submit([] { return true; }).get(), true);
        BOOST_CHECK_EQUAL(threadPool.Submit([] { return 42; }).get(), 42);
        BOOST_CHECK_EQUAL(threadPool.Submit([] { return std::string{"true"}; }).get(), "true");
    }
    
  60. in src/test/threadpool_tests.cpp:168 in e1eb4cd3a5 outdated
     163 | +    // Test case 5, throw exception and catch it on the consumer side.
     164 | +    {
     165 | +        ThreadPool threadPool(POOL_NAME);
     166 | +        threadPool.Start(NUM_WORKERS_DEFAULT);
     167 | +
     168 | +        int ROUNDS = 5;
    


    l0rinc commented at 2:48 PM on October 30, 2025:

    in other cases we called this num_tasks


    furszy commented at 3:16 PM on November 17, 2025:

    Done as suggested.

  61. in src/test/threadpool_tests.cpp:170 in e1eb4cd3a5 outdated
     165 | +        ThreadPool threadPool(POOL_NAME);
     166 | +        threadPool.Start(NUM_WORKERS_DEFAULT);
     167 | +
     168 | +        int ROUNDS = 5;
     169 | +        std::string err_msg{"something wrong happened"};
     170 | +        std::vector<std::future<void>> futures;
    


    l0rinc commented at 2:52 PM on October 30, 2025:

    do we need a vector here of can we just consume whatever we just submitted?

    BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
    {
        ThreadPool threadPool{"exception_test"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
    
        const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
    
        const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
        for (size_t i{0}; i < num_tasks; ++i) {
            BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(make_err(i)); }).get(), std::runtime_error, HasReason{make_err(i)});
        }
    }
    

    furszy commented at 3:18 PM on November 17, 2025:

    do we need a vector here of can we just consume whatever we just submitted?

    Consuming right away would wait for the task to be executed; we want to exercise some concurrency too.


    l0rinc commented at 4:46 PM on November 17, 2025:

    we want to exercise some concurrency too

    You're right, that's better indeed. My suggestion updated to keep the vector

    // Test 5, throw exceptions and catch it on the consumer side
    BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
    {
        ThreadPool threadPool("exception_test");
        threadPool.Start(NUM_WORKERS_DEFAULT);
    
        const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
    
        const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
        std::vector<std::future<void>> futures(num_tasks);
        for (size_t i{0}; i < num_tasks; ++i) {
            futures[i] = threadPool.Submit([&make_err, i] { throw std::runtime_error(make_err(i)); });
        }
    
        for (size_t i{0}; i < num_tasks; ++i) {
            BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
        }
    }
    
  62. in src/test/threadpool_tests.cpp:188 in e1eb4cd3a5 outdated
     183 | +                BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
     184 | +            }
     185 | +        }
     186 | +    }
     187 | +
     188 | +    // Test case 6, all workers are busy, help them by processing tasks from outside.
    


    l0rinc commented at 3:34 PM on October 30, 2025:

    We can split out the remaining 3 tests as well:

    
    BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
    {
        ThreadPool threadPool{"manual_process"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
        const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    
        std::counting_semaphore sem{0};
        const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    
        std::atomic_size_t counter{0};
        std::vector<std::future<void>> futures(num_tasks);
        for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    
        UninterruptibleSleep(milliseconds{100});
        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    
        for (size_t i{0}; i < num_tasks; ++i) {
            threadPool.ProcessTask();
        }
    
        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    
        sem.release(NUM_WORKERS_DEFAULT);
        WaitFor(blocking_tasks);
    }
    
    BOOST_AUTO_TEST_CASE(recursive_task_submission)
    {
        ThreadPool threadPool{"recursive"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
    
        std::promise<void> signal;
        threadPool.Submit([&threadPool, &signal] {
            threadPool.Submit([&signal] {
                signal.set_value();
            });
        });
    
        signal.get_future().wait();
    }
    
    BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    {
        ThreadPool threadPool{"graceful_stop"};
        threadPool.Start(NUM_WORKERS_DEFAULT);
    
        std::counting_semaphore sem{0};
        const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    
        auto future{threadPool.Submit([] { return true; })};
        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    
        std::thread thread_unblocker{[&sem] {
            std::this_thread::sleep_for(milliseconds{300});
            sem.release(NUM_WORKERS_DEFAULT);
        }};
    
        threadPool.Stop();
    
        BOOST_CHECK(future.get());
        thread_unblocker.join();
        WaitFor(blocking_tasks);
        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    }
    

    furszy commented at 3:18 PM on November 17, 2025:

    Splitted.

  63. in src/httpserver.cpp:53 in 435e8b5a55 outdated
      49 | @@ -49,83 +50,6 @@ using common::InvalidPortErrMsg;
      50 |  /** Maximum size of http request (request line + headers) */
      51 |  static const size_t MAX_HEADERS_SIZE = 8192;
      52 |  
      53 | -/** HTTP request work item */
      54 | -class HTTPWorkItem final : public HTTPClosure
    


    l0rinc commented at 3:40 PM on October 30, 2025:

    Is there a way to do this final threadpool migration in smaller steps?


    furszy commented at 3:20 PM on November 17, 2025:

    I guess you wrote this first and then answered yourself in your final comment; which is basically a long version of the PR's "Note 2" description.


    l0rinc commented at 4:55 PM on November 17, 2025:

    Not exactly sure what you mean by that - Note 2 does seem like a good idea to me and it's probably similar to what I meant. It would help the review process to gain more confidence in the implementation if we migrated away in smaller steps, documenting how the tests we add for the old implementation also pass when we switch over to the new implementation.


    furszy commented at 8:54 PM on November 17, 2025:

    I imagine the problem is that you haven't compared the current http code with the ThreadPool code, which is pretty much the same but properly documented and test covered. That's one of the reasons behind the not modernization of the underlying sync mechanism in this PR, and why I added the "Note 2" paragraph as well as wrote #33689 (comment).

    Look at the current WorkQueue:

    void Run() EXCLUSIVE_LOCKS_REQUIRED(!cs)
        {
            while (true) {
                std::unique_ptr<WorkItem> i;
                {
                    WAIT_LOCK(cs, lock);
                    while (running && queue.empty())
                        cond.wait(lock);
                    if (!running && queue.empty())
                        break;
                    i = std::move(queue.front());
                    queue.pop_front();
                }
                (*i)();
            }
        }
    

    And this is the ThreadPool (stripping all comments)

    
    void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
        {
            WAIT_LOCK(m_mutex, wait_lock);
            for (;;) {
                std::packaged_task<void()> task;
                {
                    if (!m_interrupt && m_work_queue.empty()) {
                        m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
                    }
    
                    if (m_interrupt && m_work_queue.empty()) return;
                   
                    task = std::move(m_work_queue.front());
                    m_work_queue.pop();
                }
    
                REVERSE_LOCK(wait_lock, m_mutex);
                task();
            }
        }
    

    In any case, since most reviewers seem ok to proceed with the current approach, I think it’s more productive to keep working on it rather than continue circling around this topic, even if it’s not to your taste.


    l0rinc commented at 5:08 PM on November 19, 2025:

    This is a big change, other reviewers can just review the unified view if they don't want smaller commits, but I currently cannot view this in small steps, so I insist that we should split it into smaller changes. I want to help, that's why I'm spending so much time with the details, I think this is a really risky change, I want to make sure it's correct. Let me know how I can help.

  64. l0rinc changes_requested
  65. l0rinc commented at 4:05 PM on October 30, 2025: contributor

    I have started reviewing this PR but have only finished the first commit (e1eb4cd3a5eb192cd6d9ee5d255688c06ab2089a).

    The depth of tests gives me hope that this transition will be smooth, the tests cover most of the functionality, even a few corner cases I haven't thought of!

    It seemed, however, that I had more to say about that than anticipated. I didn't even get to reviewing the threadpool properly, just most of the tests.

    I know receiving this amount of feedback can be daunting, but we both want this to succeed. I have spent a lot of time meticulously going through the details. I hope you will take it as I meant it: to make absolutely sure this won't cause any problems and that our test suite is rock solid. I will continue the review, but wanted to make sure you're aware of the progress and thought we can synchronize more often this way (pun intended).

    In general I think adding a threadpool can untangle complicated code as such, but we have to find a balance between IO and CPU bound tasks. I found that oversubscription is usually a smaller problem, so we can likely solve the IO/CPU contention problem by creating dedicated ThreadPools for each major task (http, script verification, input fetcher, compaction, etc). This way it won't be a general resource guardian (which can be a ThreadPool's main purpose), just a tool to avoid the overhead of starting/stopping threads.

    As hinted in the original thread, I find the current structure to be harder to follow, I would appreciate if you would consider doing the extraction in smaller steps:

    • first step would be to use the new ThreadPool's structure, but extract the old logic into it;
    • add the tests (unit and fuzz, no need to split it) against the old impl in a separate commit;
    • in a last step swap it out with the new logic, keeping the same structure and tests.

    This way we could debug and understand the old code before we jump into the new one - and the tests would guide us in guaranteeing that the behavior stayed basically the same. I understand that's extra work, but I'm of course willing to help in whatever you need to be able to do a less risky transition.

    I glanced quickly to the actual ThreadPool implementation, looks okay, but I want to investigate as part of my next wave of reviews why we need so much locking here and why we're relying on old primitives here instead of the C++20 concurrency tools (such as latches and barriers) and whether we can create and destroy the pool via RAII instead of manual start/stops.

    Note that I have done an IBD before and after this change to see if everything was still working and I didn't see any regression!

    <details> <summary>here are all the changes I did locally while reviewing the change</summary>

    diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
    index e8200533cd..052784db37 100644
    --- a/src/test/threadpool_tests.cpp
    +++ b/src/test/threadpool_tests.cpp
    @@ -2,270 +2,208 @@
     // Distributed under the MIT software license, see the accompanying
     // file COPYING or http://www.opensource.org/licenses/mit-license.php.
     
    +#include <common/system.h>
    +#include <test/util/setup_common.h>
     #include <util/string.h>
     #include <util/threadpool.h>
    +#include <util/time.h>
     
     #include <boost/test/unit_test.hpp>
     
    -BOOST_AUTO_TEST_SUITE(threadpool_tests)
    +#include <latch>
    +#include <semaphore>
     
    -constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
    +using namespace std::chrono;
    +constexpr auto TIMEOUT = seconds(120);
     
    -template <typename T>
    -void WaitFor(const std::vector<std::future<T>>& futures, const std::string& context)
    +void WaitFor(std::span<const std::future<void>> futures)
     {
    -    for (size_t i = 0; i < futures.size(); ++i) {
    -        if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
    -            throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
    -        }
    +    for (const auto& f : futures) {
    +        BOOST_REQUIRE(f.wait_for(TIMEOUT) == std::future_status::ready);
         }
     }
     
     // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
     // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
    -std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context)
    +std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
     {
    -    // Per-thread ready promises to ensure all workers are actually blocked
    -    std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
    -    std::vector<std::future<void>> ready_futures;
    -    ready_futures.reserve(num_of_threads_to_block);
    -    for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
    -
    -    // Fill all workers with blocking tasks
    -    std::vector<std::future<void>> blocking_tasks;
    -    for (int i = 0; i < num_of_threads_to_block; i++) {
    -        std::promise<void>& ready = ready_promises[i];
    -        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
    -            ready.set_value();
    -            blocker_future.wait();
    -        }));
    -    }
    +    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
    +    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
     
    -    // Wait until all threads are actually blocked
    -    WaitFor(ready_futures, context);
    +    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
    +    for (auto& f : blocking_tasks) f = threadPool.Submit([&] {
    +        ready.count_down();
    +        release_sem.acquire();
    +    });
    +
    +    ready.wait();
         return blocking_tasks;
     }
     
    -BOOST_AUTO_TEST_CASE(threadpool_basic)
    +BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
    +
    +const size_t NUM_WORKERS_DEFAULT{size_t(GetNumCores()) + 1}; // we need to make sure there's *some* contention
    +
    +BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
     {
    -    // Test Cases
    -    // 0) Submit task to a non-started pool.
    -    // 1) Submit tasks and verify completion.
    -    // 2) Maintain all threads busy except one.
    -    // 3) Wait for work to finish.
    -    // 4) Wait for result object.
    -    // 5) The task throws an exception, catch must be done in the consumer side.
    -    // 6) Busy workers, help them by processing tasks from outside.
    -    // 7) Recursive submission of tasks.
    -    // 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
    -
    -    const int NUM_WORKERS_DEFAULT = 3;
    -    const std::string POOL_NAME = "test";
    -
    -    // Test case 0, submit task to a non-started pool
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        bool err = false;
    -        try {
    -            threadPool.Submit([]() { return false; });
    -        } catch (const std::runtime_error&) {
    -            err = true;
    -        }
    -        BOOST_CHECK(err);
    +    ThreadPool threadPool{"not_started"};
    +    BOOST_CHECK_EXCEPTION(threadPool.Submit([] { return 0; }), std::runtime_error, HasReason{"No active workers"});
    +}
    +
    +BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
    +{
    +    ThreadPool threadPool{"completion"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
    +
    +    const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    +    std::atomic_size_t counter{0};
    +
    +    std::vector<std::future<void>> futures(num_tasks);
    +    for (size_t i{0}; i < num_tasks; ++i) {
    +        futures[i] = threadPool.Submit([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
         }
     
    -    // Test case 1, submit tasks and verify completion.
    -    {
    -        int num_tasks = 50;
    +    WaitFor(futures);
    +    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
    +    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    +}
     
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    -        std::atomic<int> counter = 0;
    +BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_task)
    +{
    +    ThreadPool threadPool{"block_counts"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
    +    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     
    -        // Store futures to ensure completion before checking counter.
    -        std::vector<std::future<void>> futures;
    -        futures.reserve(num_tasks);
    +    for (size_t free{1}; free < NUM_WORKERS_DEFAULT; ++free) {
    +        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
    +        std::counting_semaphore sem{0};
    +        const auto blocking_tasks{BlockWorkers(threadPool, sem, free)};
     
    -        for (int i = 1; i <= num_tasks; i++) {
    -            futures.emplace_back(threadPool.Submit([&counter, i]() {
    -                counter.fetch_add(i);
    -            }));
    -        }
    +        size_t counter{0};
    +        std::vector<std::future<void>> futures(num_tasks);
    +        for (auto& f : futures) f = threadPool.Submit([&counter] { ++counter; });
     
    -        // Wait for all tasks to finish
    -        WaitFor(futures, /*context=*/"test1 task");
    -        int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
    -        BOOST_CHECK_EQUAL(counter.load(), expected_value);
    +        WaitFor(futures);
             BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    -    }
     
    -    // Test case 2, maintain all threads busy except one.
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    -        // Single blocking future for all threads
    -        std::promise<void> blocker;
    -        std::shared_future<void> blocker_future(blocker.get_future());
    -        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1, /*context=*/"test2 blocking tasks enabled");
    -
    -        // Now execute tasks on the single available worker
    -        // and check that all the tasks are executed.
    -        int num_tasks = 15;
    -        int counter = 0;
    -
    -        // Store futures to wait on
    -        std::vector<std::future<void>> futures;
    -        futures.reserve(num_tasks);
    -        for (int i = 0; i < num_tasks; i++) {
    -            futures.emplace_back(threadPool.Submit([&counter]() {
    -                counter += 1;
    -            }));
    +        if (free == 1) {
    +            BOOST_CHECK_EQUAL(counter, num_tasks);
    +        } else {
    +            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
             }
     
    -        WaitFor(futures, /*context=*/"test2 tasks");
    -        BOOST_CHECK_EQUAL(counter, num_tasks);
    -
    -        blocker.set_value();
    -        WaitFor(blocking_tasks, /*context=*/"test2 blocking tasks disabled");
    -        threadPool.Stop();
    -        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    +        sem.release(free);
    +        WaitFor(blocking_tasks);
         }
     
    -    // Test case 3, wait for work to finish.
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    -        std::atomic<bool> flag = false;
    -        std::future<void> future = threadPool.Submit([&flag]() {
    -            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    -            flag.store(true);
    -        });
    -        future.wait();
    -        BOOST_CHECK(flag.load());
    -    }
    +    threadPool.Stop();
    +}
     
    -    // Test case 4, obtain result object.
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    -        std::future<bool> future_bool = threadPool.Submit([]() {
    -            return true;
    -        });
    -        BOOST_CHECK(future_bool.get());
    +BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
    +{
    +    ThreadPool threadPool{"wait_test"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
    +    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     
    -        std::future<std::string> future_str = threadPool.Submit([]() {
    -            return std::string("true");
    -        });
    -        std::string result = future_str.get();
    -        BOOST_CHECK_EQUAL(result, "true");
    +    const auto start{steady_clock::now()};
    +
    +    std::vector<std::future<void>> futures(num_tasks + 1);
    +    for (size_t i{0}; i <= num_tasks; ++i) {
    +        futures[i] = threadPool.Submit([i] { UninterruptibleSleep(milliseconds{i}); });
         }
    +    WaitFor(futures);
     
    -    // Test case 5, throw exception and catch it on the consumer side.
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    -
    -        int ROUNDS = 5;
    -        std::string err_msg{"something wrong happened"};
    -        std::vector<std::future<void>> futures;
    -        futures.reserve(ROUNDS);
    -        for (int i = 0; i < ROUNDS; i++) {
    -            futures.emplace_back(threadPool.Submit([err_msg, i]() {
    -                throw std::runtime_error(err_msg + util::ToString(i));
    -            }));
    -        }
    +    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
    +    BOOST_CHECK(elapsed_ms >= num_tasks);
    +}
     
    -        for (int i = 0; i < ROUNDS; i++) {
    -            try {
    -                futures.at(i).get();
    -                BOOST_FAIL("Expected exception not thrown");
    -            } catch (const std::runtime_error& e) {
    -                BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
    -            }
    -        }
    -    }
    +BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    +{
    +    ThreadPool threadPool{"result_test"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
     
    -    // Test case 6, all workers are busy, help them by processing tasks from outside.
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    -
    -        std::promise<void> blocker;
    -        std::shared_future<void> blocker_future(blocker.get_future());
    -        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test6 blocking tasks enabled");
    -
    -        // Now submit tasks and check that none of them are executed.
    -        int num_tasks = 20;
    -        std::atomic<int> counter = 0;
    -        for (int i = 0; i < num_tasks; i++) {
    -            threadPool.Submit([&counter]() {
    -                counter.fetch_add(1);
    -            });
    -        }
    -        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    -        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
    +    BOOST_CHECK_EQUAL(threadPool.Submit([] { return true; }).get(), true);
    +    BOOST_CHECK_EQUAL(threadPool.Submit([] { return 42; }).get(), 42);
    +    BOOST_CHECK_EQUAL(threadPool.Submit([] { return std::string{"true"}; }).get(), "true");
    +}
     
    -        // Now process manually
    -        for (int i = 0; i < num_tasks; i++) {
    -            threadPool.ProcessTask();
    -        }
    -        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    -        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    -        blocker.set_value();
    -        threadPool.Stop();
    -        WaitFor(blocking_tasks, "Failure waiting for test6 blocking task futures");
    +BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
    +{
    +    ThreadPool threadPool{"exception_test"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
    +
    +    const auto err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
    +
    +    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    +    for (size_t i{0}; i < num_tasks; ++i) {
    +        BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(err(i)); }).get(), std::runtime_error, HasReason{err(i)});
         }
    +}
    +
    +BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
    +{
    +    ThreadPool threadPool{"manual_process"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
    +    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     
    -    // Test case 7, recursive submission of tasks.
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    +    std::counting_semaphore sem{0};
    +    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
     
    -        std::promise<void> signal;
    -        threadPool.Submit([&]() {
    -            threadPool.Submit([&]() {
    -                signal.set_value();
    -            });
    -        });
    +    std::atomic_size_t counter{0};
    +    std::vector<std::future<void>> futures(num_tasks);
    +    for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
     
    -        signal.get_future().wait();
    -        threadPool.Stop();
    -    }
    +    UninterruptibleSleep(milliseconds{100});
    +    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
     
    -    // Test case 8, submit a task when all threads are busy and then stop the pool.
    -    {
    -        ThreadPool threadPool(POOL_NAME);
    -        threadPool.Start(NUM_WORKERS_DEFAULT);
    +    for (size_t i{0}; i < num_tasks; ++i) {
    +        threadPool.ProcessTask();
    +    }
     
    -        std::promise<void> blocker;
    -        std::shared_future<void> blocker_future(blocker.get_future());
    -        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test8 blocking tasks enabled");
    +    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    +    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
     
    -        // Submit an extra task that should execute once a worker is free
    -        std::future<bool> future = threadPool.Submit([]() { return true; });
    +    sem.release(NUM_WORKERS_DEFAULT);
    +    WaitFor(blocking_tasks);
    +}
     
    -        // At this point, all workers are blocked, and the extra task is queued
    -        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    +BOOST_AUTO_TEST_CASE(recursive_task_submission)
    +{
    +    ThreadPool threadPool{"recursive"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
     
    -        // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    -        std::thread thread_unblocker([&blocker]() {
    -            std::this_thread::sleep_for(std::chrono::milliseconds{300});
    -            blocker.set_value();
    +    std::promise<void> signal;
    +    threadPool.Submit([&threadPool, &signal] {
    +        threadPool.Submit([&signal] {
    +            signal.set_value();
             });
    +    });
     
    -        // Stop the pool while the workers are still blocked
    -        threadPool.Stop();
    +    signal.get_future().wait();
    +}
     
    -        // Expect the submitted task to complete
    -        BOOST_CHECK(future.get());
    -        thread_unblocker.join();
    +BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    +{
    +    ThreadPool threadPool{"graceful_stop"};
    +    threadPool.Start(NUM_WORKERS_DEFAULT);
     
    -        // Obviously all the previously blocking tasks should be completed at this point too
    -        WaitFor(blocking_tasks, "Failure waiting for test8 blocking task futures");
    +    std::counting_semaphore sem{0};
    +    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
     
    -        // Pool should be stopped and no workers remaining
    -        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    -    }
    +    auto future{threadPool.Submit([] { return true; })};
    +    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    +
    +    std::thread thread_unblocker{[&sem] {
    +        std::this_thread::sleep_for(milliseconds{300});
    +        sem.release(NUM_WORKERS_DEFAULT);
    +    }};
    +
    +    threadPool.Stop();
    +
    +    BOOST_CHECK(future.get());
    +    thread_unblocker.join();
    +    WaitFor(blocking_tasks);
    +    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
     }
     
     BOOST_AUTO_TEST_SUITE_END()
    diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    index 5d9884086e..c89fda37c2 100644
    --- a/src/util/threadpool.h
    +++ b/src/util/threadpool.h
    @@ -24,6 +24,8 @@
     #include <utility>
     #include <vector>
     
    +#include <tinyformat.h>
    +
     /**
      * [@brief](/bitcoin-bitcoin/contributor/brief/) Fixed-size thread pool for running arbitrary tasks concurrently.
      *
    @@ -62,16 +64,9 @@ private:
             for (;;) {
                 std::packaged_task<void()> task;
                 {
    -                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    -                if (!m_interrupt && m_work_queue.empty()) {
    -                    // Block until the pool is interrupted or a task is available.
    -                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    -                }
    -
    -                // If stopped and no work left, exit worker
    -                if (m_interrupt && m_work_queue.empty()) {
    -                    return;
    -                }
    +                // Block until the pool is interrupted or a task is available.
    +                m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    +                if (m_interrupt && m_work_queue.empty()) return;
     
                     task = std::move(m_work_queue.front());
                     m_work_queue.pop();
    @@ -101,17 +96,16 @@ public:
          *
          * Must be called from a controller (non-worker) thread.
          */
    -    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    void Start(size_t num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
             assert(num_workers > 0);
             LOCK(m_mutex);
             if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
             m_interrupt = false; // Reset
     
    -        // Create workers
             m_workers.reserve(num_workers);
    -        for (int i = 0; i < num_workers; i++) {
    -            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    +        for (size_t i{0}; i < num_workers; i++) {
    +            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
             }
         }
     
    @@ -179,12 +173,6 @@ public:
             task();
         }
     
    -    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    -    {
    -        WITH_LOCK(m_mutex, m_interrupt = true);
    -        m_cv.notify_all();
    -    }
    -
         size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
             return WITH_LOCK(m_mutex, return m_work_queue.size());
    

    </details>

  66. marcofleon commented at 7:03 PM on October 30, 2025: contributor

    I've been playing around with the fuzz test and wanted to update here, even if it's still a bit inconclusive.

    I'm running into non-reproducible timeouts when using fork with libFuzzer. I'm overusing my cores by a lot (fork=25 plus the 3 workers each), and I'm not yet sure if these timeouts would happen eventually when using less cores.

    I have "fixed" this by changing notify_one in Submit to notify_all, although I can't say I'm exactly sure why this works. It almost seems like some of the workers aren't being woken up at all when only one is being notified. Maybe the same one somehow ends up being reused over and over, so it gets stuck waiting? But then with notify_all all idle workers are woken up with every task, so the thread pool is able to handle the cpu overload more effectively.

    edit: This should be reproducible in the sense that if you run with fork close to the number of total cores on your machine and set -timeout=30 you should get timeouts in less than an hour of fuzzing.

  67. in src/util/threadpool.h:102 in e1eb4cd3a5 outdated
      97 | +     * @brief Start worker threads.
      98 | +     *
      99 | +     * Creates and launches `num_workers` threads that begin executing tasks
     100 | +     * from the queue. If the pool is already started, throws.
     101 | +     *
     102 | +     * Must be called from a controller (non-worker) thread.
    


    Eunovo commented at 10:55 AM on October 31, 2025:

    laanwj commented at 8:37 AM on November 11, 2025:

    i remember that in the past we had tons of issues with thread-local variables. They've been a pain in the ass on some platforms, and decided to not use them except for one case. Not sure if this is still the case, but if not, this needs to be updated: https://github.com/bitcoin/bitcoin/blob/master/src/util/threadnames.cpp#L39


    fanquake commented at 9:35 AM on November 11, 2025:

    There's also our thread_local clang-tidy plugin: https://github.com/bitcoin/bitcoin/tree/master/contrib/devtools/bitcoin-tidy.


    Eunovo commented at 1:38 PM on November 11, 2025:

    There's also our thread_local clang-tidy plugin: https://github.com/bitcoin/bitcoin/tree/master/contrib/devtools/bitcoin-tidy.

    I don't see why we would need a thread local variable with a non-trivial desctructor to implement this.

    i remember that in the past we had tons of issues with thread-local variables. They've been a pain in the ass on some platforms, and decided to not use them except for one case. Not sure if this is still the case, but if not, this needs to be updated: https://github.com/bitcoin/bitcoin/blob/master/src/util/threadnames.cpp#L39

    I think it would be fine in this case because the use is limited. A simple thread-local boolean and assertion should be enough.

    I don't have a strong opinion here; I just think it would be better to programmatically enforce the rule that certain functions should not be called from Worker Threads.


    furszy commented at 10:45 PM on November 13, 2025:

    Can we enforce this rule using a thread-local variable?

    As m_workers is guarded, we could enforce it in a simple way:

    for (const auto& worker : m_workers) assert(worker.get_id() != std::this_thread::get_id());
    
  68. Eunovo commented at 12:48 PM on October 31, 2025: contributor

    Concept ACK https://github.com/bitcoin/bitcoin/pull/33689/commits/435e8b5a55033fbfc6428a612b9826713b3cf57a

    The PR looks good already, but I think we can block users from calling Threadpool::Start() and Threadpool::Stop() inside Worker threads; We can use a thread local variable to identify worker threads and reject the operation.

  69. furszy force-pushed on Nov 17, 2025
  70. in src/test/threadpool_tests.cpp:73 in adb891184e outdated
      68 | +{
      69 | +    ThreadPool threadPool(POOL_NAME);
      70 | +    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
      71 | +        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
      72 | +        return true;
      73 | +    });
    


    l0rinc commented at 4:01 PM on November 17, 2025:

    Continuing the thread from #33689 (review):

    #include <common/system.h>
    +#include <test/util/setup_common.h>
    #include <util/string.h>
    
        BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, HasReason{"No active workers; cannot accept new tasks"});
    

    furszy commented at 6:23 PM on November 17, 2025:

    Missed to fully answer your previous comment related to this change, sorry. I didn't do it to not include setup_common.h, as it comes with way more dependencies than just HasReason().

    The idea is to try to reduce large Bitcoin-Core repo dependencies in this low level class, mainly when they do not add much value (this one just let us remove two lines of code with no behavior change), so this class and tests can be easily used elsewhere. We could move HasReason() to another test utils file too but I don't think it worth the effort.


    l0rinc commented at 7:21 PM on November 17, 2025:

    This is needed in multiple methods here, it would simplify the diff and it provides a higher level primitive instead of repeating and reimplementing what we have already built a helper for. Not sure why we'd want to reduce dependencies here, what's the advantage of that? I value clean code a lot more, especially for a test that doesn't even have performance requirements? We can of course move HasReason in a separate PR, but I think we can use it here before that as well.


    furszy commented at 8:03 PM on November 17, 2025:

    Not sure why we'd want to reduce dependencies here, what's the advantage of that?

    I wrote it above; the idea is to be able to use this low-level class and tests elsewhere, outside the project, just by pulling a few files without dragging in all our unit test framework machinery, which has tons other dependencies. If we were talking about a substantial improvement, I’d agree with you, but here it’s just a 2-line diff with no behavior change. And for me, that makes the rationale for including it not very convincing.


    l0rinc commented at 12:17 PM on November 19, 2025:

    the idea is to be able to use this low-level class and tests elsewhere

    I haven't heard that argument before, why would we care about other projects wanting to copy-paste our code? Let them refactor, but we should write the best code for our project.

    but here it’s just a 2-line diff

    It's not, please see my remaining suggestions many of which haven't been applied yet.


    Also, since we're ignoring the return value here, we should likely cast to void here:

        BOOST_CHECK_EXCEPTION((void)threadPool.Submit([] { return false; }), std::runtime_error, HasReason{"No active workers; cannot accept new tasks"});
    
  71. in src/test/threadpool_tests.cpp:27 in adb891184e outdated
      22 | +// 9) Congestion test; create more workers than available cores.
      23 | +// 10) Ensure Interrupt() prevents further submissions.
      24 | +BOOST_AUTO_TEST_SUITE(threadpool_tests)
      25 | +
      26 | +// General test values
      27 | +constexpr int NUM_WORKERS_DEFAULT = 3;
    


    l0rinc commented at 4:06 PM on November 17, 2025:

    Continuing the discussion in #33689 (review), could we randomize this on the call-site, so that we can exercise cases when we have other than 3 workers (to make sure we don't introduce an accidental bias), i.e. in the test it would be something like:

        const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    

    furszy commented at 7:09 PM on November 17, 2025:

    While I like the extra coverage, I’m not fully convinced about adding this bit of non-determinism. That’s why I didn’t include it in the last push. It could make reproducing failures trickier, since we’d be adding another dimension to all tests; core counts behave differently across environments.

    Probably a good middle ground could be adding a separate test that picks a random worker count and prints the available core count on failure. That gives us some extra coverage without making the other tests harder to debug, and it makes it clear what’s needed to reproduce those specific failures.


    l0rinc commented at 7:25 PM on November 17, 2025:

    It's not about coverage, but rather avoiding bias that we may introduce by hard-coding arbitrary values.

    I prefer a situation that is "hard to reproduce" over something that we won't find because of hard-coded values.

    core counts behave differently across environments

    Exactly, let's exercise those differences, that's exactly what we want from our tests, to ruthlessly try to break the code, right?


    furszy commented at 8:31 PM on November 17, 2025:

    Exactly, let's exercise those differences, that's exactly what we want from our tests, to ruthlessly try to break the code, right?

    We’re going to side-track the discussion a bit, but I’m not sure I completely agree. Generally, I see unit tests as being meant for correctness, not stress testing. For example, we want to ensure certain behavior is always retained — it’s not about trying to break the code in a non-deterministic way. That’s what fuzz tests are for, where we randomize inputs. I’d also argue that we don’t yet have a proper heavy-load testing framework either.

    Still, IIRC, our current fuzzing framework has some limitations; the engine runs the function faster than the OS can release the threads, which causes memory usage to increase non-stop (at least that’s what I remember from a good number of experiments we did a few months ago).

    In any case, this is just a general software development sidetrack… sorry, couldn’t contain myself. We could still have a specific test case for this or dynamically adapt the number of workers if others are happy with that too.


    l0rinc commented at 12:14 PM on November 19, 2025:

    We could still have a specific test case for this or dynamically adapt the number of workers

    How would that solve the problem I highlighted, namely that 3 is a special value that introduces needless bias? We could theoretically have a bug that only manifests when the worker count equals (or exceeds) the CPU count (but maybe only happens for exceptions), which would never happen with 3 workers, but would sometimes fail correctly if we randomize instead of hard-code magic values.

  72. l0rinc changes_requested
  73. l0rinc commented at 5:00 PM on November 17, 2025: contributor

    Left a few more comments to help with moving this forward. I like the new structure more and after the tests and the restructuring are finished, I would like to spend some more time reviewing the ThreadPool as well - but I want to make sure we have a solid foundation first.

  74. furszy force-pushed on Nov 18, 2025
  75. furszy commented at 8:04 PM on November 18, 2025: member

    I've been playing around with the fuzz test and wanted to update here, even if it's still a bit inconclusive.

    I'm running into non-reproducible timeouts when using fork with libFuzzer. I'm overusing my cores by a lot (fork=25 plus the 3 workers each), and I'm not yet sure if these timeouts would happen eventually when using less cores.

    I have "fixed" this by changing notify_one in Submit to notify_all, although I can't say I'm exactly sure why this works. It almost seems like some of the workers aren't being woken up at all when only one is being notified. Maybe the same one somehow ends up being reused over and over, so it gets stuck waiting? But then with notify_all all idle workers are woken up with every task, so the thread pool is able to handle the cpu overload more effectively.

    edit: This should be reproducible in the sense that if you run with fork close to the number of total cores on your machine and set -timeout=30 you should get timeouts in less than an hour of fuzzing. @marcofleon I don’t think this is an issue. You’re just massively oversubscribing the CPU and lowering the timeout to the point where all the context switching triggers it. Switching to notify_all() only forces all workers awake on every submission, which masks the OS scheduler starvation you get in this kind of extreme setup.

  76. in src/test/threadpool_tests.cpp:23 in 258518d880 outdated
      18 | +// 5) The task throws an exception, catch must be done in the consumer side.
      19 | +// 6) Busy workers, help them by processing tasks externally.
      20 | +// 7) Recursive submission of tasks.
      21 | +// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
      22 | +// 9) Congestion test; create more workers than available cores.
      23 | +// 10) Ensure Interrupt() prevents further submissions.
    


    l0rinc commented at 12:16 PM on November 19, 2025:

    we already have individual test cases, there's no need to duplicate them here

  77. in src/util/threadpool.h:150 in 2de0ce5cd8
     145 | +    }
     146 | +
     147 | +    /**
     148 | +     * @brief Enqueues a new task for asynchronous execution.
     149 | +     *
     150 | +     * Returns a `std::future` that provides the task’s result or propagates
    


    l0rinc commented at 12:27 PM on November 19, 2025:

    nit: for consistency with other spelling, e.g. https://github.com/bitcoin/bitcoin/blob/2de0ce5cd85e1b99e318883964df318ffb615fe4/src/util/threadpool.h#L94

         * Returns a `std::future` that provides the task's result or propagates
    

    furszy commented at 9:40 PM on January 27, 2026:

    Done as suggested.

  78. in src/util/threadpool.h:13 in 2de0ce5cd8
       8 | +#include <sync.h>
       9 | +#include <tinyformat.h>
      10 | +#include <util/check.h>
      11 | +#include <util/string.h>
      12 | +#include <util/thread.h>
      13 | +#include <util/threadinterrupt.h>
    


    l0rinc commented at 12:39 PM on November 19, 2025:

    some of these includes seem unused, can you please check?

    <details> <summary>iwyu_tool.py</summary>

     python3 /opt/homebrew/bin/iwyu_tool.py -p build-iwyu src/test/threadpool_tests.cpp -- -Xiwyu --cxx17ns -Xiwyu --mapping_file=$PWD/contrib/devtools/iwyu/bitcoin.core.i
    mp -Xiwyu --max_line_length=160 -Xiwyu --check_also=$PWD/src/util/threadpool.h
    
    /Users/lorinc/IdeaProjects/bitcoin/src/util/threadpool.h should add these lines:
    #include <__vector/vector.h>   // for vector
    
    /Users/lorinc/IdeaProjects/bitcoin/src/util/threadpool.h should remove these lines:
    - #include <util/threadinterrupt.h>  // lines 13-13
    - #include <atomic>  // lines 16-16
    - #include <functional>  // lines 19-19
    - #include <memory>  // lines 21-21
    - #include <vector>  // lines 26-26
    
    The full include-list for /Users/lorinc/IdeaProjects/bitcoin/src/util/threadpool.h:
    #include <__vector/vector.h>   // for vector
    #include <sync.h>              // for UniqueLock, EXCLUSIVE_LOCKS_REQUIRED, LOCK, WITH_LOCK, GUARDED_BY, Mutex, REVERSE_LOCK, WAIT_LOCK
    #include <tinyformat.h>        // for format, formatTruncated, formatValue, makeFormatList, strprintf
    #include <util/check.h>        // for assert, inline_assertion_check, Assume
    #include <util/string.h>       // for string, basic_string, basic_string_view
    #include <util/thread.h>       // for TraceThread
    #include <algorithm>           // for move
    #include <condition_variable>  // for condition_variable
    #include <cstddef>             // for size_t
    #include <future>              // for packaged_task
    #include <queue>               // for queue
    #include <stdexcept>           // for runtime_error
    #include <thread>              // for thread, get_id, operator==, __thread_id
    #include <utility>             // for move, forward
    ---
    
    /Users/lorinc/IdeaProjects/bitcoin/src/test/threadpool_tests.cpp should add these lines:
    #include <__vector/vector.h>                                        // for vector
    #include <algorithm>                                                // for max
    #include <atomic>                                                   // for atomic, memory_order_relaxed, memory_order_acquire, memory_order_release
    #include <boost/preprocessor/arithmetic/limits/dec_256.hpp>         // for BOOST_PP_DEC_1, BOOST_PP_DEC_2, BOOST_PP_DEC_128, BOOST_PP_DEC_16, BOOST_PP_DEC_3
    #include <boost/preprocessor/comparison/limits/not_equal_256.hpp>   // for BOOST_PP_NOT_EQUAL_1, BOOST_PP_NOT_EQUAL_CHECK_BOOST_PP_NOT_EQUAL_1
    #include <boost/preprocessor/control/expr_iif.hpp>                  // for BOOST_PP_EXPR_IIF_1
    #include <boost/preprocessor/control/iif.hpp>                       // for BOOST_PP_IIF_1, BOOST_PP_IIF_0
    #include <boost/preprocessor/detail/limits/auto_rec_256.hpp>        // for BOOST_PP_NODE_ENTRY_256
    #include <boost/preprocessor/logical/compl.hpp>                     // for BOOST_PP_COMPL_0
    #include <boost/preprocessor/logical/limits/bool_256.hpp>           // for BOOST_PP_BOOL_0, BOOST_PP_BOOL_1, BOOST_PP_BOOL_2
    #include <boost/preprocessor/repetition/detail/limits/for_256.hpp>  // for BOOST_PP_FOR_0, BOOST_PP_FOR_1, BOOST_PP_FOR_127, BOOST_PP_FOR_15, BOOST_PP_FOR_3
    #include <boost/preprocessor/repetition/for.hpp>                    // for BOOST_PP_FOR_CHECK_BOOST_PP_NIL
    #include <boost/preprocessor/seq/limits/elem_256.hpp>               // for BOOST_PP_SEQ_ELEM_0
    #include <boost/preprocessor/seq/limits/size_256.hpp>               // for BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_2, BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_3
    #include <boost/preprocessor/tuple/elem.hpp>                        // for BOOST_PP_TUPLE_ELEM_O_3
    #include <boost/preprocessor/variadic/limits/elem_64.hpp>           // for BOOST_PP_VARIADIC_ELEM_3
    #include <boost/test/tools/assertion_result.hpp>                    // for assertion_result
    #include <boost/test/tools/old/interface.hpp>                       // for BOOST_TEST_TOOL_PASS_ARGS2, BOOST_TEST_TOOL_PASS_PRED2, BOOST_CHECK_EQUAL, BOOST_...
    #include <boost/test/tree/auto_registration.hpp>                    // for auto_test_unit_registrar
    #include <boost/test/unit_test_suite.hpp>                           // for BOOST_AUTO_TEST_CASE, BOOST_AUTO_TEST_CASE_FIXTURE, BOOST_AUTO_TEST_SUITE, BOOST_...
    #include <boost/test/utils/basic_cstring/basic_cstring.hpp>         // for basic_cstring
    #include <boost/test/utils/lazy_ostream.hpp>                        // for operator<<
    #include <future>                                                   // for future, promise, future_status, shared_future
    #include <stdexcept>                                                // for runtime_error
    #include <thread>                                                   // for thread
    
    /Users/lorinc/IdeaProjects/bitcoin/src/test/threadpool_tests.cpp should remove these lines:
    - #include <boost/test/unit_test.hpp>  // lines 10-10
    
    The full include-list for /Users/lorinc/IdeaProjects/bitcoin/src/test/threadpool_tests.cpp:
    #include <__vector/vector.h>                                        // for vector
    #include <common/system.h>                                          // for GetNumCores
    #include <util/string.h>                                            // for basic_string, allocator, char_traits, string, ToString, operator+
    #include <util/threadpool.h>                                        // for ThreadPool
    #include <util/time.h>                                              // for UninterruptibleSleep, milliseconds, operator""s
    #include <algorithm>                                                // for max
    #include <atomic>                                                   // for atomic, memory_order_relaxed, memory_order_acquire, memory_order_release
    #include <boost/preprocessor/arithmetic/limits/dec_256.hpp>         // for BOOST_PP_DEC_1, BOOST_PP_DEC_2, BOOST_PP_DEC_128, BOOST_PP_DEC_16, BOOST_PP_DEC_3
    #include <boost/preprocessor/comparison/limits/not_equal_256.hpp>   // for BOOST_PP_NOT_EQUAL_1, BOOST_PP_NOT_EQUAL_CHECK_BOOST_PP_NOT_EQUAL_1
    #include <boost/preprocessor/control/expr_iif.hpp>                  // for BOOST_PP_EXPR_IIF_1
    #include <boost/preprocessor/control/iif.hpp>                       // for BOOST_PP_IIF_1, BOOST_PP_IIF_0
    #include <boost/preprocessor/detail/limits/auto_rec_256.hpp>        // for BOOST_PP_NODE_ENTRY_256
    #include <boost/preprocessor/logical/compl.hpp>                     // for BOOST_PP_COMPL_0
    #include <boost/preprocessor/logical/limits/bool_256.hpp>           // for BOOST_PP_BOOL_0, BOOST_PP_BOOL_1, BOOST_PP_BOOL_2
    #include <boost/preprocessor/repetition/detail/limits/for_256.hpp>  // for BOOST_PP_FOR_0, BOOST_PP_FOR_1, BOOST_PP_FOR_127, BOOST_PP_FOR_15, BOOST_PP_FOR_3
    #include <boost/preprocessor/repetition/for.hpp>                    // for BOOST_PP_FOR_CHECK_BOOST_PP_NIL
    #include <boost/preprocessor/seq/limits/elem_256.hpp>               // for BOOST_PP_SEQ_ELEM_0
    #include <boost/preprocessor/seq/limits/size_256.hpp>               // for BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_2, BOOST_PP_SEQ_SIZE_BOOST_PP_SEQ_SIZE_3
    #include <boost/preprocessor/tuple/elem.hpp>                        // for BOOST_PP_TUPLE_ELEM_O_3
    #include <boost/preprocessor/variadic/limits/elem_64.hpp>           // for BOOST_PP_VARIADIC_ELEM_3
    #include <boost/test/tools/assertion_result.hpp>                    // for assertion_result
    #include <boost/test/tools/old/interface.hpp>                       // for BOOST_TEST_TOOL_PASS_ARGS2, BOOST_TEST_TOOL_PASS_PRED2, BOOST_CHECK_EQUAL, BOOST_...
    #include <boost/test/tree/auto_registration.hpp>                    // for auto_test_unit_registrar
    #include <boost/test/unit_test_suite.hpp>                           // for BOOST_AUTO_TEST_CASE, BOOST_AUTO_TEST_CASE_FIXTURE, BOOST_AUTO_TEST_SUITE, BOOST_...
    #include <boost/test/utils/basic_cstring/basic_cstring.hpp>         // for basic_cstring
    #include <boost/test/utils/lazy_ostream.hpp>                        // for operator<<
    #include <future>                                                   // for future, promise, future_status, shared_future
    #include <stdexcept>                                                // for runtime_error
    #include <thread>                                                   // for thread
    ---
    

    </details>


    furszy commented at 9:40 PM on January 27, 2026:

    Done as suggested.

  79. in src/util/threadpool.h:141 in 2de0ce5cd8 outdated
     136 | +            // Early shutdown to return right away on any concurrent 'Submit()' call
     137 | +            m_interrupt = true;
     138 | +            threads_to_join.swap(m_workers);
     139 | +        }
     140 | +        m_cv.notify_all();
     141 | +        for (auto& worker : threads_to_join) worker.join();
    


    l0rinc commented at 1:26 PM on November 19, 2025:

    https://corecheck.dev/bitcoin/bitcoin/pulls/33689 suggests using std::jthread instead, which would eliminate the need for manual joins - can you check if our infrastructure would support that?

  80. in src/util/threadpool.h:94 in 2de0ce5cd8 outdated
      89 | +public:
      90 | +    explicit ThreadPool(const std::string& name) : m_name(name) {}
      91 | +
      92 | +    ~ThreadPool()
      93 | +    {
      94 | +        Stop(); // In case it hasn't been stopped.
    


    l0rinc commented at 1:28 PM on November 19, 2025:

    nit: comment is redundant, the code already explains it. There are few other comments that don't provide value, can you please check them?

  81. in src/util/threadpool.h:156 in 2de0ce5cd8 outdated
     151 | +     * any exception it throws.
     152 | +     * Note: Ignoring the returned future requires guarding the task against
     153 | +     * uncaught exceptions, as they would otherwise be silently discarded.
     154 | +     */
     155 | +    template <class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     156 | +    auto Submit(F&& fn)
    


    l0rinc commented at 1:38 PM on November 19, 2025:

    nit: we could mark this as [[nodiscard]] and reformat with clang-format:

        template <class F>
        [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) auto Enqueue(F&& fn)
    

    Note that this reveals that we often ignore the return value and should make it explicit if that's deliberate or not. I also think that calling in Enqueue could describe its behavior better.


    furszy commented at 9:45 PM on January 27, 2026:

    Sure. Added [[nodiscard]]. I personally prefer Submit as tasks might not get executed in order as enqueue suggests.

  82. in src/httpserver.cpp:256 in 2de0ce5cd8
     251 | @@ -327,13 +252,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
     252 |  
     253 |      // Dispatch to worker thread
     254 |      if (i != iend) {
     255 | -        std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
     256 | -        assert(g_work_queue);
     257 | -        if (g_work_queue->Enqueue(item.get())) {
     258 | -            item.release(); /* if true, queue took ownership */
     259 | +        if ((int)g_threadpool_http.WorkQueueSize() < g_max_queue_depth) {
     260 | +            g_threadpool_http.Submit([req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
    


    l0rinc commented at 1:44 PM on November 19, 2025:

    as far as I can tell, we're ignoring the return value here and silently ignoring any thrown thread exception - which we have tested so carefully.

    Since my understanding is that this is an asynchronous fire-and-forget task, we can't just .get() the future here, but we can likely handle the potential exceptions inside and void the method to signal that we're deliberately discarding the future (not forgetting it).


    furszy commented at 8:52 PM on January 28, 2026:

    Yeah, that why I was very explicit in the Submit() documentation about this behavior. I was thinking specifically about this line.

    I managed to crash master just by throwing an exception from the RPC handler, which highlights another advantage of this PR since the same scenario no longer results in a crash here. Because of this, I added an early commit in the PR to explain and explicitly fix the crash, keeping it isolated from the HTTP-pool changes. And also included an error logging improvement for the functional test framework.

  83. in src/test/threadpool_tests.cpp:133 in 2de0ce5cd8
     128 | +{
     129 | +    ThreadPool threadPool(POOL_NAME);
     130 | +    threadPool.Start(NUM_WORKERS_DEFAULT);
     131 | +    std::atomic<bool> flag = false;
     132 | +    std::future<void> future = threadPool.Submit([&flag]() {
     133 | +        UninterruptibleSleep(std::chrono::milliseconds{200});
    


    l0rinc commented at 2:13 PM on November 19, 2025:
            UninterruptibleSleep(200ms);
    

    furszy commented at 9:49 PM on January 27, 2026:

    Done as suggested.

  84. in src/test/threadpool_tests.cpp:136 in 2de0ce5cd8 outdated
     131 | +    std::atomic<bool> flag = false;
     132 | +    std::future<void> future = threadPool.Submit([&flag]() {
     133 | +        UninterruptibleSleep(std::chrono::milliseconds{200});
     134 | +        flag.store(true, std::memory_order_release);
     135 | +    });
     136 | +    BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
    


    l0rinc commented at 2:13 PM on November 19, 2025:
        BOOST_CHECK_EQUAL(future.wait_for(WAIT_TIMEOUT), std::future_status::ready);
    

    furszy commented at 9:49 PM on January 27, 2026:

    Done as suggested.

  85. in src/httpserver.cpp:433 in 2de0ce5cd8 outdated
     428 | @@ -516,21 +429,17 @@ void InterruptHTTPServer()
     429 |          // Reject requests on current connections
     430 |          evhttp_set_gencb(eventHTTP, http_reject_request_cb, nullptr);
     431 |      }
     432 | -    if (g_work_queue) {
     433 | -        g_work_queue->Interrupt();
     434 | -    }
     435 | +    // Interrupt pool after disabling requests
     436 | +    g_threadpool_http.Interrupt();
    


    l0rinc commented at 3:07 PM on November 19, 2025:

    still not sure I fully understand the difference between stopping and interrupting - we should probably merge it with Stop and document it better

  86. in src/util/threadpool.h:78 in 2de0ce5cd8 outdated
      73 | +                if (m_interrupt && m_work_queue.empty()) {
      74 | +                    return;
      75 | +                }
      76 | +
      77 | +                task = std::move(m_work_queue.front());
      78 | +                m_work_queue.pop();
    


    l0rinc commented at 3:56 PM on November 19, 2025:

    we could make it a bit more specific by making

    std::deque<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    

    which would allow

                    m_work_queue.pop_front();
    

    for symmetry with m_work_queue.front() and to clarify that

                m_work_queue.emplace(std::move(task));
    

    actually inserts at the back

                m_work_queue.emplace_back(std::move(task));
    
  87. in src/util/threadpool.h:175 in 2de0ce5cd8 outdated
     170 | +
     171 | +    /**
     172 | +     * @brief Execute a single queued task synchronously.
     173 | +     * Removes one task from the queue and executes it on the calling thread.
     174 | +     */
     175 | +    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    


    l0rinc commented at 4:28 PM on November 19, 2025:

    It seems to me this is a convenience method to expose the inner workings of the pool - but the current usages don't seem to require it.

    // Test 6, all workers are busy, help them by processing tasks from outside

    It's not obvious why that would be necessary, doesn't this mean that we have misjudged the number of threads the pool should have?

    I would prefer having the simplest pool we can to get the job done and extend it when we actually need the functionality instead. Or if we do need this for some reason, we should be able to deduplicate since this is basically the exact same implementation and what the WorkerThread does in a loop:

    std::counting_semaphore<> m_sem{0};
    
    void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    {
        for (;;) {
            m_sem.acquire();
            if (!ProcessTask()) return;
        }
    }
    
    ...
    
    /**
     * [@brief](/bitcoin-bitcoin/contributor/brief/) Execute a single queued task synchronously.
     * Removes one task from the queue and executes it on the calling thread.
     * [@return](/bitcoin-bitcoin/contributor/return/) true if a task was executed, false if queue was empty
     */
    bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    {
        std::packaged_task<void()> task;
        {
            LOCK(m_mutex);
            if (m_work_queue.empty()) return false;
            task = std::move(m_work_queue.front());
            m_work_queue.pop_front();
        }
        task();
        return true;
    }
    
  88. in src/util/threadpool.h:56 in 2de0ce5cd8 outdated
      51 | +    Mutex m_mutex;
      52 | +    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
      53 | +    std::condition_variable m_cv;
      54 | +    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
      55 | +    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
      56 | +    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
    


    l0rinc commented at 4:42 PM on November 19, 2025:

    The condition_variable(since C++11) with locks, reverse locks, waits, notifies can be modernized to a std::counting_semaphore<> m_sem{0}; (since C++20) that we release when we enqueue an element in Submit, in Interrupt() and Stop we release all of them, and in WorkerThread we m_sem.acquire(); before calling ProcessTask

  89. in src/util/threadpool.h:135 in 2de0ce5cd8 outdated
     130 | +        std::vector<std::thread> threads_to_join;
     131 | +        {
     132 | +            LOCK(m_mutex);
     133 | +            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
     134 | +            auto id = std::this_thread::get_id();
     135 | +            for (const auto& worker : m_workers) assert(worker.get_id() != id);
    


    l0rinc commented at 4:45 PM on November 19, 2025:

    tests are passing without these lines - can we cover them?

  90. in src/util/threadpool.h:144 in 2de0ce5cd8 outdated
     139 | +        }
     140 | +        m_cv.notify_all();
     141 | +        for (auto& worker : threads_to_join) worker.join();
     142 | +        // Since we currently wait for tasks completion, sanity-check empty queue
     143 | +        WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
     144 | +        // Note: m_interrupt is left true until next Start()
    


    l0rinc commented at 4:45 PM on November 19, 2025:

    I also find it weird that we can restart a ThreadPool - can we add a Start() → Stop() → Start() unit test if that something we want indeed? I understand it may not be trivial, but I would rather we restructure for RAII or some better lifecycle instead.

  91. in src/util/threadpool.h:133 in 2de0ce5cd8
     128 | +    {
     129 | +        // Notify workers and join them
     130 | +        std::vector<std::thread> threads_to_join;
     131 | +        {
     132 | +            LOCK(m_mutex);
     133 | +            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
    


    l0rinc commented at 4:46 PM on November 19, 2025:

    nit: we've used backticks in other similar cases

                // Ensure `Stop()` isn't called from any worker thread to avoid deadlocks
    

    ismaelsadeeq commented at 2:58 PM on December 18, 2025:

    In "util: introduce general purpose thread pool” 258518d880e81938fcd9672b37bc640e8a06c930

    This comment is incorrect because this check alone is not sufficient to guarantee that Stop() is never called from a worker thread.

    Consider the following scenario: Stop() is called both from a worker thread and from a non-worker thread sequentially. The call from outside the pool may execute first (unless the caller explicitly waits on the future from the thread pool Stop()). The non thread pool Stop() call acquires the lock, sets the interrupt flag, clears the worker list, and joins all worker threads.

    When the subsequent call to Stop() from within a worker thread executes, the worker list is already empty, comparison succeeds, the check does not detect that Stop() is being invoked from a worker thread.

    This does not result in a deadlock, since there are no remaining threads to join.

    Perhaps the comment should be adjusted to something like

    // Ensure Stop() is not called from a worker thread while workers are still registered,
    // which would otherwise cause a self-join deadlock during shutdown.
    

    furszy commented at 6:10 PM on December 28, 2025:

    Good catch. Thanks. Updated.


    furszy commented at 9:50 PM on January 27, 2026:

    Removed the backticks.

  92. in src/util/threadpool.h:136 in 2de0ce5cd8 outdated
     131 | +        {
     132 | +            LOCK(m_mutex);
     133 | +            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
     134 | +            auto id = std::this_thread::get_id();
     135 | +            for (const auto& worker : m_workers) assert(worker.get_id() != id);
     136 | +            // Early shutdown to return right away on any concurrent 'Submit()' call
    


    l0rinc commented at 4:46 PM on November 19, 2025:

    same:

                // Early shutdown to return right away on any concurrent `Submit()` call
    

    furszy commented at 9:51 PM on January 27, 2026:

    removed the backticks

  93. l0rinc changes_requested
  94. l0rinc commented at 6:03 PM on November 19, 2025: contributor

    After the tests, I have reviewed the ThreadPool as well now - first iteration, have a few questions, I expect a few more rounds of review. I left some modernization suggestions and fine-grained commit requests to make the change easily reviewable.

    I still share many of @andrewtoth's concerns regarding RAII vs Start/Stop/Interrupt and I think we can modernize the pool from std::condition_variable with locks to C++20 std::counting_semaphore. Besides my suggestions below I think we should merge Stop/Interrupt maybe via a bool join = true parameter.

    There's also some duplication between WorkerThread and ProcessTask that we can likely avoid (the test are passing, we can either simplify or improve our testing). There are also a few remaining inconsistencies (I have mentioned a few, repeating a some of them here for visibility). Similarly to before, I have tracked my suggestions locally, here's the patch to simplify checking my suggestions against your local copy:

    <details> <summary>Threadpool & tests</summary>

    diff --git a/src/httpserver.cpp b/src/httpserver.cpp
    index 6069062abd..c5033462ac 100644
    --- a/src/httpserver.cpp
    +++ b/src/httpserver.cpp
    @@ -253,7 +253,7 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
         // Dispatch to worker thread
         if (i != iend) {
             if ((int)g_threadpool_http.WorkQueueSize() < g_max_queue_depth) {
    -            g_threadpool_http.Submit([req = std::move(hreq), in_path = std::move(path), fn = i->handler]() {
    +            (void)g_threadpool_http.Enqueue([req = std::move(hreq), in_path = std::move(path), fn = i->handler] {
                     fn(req.get(), in_path);
                 });
             } else {
    diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
    index 8cbaf89ddf..56e4fc27e0 100644
    --- a/src/test/threadpool_tests.cpp
    +++ b/src/test/threadpool_tests.cpp
    @@ -3,30 +3,18 @@
     // file COPYING or http://www.opensource.org/licenses/mit-license.php.
     
     #include <common/system.h>
    -#include <util/string.h>
    +#include <test/util/setup_common.h>
     #include <util/threadpool.h>
     #include <util/time.h>
     
     #include <boost/test/unit_test.hpp>
     
    -// Test Cases Overview
    -// 0) Submit task to a non-started pool.
    -// 1) Submit tasks and verify completion.
    -// 2) Maintain all threads busy except one.
    -// 3) Wait for work to finish.
    -// 4) Wait for result object.
    -// 5) The task throws an exception, catch must be done in the consumer side.
    -// 6) Busy workers, help them by processing tasks externally.
    -// 7) Recursive submission of tasks.
    -// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
    -// 9) Congestion test; create more workers than available cores.
    -// 10) Ensure Interrupt() prevents further submissions.
    -BOOST_AUTO_TEST_SUITE(threadpool_tests)
    -
    -// General test values
    -constexpr int NUM_WORKERS_DEFAULT = 3;
    -constexpr char POOL_NAME[] = "test";
    -constexpr auto WAIT_TIMEOUT = 120s;
    +#include <latch>
    +#include <semaphore>
    +
    +using namespace std::chrono;
    +
    +constexpr auto WAIT_TIMEOUT{120s};
     
     #define WAIT_FOR(futures)                                                         \
         do {                                                                          \
    @@ -37,259 +25,233 @@ constexpr auto WAIT_TIMEOUT = 120s;
     
     // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
     // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
    -std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block)
    +std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
     {
    -    // Per-thread ready promises to ensure all workers are actually blocked
    -    std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
    -    std::vector<std::future<void>> ready_futures;
    -    ready_futures.reserve(num_of_threads_to_block);
    -    for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
    -
    -    // Fill all workers with blocking tasks
    -    std::vector<std::future<void>> blocking_tasks;
    -    for (int i = 0; i < num_of_threads_to_block; i++) {
    -        std::promise<void>& ready = ready_promises[i];
    -        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
    -            ready.set_value();
    -            blocker_future.wait();
    -        }));
    -    }
    -
    -    // Wait until all threads are actually blocked
    -    WAIT_FOR(ready_futures);
    +    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
    +    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
    +    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
    +    for (auto& f : blocking_tasks) f = threadPool.Enqueue([&] {
    +        ready.count_down();
    +        release_sem.acquire();
    +    });
    +    ready.wait();
         return blocking_tasks;
     }
     
    -// Test 0, submit task to a non-started pool
    +BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
    +
    +// Submit task to a non-started pool
     BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
    -        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    -        return true;
    -    });
    +    ThreadPool threadPool{"not_started"};
    +    BOOST_CHECK_EXCEPTION((void)threadPool.Enqueue([] { return 0; }), std::runtime_error, HasReason{"No active workers"});
     }
     
    -// Test 1, submit tasks and verify completion
    +// Submit tasks and verify completion
     BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
     {
    -    int num_tasks = 50;
    -
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    -    std::atomic<int> counter = 0;
    -
    -    // Store futures to ensure completion before checking counter.
    -    std::vector<std::future<void>> futures;
    -    futures.reserve(num_tasks);
    -    for (int i = 1; i <= num_tasks; i++) {
    -        futures.emplace_back(threadPool.Submit([&counter, i]() {
    -            counter.fetch_add(i, std::memory_order_relaxed);
    -        }));
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"completion"};
    +    threadPool.Start(num_workers);
    +
    +    const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    +    std::atomic_size_t counter{0};
    +
    +    std::vector<std::future<void>> futures(num_tasks);
    +    for (size_t i{0}; i < num_tasks; ++i) {
    +        futures[i] = threadPool.Enqueue([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
         }
     
    -    // Wait for all tasks to finish
         WAIT_FOR(futures);
    -    int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
    -    BOOST_CHECK_EQUAL(counter.load(), expected_value);
    +    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
         BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
     }
     
    -// Test 2, maintain all threads busy except one
    -BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
    +// Block varying numbers of workers and verify remaining workers process all tasks
    +BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_tasks)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    -    // Single blocking future for all threads
    -    std::promise<void> blocker;
    -    std::shared_future<void> blocker_future(blocker.get_future());
    -    const auto blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1);
    -
    -    // Now execute tasks on the single available worker
    -    // and check that all the tasks are executed.
    -    int num_tasks = 15;
    -    int counter = 0;
    -
    -    // Store futures to wait on
    -    std::vector<std::future<void>> futures(num_tasks);
    -    for (auto& f : futures) f = threadPool.Submit([&counter]{ counter++; });
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"limited_workers"};
    +    threadPool.Start(num_workers);
     
    -    WAIT_FOR(futures);
    -    BOOST_CHECK_EQUAL(counter, num_tasks);
    +    const auto num_tasks{5 + m_rng.randrange<size_t>(20)};
    +
    +    for (size_t free{1}; free < num_workers; ++free) {
    +        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
    +        std::counting_semaphore sem{0};
    +        const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers - free)};
    +
    +        size_t counter{0};
    +        std::vector<std::future<void>> futures(num_tasks);
    +        for (auto& f : futures) f = threadPool.Enqueue([&counter] { ++counter; });
    +
    +        WAIT_FOR(futures);
    +        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    +
    +        if (free == 1) {
    +            BOOST_CHECK_EQUAL(counter, num_tasks);
    +        } else {
    +            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
    +        }
    +
    +        sem.release(num_workers - free);
    +        WAIT_FOR(blocking_tasks);
    +    }
     
    -    blocker.set_value();
    -    WAIT_FOR(blocking_tasks);
         threadPool.Stop();
         BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
     }
     
    -// Test 3, wait for work to finish
    +// Wait for work to finish
     BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    -    std::atomic<bool> flag = false;
    -    std::future<void> future = threadPool.Submit([&flag]() {
    -        UninterruptibleSleep(std::chrono::milliseconds{200});
    -        flag.store(true, std::memory_order_release);
    -    });
    -    BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
    -    BOOST_CHECK(flag.load(std::memory_order_acquire));
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"wait_test"};
    +    threadPool.Start(num_workers);
    +
    +    const auto num_tasks{1 + m_rng.randrange<size_t>(10)};
    +    const auto start{SteadyClock::now()};
    +
    +    std::vector<std::future<void>> futures(num_tasks + 1);
    +    for (size_t i{0}; i <= num_tasks; ++i) {
    +        futures[i] = threadPool.Enqueue([i] { UninterruptibleSleep(milliseconds{i}); });
    +    }
    +
    +    WAIT_FOR(futures);
    +    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
    +    BOOST_CHECK(elapsed_ms >= num_tasks);
     }
     
    -// Test 4, obtain result object
    +// Obtain result object
     BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    -    std::future<bool> future_bool = threadPool.Submit([]() { return true; });
    -    BOOST_CHECK(future_bool.get());
    -
    -    std::future<std::string> future_str = threadPool.Submit([]() { return std::string("true"); });
    -    std::string result = future_str.get();
    -    BOOST_CHECK_EQUAL(result, "true");
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"result_test"};
    +    threadPool.Start(num_workers);
    +
    +    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return true; }).get(), true);
    +    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return 42; }).get(), 42);
    +    BOOST_CHECK_EQUAL(threadPool.Enqueue([] { return std::string{"true"}; }).get(), "true");
     }
     
    -// Test 5, throw exception and catch it on the consumer side
    +// Throw exception and catch it on the consumer side
     BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    -
    -    int num_tasks = 5;
    -    std::string err_msg{"something wrong happened"};
    -    std::vector<std::future<void>> futures;
    -    futures.reserve(num_tasks);
    -    for (int i = 0; i < num_tasks; i++) {
    -        futures.emplace_back(threadPool.Submit([err_msg, i]() {
    -            throw std::runtime_error(err_msg + util::ToString(i));
    -        }));
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"exception_test"};
    +    threadPool.Start(num_workers);
    +
    +    const auto make_err{[](size_t n) { return strprintf("error on thread #%s", n); }};
    +    const auto num_tasks{5 + m_rng.randrange<size_t>(15)};
    +
    +    std::vector<std::future<void>> futures(num_tasks);
    +    for (size_t i{0}; i < num_tasks; ++i) {
    +        futures[i] = threadPool.Enqueue([&make_err, i] { throw std::runtime_error(make_err(i)); });
         }
     
    -    for (int i = 0; i < num_tasks; i++) {
    -        BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](const std::runtime_error& e) {
    -            BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
    -            return true;
    -        });
    +    for (size_t i{0}; i < num_tasks; ++i) {
    +        BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
         }
     }
     
    -// Test 6, all workers are busy, help them by processing tasks from outside
    +// All workers are busy, help them by processing tasks from outside
     BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    -
    -    std::promise<void> blocker;
    -    std::shared_future<void> blocker_future(blocker.get_future());
    -    const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
    -
    -    // Now submit tasks and check that none of them are executed.
    -    int num_tasks = 20;
    -    std::atomic<int> counter = 0;
    -    for (int i = 0; i < num_tasks; i++) {
    -        threadPool.Submit([&counter]() {
    -            counter.fetch_add(1, std::memory_order_relaxed);
    -        });
    -    }
    -    UninterruptibleSleep(std::chrono::milliseconds{100});
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"manual_process"};
    +    threadPool.Start(num_workers);
    +
    +    std::counting_semaphore sem{0};
    +    const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers)};
    +
    +    const auto num_tasks{10 + m_rng.randrange<size_t>(30)};
    +    std::atomic_size_t counter{0};
    +
    +    std::vector<std::future<void>> futures(num_tasks);
    +    for (auto& f : futures) f = threadPool.Enqueue([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    +
    +    UninterruptibleSleep(100ms);
         BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
     
    -    // Now process manually
    -    for (int i = 0; i < num_tasks; i++) {
    -        threadPool.ProcessTask();
    -    }
    -    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    +    for (size_t i{0}; i < num_tasks; ++i) threadPool.ProcessTask();
    +
    +    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), num_tasks);
         BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    -    blocker.set_value();
    +
    +    WAIT_FOR(futures);
    +
    +    sem.release(num_workers);
         threadPool.Stop();
         WAIT_FOR(blocking_tasks);
     }
     
    -// Test 7, submit tasks from other tasks
    +// Submit tasks from other tasks
     BOOST_AUTO_TEST_CASE(recursive_task_submission)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"recursive"};
    +    threadPool.Start(num_workers);
     
         std::promise<void> signal;
    -    threadPool.Submit([&]() {
    -        threadPool.Submit([&]() {
    -            signal.set_value();
    -        });
    -    });
    +    (void)threadPool.Enqueue([&] { (void)threadPool.Enqueue([&] { signal.set_value(); }); });
     
         signal.get_future().wait();
         threadPool.Stop();
     }
     
    -// Test 8, submit task when all threads are busy and then stop the pool
    +// Submit task when all threads are busy and then stop the pool
     BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    -
    -    std::promise<void> blocker;
    -    std::shared_future<void> blocker_future(blocker.get_future());
    -    const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT);
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"graceful_stop"};
    +    threadPool.Start(num_workers);
     
    -    // Submit an extra task that should execute once a worker is free
    -    std::future<bool> future = threadPool.Submit([]() { return true; });
    +    std::counting_semaphore sem{0};
    +    const auto blocking_tasks{BlockWorkers(threadPool, sem, num_workers)};
     
    -    // At this point, all workers are blocked, and the extra task is queued
    +    std::future<bool> future{threadPool.Enqueue([] { return true; })};
         BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
     
    -    // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    -    std::thread thread_unblocker([&blocker]() {
    -        UninterruptibleSleep(std::chrono::milliseconds{300});
    -        blocker.set_value();
    -    });
    +    std::thread thread_unblocker{[&sem, num_workers] {
    +        UninterruptibleSleep(300ms);
    +        sem.release(num_workers);
    +    }};
     
    -    // Stop the pool while the workers are still blocked
         threadPool.Stop();
     
    -    // Expect the submitted task to complete
         BOOST_CHECK(future.get());
         thread_unblocker.join();
    -
    -    // Obviously all the previously blocking tasks should be completed at this point too
         WAIT_FOR(blocking_tasks);
    -
    -    // Pool should be stopped and no workers remaining
         BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
     }
     
    -// Test 9, more workers than available cores (congestion test)
    +// More workers than available cores (congestion test)
     BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
    -
    -    int num_tasks = 200;
    -    std::atomic<int> counter{0};
    -
    -    std::vector<std::future<void>> futures;
    -    futures.reserve(num_tasks);
    -    for (int i = 0; i < num_tasks; i++) {
    -        futures.emplace_back(threadPool.Submit([&counter] {
    -            counter.fetch_add(1, std::memory_order_relaxed);
    -        }));
    +    const auto oversubscribe_factor{2 + m_rng.randrange<int>(3)};
    +    ThreadPool threadPool{"congestion"};
    +    threadPool.Start(std::max(1, GetNumCores() * oversubscribe_factor));
    +
    +    const auto num_tasks{100 + m_rng.randrange<size_t>(200)};
    +    std::atomic_size_t counter{0};
    +
    +    std::vector<std::future<void>> futures(num_tasks);
    +    for (auto& f : futures) {
    +        f = threadPool.Enqueue([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
         }
     
         WAIT_FOR(futures);
    -    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    +    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), num_tasks);
     }
     
    -// Test 10, Interrupt() prevents further submissions
    +// Interrupt() prevents further submissions
     BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
     {
    -    ThreadPool threadPool(POOL_NAME);
    -    threadPool.Start(NUM_WORKERS_DEFAULT);
    +    const auto num_workers{3 + m_rng.randrange<size_t>(5)};
    +    ThreadPool threadPool{"interrupt"};
    +    threadPool.Start(num_workers);
         threadPool.Interrupt();
    -    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{}), std::runtime_error, [&](const std::runtime_error& e) {
    -        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    -        return true;
    -    });
    +    BOOST_CHECK_EXCEPTION((void)threadPool.Enqueue([] {}), std::runtime_error, HasReason{"No active workers"});
     }
     
     BOOST_AUTO_TEST_SUITE_END()
    diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    index b489e34c2f..f224c81b55 100644
    --- a/src/util/threadpool.h
    +++ b/src/util/threadpool.h
    @@ -8,19 +8,13 @@
     #include <sync.h>
     #include <tinyformat.h>
     #include <util/check.h>
    -#include <util/string.h>
     #include <util/thread.h>
    -#include <util/threadinterrupt.h>
     
    -#include <algorithm>
    -#include <atomic>
    -#include <condition_variable>
    -#include <cstddef>
    -#include <functional>
    +#include <deque>
     #include <future>
    -#include <memory>
    -#include <queue>
    +#include <semaphore>
     #include <stdexcept>
    +#include <string>
     #include <thread>
     #include <utility>
     #include <vector>
    @@ -46,53 +40,26 @@
      */
     class ThreadPool
     {
    -private:
         std::string m_name;
         Mutex m_mutex;
    -    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    -    std::condition_variable m_cv;
    -    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    -    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
    -    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
    +    std::deque<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
         bool m_interrupt GUARDED_BY(m_mutex){false};
         std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
     
    +    std::counting_semaphore<> m_sem{0};
    +
         void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
    -        WAIT_LOCK(m_mutex, wait_lock);
             for (;;) {
    -            std::packaged_task<void()> task;
    -            {
    -                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    -                if (!m_interrupt && m_work_queue.empty()) {
    -                    // Block until the pool is interrupted or a task is available.
    -                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    -                }
    -
    -                // If stopped and no work left, exit worker
    -                if (m_interrupt && m_work_queue.empty()) {
    -                    return;
    -                }
    -
    -                task = std::move(m_work_queue.front());
    -                m_work_queue.pop();
    -            }
    -
    -            {
    -                // Execute the task without the lock
    -                REVERSE_LOCK(wait_lock, m_mutex);
    -                task();
    -            }
    +            m_sem.acquire();
    +            if (!ProcessTask()) return;
             }
         }
     
     public:
         explicit ThreadPool(const std::string& name) : m_name(name) {}
     
    -    ~ThreadPool()
    -    {
    -        Stop(); // In case it hasn't been stopped.
    -    }
    +    ~ThreadPool() { Stop(); }
     
         /**
          * [@brief](/bitcoin-bitcoin/contributor/brief/) Start worker threads.
    @@ -109,7 +76,6 @@ public:
             if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
             m_interrupt = false; // Reset
     
    -        // Create workers
             m_workers.reserve(num_workers);
             for (int i = 0; i < num_workers; i++) {
                 m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    @@ -130,14 +96,15 @@ public:
             std::vector<std::thread> threads_to_join;
             {
                 LOCK(m_mutex);
    -            // Ensure 'Stop()' isn't called from any worker thread to avoid deadlocks
    -            auto id = std::this_thread::get_id();
    +            // Ensure `Stop()` isn't called from any worker thread to avoid deadlocks
    +            const auto id{std::this_thread::get_id()};
                 for (const auto& worker : m_workers) assert(worker.get_id() != id);
    -            // Early shutdown to return right away on any concurrent 'Submit()' call
    +            // Early shutdown to return right away on any concurrent `Submit()` call
                 m_interrupt = true;
                 threads_to_join.swap(m_workers);
             }
    -        m_cv.notify_all();
    +        m_sem.release(threads_to_join.size());
    +
             for (auto& worker : threads_to_join) worker.join();
             // Since we currently wait for tasks completion, sanity-check empty queue
             WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    @@ -147,13 +114,13 @@ public:
         /**
          * [@brief](/bitcoin-bitcoin/contributor/brief/) Enqueues a new task for asynchronous execution.
          *
    -     * Returns a `std::future` that provides the task’s result or propagates
    +     * Returns a `std::future` that provides the task's result or propagates
          * any exception it throws.
          * Note: Ignoring the returned future requires guarding the task against
          * uncaught exceptions, as they would otherwise be silently discarded.
          */
    -    template <class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    -    auto Submit(F&& fn)
    +    template <class F>
    +    [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) auto Enqueue(F&& fn)
         {
             std::packaged_task task{std::forward<F>(fn)};
             auto future{task.get_future()};
    @@ -162,34 +129,34 @@ public:
                 if (m_interrupt || m_workers.empty()) {
                     throw std::runtime_error("No active workers; cannot accept new tasks");
                 }
    -            m_work_queue.emplace(std::move(task));
    +            m_work_queue.emplace_back(std::move(task));
             }
    -        m_cv.notify_one();
    +        m_sem.release();
             return future;
         }
     
         /**
          * [@brief](/bitcoin-bitcoin/contributor/brief/) Execute a single queued task synchronously.
          * Removes one task from the queue and executes it on the calling thread.
    +     * [@return](/bitcoin-bitcoin/contributor/return/) true if a task was executed, false if queue was empty
          */
    -    void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    +    bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
             std::packaged_task<void()> task;
             {
                 LOCK(m_mutex);
    -            if (m_work_queue.empty()) return;
    -
    -            // Pop the task
    +            if (m_work_queue.empty()) return false;
                 task = std::move(m_work_queue.front());
    -            m_work_queue.pop();
    +            m_work_queue.pop_front();
             }
             task();
    +        return true;
         }
     
         void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
         {
             WITH_LOCK(m_mutex, m_interrupt = true);
    -        m_cv.notify_all();
    +        m_sem.release(WorkersCount());
         }
     
         size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    
    

    </details>

  95. DrahtBot added the label Needs rebase on Dec 2, 2025
  96. marcofleon commented at 7:39 PM on December 3, 2025: contributor

    I don’t think this is an issue. You’re just massively oversubscribing the CPU and lowering the timeout to the point where all the context switching triggers it. Switching to notify_all() only forces all workers awake on every submission, which masks the OS scheduler starvation you get in this kind of extreme setup.

    I've been playing with the fuzz test and found that this fixes it for me:

    ThreadPool g_pool{"fuzz"};
    size_t g_num_workers = 3;
    std::atomic<bool> g_pool_started{false};
    
    static void setup_threadpool_test()
    {
        LogInstance().DisableLogging();
    
    }
    
    FUZZ_TARGET(threadpool, .init = setup_threadpool_test)
    {
        if (!g_pool_started.exchange(true)) {
            g_pool.Start(g_num_workers);
        }
        
        FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
    

    Oversubscribed by a lot and left it running for a long time with -timeout=10 and it's good to go, no timeouts. This is with notify_one as well.

    There's two changes here. The first is to start the thread pool inside of the first iteration. This fixes a problem I was having with the LibAFL version of libfuzzer. It would hang immediately because LibAFL calls fork() after calling the init function, and so the child processes would end up only having one thread active (the main calling thread) and no workers.

    The second change is just switching from call_once to a global that we check every iteration. This fixes the original non-reproducible timeout issue I had with libFuzzer. I don't really know why, but I'm guessing call_once is more complicated than the simpler check.

    Overall, I've learned that multi core fuzzing and multiple threads in a fuzz test don't mix well it seems. It leads to issues that are hard to reproduce and debug. For the purposes of testing the threadpool under heavy load, I think it's fine to use work arounds like this to get it done.

  97. l0rinc commented at 10:41 AM on December 9, 2025: contributor

    @furszy, are you still working on this?

  98. furszy commented at 2:48 PM on December 9, 2025: member

    @furszy, are you still working on this?

    yes.

  99. ismaelsadeeq approved
  100. ismaelsadeeq commented at 3:35 PM on December 18, 2025: member

    Code review ACK 2de0ce5cd85e1b99e318883964df318ffb615fe4 👾

  101. DrahtBot requested review from pinheadmz on Dec 18, 2025
  102. DrahtBot requested review from sedited on Dec 18, 2025
  103. DrahtBot requested review from Eunovo on Dec 18, 2025
  104. ismaelsadeeq commented at 3:58 PM on December 18, 2025: member
  105. furszy force-pushed on Dec 28, 2025
  106. furszy commented at 6:12 PM on December 28, 2025: member

    thanks @ismaelsadeeq, rebased. Will tackle the remaining comments and update accordingly soon.

  107. DrahtBot removed the label Needs rebase on Dec 28, 2025
  108. DrahtBot added the label Needs rebase on Jan 21, 2026
  109. furszy commented at 2:05 AM on January 28, 2026: member

    working on the update now, almost there.

  110. http-server: guard against crashes from unhandled exceptions
    Currently, if an exception is thrown at the top-level HTTP request
    handler (prior to invoking the command), the program crashes.
    
    Ideally, each handler should catch all exceptions internally and
    be responsible for sanitizing them and crafting the client response.
    This is because only the handler knows the correct response format,
    which differs per server type. However, because this cannot always
    be guaranteed, it is safer to also catch exceptions in the top-level
    server code, log the unexpected error, and disconnect the socket.
    
    This both guards against crashes caused by uncaught exceptions and
    prevents the client from hanging indefinitely while waiting for a
    response that will never arrive.
    
    The following diff can be used to trigger the crash in master
    (just run single node functional tests like feature_shutdown.py):
    ```
    diff --git a/src/httprpc.cpp b/src/httprpc.cpp
    --- a/src/httprpc.cpp
    +++ b/src/httprpc.cpp
    @@ -103,6 +103,9 @@
    
     static bool HTTPReq_JSONRPC(const std::any& context, HTTPRequest* req)
     {
    +    static int i = 0; // skip initial requests as they are used in the RPC warmup phase.
    +    if (i++ > 3) throw std::runtime_error("error from json rpc handler");
    +
         // JSONRPC handles only POST
         if (req->GetRequestMethod() != HTTPRequest::POST) {
             req->WriteReply(HTTP_BAD_METHOD, "JSONRPC server handles only POST requests");
    
    ```
    
    Note:
    This leaves a TODO in the code because error responses should eventually
    be specialized per server type. REST clients expect plain text responses,
    while JSON-RPC clients expect a JSON error object.
    The TODO is there because this is not consistently enforced everywhere
    in the current codebase, and we should tackle them all at once.
    45930a7941
  111. tests: log node JSON-RPC errors during test setup
    Currently, if the node replies to any command with an error during
    the test framework's setup(), we log the generic and not really useful
    "Unexpected exception" from the BaseException catch, with no further
    information.
    This isn't helpful for diagnosing the issue. Fix it by explicitly handling
    JSONRPCException and logging the response error message and http status
    code.
    6354b4fd7f
  112. furszy force-pushed on Jan 29, 2026
  113. furszy commented at 12:34 AM on January 29, 2026: member

    Updated per feedback. Thanks everyone!

    Unit tests updated with most of l0rinc's feedback. And updated the fuzz test to accommodate to #33689 (comment) which seems very deep. Thanks marcofleon for the thorough investigation!

    Also, just to highlight the main change of the last push:

    On master, I was able to crash the server by throwing an exception from the top-level handler, but in this branch, the same scenario no longer causes a crash, which is great. Yet, to make the fix and the scenario explicit, I separated it into its own standalone commit. The diff to reproduce it on master is also in the commit description.

  114. DrahtBot removed the label Needs rebase on Jan 29, 2026
  115. in src/util/threadpool.h:38 in 59f277a407
      33 | + *   workers to join, including the current one.
      34 | + *
      35 | + * - `Submit()` can be called from any thread, including workers. It safely
      36 | + *   enqueues new work for execution as long as the pool has active workers.
      37 | + *
      38 | + * - `Stop()` prevents further task submission and wakes all worker threads.
    


    Eunovo commented at 6:43 PM on January 30, 2026:

    furszy commented at 9:16 PM on January 30, 2026:

    Done.

  116. in src/test/threadpool_tests.cpp:297 in 59f277a407 outdated
     292 | +// Test 10, Interrupt() prevents further submissions
     293 | +BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
     294 | +{
     295 | +    ThreadPool threadPool(POOL_NAME);
     296 | +    threadPool.Start(NUM_WORKERS_DEFAULT);
     297 | +    threadPool.Interrupt();
    


    Eunovo commented at 6:45 PM on January 30, 2026:

    https://github.com/bitcoin/bitcoin/pull/33689/changes/59f277a407971ceaedcba8641a58f3a3c479085e:

    It seems that Interrupt() is safe to call from a worker thread. If it is intended to be used this way, consider expanding this test to also call Interrupt() from a worker thread.


    furszy commented at 9:16 PM on January 30, 2026:

    Done.

  117. Eunovo commented at 8:15 PM on January 30, 2026: contributor
  118. furszy force-pushed on Jan 30, 2026
  119. util: introduce general purpose thread pool c528dd5f8c
  120. fuzz: add test case for threadpool
    Co-authored-by: furszy <matiasfurszyfer@protonmail.com>
    c323f882ed
  121. http: replace WorkQueue and threads handling for ThreadPool
    Replace the HTTP server's WorkQueue implementation and single threads
    handling code with ThreadPool for processing HTTP requests. The
    ThreadPool class encapsulates all this functionality on a reusable
    class, properly unit and fuzz tested (the previous code was not
    unit nor fuzz tested at all).
    
    This cleanly separates responsibilities:
    The HTTP server now focuses solely on receiving and dispatching requests,
    while ThreadPool handles concurrency, queuing, and execution.
    It simplifies init, shutdown and requests tracking.
    
    This also allows us to experiment with further performance improvements at
    the task queuing and execution level, such as a lock-free structure, task
    prioritization or any other performance improvement in the future, without
    having to deal with HTTP code that lives on a different layer.
    38fd85c676
  122. furszy force-pushed on Jan 30, 2026
  123. furszy commented at 9:17 PM on January 30, 2026: member

    Updated per feedback. Thanks Eunovo.

  124. DrahtBot added the label CI failed on Jan 30, 2026
  125. DrahtBot removed the label CI failed on Jan 30, 2026
  126. Eunovo commented at 1:06 PM on February 2, 2026: contributor
  127. DrahtBot requested review from ismaelsadeeq on Feb 2, 2026
  128. sedited approved
  129. sedited commented at 11:41 AM on February 8, 2026: contributor

    Re-ACK 38fd85c676a072ebf256e806beda9d7533790baa

  130. in src/httpserver.cpp:345 in 45930a7941
     341 | +            }
     342 | +            // Reply so the client doesn't hang waiting for the response.
     343 | +            req->WriteHeader("Connection", "close");
     344 | +            // TODO: Implement specific error formatting for the REST and JSON-RPC servers responses.
     345 | +            req->WriteReply(HTTP_INTERNAL_SERVER_ERROR, err_msg);
     346 | +            return false;
    


    pinheadmz commented at 4:39 PM on February 9, 2026:

    45930a79412dc45f9d391cd7689d029fa4f0189e

    nit: HTTPRequestHandler is defined as returning a bool but I don't think we ever use its return value, can probably be void. No change needed at this time, I just noticed this code either returns a hard coded bool, or the value from some arbitrary function, whose type is not so easy to see on the same page.


    furszy commented at 7:52 PM on February 10, 2026:

    Yeah, the first commit fixes the possible crash using the existing HTTPRequestHandler, which returns a bool. In the removal commit, when HTTPRequestHandler gets dropped, the bool return value is dropped as well.

  131. in test/functional/test_framework/test_framework.py:151 in 6354b4fd7f
     146 | @@ -147,6 +147,9 @@ def main(self):
     147 |          except subprocess.CalledProcessError as e:
     148 |              self.log.exception(f"Called Process failed with stdout='{e.stdout}'; stderr='{e.stderr}';")
     149 |              self.success = TestStatus.FAILED
     150 | +        except JSONRPCException as e:
     151 | +            self.log.exception(f"Failure during setup: error={e.error}, http_status={e.http_status}")
    


    pinheadmz commented at 5:50 PM on February 9, 2026:

    6354b4fd7fe819eb13274b212e426a7d10ca75d3

    This is good but could even be improved (doesn't have to be here). For example with the diff below and the suggested diff in the first commit message, you can actually get all the information logged out:

    current:

    test_framework.authproxy.JSONRPCException: non-JSON HTTP response with '500 Internal Server Error' from server (-342)

    with diff below:

    test_framework.authproxy.JSONRPCException: non-JSON HTTP response with '500 Internal Server Error' from server: error from json rpc handler (-342)

    diff --git a/test/functional/test_framework/authproxy.py b/test/functional/test_framework/authproxy.py
    index 9b2fc0f7f9..051423928d 100644
    --- a/test/functional/test_framework/authproxy.py
    +++ b/test/functional/test_framework/authproxy.py
    @@ -195,7 +195,7 @@ class AuthServiceProxy():
             content_type = http_response.getheader('Content-Type')
             if content_type != 'application/json':
                 raise JSONRPCException(
    -                {'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server' % (http_response.status, http_response.reason)},
    +                {'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server: %s' % (http_response.status, http_response.reason, http_response.read().decode())},
                     http_response.status)
     
             data = http_response.read()
    

    maflcko commented at 4:15 PM on February 12, 2026:

    The commit message seems wrong, because Unexpected exception will also log all details that are available.

    So this seems like the wrong fix. The correct fix would be to add any missing details to the exception string. Fixed in https://github.com/bitcoin/bitcoin/pull/34575

  132. in src/util/threadpool.h:108 in c528dd5f8c
     103 | +    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     104 | +    {
     105 | +        assert(num_workers > 0);
     106 | +        LOCK(m_mutex);
     107 | +        if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
     108 | +        m_interrupt = false; // Reset
    


    pinheadmz commented at 6:19 PM on February 9, 2026:

    c528dd5f8ccc3955b00bdba869f0a774efa97fe1

    What scenario would we stop and then re-start an instantiated worker pool?


    furszy commented at 8:19 PM on February 10, 2026:

    c528dd5

    What scenario would we stop and then re-start an instantiated worker pool?

    I was thinking about single-shot tasks that require a large number of threads, where keeping those threads alive for the entire lifetime of the software would be unnecessary overhead. E.g. RPC commands such as scanblocks, rescanblockchain, dumputxoset, etc.

    Right now, the thread pool code is simple enough and not very configurable, but we will likely add more features and tuning options over time. Those settings will likely only be available during the node's initialization, so we need to construct the pool early with them (note: we really don't want to go back to the global ArgsManager dependency). And, at the same time, we don’t want to keep a large number of threads running when there's no work for them.

  133. in src/util/threadpool.h:155 in c528dd5f8c
     150 | +     * any exception it throws.
     151 | +     * Note: Ignoring the returned future requires guarding the task against
     152 | +     * uncaught exceptions, as they would otherwise be silently discarded.
     153 | +     */
     154 | +    template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     155 | +    auto Submit(F&& fn)
    


    pinheadmz commented at 6:45 PM on February 9, 2026:

    c528dd5f8ccc3955b00bdba869f0a774efa97fe1

    Just curious about your style choice here, I'd've written like this

    template <class F>
    [[nodiscard]] auto Submit(F&& fn) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    

    furszy commented at 8:24 PM on February 10, 2026:

    I'm pretty sure I had it like that, and there was a suggestion to change it to the current form. Since it was merely an aesthetic change and there were so many comments, I didn't put much thought into it.

  134. in src/test/threadpool_tests.cpp:77 in c528dd5f8c
      72 | +// Test 0, submit task to a non-started pool
      73 | +BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
      74 | +{
      75 | +    ThreadPool threadPool(POOL_NAME);
      76 | +    BOOST_CHECK_EXCEPTION((void)threadPool.Submit([]{ return false; }), std::runtime_error, [&](const std::runtime_error& e) {
      77 | +        BOOST_CHECK_EQUAL(e.what(), "No active workers; cannot accept new tasks");
    


    pinheadmz commented at 12:41 AM on February 10, 2026:

    c528dd5f8ccc3955b00bdba869f0a774efa97fe1

    Why not use HasReason()?

    #include <test/util/setup_common.h>
    ...
        BOOST_CHECK_EXCEPTION(
            (void)threadPool.Submit([]{ return false; }),
            std::runtime_error,
            HasReason{"No active workers; cannot accept new tasks"
        });
    

    Drahtbot should be complaining about this ;-) see #34242 (review)

    and in task_exception_propagates_to_future and interrupt_blocks_new_submissions below


    furszy commented at 8:34 PM on February 10, 2026:

    I think I answered this in a previous comment.

    I'm happy using HasReason, but not adding a dependency to the entire unit test framework machinery (<test/util/setup_common.h>). The rationale is that the thread pool lives at a lower level and shouldn't get access to the chainstate, feerate, or any other higher-level concepts. It is also easier for anyone who wants to experiment with the code to be able to pull the thread pool alone into a separate project. This last sentence comes from experience.. it would have been so nice to have constructed the script interpreter and other primitives in this way, not so tied to the whole project machinery.. we would have found many issues and improvements way faster.

    So, in short, I agree with using it. I just think we should first move the HasReason class to a general utility file, to beak the general framework dependency. It could be done in a quick follow-up, happy to do it or ACK it if someone wants to tackle it too.

  135. in src/httpserver.cpp:279 in 38fd85c676
     283 | -            (void)item.release(); /* if true, queue took ownership */
     284 | -        } else {
     285 | -            LogWarning("Request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting");
     286 | -            item->req->WriteReply(HTTP_SERVICE_UNAVAILABLE, "Work queue depth exceeded");
     287 | -        }
     288 | +        [[maybe_unused]] auto _{g_threadpool_http.Submit(std::move(item))};
    


    pinheadmz commented at 5:29 PM on February 10, 2026:

    38fd85c676a072ebf256e806beda9d7533790baa

    Related to my comment on the first commit about how these workers dont return any thing. So now in the above try/catch wrapper the return values are removed which makes sense to me.

    My guess is that for this use case of threadpool, we don't need to examine the return value of the future from the task, but other uses of threadpool will not discard it?


    furszy commented at 8:39 PM on February 10, 2026:

    My guess is that for this use case of threadpool, we don't need to examine the return value of the future from the task, but other uses of threadpool will not discard it?

    yeah, any parallelizable user single-shot locking command would wait for it to finish. The examples I have in my head are RPC-related like scanblocks, rescanblockchain, dumputxoset, etc. But we could surely find more just by looking at the REST server or the GUI.

  136. pinheadmz approved
  137. pinheadmz commented at 6:23 PM on February 10, 2026: member

    ACK 38fd85c676a072ebf256e806beda9d7533790baa

    Re-reviewed all commits since a lot has changed since last review. Built and tested on arm64/macos and x86/debian.

    Tried breaking it with stuff like -rpcthreads=9000 (expect init failure). Also tried -rpcthreads=1 with RPC-pummeling scripts.

    I roughly rebased #32061 on this branch and tested. Conflicts were extremely minimal.

    I'm excited to use this feature to parallelize other tasks in bitcoin!

    Several questions below, mostly just about style. No blockers there. RFM 🚀

    
     /home/zip/bitcoin/build/bin/bitcoind -regtest -rpcthreads=1000
     ├─ b-scheduler
     ├─ b-http_pool_0
     ├─ b-http_pool_1
     ├─ b-http_pool_2
     ├─ b-http_pool_3
     ├─ b-http_pool_4
     ├─ b-http_pool_5
    ...
    

    <details><summary>Show Signature</summary>

    -----BEGIN PGP SIGNED MESSAGE-----
    Hash: SHA256
    
    ACK 38fd85c676a072ebf256e806beda9d7533790baa
    -----BEGIN PGP SIGNATURE-----
    
    iQIzBAEBCAAdFiEE5hdzzW4BBA4vG9eM5+KYS2KJyToFAmmLdj0ACgkQ5+KYS2KJ
    yTpkNQ//Z8464/u6+Zw6tfIBEtcNCWLi2m3pk2xnJE8q2HIzk2GT6NUzu5f6yRsV
    fkB1AJqxwO9aKf+tmF85c+f6PoKhjyBDeA1nFa39ms8Madkn32W3su6fPcfqjeCy
    ROS3T18QueM7MkH4qezCWZ4TkYYeZu/yEMQdfZzhLJF14DX/8HdL+/fanqyB0CHc
    V8mckYkX9vszKmQz4HZqSSxL9RUvv/ipbLKSU7BgO+EOWHkLAHNr5V27UwySM/9s
    uDDDQGDCsnQqtkajRGE+gg230Xq70Fwp12+japQ5yvpn17HjkkDnCsvoLK01BHAO
    WHASMt6m+RSZGlIwv7WXd0vM2gqIAcrJwkS3VzoDlkVI0fLdcsyacPn2zKL0cOBg
    j7OjaW8KV+MjS1KvRuJR0CAE5akOPTgO9aCi5Aly+LZyyLPoDoQOZYTGDq4TJB3n
    REgN6LuvjBoctWQGYM3HTTMDTchQqd+WWTaF7aKoCM8VTsffN8d00c/Y4ADIQ4Fd
    xRv6jC0QBw434Nb023h15OHNYVgMNr/xqulfGizZUzXOcrcfEuBNYTyK7zBi4njm
    gjEmI2rEoIrCzrgRcwinAF0XpVMca9YyGao4bBtpYY5lmc+LeU7lz2ac1wk3kLGf
    SXfHL6jqetA1QWSVuEZ7ZuFJJSxRRqwjuNOl0YtoJ1kHTH8tiSc=
    =dFzb
    -----END PGP SIGNATURE-----
    

    pinheadmz's public key is on openpgp.org

    </details>

  138. sedited merged this on Feb 11, 2026
  139. sedited closed this on Feb 11, 2026

  140. furszy deleted the branch on Feb 11, 2026
  141. fanquake commented at 11:35 AM on February 12, 2026: member

    This seems to have introduced a bug: #34573.

  142. ismaelsadeeq commented at 3:47 PM on February 16, 2026: member

    post merge ACK 38fd85c676a072ebf256e806beda9d7533790baa 🧑‍🏭


github-metadata-mirror

This is a metadata mirror of the GitHub repository bitcoin/bitcoin. This site is not affiliated with GitHub. Content is generated from a GitHub metadata backup.
generated: 2026-04-28 03:12 UTC

This site is hosted by @0xB10C
More mirrored repositories can be found on mirror.b10c.me