FWIW, I tested the code with the `accept_pending` field (and therefore its check in `_Listen`) removed, and the existing tests passed with no issues. I then asked Claude to generate a test that exercises it, and it produced this:
diff --git a/test/mp/test/listen_tests.cpp b/test/mp/test/listen_tests.cpp
index b367938..78298c7 100644
--- a/test/mp/test/listen_tests.cpp
+++ b/test/mp/test/listen_tests.cpp
@@ -24,8 +24,10 @@
#include <string>
#include <sys/socket.h>
#include <sys/un.h>
+#include <kj/exception.h>
#include <thread>
#include <unistd.h>
+#include <vector>
namespace mp {
namespace test {
@@ -112,11 +114,39 @@ public:
std::thread thread;
};
+//! kj::ExceptionCallback that captures KJ_LOG output into an external sink.
+//! Must be instantiated on the thread whose KJ logs you want to capture; it
+//! installs itself onto that thread's ExceptionCallback stack via its base
+//! constructor and removes itself in the destructor.
+class CaptureLogCallback : public kj::ExceptionCallback
+{
+public:
+ CaptureLogCallback(std::mutex& mu, std::string& sink) : m_mu(mu), m_sink(sink) {}
+
+ void logMessage(kj::LogSeverity severity, const char* file, int line, int contextDepth,
+ kj::String&& text) override
+ {
+ {
+ std::lock_guard<std::mutex> lock(m_mu);
+ m_sink.append(text.cStr(), text.size());
+ m_sink.push_back('\n');
+ }
+ // Still let the default callback emit to stderr so test debug output
+ // isn't silenced for other observers.
+ kj::ExceptionCallback::logMessage(severity, file, line, contextDepth, kj::mv(text));
+ }
+
+private:
+ std::mutex& m_mu;
+ std::string& m_sink;
+};
+
class ListenSetup
{
public:
explicit ListenSetup(std::optional<size_t> max_connections = std::nullopt)
: capped_listener(max_connections.has_value()), thread([this, max_connections] {
+ CaptureLogCallback log_capture(captured_log_mutex, captured_log);
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
if (log.message.find("IPC server: socket connected.") != std::string::npos) {
@@ -144,6 +174,18 @@ public:
~ListenSetup()
{
+ forceShutdown();
+ thread.join();
+ }
+
+ //! Synchronously tear down the event loop's task set so any pending accept
+ //! promises are destroyed now (rather than when the destructor runs later).
+ //! This makes it possible to assert on captured KJ log output before the
+ //! ListenSetup goes out of scope. Idempotent.
+ void forceShutdown()
+ {
+ if (shutdown_done) return;
+ shutdown_done = true;
if (capped_listener) {
EventLoop* loop;
{
@@ -152,7 +194,6 @@ public:
}
if (loop) loop->sync([&] { loop->m_task_set.reset(); });
}
- thread.join();
}
size_t ConnectedCount()
@@ -184,11 +225,15 @@ public:
UnixListener listener;
std::promise<void> ready_promise;
bool capped_listener{false};
+ bool shutdown_done{false};
std::mutex counter_mutex;
std::condition_variable counter_cv;
EventLoop* event_loop{nullptr};
size_t connected_count{0};
size_t disconnected_count{0};
+ //! KJ log output captured from the server thread via CaptureLogCallback.
+ std::mutex captured_log_mutex;
+ std::string captured_log;
std::thread thread;
};
@@ -245,6 +290,34 @@ KJ_TEST("ListenConnections keeps capped listeners alive before reaching the limi
KJ_EXPECT(client2->client->add(2, 3) == 5);
}
+// Without `accept_pending`, cascaded close handlers each post a duplicate
+// accept(). KJ silently serializes them so the cap isn't exceeded, but the
+// extra pending promises are destroyed at cleanup and logged as
+// "PromiseFulfiller was destroyed without fulfilling the promise."
+// This test fails when accept_pending is removed and passes when it's intact.
+KJ_TEST("ListenConnections does not leak accept promises during disconnect burst")
+{
+ constexpr size_t kCap = 2;
+ ListenSetup setup(/*max_connections=*/kCap);
+
+ std::vector<std::unique_ptr<ClientSetup>> filling;
+ filling.reserve(kCap);
+ for (size_t i = 0; i < kCap; ++i) {
+ filling.push_back(std::make_unique<ClientSetup>(setup.listener.Connect()));
+ }
+ setup.WaitForConnectedCount(kCap);
+
+ filling.clear();
+ setup.WaitForDisconnectedCount(kCap);
+
+ // Trigger m_task_set.reset() now so any leaked accept promises get destroyed
+ // before we read captured_log.
+ setup.forceShutdown();
+
+ std::lock_guard<std::mutex> lock(setup.captured_log_mutex);
+ KJ_EXPECT(setup.captured_log.find("PromiseFulfiller was destroyed") == std::string::npos);
+}
+
} // namespace
} // namespace test
} // namespace mp
Basically, this installs a `kj::ExceptionCallback` on the server thread to capture the "PromiseFulfiller was destroyed" log message, which the KJ runtime emits at cleanup if cascaded disconnects have each called `_Listen` and posted a fresh `accept()` promise. The test fails with `accept_pending` removed and passes with it restored.