It seems slightly odd to me that this code, which we only ever expect to run once, should be in a while loop. I think it'd be clearer to split it out into its own function. Something like:
def sync_mempools(rpc_connections, *, wait=1, timeout=60, use_rpc_sync=False, flush_scheduler=True):
    """
    Wait until everybody has the same transactions in their memory
    pools. If use_rpc_sync is set, sync all transactions right away.

    Args:
        rpc_connections: sequence of node RPC handles; each must provide
            getrawmempool() and, when flush_scheduler is set,
            syncwithvalidationinterfacequeue(). An empty sequence is
            treated as already in sync.
        wait: seconds to sleep between two polls of the mempools.
        timeout: seconds to keep polling before giving up. The mempools
            are always compared at least once, even if the timeout is
            zero or negative.
        use_rpc_sync: delegate to force_sync_mempools() instead of polling.
        flush_scheduler: call syncwithvalidationinterfacequeue() on every
            connection once the mempools agree.

    Raises:
        AssertionError: the polled mempools still differ after the timeout.
    """
    if use_rpc_sync:
        force_sync_mempools(rpc_connections)
    else:
        stop_time = time.time() + timeout
        while True:
            pool = [set(r.getrawmempool()) for r in rpc_connections]
            # No connections means there is nothing that could disagree;
            # guarding here also avoids indexing pool[0] on an empty list.
            if not pool or pool.count(pool[0]) == len(pool):
                break
            # Check the deadline only after a poll so that `pool` is always
            # bound when the error message is built, and so that we never
            # sleep right before giving up.
            if time.time() > stop_time:
                raise AssertionError("Mempool sync timed out:{}".format("".join("\n {!r}".format(m) for m in pool)))
            time.sleep(wait)
    if flush_scheduler:
        for r in rpc_connections:
            r.syncwithvalidationinterfacequeue()
def force_sync_mempools(rpc_connections):
    """
    Bring all mempools in line by submitting missing transactions over RPC.

    Every node's mempool is read, and each transaction that another node
    lacks is sent to that node with sendrawtransaction(), dependencies
    first. An empty sequence of connections is a no-op.

    Args:
        rpc_connections: sequence of node RPC handles providing
            getrawmempool(), getrawtransaction() and sendrawtransaction().

    Raises:
        AssertionError: a submission was rejected for a reason other than
            'insufficient fee', or the mempools still differ afterwards.
    """
    class TxInfo:
        """Raw transaction together with the mempool entries it depends on."""

        def __init__(self, *, raw, ancestors):
            self.raw = raw
            self.ancestors = ancestors

    def topo_send(txs, rpc, pool_add):
        """Send txs to rpc, dependencies first; record accepted txids in pool_add."""
        for txid, tx in txs.items():
            topo_send(tx.ancestors, rpc, pool_add)
            if txid in pool_add:
                continue
            try:
                assert_equal(txid, rpc.sendrawtransaction(tx.raw))
            except JSONRPCException as e:
                # This transaction violates policy (e.g. RBF policy). The
                # mempools should still converge when the high-fee
                # replacement is synced in a later call
                if 'insufficient fee' not in e.error['message']:
                    # Raise explicitly (not a bare assert) so the check
                    # survives `python -O` and reports what went wrong.
                    raise AssertionError(
                        "Unexpected error sending {}: {}".format(txid, e.error['message'])
                    ) from e
            else:
                # Note that conflicted txs (due to RBF) are not removed
                # from the pool
                pool_add.add(txid)

    pool = [set(r.getrawmempool()) for r in rpc_connections]
    # Iterate over all nodes, get their raw mempool and send the
    # missing txs to all other nodes
    for i_remote, rpc_remote in enumerate(rpc_connections):
        pool_remote = {
            txid: TxInfo(raw=rpc_remote.getrawtransaction(txid), ancestors=info['depends'])
            for txid, info in rpc_remote.getrawmempool(verbose=True).items()
        }
        # Create "recursive pools" for ancestors: replace each list of
        # txids with a mapping from txid to the corresponding TxInfo
        for tx in pool_remote.values():
            tx.ancestors = {a: pool_remote[a] for a in tx.ancestors}

        # Push this pool to all targets
        for i_target, rpc_target in enumerate(rpc_connections):
            missing_txids = pool[i_remote].difference(pool[i_target])
            # Send missing txs
            topo_send(
                txs={txid: tx for txid, tx in pool_remote.items() if txid in missing_txids},
                rpc=rpc_target,
                pool_add=pool[i_target],
            )

    # If the sync fails there is a logic error in the sync or test code
    pool = [set(r.getrawmempool()) for r in rpc_connections]
    # Guard against an empty connection list before indexing pool[0].
    if pool and pool.count(pool[0]) != len(pool):
        raise AssertionError(
            "Mempools differ after forced sync:{}".format("".join("\n {!r}".format(m) for m in pool))
        )