Keep track of per-peer reconciliation sets containing transactions to be exchanged efficiently. The remaining transactions are announced via usual flooding.
Erlay Project Tracking: #28646
@@ -5752,9 +5759,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
     }

     if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
+        // Lock way before it's used to maintain lock ordering.
I tried a whole bunch of combinations. Say, you move the `LOCK(m_peer_mutex);` to L5831, where `m_peer_map` is used. Then you get something like this in Cirrus (not exactly!):
node0 2023-08-17T12:01:53.647510Z [msghand] [sync.cpp:97] [potential_deadlock_detected] POTENTIAL DEADLOCK DETECTED
node0 2023-08-17T12:01:53.647516Z [msghand] [sync.cpp:98] [potential_deadlock_detected] Previous lock order was:
node0 2023-08-17T12:01:53.647523Z [msghand] [sync.cpp:107] [potential_deadlock_detected] 'NetEventsInterface::g_msgproc_mutex' in net.cpp:2095 (in thread 'msghand')
node0 2023-08-17T12:01:53.647531Z [msghand] [sync.cpp:107] [potential_deadlock_detected] 'cs_main' in net_processing.cpp:5473 (in thread 'msghand')
node0 2023-08-17T12:01:53.647574Z [msghand] [sync.cpp:107] [potential_deadlock_detected] (2) 'm_peer_mutex' in net_processing.cpp:5686 (in thread 'msghand')
node0 2023-08-17T12:01:53.647581Z [msghand] [sync.cpp:107] [potential_deadlock_detected] 'tx_relay->m_tx_inventory_mutex' in net_processing.cpp:5688 (in thread 'msghand')
node0 2023-08-17T12:01:53.647587Z [msghand] [sync.cpp:107] [potential_deadlock_detected] 'tx_relay->m_bloom_filter_mutex' in net_processing.cpp:5768 (in thread 'msghand')
node0 2023-08-17T12:01:53.647593Z [msghand] [sync.cpp:107] [potential_deadlock_detected] (1) 'm_mempool.cs' in net_processing.cpp:5850 (in thread 'msghand')
node0 2023-08-17T12:01:53.647598Z [msghand] [sync.cpp:111] [potential_deadlock_detected] Current lock order is:
node0 2023-08-17T12:01:53.647604Z [msghand] [sync.cpp:122] [potential_deadlock_detected] 'NetEventsInterface::g_msgproc_mutex' in net.cpp:2095 (in thread 'msghand')
node0 2023-08-17T12:01:53.647610Z [msghand] [sync.cpp:122] [potential_deadlock_detected] 'm_chainstate_mutex' in validation.cpp:3102 (in thread 'msghand')
node0 2023-08-17T12:01:53.647616Z [msghand] [sync.cpp:122] [potential_deadlock_detected] 'cs_main' in validation.cpp:3124 (in thread 'msghand')
node0 2023-08-17T12:01:53.647622Z [msghand] [sync.cpp:122] [potential_deadlock_detected] (1) 'MempoolMutex()' in validation.cpp:3126 (in thread 'msghand')
node0 2023-08-17T12:01:53.647627Z [msghand] [sync.cpp:122] [potential_deadlock_detected] 'cs_main' in net_processing.cpp:2013 (in thread 'msghand')
node0 2023-08-17T12:01:53.647633Z [msghand] [sync.cpp:122] [potential_deadlock_detected] (2) 'm_peer_mutex' in net_processing.cpp:1593 (in thread 'msghand')
From this log you can see that `m_peer_mutex` should go before `m_mempool.cs`. I admit it might not be the 100% optimal placement; it feels NP-hard to me :) Do you know how to approach this better?
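As a generic illustration of the fix (plain `std::mutex`, not Bitcoin Core's sync.h macros; function names are hypothetical): acquire both mutexes jointly before any path takes them individually, so every thread observes the same global order.

```cpp
#include <mutex>

std::mutex g_mempool_cs; // stand-in for m_mempool.cs
std::mutex g_peer_mutex; // stand-in for m_peer_mutex

// Equivalent of LOCK2(m_mempool.cs, m_peer_mutex): a joint, deadlock-free
// acquisition establishes the mempool-before-peer ordering up front.
bool SendPath()
{
    std::scoped_lock lk(g_mempool_cs, g_peer_mutex);
    return true; // ... use m_peer_map and the mempool under both locks ...
}

// Paths that need only one of the two can take it alone without
// violating the order fixed above.
bool OtherPath()
{
    std::scoped_lock lk(g_peer_mutex);
    return true;
}
```

The debug-build lock-order checker quoted above only complains when two code paths acquire the same pair of mutexes in opposite orders; pinning the pair with one early joint acquisition removes that possibility.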
+                                           /*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0);
+        }
+        BOOST_CHECK_EQUAL(total_fanouted, 3);
+    }
+
+    // Don't relay if there are sufficient non-reconciling peers
@@ -193,6 +210,104 @@ class TxReconciliationTracker::Impl
         LOCK(m_txreconciliation_mutex);
         return IsPeerRegistered(peer_id);
     }
+
+    std::vector<NodeId> GetFanoutTargets(CSipHasher& deterministic_randomizer_with_wtxid,
I wonder if this algorithm (which took me a while to fully understand) could be simpler. E.g., if we used a sorted container for `best_peers` instead of a vector, inserted all of the peers, and then finally returned the first `targets` elements of that container, I think we could do without the `try_fanout_candidate` lambda. Or would that be incorrect / less performant?
I'm thinking of something like the following (just to show the idea, I didn't test it):
struct ComparePairs {
    bool operator()(const std::pair<uint64_t, NodeId>& left, const std::pair<uint64_t, NodeId>& right) const {
        return left.first > right.first;
    }
};

std::vector<NodeId> GetFanoutTargets(CSipHasher& deterministic_randomizer_with_wtxid,
                                     bool we_initiate, double limit) const EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
{
    // The algorithm works as follows. We iterate through the peers (of a given direction)
    // hashing them with the given wtxid, and sort them by this hash.
    // We then consider top `limit` peers to be low-fanout flood targets.
    // The randomness should be seeded with wtxid to return consistent results for every call.
    double integer_part;
    double fractional_peer = std::modf(limit, &integer_part);
    const bool drop_peer_if_extra = deterministic_randomizer_with_wtxid.Finalize() > fractional_peer * double(UINT64_MAX);
    const size_t targets = drop_peer_if_extra ? size_t(integer_part) : size_t(integer_part) + 1;
    std::set<std::pair<uint64_t, NodeId>, ComparePairs> best_peers;
    for (auto indexed_state : m_states) {
        const auto cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
        if (cur_state && cur_state->m_we_initiate == we_initiate) {
            uint64_t hash_key = deterministic_randomizer_with_wtxid.Write(cur_state->m_k0).Finalize();
            best_peers.insert(std::make_pair(hash_key, indexed_state.first));
        }
    }
    std::vector<NodeId> result;
    auto it = best_peers.begin();
    for (size_t i = 0; i < targets && it != best_peers.end(); ++i, ++it) {
        result.push_back(it->second);
    }
    return result;
}
+    // To handle fractional values, we add one peer optimistically and then probabilistically
+    // drop it later.
+    double integer_part;
+    double fractional_peer = std::modf(limit, &integer_part);
+    const size_t targets = size_t(integer_part) + 1;
+    const bool drop_peer_if_extra = deterministic_randomizer_with_wtxid.Finalize() > fractional_peer * double(UINT64_MAX);
+    for (int i = 0; i < 100; ++i) {
+        BOOST_CHECK(tracker.ShouldFanoutTo(GetRandHash(), hasher, peer_id0,
+                                           /*inbounds_nonrcncl_tx_relay=*/0, /*outbounds_nonrcncl_tx_relay=*/0));
+    }
+
+    // Now for inbound connections.
In commit "p2p: Add transactions to reconciliation set":
You could pick a fixed seed and wtxid for which you assert that with, say, 35 peers 3 are picked, and another seed/wtxid for which you assert that with 35 peers 4 are picked. That would at least show that the fractional probability code isn't just rounding down, i.e., that both values of `total_fanouted` are possible.
+
+    // Now for inbound connections.
+    for (int i = 1; i < 31; ++i) {
+        tracker.PreRegisterPeer(i);
+        BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(i, /*is_peer_inbound=*/true, 1, 1), ReconciliationRegisterResult::SUCCESS);
+
+    // child we need to check if any of the parents is currently
+    // reconciled so that the child isn't fanouted ahead. But then
+    // it gets tricky when reconciliation sets are full: a) the child
+    // can't just be added; b) removing parents from reconciliation
+    // sets for this one child is not good either.
+    fanout = true;
In 983f8c6e305fd9707c109c2a92637825262b9b09: Since `fanout` is already initialized to `true`, couldn't we simplify it?
@@ -5878,19 +5878,17 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
     if (reconciles_txs) {
         auto txiter = m_mempool.GetIter(txinfo.tx->GetHash());
         if (txiter) {
-            if ((*txiter)->GetCountWithDescendants() > 1) {
-                // If a transaction has in-mempool children, always fanout it.
-                // Until package relay is implemented, this is needed to avoid
-                // breaking parent+child relay expectations in some cases.
-                //
-                // Potentially reconciling parent+child would mean that for every
-                // child we need to check if any of the parents is currently
-                // reconciled so that the child isn't fanouted ahead. But then
-                // it gets tricky when reconciliation sets are full: a) the child
-                // can't just be added; b) removing parents from reconciliation
-                // sets for this one child is not good either.
-                fanout = true;
-            } else {
+            // If a transaction has in-mempool children, always fanout it.
+            // Until package relay is implemented, this is needed to avoid
+            // breaking parent+child relay expectations in some cases.
+            //
+            // Potentially reconciling parent+child would mean that for every
+            // child we need to check if any of the parents is currently
+            // reconciled so that the child isn't fanouted ahead. But then
+            // it gets tricky when reconciliation sets are full: a) the child
+            // can't just be added; b) removing parents from reconciliation
+            // sets for this one child is not good either.
+            if ((*txiter)->GetCountWithDescendants() <= 1) {
                 auto fanout_randomizer = m_connman.GetDeterministicRandomizer(RANDOMIZER_ID_FANOUTTARGET);
                 fanout = m_txreconciliation->ShouldFanoutTo(wtxid, fanout_randomizer, pto->GetId(),
                                                             inbounds_nonrcncl_tx_relay, outbounds_nonrcncl_tx_relay);
@@ -5814,6 +5824,28 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
         // No reason to drain out at many times the network's capacity,
         // especially since we have many peers and some will draw much shorter delays.
         unsigned int nRelayedTransactions = 0;
+
+        size_t inbounds_nonrcncl_tx_relay = 0, outbounds_nonrcncl_tx_relay = 0;
+        if (m_txreconciliation) {
In 983f8c6e305fd9707c109c2a92637825262b9b09: Correct me if I'm wrong, but since we're going to use `inbounds_nonrcncl_tx_relay` and `outbounds_nonrcncl_tx_relay` only when `reconciles_txs` is true, couldn't we only fill them in that case?:

    if (reconciles_txs) {
@@ -81,4 +81,75 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
     BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
 }

+BOOST_AUTO_TEST_CASE(ShouldFanoutToTest)
`AddToSet` and the check of the boundary `MAX_SET_SIZE` value.
+    // (2) really a lot of valid fee-paying transactions were dumped on us at once.
+    // We don't care about a laggy peer (1) because we probably can't help them even if we fanout transactions.
+    // However, exploiting (2) should not prevent us from relaying certain transactions.
+    //
+    // Transactions which don't make it to the set due to the limit are announced via fan-out.
+    if (recon_state.m_local_set.size() >= MAX_SET_SIZE) return false;
Could this be `>` only? Generally, a maximum in net_processing is understood as a strict bound, e.g. the `MAX_INV_SZ` usage.

node/txreconciliation.cpp:173 AddToSet: Assertion `recon_state.m_local_set.insert(wtxid).second' failed.
@@ -74,6 +74,22 @@ class TxReconciliationTracker
     ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound,
                                               uint32_t peer_recon_version, uint64_t remote_salt);

+    /**
+     * Step 1. Add a new transaction we want to announce to the peer to the local reconciliation set
+     * of the peer, so that it will be reconciled later, unless the set limit is reached.
+     * Returns whether it was added.
+     */
Like `{Pre}RegisterPeer` and the last paragraph on `TryRemovingFromSet`, shouldn't this also advise the caller to make sure the peer is registered?

Regarding `TryRemovingFromSet`: it's not that strong anymore anyway, doesn't help much, and is pretty obvious I think. Let me know if you think differently.
+    AssertLockNotHeld(m_txreconciliation_mutex);
+    LOCK(m_txreconciliation_mutex);
+    if (!IsPeerRegistered(peer_id)) return false;
+    auto& recon_state = std::get<TxReconciliationState>(m_states.find(peer_id)->second);
+
+    // Check if reconciliation set is not at capacity for two reasons:

+
+    Assume(recon_state.m_local_set.insert(wtxid).second);
+    LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Added %s to the reconciliation set for peer=%d. " /* Continued */
+                  "Now the set contains %i transactions.\n",
+                  wtxid.ToString(), peer_id, recon_state.m_local_set.size());
+    return true;
I think this is a bit counter-intuitive. If the transaction to be added is already in the set, we will treat this as if it was added, when that is actually not the case.
I guess the reasoning is that there should be no way of adding the same transaction more than once, so `false` is being used to signal failures. However, this seems like a way to potentially shoot ourselves in the foot.
If we want to keep the logic as is, I'd suggest that we at least mention this in the function's docs, given it currently reads:

    [...]
    * Returns whether it was added.
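The `.second` behavior being discussed can be seen with a minimal standalone sketch (plain `int` stands in for the wtxid type; `AddOnce` is a hypothetical name):

```cpp
#include <set>

// std::set::insert reports via .second whether the element was actually
// inserted; a duplicate key yields false. AddToSet-style code that forwards
// this value therefore returns false for an already-queued transaction,
// even though it is in the set.
bool AddOnce(std::set<int>& local_set, int wtxid)
{
    return local_set.insert(wtxid).second;
}
```

So a caller cannot distinguish "set full" from "already present" through the boolean alone, which is the ambiguity the comment suggests documenting.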
@@ -77,6 +85,11 @@ class TxReconciliationTracker::Impl
 private:
     mutable Mutex m_txreconciliation_mutex;

+    /**
+     * ReconciliationTracker-wide randomness to choose fanout targets for a given txid.
+     */
+    const SaltedTxidHasher m_txid_hasher;
+
+    // hashing them with the given wtxid, and sort them by this hash.
+    // We then consider top `limit` peers to be low-fanout flood targets.
+    // The randomness should be seeded with wtxid to return consistent results for every call.
+
+    double integer_part;
+    double fractional_peer = std::modf(limit, &integer_part);
I’m guessing this is supposed to be integral_part and fractional_part (?)
+
+    double integer_part;
+    double fractional_peer = std::modf(limit, &integer_part);
+    // Handle fractional value.
+    const bool add_extra = deterministic_randomizer_with_wtxid.Finalize() > fractional_peer * double(UINT64_MAX);
+    const size_t targets = add_extra ? size_t(integer_part) : size_t(integer_part) + 1;
Shouldn't this be the other way around? Maybe I'm confused by the names of the variables, but it seems like if `add_extra` is true, then you should do x+1.

`targets_size` may be a better name, in lieu of `limit`, which is already being used, given this does not really refer to the targets themselves but to the size of the returned `targets` collection.
Changed to `<` and flipped the ternary conditions. The behavior remains the same. Indeed, it was double-upside-down before.
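For reference, the corrected rounding can be sketched as a standalone helper (hypothetical name `TargetsSize`; the hash is treated as uniform over the 64-bit range, so the extra peer is kept with probability equal to the fractional part of `limit`):

```cpp
#include <cmath>
#include <cstddef>
#include <cstdint>

// Always take floor(limit) targets, plus one extra target with probability
// equal to the fractional part of limit: add_extra is true exactly when the
// uniform hash lands below fractional_part of the 64-bit range.
size_t TargetsSize(double limit, uint64_t hash)
{
    double integral_part;
    const double fractional_part = std::modf(limit, &integral_part);
    const bool add_extra = hash < fractional_part * double(UINT64_MAX);
    return size_t(integral_part) + (add_extra ? 1 : 0);
}
```

E.g. with `limit = 2.5`, a small hash gives 3 targets and a large hash gives 2, so on average 2.5 targets are chosen.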
`m_we_initiate` and `m_k0` should be fixable at this point, given they are both used by `GetFanoutTargets`, shouldn't they?
+        // number of inbound targets.
+        const double inbound_targets = (inbounds_nonrcncl_tx_relay + inbound_rcncl_peers) * INBOUND_FANOUT_DESTINATIONS_FRACTION;
+        destinations = inbound_targets - inbounds_nonrcncl_tx_relay;
+    }
+
+    if (destinations < 0.01) {
What if `destinations` is smaller than 1 but we still want to pass it to `GetFanoutTargets`?
Say we have 5 inbound Erlay peers and 0 inbound legacy peers:
inbound_targets = (5+0) * 0.1 = 0.5
destinations = 0.5 - 0 = 0.5
It just means that we will take 1 inbound peer for fanout with a 50% chance.

I guess it will work just fine if I drop the 0.01 (1% or less chance) check. It's kinda unfortunate we have to do the compute if the chance is that small. Fine with me either way, let me know what you think.
+    AssertLockNotHeld(m_txreconciliation_mutex);
+    LOCK(m_txreconciliation_mutex);
+    if (!IsPeerRegistered(peer_id)) return true;
+    // We use the pre-determined randomness to give a consistent result per transaction,
+    // thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
+    deterministic_randomizer.Write(wtxid.GetUint64(0));
Now that we pass the `wtxid` in, is writing to the randomizer cheap enough?
@@ -5751,9 +5758,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
     }

     if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
+        // Lock way before it's used to maintain lock ordering.
+        LOCK2(m_mempool.cs, m_peer_mutex);
         LOCK(tx_relay->m_tx_inventory_mutex);
         // Check whether periodic sends should happen
         bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
+        const bool reconciles_txs = m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId());
Isn't `reconciles_txs` only used within the `fSendTrickle == true` branch? Wouldn't it be worth moving it to that context?
@@ -74,6 +74,22 @@ class TxReconciliationTracker
     ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound,
                                               uint32_t peer_recon_version, uint64_t remote_salt);

+    /**
+     * Step 1. Add a new transaction we want to announce to the peer to the local reconciliation set
+     * of the peer, so that it will be reconciled later, unless the set limit is reached.
+     * Returns whether it was added.
+     */
+    bool AddToSet(NodeId peer_id, const uint256& wtxid);
Could this use the `Wtxid` type?
+    // However, exploiting (2) should not prevent us from relaying certain transactions.
+    //
+    // Transactions which don't make it to the set due to the limit are announced via fan-out.
+    if (recon_state.m_local_set.size() >= MAX_SET_SIZE) return false;
+
+    Assume(recon_state.m_local_set.insert(wtxid).second);
+    // Until package relay is implemented, this is needed to avoid
+    // breaking parent+child relay expectations in some cases.
+    //
+    // Potentially reconciling parent+child would mean that for every
+    // child we need to check if any of the parents is currently
+    // reconciled so that the child isn't fanouted ahead. But then
You're right.
My fear with this is unexpected behavior for the tx sender: e.g., you craft a "package" thinking the parent always goes ahead, but then the child gets ahead (potentially with the attacker's help) and is dropped on the floor due to some policy. Something along these lines, but maybe I'm making it up. Are these concerns at least semi-valid? @glozow
I can add a "see whether a parent is in the set already" check when looking at a child, if we think it's worth it.
I confirm that with the currently implemented BIP331 approach there is a `MAX_ORPHAN_TOTAL_SIZE` limit.
You can always get a non-standard parent (e.g. an under-dust output) and yet have the child be policy-valid. There is currently no sanitization of policy equivalence among a set of parents within `ancpkginfo`.
In the future, you could have reconciliation at the `pkgtxns` level or at package announcement (`ancpkginfo`). Ideally both, though that is something that can be looked at once Erlay or BIP331 are deployed.
Assuming there is no substantial timing delay between the parent being reconciled and the child being fanned out to the peer, which would allow an overflow of `MAX_ORPHAN_TOTAL_SIZE`, I don't think it alters the package acceptance of the receiving peer.
Assuming no exploitable timers, one can still run a simulation to quantify the "child drift" risk for a distribution of parents/children being reconciled/fanned out, based on the average time discrepancy between those two tx announcement strategies. Ideally, in the future we would move to sender-initiated packages, which from my understanding would remove this concern. However, that is already a post-BIP331 future we're talking about.
+    if (destinations < 0.01) {
+        return false;
+    }
+
+    auto fanout_candidates = GetFanoutTargets(deterministic_randomizer, recon_state.m_we_initiate, destinations);
+    return std::count(fanout_candidates.begin(), fanout_candidates.end(), peer_id);
I wonder why we are using `std::count` here when we will find, at most, a single instance of `peer_id` within `fanout_candidates`. Wouldn't `std::find` be a better option?
On the same line, it may even be worth making `GetFanoutTargets` return a set instead of a vector, given we are sure that the elements of the collection won't be repeated.
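A minimal illustration of the `std::find` suggestion (plain `int`s standing in for `NodeId`; `Contains` is a hypothetical helper name):

```cpp
#include <algorithm>
#include <vector>

// std::find stops at the first match, whereas std::count always scans the
// whole container; for a pure membership test the former does no extra work.
bool Contains(const std::vector<int>& candidates, int peer_id)
{
    return std::find(candidates.begin(), candidates.end(), peer_id) != candidates.end();
}
```

Since both are O(n) in the worst case, this is a micro-optimization; the bigger win discussed later in the thread is avoiding building the candidate collection at all.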
+    bool we_initiate, double limit) const EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
+    {
+        // The algorithm works as follows. We iterate through the peers (of a given direction)
+        // hashing them with the given wtxid, and sort them by this hash.
+        // We then consider top `limit` peers to be low-fanout flood targets.
+        // The randomness should be seeded with wtxid to return consistent results for every call.
Why is the randomness seeded with the `wtxid`? I wonder because it feels like the results may be deterministic based on the ordering of `m_states`, which may not be persistent if a peer changes (?)

The results are kept consistent per `wtxid`. The risk of `m_states` changing I thought is acceptable; it's a rare event (outbound peers) and hard to exploit in a meaningful way. But I'm open to more analysis.

What about sorting `m_states`? That way you ensure that a change in the ordering is not going to, potentially, produce a completely different order of the whole `best_peers` set. In this case, the worst that could happen would be that the newly added peer jumps places and becomes one of the selected/not-selected peers, instead of a complete rearrangement of the collection.
+    // Potentially reconciling parent+child would mean that for every
+    // child we need to check if any of the parents is currently
+    // reconciled so that the child isn't fanouted ahead. But then
+    // it gets tricky when reconciliation sets are full: a) the child
+    // can't just be added; b) removing parents from reconciliation
+    // sets for this one child is not good either.
Sure, what exactly? Here I talk about an alternative way to implement this.
Say we want to add a child to the set because the parent is already there, and we don't want the child ahead of the parent. But the set is full. We can't ignore the limit; that's (a). And (b) means that we could fanout child+parent in this case, but this "remove parent from set" operation is harder to reason about. Should we then remove the parents of a parent too?
Maybe I'm overthinking this issue.
@@ -51,9 +59,6 @@ class TxReconciliationState
     bool m_we_initiate;

     /**
-     * TODO: These fields are public to ignore -Wunused-private-field. Make private once used in
-     * the following commits.
-     *
nit: if you have created a refactor commit to deal with removing the TODOs, it may be worth moving that change there.
Not sure if you’re planning to squash it or not. Feel free to disregard.
re-ACK 3a062b2; the diff is mainly moving the removal of TODOs between commits.
I’ve noticed that the co-authorship of 3a062b2bdc6dc787b967947872f55131522cd2ac was dropped, which may have been unintended.
Also, looks like this is failing CI, but it may be unrelated.
These comments became irrelevant in one of the previous code changes.
They simply don't make sense anymore.
+    CSipHasher hasher(0x0706050403020100ULL, 0x0F0E0D0C0B0A0908ULL);
+
+    // If peer is not registered for reconciliation, it should be always chosen for flooding.
+    BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
+    for (int i = 0; i < 100; ++i) {
+        BOOST_CHECK(tracker.ShouldFanoutTo(GetRandHash(), hasher, peer_id0,
@@ -84,6 +98,12 @@ class TxReconciliationTracker
     * Check if a peer is registered to reconcile transactions with us.
     */
    bool IsPeerRegistered(NodeId peer_id) const;
+
+    /**
+     * Returns whether the peer is chosen as a low-fanout destination for a given tx.
+     */
+    bool ShouldFanoutTo(const uint256& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
Oops, I forgot to add this.

nit: Shouldn't this also be `Wtxid&` for consistency with the rest of the interface? This is non-blocking, but I realized it when reviewing the last changes to the tests.
+        result.insert(it->second);
+        }
+        return result;
+    }
+
+    bool ShouldFanoutTo(const Wtxid& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
`deterministic_randomizer` could be passed by reference.

`CSipHasher&& deterministic_randomizer` avoids a copy, allows the local variable inside `ShouldFanoutTo` to be modified, and the caller is constructing a fresh `CSipHasher` anyway.
Code Review ACK f56ec8a3b086b0f2f8a0fbde861447e40ffbc3d9
One thing I'm unsure about is how the way we call `ShouldFanoutTo()` multiple times for each transaction might affect performance. If we have 120 reconciling peers and get a dump of `MAX_SET_SIZE=3000` transactions, I think we'd call this function 360,000 times within a short timeframe. I wonder how long that would take; maybe we could add a benchmark for this? (could be done as a follow-up)

We could call `GetFanoutTargets` once for every transaction, and then memoize it in a `std::map<Wtxid, list_of_peers>` inside the TxReconciliation module. We can cap the map at 1000 in a FIFO fashion. Worst case, we drop a relevant transaction and then have to recompute it (that's the current default behaviour). Memory should be ok too: 1000 txs * 120 peers * 0.1 ratio = 12,000 entries of `NodeId`. What do you think?
To get an estimate of performance, I wrote a simple benchmark, see https://github.com/mzumsande/bitcoin/commit/22f98d6f54042b3e8ee41889ceb362f6355e4fa0. On my computer (neither particularly fast nor slow) I get ~1000 op/s per wtxid to call `ShouldFanoutTo()` for all 120 peers, so a batch of 3000 txs should take ~3s in `ShouldFanoutTo`.
So I think that caching could make sense.
I wonder if there is a good way to restrict this map to wtxids that are still in some peer's inv queue and remove them afterwards, or would the map just always fill up?
+        AssertLockNotHeld(m_txreconciliation_mutex);
+        LOCK(m_txreconciliation_mutex);
+        return IsPeerRegistered(peer_id);
+    }
+
+    std::set<NodeId> GetFanoutTargets(CSipHasher& deterministic_randomizer_with_wtxid,
The `deterministic_randomizer_with_wtxid` variable could be `const CSipHasher&` here (to make it clear that this object is only ever used as a starting point for (independent) hashes).
+
+    auto cmp_by_key = [](const std::pair<uint64_t, NodeId>& left, const std::pair<uint64_t, NodeId>& right) {
+        return left.first > right.first;
+    };
+
+    std::set<std::pair<uint64_t, NodeId>, decltype(cmp_by_key)> best_peers(cmp_by_key);
This could be a `std::vector<std::pair<uint64_t, NodeId>>`: `.reserve()` it once, fill it up like in the loop below, and then `std::sort` it in-place using `cmp_by_key`. This has far better memory locality, less allocation overhead, and ought to have the same asymptotic complexity (O(n log n)).
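A sketch of that vector-based variant, simplified to `int` peer ids (hypothetical helper name `SortPeersByHash`):

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Collect all (hash, peer) pairs once, then sort in place by descending
// hash, instead of inserting into a std::set node by node. One contiguous
// allocation instead of one per element, same O(n log n) overall.
std::vector<std::pair<uint64_t, int>> SortPeersByHash(std::vector<std::pair<uint64_t, int>> peers)
{
    const auto cmp_by_key = [](const auto& l, const auto& r) { return l.first > r.first; };
    std::sort(peers.begin(), peers.end(), cmp_by_key);
    return peers;
}
```

The first `targets_size` elements of the sorted vector are then the low-fanout targets, matching what iterating the ordered set would have produced.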
+
+    // We use the pre-determined randomness to give a consistent result per transaction,
+    // thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
+    deterministic_randomizer.Write(wtxid.ToUint256());
+
+    auto fanout_candidates = GetFanoutTargets(deterministic_randomizer, recon_state.m_we_initiate, destinations);
Constructing the full set of fanout candidates any time you want to consider relaying anything to any node has O(n^2 log n) scaling (you'll call `GetFanoutTargets` O(n) times, and each does O(n log n) work going over all nodes), I believe.
That may or may not matter, but I'd consider at least one of these variants:
1. Cache the `GetFanoutTargets` result per transaction as discussed elsewhere in this PR, making it O(n log n) only.
2. Instead of constructing a `std::set<NodeId>` in `GetFanoutTargets` (which at least involves an allocation per result), pass the `peer_id` to `GetFanoutTargets`, which can then just return a boolean (see if the `peer_id` appears in the first `target_size` elements of `best_peers` after sorting). This doesn't change the asymptotic behavior, but means there is just one allocation per `ShouldFanoutTo`.

+
+    double integral_part;
+    double fractional_part = std::modf(limit, &integral_part);
+    // Handle fractional value.
+    const bool add_extra = deterministic_randomizer_with_wtxid.Finalize() < fractional_part * double(UINT64_MAX);
+    const size_t targets_size = add_extra ? size_t(integral_part) + 1 : size_t(integral_part);
This could just be `const size_t targets_size = integral_part + add_extra;`.
An alternative which may be slightly faster (unsure, depends on how fast `std::modf` is):
`const size_t targets_size = ((deterministic_randomizer_with_wtxid.Finalize() & 0xFFFFFFFF) + uint64_t(limit * 0x100000000)) >> 32;`
It has lower precision, but I suspect 32 bits is more than sufficient here.
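As a standalone sketch of that fixed-point variant (hypothetical helper name): the low 32 bits of the hash act as a uniform fraction in [0, 1), and the carry into bit 32 implements the probabilistic round-up.

```cpp
#include <cstddef>
#include <cstdint>

// limit is converted to 32.32 fixed point; adding a uniform 32-bit fraction
// and shifting right by 32 yields floor(limit) targets, plus one extra with
// probability equal to the fractional part of limit.
size_t FixedPointTargets(double limit, uint64_t hash)
{
    return size_t(((hash & 0xFFFFFFFF) + uint64_t(limit * 0x100000000)) >> 32);
}
```

Note the round-up fires for large hash fractions here, while the `modf`-based comparison above fires for small ones; either way the extra target is kept with the same probability.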
+    double destinations;
+    if (recon_state.m_we_initiate) {
+        destinations = OUTBOUND_FANOUT_DESTINATIONS - outbounds_nonrcncl_tx_relay;
+    } else {
+        const size_t inbound_rcncl_peers = std::count_if(m_states.begin(), m_states.end(),
+            [](std::pair<NodeId, std::variant<uint64_t, TxReconciliationState>> indexed_state) {
Pass `indexed_state` by reference. This is creating a copy of the entire `TxReconciliationState` for every element in `m_states`. `[](const auto& indexed_state) { ... }` works.
+
+    for (auto indexed_state : m_states) {
+        const auto cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
+        if (cur_state && cur_state->m_we_initiate == we_initiate) {
+            uint64_t hash_key = CSipHasher(deterministic_randomizer_with_wtxid).Write(cur_state->m_k0).Finalize();
+            best_peers.insert(std::make_pair(hash_key, indexed_state.first));
`best_peers.emplace(hash_key, indexed_state.first);` avoids a temporary `std::pair` object in the caller. (It'd become `best_peers.emplace_back(hash_key, indexed_state.first);` if `best_peers` is converted to a vector as suggested above.)
+        return left.first > right.first;
+    };
+
+    std::set<std::pair<uint64_t, NodeId>, decltype(cmp_by_key)> best_peers(cmp_by_key);
+
+    for (auto indexed_state : m_states) {
Use `const auto& indexed_state : m_states` here; you're making a copy of the entire `TxReconciliationState` for every iteration as written.
@@ -5818,6 +5827,29 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
         // No reason to drain out at many times the network's capacity,
         // especially since we have many peers and some will draw much shorter delays.
         unsigned int nRelayedTransactions = 0;
+
+        size_t inbounds_nonrcncl_tx_relay = 0, outbounds_nonrcncl_tx_relay = 0;
+        const bool reconciles_txs = m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId());
+        if (reconciles_txs) {
+            for (auto [cur_peer_id, cur_peer] : m_peer_map) {
Use `const auto& [cur_peer_id, cur_peer] : m_peer_map` here. You're making a copy of the `PeerRef` object for every iteration (which isn't the end of the world, it's just a shared pointer, but copying those does involve an atomic memory operation).
Not a full review; mostly performance improvement suggestions, as I saw there was a suggestion to cache things. Before considering that, it may be worth benchmarking whether caching is still relevant with these review comments addressed.
See https://github.com/sipa/bitcoin/commits/pr28765 for a commit incorporating these changes (excluding caching). Feel free to use whatever part of it. I assume you'd at least want to rename `GetFanoutTargets` if its return value becomes a `bool` like here, but I haven't included that to minimize changes.
Before @sipa's optimization:

|               ns/op |                op/s |    err% |     total | benchmark
|--------------------:|--------------------:|--------:|----------:|:----------
|        1,401,785.00 |              713.38 |    1.6% |      0.02 | `ShouldFanoutTo`

After:

|               ns/op |                op/s |    err% |     total | benchmark
|--------------------:|--------------------:|--------:|----------:|:----------
|          369,620.50 |            2,705.48 |    1.4% |      0.01 | `ShouldFanoutTo`

A 4x improvement; I think batching is still worth implementing. I will suggest code changes shortly.
Finally submitted the optimized code.
Did you perhaps push the wrong branch? You upvoted several of sipa’s suggestions but they don’t seem to be addressed in the current code.
@mzumsande you're right, I got lost between branches, sorry.
Apparently caching gives another 12x boost, assuming it works correctly :)
|               ns/op |                op/s |    err% |     total | benchmark
|--------------------:|--------------------:|--------:|----------:|:----------
|           35,617.11 |           28,076.40 |    1.9% |      0.01 | `ShouldFanoutTo`
+     * be hard to manipulate).
+     *
+     * No need to use LRU (bump transaction order upon access) because in most cases
+     * transactions are processed almost-sequentially.
+     */
+    std::deque<Wtxid> tx_fanout_targes_cache_order;
`targes` -> `targets`
266+ new_fanout_candidates.insert(it->second);
267+ }
268+
269+ tx_fanout_targets_cache_data.emplace(wtxid, new_fanout_candidates);
270+ // Replace the oldest cache item with this new one.
271+ if (tx_fanout_targes_cache_order.size () == 3000) {
272+ auto expired_tx = tx_fanout_targes_cache_order.front();
273+ tx_fanout_targets_cache_data.erase(expired_tx);
274+ tx_fanout_targes_cache_order.pop_front();
275+ tx_fanout_targes_cache_order.push_back(wtxid);
I don’t think this is right(?); this logic is never triggered. You are checking whether the size of tx_fanout_targes_cache_order has reached the limit, but you only add elements to the queue inside that same if branch, meaning the queue is always empty.
127@@ -128,14 +128,20 @@ class TxReconciliationTracker::Impl
128 }
129 }
130
131- bool IsPeerRegistered(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
132+ bool IsPeerRegistered(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
In “refactor: Add a pre-mutexed version of IsPeerRegistered”:
I’d prefer to swap the names (i.e. have an external IsPeerRegistered, and an internal IsPeerRegisteredInternal), as external callers ought to only care about the external one. Also, is it possible to make the internal one private?
56+ * Store all wtxids which we would announce to the peer (policy checks passed, etc.)
57+ * in this set instead of announcing them right away. When reconciliation time comes, we will
58+ * compute a compressed representation of this set ("sketch") and use it to efficiently
59+ * reconcile this set with a set on the peer's side.
60+ */
61+ std::set<Wtxid> m_local_set;
MAX_SET_SIZE) elements, it may be worth using a std::unordered_set instead (with something like SaltedTxidHasher as hasher).
231+ }
232+
233+ static constexpr auto cmp_by_key = [](const auto& left, const auto& right) {
234+ return left.first > right.first;
235+ };
236+ std::sort(best_peers.begin(), best_peers.end(), cmp_by_key);
In commit “p2p: Add transactions to reconciliation sets”:
It doesn’t actually matter whether we pick the lowest-hash_key entries or the highest-hash_key entries to fanout to, I think? If so, you can just use std::sort(best_peers.begin(), best_peers.end()); and drop cmp_by_key.
Also, since you only care about the top targets_size results, I believe using std::partial_sort may be faster (but benchmark it).
ShouldFanoutTo looked fitting for benchmarking, and partial_sort had no effect. I took the former suggestion, of course.
246+ size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay)
247+ const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
248+ {
249+ AssertLockNotHeld(m_txreconciliation_mutex);
250+ LOCK(m_txreconciliation_mutex);
251+ if (!IsPeerRegistered(peer_id)) return true;
In commit “p2p: Add transactions to reconciliation set”
Would it make sense to have an (internal) helper function instead of IsPeerRegistered that, instead of returning a bool, returns a TxReconciliationState* (nullptr if not found)? That would avoid locking/finding twice. I suspect it’d be usable in other places too.
260+ // whether the given peer falls into this list.
261+ double destinations;
262+ if (recon_state.m_we_initiate) {
263+ destinations = OUTBOUND_FANOUT_DESTINATIONS - outbounds_nonrcncl_tx_relay;
264+ } else {
265+ const size_t inbound_rcncl_peers = std::count_if(m_states.begin(), m_states.end(),
In commit “p2p: Add transactions to reconciliation set”:
This value could be cached inside TxReconciliationTracker::Impl, I think, and updated on register/deregister, to avoid recomputing it every time?
103+ *
104+ * No need to use LRU (bump transaction order upon access) because in most cases
105+ * transactions are processed almost-sequentially.
106+ */
107+ std::deque<Wtxid> tx_fanout_targets_cache_order;
108+ std::map<Wtxid, std::set<NodeId>> tx_fanout_targets_cache_data GUARDED_BY(m_txreconciliation_mutex);
In commit “Cache fanout candidates to optimize txreconciliation”:
Since the cache entries (std::set<NodeId>) don’t change until being replaced entirely, I believe it would be more efficient to use a std::vector<NodeId>, with the node ids in sorted order. You can then use std::binary_search to query that vector. It’d be more efficient, as std::vector has much better memory locality than std::set, and fewer indirections.
If you do this, you can also just reuse the best_peers vector and shrink it to target_size (but make sure to call shrink_to_fit, as otherwise the memory for the remainder won’t be released).
267- if (it->second == peer_id) return true;
268+ new_fanout_candidates.insert(it->second);
269 }
270- return false;
271+
272+ tx_fanout_targets_cache_data.emplace(wtxid, new_fanout_candidates);
In commit “Cache fanout candidates to optimize txreconciliation”:
Use std::move around new_fanout_candidates (or best_peers, if you take my earlier suggestion to use a vector) to avoid a copy. If so, look up the peer_id in it before moving, so you can return that value.
They will be used later on.
5827@@ -5818,6 +5828,29 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
5828 // No reason to drain out at many times the network's capacity,
5829 // especially since we have many peers and some will draw much shorter delays.
5830 unsigned int nRelayedTransactions = 0;
5831+
5832+ size_t inbounds_nonrcncl_tx_relay = 0, outbounds_nonrcncl_tx_relay = 0;
inbounds_flooding_tx_relay / outbounds_flooding_tx_relay would match the low-fanout flooding strategy used for them.
5893+ // reconciled so that the child isn't fanouted ahead. But then
5894+ // it gets tricky when reconciliation sets are full: a) the child
5895+ // can't just be added; b) removing parents from reconciliation
5896+ // sets for this one child is not good either.
5897+ if ((*txiter)->GetCountWithDescendants() <= 1) {
5898+ fanout = m_txreconciliation->ShouldFanoutTo(wtxid, pto->GetId(),
LogPrint(BCLog::NET, "Non-signaling reconciliation inbound peers flooding %d, outbound peers flooding %d");
for debugging purposes and observation
337+ destinations = inbound_targets - inbounds_nonrcncl_tx_relay;
338+ }
339+
340+ // Pure optimization to avoid going through the peers when the odds of picking one are
341+ // too low.
342+ if (destinations < 0.001) {
OUTBOUND_FANOUT_DESTINATIONS and INBOUND_FANOUT_DESTINATIONS_FRACTION, as the explanation for this value is tied to them.
The 0.001 value won’t be affected.
Transactions eligible for reconciliation are added to the
reconciliation sets. For the remaining txs, low-fanout is used.
Co-authored-by: Martin Zumsande <mzumsande@gmail.com>
Co-authored-by: Pieter Wuille <pieter.wuille@gmail.com>
301+
302+ // If the cache is full, make room for the new entry.
303+ if (m_tx_fanout_targets_cache_order.size() == FANOUT_TARGETS_PER_TX_CACHE_SIZE) {
304+ auto expired_tx = m_tx_fanout_targets_cache_order.front();
305+ m_tx_fanout_targets_cache_data.erase(expired_tx);
306+ m_tx_fanout_targets_cache_order.pop_front();
In the event of an influx of inbound transactions faster than we can flush them out to low-fanout flooding peers, my understanding is that by dropping the oldest wtxid candidate, we would keep propagating that transaction only according to the tx-relay policy and connection state of the other peers (not this NodeId anymore).
I understand we’re fanning out only to outbound peers (per the m_tx_fanout_targets_cache_data doc), though here it’s more a dependency on the performance of the full node itself (i.e. how fast you process vInv(MSG_WTX) and how fast you re-announce them to downstream peers if valid). To interfere with a transaction’s propagation, assuming a non-listening node, an attacker would have to puppet or compromise all of our low-fanout outbound peers, I think? Obviously more outbound peers would make things better on this front, which should be allowed by Erlay’s tx-relay bandwidth savings.
67 * These values are used to salt short IDs, which is necessary for transaction reconciliations.
68 */
69 uint64_t m_k0, m_k1;
70
71+ /**
72+ * Store all wtxids which we would announce to the peer (policy checks passed, etc.)
In terms of peer-side policy checks (i.e. m_fee_filter_received), this policy limit is at the tx-relay link level and is unilaterally updated by the peer. As such, I think there is no guarantee that between time point A, when we add a Wtxid to m_local_set, and time point B, when we reconcile, we have not received a new BIP133 message updating m_fee_filter_received. I believe stored Wtxids can be retroactively staled, producing a bandwidth performance leak under sudden network mempool spikes.
I don’t think there is much a tx-announcement strategy (either flooding or reconciliation) can do about it by itself, short of extending BIP133 messages to commit to a feerate level for some duration. As such, I think any improvement is out of scope for this PR.
196+ // Check if the reconciliation set is not at capacity for two reasons:
197+ // - limit sizes of reconciliation sets and short id mappings;
198+ // - limit CPU use for sketch computations.
199+ //
200+ // Since we reconcile frequently, reaching capacity either means:
201+ // (1) a peer for some reason does not request reconciliations from us for a long while, or
DEFAULT_MAX_PEER_CONNECTIONS, and reconciliation state is cleaned up in FinalizeNode.
be8ef38d29; still reading back through the txrelaysim issue on announcement-related bandwidth/latency and the responsibility trade-offs behind the choice of the current constants.
29+ for (size_t i = 0; i < 1000; i++) {
30+ txs.push_back(Wtxid::FromUint256(rc.rand256()));
31+ }
32+
33+ bench.run([&] {
34+ size_t rand_i = rand() % txs.size();
rand_i into bench.run? Couldn’t we keep it outside of it? Note that without it, the bench is 70% faster.
Do you understand why such a simple op as a variable assignment eats so much perf? :)
I don’t think std::rand() makes any quality or performance guarantees, so it is free to do whatever it wants.
It helps to avoid recomputing every time we consider
a transaction for fanout/reconciliation.
5892+ // child we need to check if any of the parents is currently
5893+ // reconciled so that the child isn't fanouted ahead. But then
5894+ // it gets tricky when reconciliation sets are full: a) the child
5895+ // can't just be added; b) removing parents from reconciliation
5896+ // sets for this one child is not good either.
5897+ if ((*txiter)->GetCountWithDescendants() <= 1) {
GetCountWithDescendants() could be marked with parent_fanout=true; that way we guarantee more stringently that all members of a chain of transactions are announced through the same strategy (either Erlay or low-fanout flooding). I’ll check if there is test coverage here.
174@@ -175,6 +175,8 @@ static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1};
175 static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND};
176 /** The compactblocks version we support. See BIP 152. */
177 static constexpr uint64_t CMPCTBLOCKS_VERSION{2};
178+/** Used to determine whether to use low-fanout flooding (or reconciliation) for a tx relay event. */
179+static const uint64_t RANDOMIZER_ID_FANOUTTARGET = 0xbac89af818407b6aULL; // SHA256("fanouttarget")[0:8]
constexpr?
FANOUTTARGET -> FANOUT_TARGET?
290+ std::sort(best_peers.begin(), best_peers.end());
291+ best_peers.resize(targets_size);
292+
293+ std::vector<NodeId> new_fanout_candidates;
294+ new_fanout_candidates.reserve(targets_size);
295+ for_each(best_peers.begin(), best_peers.end(),
std::for_each?
260+ }
261+
262+ // We use the pre-determined randomness to give a consistent result per transaction,
263+ // thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
264+ CSipHasher deterministic_randomizer{m_deterministic_randomizer};
265+ deterministic_randomizer.Write(wtxid.ToUint256());
CSipHasher, given it’s a pseudo-random hash function: I verified it’s well-initialized from two hidden random 64-bit seeds in src/init.cpp (L1239). Then we pass a CSipHasher instance provided by CConnman at TxReconciliationTracker initialization in src/net_processing.cpp. This respects SipHash’s PRF requirement of being initialized with a random 128-bit key. I still wonder if in the future TxReconciliationTracker shouldn’t get its own random seed (i.e. use GetRand(), which promises fast entropy generation) to isolate tx-announcement from the rest of network connection management.
249@@ -142,9 +250,104 @@ class TxReconciliationTracker::Impl
250 return (recon_state != m_states.end() &&
251 std::holds_alternative<TxReconciliationState>(recon_state->second));
252 }
253+
254+ // Not const because of caching.
255+ bool IsFanoutTarget(const Wtxid& wtxid, NodeId peer_id, bool we_initiate, double limit) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
destination rather than limit, to be consistent with ShouldFanoutTo; it denotes more clearly that it’s the sample space boundary.
279+ best_peers.reserve(m_states.size());
280+
281+ for (const auto& indexed_state : m_states) {
282+ const auto cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
283+ if (cur_state && cur_state->m_we_initiate == we_initiate) {
284+ uint64_t hash_key = CSipHasher(deterministic_randomizer).Write(cur_state->m_k0).Finalize();
src/node/txreconciliation.cpp can be updated to reflect the usage of m_k0 as a SipHash input for low-fanout flood peer selection; it is no longer used only in ComputeShortID.
m_k0: IMO it’d be better not to, so as not to repurpose something that is meant for something completely different.
88+ {
89+ AssertLockHeld(m_txreconciliation_mutex);
90+ auto salt_or_state = m_states.find(peer_id);
91+ if (salt_or_state == m_states.end()) return nullptr;
92+
93+ auto* state = std::get_if<TxReconciliationState>(&salt_or_state->second);
198+ // - limit CPU use for sketch computations.
199+ //
200+ // Since we reconcile frequently, reaching capacity either means:
201+ // (1) a peer for some reason does not request reconciliations from us for a long while, or
202+ // (2) really a lot of valid fee-paying transactions were dumped on us at once.
203+ // We don't care about a laggy peer (1) because we probably can't help them even if we fanout transactions.