http: replace WorkQueue and single threads handling for ThreadPool #33689

pull furszy wants to merge 3 commits into bitcoin:master from furszy:2025_threadpool_http_server changing 7 files +599 −119
  1. furszy commented at 2:36 pm on October 23, 2025: member

    This has been a recent discovery: the general-purpose thread pool class created for #26966 cleanly integrates into the HTTP server. It simplifies the init, shutdown and request execution logic, replacing code that was never unit tested with code that is properly unit and fuzz tested. Our functional test framework does exercise this RPC interface extensively (that’s how we’ve been ensuring its correct behavior so far, which is not ideal).

    This clearly separates the responsibilities: The HTTP server now focuses solely on receiving and dispatching requests, while ThreadPool handles concurrency, queuing, and execution.

    This will also allow us to experiment with further performance improvements at the task queuing and execution level, such as a lock-free structure, task prioritization, or other implementation details like coroutines in the future, without having to deal with HTTP code that lives on a different layer.

    Note: The rationale behind introducing the ThreadPool first is to be able to easily cherry-pick it across different working paths. Some of the ones that benefit from it are #26966 for the parallelization of the indexes initial sync, #31132 for the parallelization of the inputs fetching procedure, #32061 for the libevent replacement, and the kernel API #30595 (https://github.com/bitcoin/bitcoin/pull/30595#discussion_r2413702370) to avoid blocking validation, among other use cases not yet public.

    Note 2: I could have created a wrapper around the existing code and replaced the WorkQueue in a subsequent commit, but it didn’t seem worth the extra commits and review effort. The ThreadPool implements essentially the same functionality in a more modern and cleaner way.
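    For reference, this is the ThreadPool interface the PR wires into the HTTP server, as exercised by the new unit tests. A minimal usage sketch; the worker count and pool name here are illustrative only:

        ThreadPool pool{"http"};           // named pool; workers appear as "<name>_pool_<i>"
        pool.Start(/*num_workers=*/4);     // spawn the worker threads

        // Submit() returns a std::future for the task's result; it throws
        // std::runtime_error if the pool has no active workers.
        std::future<bool> ok{pool.Submit([] { return true; })};
        assert(ok.get());

        // Two-stage shutdown, mirroring the HTTP server: block new submissions
        // first, then wait for the queued tasks to finish and join the workers.
        pool.Interrupt();
        pool.Stop();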

  2. DrahtBot added the label RPC/REST/ZMQ on Oct 23, 2025
  3. DrahtBot commented at 2:36 pm on October 23, 2025: contributor

    The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

    Code Coverage & Benchmarks

    For details see: https://corecheck.dev/bitcoin/bitcoin/pulls/33689.

    Reviews

    See the guideline for information on the review process.

    Type Reviewers
    Concept ACK ismaelsadeeq, Eunovo
    Stale ACK pinheadmz, TheCharlatan

    If your review is incorrectly listed, please react with 👎 to this comment and the bot will ignore it on the next update.

    Conflicts

    Reviewers, this pull request conflicts with the following ones:

    • #29641 (scripted-diff: Use LogInfo over LogPrintf [WIP, NOMERGE, DRAFT] by maflcko)
    • #27731 (Prevent file descriptor exhaustion from too many RPC calls by fjahr)

    If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.

  4. pinheadmz commented at 2:38 pm on October 23, 2025: member
    concept ACK :-) will be reviewing this
  5. laanwj requested review from laanwj on Oct 23, 2025
  6. laanwj commented at 2:50 pm on October 23, 2025: member
    Adding myself as I wrote the original shitty code.
  7. Raimo33 commented at 9:54 am on October 24, 2025: contributor
    Can’t we use an already existing open source library instead of reinventing the wheel?
  8. in src/test/fuzz/threadpool.cpp:51 in bb71b0e3bf outdated
    48+// Global to verify we always have the same number of threads.
    49+size_t g_num_workers = 3;
    50+
    51+static void setup_threadpool_test()
    52+{
    53+    // Disable logging entirely. It seems to cause memory leaks.
    


    l0rinc commented at 4:31 pm on October 24, 2025:
    Can we investigate that?

    furszy commented at 8:22 pm on October 30, 2025:
    It is a known issue. The pcp fuzz test does it too for the same reason (in an undocumented manner). I only documented it properly here so we don’t forget it exists. Feel free to investigate it in a separate issue/PR; I’m not planning to. It is not an issue with the code introduced in this PR.

    l0rinc commented at 1:22 pm on October 31, 2025:
    k, thanks, please resolve the comment
  9. furszy commented at 7:17 pm on October 24, 2025: member

    Can’t we use an already existing open source library instead of reinventing the wheel?

    That’s a good question. It’s usually where we all start. Generally, the project consensus is to avoid introducing new external dependencies (unless they’re maintained by us) to minimize potential attack vectors. This doesn’t mean we should reinvent everything, just that we need to be very careful about what we decide to include.

    That being said, for the changes introduced in this PR, one can argue that we’re encapsulating, documenting, and unit + fuzz testing code that wasn’t covered before, while also improving separation of responsibilities. We’re not adding anything more complex or that behaves radically differently from what we currently have. The nice property of this PR is that it will let us experiment with more complex approaches in the future without having to deal with application-specific code (like the HTTP server code). This also includes learning from other open source libraries for sure.

  10. in src/test/threadpool_tests.cpp:1 in c219b93c3b outdated
    0@@ -0,0 +1,267 @@
    1+// Copyright (c) 2024-present The Bitcoin Core developers
    


    pinheadmz commented at 7:55 pm on October 27, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    nit: don’t bother with the year any more… (applies to all new files)


    furszy commented at 3:55 pm on October 29, 2025:
    Sure. Done as suggested.
  11. TheCharlatan commented at 10:17 pm on October 27, 2025: contributor
    Approach ACK
  12. in src/util/threadpool.h:53 in c219b93c3b outdated
    48+private:
    49+    std::string m_name;
    50+    Mutex m_mutex;
    51+    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
    52+    std::condition_variable m_cv;
    53+    // Note: m_interrupt must be modified while holding the same mutex used by threads waiting on the condition variable.
    


    andrewtoth commented at 2:38 pm on October 28, 2025:
    Isn’t this comment already implied by GUARDED_BY(m_mutex)? Also it’s written by the control thread and read by worker threads, so of course it must only be modified while holding the mutex. This comment doesn’t add anything IMO.

    furszy commented at 3:18 pm on October 28, 2025:

    Isn’t this comment already implied by GUARDED_BY(m_mutex)? Also it’s written by the control thread and read by worker threads, so of course it must only be modified while holding the mutex. This comment doesn’t add anything IMO.

    What I meant there is that the variable shouldn’t be an atomic bool alone (in case someone proposes to change it in the future). It must share the same mutex as the condition variable; otherwise, workers might not see the update. This is stated in the std::condition_variable ref. I mention it because it is very tempting to avoid locking the mutex during task submission in this way, and it might seem like a harmless optimization, but it turns out to cause very subtle issues.

    Maybe you have a better way to state this? Happy to change it if it is not being understood as supposed.
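    To illustrate the point, a minimal standalone sketch (not the ThreadPool code itself): the flag and the condition variable must share one mutex so the predicate check and the transition into wait() are atomic with respect to the writer.

        #include <condition_variable>
        #include <mutex>

        std::mutex m;
        std::condition_variable cv;
        bool interrupt{false}; // only written while holding m

        void WorkerWait()
        {
            std::unique_lock lock{m};
            // Predicate check and blocking happen under m, so a concurrent
            // SignalInterrupt() cannot slip in between them.
            cv.wait(lock, [] { return interrupt; });
        }

        void SignalInterrupt()
        {
            // With a lone atomic flag (no lock), a worker could evaluate the
            // predicate as false, then the flag could flip and notify_all()
            // fire, and only afterwards would the worker block in wait(),
            // missing the wake-up.
            { std::lock_guard lock{m}; interrupt = true; }
            cv.notify_all();
        }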


    andrewtoth commented at 4:04 pm on October 28, 2025:

    I see. Then perhaps more directly:

    0    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
    

    furszy commented at 7:07 pm on October 28, 2025:
    sounds good, taken. thanks!
  13. in src/util/threadpool.h:182 in c219b93c3b outdated
    177+            m_work_queue.pop();
    178+        }
    179+        task();
    180+    }
    181+
    182+    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    


    andrewtoth commented at 2:51 pm on October 28, 2025:
    I’m unclear what the purpose of this function is. It interrupts the workers, like Stop, but doesn’t join them or wait for them to finish. If the idea is to clear the workqueue and park the threads without being able to add more tasks, it does not accomplish that since the threads will exit their loops as well and not be parked on the cv.

    furszy commented at 3:35 pm on October 28, 2025:

    I’m unclear what the purpose of this function is. It interrupts the workers, like Stop, but doesn’t join them or wait for them to finish. If the idea is to clear the workqueue and park the threads without being able to add more tasks, it does not accomplish that since the threads will exit their loops as well and not be parked on the cv.

    Actually, the goal is to keep the current shutdown semantics unchanged. The process already follows two stages: first, we stop accepting new requests and events (e.g. unregistering the libevent callback to stop incoming requests); then, once no new work can be queued, we call Stop() and wait for the remaining tasks to finish before tearing down the objects.

    Interrupt() belongs to the first stage. It basically blocks new task submissions during shutdown to avoid queue growth. It’s not intended to clear the queue or park the threads; it is there just to avoid piling up new tasks during shutdown.
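    In code, the two stages look roughly like this (the helper names are hypothetical placeholders for the actual httpserver shutdown steps):

        // Stage 1: stop producing new work, without blocking.
        UnregisterHttpCallbacks();   // hypothetical: libevent no longer hands us requests
        g_pool.Interrupt();          // late Submit() calls now fail; nothing is joined yet

        // Stage 2: drain and tear down.
        g_pool.Stop();               // joins the workers; all queued requests have finished
        DestroyBackendObjects();     // hypothetical: safe now, no worker can touch them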


    andrewtoth commented at 3:52 pm on October 28, 2025:
    I see, so the goal is to stop the thread pool but in a non-blocking way. Could the same be done via a bool parameter to Stop to skip joining? Then Stop will be called without that bool set in the destructor to join the threads, if this behavior is desired (like it is in the http server). Also, could we modify Start in that case to not assert on number of threads if m_interrupt is true, and instead join them if we still have outstanding threads?

    furszy commented at 7:01 pm on October 28, 2025:

    Could the same be done via a bool parameter to Stop to skip joining?

    If we do that, the threads wouldn’t remain in the m_workers vector anymore. They’d be swapped out on the first call. That means we wouldn’t be able to wait for them to finish later on, which would be quite bad since we’d lose track of when all requests are fulfilled before destroying the backend objects (that’s basically what joining the threads means for us right now). And this could lead to requests accessing null pointers if we proceed with the shutdown without waiting on the threads to join, etc.

    Also, could we modify Start in that case to not assert on number of threads, and instead join them if m_interrupt is true and we still have outstanding threads?

    We could also help them process tasks by calling ProcessTask() if something like that happens. But I’m not sure that’s the best design. We would be integrating Stop() inside Start(), which would make callers less careful about when to call Stop(), which should be part of their code design.

    Still, I don’t think we should worry nor overthink this now. The current code starts and stops the thread pool only once during the entire app’s lifecycle.


    andrewtoth commented at 7:09 pm on October 28, 2025:

    I was thinking like this

     0diff --git a/src/util/threadpool.h b/src/util/threadpool.h
     1index 94409facd5..dc1a218abd 100644
     2--- a/src/util/threadpool.h
     3+++ b/src/util/threadpool.h
     4@@ -123,19 +123,19 @@ public:
     5      *
     6      * Must be called from a controller (non-worker) thread.
     7      */
     8-    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     9+    void Stop(bool join_threads = true) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    10     {
    11         // Notify workers and join them.
    12         std::vector<std::thread> threads_to_join;
    13         {
    14             LOCK(m_mutex);
    15             m_interrupt = true;
    16-            threads_to_join.swap(m_workers);
    17+            if (join_threads) threads_to_join.swap(m_workers);
    18         }
    19         m_cv.notify_all();
    20         for (auto& worker : threads_to_join) worker.join();
    21         // Since we currently wait for tasks completion, sanity-check empty queue
    22-        WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    23+         if (join_threads) WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
    24         // Note: m_interrupt is left true until next Start()
    25     }
    26 
    27@@ -179,12 +179,6 @@ public:
    28         task();
    29     }
    30 
    31-    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    32-    {
    33-        WITH_LOCK(m_mutex, m_interrupt = true);
    34-        m_cv.notify_all();
    35-    }
    36-
    37     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    38     {
    39         return WITH_LOCK(m_mutex, return m_work_queue.size());
    

    Still, I don’t think we should worry nor overthink this now. The current code starts and stops the thread pool only once during the entire app’s lifecycle.

    Err, then why not have threads start during construction? Otherwise there’s no need for Start.


    furszy commented at 7:15 pm on October 28, 2025:
    You know, we could call ProcessTask() during Stop() too, which would be a strict improvement over the current locking-wait behavior in master (since we would shut down faster by actively processing pending requests on the shutdown thread as well). This is something that hasn’t been possible until now. Still, this is material for a follow-up; I’d say it is better to keep this PR as contained as possible so all other PRs and working paths that depend on this structure can actually make use of it.
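    Roughly, that follow-up idea would look like this (a sketch only, assuming ProcessTask() simply returns when the queue is empty):

        void StopAndHelp(ThreadPool& pool)
        {
            pool.Interrupt();                   // no new tasks can be queued from here on
            while (pool.WorkQueueSize() > 0) {
                pool.ProcessTask();             // shutdown thread helps drain pending requests
            }
            pool.Stop();                        // join the workers once the queue is empty
        }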

    andrewtoth commented at 7:22 pm on October 28, 2025:
    That is a good idea, but not related to my suggestion here :). Yes, that would be a good followup. I don’t think we should have a method that would cause the node to crash if Start is called more than once. I disagree and think just calling Stop at the beginning of Start would be safer than the current implementation.

    furszy commented at 7:37 pm on October 28, 2025:

    hehe, it seems we sent a message at a similar time and missed yours.

    re Interrupt() removal suggestion:

    I find it harder to reason about the current two-stage shutdown procedure if we have to call the same Stop() method twice. At that point it would be simpler to just wait for all tasks to finish on the first call and remove the second one. But that seems suboptimal as there is no rush to wait for them at that point of the shutdown sequence.

    Also, a benefit of keeping Interrupt() is that we can build on top of it and have a faster, non-locking way of checking if the thread pool is enabled. So callers would be able to call something like pool.IsRunning() to see if they can submit tasks without having to worry about locking the main mutex and slowing down the workers’ processing. Something like:

     0diff --git a/src/util/threadpool.h b/src/util/threadpool.h
     1--- a/src/util/threadpool.h	(revision e0ec3232daf2c311471a1da149821bed18853fcc)
     2+++ b/src/util/threadpool.h	(date 1761158485938)
     3@@ -55,6 +55,9 @@
     4     // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
     5     bool m_interrupt GUARDED_BY(m_mutex){false};
     6     std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
     7+    // Enabled only after Start and disabled early on Stop/Interrupt.
     8+    // This lets us do non-blocking 'IsRunning' checks.
     9+    std::atomic<bool> m_running{false};
    10 
    11     void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    12     {
    13@@ -109,9 +112,11 @@
    14         m_interrupt = false; // Reset
    15 
    16         // Create workers
    17         m_workers.reserve(num_workers);
    18         for (int i = 0; i < num_workers; i++) {
    19             m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    20         }
    21+        m_running.store(true, std::memory_order_release);
    22     }
    23 
    24     /**
    25@@ -124,7 +129,9 @@
    26      */
    27     void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    28     {
    29-        // Notify workers and join them.
    30+        // Mark as no longer accepting new tasks
    31+        m_running.store(false, std::memory_order_release);
    32+        // Notify workers and join them
    33         std::vector<std::thread> threads_to_join;
    34         {
    35             LOCK(m_mutex);
    36@@ -147,11 +154,16 @@
    37     template<class F> EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    38     auto Submit(F&& fn)
    39     {
    40+        // Quick rejection based on Start/Stop/Interrupt before locking
    41+        if (!m_running.load(std::memory_order_acquire)) {
    42+            throw std::runtime_error("ThreadPool not running");
    43+        }
    44+
    45         std::packaged_task task{std::forward<F>(fn)};
    46         auto future{task.get_future()};
    47         {
    48             LOCK(m_mutex);
    49             if (m_workers.empty() || m_interrupt) {
    50                 throw std::runtime_error("No active workers; cannot accept new tasks");
    51             }
    52             m_work_queue.emplace(std::move(task));
    53@@ -180,7 +192,9 @@
    54 
    55     void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    56     {
    57+        m_running.store(false, std::memory_order_release);
    58         WITH_LOCK(m_mutex, m_interrupt = true);
    59         m_cv.notify_all();
    60     }
    61 
    62     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    63@@ -192,6 +206,11 @@
    64     {
    65         return WITH_LOCK(m_mutex, return m_workers.size());
    66     }
    67+
    68+    bool IsRunning() const noexcept
    69+    {
    70+        return m_running.load(std::memory_order_acquire);
    71+    }
    72 };
    73 
    74 #endif // BITCOIN_UTIL_THREADPOOL_H
    

    (this is not needed here because we are disconnecting the http callback prior to interrupting the pool but it seems useful in general).
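    With that, a caller could do a cheap pre-check before paying for the lock; a sketch only:

        if (!pool.IsRunning()) return false;     // fast path: pool already interrupted/stopped
        auto fut{pool.Submit(std::move(task))};  // may still throw if Stop() raced us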


    furszy commented at 7:46 pm on October 28, 2025:

    I don’t think we should have a method that would cause the node to crash if Start is called more than once. I disagree and think just calling Stop at the beginning of Start would be safer than the current implementation.

    Wouldn’t you agree that if your code calls Start() twice, you have a design issue? Thread pools are typically started once, maintained for the entire app’s lifecycle, and reused across modules. Mainly because creating and destroying threads isn’t cheap. I think it’s fair to expect an exception in that case, as it would indicate poorly structured code.

    I also don’t think anyone would expect Start() to wait for all existing threads to shut down and then spawn new ones. That would be a surprising behavior to me. Why would you spawn new ones if you already have threads available?


    andrewtoth commented at 7:48 pm on October 28, 2025:
    So why have Start and Stop then? Just start threads in constructor and join in destructor? Use Interrupt to stop new tasks from being added. This follows RAII principles.

    furszy commented at 8:11 pm on October 28, 2025:

    So why have Start and Stop then? Just start threads in constructor and join in destructor? Use Interrupt to stop new tasks from being added. This follows RAII principles.

    Good question. We could easily build an RAII wrapper on top of this. The approaches aren’t conflicting; it would just be an additional layer over the current code. Yet, I think the current design smoothly integrates with our existing workflows. We already use the start, interrupt, and stop terms throughout the codebase (check init.cpp and all our components), so following that general convention makes it easier to reason about these components globally. Even without deep knowledge of the http server workflow, anyone familiar with this convention can quickly hook up new functionality.

    Also, I suspect we’ll find opportunities for improvements (or smarter shutdown behaviors) along the way that might require customizing Stop() with some inputs, so might be better not to enforce RAII too early.


    furszy commented at 9:29 pm on October 28, 2025:
    Follow-up to this convo after some great back-and-forth via DM with Andrew. Usually, RAII works best when we don’t care exactly when an object gets destroyed. In this case, though, we actually do care about that moment (or, in our terms, when it’s stopped), since it marks the point where there are no pending tasks and it’s safe to tear down the backend handlers. If we don’t enforce that explicitly, the shutdown procedure could continue while workers are still active, leading them to access null pointers, etc. This is something we can’t guarantee if we just let the object be destroyed whenever the program releases it using RAII. So even using RAII, we would be forced to destroy the object manually anyway. Which kinda defeats the purpose of it.

    l0rinc commented at 3:37 pm on October 30, 2025:
    I was also wondering what this was meant to do, especially since there isn’t any test coverage for it.
  14. in src/test/threadpool_tests.cpp:111 in c219b93c3b outdated
    106+        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT-1, /*context=*/"test2 blocking tasks enabled");
    107+
    108+        // Now execute tasks on the single available worker
    109+        // and check that all the tasks are executed.
    110+        int num_tasks = 15;
    111+        std::atomic<int> counter = 0;
    


    pinheadmz commented at 2:58 pm on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    I was wondering if there’s any way to assert that the unblocked tasks are all executing on one single remaining worker… would it be insane to use a non-atomic int here?


    furszy commented at 3:43 pm on October 29, 2025:

    c219b93

    I was wondering if there’s any way to assert that the unblocked tasks are all executing on one single remaining worker… would it be insane to use a non-atomic int here?

    hmm, even if the non-atomic int works, that doesn’t really guarantee that the increment was done in the same thread. If we want to be 100% correct, we should store the ids of the threads that processed the tasks on a synchronized structure. Then check that only one id is there.


    andrewtoth commented at 3:57 pm on October 29, 2025:
    thread sanitizer would likely pick up on it though if you made it non-atomic.

    furszy commented at 6:37 pm on October 29, 2025:
    true. Closing this.

    andrewtoth commented at 6:44 pm on October 29, 2025:
    I mean pick up that it’s being written by multiple threads if something breaks the assumptions in this test. It seems like an ok idea. Or am I wrong?

    furszy commented at 7:05 pm on October 29, 2025:

    I mean pick up that it’s being written by multiple threads if something breaks the assumptions in this test. It seems like an ok idea. Or am I wrong?

    I don’t think there is any broken assumption. BlockWorkers() ensures that all threads except one are blocked. We could go further and check that all tasks submitted after the BlockWorkers call are executed by the same thread too, but that seems like overkill to me. We are basically ensuring that in another way.


    andrewtoth commented at 7:46 pm on October 29, 2025:
    Sorry for being unclear. I don’t think there is any broken assumption either. However, if we made this a non-atomic as suggested, then if something in the future were to break this assumption and run these tasks on multiple threads the unit test would break. Thus it would act as a regression test.

    furszy commented at 8:28 pm on October 29, 2025:

    Sorry for being unclear. I don’t think there is any broken assumption either. However, if we made this a non-atomic as suggested, then if something in the future were to break this assumption and run these tasks on multiple threads the unit test would break. Thus it would act as a regression test.

    But as you said, the thread sanitizer would likely complain about that. We’re accessing the same variable from different threads (main and worker). The main issue, however, will probably be related to thread visibility; changes made in one thread aren’t guaranteed to be visible to another. So that approach seems fragile to me.

    Also, going back to my initial comment: even if we switch to a non-atomic variable, the test could still pass. If we want it to be fully deterministic, we should store the IDs of the threads that processed the tasks in a synchronized structure, and then check that only one ID is present at the end. That seems to be the most secure approach to me.

    I could push this if I have to re-touch. But it doesn’t seem to be a blocking feature (at least to me). We do ensure all threads except one are blocked in the test, and we fail if not.
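    For completeness, the thread-id variant would look something like this (a sketch against the helpers in this test file, with BlockWorkers/WaitFor as defined above):

        std::mutex ids_mutex;
        std::set<std::thread::id> worker_ids;

        std::vector<std::future<void>> futures;
        futures.reserve(num_tasks);
        for (int i = 0; i < num_tasks; i++) {
            futures.emplace_back(threadPool.Submit([&]() {
                std::lock_guard lock{ids_mutex};
                worker_ids.insert(std::this_thread::get_id());
            }));
        }
        WaitFor(futures, /*context=*/"single worker tasks");
        // Every task ran on the one unblocked worker.
        BOOST_CHECK_EQUAL(worker_ids.size(), 1);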


    andrewtoth commented at 10:16 pm on October 29, 2025:

    But as you said, the thread sanitizer would likely complain about that. We’re accessing the same variable from different threads (main and worker).

    It won’t complain if we synchronize access to the same non-atomic variable, which we do here because we lock when calling Submit.

    The main issue, however, will probably be related to thread visibility; changes made in one thread aren’t guaranteed to be visible to another.

    I don’t think this is the main issue. If a future change breaks the ThreadPool such that in this test more than 1 thread will write to this non-atomic variable, then the thread sanitizer will cause the test to fail due to a data race.


    furszy commented at 1:23 am on October 30, 2025:
    Pushed.
  15. in src/test/threadpool_tests.cpp:205 in c219b93c3b outdated
    197+        for (int i = 0; i < num_tasks; i++) {
    198+            threadPool.Submit([&counter]() {
    199+                counter.fetch_add(1);
    200+            });
    201+        }
    202+        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    


    pinheadmz commented at 3:11 pm on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    As mentioned before, not a blocker but i feel like sleeps like this are test-flakiness waiting to happen …


    furszy commented at 3:49 pm on October 29, 2025:

    c219b93

    As mentioned before, not a blocker but i feel like sleeps like this are test-flakiness waiting to happen …

    This is one of those “wait and see if something happens” scenarios (if any task gets processed). We expect no activity here since all workers are busy. I’m not sure there is another way of doing this, but if it fails (even if it is not deterministic), that’s still a useful signal because it means something unexpected happened.

  16. in src/test/threadpool_tests.cpp:212 in c219b93c3b outdated
    204+
    205+        // Now process manually
    206+        for (int i = 0; i < num_tasks; i++) {
    207+            threadPool.ProcessTask();
    208+        }
    209+        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    


    pinheadmz commented at 3:13 pm on October 28, 2025:

    c219b93c3b043de202bdf3c65b433fd17af2da89

    belt-and-suspenders, could add BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0); I just like the symmetry in a test since you assert the value is 20 before taking action.


    furszy commented at 3:55 pm on October 29, 2025:
    Sounds good. Done as suggested. Thanks
  17. in src/test/fuzz/threadpool.cpp:28 in bb71b0e3bf
    23+};
    24+
    25+struct CounterTask {
    26+    std::atomic_uint32_t& m_counter;
    27+    explicit CounterTask(std::atomic_uint32_t& counter) : m_counter{counter} {}
    28+    void operator()() const { m_counter.fetch_add(1); }
    


    andrewtoth commented at 5:26 pm on October 28, 2025:

    Maybe we should relax this atomic operation so that it doesn’t hide any issues by synchronizing them for us.

    0    void operator()() const { m_counter.fetch_add(1, std::memory_order_relaxed); }
    

    furszy commented at 7:09 pm on October 28, 2025:
    Sure, pushed. Thanks!
  18. furszy force-pushed on Oct 28, 2025
  19. ismaelsadeeq commented at 7:29 pm on October 28, 2025: member
    Concept ACK, thanks for the separation from the initial index PR.
  20. pinheadmz approved
  21. pinheadmz commented at 3:28 pm on October 29, 2025: member

    ACK 195a96258f970c384ce180d57e73616904ef5fa1

    Built and tested on macos/arm64 and debian/x86. Reviewed each commit and left a few comments.

    I also tested the branch against other popular software that consumes the HTTP interface by running their CI:

    The http worker threads are clearly labeled and visible in htop!

     0-----BEGIN PGP SIGNED MESSAGE-----
     1Hash: SHA256
     2
     3ACK 195a96258f970c384ce180d57e73616904ef5fa1
     4-----BEGIN PGP SIGNATURE-----
     5
     6iQIzBAEBCAAdFiEE5hdzzW4BBA4vG9eM5+KYS2KJyToFAmkCMdwACgkQ5+KYS2KJ
     7yTriXA//RRnzezUHdmzRlKmoSDA+ZBHz0RY3z2LGk63izb/YdYaJY9JBZL2Y9BA8
     8K2nyexdSDC/DFOm4H56ddEe6ChlB7w+uZc92SgFSLSvavInpZ80KEJRk07vgoIL7
     9hwuyyevWyOOU32iz1NE3q316TMaJmzVsPhRGwbdmTXNwJLtUX6g4czfh28ajW1DC
    10Y9ULKwT36rFHRcKwC1YZYuBJUNBZWQgVBcydcmS1UEykY4mBnCW9knrATwn/29b7
    112AYPV+yuaiy9OpDEOJ8iKtZOPGR36NrIUleMUqruq4Sy2/TnJtm3AKNK0336/Fxu
    12MqVyPKSyusg7kBA6f2h/2+NHpbyLoboYhjZew+HvED/aFfi+Jla+nxybkUYXfciL
    13pzbND22TTuRGB1fKU7AwPD0TO9JwOTU385iEdpoGq8rbT3EpgPr31N4TeDQDJx5t
    14jPzzWZYj43JMuIc3bm/K5S2HYSdFZUEDQC81kbND+jOLF6YkJwS9794anLO38tvi
    15fip/eLK8Nw4pmWnW63/9lc+Y/1gLpgLDMxxhA1NKJyytk7z2IRo7vJKck6TqAjcZ
    16nU8Wv9/ful5ndDJfLIKuYT9jqRk9ORohVwv+P+ppuO8jhFjhuswxPlFJKkMXTeZn
    17hGi8QCrAUuibvVuLfLKVExJqSmmeUAkTVKp5ZTWLwNB7IkZrSoo=
    18=hZYm
    19-----END PGP SIGNATURE-----
    

    pinheadmz’s public key is on openpgp.org

  22. DrahtBot requested review from ismaelsadeeq on Oct 29, 2025
  23. DrahtBot requested review from TheCharlatan on Oct 29, 2025
  24. furszy force-pushed on Oct 29, 2025
  25. furszy commented at 3:56 pm on October 29, 2025: member
    Oh, awesome extensive test and review @pinheadmz! you rock!
  26. pinheadmz approved
  27. pinheadmz commented at 6:20 pm on October 29, 2025: member

    ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6

    minor changes since last ack

     0-----BEGIN PGP SIGNED MESSAGE-----
     1Hash: SHA256
     2
     3ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6
     4-----BEGIN PGP SIGNATURE-----
     5
     6iQIzBAEBCAAdFiEE5hdzzW4BBA4vG9eM5+KYS2KJyToFAmkCWuYACgkQ5+KYS2KJ
     7yTpb9RAAgxNlDQlqkVfnIK2Xt8BQmRsiG+1KHQGJA2d3RHoxjFg2129fj47y5i+t
     8GlatlcsXEcibI2D0F22sKSzY1u0LWy4GYLI1d12JAIewA99n/lRr1ktDk3v64pp8
     9kldvYpd8UBs7DCHSJhPs4HbOPgwIILPASdSbQTb+6m48X5jg+cu+yMBuANVq3sbB
    109I8rUlbUgRva5voy3EGRkXsGTuwcaCoLDNHnnjrqQkJMYTym47rMF/xTZJJ11isF
    11QrpzFu+P2tFy1oj0bGd4e5EhqNk++qfRCuw9sGLgLL6YEHzC/ihVH/L5NAxoeJX4
    12L0yX09BG5wDrbUQvZn4w1uaQz39Uor5mb8+tZyDUyRmO9MrG5AHomUtfdbiYLwSO
    13007bLD+WX4Hxode47xCdhUhqflXqbHD2mhkf5ESyUgGu3smSHSQQosuzIJiVmTn9
    14b2UJG6see/tckFvart6YLn1AIu9uCyUMmB+hZIcSasZv95oEHv1YB3E5dcKUmq0d
    15A1cUHIhNvU4i/hNy2xgijgSjDdpVQMFbLYUt9y4ZnzpJXFd7mnbpHVUVM1P+Onvp
    16ScAtgvosXfbd5PjnAQWsMWgSmVHUtSc4t9GPjGlRYaVpHFkEbqWpWeKrYqMkU3id
    179YiFP9BueWNbNV7OBlB0LcfCpwO2WbvfJOW93/jxDovc0tgohbU=
    18=c8aD
    19-----END PGP SIGNATURE-----
    

    pinheadmz’s public key is on openpgp.org

  28. TheCharlatan approved
  29. TheCharlatan commented at 9:26 pm on October 29, 2025: contributor

    ACK f5eca8d5252a68f0fbc8b5efec021c75744555b6

    I have fuzzed the thread pool for some time again, and did not find any new issues. Also, the previously observed memory leak during fuzzing no longer appeared. The changes to the http workers look sane to me.

    Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

  30. util: introduce general purpose thread pool e1eb4cd3a5
  31. fuzz: add test case for threadpool
    Co-authored-by: furszy <matiasfurszyfer@protonmail.com>
    e3430ccf3d
  32. http: replace WorkQueue and threads handling for ThreadPool
    Replace the HTTP server's WorkQueue implementation and single threads
    handling code with ThreadPool for processing HTTP requests. The
    ThreadPool class encapsulates all this functionality in a reusable
    class, properly unit and fuzz tested (the previous code was neither
    unit nor fuzz tested at all).
    
    This cleanly separates responsibilities:
    The HTTP server now focuses solely on receiving and dispatching requests,
    while ThreadPool handles concurrency, queuing, and execution.
    It simplifies init, shutdown and requests tracking.
    
    This also allows us to experiment with further performance improvements at
    the task queuing and execution level, such as a lock-free structure, task
    prioritization or any other performance improvement in the future, without
    having to deal with HTTP code that lives on a different layer.
    435e8b5a55
  33. furszy force-pushed on Oct 30, 2025
  34. furszy commented at 1:22 am on October 30, 2025: member

    Can you run the commits through clang-format-diff.py? There are a bunch of formatting issues in the first two commits.

    Pushed.

  35. in src/test/threadpool_tests.cpp:12 in e1eb4cd3a5 outdated
     7+
     8+#include <boost/test/unit_test.hpp>
     9+
    10+BOOST_AUTO_TEST_SUITE(threadpool_tests)
    11+
    12+constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
    


    l0rinc commented at 10:17 am on October 30, 2025:

    nit: we don’t need to include the type here:

    0constexpr auto TIMEOUT{std::chrono::seconds(120)};
    

    #26966 (review)

  36. in src/util/threadpool.h:114 in e1eb4cd3a5 outdated
    109+        m_interrupt = false; // Reset
    110+
    111+        // Create workers
    112+        m_workers.reserve(num_workers);
    113+        for (int i = 0; i < num_workers; i++) {
    114+            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    


    l0rinc commented at 10:29 am on October 30, 2025:

    nit: we could use strprintf in a few more places:

    0            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    
  37. in src/util/threadpool.h:135 in e1eb4cd3a5 outdated
    130+        {
    131+            LOCK(m_mutex);
    132+            m_interrupt = true;
    133+            threads_to_join.swap(m_workers);
    134+        }
    135+        m_cv.notify_all();
    


    l0rinc commented at 10:33 am on October 30, 2025:
    What’s the reason for locking and notifying manually instead of using higher-level primitives - have you tried these with C++20 https://en.cppreference.com/w/cpp/thread/barrier.html instead?

    furszy commented at 8:43 pm on October 30, 2025:
    That is part of the possible future experiments mentioned in the PR description. The goal is to keep the same synchronization mechanisms we had in the http server code.

    l0rinc commented at 1:22 pm on October 31, 2025:

    Goal is to keep the same synchronization mechanisms we had in the http server code

    What is the reason for that, isn’t the goal to have a reusable ThreadPool?

  38. in src/util/threadpool.h:129 in e1eb4cd3a5 outdated
    124+     * Must be called from a controller (non-worker) thread.
    125+     */
    126+    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    127+    {
    128+        // Notify workers and join them.
    129+        std::vector<std::thread> threads_to_join;
    


    l0rinc commented at 10:33 am on October 30, 2025:
    I’m not sure this is necessary
  39. in src/util/threadpool.h:66 in e1eb4cd3a5 outdated
    61+        WAIT_LOCK(m_mutex, wait_lock);
    62+        for (;;) {
    63+            std::packaged_task<void()> task;
    64+            {
    65+                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    66+                if (!m_interrupt && m_work_queue.empty()) {
    


    l0rinc commented at 10:41 am on October 30, 2025:
    do we really need this extra complexity here?

    furszy commented at 8:31 pm on October 30, 2025:
    Yes, we do. The reason is explained there. All threads are busy, a new task arrives so the notification goes unnoticed, then the threads finish processing and sleep, losing the new task’s wake-up notification, not running the new task and sleeping until a new one gets submitted. There is a test exercising this exact behavior.

    l0rinc commented at 1:31 pm on October 31, 2025:
    Can you point me to the test that fails please? I have removed it, ran the tests added in this PR and they all passed.
  40. in src/util/threadpool.h:71 in e1eb4cd3a5 outdated
    66+                if (!m_interrupt && m_work_queue.empty()) {
    67+                    // Block until the pool is interrupted or a task is available.
    68+                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    69+                }
    70+
    71+                // If stopped and no work left, exit worker
    


    l0rinc commented at 10:41 am on October 30, 2025:
    the comment mirrors the code; it doesn’t really add anything that the code doesn’t already say (especially since the comment above already stated the same)
  41. in src/test/threadpool_tests.cpp:89 in e1eb4cd3a5 outdated
    84+
    85+        // Store futures to ensure completion before checking counter.
    86+        std::vector<std::future<void>> futures;
    87+        futures.reserve(num_tasks);
    88+
    89+        for (int i = 1; i <= num_tasks; i++) {
    


    l0rinc commented at 10:48 am on October 30, 2025:

    nit: for consistency

    0        for (size_t i{1}; i <= num_tasks; ++i) {
    

    furszy commented at 8:49 pm on October 30, 2025:
    num_tasks is an int. That would require casting it to size_t.

    l0rinc commented at 1:32 pm on October 31, 2025:
    Please see my suggestions above, it’s a size_t there
  42. in src/test/threadpool_tests.cpp:206 in e1eb4cd3a5 outdated
    201+            threadPool.Submit([&counter]() {
    202+                counter.fetch_add(1);
    203+            });
    204+        }
    205+        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    206+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
    


    l0rinc commented at 10:51 am on October 30, 2025:
    0        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    
  43. in src/test/threadpool_tests.cpp:75 in e1eb4cd3a5 outdated
    70+            threadPool.Submit([]() { return false; });
    71+        } catch (const std::runtime_error&) {
    72+            err = true;
    73+        }
    74+        BOOST_CHECK(err);
    75+    }
    


    l0rinc commented at 10:58 am on October 30, 2025:

    I like how thoroughly we’re testing the functionality here, that helps a lot with being able to trust this complex code.

    I wanted to debug these one by one without having to run all the other tests first - and it’s a bit awkward to have so many tests in a single place when they’re admittedly separate. Could you please split them out into separate units, such as:

    0BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
    1{
    2    ThreadPool threadPool{"not_started"};
    3    BOOST_CHECK_EXCEPTION(threadPool.Submit([]{ return false; }), std::runtime_error, HasReason{"No active workers"});
    4}
    
  44. in src/test/threadpool_tests.cpp:51 in e1eb4cd3a5 outdated
    46+    return blocking_tasks;
    47+}
    48+
    49+BOOST_AUTO_TEST_CASE(threadpool_basic)
    50+{
    51+    // Test Cases
    


    l0rinc commented at 11:01 am on October 30, 2025:

    As mentioned before, we have a single big-bang test case here (called “threadpool_basic”, but it contains everything we have), announcing the parts in comments and then duplicating each comment. Can we split them out into focused, self-documenting test cases, something like:

    0BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
    1BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
    2BOOST_AUTO_TEST_CASE(single_available_worker_processes_all_tasks)
    3BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
    4BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    5BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
    6BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
    7BOOST_AUTO_TEST_CASE(recursive_task_submission)
    8BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    

    That would help reviewers experiment and get exact test failures; we could replace a lot of dead comments with code and provide better structure for the testing efforts.

  45. in src/test/threadpool_tests.cpp:83 in e1eb4cd3a5 outdated
    78+    {
    79+        int num_tasks = 50;
    80+
    81+        ThreadPool threadPool(POOL_NAME);
    82+        threadPool.Start(NUM_WORKERS_DEFAULT);
    83+        std::atomic<int> counter = 0;
    


    l0rinc commented at 11:10 am on October 30, 2025:

    nits: std::atomic_int and its brothers seem more focused (and brace init helps with many narrowing problems) - we’re using these in the fuzzing commit as well:

    0        std::atomic_size_t counter{0};
    
  46. in src/test/threadpool_tests.cpp:90 in e1eb4cd3a5 outdated
    85+        // Store futures to ensure completion before checking counter.
    86+        std::vector<std::future<void>> futures;
    87+        futures.reserve(num_tasks);
    88+
    89+        for (int i = 1; i <= num_tasks; i++) {
    90+            futures.emplace_back(threadPool.Submit([&counter, i]() {
    


    l0rinc commented at 11:11 am on October 30, 2025:

    nit:

    0            futures.emplace_back(threadPool.Submit([&counter, i] {
    
  47. in src/test/threadpool_tests.cpp:96 in e1eb4cd3a5 outdated
    91+                counter.fetch_add(i);
    92+            }));
    93+        }
    94+
    95+        // Wait for all tasks to finish
    96+        WaitFor(futures, /*context=*/"test1 task");
    


    l0rinc commented at 11:13 am on October 30, 2025:
    I’m not yet sure I understand why the test framework needs a wait method; that sounds like something the thread pool should provide. This is why I mentioned in #26966 (review) that the problem has to come before the solution, because the reviewers have to check out the follow-up comments to see if this is representative of actual usage or not.
  48. in src/test/threadpool_tests.cpp:95 in e1eb4cd3a5 outdated
    90+            futures.emplace_back(threadPool.Submit([&counter, i]() {
    91+                counter.fetch_add(i);
    92+            }));
    93+        }
    94+
    95+        // Wait for all tasks to finish
    


    l0rinc commented at 11:13 am on October 30, 2025:
    comment is redundant, the method name and context already state all of that
  49. in src/util/threadpool.h:106 in e1eb4cd3a5 outdated
    101+     *
    102+     * Must be called from a controller (non-worker) thread.
    103+     */
    104+    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    105+    {
    106+        assert(num_workers > 0);
    


    l0rinc commented at 11:17 am on October 30, 2025:

    Can we narrow the type in that case to at least disallow negative values?

    0    void Start(size_t num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    1    {
    2        assert(num_workers > 0);
    

    furszy commented at 9:01 pm on October 30, 2025:
    This decision was on purpose: to be able to assert and abort the program if a negative number is provided. A negative integer-to-size_t conversion would be bad.

    l0rinc commented at 1:34 pm on October 31, 2025:

    Where would we be getting negative numbers from? If we care about the value being in a certain range, we could validate that instead:

    0        assert(num_workers > 0 && num_workers <= MAX_WORKER_COUNT);
    
  50. in src/test/threadpool_tests.cpp:91 in e1eb4cd3a5 outdated
    86+        std::vector<std::future<void>> futures;
    87+        futures.reserve(num_tasks);
    88+
    89+        for (int i = 1; i <= num_tasks; i++) {
    90+            futures.emplace_back(threadPool.Submit([&counter, i]() {
    91+                counter.fetch_add(i);
    


    l0rinc commented at 11:23 am on October 30, 2025:

    Since this isn’t a synchronization operation, the default memory_order_seq_cst seems too strong here, and it might hide a data race by providing unintended synchronization:

    0                counter.fetch_add(i, std::memory_order_relaxed);
    

    I have created a godbolt reproducer to understand it better, see: https://godbolt.org/z/o87dsq1fG I couldn’t find any real difference on x86 cpus, but ARM ones show the difference (__aarch64_ldadd4_acq_rel vs __aarch64_ldadd4_relax)

  51. in src/test/threadpool_tests.cpp:77 in e1eb4cd3a5 outdated
    72+            err = true;
    73+        }
    74+        BOOST_CHECK(err);
    75+    }
    76+
    77+    // Test case 1, submit tasks and verify completion.
    


    l0rinc commented at 12:12 pm on October 30, 2025:

    I’m usually coding along while reviewing; to simplify applying the suggestions that you like, I’m posting the final results as well:

     0BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
     1{
     2    ThreadPool threadPool{"completion"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4
     5    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     6    std::atomic_size_t counter{0};
     7
     8    std::vector<std::future<void>> futures(num_tasks);
     9    for (size_t i{0}; i < num_tasks; ++i) {
    10        futures[i] = threadPool.Submit([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
    11    }
    12
    13    WaitFor(futures);
    14    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
    15    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    16}
    
  52. in src/test/threadpool_tests.cpp:79 in e1eb4cd3a5 outdated
    74+        BOOST_CHECK(err);
    75+    }
    76+
    77+    // Test case 1, submit tasks and verify completion.
    78+    {
    79+        int num_tasks = 50;
    


    l0rinc commented at 12:13 pm on October 30, 2025:

    we’re iterating forward; it seems simpler to use an unsigned here

    0        constexpr size_t num_tasks{50};
    

    and to add some variance to the tests we could vary these randomly:

    0        const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    

    note that this will need:

    0BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
    
  53. in src/test/threadpool_tests.cpp:26 in e1eb4cd3a5 outdated
    21+    }
    22+}
    23+
    24+// Block a number of worker threads by submitting tasks that wait on `blocker_future`.
    25+// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
    26+std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context)
    


    l0rinc commented at 12:32 pm on October 30, 2025:

    ThreadPool::WorkersCount() returns size_t; if we restricted num_of_threads_to_block to size_t as well, we could assert here:

    0    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
    
  54. in src/test/threadpool_tests.cpp:22 in e1eb4cd3a5 outdated
    17+    for (size_t i = 0; i < futures.size(); ++i) {
    18+        if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
    19+            throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
    20+        }
    21+    }
    22+}
    


    l0rinc commented at 12:55 pm on October 30, 2025:

    As mentioned before, I think these could be simplified to std::span, std::string_view and strprintf. And if we extract each test into a separate unit (as suggested below), we won’t need the context parameter anymore. The thread index also doesn’t really help in case of a failure, which would let us simplify this. And lastly, since BlockWorkers already returns a std::vector<std::future<void>>, we don’t actually need the template here:

    0void WaitFor(std::span<const std::future<void>> futures)
    1{
    2    for (const auto& f : futures) {
    3        BOOST_REQUIRE(f.wait_for(TIMEOUT) == std::future_status::ready);
    4    }
    5}
    
  55. in src/test/threadpool_tests.cpp:102 in e1eb4cd3a5 outdated
     97+        int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
     98+        BOOST_CHECK_EQUAL(counter.load(), expected_value);
     99+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    100+    }
    101+
    102+    // Test case 2, maintain all threads busy except one.
    


    l0rinc commented at 1:05 pm on October 30, 2025:

    we could generalize this by counting from 1 to <NUM_WORKERS_DEFAULT, something like:

     0BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_task)
     1{
     2    ThreadPool threadPool{"block_counts"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     5
     6    for (size_t free{1}; free < NUM_WORKERS_DEFAULT; ++free) {
     7        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
     8        std::counting_semaphore sem{0};
     9        const auto blocking_tasks{BlockWorkers(threadPool, sem, free)};
    10
    11        size_t counter{0};
    12        std::vector<std::future<void>> futures(num_tasks);
    13        for (auto& f : futures) f = threadPool.Submit([&counter] { ++counter; });
    14
    15        WaitFor(futures);
    16        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    17
    18        if (free == 1) {
    19            BOOST_CHECK_EQUAL(counter, num_tasks);
    20        } else {
    21            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
    22        }
    23
    24        sem.release(free);
    25        WaitFor(blocking_tasks);
    26    }
    27
    28    threadPool.Stop();
    29}
    
  56. in src/test/threadpool_tests.cpp:62 in e1eb4cd3a5 outdated
    57+    // 5) The task throws an exception, catch must be done in the consumer side.
    58+    // 6) Busy workers, help them by processing tasks from outside.
    59+    // 7) Recursive submission of tasks.
    60+    // 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
    61+
    62+    const int NUM_WORKERS_DEFAULT = 3;
    


    l0rinc commented at 1:37 pm on October 30, 2025:

    I needed at least nproc+1 workers to be able to reliably reproduce concurrency issues (such as the non-atomic counter update)

    0    const size_t NUM_WORKERS_DEFAULT{size_t(GetNumCores()) + 1}; // we need to make sure there's *some* contention
    
  57. in src/test/threadpool_tests.cpp:123 in e1eb4cd3a5 outdated
    118+        futures.reserve(num_tasks);
    119+        for (int i = 0; i < num_tasks; i++) {
    120+            futures.emplace_back(threadPool.Submit([&counter]() {
    121+                counter += 1;
    122+            }));
    123+        }
    


    l0rinc commented at 1:48 pm on October 30, 2025:

    nit: these setup parts are longer than they need to be; this is just glue code, and I find it distracting from the actual test logic:

    0        std::vector<std::future<void>> futures(num_tasks);
    1        for (auto& f : futures) f = threadPool.Submit([&counter]{ ++counter; });
    
  58. in src/test/threadpool_tests.cpp:143 in e1eb4cd3a5 outdated
    138+        std::atomic<bool> flag = false;
    139+        std::future<void> future = threadPool.Submit([&flag]() {
    140+            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    141+            flag.store(true);
    142+        });
    143+        future.wait();
    


    l0rinc commented at 2:08 pm on October 30, 2025:
    hmm, isn’t this why WaitFor was added with a timeout in the first place?
  59. in src/test/threadpool_tests.cpp:140 in e1eb4cd3a5 outdated
    135+    {
    136+        ThreadPool threadPool(POOL_NAME);
    137+        threadPool.Start(NUM_WORKERS_DEFAULT);
    138+        std::atomic<bool> flag = false;
    139+        std::future<void> future = threadPool.Submit([&flag]() {
    140+            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    


    l0rinc commented at 2:10 pm on October 30, 2025:

    maybe we can test more here, each task waiting a different amount (e.g. its work index in milliseconds), and we could assert at the end that completion takes at least NUM_WORKERS_DEFAULT milliseconds.

     0BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
     1{
     2    ThreadPool threadPool{"wait_test"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     5
     6    const auto start{steady_clock::now()};
     7    std::vector<std::future<void>> futures(num_tasks + 1);
     8    for (size_t i{0}; i <= num_tasks; ++i) {
     9        futures[i] = threadPool.Submit([i] { UninterruptibleSleep(milliseconds{i}); });
    10    }
    11    WaitFor(futures);
    12    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
    13    BOOST_CHECK(elapsed_ms >= num_tasks);
    14}
    
  60. in src/test/threadpool_tests.cpp:252 in e1eb4cd3a5 outdated
    247+        // At this point, all workers are blocked, and the extra task is queued
    248+        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    249+
    250+        // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    251+        std::thread thread_unblocker([&blocker]() {
    252+            std::this_thread::sleep_for(std::chrono::milliseconds{300});
    


    l0rinc commented at 2:13 pm on October 30, 2025:
    We could likely use UninterruptibleSleep here instead
  61. in src/test/threadpool_tests.cpp:47 in e1eb4cd3a5 outdated
    42+    }
    43+
    44+    // Wait until all threads are actually blocked
    45+    WaitFor(ready_futures, context);
    46+    return blocking_tasks;
    47+}
    


    l0rinc commented at 2:41 pm on October 30, 2025:

    What is the reason for the heavy use of promises? Does it have any advantage compared to C++20 concurrency primitives? This seems to me like a barebones implementation of a std::latch. Can we use that instead?

    This seemed like a std::binary_semaphore to me initially, but I couldn’t make it work with that.

    But we can still use a simple std::counting_semaphore which would avoid the std::promise + std::shared_future + std::vector<std::future<void>>, something like:

     0std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
     1{
     2    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
     3    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
     4
     5    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
     6    for (auto& f : blocking_tasks) f = threadPool.Submit([&] {
     7        ready.count_down();
     8        release_sem.acquire();
     9    });
    10
    11    ready.wait();
    12    return blocking_tasks;
    13}
    
  62. in src/test/threadpool_tests.cpp:147 in e1eb4cd3a5 outdated
    142+        });
    143+        future.wait();
    144+        BOOST_CHECK(flag.load());
    145+    }
    146+
    147+    // Test case 4, obtain result object.
    


    l0rinc commented at 2:47 pm on October 30, 2025:

    we could extract this to something like:

    0BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    1{
    2    ThreadPool threadPool{"result_test"};
    3    threadPool.Start(NUM_WORKERS_DEFAULT);
    4    
    5    BOOST_CHECK_EQUAL(threadPool.Submit([] { return true; }).get(), true);
    6    BOOST_CHECK_EQUAL(threadPool.Submit([] { return 42; }).get(), 42);
    7    BOOST_CHECK_EQUAL(threadPool.Submit([] { return std::string{"true"}; }).get(), "true");
    8}
    
  63. in src/test/threadpool_tests.cpp:168 in e1eb4cd3a5 outdated
    163+    // Test case 5, throw exception and catch it on the consumer side.
    164+    {
    165+        ThreadPool threadPool(POOL_NAME);
    166+        threadPool.Start(NUM_WORKERS_DEFAULT);
    167+
    168+        int ROUNDS = 5;
    


    l0rinc commented at 2:48 pm on October 30, 2025:
    in other cases we called this num_tasks
  64. in src/test/threadpool_tests.cpp:170 in e1eb4cd3a5 outdated
    165+        ThreadPool threadPool(POOL_NAME);
    166+        threadPool.Start(NUM_WORKERS_DEFAULT);
    167+
    168+        int ROUNDS = 5;
    169+        std::string err_msg{"something wrong happened"};
    170+        std::vector<std::future<void>> futures;
    


    l0rinc commented at 2:52 pm on October 30, 2025:

    do we need a vector here or can we just consume whatever we just submitted?

     0BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
     1{
     2    ThreadPool threadPool{"exception_test"};
     3    threadPool.Start(NUM_WORKERS_DEFAULT);
     4
     5    const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
     6
     7    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     8    for (size_t i{0}; i < num_tasks; ++i) {
     9        BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(make_err(i)); }).get(), std::runtime_error, HasReason{make_err(i)});
    10    }
    11}
    
  65. in src/test/threadpool_tests.cpp:188 in e1eb4cd3a5 outdated
    183+                BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
    184+            }
    185+        }
    186+    }
    187+
    188+    // Test case 6, all workers are busy, help them by processing tasks from outside.
    


    l0rinc commented at 3:34 pm on October 30, 2025:

    We can split out the remaining 3 tests as well:

     0
     1BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
     2{
     3    ThreadPool threadPool{"manual_process"};
     4    threadPool.Start(NUM_WORKERS_DEFAULT);
     5    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
     6
     7    std::counting_semaphore sem{0};
     8    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
     9
    10    std::atomic_size_t counter{0};
    11    std::vector<std::future<void>> futures(num_tasks);
    12    for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    13
    14    UninterruptibleSleep(milliseconds{100});
    15    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    16
    17    for (size_t i{0}; i < num_tasks; ++i) {
    18        threadPool.ProcessTask();
    19    }
    20
    21    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    22    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    23
    24    sem.release(NUM_WORKERS_DEFAULT);
    25    WaitFor(blocking_tasks);
    26}
    27
    28BOOST_AUTO_TEST_CASE(recursive_task_submission)
    29{
    30    ThreadPool threadPool{"recursive"};
    31    threadPool.Start(NUM_WORKERS_DEFAULT);
    32
    33    std::promise<void> signal;
    34    threadPool.Submit([&threadPool, &signal] {
    35        threadPool.Submit([&signal] {
    36            signal.set_value();
    37        });
    38    });
    39
    40    signal.get_future().wait();
    41}
    42
    43BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    44{
    45    ThreadPool threadPool{"graceful_stop"};
    46    threadPool.Start(NUM_WORKERS_DEFAULT);
    47
    48    std::counting_semaphore sem{0};
    49    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    50
    51    auto future{threadPool.Submit([] { return true; })};
    52    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    53
    54    std::thread thread_unblocker{[&sem] {
    55        std::this_thread::sleep_for(milliseconds{300});
    56        sem.release(NUM_WORKERS_DEFAULT);
    57    }};
    58
    59    threadPool.Stop();
    60
    61    BOOST_CHECK(future.get());
    62    thread_unblocker.join();
    63    WaitFor(blocking_tasks);
    64    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    65}
    
  66. in src/httpserver.cpp:53 in 435e8b5a55
    49@@ -49,83 +50,6 @@ using common::InvalidPortErrMsg;
    50 /** Maximum size of http request (request line + headers) */
    51 static const size_t MAX_HEADERS_SIZE = 8192;
    52 
    53-/** HTTP request work item */
    54-class HTTPWorkItem final : public HTTPClosure
    


    l0rinc commented at 3:40 pm on October 30, 2025:
    Is there a way to do this final threadpool migration in smaller steps?
  67. l0rinc changes_requested
  68. l0rinc commented at 4:05 pm on October 30, 2025: contributor

    I have started reviewing this PR but have only finished the first commit (e1eb4cd3a5eb192cd6d9ee5d255688c06ab2089a).

    The depth of the tests gives me hope that this transition will be smooth; they cover most of the functionality, even a few corner cases I hadn’t thought of!

    It seemed, however, that I had more to say about that than anticipated. I didn’t even get to reviewing the threadpool properly, just most of the tests.

    I know receiving this amount of feedback can be daunting, but we both want this to succeed. I have spent a lot of time meticulously going through the details. I hope you will take it as I meant it: to make absolutely sure this won’t cause any problems and that our test suite is rock solid. I will continue the review, but wanted to make sure you’re aware of the progress and thought we can synchronize more often this way (pun intended).

    In general I think adding a thread pool like this can untangle complicated code, but we have to find a balance between IO-bound and CPU-bound tasks. I found that oversubscription is usually the smaller problem, so we can likely solve the IO/CPU contention problem by creating dedicated ThreadPools for each major task (http, script verification, input fetcher, compaction, etc.). This way a pool won’t be a general resource guardian (which can be a ThreadPool’s main purpose), just a tool to avoid the overhead of starting and stopping threads.
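
    To make that concrete, a rough sketch of per-subsystem pools, using only the Start/Submit/Stop API from this PR (the pool names and worker counts below are made up):

        #include <common/system.h>   // GetNumCores()
        #include <util/threadpool.h>

        void ExampleInit() // illustrative only, not proposed code
        {
            ThreadPool http_pool{"http"};
            ThreadPool scriptcheck_pool{"scriptcheck"};
            http_pool.Start(/*num_workers=*/4);                    // mostly IO-bound work
            scriptcheck_pool.Start(/*num_workers=*/GetNumCores()); // CPU-bound work

            auto reply{http_pool.Submit([] { /* handle one request */ })};
            reply.wait();

            // Each pool drains its queue and joins its own workers independently.
            http_pool.Stop();
            scriptcheck_pool.Stop();
        }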

    As hinted in the original thread, I find the current structure harder to follow; I would appreciate it if you would consider doing the extraction in smaller steps:

    • first step would be to use the new ThreadPool’s structure, but extract the old logic into it;
    • add the tests (unit and fuzz, no need to split it) against the old impl in a separate commit;
    • in a last step swap it out with the new logic, keeping the same structure and tests.

    This way we could debug and understand the old code before we jump into the new one - and the tests would guide us in guaranteeing that the behavior stayed basically the same. I understand that’s extra work, but I’m of course willing to help with whatever you need to make the transition less risky.

    I glanced quickly at the actual ThreadPool implementation and it looks okay, but as part of my next wave of reviews I want to investigate why we need so much locking here, why we’re relying on old primitives instead of the C++20 concurrency tools (such as latches and barriers), and whether we can create and destroy the pool via RAII instead of manual Start()/Stop() calls.
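
    For the RAII point, something along these lines is what I have in mind (a sketch only, built on the Start/Stop API from this PR; the wrapper name is made up):

        #include <util/threadpool.h>

        #include <cstddef>
        #include <string>
        #include <utility>

        // Hypothetical RAII wrapper: the pool starts on construction and stops (draining its queue) on destruction.
        class ScopedThreadPool
        {
            ThreadPool m_pool;

        public:
            ScopedThreadPool(std::string name, size_t num_workers) : m_pool{std::move(name)}
            {
                m_pool.Start(num_workers);
            }
            ~ScopedThreadPool() { m_pool.Stop(); }
            ThreadPool& operator*() { return m_pool; }
            ThreadPool* operator->() { return &m_pool; }
        };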

    Note that I have done an IBD before and after this change to see if everything was still working and I didn’t see any regression!

      0diff --git a/src/test/threadpool_tests.cpp b/src/test/threadpool_tests.cpp
      1index e8200533cd..052784db37 100644
      2--- a/src/test/threadpool_tests.cpp
      3+++ b/src/test/threadpool_tests.cpp
      4@@ -2,270 +2,208 @@
      5 // Distributed under the MIT software license, see the accompanying
      6 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
      7 
      8+#include <common/system.h>
      9+#include <test/util/setup_common.h>
     10 #include <util/string.h>
     11 #include <util/threadpool.h>
     12+#include <util/time.h>
     13 
     14 #include <boost/test/unit_test.hpp>
     15 
     16-BOOST_AUTO_TEST_SUITE(threadpool_tests)
     17+#include <latch>
     18+#include <semaphore>
     19 
     20-constexpr auto TIMEOUT_SECS = std::chrono::seconds(120);
     21+using namespace std::chrono;
     22+constexpr auto TIMEOUT = seconds(120);
     23 
     24-template <typename T>
     25-void WaitFor(const std::vector<std::future<T>>& futures, const std::string& context)
     26+void WaitFor(std::span<const std::future<void>> futures)
     27 {
     28-    for (size_t i = 0; i < futures.size(); ++i) {
     29-        if (futures[i].wait_for(TIMEOUT_SECS) != std::future_status::ready) {
     30-            throw std::runtime_error("Timeout waiting for: " + context + ", task index " + util::ToString(i));
     31-        }
     32+    for (const auto& f : futures) {
     33+        BOOST_REQUIRE(f.wait_for(TIMEOUT) == std::future_status::ready);
     34     }
     35 }
     36 
     37 // Block a number of worker threads by submitting tasks that wait on `blocker_future`.
     38 // Returns the futures of the blocking tasks, ensuring all have started and are waiting.
     39-std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::shared_future<void>& blocker_future, int num_of_threads_to_block, const std::string& context)
     40+std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
     41 {
     42-    // Per-thread ready promises to ensure all workers are actually blocked
     43-    std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
     44-    std::vector<std::future<void>> ready_futures;
     45-    ready_futures.reserve(num_of_threads_to_block);
     46-    for (auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
     47-
     48-    // Fill all workers with blocking tasks
     49-    std::vector<std::future<void>> blocking_tasks;
     50-    for (int i = 0; i < num_of_threads_to_block; i++) {
     51-        std::promise<void>& ready = ready_promises[i];
     52-        blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
     53-            ready.set_value();
     54-            blocker_future.wait();
     55-        }));
     56-    }
     57+    assert(threadPool.WorkersCount() >= num_of_threads_to_block);
     58+    std::latch ready{std::ptrdiff_t(num_of_threads_to_block)};
     59 
     60-    // Wait until all threads are actually blocked
     61-    WaitFor(ready_futures, context);
     62+    std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
     63+    for (auto& f : blocking_tasks) f = threadPool.Submit([&] {
     64+        ready.count_down();
     65+        release_sem.acquire();
     66+    });
     67+
     68+    ready.wait();
     69     return blocking_tasks;
     70 }
     71 
     72-BOOST_AUTO_TEST_CASE(threadpool_basic)
     73+BOOST_FIXTURE_TEST_SUITE(threadpool_tests, BasicTestingSetup)
     74+
     75+const size_t NUM_WORKERS_DEFAULT{size_t(GetNumCores()) + 1}; // we need to make sure there's *some* contention
     76+
     77+BOOST_AUTO_TEST_CASE(submit_to_non_started_pool_throws)
     78 {
     79-    // Test Cases
     80-    // 0) Submit task to a non-started pool.
     81-    // 1) Submit tasks and verify completion.
     82-    // 2) Maintain all threads busy except one.
     83-    // 3) Wait for work to finish.
     84-    // 4) Wait for result object.
     85-    // 5) The task throws an exception, catch must be done in the consumer side.
     86-    // 6) Busy workers, help them by processing tasks from outside.
     87-    // 7) Recursive submission of tasks.
     88-    // 8) Submit task when all threads are busy, stop pool and verify the task gets executed.
     89-
     90-    const int NUM_WORKERS_DEFAULT = 3;
     91-    const std::string POOL_NAME = "test";
     92-
     93-    // Test case 0, submit task to a non-started pool
     94-    {
     95-        ThreadPool threadPool(POOL_NAME);
     96-        bool err = false;
     97-        try {
     98-            threadPool.Submit([]() { return false; });
     99-        } catch (const std::runtime_error&) {
    100-            err = true;
    101-        }
    102-        BOOST_CHECK(err);
    103+    ThreadPool threadPool{"not_started"};
    104+    BOOST_CHECK_EXCEPTION(threadPool.Submit([] { return 0; }), std::runtime_error, HasReason{"No active workers"});
    105+}
    106+
    107+BOOST_AUTO_TEST_CASE(submit_and_verify_completion)
    108+{
    109+    ThreadPool threadPool{"completion"};
    110+    threadPool.Start(NUM_WORKERS_DEFAULT);
    111+
    112+    const auto num_tasks{1 + m_rng.randrange<size_t>(50)};
    113+    std::atomic_size_t counter{0};
    114+
    115+    std::vector<std::future<void>> futures(num_tasks);
    116+    for (size_t i{0}; i < num_tasks; ++i) {
    117+        futures[i] = threadPool.Submit([&counter, i] { counter.fetch_add(i, std::memory_order_relaxed); });
    118     }
    119 
    120-    // Test case 1, submit tasks and verify completion.
    121-    {
    122-        int num_tasks = 50;
    123+    WaitFor(futures);
    124+    BOOST_CHECK_EQUAL(counter.load(std::memory_order_relaxed), (num_tasks - 1) * num_tasks / 2);
    125+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    126+}
    127 
    128-        ThreadPool threadPool(POOL_NAME);
    129-        threadPool.Start(NUM_WORKERS_DEFAULT);
    130-        std::atomic<int> counter = 0;
    131+BOOST_AUTO_TEST_CASE(limited_free_workers_processes_all_task)
    132+{
    133+    ThreadPool threadPool{"block_counts"};
    134+    threadPool.Start(NUM_WORKERS_DEFAULT);
    135+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    136 
    137-        // Store futures to ensure completion before checking counter.
    138-        std::vector<std::future<void>> futures;
    139-        futures.reserve(num_tasks);
    140+    for (size_t free{1}; free < NUM_WORKERS_DEFAULT; ++free) {
    141+        BOOST_TEST_MESSAGE("Testing with " << free << " available workers");
    142+        std::counting_semaphore sem{0};
    143+        const auto blocking_tasks{BlockWorkers(threadPool, sem, free)};
    144 
    145-        for (int i = 1; i <= num_tasks; i++) {
    146-            futures.emplace_back(threadPool.Submit([&counter, i]() {
    147-                counter.fetch_add(i);
    148-            }));
    149-        }
    150+        size_t counter{0};
    151+        std::vector<std::future<void>> futures(num_tasks);
    152+        for (auto& f : futures) f = threadPool.Submit([&counter] { ++counter; });
    153 
    154-        // Wait for all tasks to finish
    155-        WaitFor(futures, /*context=*/"test1 task");
    156-        int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
    157-        BOOST_CHECK_EQUAL(counter.load(), expected_value);
    158+        WaitFor(futures);
    159         BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    160-    }
    161 
    162-    // Test case 2, maintain all threads busy except one.
    163-    {
    164-        ThreadPool threadPool(POOL_NAME);
    165-        threadPool.Start(NUM_WORKERS_DEFAULT);
    166-        // Single blocking future for all threads
    167-        std::promise<void> blocker;
    168-        std::shared_future<void> blocker_future(blocker.get_future());
    169-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT - 1, /*context=*/"test2 blocking tasks enabled");
    170-
    171-        // Now execute tasks on the single available worker
    172-        // and check that all the tasks are executed.
    173-        int num_tasks = 15;
    174-        int counter = 0;
    175-
    176-        // Store futures to wait on
    177-        std::vector<std::future<void>> futures;
    178-        futures.reserve(num_tasks);
    179-        for (int i = 0; i < num_tasks; i++) {
    180-            futures.emplace_back(threadPool.Submit([&counter]() {
    181-                counter += 1;
    182-            }));
    183+        if (free == 1) {
    184+            BOOST_CHECK_EQUAL(counter, num_tasks);
    185+        } else {
    186+            BOOST_CHECK_LE(counter, num_tasks); // unsynchronized update from multiple threads doesn't guarantee consistency
    187         }
    188 
    189-        WaitFor(futures, /*context=*/"test2 tasks");
    190-        BOOST_CHECK_EQUAL(counter, num_tasks);
    191-
    192-        blocker.set_value();
    193-        WaitFor(blocking_tasks, /*context=*/"test2 blocking tasks disabled");
    194-        threadPool.Stop();
    195-        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    196+        sem.release(free);
    197+        WaitFor(blocking_tasks);
    198     }
    199 
    200-    // Test case 3, wait for work to finish.
    201-    {
    202-        ThreadPool threadPool(POOL_NAME);
    203-        threadPool.Start(NUM_WORKERS_DEFAULT);
    204-        std::atomic<bool> flag = false;
    205-        std::future<void> future = threadPool.Submit([&flag]() {
    206-            std::this_thread::sleep_for(std::chrono::milliseconds{200});
    207-            flag.store(true);
    208-        });
    209-        future.wait();
    210-        BOOST_CHECK(flag.load());
    211-    }
    212+    threadPool.Stop();
    213+}
    214 
    215-    // Test case 4, obtain result object.
    216-    {
    217-        ThreadPool threadPool(POOL_NAME);
    218-        threadPool.Start(NUM_WORKERS_DEFAULT);
    219-        std::future<bool> future_bool = threadPool.Submit([]() {
    220-            return true;
    221-        });
    222-        BOOST_CHECK(future_bool.get());
    223+BOOST_AUTO_TEST_CASE(future_wait_blocks_until_task_completes)
    224+{
    225+    ThreadPool threadPool{"wait_test"};
    226+    threadPool.Start(NUM_WORKERS_DEFAULT);
    227+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    228 
    229-        std::future<std::string> future_str = threadPool.Submit([]() {
    230-            return std::string("true");
    231-        });
    232-        std::string result = future_str.get();
    233-        BOOST_CHECK_EQUAL(result, "true");
    234+    const auto start{steady_clock::now()};
    235+
    236+    std::vector<std::future<void>> futures(num_tasks + 1);
    237+    for (size_t i{0}; i <= num_tasks; ++i) {
    238+        futures[i] = threadPool.Submit([i] { UninterruptibleSleep(milliseconds{i}); });
    239     }
    240+    WaitFor(futures);
    241 
    242-    // Test case 5, throw exception and catch it on the consumer side.
    243-    {
    244-        ThreadPool threadPool(POOL_NAME);
    245-        threadPool.Start(NUM_WORKERS_DEFAULT);
    246-
    247-        int ROUNDS = 5;
    248-        std::string err_msg{"something wrong happened"};
    249-        std::vector<std::future<void>> futures;
    250-        futures.reserve(ROUNDS);
    251-        for (int i = 0; i < ROUNDS; i++) {
    252-            futures.emplace_back(threadPool.Submit([err_msg, i]() {
    253-                throw std::runtime_error(err_msg + util::ToString(i));
    254-            }));
    255-        }
    256+    const size_t elapsed_ms{size_t(duration_cast<milliseconds>(steady_clock::now() - start).count())};
    257+    BOOST_CHECK(elapsed_ms >= num_tasks);
    258+}
    259 
    260-        for (int i = 0; i < ROUNDS; i++) {
    261-            try {
    262-                futures.at(i).get();
    263-                BOOST_FAIL("Expected exception not thrown");
    264-            } catch (const std::runtime_error& e) {
    265-                BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
    266-            }
    267-        }
    268-    }
    269+BOOST_AUTO_TEST_CASE(future_get_returns_task_result)
    270+{
    271+    ThreadPool threadPool{"result_test"};
    272+    threadPool.Start(NUM_WORKERS_DEFAULT);
    273 
    274-    // Test case 6, all workers are busy, help them by processing tasks from outside.
    275-    {
    276-        ThreadPool threadPool(POOL_NAME);
    277-        threadPool.Start(NUM_WORKERS_DEFAULT);
    278-
    279-        std::promise<void> blocker;
    280-        std::shared_future<void> blocker_future(blocker.get_future());
    281-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test6 blocking tasks enabled");
    282-
    283-        // Now submit tasks and check that none of them are executed.
    284-        int num_tasks = 20;
    285-        std::atomic<int> counter = 0;
    286-        for (int i = 0; i < num_tasks; i++) {
    287-            threadPool.Submit([&counter]() {
    288-                counter.fetch_add(1);
    289-            });
    290-        }
    291-        std::this_thread::sleep_for(std::chrono::milliseconds{100});
    292-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 20);
    293+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return true; }).get(), true);
    294+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return 42; }).get(), 42);
    295+    BOOST_CHECK_EQUAL(threadPool.Submit([] { return std::string{"true"}; }).get(), "true");
    296+}
    297 
    298-        // Now process manually
    299-        for (int i = 0; i < num_tasks; i++) {
    300-            threadPool.ProcessTask();
    301-        }
    302-        BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    303-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    304-        blocker.set_value();
    305-        threadPool.Stop();
    306-        WaitFor(blocking_tasks, "Failure waiting for test6 blocking task futures");
    307+BOOST_AUTO_TEST_CASE(task_exception_propagated_to_future)
    308+{
    309+    ThreadPool threadPool{"exception_test"};
    310+    threadPool.Start(NUM_WORKERS_DEFAULT);
    311+
    312+    const auto err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
    313+
    314+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    315+    for (size_t i{0}; i < num_tasks; ++i) {
    316+        BOOST_CHECK_EXCEPTION(threadPool.Submit([&] { throw std::runtime_error(err(i)); }).get(), std::runtime_error, HasReason{err(i)});
    317     }
    318+}
    319+
    320+BOOST_AUTO_TEST_CASE(process_task_manually_when_workers_busy)
    321+{
    322+    ThreadPool threadPool{"manual_process"};
    323+    threadPool.Start(NUM_WORKERS_DEFAULT);
    324+    const auto num_tasks{1 + m_rng.randrange<size_t>(20)};
    325 
    326-    // Test case 7, recursive submission of tasks.
    327-    {
    328-        ThreadPool threadPool(POOL_NAME);
    329-        threadPool.Start(NUM_WORKERS_DEFAULT);
    330+    std::counting_semaphore sem{0};
    331+    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    332 
    333-        std::promise<void> signal;
    334-        threadPool.Submit([&]() {
    335-            threadPool.Submit([&]() {
    336-                signal.set_value();
    337-            });
    338-        });
    339+    std::atomic_size_t counter{0};
    340+    std::vector<std::future<void>> futures(num_tasks);
    341+    for (auto& f : futures) f = threadPool.Submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    342 
    343-        signal.get_future().wait();
    344-        threadPool.Stop();
    345-    }
    346+    UninterruptibleSleep(milliseconds{100});
    347+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
    348 
    349-    // Test case 8, submit a task when all threads are busy and then stop the pool.
    350-    {
    351-        ThreadPool threadPool(POOL_NAME);
    352-        threadPool.Start(NUM_WORKERS_DEFAULT);
    353+    for (size_t i{0}; i < num_tasks; ++i) {
    354+        threadPool.ProcessTask();
    355+    }
    356 
    357-        std::promise<void> blocker;
    358-        std::shared_future<void> blocker_future(blocker.get_future());
    359-        const auto& blocking_tasks = BlockWorkers(threadPool, blocker_future, NUM_WORKERS_DEFAULT, /*context=*/"test8 blocking tasks enabled");
    360+    BOOST_CHECK_EQUAL(counter.load(), num_tasks);
    361+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
    362 
    363-        // Submit an extra task that should execute once a worker is free
    364-        std::future<bool> future = threadPool.Submit([]() { return true; });
    365+    sem.release(NUM_WORKERS_DEFAULT);
    366+    WaitFor(blocking_tasks);
    367+}
    368 
    369-        // At this point, all workers are blocked, and the extra task is queued
    370-        BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    371+BOOST_AUTO_TEST_CASE(recursive_task_submission)
    372+{
    373+    ThreadPool threadPool{"recursive"};
    374+    threadPool.Start(NUM_WORKERS_DEFAULT);
    375 
    376-        // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
    377-        std::thread thread_unblocker([&blocker]() {
    378-            std::this_thread::sleep_for(std::chrono::milliseconds{300});
    379-            blocker.set_value();
    380+    std::promise<void> signal;
    381+    threadPool.Submit([&threadPool, &signal] {
    382+        threadPool.Submit([&signal] {
    383+            signal.set_value();
    384         });
    385+    });
    386 
    387-        // Stop the pool while the workers are still blocked
    388-        threadPool.Stop();
    389+    signal.get_future().wait();
    390+}
    391 
    392-        // Expect the submitted task to complete
    393-        BOOST_CHECK(future.get());
    394-        thread_unblocker.join();
    395+BOOST_AUTO_TEST_CASE(stop_completes_queued_tasks_gracefully)
    396+{
    397+    ThreadPool threadPool{"graceful_stop"};
    398+    threadPool.Start(NUM_WORKERS_DEFAULT);
    399 
    400-        // Obviously all the previously blocking tasks should be completed at this point too
    401-        WaitFor(blocking_tasks, "Failure waiting for test8 blocking task futures");
    402+    std::counting_semaphore sem{0};
    403+    const auto blocking_tasks{BlockWorkers(threadPool, sem, NUM_WORKERS_DEFAULT)};
    404 
    405-        // Pool should be stopped and no workers remaining
    406-        BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    407-    }
    408+    auto future{threadPool.Submit([] { return true; })};
    409+    BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
    410+
    411+    std::thread thread_unblocker{[&sem] {
    412+        std::this_thread::sleep_for(milliseconds{300});
    413+        sem.release(NUM_WORKERS_DEFAULT);
    414+    }};
    415+
    416+    threadPool.Stop();
    417+
    418+    BOOST_CHECK(future.get());
    419+    thread_unblocker.join();
    420+    WaitFor(blocking_tasks);
    421+    BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
    422 }
    423 
    424 BOOST_AUTO_TEST_SUITE_END()
    425diff --git a/src/util/threadpool.h b/src/util/threadpool.h
    426index 5d9884086e..c89fda37c2 100644
    427--- a/src/util/threadpool.h
    428+++ b/src/util/threadpool.h
    429@@ -24,6 +24,8 @@
    430 #include <utility>
    431 #include <vector>
    432 
    433+#include <tinyformat.h>
    434+
    435 /**
     436  * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
    437  *
    438@@ -62,16 +64,9 @@ private:
    439         for (;;) {
    440             std::packaged_task<void()> task;
    441             {
    442-                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
    443-                if (!m_interrupt && m_work_queue.empty()) {
    444-                    // Block until the pool is interrupted or a task is available.
    445-                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    446-                }
    447-
    448-                // If stopped and no work left, exit worker
    449-                if (m_interrupt && m_work_queue.empty()) {
    450-                    return;
    451-                }
    452+                // Block until the pool is interrupted or a task is available.
    453+                m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
    454+                if (m_interrupt && m_work_queue.empty()) return;
    455 
    456                 task = std::move(m_work_queue.front());
    457                 m_work_queue.pop();
    458@@ -101,17 +96,16 @@ public:
    459      *
    460      * Must be called from a controller (non-worker) thread.
    461      */
    462-    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    463+    void Start(size_t num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    464     {
    465         assert(num_workers > 0);
    466         LOCK(m_mutex);
    467         if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
    468         m_interrupt = false; // Reset
    469 
    470-        // Create workers
    471         m_workers.reserve(num_workers);
    472-        for (int i = 0; i < num_workers; i++) {
    473-            m_workers.emplace_back(&util::TraceThread, m_name + "_pool_" + util::ToString(i), [this] { WorkerThread(); });
    474+        for (size_t i{0}; i < num_workers; i++) {
    475+            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
    476         }
    477     }
    478 
    479@@ -179,12 +173,6 @@ public:
    480         task();
    481     }
    482 
    483-    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    484-    {
    485-        WITH_LOCK(m_mutex, m_interrupt = true);
    486-        m_cv.notify_all();
    487-    }
    488-
    489     size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
    490     {
    491         return WITH_LOCK(m_mutex, return m_work_queue.size());
    
  69. marcofleon commented at 7:03 pm on October 30, 2025: contributor

    I’ve been playing around with the fuzz test and wanted to update here, even if it’s still a bit inconclusive.

    I’m running into non-reproducible timeouts when using fork with libFuzzer. I’m overusing my cores by a lot (fork=25 plus the 3 workers each), and I’m not yet sure whether these timeouts would eventually happen with fewer cores.

    I have “fixed” this by changing notify_one in Submit to notify_all, although I can’t say I’m exactly sure why this works. It almost seems like some of the workers aren’t being woken up at all when only one is notified. Maybe the same one somehow ends up being reused over and over, so it gets stuck waiting? But with notify_all, all idle workers are woken up on every task, so the thread pool handles the CPU overload more effectively.
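
    Roughly the change I mean, using the member names from util/threadpool.h (the surrounding Submit() body is paraphrased here, not quoted):

        // Paraphrased ThreadPool::Submit() tail: queue the packaged task, then notify.
        {
            LOCK(m_mutex);
            m_work_queue.emplace(std::move(task));
        }
        m_cv.notify_all(); // was m_cv.notify_one(); waking every idle worker avoids the stalls I hit under fork mode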

    edit: This should be reproducible in the sense that, if you run with fork set close to the total number of cores on your machine and set -timeout=30, you should get timeouts in less than an hour of fuzzing.

  70. in src/util/threadpool.h:102 in e1eb4cd3a5 outdated
     97+     * @brief Start worker threads.
     98+     *
     99+     * Creates and launches `num_workers` threads that begin executing tasks
    100+     * from the queue. If the pool is already started, throws.
    101+     *
    102+     * Must be called from a controller (non-worker) thread.
    


    Eunovo commented at 10:55 am on October 31, 2025:
  71. Eunovo commented at 12:48 pm on October 31, 2025: contributor

    Concept ACK https://github.com/bitcoin/bitcoin/pull/33689/commits/435e8b5a55033fbfc6428a612b9826713b3cf57a

    The PR looks good already, but I think we can block callers from invoking ThreadPool::Start() and ThreadPool::Stop() from inside worker threads; we can use a thread-local variable to identify worker threads and reject the operation.
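
    A minimal sketch of that idea, assuming the guard lives inside util/threadpool.h (the flag name is illustrative):

        // Each worker marks itself via a thread_local flag; Start()/Stop() refuse to run on a marked thread.
        static thread_local bool t_is_pool_worker{false};

        void ThreadPool::WorkerThread()
        {
            t_is_pool_worker = true;
            // ... existing worker loop ...
        }

        void ThreadPool::Stop()
        {
            if (t_is_pool_worker) throw std::runtime_error("ThreadPool::Stop() called from a worker thread");
            // ... existing shutdown: set m_interrupt, notify workers, join threads ...
        }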

