Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough

Split the monolithic p2p example into three example crates; added initiator/responder binaries with OTEL; refactored pallas-network2 into modular initiator/responder behaviors, responder subcomponents, a TCP connection pool + listener, emulation mocks, tests, and related manifest updates.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Initiator as Initiator Node
    participant Network as Bearer / Interface / TcpConnectionPool
    participant Responder as Responder Node
    rect rgba(200,200,255,0.5)
    Initiator->>Network: Connect(peer)
    Network->>Responder: InterfaceEvent::Connected(pid)
    Initiator->>Network: Send(ProposeHandshake)
    Network->>Responder: InterfaceEvent::Recv(ProposeHandshake)
    Responder->>Responder: HandshakeResponder::try_accept
    Responder->>Network: Send(AcceptHandshake)
    Network->>Initiator: InterfaceEvent::Recv(AcceptHandshake)
    Initiator->>Initiator: Emit InitiatorEvent::PeerInitialized
    end
```
```mermaid
sequenceDiagram
    participant Initiator as Initiator Node
    participant Network as Bearer / Interface / TcpConnectionPool
    participant Responder as Responder Node
    rect rgba(200,255,200,0.5)
    Initiator->>Network: Send(FindIntersect)
    Network->>Responder: InterfaceEvent::Recv(FindIntersect)
    Responder->>Responder: ChainSyncResponder::check_requests
    Responder->>Network: Send(IntersectionFound)
    Network->>Initiator: InterfaceEvent::Recv(IntersectionFound)
    Initiator->>Network: Send(RequestRange)
    Network->>Responder: InterfaceEvent::Recv(RequestRange)
    Responder->>Responder: BlockFetchResponder::check_requests
    Responder->>Network: Send(Blocks)
    Network->>Initiator: InterfaceEvent::Recv(Blocks)
    end
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

✅ Passed checks (2 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 12
🧹 Nitpick comments (4)
examples/p2p-initiator/src/main.rs (1)
109-116: Consider making the bootstrap peer configurable instead of hardcoded localhost. This keeps the example usable without source edits.
Suggested diff
```diff
-    node.network
-        .execute(InitiatorCommand::IncludePeer(
-            "127.0.0.1:3000".parse().unwrap(),
-        ));
+    let peer_addr = std::env::var("P2P_RESPONDER_ADDR")
+        .unwrap_or_else(|_| "127.0.0.1:3000".to_string())
+        .parse()
+        .expect("invalid P2P_RESPONDER_ADDR");
+    node.network
+        .execute(InitiatorCommand::IncludePeer(peer_addr));
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@examples/p2p-initiator/src/main.rs` around lines 109 - 116, Replace the hardcoded bootstrap address used in node.network.execute(InitiatorCommand::IncludePeer(...)) and StartSync with a configurable value (e.g., read from an environment variable or CLI argument) so the example can run without source edits; update the code around InitiatorCommand::IncludePeer and InitiatorCommand::StartSync to read and parse the bootstrap peer string (default to "127.0.0.1:3000" if unset), handle parse errors gracefully instead of unwrap(), and use that parsed SocketAddr when calling node.network.execute.

examples/p2p-initiator/Cargo.toml (1)
1-5: Mark this example crate as non-publishable for consistency. Other example crates in this PR set `publish = false`; doing the same here avoids accidental publication.

Suggested diff

```diff
 [package]
 name = "p2p-initiator"
 version = "0.1.0"
 edition = "2021"
+publish = false
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@examples/p2p-initiator/Cargo.toml` around lines 1 - 5, Add publish = false to the [package] table in the Cargo.toml for the p2p-initiator crate so it is marked non-publishable; specifically update the package metadata for the crate named "p2p-initiator" (the [package] section containing name = "p2p-initiator", version = "0.1.0", edition = "2021") to include publish = false to match the other example crates and prevent accidental publication.

pallas-network2/src/interface.rs (1)
283-288: Avoid synthetic peer IDs for listener accept failures. Using `"accept-error:0"` as a fake peer identifier can leak into peer-scoped handling and metrics. Prefer a dedicated non-peer accept-error path (or log-and-rearm without emitting a peer-keyed error event).

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/interface.rs` around lines 283 - 288, Do not fabricate a PeerId for listener accept failures; add a dedicated non-peer event like a new InternalEvent::ListenerAcceptError(io::Error) (or InternalEvent::AcceptError) and emit that instead of creating PeerId in the accept error path, then update any match/metrics/handlers that previously assumed InternalEvent::Error with a PeerId to handle the new variant (or alternately log the error and re-arm the listener instead of emitting a peer-scoped event). Ensure changes touch the accept error site that currently constructs PeerId and the consumers that match InternalEvent::Error.

pallas-network2/src/emulation/initiator_mock.rs (1)
81-88: Consider clearing `progress` on disconnect events. `progress` entries survive disconnects, so reconnecting the same `PeerId` can resume with stale `headers_sent`.

Small cleanup patch

```diff
-        if let Poll::Ready(Some(InterfaceEvent::Connected(pid))) = &event {
-            self.on_connected(pid);
+        if let Poll::Ready(Some(ev)) = &event {
+            match ev {
+                InterfaceEvent::Connected(pid) => self.on_connected(pid),
+                InterfaceEvent::Disconnected(pid) => {
+                    self.progress.remove(pid);
+                }
+                _ => {}
+            }
         }
```

Also applies to: 221-229
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/emulation/initiator_mock.rs` around lines 81 - 88, When emitting disconnect events (e.g., in push_jittered_disconnect where you produce InterfaceEvent::Disconnected(pid)), clear any stored progress for that peer so a reconnect doesn't resume with stale headers_sent; remove the entry from the progress map keyed by the same PeerId (and do the same in the other disconnect site around lines 221-229). Locate the progress collection usage (the progress map/field) and ensure you call its remove/clear method for pid immediately when pushing/sending a Disconnected event for that PeerId.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@examples/p2p-responder/src/main.rs`:
- Around line 147-149: The startup logging currently calls
listener.local_addr().unwrap() which can panic; replace the unwrap with a safe
extraction (e.g., call listener.local_addr().map(|a|
a.to_string()).unwrap_or_else(|e| format!("unknown (error: {})", e)) or use a
match/if let to produce a String) and pass that safe String into tracing::info!
so logging never panics; update the tracing::info! invocation (the macro usage
and the listener variable) to log the resolved address or the error string
instead of unwrapping.
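The safe-extraction pattern described above can be sketched as follows; the `display_addr` helper and its signature are illustrative stand-ins, not part of the example's actual code:

```rust
use std::io;

// Hypothetical helper: render a `local_addr()`-style Result as a loggable
// string instead of unwrapping. Works for any Display-able address type,
// so neither the success nor the error path can panic.
fn display_addr<T: std::fmt::Display>(addr: io::Result<T>) -> String {
    match addr {
        Ok(a) => a.to_string(),
        Err(e) => format!("unknown (error: {})", e),
    }
}

fn main() {
    let ok: io::Result<&str> = Ok("127.0.0.1:3000");
    let err: io::Result<&str> = Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "boom"));
    // tracing::info! would receive these strings instead of a panicking unwrap()
    println!("{}", display_addr(ok));
    println!("{}", display_addr(err));
}
```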
In `@pallas-network2/src/bearer.rs`:
- Around line 202-205: The partial chunk map is being indexed by the masked
`channel` (raw_channel & !crate::protocol::PROTOCOL_SERVER), which can merge
fragments from different mode channels; change accesses to use the unmasked
`raw_channel` as the key when inserting, removing, or looking up in
`partial_chunks` (e.g., where `previous = partial_chunks.remove(&channel)` and
similarly at the other occurrences around lines 222-224), while still using the
masked `channel` value for any mode-specific logic—keep `read_segment().await?`
-> `raw_channel` and use `partial_chunks` with `&raw_channel` to prevent
cross-mode mixing.
- Around line 86-89: The current code unconditionally calls
socket2::SockRef::from(&stream).set_linger(Some(Duration::from_secs(0))) which
forces abortive closes (RST) and can drop unsent bytes; remove this default
behavior and make it opt-in. Change the code around SockRef::from(&stream) /
set_linger so that linger is not set by default (leave None), or wrap the
set_linger call behind a configurable flag (e.g.,
TransportOptions::aggressive_linger or a parameter to the function that
constructs Self::Tcp) and only call set_linger(Some(Duration::from_secs(0)))
when that flag is true; ensure Self::Tcp construction path and any callers are
updated to use the new default (no linger) and only opt in when explicitly
requested.
In `@pallas-network2/src/behavior/initiator/mod.rs`:
- Line 510: The InitiatorCommand::SendTx arm currently calls todo!() which will
panic; replace it with the non-panicking flow used by the tx submission state
machine: extract the transaction payload from InitiatorCommand::SendTx,
validate/convert as needed, then forward it into the existing tx_submission
handler (e.g., call the tx_submission module's submit/send function or enqueue
it on self.tx_submission), update the initiator state to the
TxSubmission/awaiting-response state, and ensure errors are returned/handled
instead of panicking; look for symbols like InitiatorCommand::SendTx,
tx_submission (module/struct), and any submit/handle_send_tx or
State::TxSubmission to hook into the current flow.
- Around line 247-256: The reset() method currently resets many fields but omits
clearing protocol sub-states, causing prior-session state to leak; update the
reset(&mut self) implementation in initiator::mod.rs to also reset
self.chainsync, self.tx_submission, and self.continue_sync to their default
states (keeping existing behavior of not touching the error count field and
preserving other resets like connection, promotion, handshake, keepalive,
peersharing, blockfetch, and violation). Ensure you call the appropriate
Default::default() for chainsync, tx_submission, and continue_sync so all
protocol state is cleared between sessions.
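The shape of that reset fix can be sketched with placeholder sub-state types; the field and type names below mirror the review text but are simplified stand-ins for the real structs in initiator::mod.rs:

```rust
// Placeholder sub-states standing in for the real protocol structs; each
// implements Default so reset() can clear them wholesale.
#[derive(Default, Debug, PartialEq)]
struct ChainSyncState { cursor: u64 }

#[derive(Default, Debug, PartialEq)]
struct TxSubmissionState { pending: Vec<Vec<u8>> }

#[derive(Default)]
struct InitiatorState {
    chainsync: ChainSyncState,
    tx_submission: TxSubmissionState,
    continue_sync: bool,
    error_count: u32, // intentionally preserved across resets
}

impl InitiatorState {
    fn reset(&mut self) {
        // ...existing resets (connection, handshake, keepalive, etc.) stay...
        self.chainsync = ChainSyncState::default();
        self.tx_submission = TxSubmissionState::default();
        self.continue_sync = false;
        // note: error_count deliberately untouched
    }
}

fn main() {
    let mut s = InitiatorState { continue_sync: true, error_count: 3, ..Default::default() };
    s.chainsync.cursor = 42;
    s.reset();
    assert_eq!(s.chainsync, ChainSyncState::default());
    assert!(!s.continue_sync);
    assert_eq!(s.error_count, 3); // error count survives the session reset
    println!("session state cleared, error count kept");
}
```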
In `@pallas-network2/src/behavior/responder/connection.rs`:
- Around line 106-137: When rejecting connections we never increment
active_peers but still unconditionally decrement in visit_disconnected, causing
drift; fix by tracking which peers were actually accepted (e.g., add a
collection like accepted_peers: HashSet<PeerId> on the behavior or a boolean
flag on ResponderState), insert the pid into accepted_peers in the accepted
branch where active_peers is incremented and increment
connections_accepted_counter, and on disconnect (visit_disconnected) only
decrement active_peers, update active_peers_gauge, and remove from
accepted_peers if and only if the pid was present in that accepted set; leave
rejected-peers handling (connections_rejected_counter and no accepted_peers
insertion) unchanged.
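The accepted-peer bookkeeping can be sketched in isolation; `ConnTracker` and the `String` peer id are simplified stand-ins for the real `ConnectionResponder`/`PeerId` types:

```rust
use std::collections::HashSet;

// Minimal sketch of the fix: only peers recorded in `accepted` may
// decrement the active-peer count on disconnect.
struct ConnTracker {
    accepted: HashSet<String>,
    active_peers: usize,
}

impl ConnTracker {
    fn new() -> Self {
        Self { accepted: HashSet::new(), active_peers: 0 }
    }

    fn on_accepted(&mut self, pid: &str) {
        if self.accepted.insert(pid.to_string()) {
            self.active_peers += 1;
        }
    }

    // A rejected connection never enters `accepted`, so its later
    // disconnect is a no-op for the gauge.
    fn on_disconnected(&mut self, pid: &str) {
        if self.accepted.remove(pid) {
            self.active_peers = self.active_peers.saturating_sub(1);
        }
    }
}

fn main() {
    let mut t = ConnTracker::new();
    t.on_accepted("peer-a");
    t.on_disconnected("peer-b"); // was rejected earlier; gauge unchanged
    assert_eq!(t.active_peers, 1);
    t.on_disconnected("peer-a");
    assert_eq!(t.active_peers, 0);
    println!("active_peers stays consistent");
}
```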
In `@pallas-network2/src/behavior/responder/handshake.rs`:
- Around line 73-88: The negotiation currently picks the highest version key
from proposed.values and accepts the peer-provided data without checking
compatibility; update the logic around negotiated in handshake.rs so that after
selecting a mutually supported version (using proposed.values and
self.config.supported_version.keys) you validate that the peer's data is
compatible with the local expected parameters (from
self.config.supported_version.get(&version) or equivalent) and only accept if
they match or are compatible; when compatible, construct the Accept message
using the local/configured data (not blindly using peer-provided `data`) before
sending via BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(...,
AnyMessage::Handshake(...))), and if incompatible fall through to reject or
negotiate a different version.
In `@pallas-network2/src/behavior/responder/mod.rs`:
- Around line 295-323: After calling state.apply_msg(msg) inside the
peers.entry(...).and_modify closure, bail out of any protocol-specific visitor
dispatch if the state recorded a state-machine violation; specifically check
state.violation (or equivalent field) immediately after state.apply_msg and if
set, skip calling self.connection.visit_inbound_msg and the per-protocol
visitors (self.handshake.visit_inbound_msg, self.keepalive.visit_inbound_msg,
self.chainsync.visit_inbound_msg, self.blockfetch.visit_inbound_msg,
self.peersharing.visit_inbound_msg, self.txsubmission.visit_inbound_msg) to
avoid generating responses from stale/invalid state. Ensure the check is
performed inside the same closure so no further outbound messages are pushed
when a violation is present.
- Around line 352-360: on_disconnected currently marks a peer disconnected and
resets its state but leaves the peer in self.peers causing unbounded growth;
after running the existing modification block (setting
ConnectionState::Disconnected, calling state.reset(), and invoking
all_visitors!(... visit_disconnected)), remove the peer entry from self.peers
(e.g., call remove or use entry API to take it) so dead peers are not retained;
ensure you still perform the visit_disconnected callbacks before removal and
reference the same symbols: on_disconnected, self.peers,
ConnectionState::Disconnected, state.reset(), and all_visitors!.
In `@pallas-network2/src/behavior/responder/txsubmission.rs`:
- Around line 84-95: The try_extract_txs function currently drops inbound tx
data by only logging when state.tx_submission matches
txsubmission_proto::State::Txs; modify try_extract_txs (and if needed
visit_inbound_msg) to extract the received transactions from state.tx_submission
(or the inbound message supplied to visit_inbound_msg) and emit a
ResponderEvent::TxReceived for each transaction (or a batch event) so the rest
of the system sees the incoming TXs; locate the logic in try_extract_txs,
reference ResponderState.tx_submission and ResponderEvent::TxReceived, and push
the event into the outbound queue or call the existing enqueue/emit helper used
elsewhere in ResponderBehavior to deliver the event.
In `@pallas-network2/src/interface.rs`:
- Around line 183-193: dispatch_disconnect races with InterfaceCommand::Send
because take_writer currently clones the writer instead of removing it from
self.writers; fix by ensuring the writer is removed before scheduling the
disconnect: change or implement take_writer to remove and return the writer
entry (or explicitly call self.writers.remove(&pid) before creating the
disconnect future), so that subsequent Send commands for that pid fail
immediately; keep the existing behavior of emitting InternalEvent::Disconnected
when no writer exists and push the disconnect future (disconnect(pid, stream))
only after the writer has been removed.
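The remove-vs-clone distinction at the heart of this fix can be sketched with a plain map; `Interface` and the `String` writer are placeholders for the real pool and `SharedWriter`:

```rust
use std::collections::HashMap;

// Sketch of the atomicity fix: taking the writer must *remove* it from the
// map so later Send commands for the same peer fail fast.
struct Interface {
    writers: HashMap<String, String>,
}

impl Interface {
    // Before: `self.writers.get(pid).cloned()` left the entry in place,
    // letting Send race with the in-flight disconnect.
    // After: remove() returns ownership and clears the slot in one step.
    fn take_writer(&mut self, pid: &str) -> Option<String> {
        self.writers.remove(pid)
    }
}

fn main() {
    let mut iface = Interface {
        writers: HashMap::from([("p1".to_string(), "w1".to_string())]),
    };
    assert_eq!(iface.take_writer("p1").as_deref(), Some("w1"));
    // A Send racing with the disconnect now sees no writer and can fail fast.
    assert!(iface.take_writer("p1").is_none());
    println!("writer taken exactly once");
}
```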
In `@pallas-network2/tests/responder_happy_path.rs`:
- Around line 108-125: The test currently breaks out as soon as
node.initialized_peers.len() == num_peers in the tokio::time::timeout loop,
which allows later protocol stages to fail unnoticed; update the wait condition
to verify each peer has reached the full protocol milestone (not just
initialized) by either checking a per-peer milestone set/map on node (e.g.,
node.peer_milestones or node.peers_ready_for_protocol) or adding a helper like
node.all_peers_reached_stage("completed") and loop until that returns true
(still using tokio::time::timeout and node.tick().await), then assert that all
peers reached that milestone instead of only initialized_peers.
---
Nitpick comments:
In `@examples/p2p-initiator/Cargo.toml`:
- Around line 1-5: Add publish = false to the [package] table in the Cargo.toml
for the p2p-initiator crate so it is marked non-publishable; specifically update
the package metadata for the crate named "p2p-initiator" (the [package] section
containing name = "p2p-initiator", version = "0.1.0", edition = "2021") to
include publish = false to match the other example crates and prevent accidental
publication.
In `@examples/p2p-initiator/src/main.rs`:
- Around line 109-116: Replace the hardcoded bootstrap address used in
node.network.execute(InitiatorCommand::IncludePeer(...)) and StartSync with a
configurable value (e.g., read from an environment variable or CLI argument) so
the example can run without source edits; update the code around
InitiatorCommand::IncludePeer and InitiatorCommand::StartSync to read and parse
the bootstrap peer string (default to "127.0.0.1:3000" if unset), handle parse
errors gracefully instead of unwrap(), and use that parsed SocketAddr when
calling node.network.execute.
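A minimal sketch of this change, assuming the hypothetical env var name `P2P_RESPONDER_ADDR` from the review suggestion (the current example does not read it):

```rust
use std::env;
use std::net::{AddrParseError, SocketAddr};

// Parse the bootstrap peer from an optional raw string, falling back to
// localhost. Split out from env access so the fallback logic is testable.
fn bootstrap_addr_from(raw: Option<String>) -> Result<SocketAddr, AddrParseError> {
    raw.unwrap_or_else(|| "127.0.0.1:3000".to_string()).parse()
}

fn main() {
    match bootstrap_addr_from(env::var("P2P_RESPONDER_ADDR").ok()) {
        // node.network.execute(InitiatorCommand::IncludePeer(addr)) would use `addr`
        Ok(addr) => println!("bootstrapping from {addr}"),
        Err(e) => eprintln!("invalid P2P_RESPONDER_ADDR: {e}"),
    }
}
```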
In `@pallas-network2/src/emulation/initiator_mock.rs`:
- Around line 81-88: When emitting disconnect events (e.g., in
push_jittered_disconnect where you produce InterfaceEvent::Disconnected(pid)),
clear any stored progress for that peer so a reconnect doesn't resume with stale
headers_sent; remove the entry from the progress map keyed by the same PeerId
(and do the same in the other disconnect site around lines 221-229). Locate the
progress collection usage (the progress map/field) and ensure you call its
remove/clear method for pid immediately when pushing/sending a Disconnected
event for that PeerId.
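The cleanup described above can be sketched with simplified stand-ins for the mock's types (the enum and `Mock` struct here are illustrative, not the real `initiator_mock` definitions):

```rust
use std::collections::HashMap;

// On Disconnected, drop the per-peer progress entry so a reconnect starts
// from zero headers_sent instead of resuming stale state.
enum InterfaceEvent {
    Connected(String),
    Disconnected(String),
}

struct Mock {
    // maps peer id -> headers_sent so far
    progress: HashMap<String, usize>,
}

impl Mock {
    fn observe(&mut self, ev: &InterfaceEvent) {
        match ev {
            InterfaceEvent::Connected(pid) => {
                self.progress.entry(pid.clone()).or_insert(0);
            }
            InterfaceEvent::Disconnected(pid) => {
                self.progress.remove(pid);
            }
        }
    }
}

fn main() {
    let mut m = Mock { progress: HashMap::new() };
    m.observe(&InterfaceEvent::Connected("p1".into()));
    *m.progress.get_mut("p1").unwrap() = 7; // some headers already sent
    m.observe(&InterfaceEvent::Disconnected("p1".into()));
    m.observe(&InterfaceEvent::Connected("p1".into()));
    assert_eq!(m.progress["p1"], 0); // fresh start after reconnect
    println!("progress cleared on disconnect");
}
```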
In `@pallas-network2/src/interface.rs`:
- Around line 283-288: Do not fabricate a PeerId for listener accept failures;
add a dedicated non-peer event like a new
InternalEvent::ListenerAcceptError(io::Error) (or InternalEvent::AcceptError)
and emit that instead of creating PeerId in the accept error path, then update
any match/metrics/handlers that previously assumed InternalEvent::Error with a
PeerId to handle the new variant (or alternately log the error and re-arm the
listener instead of emitting a peer-scoped event). Ensure changes touch the
accept error site that currently constructs PeerId and the consumers that match
InternalEvent::Error.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`pallas-network2/network-spec.pdf` is excluded by `!**/*.pdf`
📒 Files selected for processing (35)
- Cargo.toml
- examples/otel/otel-collector.yaml
- examples/p2p-discovery/.gitignore
- examples/p2p-discovery/Cargo.toml
- examples/p2p-discovery/src/main.rs
- examples/p2p-discovery/src/node.rs
- examples/p2p-discovery/src/otel.rs
- examples/p2p-initiator/Cargo.toml
- examples/p2p-initiator/src/main.rs
- examples/p2p-responder/Cargo.toml
- examples/p2p-responder/src/main.rs
- examples/p2p-responder/src/otel.rs
- pallas-network2/Cargo.toml
- pallas-network2/src/bearer.rs
- pallas-network2/src/behavior/initiator/blockfetch.rs
- pallas-network2/src/behavior/initiator/chainsync.rs
- pallas-network2/src/behavior/initiator/connection.rs
- pallas-network2/src/behavior/initiator/discovery.rs
- pallas-network2/src/behavior/initiator/handshake.rs
- pallas-network2/src/behavior/initiator/keepalive.rs
- pallas-network2/src/behavior/initiator/mod.rs
- pallas-network2/src/behavior/initiator/promotion.rs
- pallas-network2/src/behavior/mod.rs
- pallas-network2/src/behavior/responder/blockfetch.rs
- pallas-network2/src/behavior/responder/chainsync.rs
- pallas-network2/src/behavior/responder/connection.rs
- pallas-network2/src/behavior/responder/handshake.rs
- pallas-network2/src/behavior/responder/keepalive.rs
- pallas-network2/src/behavior/responder/mod.rs
- pallas-network2/src/behavior/responder/peersharing.rs
- pallas-network2/src/behavior/responder/txsubmission.rs
- pallas-network2/src/emulation/initiator_mock.rs
- pallas-network2/src/emulation/mod.rs
- pallas-network2/src/interface.rs
- pallas-network2/tests/responder_happy_path.rs
Actionable comments posted: 1
♻️ Duplicate comments (8)
pallas-network2/src/interface.rs (1)
118-120: ⚠️ Potential issue | 🟠 Major

Make disconnect atomic by removing the writer before scheduling shutdown. `dispatch_disconnect` currently clones the writer instead of removing it, so `Send` can still be queued for that peer until disconnect finishes.

Suggested fix

```diff
-    fn take_writer(&mut self, pid: &PeerId) -> Option<SharedWriter> {
+    fn take_writer(&self, pid: &PeerId) -> Option<SharedWriter> {
         self.writers.get(pid).cloned()
     }
+
+    fn remove_writer(&mut self, pid: &PeerId) -> Option<SharedWriter> {
+        self.writers.remove(pid)
+    }
@@
-        let Some(stream) = self.take_writer(&pid) else {
+        let Some(stream) = self.remove_writer(&pid) else {
             tracing::warn!(%pid, "trying to disconnect a peer not connected");
             self.futures
                 .push(Box::pin(ready(InternalEvent::Disconnected(pid.clone()))));
             return;
         };
```

Also applies to: 183-193
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/interface.rs` around lines 118 - 120, The disconnect flow is not atomic because take_writer clones the SharedWriter instead of removing it; change take_writer to remove the writer from the map (use self.writers.remove(pid)) so the writer is taken ownership and no further Send can be queued, and update the other disconnect site (the code around dispatch_disconnect / the block at lines 183-193 that similarly clones a writer) to remove the writer from self.writers rather than cloning it; ensure you return Option<SharedWriter> as before but by taking ownership via remove so shutdown scheduling happens after the writer is removed.

pallas-network2/src/bearer.rs (2)
201-205: ⚠️ Potential issue | 🟠 Major

Use `raw_channel` as the reassembly key for partial chunks. Using the masked `channel` for `partial_chunks` can merge fragments across mode variants. Keep decode logic on masked `channel`, but index the chunk buffer with `raw_channel`.

Suggested fix

```diff
-            let previous = partial_chunks.remove(&channel);
+            let previous = partial_chunks.remove(&raw_channel);
@@
-                partial_chunks.insert(channel, payload);
+                partial_chunks.insert(raw_channel, payload);
```

Also applies to: 222-224
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/bearer.rs` around lines 201 - 205, The code currently masks raw_channel into channel (raw_channel & !crate::protocol::PROTOCOL_SERVER) but uses the masked channel as the key into partial_chunks, which can merge fragments across mode variants; change accesses to partial_chunks (remove/insert/get) to use raw_channel as the reassembly key while keeping all decode/processing logic using the masked channel variable; update the occurrences around read_segment() (where raw_channel and channel are set) and the similar block near lines handling partial chunk insertion (the later 222-224 region) to index partial_chunks with raw_channel instead of channel.
86-87: ⚠️ Potential issue | 🟠 Major

Make aggressive TCP linger opt-in, not default.
Line 87 still forces abortive close for every outbound TCP connection, which can drop unsent bytes during shutdown. This should be configurable and off by default.
Suggested fix
```diff
-        // Aggressive linger avoids TIME_WAIT accumulation when connecting to many nodes
-        socket2::SockRef::from(&stream).set_linger(Some(std::time::Duration::from_secs(0)))?;
+        // Keep OS/default linger for graceful close semantics.
+        // If needed, make aggressive linger an explicit transport option.
```

```bash
#!/bin/bash
# Verify whether outbound connect path still forces aggressive linger.
rg -n "connect_tcp|set_linger" pallas-network2/src/bearer.rs
```

Expected result: no unconditional `set_linger(Some(Duration::from_secs(0)))` in `connect_tcp`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/bearer.rs` around lines 86 - 87, The code unconditionally calls socket2::SockRef::from(&stream).set_linger(Some(Duration::from_secs(0))) (in the outbound TCP connect path, e.g. connect_tcp) which forces abortive closes; make this behavior opt-in and disabled by default: add a boolean option (e.g. allow_aggressive_linger or enable_linger) to the Bearer/config or to the connect_tcp function signature, wire it from initialization (or an env/config flag), and only call set_linger(...) when that option is true; update any callers of connect_tcp/Bearer construction to pass the default false and add a short comment/doc noting the risk of dropping unsent bytes when enabled.

pallas-network2/src/behavior/responder/connection.rs (1)
21-26: ⚠️ Potential issue | 🟠 Major

Track accepted peers before decrementing `active_peers`. `active_peers` is incremented only on accepted connections but decremented on every disconnect, so rejected-connection disconnects can skew the gauge downward.

Suggested fix

```diff
 pub struct ConnectionResponder {
     config: ConnectionResponderConfig,
     pub(crate) banned_peers: HashSet<PeerId>,
+    accepted_peers: HashSet<PeerId>,
     connections_per_ip: HashMap<String, usize>,
     active_peers: usize,
@@
         Self {
             config,
             banned_peers: HashSet::new(),
+            accepted_peers: HashSet::new(),
             connections_per_ip: HashMap::new(),
             active_peers: 0,
@@
         if *count > self.config.max_connections_per_ip {
@@
         } else {
-            self.active_peers += 1;
-            self.active_peers_gauge
-                .record(self.active_peers as u64, &[]);
+            if self.accepted_peers.insert(pid.clone()) {
+                self.active_peers += 1;
+                self.active_peers_gauge.record(self.active_peers as u64, &[]);
+            }
             self.connections_accepted_counter.add(1, &[]);
         }
@@
-        self.active_peers = self.active_peers.saturating_sub(1);
-        self.active_peers_gauge
-            .record(self.active_peers as u64, &[]);
+        if self.accepted_peers.remove(pid) {
+            self.active_peers = self.active_peers.saturating_sub(1);
+            self.active_peers_gauge.record(self.active_peers as u64, &[]);
+        }
```

Also applies to: 103-117, 120-136
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/connection.rs` around lines 21 - 26, The decrement of ConnectionResponder::active_peers must only occur for peers whose connections were previously accepted; change the disconnect logic to check whether the peer was counted as accepted before subtracting (e.g., by consulting the same acceptance state used when incrementing, such as an accepted_peers set or the existing connections_per_ip/active_peers bookkeeping) and only decrement active_peers when that check passes; update all disconnect/cleanup paths that touch active_peers (the code around ConnectionResponder methods that accept connections and the disconnect handlers referenced in the diff) to perform this conditional decrement so rejected-connection disconnects no longer skew the gauge.

pallas-network2/tests/responder_happy_path.rs (1)
112-114: ⚠️ Potential issue | 🟠 Major

Don’t stop at initialization; wait for full protocol milestones.
Breaking as soon as peers initialize makes this test pass even if later responder stages regress.
Suggested fix (track and assert stage completion)
```diff
 struct TestResponderNode {
     network: Manager<MockInitiatorInterface, ResponderBehavior, AnyMessage>,
     initialized_peers: HashSet<PeerId>,
+    intersection_peers: HashSet<PeerId>,
+    header_peers: HashSet<PeerId>,
+    block_range_peers: HashSet<PeerId>,
+    peers_requested_peers: HashSet<PeerId>,
@@
         Self {
             network,
             initialized_peers: HashSet::new(),
+            intersection_peers: HashSet::new(),
+            header_peers: HashSet::new(),
+            block_range_peers: HashSet::new(),
+            peers_requested_peers: HashSet::new(),
             disconnected_peers: HashSet::new(),
             mock_slot: 1,
         }
@@
             ResponderEvent::IntersectionRequested(pid, _points) => {
+                self.intersection_peers.insert(pid.clone());
                 let point = proto::Point::Origin;
                 let tip = self.mock_tip();
                 self.network
                     .execute(ResponderCommand::ProvideIntersection(pid, point, tip));
             }
@@
             ResponderEvent::NextHeaderRequested(pid) => {
+                self.header_peers.insert(pid.clone());
                 self.mock_slot += 1;
@@
             ResponderEvent::BlockRangeRequested(pid, (_start, _end)) => {
+                self.block_range_peers.insert(pid.clone());
                 let blocks: Vec<Vec<u8>> = (0..3).map(|i| vec![0xBE; 64 + i]).collect();
                 self.network
                     .execute(ResponderCommand::ProvideBlocks(pid, blocks));
             }
@@
             ResponderEvent::PeersRequested(pid, amount) => {
+                self.peers_requested_peers.insert(pid.clone());
                 let peers: Vec<peersharing::PeerAddress> = (0..amount.min(5))
@@
-        if node.initialized_peers.len() == num_peers as usize {
+        if node.initialized_peers.len() == num_peers as usize
+            && node.intersection_peers.len() == num_peers as usize
+            && node.header_peers.len() == num_peers as usize
+            && node.block_range_peers.len() == num_peers as usize
+            && node.peers_requested_peers.len() == num_peers as usize
+        {
             break;
         }
@@
     assert_eq!(node.initialized_peers.len(), num_peers as usize, "expected all peers to initialize");
+    assert_eq!(node.intersection_peers.len(), num_peers as usize, "expected all peers to request intersection");
+    assert_eq!(node.header_peers.len(), num_peers as usize, "expected all peers to request headers");
+    assert_eq!(node.block_range_peers.len(), num_peers as usize, "expected all peers to request block ranges");
+    assert_eq!(node.peers_requested_peers.len(), num_peers as usize, "expected all peers to request peers");
```
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/tests/responder_happy_path.rs` around lines 112 - 114, The test currently breaks out of the wait loop as soon as node.initialized_peers.len() == num_peers, which lets later responder protocol regressions slip; instead extend the waiting logic in the responder_happy_path test to wait for explicit protocol milestone(s) after initialization (e.g., a per-peer "handshake_complete" or "responder_complete" flag, a node.completed_stages set, or a node.stage == FinalStage) and only exit the loop when all peers have reached that milestone (with the existing timeout). Replace the simple initialized_peers length check with checks against those final-stage indicators (and add explicit assertions that every peer reached the final stage) so the test validates full protocol completion rather than just initialization.

pallas-network2/src/behavior/responder/mod.rs (2)
363-368: ⚠️ Potential issue | 🟠 Major

Remove disconnected peers from `self.peers` to avoid unbounded growth. Line 363 updates/resets state but leaves the peer entry resident forever; over time this grows memory and housekeeping cost.
Suggested fix
```diff
-        self.peers.entry(pid.clone()).and_modify(|state| {
-            state.connection = ConnectionState::Disconnected;
-            state.reset();
-
-            all_visitors!(self, pid, state, visit_disconnected);
-        });
+        if let Some(mut state) = self.peers.remove(pid) {
+            state.connection = ConnectionState::Disconnected;
+            state.reset();
+            all_visitors!(self, pid, &mut state, visit_disconnected);
+        }
```
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/mod.rs` around lines 363 - 368, The code leaves a peer entry in self.peers after marking it ConnectionState::Disconnected and resetting it, causing unbounded growth; after performing state.reset() and calling all_visitors!(self, pid, state, visit_disconnected), remove the peer entry from self.peers (e.g. call self.peers.remove(&pid) or use the entry API to remove) so the disconnected peer is dropped and memory/housekeeping doesn't grow indefinitely; update the block that currently uses self.peers.entry(pid.clone()).and_modify(...) to perform the same notifications then remove the entry for pid.
296-331: ⚠️ Potential issue | 🟠 Major

Stop protocol-specific inbound dispatch after a state-machine violation. After Line 297, dispatch still proceeds even if `state.violation` was set. That allows responders to act on stale/invalid protocol state.

Suggested fix

```diff
         self.peers.entry(pid.clone()).and_modify(|state| {
             state.apply_msg(msg);
 
+            if state.violation {
+                // Let connection logic react (e.g., disconnect), but do not
+                // dispatch mini-protocol responders from invalid state.
+                self.connection
+                    .visit_inbound_msg(pid, state, &mut self.outbound);
+                return;
+            }
+
             // Dispatch only to the visitor that owns the inbound message's
             // protocol. The previous `all_visitors!` call triggered every
             // visitor on every message, which caused duplicate responses
```
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/mod.rs` around lines 296 - 331, state.apply_msg(msg) can set state.violation but the code continues to dispatch protocol handlers; change the closure in self.peers.entry(pid.clone()).and_modify(|state| { ... }) to check state.violation immediately after state.apply_msg(msg) and, if true, return early (skip connection.visit_inbound_msg and all per-protocol visit_inbound_msg calls for handshake/keepalive/chainsync/blockfetch/peersharing/txsubmission) so no further protocol-specific processing runs on a violated state.
pallas-network2/src/behavior/responder/handshake.rs (1)
73-95: ⚠️ Potential issue | 🔴 Critical
Validate version-data compatibility before handshake accept.
Line 76 only checks version key overlap, and Line 84 accepts using peer-provided data. This can accept incompatible handshake parameters (e.g., network settings) and initialize invalid peer state.
Suggested fix
```diff
-        let negotiated = proposed
-            .values
-            .iter()
-            .filter(|(num, _)| self.config.supported_version.values.contains_key(num))
-            .max_by_key(|(num, _)| *num)
-            .map(|(num, data)| (*num, data.clone()));
+        let negotiated = proposed
+            .values
+            .iter()
+            .filter_map(|(num, peer_data)| {
+                self.config
+                    .supported_version
+                    .values
+                    .get(num)
+                    .and_then(|local_data| {
+                        (local_data.network_magic == peer_data.network_magic
+                            && local_data.initiator_only_diffusion_mode
+                                == peer_data.initiator_only_diffusion_mode
+                            && local_data.peer_sharing == peer_data.peer_sharing
+                            && local_data.query == peer_data.query)
+                            .then(|| (*num, local_data.clone()))
+                    })
+            })
+            .max_by_key(|(num, _)| *num);

         match negotiated {
-            Some((version, data)) => {
+            Some((version, local_data)) => {
                 tracing::info!(version, "accepting handshake");

-                let msg = handshake_proto::Message::Accept(version, data.clone());
+                let msg = handshake_proto::Message::Accept(version, local_data.clone());
                 outbound.push_ready(BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(
                     pid.clone(),
                     AnyMessage::Handshake(msg),
                 )));
@@
                 self.handshakes_completed_counter.add(1, &[]);

                 outbound.push_ready(BehaviorOutput::ExternalEvent(
-                    ResponderEvent::PeerInitialized(pid.clone(), (version, data)),
+                    ResponderEvent::PeerInitialized(pid.clone(), (version, local_data)),
                 ));
             }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/handshake.rs` around lines 73 - 95, The code currently chooses a negotiated (version, data) pair by only matching version keys (proposed.values vs self.config.supported_version.values) and then accepts the peer-provided data; update the logic in the handshake selection so you verify the peer's data is compatible with your supported parameters for that version before sending Message::Accept and transitioning state. Concretely, after mapping to Some((version, data)) check against the expected config entry (self.config.supported_version.values.get(&version)) and validate field-level compatibility (network IDs, capabilities, timeouts, etc. per your protocol) and only perform the Accept send, set ConnectionState::Initialized and emit ResponderEvent::PeerInitialized if validation passes; otherwise handle rejection/negotiation (send a reject or ignore) to avoid initializing with incompatible data.
🧹 Nitpick comments (2)
pallas-network2/src/behavior/initiator/connection.rs (1)
50-67: Consider making the metric name/description more precise.
The counter increments when a connection is attempted (before it's established), not when a connection successfully completes. The current name "connections" and description "Total connections" may be misleading. Consider renaming to reflect that this tracks connection attempts (e.g., "connection_attempts" or updating the description to "Total connection attempts").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/initiator/connection.rs` around lines 50 - 67, The metric name/description is misleading because ConnectionBehavior increments the counter on attempts (see ConnectionBehavior::new building meter.u64_counter and the increment_counter method using connection_counter.add); change the counter to a more precise name/description (e.g., build the counter as "connection_attempts" with description "Total connection attempts" or keep the name and update the description accordingly) inside ConnectionBehavior::new so the metric accurately reflects that increment_counter tracks attempts rather than established connections.
examples/p2p-responder/src/otel.rs (1)
92-95: Prefer `try_init()` over `init()` to avoid process panic on re-initialization.
Using `init()` will panic if any subscriber was already installed in the same process (common in integration tests or repeated setup paths).
🔧 Minimal change
```diff
-    tracing_subscriber::registry()
-        .with(otel_layer)
-        .with(fmt_layer)
-        .init();
+    tracing_subscriber::registry()
+        .with(otel_layer)
+        .with(fmt_layer)
+        .try_init()
+        .expect("failed to initialize tracing subscriber");
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/p2p-responder/src/otel.rs` around lines 92 - 95, The tracing subscriber is being installed with .init(), which panics if a subscriber is already set; replace the final .init() on the tracing_subscriber::registry().with(otel_layer).with(fmt_layer) call with .try_init() and handle the Result (e.g., log or ignore the AlreadyInitialized error) so repeated initialization (tests or repeated setup) does not abort the process; locate the chain that composes otel_layer and fmt_layer and change .init() to .try_init() with appropriate error handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@examples/p2p-responder/src/otel.rs`:
- Around line 70-102: The setup_otel function currently creates logger_provider,
tracer_provider, and meter_provider locally (via init_logs, init_traces,
init_metrics), registers tracer/meter globally, and returns (), which prevents
callers from flushing/shutting down batch exporters; change setup_otel to return
the created providers (e.g., return the logger_provider plus the tracer_provider
and meter_provider or a small struct holding them) so the caller can call their
lifecycle methods (force_flush and shutdown_with_timeout/ shutdown) on those
providers before exit; ensure you propagate the original providers (not throw
away clones) and update callers to invoke provider.force_flush() and
provider.shutdown_with_timeout(Duration::from_secs(...)) for logger_provider,
tracer_provider, and meter_provider during shutdown.
---
Duplicate comments:
In `@pallas-network2/src/bearer.rs`:
- Around line 201-205: The code currently masks raw_channel into channel
(raw_channel & !crate::protocol::PROTOCOL_SERVER) but uses the masked channel as
the key into partial_chunks, which can merge fragments across mode variants;
change accesses to partial_chunks (remove/insert/get) to use raw_channel as the
reassembly key while keeping all decode/processing logic using the masked
channel variable; update the occurrences around read_segment() (where
raw_channel and channel are set) and the similar block near lines handling
partial chunk insertion (the later 222-224 region) to index partial_chunks with
raw_channel instead of channel.
- Around line 86-87: The code unconditionally calls
socket2::SockRef::from(&stream).set_linger(Some(Duration::from_secs(0))) (in the
outbound TCP connect path, e.g. connect_tcp) which forces abortive closes; make
this behavior opt-in and disabled by default: add a boolean option (e.g.
allow_aggressive_linger or enable_linger) to the Bearer/config or to the
connect_tcp function signature, wire it from initialization (or an env/config
flag), and only call set_linger(...) when that option is true; update any
callers of connect_tcp/Bearer construction to pass the default false and add a
short comment/doc noting the risk of dropping unsent bytes when enabled.
In `@pallas-network2/src/behavior/responder/connection.rs`:
- Around line 21-26: The decrement of ConnectionResponder::active_peers must
only occur for peers whose connections were previously accepted; change the
disconnect logic to check whether the peer was counted as accepted before
subtracting (e.g., by consulting the same acceptance state used when
incrementing, such as an accepted_peers set or the existing
connections_per_ip/active_peers bookkeeping) and only decrement active_peers
when that check passes; update all disconnect/cleanup paths that touch
active_peers (the code around ConnectionResponder methods that accept
connections and the disconnect handlers referenced in the diff) to perform this
conditional decrement so rejected-connection disconnects no longer skew the
gauge.
In `@pallas-network2/src/behavior/responder/handshake.rs`:
- Around line 73-95: The code currently chooses a negotiated (version, data)
pair by only matching version keys (proposed.values vs
self.config.supported_version.values) and then accepts the peer-provided data;
update the logic in the handshake selection so you verify the peer's data is
compatible with your supported parameters for that version before sending
Message::Accept and transitioning state. Concretely, after mapping to
Some((version, data)) check against the expected config entry
(self.config.supported_version.values.get(&version)) and validate field-level
compatibility (network IDs, capabilities, timeouts, etc. per your protocol) and
only perform the Accept send, set ConnectionState::Initialized and emit
ResponderEvent::PeerInitialized if validation passes; otherwise handle
rejection/negotiation (send a reject or ignore) to avoid initializing with
incompatible data.
In `@pallas-network2/src/behavior/responder/mod.rs`:
- Around line 363-368: The code leaves a peer entry in self.peers after marking
it ConnectionState::Disconnected and resetting it, causing unbounded growth;
after performing state.reset() and calling all_visitors!(self, pid, state,
visit_disconnected), remove the peer entry from self.peers (e.g. call
self.peers.remove(&pid) or use the entry API to remove) so the disconnected peer
is dropped and memory/housekeeping doesn't grow indefinitely; update the block
that currently uses self.peers.entry(pid.clone()).and_modify(...) to perform the
same notifications then remove the entry for pid.
- Around line 296-331: state.apply_msg(msg) can set state.violation but the code
continues to dispatch protocol handlers; change the closure in
self.peers.entry(pid.clone()).and_modify(|state| { ... }) to check
state.violation immediately after state.apply_msg(msg) and, if true, return
early (skip connection.visit_inbound_msg and all per-protocol visit_inbound_msg
calls for handshake/keepalive/chainsync/blockfetch/peersharing/txsubmission) so
no further protocol-specific processing runs on a violated state.
In `@pallas-network2/src/interface.rs`:
- Around line 118-120: The disconnect flow is not atomic because take_writer
clones the SharedWriter instead of removing it; change take_writer to remove the
writer from the map (use self.writers.remove(pid)) so the writer is taken
ownership and no further Send can be queued, and update the other disconnect
site (the code around dispatch_disconnect / the block at lines 183-193 that
similarly clones a writer) to remove the writer from self.writers rather than
cloning it; ensure you return Option<SharedWriter> as before but by taking
ownership via remove so shutdown scheduling happens after the writer is removed.
In `@pallas-network2/tests/responder_happy_path.rs`:
- Around line 112-114: The test currently breaks out of the wait loop as soon as
node.initialized_peers.len() == num_peers, which lets later responder protocol
regressions slip; instead extend the waiting logic in the responder_happy_path
test to wait for explicit protocol milestone(s) after initialization (e.g., a
per-peer "handshake_complete" or "responder_complete" flag, a
node.completed_stages set, or a node.stage == FinalStage) and only exit the loop
when all peers have reached that milestone (with the existing timeout). Replace
the simple initialized_peers length check with checks against those final-stage
indicators (and add explicit assertions that every peer reached the final stage)
so the test validates full protocol completion rather than just initialization.
---
Nitpick comments:
In `@examples/p2p-responder/src/otel.rs`:
- Around line 92-95: The tracing subscriber is being installed with .init(),
which panics if a subscriber is already set; replace the final .init() on the
tracing_subscriber::registry().with(otel_layer).with(fmt_layer) call with
.try_init() and handle the Result (e.g., log or ignore the AlreadyInitialized
error) so repeated initialization (tests or repeated setup) does not abort the
process; locate the chain that composes otel_layer and fmt_layer and change
.init() to .try_init() with appropriate error handling.
In `@pallas-network2/src/behavior/initiator/connection.rs`:
- Around line 50-67: The metric name/description is misleading because
ConnectionBehavior increments the counter on attempts (see
ConnectionBehavior::new building meter.u64_counter and the increment_counter
method using connection_counter.add); change the counter to a more precise
name/description (e.g., build the counter as "connection_attempts" with
description "Total connection attempts" or keep the name and update the
description accordingly) inside ConnectionBehavior::new so the metric accurately
reflects that increment_counter tracks attempts rather than established
connections.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
- examples/p2p-discovery/src/otel.rs
- examples/p2p-initiator/src/main.rs
- examples/p2p-responder/src/main.rs
- examples/p2p-responder/src/otel.rs
- pallas-network2/src/bearer.rs
- pallas-network2/src/behavior/initiator/connection.rs
- pallas-network2/src/behavior/initiator/discovery.rs
- pallas-network2/src/behavior/initiator/handshake.rs
- pallas-network2/src/behavior/initiator/keepalive.rs
- pallas-network2/src/behavior/responder/blockfetch.rs
- pallas-network2/src/behavior/responder/chainsync.rs
- pallas-network2/src/behavior/responder/connection.rs
- pallas-network2/src/behavior/responder/handshake.rs
- pallas-network2/src/behavior/responder/keepalive.rs
- pallas-network2/src/behavior/responder/mod.rs
- pallas-network2/src/behavior/responder/peersharing.rs
- pallas-network2/src/behavior/responder/txsubmission.rs
- pallas-network2/src/emulation/initiator_mock.rs
- pallas-network2/src/interface.rs
- pallas-network2/tests/responder_happy_path.rs
🚧 Files skipped from review as they are similar to previous changes (6)
- pallas-network2/src/behavior/initiator/handshake.rs
- examples/p2p-responder/src/main.rs
- pallas-network2/src/behavior/responder/blockfetch.rs
- examples/p2p-initiator/src/main.rs
- pallas-network2/src/behavior/responder/txsubmission.rs
- pallas-network2/src/behavior/initiator/discovery.rs
@coderabbitai review
✅ Actions performed
Review triggered.
Actionable comments posted: 3
♻️ Duplicate comments (1)
pallas-network2/src/behavior/initiator/mod.rs (1)
513-515: ⚠️ Potential issue | 🟠 Major
`SendTx` still drops a valid command payload.
Line 513 accepts `SendTx(PeerId, EraTxId, EraTxBody)` but Line 514 only logs a warning and does not enqueue or transition any tx-submission flow, so the command is effectively ignored at runtime.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/initiator/mod.rs` around lines 513 - 515, The match arm for InitiatorCommand::SendTx currently only logs a warning and drops the payload; update the InitiatorCommand::SendTx(PeerId, EraTxId, EraTxBody) branch in mod.rs to forward the values into the tx-submission flow instead of ignoring them—specifically, call the existing enqueue/dispatch function or state transition used for transaction submission (e.g., the module's tx queue or transition_to_tx_submission/submit_tx helper), passing the PeerId, EraTxId and EraTxBody, and ensure any necessary acknowledgements or state updates are performed so the command is not silently discarded.
🧹 Nitpick comments (1)
pallas-network2/src/behavior/initiator/mod.rs (1)
324-328: Avoid silently dropping IO/lifecycle events for unknown peers.
At Line 324 (and same pattern in the other handlers), `entry(...).and_modify(...)` is a no-op when the peer is absent. That makes missed/early events invisible and hard to debug. Consider at least logging when a peer is unknown, or centralizing an `or_insert_with(InitiatorState::new)` path if unknown peers are valid.
Also applies to: 335-339, 346-350, 357-362, 369-374, 381-385
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/initiator/mod.rs` around lines 324 - 328, The current peers.entry(pid.clone()).and_modify(...) calls (used around apply_msg and visit_inbound_msg handlers) silently ignore events for unknown peers; modify these handlers to either (a) use entry(pid.clone()).and_modify(...).or_insert_with(|| InitiatorState::new(pid.clone())) so missing peers are created before calling state.apply_msg and all_visitors!(..., visit_inbound_msg), or (b) at minimum add a clear debug/error log inside the else branch when the entry is absent (include pid and brief context) so lifecycle/IO events aren't dropped silently; apply the same change to the other similar blocks referenced (the handlers that call apply_msg/visit_*).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pallas-network2/src/behavior/initiator/mod.rs`:
- Around line 389-395: on_discovered currently unconditionally creates a new
InitiatorState and calls self.peers.insert(pid.clone(), state), which overwrites
any existing state (losing error_count, promotion tag, and protocol progress);
change it to check for an existing entry first (use
self.peers.entry(pid.clone()).or_insert_with(...) or self.peers.get_mut(pid))
and if a state exists call all_visitors!(self, pid, existing_state,
visit_discovered) without replacing it, otherwise create and insert a new
InitiatorState; ensure you do not reset fields like error_count, promotion tag,
or protocol progress when a peer is already present.
- Around line 437-442: The match over the result of
self.outbound.futures.poll_next_unpin(cx) is converting Poll::Ready(None) into
Poll::Pending and thus hides stream termination; update the code that calls
self.outbound.futures.poll_next_unpin(cx) (the current match in
initiator::mod.rs) to propagate Poll::Ready(None) instead of mapping it to
Pending — e.g. return the poll directly or add an explicit arm for
Poll::Ready(None) => Poll::Ready(None); apply the same change to the analogous
code in responder::mod.rs.
In `@pallas-network2/src/behavior/responder/connection.rs`:
- Around line 96-120: The visit_connected handler currently doesn't check
banned_peers, so banned PeerId values are being inserted into accepted_peers and
counted; modify visit_connected (function visit_connected) to first check if pid
is in self.banned_peers and immediately reject it: increment
connections_rejected_counter, log/warn as appropriate, push
InterfaceCommand::Disconnect via outbound.push_ready, and return early — do this
before updating connections_per_ip, accepted_peers, active_peers,
active_peers_gauge, or connections_accepted_counter to avoid inflating
acceptance metrics; ensure existing IP-based rejection (connections_per_ip and
config.max_connections_per_ip) still runs for non-banned peers.
---
Duplicate comments:
In `@pallas-network2/src/behavior/initiator/mod.rs`:
- Around line 513-515: The match arm for InitiatorCommand::SendTx currently only
logs a warning and drops the payload; update the
InitiatorCommand::SendTx(PeerId, EraTxId, EraTxBody) branch in mod.rs to forward
the values into the tx-submission flow instead of ignoring them—specifically,
call the existing enqueue/dispatch function or state transition used for
transaction submission (e.g., the module's tx queue or
transition_to_tx_submission/submit_tx helper), passing the PeerId, EraTxId and
EraTxBody, and ensure any necessary acknowledgements or state updates are
performed so the command is not silently discarded.
---
Nitpick comments:
In `@pallas-network2/src/behavior/initiator/mod.rs`:
- Around line 324-328: The current peers.entry(pid.clone()).and_modify(...)
calls (used around apply_msg and visit_inbound_msg handlers) silently ignore
events for unknown peers; modify these handlers to either (a) use
entry(pid.clone()).and_modify(...).or_insert_with(||
InitiatorState::new(pid.clone())) so missing peers are created before calling
state.apply_msg and all_visitors!(..., visit_inbound_msg), or (b) at minimum add
a clear debug/error log inside the else branch when the entry is absent (include
pid and brief context) so lifecycle/IO events aren't dropped silently; apply the
same change to the other similar blocks referenced (the handlers that call
apply_msg/visit_*).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- examples/p2p-initiator/Cargo.toml
- examples/p2p-responder/Cargo.toml
- pallas-network2/src/behavior/initiator/mod.rs
- pallas-network2/src/behavior/responder/connection.rs
- pallas-network2/src/behavior/responder/mod.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- examples/p2p-responder/Cargo.toml
- examples/p2p-initiator/Cargo.toml
- pallas-network2/src/behavior/responder/mod.rs
Actionable comments posted: 1
♻️ Duplicate comments (1)
pallas-network2/src/behavior/responder/handshake.rs (1)
73-103: ⚠️ Potential issue | 🟠 Major
Pick the highest compatible handshake version before `max_by_key`.
At Line [73], version selection is done before compatibility filtering. That can wrongly refuse when the top shared version mismatches `network_magic` but a lower shared version is valid.
✅ Suggested fix
```diff
-        let negotiated = proposed
-            .values
-            .iter()
-            .filter(|(num, _)| self.config.supported_version.values.contains_key(num))
-            .max_by_key(|(num, _)| *num)
-            .map(|(num, peer_data)| {
-                let our_data = &self.config.supported_version.values[num];
-                (*num, peer_data.clone(), our_data.clone())
-            });
+        let negotiated = proposed
+            .values
+            .iter()
+            .filter_map(|(num, peer_data)| {
+                self.config
+                    .supported_version
+                    .values
+                    .get(num)
+                    .and_then(|our_data| {
+                        (peer_data.network_magic == our_data.network_magic)
+                            .then(|| (*num, our_data.clone()))
+                    })
+            })
+            .max_by_key(|(num, _)| *num);

         match negotiated {
-            Some((version, peer_data, our_data)) => {
-                if peer_data.network_magic != our_data.network_magic {
-                    tracing::warn!(
-                        peer_magic = peer_data.network_magic,
-                        our_magic = our_data.network_magic,
-                        "refusing handshake: network magic mismatch"
-                    );
-                    self.handshakes_refused_counter.add(1, &[]);
-                    let msg = handshake_proto::Message::Refuse(
-                        handshake_proto::RefuseReason::Refused(
-                            version,
-                            "network magic mismatch".to_string(),
-                        ),
-                    );
-                    outbound.push_ready(BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(
-                        pid.clone(),
-                        AnyMessage::Handshake(msg),
-                    )));
-                    return;
-                }
-
+            Some((version, our_data)) => {
                 tracing::info!(version, "accepting handshake");

                 let msg = handshake_proto::Message::Accept(version, our_data.clone());
                 outbound.push_ready(BehaviorOutput::InterfaceCommand(InterfaceCommand::Send(
                     pid.clone(),
                     AnyMessage::Handshake(msg),
                 )));
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/handshake.rs` around lines 73 - 103, The current negotiation picks the highest mutually-supported version into `negotiated` without ensuring the peer's `network_magic` matches ours, causing refusal even when a lower shared version would work; change the selection logic that builds `negotiated` (currently using `proposed.values.iter().filter(...).max_by_key(...)`) to instead choose the highest version that is present in `self.config.supported_version.values` AND whose `peer_data.network_magic == our_data.network_magic` (e.g., iterate the filtered entries in descending order or filter to include the network_magic equality before taking the max), leaving the existing refusal path (the `handshake_proto::Refuse` and `handshakes_refused_counter` code) to run only when no compatible version is found.
🧹 Nitpick comments (1)
pallas-network2/src/behavior/responder/txsubmission.rs (1)
77-90: Avoid cloning and retaining consumed tx batches in `try_extract_txs`.
At Line [83], events are emitted via `tx.clone()`, which duplicates payloads and keeps old tx bodies in state. Draining once is cheaper and avoids stale retention.
♻️ Proposed refactor
```diff
-    fn try_extract_txs(
-        &self,
-        pid: &PeerId,
-        state: &ResponderState,
-        outbound: &mut OutboundQueue<ResponderBehavior>,
-    ) {
-        if let txsubmission_proto::State::Txs(txs) = &state.tx_submission {
-            for tx in txs {
-                outbound.push_ready(BehaviorOutput::ExternalEvent(
-                    ResponderEvent::TxReceived(pid.clone(), tx.clone()),
-                ));
-            }
-        }
-    }
+    fn try_extract_txs(
+        &self,
+        pid: &PeerId,
+        state: &mut ResponderState,
+        outbound: &mut OutboundQueue<ResponderBehavior>,
+    ) {
+        if let txsubmission_proto::State::Txs(txs) = &mut state.tx_submission {
+            for tx in std::mem::take(txs) {
+                outbound.push_ready(BehaviorOutput::ExternalEvent(
+                    ResponderEvent::TxReceived(pid.clone(), tx),
+                ));
+            }
+        }
+    }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/txsubmission.rs` around lines 77 - 90, The current try_extract_txs clones each tx from state.tx_submission, retaining the batch in state; change the function to take a mutable ResponderState (or otherwise obtain &mut state.tx_submission) and drain/move the transactions out instead of cloning: match on txsubmission_proto::State::Txs(txs) as a mutable reference and use txs.drain(..) (or mem::take to replace the variant) to iterate owned tx values, then call outbound.push_ready(BehaviorOutput::ExternalEvent(ResponderEvent::TxReceived(pid.clone(), tx))) so the tx is moved, not cloned, and the consumed batch is removed from ResponderState.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pallas-network2/src/behavior/responder/mod.rs`:
- Around line 344-348: In on_outbound_msg, after calling state.apply_msg(msg)
inside the self.peers.entry(pid.clone()).and_modify block, detect whether the
state recorded a protocol violation (e.g., a flag or method on the peer state
such as has_violation/violation() or the value set by state.apply_msg) and
short-circuit by NOT invoking the visitor dispatch macro; i.e., only call
all_visitors!(self, pid, state, visit_outbound_msg) when no violation was
recorded. Update the and_modify closure in on_outbound_msg to check the peer
state’s violation indicator and return/skip the visitor call when a violation is
present to avoid emitting follow-up actions from invalid state.
---
Duplicate comments:
In `@pallas-network2/src/behavior/responder/handshake.rs`:
- Around line 73-103: The current negotiation picks the highest
mutually-supported version into `negotiated` without ensuring the peer's
`network_magic` matches ours, causing refusal even when a lower shared version
would work; change the selection logic that builds `negotiated` (currently using
`proposed.values.iter().filter(...).max_by_key(...)`) to instead choose the
highest version that is present in `self.config.supported_version.values` AND
whose `peer_data.network_magic == our_data.network_magic` (e.g., iterate the
filtered entries in descending order or filter to include the network_magic
equality before taking the max), leaving the existing refusal path (the
`handshake_proto::Refuse` and `handshakes_refused_counter` code) to run only
when no compatible version is found.
---
Nitpick comments:
In `@pallas-network2/src/behavior/responder/txsubmission.rs`:
- Around line 77-90: The current try_extract_txs clones each tx from
state.tx_submission, retaining the batch in state; change the function to take a
mutable ResponderState (or otherwise obtain &mut state.tx_submission) and
drain/move the transactions out instead of cloning: match on
txsubmission_proto::State::Txs(txs) as a mutable reference and use txs.drain(..)
(or mem::take to replace the variant) to iterate owned tx values, then call
outbound.push_ready(BehaviorOutput::ExternalEvent(ResponderEvent::TxReceived(pid.clone(),
tx))) so the tx is moved, not cloned, and the consumed batch is removed from
ResponderState.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- pallas-network2/src/behavior/responder/handshake.rs
- pallas-network2/src/behavior/responder/mod.rs
- pallas-network2/src/behavior/responder/txsubmission.rs
- pallas-network2/src/protocol/txsubmission.rs
- pallas-network2/tests/responder_happy_path.rs
Actionable comments posted: 1
♻️ Duplicate comments (3)
pallas-network2/src/behavior/initiator/mod.rs (2)
514-516: ⚠️ Potential issue | 🟠 Major
`SendTx` is still effectively dropped.
This path no longer panics, but it still accepts a valid command and does nothing beyond logging. That leaves callers with false-success behavior and no tx-submission side effect.
If helpful, I can draft a minimal non-panicking wiring that enqueues/schedules tx-submission through the existing initiator flow.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/initiator/mod.rs` around lines 514 - 516, The InitiatorCommand::SendTx arm currently only logs and drops the command causing false-success behavior; update the InitiatorCommand::SendTx handling in mod.rs to enqueue the transaction into the existing initiator submission flow instead of returning early: locate the InitiatorCommand::SendTx match arm and invoke the same scheduling/enqueue path used by other submit actions (reuse the initiator's tx queue/submit function or call the existing submit_tx / schedule_tx helper used elsewhere in this module), propagate any errors back to the caller or emit an error result via the same notifier/channel the initiator uses, and ensure you include any necessary metadata (originator ID / nonce) so the tx is actually processed rather than silently ignored.
389-395: ⚠️ Potential issue | 🟠 Major
Avoid overwriting existing peer state in discovery path.
Line 394 still unconditionally inserts and replaces existing state. Repeated discovery/`IncludePeer` can wipe `error_count`, promotion, and protocol progress.
Suggested idempotent update
```diff
 fn on_discovered(&mut self, pid: &PeerId) {
-    let mut state = InitiatorState::new();
-
-    all_visitors!(self, pid, &mut state, visit_discovered);
-
-    self.peers.insert(pid.clone(), state);
+    match self.peers.entry(pid.clone()) {
+        std::collections::hash_map::Entry::Occupied(mut entry) => {
+            all_visitors!(self, pid, entry.get_mut(), visit_discovered);
+        }
+        std::collections::hash_map::Entry::Vacant(entry) => {
+            let mut state = InitiatorState::new();
+            all_visitors!(self, pid, &mut state, visit_discovered);
+            entry.insert(state);
+        }
+    }
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/initiator/mod.rs` around lines 389 - 395, The on_discovered implementation currently always creates a fresh InitiatorState and unconditionally replaces any existing entry via self.peers.insert, which wipes error_count, promotion, and protocol progress; change it to update idempotently by using the map entry API (self.peers.entry(pid.clone())) to only insert a new InitiatorState when no state exists, and run all_visitors! (visit_discovered) against that existing-or-new state instead of always creating a brand new one; in short, replace the unconditional InitiatorState::new + insert with entry().or_insert_with(...) (or entry().or_default()) and call all_visitors! on the resulting mutable reference so existing state is preserved.
examples/p2p-responder/src/main.rs (1)
151-153: ⚠️ Potential issue | 🟡 Minor

Avoid `unwrap()` in the startup logging path (still panic-prone).

Line 152 can still crash the process before serving traffic if `local_addr()` fails.

Safer logging variant

```diff
-    tracing::info!(
-        addr = %listener.local_addr().unwrap(),
-        "listening for inbound connections"
-    );
+    match listener.local_addr() {
+        Ok(addr) => tracing::info!(%addr, "listening for inbound connections"),
+        Err(error) => tracing::warn!(%error, "listener bound but local_addr() failed"),
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/p2p-responder/src/main.rs` around lines 151 - 153, The startup log calls listener.local_addr().unwrap() which can panic; instead handle the Result from listener.local_addr() explicitly (e.g., match or if let) and log safely: call listener.local_addr(), on Ok(addr) include addr in the tracing::info! message, and on Err(err) emit a tracing::error! (or tracing::warn!) with the error and a fallback message so the process doesn't panic during startup; update the logging site that currently uses tracing::info! to use the handled result.
🧹 Nitpick comments (7)
pallas-network2/src/behavior/responder/mod.rs (3)
490-505: Stream implementation simply delegates to the outbound queue.

The `poll_next` implementation is straightforward. The match arms could be simplified to just returning `poll`, since all variants are passed through unchanged, but this is a minor style preference.

♻️ Optional simplification

```diff
 fn poll_next(
     mut self: std::pin::Pin<&mut Self>,
     cx: &mut std::task::Context<'_>,
 ) -> Poll<Option<Self::Item>> {
-    let poll = self.outbound.futures.poll_next_unpin(cx);
-
-    match poll {
-        Poll::Ready(Some(x)) => Poll::Ready(Some(x)),
-        Poll::Ready(None) => Poll::Ready(None),
-        Poll::Pending => Poll::Pending,
-    }
+    self.outbound.futures.poll_next_unpin(cx)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/mod.rs` around lines 490 - 505, The Stream::poll_next implementation for ResponderBehavior currently matches on the result of self.outbound.futures.poll_next_unpin(cx) and re-returns each arm unchanged; simplify by directly returning the polled value (i.e., return self.outbound.futures.poll_next_unpin(cx)) inside poll_next to remove the redundant match while keeping the same behavior; update the body of ResponderBehavior::poll_next to delegate directly to outbound.futures.poll_next_unpin(cx).
77-89: Consider documenting the public fields of `ResponderState`.

While fields are marked `pub(crate)`, external consumers interacting via the public methods (`is_initialized()`, `version()`) would benefit from understanding the state model. A brief doc comment explaining the lifecycle (Connected → Initialized → Disconnected) and when `violation` is set would aid maintainability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/mod.rs` around lines 77 - 89, Add doc comments to the ResponderState struct and its public(crate) fields explaining the lifecycle and semantics: describe the overall state model (e.g., Connected -> Initialized -> Disconnected), what handshake/keepalive/peersharing/blockfetch/chainsync/tx_submission represent, when violation is set and how error_count and violations_counter are used, and note which public methods (is_initialized(), version()) rely on these fields; update the doc comment on ResponderState (and brief inline comments for fields like connection, handshake, violation, error_count, violations_counter) so future readers can map fields to lifecycle transitions and observable behavior.
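To make the suggestion concrete, here is a trimmed-down, self-contained sketch of the kind of doc comments the prompt asks for. The field set is reduced and simplified relative to the real `ResponderState`, and the lifecycle wording is an assumption drawn from this review, not from the crate's docs:

```rust
/// Connection lifecycle for a remote peer, as described in this review.
#[derive(Debug, Default, PartialEq)]
enum ConnectionState {
    /// TCP connection accepted; handshake not yet complete.
    #[default]
    Connected,
    /// Handshake accepted; mini-protocols may now run.
    Initialized,
}

/// Per-peer responder state (trimmed-down sketch; the real struct tracks one
/// sub-state per mini-protocol).
///
/// Lifecycle: Connected -> Initialized -> removed on disconnect.
#[derive(Default)]
struct ResponderState {
    /// Where the peer sits in the lifecycle above.
    connection: ConnectionState,
    /// Set when the peer sends a protocol-violating message; triggers disconnect.
    violation: bool,
    /// Errors observed for this peer; compared against the ban threshold.
    error_count: u32,
}

impl ResponderState {
    /// Relies on `connection`: true only after the handshake completed.
    fn is_initialized(&self) -> bool {
        self.connection == ConnectionState::Initialized
    }
}
```

Comments of this shape let a reader map each field to an observable lifecycle transition without opening the visitor implementations.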
200-210: `reset()` does not clear `error_count`.

When a peer disconnects and state is reset, `error_count` persists. Since `on_disconnected` calls `reset()` and then removes the peer entry (Line 379), this is currently benign. However, if the reset logic is ever reused for reconnection scenarios, stale error counts could cause premature bans.

🛡️ Suggested defensive fix

```diff
 pub fn reset(&mut self) {
     self.connection = ConnectionState::default();
     self.handshake = proto::handshake::State::default();
     self.keepalive = proto::keepalive::State::default();
     self.peersharing = proto::peersharing::State::default();
     self.blockfetch = proto::blockfetch::State::default();
     self.chainsync = proto::chainsync::State::default();
     self.tx_submission = proto::txsubmission::State::default();
     self.violation = false;
+    self.error_count = 0;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/mod.rs` around lines 200 - 210, The reset() method currently reinitializes most per-peer fields but omits clearing error_count; update responder::reset() to also reset self.error_count (e.g., set self.error_count = 0 or Default::default()) so stale error counts don't persist across resets (used by reset() and referenced by on_disconnected); modify the reset() function in mod.rs (the same block that touches ConnectionState, handshake, keepalive, etc.) to include resetting error_count.

pallas-network2/src/behavior/responder/connection.rs (2)
109-120: Per-IP count increments before the rejection check.

Line 110 increments `*count` before checking whether it exceeds the limit at Line 112. This means a rejected connection still increments the per-IP count. However, since the connection is immediately disconnected, `visit_disconnected` will decrement it back.

This is technically correct but creates a brief inconsistency where the count temporarily exceeds the limit. If multiple connections from the same IP arrive simultaneously, they could all pass the check before any are rejected.

🛡️ Optional fix for race-free rejection

```diff
 let count = self.connections_per_ip.entry(pid.host.clone()).or_insert(0);
-*count += 1;
-if *count > self.config.max_connections_per_ip {
+if *count >= self.config.max_connections_per_ip {
     tracing::warn!(
         ip = %pid.host,
-        count = *count,
+        count = *count + 1,
         max = self.config.max_connections_per_ip,
         "too many connections from IP, disconnecting"
     );
     self.connections_rejected_counter.add(1, &[]);
     outbound.push_ready(InterfaceCommand::Disconnect(pid.clone()));
 } else if self.accepted_peers.insert(pid.clone()) {
+    *count += 1;
     self.active_peers += 1;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/connection.rs` around lines 109 - 120, The per-IP counter (self.connections_per_ip) is incremented before the enforcement check, allowing brief overcounting and race windows; change the logic in the connection acceptance path so you check the current count against self.config.max_connections_per_ip first and only increment the entry when you decide to accept the connection, ensuring rejected connections trigger outbound.push_ready(InterfaceCommand::Disconnect(pid.clone())) without modifying the counter; keep visit_disconnected as the decrement path for accepted connections and ensure the same pid.host key is used for both increment and decrement to avoid leaks.
7-19: Default config uses `max_error_count: 1`, which bans after 2 errors.

Line 91 checks `state.error_count > self.config.max_error_count`, so with the default of 1, a peer is banned only after their error count exceeds 1 (i.e., on the second error). This may be intentional but is worth confirming: "max 1 error" might be interpreted as banning on the first error.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/connection.rs` around lines 7 - 19, The default max_error_count is 1 but the ban check uses `state.error_count > self.config.max_error_count`, which causes a ban on the second error (ambiguous semantics); decide the intended meaning and fix either by adjusting the default in ConnectionResponderConfig::default (e.g., set max_error_count to 0 if you want "ban on first error" or to 2 if you want to allow two errors) or by changing the ban condition in the code that checks `state.error_count > self.config.max_error_count` to use `>=` so that `max_error_count` clearly means "ban at this many errors" (update the comparison where `state.error_count` is compared to `self.config.max_error_count` to match the chosen semantics).pallas-network2/src/behavior/responder/handshake.rs (1)
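The two candidate semantics differ only in the comparison operator; a tiny standalone check makes the off-by-one visible (function names here are hypothetical, for illustration only):

```rust
// With `>` and max_error_count = 1, the ban fires on the SECOND error:
// the count must exceed the maximum before the peer is banned.
fn bans_with_gt(error_count: u32, max_error_count: u32) -> bool {
    error_count > max_error_count
}

// With `>=` and max_error_count = 1, the ban fires on the FIRST error,
// reading `max_error_count` as "ban at this many errors".
fn bans_with_ge(error_count: u32, max_error_count: u32) -> bool {
    error_count >= max_error_count
}
```

Whichever semantics is intended, encoding it in the comparison (or the default) removes the ambiguity the comment points out.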
104-117: State transition happens after sending the Accept message.

Line 112 sets `state.connection = ConnectionState::Initialized` after queueing the Accept message. Since the outbound message is only queued (not sent synchronously), if message transmission fails, the state will incorrectly reflect "Initialized" even though the peer never received confirmation.

Consider whether this is acceptable behavior or whether the state transition should be deferred until `on_outbound_msg` confirms the Accept was sent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/handshake.rs` around lines 104 - 117, The state transition to ConnectionState::Initialized and the handshake completion side-effects must not occur immediately after outbound.push_ready; instead defer them until the Accept is actually sent (or handle send failure to revert). Move the state update (state.connection = ConnectionState::Initialized), the handshakes_completed_counter.add(1, &[]), and the outbound ExternalEvent (ResponderEvent::PeerInitialized(...)) out of this block and perform them from the send-confirmation path (e.g., in on_outbound_msg or the code that handles successful InterfaceCommand::Send for AnyMessage::Handshake::Accept), or add explicit failure handling to revert the state when InterfaceCommand::Send fails; keep references to outbound.push_ready, BehaviorOutput::InterfaceCommand, InterfaceCommand::Send, AnyMessage::Handshake, ConnectionState::Initialized, handshakes_completed_counter, and ResponderEvent::PeerInitialized to locate the changes.pallas-network2/src/behavior/responder/txsubmission.rs (1)
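One way to realize the deferral, sketched with stand-in types (all names below are hypothetical simplifications; the real module would hook this into `on_outbound_msg`): queue the Accept first, and run the Initialized transition plus counter side effects only once the transport confirms the send.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ConnectionState {
    Connected,
    Initialized,
}

struct PeerState {
    connection: ConnectionState,
    handshakes_completed: u64,
}

impl PeerState {
    // Called when the transport reports the outcome of sending the Accept.
    fn on_accept_sent(&mut self, send_result: Result<(), String>) {
        match send_result {
            Ok(()) => {
                // Only now is the handshake observably complete for the peer.
                self.connection = ConnectionState::Initialized;
                self.handshakes_completed += 1;
                // The real code would also emit ResponderEvent::PeerInitialized here.
            }
            Err(_) => {
                // Send failed: stay in Connected so the handshake can be retried
                // or the peer disconnected, instead of recording a false success.
                self.connection = ConnectionState::Connected;
            }
        }
    }
}
```

This keeps the state machine honest at the cost of one extra callback on the send path.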
55-75: `try_request_tx_ids` always requests with `blocking=true` and `ack=0`.

Line 70 hardcodes `RequestTxIds(true, 0, self.config.max_tx_request)`. The first parameter (`true`) indicates blocking mode, and the second (`0`) is the acknowledgment count.

In blocking mode with `ack=0`, the responder signals that it has no transactions to acknowledge and wants to wait for new tx IDs. This may be intentional for a simple responder, but consider whether the ack count should track previously processed transactions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pallas-network2/src/behavior/responder/txsubmission.rs` around lines 55 - 75, The code in try_request_tx_ids always sends RequestTxIds(true, 0, ...) which hardcodes blocking=true and ack=0; update it to compute and pass an appropriate ack count and blocking flag instead of literals. Locate try_request_tx_ids and replace the literal RequestTxIds(true, 0, self.config.max_tx_request) with values derived from responder state or config (e.g. let blocking = /* compute from state or config */; let ack = state.acknowledged_count() or state.processed_tx_count() or a configurable field), then call txsubmission_proto::Message::RequestTxIds(blocking, ack, self.config.max_tx_request) so the ack reflects previously processed transactions and blocking is adjustable. Ensure any new accessor (e.g. acknowledged_count) is implemented on ResponderState or a config flag is added.
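The shape of the suggested change, as a standalone sketch (the accessor name `processed_since_last_request` and the simplified message struct are hypothetical, not the pallas-network2 API): derive the ack parameter from how many tx IDs were processed since the previous request instead of hardcoding 0.

```rust
// Stand-in for the tx-submission protocol message: (blocking, ack, req).
#[derive(Debug, PartialEq)]
struct RequestTxIds(bool, u16, u16);

struct TxSubmissionState {
    // How many tx IDs this responder has processed since its last request.
    processed_since_last_request: u16,
}

impl TxSubmissionState {
    fn next_request(&mut self, max_tx_request: u16) -> RequestTxIds {
        // Acknowledge everything processed since the previous round,
        // then reset the counter for the next round.
        let ack = self.processed_since_last_request;
        self.processed_since_last_request = 0;
        RequestTxIds(true, ack, max_tx_request)
    }
}
```

With this shape, the blocking flag could likewise be computed from state or made configurable rather than fixed to `true`.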
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@examples/p2p-responder/src/main.rs`:
- Around line 128-131: Modify tick() to return a bool and use that to detect a
terminated network stream: inside tick(), when self.network.poll_next() yields
None, return false to signal stream exhaustion; when an event is received and
handled via self.handle_event(evt).await, return true so processing continues.
Then change run_forever() to loop using the boolean result (e.g., while
self.tick().await { } ) so the loop breaks when tick() returns false instead of
busy-looping.
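The control flow that prompt describes can be modeled without the real network stream (a synchronous sketch with stand-in types; the actual code is async and drives `poll_next` on the network): `tick()` reports whether the stream yielded an event, and `run_forever` loops on that boolean so a terminated stream ends the loop instead of spinning.

```rust
struct Node {
    // Stand-in for the network event stream.
    events: std::vec::IntoIter<u32>,
    handled: usize,
}

impl Node {
    // Returns false once the stream is exhausted, true otherwise.
    fn tick(&mut self) -> bool {
        match self.events.next() {
            Some(evt) => {
                self.handle_event(evt);
                true
            }
            // Stream terminated: signal the caller to stop looping.
            None => false,
        }
    }

    fn handle_event(&mut self, _evt: u32) {
        self.handled += 1;
    }

    fn run_forever(&mut self) {
        // Breaks as soon as tick() reports exhaustion; no busy loop.
        while self.tick() {}
    }
}
```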
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- examples/p2p-responder/src/main.rs
- pallas-network2/src/behavior/initiator/mod.rs
- pallas-network2/src/behavior/responder/connection.rs
- pallas-network2/src/behavior/responder/handshake.rs
- pallas-network2/src/behavior/responder/mod.rs
- pallas-network2/src/behavior/responder/txsubmission.rs
Summary by CodeRabbit
New Features
Observability
Examples
Tests
Chores