If this is changed to:
0- # Create listener with default P2PInterface
1- listener = P2PInterface()
2- actual_to_addr, actual_to_port = self._create_auto_outbound_listener(listener, i)
3+ if self.decide_auto_outbound_where_to:
4+ actual_to_addr, actual_to_port, listener = self.decide_auto_outbound_where_to(i)
5+ else:
6+ # Create listener with default P2PInterface
7+ listener = P2PInterface()
8+ actual_to_addr, actual_to_port = self._create_auto_outbound_listener(listener, i)
9
10 self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
11- f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)")
12+ f"{format_addr_port(actual_to_addr, actual_to_port)}")
then it is possible to override from the test:
- the decision where to redirect and
- the listener creation (in case something other than
P2PInterface is needed, e.g. P2PDataStore)
With that change, the private broadcast test can take the following simplification (28 insertions(+), 78 deletions(-)):
0diff --git i/test/functional/p2p_private_broadcast.py w/test/functional/p2p_private_broadcast.py
1index 4e97da666a..697d44c299 100755
2--- i/test/functional/p2p_private_broadcast.py
3+++ w/test/functional/p2p_private_broadcast.py
4@@ -162,95 +162,45 @@ ADDRMAN_ADDRESSES = [
5 "[fc00::20]",
6 ]
7
8
9 class P2PPrivateBroadcast(BitcoinTestFramework):
10 def set_test_params(self):
11- self.disable_autoconnect = False
12+ self.auto_outbound_mode = True
13 self.num_nodes = 2
14
15 def setup_nodes(self):
16- # Start a SOCKS5 proxy server.
17- socks5_server_config = Socks5Configuration()
18- # self.nodes[0] listens on p2p_port(0),
19- # self.nodes[1] listens on p2p_port(1),
20- # thus we tell the SOCKS5 server to listen on p2p_port(self.num_nodes) (self.num_nodes is 2)
21- socks5_server_config.addr = ("127.0.0.1", p2p_port(self.num_nodes))
22- socks5_server_config.unauth = True
23- socks5_server_config.auth = True
24-
25- self.socks5_server = Socks5Server(socks5_server_config)
26- self.socks5_server.start()
27-
28- # Tor ports are the highest among p2p/rpc/tor, so this should be the first available port.
29- ports_base = tor_port(MAX_NODES) + 1
30-
31- self.destinations = []
32-
33- self.destinations_lock = threading.Lock()
34-
35- def destinations_factory(requested_to_addr, requested_to_port):
36- with self.destinations_lock:
37- i = len(self.destinations)
38- actual_to_addr = ""
39- actual_to_port = 0
40- listener = None
41- if i == NUM_INITIAL_CONNECTIONS:
42- # Instruct the SOCKS5 server to redirect the first private
43- # broadcast connection from nodes[0] to nodes[1]
44- actual_to_addr = "127.0.0.1" # nodes[1] listen address
45- actual_to_port = tor_port(1) # nodes[1] listen port for Tor
46- self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
47- f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])")
48- else:
49- # Create a Python P2P listening node and instruct the SOCKS5 proxy to
50- # redirect the connection to it. The first outbound connection is used
51- # later to serve GETDATA, thus make it P2PDataStore().
52- listener = P2PDataStore() if i == 0 else P2PInterface()
53- listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor)
54- listener.peer_connect_send_version(services=P2P_SERVICES)
55-
56- def on_listen_done(addr, port):
57- nonlocal actual_to_addr
58- nonlocal actual_to_port
59- actual_to_addr = addr
60- actual_to_port = port
61-
62- self.network_thread.listen(
63- addr="127.0.0.1",
64- port=ports_base + i,
65- p2p=listener,
66- callback=on_listen_done)
67- # Wait until the callback has been called.
68- self.wait_until(lambda: actual_to_port != 0)
69- self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
70- f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)")
71-
72- self.destinations.append({
73- "requested_to": format_addr_port(requested_to_addr, requested_to_port),
74- "node": listener,
75- })
76- assert_equal(len(self.destinations), i + 1)
77-
78- return {
79- "actual_to_addr": actual_to_addr,
80- "actual_to_port": actual_to_port,
81- }
82-
83- self.socks5_server.conf.destinations_factory = destinations_factory
84+
85+ def decide_auto_outbound_where_to(connection_index):
86+ actual_to_addr = ""
87+ actual_to_port = 0
88+ listener = None
89+ if connection_index == NUM_INITIAL_CONNECTIONS:
90+ # Instruct the SOCKS5 server to redirect the first private
91+ # broadcast connection from nodes[0] to nodes[1]
92+ actual_to_addr = "127.0.0.1" # nodes[1] listen address
93+ actual_to_port = tor_port(1) # nodes[1] listen port for Tor
94+ else:
95+ # Create a Python P2P listening node and instruct the SOCKS5 proxy to
96+ # redirect the connection to it. The first outbound connection is used
97+ # later to serve GETDATA, thus make it P2PDataStore().
98+ listener = P2PDataStore() if connection_index == 0 else P2PInterface()
99+ actual_to_addr, actual_to_port = self._create_auto_outbound_listener(listener, connection_index)
100+ return actual_to_addr, actual_to_port, listener
101+
102+ self.decide_auto_outbound_where_to = decide_auto_outbound_where_to
103
104 self.extra_args = [
105 [
106 # Needed to be able to add CJDNS addresses to addrman (otherwise they are unroutable).
107 "-cjdnsreachable",
108 # Connecting, sending garbage, being disconnected messes up with this test's
109 # check_broadcasts() which waits for a particular Python node to receive a connection.
110 "-v2transport=0",
111 "-test=addrman",
112 "-privatebroadcast",
113- f"-proxy={socks5_server_config.addr[0]}:{socks5_server_config.addr[1]}",
114 # To increase coverage, make it think that the I2P network is reachable so that it
115 # selects such addresses as well. Pick a proxy address where nobody is listening
116 # and connection attempts fail quickly.
117 "-i2psam=127.0.0.1:1",
118 ],
119 [
120@@ -268,14 +218,14 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
121 i = skip_destinations - 1
122 while broadcasts_done < broadcasts_to_expect:
123 i += 1
124 self.log.debug(f"{label}: waiting for outbound connection i={i}")
125 # At this point the connection may not yet have been established (A),
126 # may be active (B), or may have already been closed (C).
127- self.wait_until(lambda: len(self.destinations) > i)
128- dest = self.destinations[i]
129+ self.wait_until(lambda: len(self.auto_outbound_destinations) > i)
130+ dest = self.auto_outbound_destinations[i]
131 peer = dest["node"]
132 peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False)
133 # Now it is either (B) or (C).
134 if peer.last_message["version"].nServices != 0:
135 self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} not a private broadcast, ignoring it (maybe feeler or extra block only)")
136 continue
137@@ -313,13 +263,13 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
138 # Fill tx_originator's addrman.
139 for addr in ADDRMAN_ADDRESSES:
140 res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False)
141 if not res["success"]:
142 self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)")
143
144- self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS)
145+ self.wait_until(lambda: len(self.auto_outbound_destinations) == NUM_INITIAL_CONNECTIONS)
146
147 # The next opened connection by tx_originator should be "private broadcast"
148 # for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver.
149
150 txs = wallet.create_self_transfer_chain(chain_length=3)
151 self.log.info(f"Created txid={txs[0]['txid']}: for basic test")
152@@ -353,31 +303,31 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
153 assert_equal(len(tx_originator.getrawmempool()), 0)
154
155 wtxid_int = int(txs[0]["wtxid"], 16)
156 inv = CInv(MSG_WTX, wtxid_int)
157
158 self.log.info("Sending INV and waiting for GETDATA from node")
159- tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator.
160+ tx_returner = self.auto_outbound_destinations[0]["node"] # Will return the transaction back to the originator.
161 tx_returner.tx_store[wtxid_int] = txs[0]["tx"]
162 assert "getdata" not in tx_returner.last_message
163 received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network"
164 with tx_originator.assert_debug_log(expected_msgs=[received_back_msg]):
165 tx_returner.send_without_ping(msg_inv([inv]))
166 tx_returner.wait_until(lambda: "getdata" in tx_returner.last_message)
167 self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0)
168
169 self.log.info("Waiting for normal broadcast to another peer")
170- self.destinations[1]["node"].wait_for_inv([inv])
171+ self.auto_outbound_destinations[1]["node"].wait_for_inv([inv])
172
173 self.log.info("Sending a transaction that is already in the mempool")
174- skip_destinations = len(self.destinations)
175+ skip_destinations = len(self.auto_outbound_destinations)
176 tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0)
177 self.check_broadcasts("Broadcast of mempool transaction", txs[0], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations)
178
179 self.log.info("Sending a transaction with a dependency in the mempool")
180- skip_destinations = len(self.destinations)
181+ skip_destinations = len(self.auto_outbound_destinations)
182 tx_originator.sendrawtransaction(hexstring=txs[1]["hex"], maxfeerate=0.1)
183 self.check_broadcasts("Dependency in mempool", txs[1], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations)
184
185 self.log.info("Sending a transaction with a dependency not in the mempool (should be rejected)")
186 assert_equal(len(tx_originator.getrawmempool()), 1)
187 assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent",
188@@ -388,13 +338,13 @@ class P2PPrivateBroadcast(BitcoinTestFramework):
189 # Since txs[1] has not been received back by tx_originator,
190 # it should be re-broadcast after a while. Advance tx_originator's clock
191 # to trigger a re-broadcast. Should be more than the maximum returned by
192 # NextTxBroadcast() in net_processing.cpp.
193 self.log.info("Checking that rebroadcast works")
194 delta = 20 * 60 # 20min
195- skip_destinations = len(self.destinations)
196+ skip_destinations = len(self.auto_outbound_destinations)
197 rebroadcast_msg = f"Reattempting broadcast of stale txid={txs[1]['txid']}"
198 with tx_originator.busy_wait_for_debug_log(expected_msgs=[rebroadcast_msg.encode()]):
199 tx_originator.setmocktime(int(time.time()) + delta)
200 tx_originator.mockscheduler(delta)
201 self.check_broadcasts("Rebroadcast", txs[1], 1, skip_destinations)
202 tx_originator.setmocktime(0) # Let the clock tick again (it will go backwards due to this).