proxy: add local connection limit to ListenConnections #269

enirox001 wants to merge 2 commits into bitcoin-core:master from enirox001:04-26-ipc-local-connection-limit, changing 7 files (+318 −16)
  1. enirox001 commented at 10:00 AM on April 8, 2026: contributor

    This adds an optional local connection limit to ListenConnections().

    Previously, ListenConnections() would accept incoming connections indefinitely. This branch adds an optional max_connections parameter so a listener can stop accepting new connections once a per-listener cap is reached, and resume accepting when an existing connection disconnects.

    The limit is local to the listener instead of global to the EventLoop. This keeps the state and behavior scoped to the listening socket and is closer to the direction discussed downstream for per-address (-ipcbind) limits.

    This also adds a test covering the behavior with max_connections=1, verifying that:

    • the first client is accepted normally
    • a second client is not accepted while the first remains connected
    • the second client is accepted after the first disconnects

    This is a follow-up to the IPC FD reservation work.
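    A minimal, hypothetical model of the observable behavior in the max_connections=1 scenario above follows. It is plain C++ with no KJ or libmultiprocess dependency: CappedListenerModel, Connect(), Disconnect(), IsAccepted() and DemoScenario() are illustrative names, not the PR's API, and the FIFO backlog only approximates how a listening socket queues clients that the server has not accepted yet.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <set>

// Hypothetical model of the observable behavior only (not libmultiprocess code):
// connecting clients wait in a FIFO backlog, like a listening socket's kernel
// queue, and are accepted only while the listener is below its optional cap.
class CappedListenerModel
{
public:
    explicit CappedListenerModel(std::optional<std::size_t> max_connections) : m_max(max_connections) {}

    void Connect(int client) { m_backlog.push_back(client); Drain(); }
    void Disconnect(int client) { m_active.erase(client); Drain(); }
    bool IsAccepted(int client) const { return m_active.count(client) > 0; }

private:
    bool AtCapacity() const { return m_max && m_active.size() >= *m_max; }

    // Accept queued clients until the backlog is empty or the cap is reached.
    void Drain()
    {
        while (!m_backlog.empty() && !AtCapacity()) {
            m_active.insert(m_backlog.front());
            m_backlog.pop_front();
        }
    }

    std::optional<std::size_t> m_max; // std::nullopt means "no limit"
    std::deque<int> m_backlog;
    std::set<int> m_active;
};

// The max_connections=1 scenario from the description.
void DemoScenario()
{
    CappedListenerModel listener(1);
    listener.Connect(1);
    assert(listener.IsAccepted(1));  // first client is accepted normally
    listener.Connect(2);
    assert(!listener.IsAccepted(2)); // second client waits while the first is connected
    listener.Disconnect(1);
    assert(listener.IsAccepted(2));  // second client is accepted after the first disconnects
}
```

    Passing std::nullopt models an uncapped listener, mirroring how max_connections is optional.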

  2. DrahtBot commented at 10:01 AM on April 8, 2026: none

    The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

    Reviews

    See the guideline for information on the review process.

    Type Reviewers
    ACK xyzconstant
    Approach ACK ryanofsky

    If your review is incorrectly listed, please copy-paste <!--meta-tag:bot-skip--> into the comment that the bot should ignore.

    Conflicts

    Reviewers, this pull request conflicts with the following ones:

    • #276 (build: prepare for subtree split by Sjors)
    • #274 (Add nonunix platform support by ryanofsky)
    • #272 (ci: add Windows job by Sjors)
    • #231 (Add windows support by ryanofsky)

    If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.

  3. enirox001 marked this as a draft on Apr 8, 2026
  4. enirox001 force-pushed on Apr 8, 2026
  5. enirox001 force-pushed on Apr 8, 2026
  6. enirox001 force-pushed on Apr 8, 2026
  7. in include/mp/proxy-io.h:829 in 84ed607b39
     821 | @@ -820,8 +822,19 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
     822 |  //! handles requests from the stream by calling the init object. Embed the
     823 |  //! ProxyServer in a Connection object that is stored and erased if
     824 |  //! disconnected. This should be called from the event loop thread.
     825 | +template <typename InitInterface, typename InitImpl, typename OnDisconnect>
     826 | +void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect);
     827 | +
     828 |  template <typename InitInterface, typename InitImpl>
     829 |  void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
    


    ryanofsky commented at 2:02 PM on April 8, 2026:

    In commit "proxy: add local connection limit to ListenConnections" (84ed607b390e24dd0d41cd95d0159bfeadfaf5d8)

    Since _Serve is an internal function only called in a few places, I think it would be less confusing if it was not overloaded, and just always required an on_disconnect parameter, even if it requires a little extra verbosity at some call sites.


    enirox001 commented at 9:58 AM on April 9, 2026:

    Done. _Serve now always takes an on_disconnect callback instead of being overloaded, since it is only used internally.

  8. in include/mp/proxy-io.h:865 in 84ed607b39 outdated
     861 | +        : listener(kj::mv(listener_)), max_connections(max_connections_) {}
     862 | +
     863 | +    kj::Own<kj::ConnectionReceiver> listener;
     864 | +    std::optional<size_t> max_connections;
     865 | +    size_t active_connections{0};
     866 | +    bool accept_pending{false};
    


    ryanofsky commented at 2:08 PM on April 8, 2026:

    In commit "proxy: add local connection limit to ListenConnections" (84ed607b390e24dd0d41cd95d0159bfeadfaf5d8)

    Curious if this accept_pending variable is actually necessary, or if the code could just compare active_connections and max_connections when deciding whether to listen. I would prefer to avoid redundancy in the state representation if possible, even if it makes individual checks a little more verbose.

    If accept_pending really is necessary, it would be good to have a short comment explaining why.


    enirox001 commented at 9:59 AM on April 9, 2026:

    I kept accept_pending, but added a short comment explaining why it is needed here. active_connections only counts accepted connections, so without a separate flag nested _Listen() calls could post multiple pending accept() calls before active_connections is incremented.
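    To illustrate the duplicate-accept case, here is a hypothetical model (not the actual ListenState or _Listen code) that counts posted accept() calls. The disconnect burst is an assumption chosen for illustration: with a cap of 2, filling the listener and then disconnecting both clients back to back leaves two outstanding accepts when every disconnect calls Listen() unconditionally, and only one when an accept_pending-style flag guards it.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical model: outstanding_accepts counts accept() calls posted on the
// listener. Nothing here is libmultiprocess or KJ code.
struct AcceptModel {
    std::optional<std::size_t> max_connections;
    bool use_accept_pending{false}; // toggles the guard discussed above
    std::size_t active_connections{0};
    std::size_t outstanding_accepts{0};
    bool accept_pending{false};

    bool AtCapacity() const { return max_connections && active_connections >= *max_connections; }

    void Listen()
    {
        if (use_accept_pending && accept_pending) return; // an accept is already posted
        if (AtCapacity()) return;
        accept_pending = true;
        ++outstanding_accepts;
    }

    // One posted accept() completes with a new client.
    void OnAccepted()
    {
        assert(outstanding_accepts > 0);
        --outstanding_accepts;
        accept_pending = false;
        ++active_connections;
        Listen();
    }

    void OnDisconnect()
    {
        assert(active_connections > 0);
        --active_connections;
        Listen(); // called on every disconnect, as in the version under review
    }
};

// Fill a listener capped at 2, disconnect both clients back to back, and
// report how many accept() calls end up outstanding.
std::size_t OutstandingAfterBurst(bool use_accept_pending)
{
    AcceptModel m;
    m.max_connections = 2;
    m.use_accept_pending = use_accept_pending;
    m.Listen();
    m.OnAccepted();
    m.OnAccepted();
    m.OnDisconnect();
    m.OnDisconnect();
    return m.outstanding_accepts;
}
```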


    xyzconstant commented at 7:47 PM on May 20, 2026:

    FWIW, I tested the code with the accept_pending field (and therefore its check in _Listen) removed, and the tests passed with no issues. I then asked Claude to generate a test that exercises it, which produced this:

    diff --git a/test/mp/test/listen_tests.cpp b/test/mp/test/listen_tests.cpp
    index b367938..78298c7 100644
    --- a/test/mp/test/listen_tests.cpp
    +++ b/test/mp/test/listen_tests.cpp
    @@ -24,8 +24,10 @@
     #include <string>
     #include <sys/socket.h>
     #include <sys/un.h>
    +#include <kj/exception.h>
     #include <thread>
     #include <unistd.h>
    +#include <vector>
     
     namespace mp {
     namespace test {
    @@ -112,11 +114,39 @@ public:
         std::thread thread;
     };
     
    +//! kj::ExceptionCallback that captures KJ_LOG output into an external sink.
    +//! Must be instantiated on the thread whose KJ logs you want to capture; it
    +//! installs itself onto that thread's ExceptionCallback stack via its base
    +//! constructor and removes itself in the destructor.
    +class CaptureLogCallback : public kj::ExceptionCallback
    +{
    +public:
    +    CaptureLogCallback(std::mutex& mu, std::string& sink) : m_mu(mu), m_sink(sink) {}
    +
    +    void logMessage(kj::LogSeverity severity, const char* file, int line, int contextDepth,
    +                    kj::String&& text) override
    +    {
    +        {
    +            std::lock_guard<std::mutex> lock(m_mu);
    +            m_sink.append(text.cStr(), text.size());
    +            m_sink.push_back('\n');
    +        }
    +        // Still let the default callback emit to stderr so test debug output
    +        // isn't silenced for other observers.
    +        kj::ExceptionCallback::logMessage(severity, file, line, contextDepth, kj::mv(text));
    +    }
    +
    +private:
    +    std::mutex& m_mu;
    +    std::string& m_sink;
    +};
    +
     class ListenSetup
     {
     public:
         explicit ListenSetup(std::optional<size_t> max_connections = std::nullopt)
             : capped_listener(max_connections.has_value()), thread([this, max_connections] {
    +              CaptureLogCallback log_capture(captured_log_mutex, captured_log);
                   EventLoop loop("mptest-server", [this](mp::LogMessage log) {
                       if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
                       if (log.message.find("IPC server: socket connected.") != std::string::npos) {
    @@ -144,6 +174,18 @@ public:
     
         ~ListenSetup()
         {
    +        forceShutdown();
    +        thread.join();
    +    }
    +
    +    //! Synchronously tear down the event loop's task set so any pending accept
    +    //! promises are destroyed now (rather than when the destructor runs later).
    +    //! This makes it possible to assert on captured KJ log output before the
    +    //! ListenSetup goes out of scope. Idempotent.
    +    void forceShutdown()
    +    {
    +        if (shutdown_done) return;
    +        shutdown_done = true;
             if (capped_listener) {
                 EventLoop* loop;
                 {
    @@ -152,7 +194,6 @@ public:
                 }
                 if (loop) loop->sync([&] { loop->m_task_set.reset(); });
             }
    -        thread.join();
         }
     
         size_t ConnectedCount()
    @@ -184,11 +225,15 @@ public:
         UnixListener listener;
         std::promise<void> ready_promise;
         bool capped_listener{false};
    +    bool shutdown_done{false};
         std::mutex counter_mutex;
         std::condition_variable counter_cv;
         EventLoop* event_loop{nullptr};
         size_t connected_count{0};
         size_t disconnected_count{0};
    +    //! KJ log output captured from the server thread via CaptureLogCallback.
    +    std::mutex captured_log_mutex;
    +    std::string captured_log;
         std::thread thread;
     };
     
    @@ -245,6 +290,34 @@ KJ_TEST("ListenConnections keeps capped listeners alive before reaching the limi
         KJ_EXPECT(client2->client->add(2, 3) == 5);
     }
     
    +// Without `accept_pending`, cascaded close handlers each post a duplicate
    +// accept(). KJ silently serializes them so the cap isn't exceeded, but the
    +// extra pending promises are destroyed at cleanup and logged as
    +// "PromiseFulfiller was destroyed without fulfilling the promise."
    +// This test fails when accept_pending is removed and passes when it's intact.
    +KJ_TEST("ListenConnections does not leak accept promises during disconnect burst")
    +{
    +    constexpr size_t kCap = 2;
    +    ListenSetup setup(/*max_connections=*/kCap);
    +
    +    std::vector<std::unique_ptr<ClientSetup>> filling;
    +    filling.reserve(kCap);
    +    for (size_t i = 0; i < kCap; ++i) {
    +        filling.push_back(std::make_unique<ClientSetup>(setup.listener.Connect()));
    +    }
    +    setup.WaitForConnectedCount(kCap);
    +
    +    filling.clear();
    +    setup.WaitForDisconnectedCount(kCap);
    +
    +    // Trigger m_task_set.reset() now so any leaked accept promises get destroyed
    +    // before we read captured_log.
    +    setup.forceShutdown();
    +
    +    std::lock_guard<std::mutex> lock(setup.captured_log_mutex);
    +    KJ_EXPECT(setup.captured_log.find("PromiseFulfiller was destroyed") == std::string::npos);
    +}
    +
     } // namespace
     } // namespace test
     } // namespace mp
    

    Basically, this installs a kj::ExceptionCallback on the server thread to capture the "PromiseFulfiller was destroyed" message that the KJ runtime logs if cascaded disconnects each call _Listen and post a fresh accept() promise (the extra promises are destroyed unfulfilled at cleanup). The test fails with accept_pending removed and passes with it restored.


    xyzconstant commented at 7:50 PM on May 20, 2026:

    IMO it's not obvious why this field is needed here. This patch guarantees essentially the same outcome and also passes the test generated by Claude above:

    diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
    index 78924c6..c002811 100644
    --- a/include/mp/proxy-io.h
    +++ b/include/mp/proxy-io.h
    @@ -851,11 +851,6 @@ struct ListenState
         kj::Own<kj::ConnectionReceiver> listener;
         std::optional<size_t> max_connections;
         size_t active_connections{0};
    -    //! Tracks whether accept() has already been posted. This is needed because
    -    //! active_connections only counts accepted connections, so without a
    -    //! separate flag, nested _Listen() calls could queue multiple pending
    -    //! accepts before active_connections increases.
    -    bool accept_pending{false};
     };
     
     template <typename InitInterface, typename InitImpl>
    @@ -866,9 +861,12 @@ void _ServeAccepted(EventLoop& loop, InitImpl& init, const std::shared_ptr<Liste
     {
         ++state->active_connections;
         _Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, state] {
    +        const bool was_at_cap = state->max_connections && state->active_connections == *state->max_connections;
             assert(state->active_connections > 0);
             --state->active_connections;
    -        _Listen<InitInterface>(loop, init, state);
    +        if (was_at_cap) {
    +            _Listen<InitInterface>(loop, init, state);
    +        }
         });
     }
     
    @@ -885,15 +883,12 @@ inline std::unique_ptr<EventLoopRef> _MakeCappedListenerRef(EventLoop& loop, con
     template <typename InitInterface, typename InitImpl>
     void _Listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<ListenState>& state)
     {
    -    if (state->accept_pending) return;
         if (_ListenAtCapacity(*state)) return;
     
    -    state->accept_pending = true;
         auto* ptr = state->listener.get();
         auto accept_ref{_MakeCappedListenerRef(loop, *state)};
         loop.m_task_set->add(ptr->accept().then(
             [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
    -            state->accept_pending = false;
                 _ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
                 _Listen<InitInterface>(loop, init, state);
             }));
    

    enirox001 commented at 11:39 AM on May 21, 2026:

    I think this is a better invariant than tracking accept_pending.

    There should already be one pending accept whenever the capped listener is below capacity, and disconnects only need to post a new accept when they transition the listener from full to below full, because that is the only state in which no accept was pending.

    So checking this before decrementing active_connections avoids the duplicate-accept case without adding another state variable.
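    The resume_accept invariant can be sketched with a similar hypothetical model (illustrative names, not the real code). Each disconnect checks, before decrementing, whether the listener was at its cap, and only then posts a new accept. The accept handler still has to call Listen() after each accepted connection, because disconnects no longer re-post accepts unless the listener was full. InvariantHolds() checks that exactly one accept is outstanding whenever the listener is below its cap, and none when it is full.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical model of the resume_accept approach (not libmultiprocess code).
struct ResumeModel {
    std::optional<std::size_t> max_connections;
    std::size_t active_connections{0};
    std::size_t outstanding_accepts{0};

    bool AtCapacity() const { return max_connections && active_connections >= *max_connections; }

    // Post an accept unless the listener is full; no extra state flag needed.
    void Listen()
    {
        if (AtCapacity()) return;
        ++outstanding_accepts;
    }

    void OnAccepted()
    {
        assert(outstanding_accepts > 0);
        --outstanding_accepts;
        ++active_connections;
        Listen(); // still required: this is where accepts are re-posted below the cap
    }

    void OnDisconnect()
    {
        // Checked before decrementing: only a full -> below-full transition
        // leaves the listener without a pending accept.
        const bool resume_accept = max_connections && active_connections == *max_connections;
        assert(active_connections > 0);
        --active_connections;
        if (resume_accept) Listen();
    }
};

// One pending accept below the cap, none at the cap.
bool InvariantHolds(const ResumeModel& m)
{
    return m.outstanding_accepts == (m.AtCapacity() ? 0u : 1u);
}
```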

    Taken, but I named the local boolean resume_accept instead.

  9. in test/mp/test/test.cpp:51 in 84ed607b39
      46 |  namespace mp {
      47 |  namespace test {
      48 |  
      49 | +namespace {
      50 | +
      51 | +class UnixListener
    


    ryanofsky commented at 2:43 PM on April 8, 2026:

    In commit "proxy: add local connection limit to ListenConnections" (84ed607b390e24dd0d41cd95d0159bfeadfaf5d8)

    It would be good to introduce the test in a separate commit, and maybe a separate file if it doesn't share much code with the existing tests.

    IMO it would be nice if the test was introduced in an initial commit, then updated after connection limits are added, so it's easier to see how connection limits are tested separately from connection setup.


    enirox001 commented at 10:02 AM on April 9, 2026:

    Split the listener coverage out into a dedicated listen_tests.cpp file and reordered the history so the baseline ListenConnections() test is introduced first, then extended in a follow-up commit with the local connection limit coverage.

  10. ryanofsky commented at 2:44 PM on April 8, 2026: collaborator

    Approach ACK 84ed607b390e24dd0d41cd95d0159bfeadfaf5d8. The implementation of the local connection limit here looks almost exactly like what I would have expected.

    I do think it would be helpful to see a draft PR in the bitcoin repo using this API (it should be fine to make libmultiprocess changes there and let the lint CI job fail so you don't need to mess with subtrees), because the approach in https://github.com/bitcoin/bitcoin/pull/34978 of adding a global connection limit option isn't exactly compatible with the per-address connection limit implemented here.

    I'd personally prefer per-address limits over introducing a global limit, but both approaches seem reasonable.

  11. enirox001 force-pushed on Apr 9, 2026
  12. enirox001 force-pushed on Apr 9, 2026
  13. enirox001 force-pushed on Apr 9, 2026
  14. enirox001 force-pushed on Apr 9, 2026
  15. enirox001 commented at 10:29 AM on April 9, 2026: contributor

    Thanks for the review @ryanofsky.

    I addressed the cleanup points in the latest push:

    • _Serve now takes an explicit on_disconnect callback at all call sites
    • accept_pending is now documented
    • listener coverage now lives in a dedicated listen_tests.cpp file, introduced in a separate commit before the local-limit extension

    I’m also planning to put together a draft Bitcoin Core PR using this API so the per-address approach can be evaluated downstream against the current global-limit direction.

  16. enirox001 commented at 1:48 PM on April 9, 2026: contributor

    I put together a downstream draft using this API: bitcoin/bitcoin#35037.

    It uses per-address max-connections=<n> options on -ipcbind, threads the parsed per-address limit into ListenConnections(), and adds downstream coverage for the local-limit behavior.

  17. enirox001 marked this as ready for review on May 4, 2026
  18. enirox001 commented at 9:43 AM on May 4, 2026: contributor

    This PR is now ready for review.

  19. in test/mp/test/listen_tests.cpp:133 in 8511c68f8b outdated
     128 | +                      ++disconnected_count;
     129 | +                      counter_cv.notify_all();
     130 | +                  }
     131 | +              });
     132 | +              FooImplementation foo;
     133 | +              ListenConnections<messages::FooInterface>(loop, listener.release(), foo, max_connections);
    


    xyzconstant commented at 2:58 AM on May 13, 2026:

    nit: This will fail to compile mptest at commit 8c47a3aba8baf145d7ef9d0efe3a1a0f67be2331, since the 4th parameter max_connections is not introduced in ListenConnections's signature until the next commit. I'd fix the call for this commit and then adjust the setup accordingly in 8511c68.


    enirox001 commented at 8:41 AM on May 14, 2026:

    Fixed, thanks

  20. in test/mp/test/listen_tests.cpp:190 in 8511c68f8b outdated
     185 | +
     186 | +    setup.WaitForConnectedCount(1);
     187 | +    KJ_EXPECT(client->client->add(1, 2) == 3);
     188 | +}
     189 | +
     190 | +KJ_TEST("ListenConnections enforces a local connection limit")
    


    xyzconstant commented at 3:06 AM on May 13, 2026:

    nit: Unlike the ListenConnections call issue (a compilation error), this test still fails at this commit (8c47a3aba8baf145d7ef9d0efe3a1a0f67be2331). Following ryanofsky's reasoning (https://github.com/bitcoin-core/libmultiprocess/pull/269#discussion_r3052126251), I believe this suite would be better introduced in 8511c68f8b6827f4e9b306e0245929965a363930 so the test lands with the feature it exercises.


    enirox001 commented at 8:42 AM on May 14, 2026:

    Done

  21. xyzconstant commented at 3:15 AM on May 13, 2026: none

    Code review ACK 8511c68f8b6827f4e9b306e0245929965a363930

    Reviewed each commit separately, compiled and ran tests. The changes look good to me; I only left a couple of inline nits noting a compilation error and failing tests in the first commit.

  22. DrahtBot requested review from ryanofsky on May 13, 2026
  23. test: add dedicated ListenConnections coverage
    Add a separate listen_tests.cpp file with reusable UnixListener, ClientSetup, and ListenSetup helpers for exercising ListenConnections() with real Unix domain sockets.
    
    The new test covers the baseline behavior that ListenConnections() accepts an incoming connection and serves requests over it. Keeping this coverage separate from the existing general proxy tests makes the socket listener setup easier to review and provides a clearer place to extend listener-specific behavior in follow-up commits.
    d0cff019a9
  24. enirox001 force-pushed on May 14, 2026
  25. enirox001 commented at 8:43 AM on May 14, 2026: contributor

    Thanks for the review @xyzconstant. I fixed both commit-structure issues: the first test commit now only adds baseline ListenConnections() coverage using the existing 3-argument API, and the max_connections setup and test now land with the feature commit that introduces the new parameter.

  26. enirox001 commented at 8:44 AM on May 14, 2026: contributor

    Also tightened the capped listener behavior. It now stops posting accepts once the limit is reached, and keeps capped pending accepts alive so later clients can connect after an idle gap, including before the cap has been reached.

    Added coverage for the reconnect cases as well.

  27. enirox001 force-pushed on May 14, 2026
  28. in include/mp/proxy-io.h:898 in 19e1386bca
     899 | -            _Serve<InitInterface>(loop, kj::mv(stream), init);
     900 | -            _Listen<InitInterface>(loop, kj::mv(listener), init);
     901 | +        [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
     902 | +            state->accept_pending = false;
     903 | +            _ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
     904 | +            if (_ListenAtCapacity(*state)) return;
    


    xyzconstant commented at 3:13 AM on May 19, 2026:

    In commit "proxy: add local connection limit to ListenConnections" (19e1386bca3cee7e2d5cc775b74eac064d522fed)

    Not sure if I'm missing something, but this check seems redundant with the same _ListenAtCapacity check at the start of _Listen (line 889).

    I tested commenting out this line while keeping the upper-level check (and vice versa), and the tests passed with no issues. I'd suggest dropping one of these duplicates.


    enirox001 commented at 9:06 AM on May 19, 2026:

    Good catch, thanks. Dropped the lower _ListenAtCapacity() check since _Listen() already handles the capacity check before posting another accept.

  29. enirox001 force-pushed on May 19, 2026
  30. in include/mp/proxy-io.h:898 in b0207dd406 outdated
     899 | -            _Serve<InitInterface>(loop, kj::mv(stream), init);
     900 | -            _Listen<InitInterface>(loop, kj::mv(listener), init);
     901 | +        [&loop, &init, state, accept_ref = std::move(accept_ref)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
     902 | +            state->accept_pending = false;
     903 | +            _ServeAccepted<InitInterface>(loop, init, state, kj::mv(stream));
     904 | +            _Listen<InitInterface>(loop, init, state);
    


    xyzconstant commented at 7:56 PM on May 20, 2026:

    With the accept_pending check in place, this _Listen call will never be reached.

    NOTE: if you apply this patch (comment), this call will be needed, because we can't rely on the second _Listen call in _ServeAccepted, which only runs when a connection closes while active_connections is at the cap.

  31. xyzconstant commented at 8:13 PM on May 20, 2026: none

    Thanks for the update @enirox001!

    I've been playing around with the PR code more thoroughly this time and have a different take on accept_pending now, so I left a few comments.

    Overall the code is factually correct and works as expected. And I think it could be merged (despite my latest thoughts on accept_pending). So I'll just re-ACK b0207dd406b206e0cf2835f10e36dd0ad5a34c96

  32. enirox001 force-pushed on May 21, 2026
  33. proxy: add local connection limit to ListenConnections
    Add an optional max_connections parameter to ListenConnections() so a listener can stop accepting new connections after reaching a local connection cap and resume accepting after an existing connection disconnects.
    
    Implement the limit with listener-local state tracking the listening socket, maximum number of active connections, and whether an async accept() has already been posted. This keeps the limit scoped to the individual listener instead of introducing global EventLoop state.
    
    Extend the dedicated listener test coverage to verify that with max_connections=1 the first client is accepted normally, a second client is not accepted while the first remains connected, and the second client is accepted after the first disconnects.
    7e51ab2353
  34. enirox001 force-pushed on May 21, 2026
  35. enirox001 commented at 11:43 AM on May 21, 2026: contributor

    > I've been playing around with the PR code more thoroughly this time and have a different take on accept_pending now, so I left a few comments.

    Thanks for taking another look @xyzconstant

    I dropped accept_pending and switched to the simpler invariant you suggested.

  36. xyzconstant commented at 4:20 PM on May 21, 2026: none

    re-ACK 7e51ab235330d1a198b910871fac09a23ffdd5f4

    Thanks for the updates @enirox001!



This is a metadata mirror of the GitHub repository bitcoin-core/libmultiprocess. This site is not affiliated with GitHub. Content is generated from a GitHub metadata backup.
generated: 2026-05-31 17:30 UTC

This site is hosted by @0xB10C
More mirrored repositories can be found on mirror.b10c.me