It seems slightly odd to me that this code, which we only ever expect to run once, should be in a while
loop. I think it’d be clearer to split it out into its own function. Something like:
0def sync_mempools(rpc_connections, *, wait=1, timeout=60, use_rpc_sync=False, flush_scheduler=True):
1 """
2 Wait until everybody has the same transactions in their memory
3 pools. If use_rpc_sync is set, sync all transactions right away.
4 """
5 if use_rpc_sync:
6 force_sync_mempools(rpc_connections)
7 else:
8 stop_time = time.time() + timeout
9 while time.time() <= stop_time:
10 pool = [set(r.getrawmempool()) for r in rpc_connections]
11 if pool.count(pool[0]) == len(rpc_connections):
12 break
13 time.sleep(wait)
14 else:
15 raise AssertionError("Mempool sync timed out:{}".format("".join("\n {!r}".format(m) for m in pool)))
16
17 if flush_scheduler:
18 for r in rpc_connections:
19 r.syncwithvalidationinterfacequeue()
20
21def force_sync_mempools(rpc_connections):
22
23 class TxInfo:
24 def __init__(self, *, raw, ancestors):
25 self.raw = raw
26 self.ancestors = ancestors
27
28 def topo_send(txs, rpc, pool_add):
29 for i in txs:
30 topo_send(txs[i].ancestors, rpc, pool_add)
31 if i not in pool_add:
32 try:
33 assert_equal(i, rpc.sendrawtransaction(txs[i].raw))
34 pool_add.add(i)
35 # Note that conflicted txs (due to RBF) are not removed
36 # from the pool
37 except JSONRPCException as e:
38 # This transaction violates policy (e.g. RBF policy). The
39 # mempools should still converge when the high-fee
40 # replacement is synced in a later call
41 assert 'insufficient fee' in e.error['message']
42
43 pool = [set(r.getrawmempool()) for r in rpc_connections]
44 # Iterate over all nodes, get their raw mempool and send the
45 # missing txs to all other nodes
46 for i_remote, rpc_remote in enumerate(rpc_connections):
47 pool_remote = {
48 txid: TxInfo(raw=rpc_remote.getrawtransaction(txid), ancestors=info['depends'])
49 for txid, info in rpc_remote.getrawmempool(verbose=True).items()
50 }
51 # Create "recursive pools" for ancestors
52 for tx in pool_remote:
53 pool_remote[tx].ancestors = {a: pool_remote[a] for a in pool_remote[tx].ancestors}
54
55 # Push this pool to all targets
56 for i_target, rpc_target in enumerate(rpc_connections):
57 missing_txids = pool[i_remote].difference(pool[i_target])
58 # Send missing txs
59 topo_send(
60 txs={txid: pool_remote[txid]
61 for txid in pool_remote if txid in missing_txids},
62 rpc=rpc_target,
63 pool_add=pool[i_target],
64 )
65 # If the sync fails there is a logic error in the sync or test code
66 pool = [set(r.getrawmempool()) for r in rpc_connections]
67 assert pool.count(pool[0]) == len(rpc_connections)