From 4dfb60d2b38b6ba5a49c554a517b4ae8ce326ec5 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Wed, 26 Feb 2025 22:14:54 -0700 Subject: [PATCH 1/6] first commit --- Cargo.lock | 70 +++++++++++++++++++++++++++++--------------- atoma-p2p/Cargo.toml | 1 + 2 files changed, 48 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e9fde92..8c9ce515 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1685,9 +1685,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.39" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1695,7 +1695,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-link", ] [[package]] @@ -4719,6 +4719,7 @@ dependencies = [ "libp2p-metrics", "libp2p-noise", "libp2p-quic", + "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", @@ -4973,6 +4974,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "libp2p-request-response" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "548fe44a80ff275d400f1b26b090d441d83ef73efabbeb6415f4ce37e5aed865" +dependencies = [ + "async-trait", + "futures", + "futures-bounded", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "smallvec", + "tracing", +] + [[package]] name = "libp2p-swarm" version = "0.46.0" @@ -5107,9 +5125,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "litemap" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" +checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" [[package]] name = "lock_api" @@ -7798,9 +7816,9 @@ checksum = "48fd7bd8a6377e15ad9d42a8ec25371b94ddc67abe7c8b9127bec79bebaaae18" [[package]] name = "rust-embed" -version = "8.5.0" +version = "8.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa66af4a4fdd5e7ebc276f115e895611a34739a9c1c01028383d612d550953c0" +checksum = "0b3aba5104622db5c9fc61098de54708feb732e7763d7faa2fa625899f00bf6f" dependencies = [ "rust-embed-impl", "rust-embed-utils", @@ -7809,9 +7827,9 @@ dependencies = [ [[package]] name = "rust-embed-impl" -version = "8.5.0" +version = "8.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6125dbc8867951125eec87294137f4e9c2c96566e61bf72c45095a7c77761478" +checksum = "1f198c73be048d2c5aa8e12f7960ad08443e56fd39cc26336719fdb4ea0ebaae" dependencies = [ "proc-macro2", "quote", @@ -7822,9 +7840,9 @@ dependencies = [ [[package]] name = "rust-embed-utils" -version = "8.5.0" +version = "8.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5347777e9aacb56039b0e1f28785929a8a3b709e87482e7442c72e7c12529d" +checksum = "5a2fcdc9f40c8dc2922842ca9add611ad19f332227fc651d015881ad1552bd9a" dependencies = [ "sha2 0.10.8", "walkdir", @@ -8074,9 +8092,9 @@ dependencies = [ [[package]] name = "schemars" -version = "0.8.21" +version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" +checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615" dependencies = [ "dyn-clone", "either", @@ -8087,9 +8105,9 @@ dependencies = [ [[package]] name = "schemars_derive" -version = "0.8.21" +version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1eee588578aff73f856ab961cd2f79e36bc45d7ded33a7562adba4667aecc0e" +checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d" dependencies = [ "proc-macro2", "quote", @@ -10663,9 +10681,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.14.0" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93d59ca99a559661b96bf898d8fce28ed87935fd2bea9f05983c1464dd6c71b1" +checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587" dependencies = [ "getrandom 0.3.1", "rand 0.9.0", @@ -11083,6 +11101,12 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "windows-link" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3" + [[package]] name = "windows-registry" version = "0.2.0" @@ -11541,18 +11565,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", @@ -11604,9 +11628,9 @@ dependencies = [ [[package]] name = "zip" -version = "2.2.2" +version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae9c1ea7b3a5e1f4b922ff856a129881167511563dc219869afe3787fc0c1a45" +checksum = "b280484c454e74e5fff658bbf7df8fdbe7a07c6b2de4a53def232c15ef138f3a" dependencies = [ "arbitrary", "crc32fast", diff --git a/atoma-p2p/Cargo.toml b/atoma-p2p/Cargo.toml index 2b257876..62ccd848 100644 --- a/atoma-p2p/Cargo.toml +++ b/atoma-p2p/Cargo.toml @@ -19,6 +19,7 @@ libp2p = { workspace = true, features = [ "mdns", "kad", "macros", + "request-response", "quic", "tcp", "yamux", From 59a768c1e69502cc3dd5ec52a1af15d9d0561273 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Sun, 2 Mar 2025 10:35:31 -0700 Subject: [PATCH 2/6] request response --- Cargo.lock | 1 + Cargo.toml | 1 + atoma-p2p/Cargo.toml | 1 + atoma-p2p/src/lib.rs | 1 + atoma-p2p/src/service.rs | 23 ++++-- atoma-p2p/src/stack_leader.rs | 141 ++++++++++++++++++++++++++++++++++ 6 files changed, 162 insertions(+), 6 deletions(-) create mode 100644 atoma-p2p/src/stack_leader.rs diff --git a/Cargo.lock b/Cargo.lock index a6699c1b..81f13c36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -899,6 +899,7 @@ dependencies = [ name = "atoma-p2p" version = "0.1.0" dependencies = [ + "async-trait", "blake3", "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 850f1dbd..1e281ca5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ license = "Apache-2.0" [workspace.dependencies] aes-gcm = "0.10.3" anyhow = "1.0.81" +async-trait = "0.1.86" atoma-confidential = { path = "./atoma-confidential" } atoma-daemon = { path = "./atoma-daemon" } atoma-p2p = { path = "./atoma-p2p" } diff --git a/atoma-p2p/Cargo.toml b/atoma-p2p/Cargo.toml index 8783801e..f0706e7a 100644 --- a/atoma-p2p/Cargo.toml +++ b/atoma-p2p/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +async-trait = { workspace = true } blake3 = { workspace = true } bytes = { workspace = true } ciborium = { workspace = true } diff --git a/atoma-p2p/src/lib.rs b/atoma-p2p/src/lib.rs index e0b720ec..31c102fe 100644 --- a/atoma-p2p/src/lib.rs +++ b/atoma-p2p/src/lib.rs @@ -4,6 +4,7 @@ pub mod constants; pub mod errors; pub mod metrics; pub mod service; +pub mod stack_leader; pub mod timer; pub mod types; pub mod utils; diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index 5eeed34d..6cda39b5 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -2,14 +2,14 @@ use crate::{ config::AtomaP2pNodeConfig, errors::AtomaP2pNodeError, metrics::{ - NetworkMetrics, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, - TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, - }, - metrics::{ - KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, + NetworkMetrics, KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_CONNECTIONS, + TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, TOTAL_GOSSIPSUB_PUBLISHES, - TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, TOTAL_OUTGOING_CONNECTIONS, + TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, + TOTAL_OUTGOING_CONNECTIONS, }, + stack_leader::{StackLeaderCodec, StackLeaderProtocol}, timer::usage_metrics_timer_task, types::{AtomaP2pEvent, NodeMessage, SerializeWithSignature, SignedNodeMessage}, utils::{extract_gossipsub_metrics, validate_signed_node_message}, @@ -20,6 +20,7 @@ use futures::StreamExt; use libp2p::{ gossipsub::{self}, identify, identity, kad, mdns, noise, + request_response::{self, ProtocolSupport}, swarm::{NetworkBehaviour, SwarmEvent}, tcp, yamux, PeerId, StreamProtocol, Swarm, SwarmBuilder, }; @@ -82,6 +83,10 @@ struct AtomaP2pBehaviour { /// Particularly useful for development and local testing environments where nodes /// need to find each other without explicit configuration. mdns: mdns::tokio::Behaviour, + + /// Provides a way to request-response messages across the P2P network. + /// Used for requesting compute units from the stack leader. + stack_leader_request_response: request_response::Behaviour, } /// A P2P node implementation for the Atoma network that handles peer discovery, @@ -278,11 +283,17 @@ impl AtomaP2pNode { key.public(), )); + let stack_leader_request_response = request_response::Behaviour::new( + vec![(StackLeaderProtocol::default(), ProtocolSupport::Full)], + request_response::Config::default(), + ); + Ok(AtomaP2pBehaviour { gossipsub, identify, kademlia, mdns, + stack_leader_request_response, }) }) .map_err(|e| { diff --git a/atoma-p2p/src/stack_leader.rs b/atoma-p2p/src/stack_leader.rs new file mode 100644 index 00000000..09e0fe1a --- /dev/null +++ b/atoma-p2p/src/stack_leader.rs @@ -0,0 +1,141 @@ +use async_trait::async_trait; +use bytes::{BufMut, BytesMut}; +use futures::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use libp2p::request_response::Codec; +use serde::{Deserialize, Serialize}; +use sui_sdk::types::crypto::Signature; + +#[derive(Clone, Debug)] +pub struct StackLeaderProtocol { + _version: u8, +} + +impl Default for StackLeaderProtocol { + fn default() -> Self { + Self { _version: 1 } + } +} + +impl AsRef for StackLeaderProtocol { + fn as_ref(&self) -> &'static str { + "/atoma/stack-leader/0.0.1" + } +} + +#[derive(Clone, Default)] +pub struct StackLeaderCodec(); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StackSmallId(u64); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct NodeSmallId(u64); + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StackAvailableComputeUnitsRequest { + pub stack_small_id: StackSmallId, + pub node_small_id: NodeSmallId, + pub num_compute_units: u64, + pub timestamp: u64, + pub signature: Signature, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StackAvailableComputeUnitsResponse { + pub is_permissed: bool, + pub stack_small_id: StackSmallId, + pub timestamp: u64, + pub signature: Signature, + pub remaining_available_compute_units: u64, +} + +#[async_trait] +impl Codec for StackLeaderCodec { + type Request = StackAvailableComputeUnitsRequest; + type Response = StackAvailableComputeUnitsResponse; + type Protocol = StackLeaderProtocol; + + async fn read_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + // Read the length prefix (4 bytes) + let mut length_bytes = [0u8; 4]; + io.read_exact(&mut length_bytes).await?; + let length = u32::from_be_bytes(length_bytes) as usize; + + // Read the serialized data + let mut data = vec![0u8; length]; + io.read_exact(&mut data).await?; + + // Deserialize the request + ciborium::de::from_reader(&data[..]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string())) + } + + async fn read_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + // Read the length prefix (4 bytes) + let mut length_bytes = [0u8; 4]; + io.read_exact(&mut length_bytes).await?; + let length = u32::from_be_bytes(length_bytes) as usize; + + // Read the serialized data + let mut data = vec![0u8; length]; + io.read_exact(&mut data).await?; + + // Deserialize the response + ciborium::de::from_reader(&data[..]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string())) + } + + async fn write_request( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + request: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let mut buffer = BytesMut::new(); + ciborium::into_writer(&request, (&mut buffer).writer()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + let bytes = buffer.freeze(); + let length = u32::try_from(bytes.len()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + io.write_all(&length.to_be_bytes()).await?; + io.write_all(&bytes).await?; + Ok(()) + } + + async fn write_response( + &mut self, + _protocol: &Self::Protocol, + io: &mut T, + response: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let mut buffer = BytesMut::new(); + ciborium::into_writer(&response, (&mut buffer).writer()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + let bytes = buffer.freeze(); + let length = u32::try_from(bytes.len()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; + io.write_all(&length.to_be_bytes()).await?; + io.write_all(&bytes).await?; + Ok(()) + } +} From 1318656941a55e17c74e403d16622d5b4582949d Mon Sep 17 00:00:00 2001 From: chad Date: Mon, 3 Mar 2025 21:45:18 -0500 Subject: [PATCH 3/6] wip --- atoma-p2p/src/handlers.rs | 440 ++++++++++++++++++++++++++++++++++++++ atoma-p2p/src/lib.rs | 1 + atoma-p2p/src/service.rs | 409 +---------------------------------- 3 files changed, 443 insertions(+), 407 deletions(-) create mode 100644 atoma-p2p/src/handlers.rs diff --git a/atoma-p2p/src/handlers.rs b/atoma-p2p/src/handlers.rs new file mode 100644 index 00000000..805e9799 --- /dev/null +++ b/atoma-p2p/src/handlers.rs @@ -0,0 +1,440 @@ +use crate::metrics::{ + KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, + TOTAL_DIALS_FAILED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, + TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, TOTAL_OUTGOING_CONNECTIONS, +}; +use libp2p::{gossipsub, swarm::SwarmEvent}; +use opentelemetry::KeyValue; + +pub fn handle_p2p_event(event: SwarmEvent) { + match event { + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Message { + message_id, + message, + propagation_source, + })) => { + match self + .handle_gossipsub_message(message.data.into(), &message_id, &propagation_source) + .await + { + Ok(()) => { + TOTAL_GOSSIPSUB_MESSAGES_FORWARDED.add( + 1, + &[KeyValue::new( + "peerId", + self.swarm.local_peer_id().to_base58(), + )], + ); + } + Err(e) => { + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED.add( + 1, + &[KeyValue::new( + "peerId", + self.swarm.local_peer_id().to_base58(), + )], + ); + error!( + target = "atoma-p2p", + event = "gossipsub_message_error", + error = %e, + "Failed to handle gossipsub message" + ); + } + } + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub( + gossipsub::Event::Subscribed { peer_id, topic }, + )) => { + // Record subscript metrics + TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(1, &[KeyValue::new("topic", topic.to_string())]); + metrics.record(&gossipsub::Event::Subscribed { + peer_id, + topic: topic.clone(), + }); + + debug!( + target = "atoma-p2p", + event = "gossipsub_subscribed", + peer_id = %peer_id, + topic = %topic, + "Peer subscribed to topic" + ); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub( + gossipsub::Event::Unsubscribed { peer_id, topic }, + )) => { + // Record unsubscription metrics + TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(-1, &[KeyValue::new("topic", topic.to_string())]); + metrics.record(&gossipsub::Event::Unsubscribed { + peer_id, + topic: topic.clone(), + }); + + debug!( + target = "atoma-p2p", + event = "gossipsub_unsubscribed", + peer_id = %peer_id, + topic = %topic, + "Peer unsubscribed from topic" + ); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Discovered( + discovered_peers, + ))) => { + let peer_count = discovered_peers.len() as u64; + debug!( + target = "atoma-p2p", + event = "mdns_discovered", + peer_count = %peer_count, + "Mdns discovered peers" + ); + for (peer_id, multiaddr) in discovered_peers { + debug!( + target = "atoma-p2p", + event = "mdns_discovered_peer", + peer_id = %peer_id, + multiaddr = %multiaddr, + "Mdns discovered peer" + ); + self.swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, multiaddr); + } + // Record discovery metrics + TOTAL_MDNS_DISCOVERIES.add(peer_count, &[KeyValue::new("peerId", peer_id.clone())]); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Expired( + expired_peers, + ))) => { + debug!( + target = "atoma-p2p", + event = "mdns_expired", + num_expired_peers = %expired_peers.len(), + "Mdns expired" + ); + for (peer_id, multiaddr) in expired_peers { + debug!( + target = "atoma-p2p", + event = "mdns_expired_peer", + peer_id = %peer_id, + multiaddr = %multiaddr, + "Mdns expired peer" + ); + self.swarm + .behaviour_mut() + .kademlia + .remove_address(&peer_id, &multiaddr); + } + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad::Event::RoutingUpdated { + peer, + is_new_peer, + addresses, + bucket_range, + old_peer, + })) => { + debug!( + target = "atoma-p2p", + event = "kademlia_routing_updated", + peer = %peer, + is_new_peer = %is_new_peer, + addresses = ?addresses, + bucket_range = ?bucket_range, + old_peer = ?old_peer, + "Kademlia routing updated" + ); + KAD_ROUTING_TABLE_SIZE.record( + addresses.len() as u64, + &[KeyValue::new("peerId", peer.to_base58())], + ); + metrics.record(&kad::Event::RoutingUpdated { + peer, + is_new_peer, + addresses, + bucket_range, + old_peer, + }); + } + SwarmEvent::ConnectionEstablished { + peer_id, + num_established, + established_in, + connection_id, + .. + } => { + debug!( + target = "atoma-p2p", + event = "peer_connection_established", + peer_id = %peer_id, + num_established = %num_established, + established_in = ?established_in, + connection_id = %connection_id, + "Peer connection established" + ); + } + SwarmEvent::ConnectionClosed { + peer_id, + connection_id, + num_established, + .. + } => { + debug!( + target = "atoma-p2p", + event = "peer_connection_closed", + peer_id = %peer_id, + connection_id = %connection_id, + num_established = %num_established, + "Peer connection closed" + ); + } + SwarmEvent::NewListenAddr { + listener_id, + address, + .. + } => { + debug!( + target = "atoma-p2p", + event = "new_listen_addr", + listener_id = %listener_id, + address = %address, + "New listen address" + ); + } + SwarmEvent::ExpiredListenAddr { + listener_id, + address, + } => { + debug!( + target = "atoma-p2p", + event = "expired_listen_addr", + listener_id = %listener_id, + address = %address, + "Expired listen address" + ); + } + SwarmEvent::Dialing { + peer_id, + connection_id, + .. + } => { + debug!( + target = "atoma-p2p", + event = "dialing", + peer_id = ?peer_id, + connection_id = %connection_id, + "Dialing peer" + ); + TOTAL_DIALS_ATTEMPTED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + TOTAL_DIALS_FAILED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); + error!( + target = "atoma-p2p", + event = "outgoing_connection_error", + peer_id = ?peer_id, + error = %error, + "Outgoing connection error" + ); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Identify(identify_event)) => { + tracing::debug!( + target = "atoma-p2p", + event = "identify", + identify_event = ?identify_event, + "Identify event" + ); + metrics.record(&identify_event); + } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad_event)) => { + tracing::debug!( + target = "atoma-p2p", + event = "kad", + kad_event = ?kad_event, + "Kad event" + ); + metrics.record(&kad_event); + } + swarm_event => { + tracing::debug!( + target = "atoma-p2p", + event = "swarm_event", + swarm_event = ?swarm_event, + "Swarm event" + ); + metrics.record(&swarm_event); + } + } +} + +/// Handles incoming gossipsub messages in the P2P network. +/// +/// This method processes UsageMetrics messages by: +/// 1. Validating the message's signature and timestamp +/// 2. Verifying the node's ownership of its small ID +/// 3. Reporting the validation result to the gossipsub protocol +/// 4. Storing the node's public URL in the state manager (if this peer is a client) +/// +/// # Message Flow +/// +/// 1. Receives a message from the gossipsub network +/// 2. Skips processing if the message is from self +/// 3. Deserializes the message into a GossipMessage +/// 4. For UsageMetrics messages: +/// - Validates the message using `validate_usage_metrics_message` +/// - Reports validation result to gossipsub protocol +/// - If this peer is a client, stores the node's public URL in state manager +/// +/// # Arguments +/// +/// * `message_data` - Raw bytes of the received gossipsub message (CBOR encoded) +/// * `message_id` - Unique identifier for the gossipsub message +/// * `propagation_source` - The peer that forwarded this message +/// +/// # Returns +/// +/// Returns `Ok(())` if message processing succeeds, or an error if: +/// - Message deserialization fails +/// - Message validation fails +/// - Reporting validation result fails +/// - Storing public URL fails (for clients) +/// +/// # Errors +/// +/// This function can return the following errors: +/// * `UsageMetricsSerializeError` - If CBOR deserialization fails +/// * `StateManagerError` - If storing public URL fails (for clients) +/// +/// # Security Considerations +/// +/// - Messages from self are ignored to prevent message loops +/// - Messages are validated before processing +/// - Only clients store public URLs to prevent unnecessary data storage +/// - Uses CBOR for efficient binary serialization +/// +/// # Example +/// +/// ```rust,ignore +/// let message_data = // ... received from gossipsub ...; +/// let message_id = // ... message identifier ...; +/// let propagation_source = // ... peer ID ...; +/// +/// match node.handle_gossipsub_message(&message_data, &message_id, &propagation_source).await { +/// Ok(()) => println!("Message processed successfully"), +/// Err(e) => println!("Failed to process message: {}", e), +/// } +/// ``` +/// +/// # Message Validation +/// +/// Messages are validated by: +/// 1. Checking the URL format and timestamp freshness +/// 2. Verifying the cryptographic signature +/// 3. Confirming the node's ownership of its small ID +/// +/// Invalid messages are rejected and not propagated further in the network. +#[instrument(level = "debug", skip_all)] +pub async fn handle_gossipsub_message( + &mut self, + message_data: Bytes, + message_id: &gossipsub::MessageId, + propagation_source: &PeerId, +) -> Result<(), AtomaP2pNodeError> { + debug!( + target = "atoma-p2p", + event = "gossipsub_message", + message_id = %message_id, + propagation_source = %propagation_source, + "Received gossipsub message" + ); + if propagation_source == self.swarm.local_peer_id() { + debug!( + target = "atoma-p2p", + event = "gossipsub_message_from_self", + "Gossipsub message from self" + ); + // Do not re-publish the node's own message, just return `Ok(()) + return Ok(()); + } + // Directly deserialize SignedNodeMessage using new method + let signed_node_message = SignedNodeMessage::deserialize_with_signature(&message_data)?; + let signature_len = signed_node_message.signature.len(); + debug!( + target = "atoma-p2p", + event = "gossipsub_message_data", + message_id = %message_id, + propagation_source = %propagation_source, + "Received gossipsub message data" + ); + let node_message = &signed_node_message.node_message; + let node_message_hash = blake3::hash(&message_data[signature_len..]); + let message_acceptance = match validate_signed_node_message( + node_message, + node_message_hash.as_bytes(), + &signed_node_message.signature, + &self.state_manager_sender, + ) + .await + { + Ok(()) => gossipsub::MessageAcceptance::Accept, + Err(e) => { + error!( + target = "atoma-p2p", + event = "gossipsub_message_validation_error", + error = %e, + "Failed to validate gossipsub message" + ); + // NOTE: We should reject the message if it fails to validate + // as it means the node is not being following the current protocol + if let AtomaP2pNodeError::UrlParseError(_) = e { + // We remove the peer from the gossipsub topic, because it is not a valid URL and therefore cannot be reached + // by clients for processing OpenAI api compatible AI requests, so these peers are not useful for the network + self.swarm + .behaviour_mut() + .gossipsub + .remove_explicit_peer(propagation_source); + } + gossipsub::MessageAcceptance::Reject + } + }; + // Report the message validation result to the gossipsub protocol + let is_in_mempool = self + .swarm + .behaviour_mut() + .gossipsub + .report_message_validation_result(message_id, propagation_source, message_acceptance); + if is_in_mempool { + debug!( + target = "atoma-p2p", + event = "gossipsub_message_in_mempool", + message_id = %message_id, + propagation_source = %propagation_source, + "Gossipsub message already in the mempool, no need to take further actions" + ); + return Ok(()); + } + // If the current peer is a client, we need to store the public URL in the state manager + if self.is_client { + let node_message = signed_node_message.node_message; + let event = AtomaP2pEvent::NodeMetricsRegistrationEvent { + public_url: node_message.node_metadata.node_public_url, + node_small_id: node_message.node_metadata.node_small_id, + timestamp: node_message.node_metadata.timestamp, + country: node_message.node_metadata.country, + node_metrics: node_message.node_metrics, + }; + self.state_manager_sender.send((event, None)).map_err(|e| { + error!( + target = "atoma-p2p", + event = "gossipsub_message_state_manager_error", + error = %e, + "Failed to send event to state manager" + ); + AtomaP2pNodeError::StateManagerError(e) + })?; + } + + Ok(()) +} diff --git a/atoma-p2p/src/lib.rs b/atoma-p2p/src/lib.rs index 31c102fe..0977cf87 100644 --- a/atoma-p2p/src/lib.rs +++ b/atoma-p2p/src/lib.rs @@ -2,6 +2,7 @@ pub mod broadcast_metrics; pub mod config; pub mod constants; pub mod errors; +pub mod handlers; pub mod metrics; pub mod service; pub mod stack_leader; diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index 6cda39b5..e11cbe76 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -1,6 +1,7 @@ use crate::{ config::AtomaP2pNodeConfig, errors::AtomaP2pNodeError, + handlers::handle_p2p_event, metrics::{ NetworkMetrics, KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, @@ -547,243 +548,7 @@ impl AtomaP2pNode { } event = self.swarm.select_next_some() => { - match event { - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Message { - message_id, - message, - propagation_source, - })) => { - match self.handle_gossipsub_message(message.data.into(), &message_id, &propagation_source).await { - Ok(()) => { - TOTAL_GOSSIPSUB_MESSAGES_FORWARDED.add(1, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); - } - Err(e) => { - TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED.add(1, &[KeyValue::new("peerId", self.swarm.local_peer_id().to_base58())]); - error!( - target = "atoma-p2p", - event = "gossipsub_message_error", - error = %e, - "Failed to handle gossipsub message" - ); - } - } - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { - peer_id, - topic, - })) => { - // Record subscript metrics - TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(1, &[KeyValue::new("topic", topic.to_string())]); - metrics.record(&gossipsub::Event::Subscribed { - peer_id, - topic: topic.clone(), - }); - - debug!( - target = "atoma-p2p", - event = "gossipsub_subscribed", - peer_id = %peer_id, - topic = %topic, - "Peer subscribed to topic" - ); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Unsubscribed { - peer_id, - topic, - })) => { - // Record unsubscription metrics - TOTAL_GOSSIPSUB_SUBSCRIPTIONS.add(-1, &[KeyValue::new("topic", topic.to_string())]); - metrics.record(&gossipsub::Event::Unsubscribed { - peer_id, - topic: topic.clone(), - }); - - debug!( - target = "atoma-p2p", - event = "gossipsub_unsubscribed", - peer_id = %peer_id, - topic = %topic, - "Peer unsubscribed from topic" - ); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Discovered(discovered_peers))) => { - let peer_count = discovered_peers.len() as u64; - debug!( - target = "atoma-p2p", - event = "mdns_discovered", - peer_count = %peer_count, - "Mdns discovered peers" - ); - for (peer_id, multiaddr) in discovered_peers { - debug!( - target = "atoma-p2p", - event = "mdns_discovered_peer", - peer_id = %peer_id, - multiaddr = %multiaddr, - "Mdns discovered peer" - ); - self.swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); - } - // Record discovery metrics - TOTAL_MDNS_DISCOVERIES.add(peer_count, &[KeyValue::new("peerId", peer_id.clone())]); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Expired(expired_peers))) => { - debug!( - target = "atoma-p2p", - event = "mdns_expired", - num_expired_peers = %expired_peers.len(), - "Mdns expired" - ); - for (peer_id, multiaddr) in expired_peers { - debug!( - target = "atoma-p2p", - event = "mdns_expired_peer", - peer_id = %peer_id, - multiaddr = %multiaddr, - "Mdns expired peer" - ); - self.swarm.behaviour_mut().kademlia.remove_address(&peer_id, &multiaddr); - } - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad::Event::RoutingUpdated { - peer, - is_new_peer, - addresses, - bucket_range, - old_peer, - })) => { - debug!( - target = "atoma-p2p", - event = "kademlia_routing_updated", - peer = %peer, - is_new_peer = %is_new_peer, - addresses = ?addresses, - bucket_range = ?bucket_range, - old_peer = ?old_peer, - "Kademlia routing updated" - ); - KAD_ROUTING_TABLE_SIZE.record(addresses.len() as u64, &[KeyValue::new("peerId", peer.to_base58())]); - metrics.record(&kad::Event::RoutingUpdated { - peer, - is_new_peer, - addresses, - bucket_range, - old_peer, - }); - } - SwarmEvent::ConnectionEstablished { - peer_id, - num_established, - established_in, - connection_id, - .. - } => { - debug!( - target = "atoma-p2p", - event = "peer_connection_established", - peer_id = %peer_id, - num_established = %num_established, - established_in = ?established_in, - connection_id = %connection_id, - "Peer connection established" - ); - } - SwarmEvent::ConnectionClosed { - peer_id, - connection_id, - num_established, - .. - } => { - debug!( - target = "atoma-p2p", - event = "peer_connection_closed", - peer_id = %peer_id, - connection_id = %connection_id, - num_established = %num_established, - "Peer connection closed" - ); - } - SwarmEvent::NewListenAddr { - listener_id, - address, - .. - } => { - debug!( - target = "atoma-p2p", - event = "new_listen_addr", - listener_id = %listener_id, - address = %address, - "New listen address" - ); - } - SwarmEvent::ExpiredListenAddr { - listener_id, - address, - } => { - debug!( - target = "atoma-p2p", - event = "expired_listen_addr", - listener_id = %listener_id, - address = %address, - "Expired listen address" - ); - } - SwarmEvent::Dialing { - peer_id, - connection_id, - .. - } => { - debug!( - target = "atoma-p2p", - event = "dialing", - peer_id = ?peer_id, - connection_id = %connection_id, - "Dialing peer" - ); - TOTAL_DIALS_ATTEMPTED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); - } - SwarmEvent::OutgoingConnectionError { - peer_id, - error, - .. - } => { - TOTAL_DIALS_FAILED.add(1, &[KeyValue::new("peerId", peer_id.unwrap().to_base58())]); - error!( - target = "atoma-p2p", - event = "outgoing_connection_error", - peer_id = ?peer_id, - error = %error, - "Outgoing connection error" - ); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Identify(identify_event)) => { - tracing::debug!( - target = "atoma-p2p", - event = "identify", - identify_event = ?identify_event, - "Identify event" - ); - metrics.record(&identify_event); - } - SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Kademlia(kad_event)) => { - tracing::debug!( - target = "atoma-p2p", - event = "kad", - kad_event = ?kad_event, - "Kad event" - ); - metrics.record(&kad_event); - } - swarm_event => { - tracing::debug!( - target = "atoma-p2p", - event = "swarm_event", - swarm_event = ?swarm_event, - "Swarm event" - ); - metrics.record(&swarm_event); - } - } + handle_p2p_event(event); } Some(usage_metrics) = self.usage_metrics_rx.recv() => { if let Err(e) = self.handle_new_usage_metrics_event(usage_metrics) { @@ -826,176 +591,6 @@ impl AtomaP2pNode { Ok(()) } - /// Handles incoming gossipsub messages in the P2P network. - /// - /// This method processes UsageMetrics messages by: - /// 1. Validating the message's signature and timestamp - /// 2. Verifying the node's ownership of its small ID - /// 3. Reporting the validation result to the gossipsub protocol - /// 4. Storing the node's public URL in the state manager (if this peer is a client) - /// - /// # Message Flow - /// - /// 1. Receives a message from the gossipsub network - /// 2. Skips processing if the message is from self - /// 3. Deserializes the message into a GossipMessage - /// 4. For UsageMetrics messages: - /// - Validates the message using `validate_usage_metrics_message` - /// - Reports validation result to gossipsub protocol - /// - If this peer is a client, stores the node's public URL in state manager - /// - /// # Arguments - /// - /// * `message_data` - Raw bytes of the received gossipsub message (CBOR encoded) - /// * `message_id` - Unique identifier for the gossipsub message - /// * `propagation_source` - The peer that forwarded this message - /// - /// # Returns - /// - /// Returns `Ok(())` if message processing succeeds, or an error if: - /// - Message deserialization fails - /// - Message validation fails - /// - Reporting validation result fails - /// - Storing public URL fails (for clients) - /// - /// # Errors - /// - /// This function can return the following errors: - /// * `UsageMetricsSerializeError` - If CBOR deserialization fails - /// * `StateManagerError` - If storing public URL fails (for clients) - /// - /// # Security Considerations - /// - /// - Messages from self are ignored to prevent message loops - /// - Messages are validated before processing - /// - Only clients store public URLs to prevent unnecessary data storage - /// - Uses CBOR for efficient binary serialization - /// - /// # Example - /// - /// ```rust,ignore - /// let message_data = // ... received from gossipsub ...; - /// let message_id = // ... message identifier ...; - /// let propagation_source = // ... peer ID ...; - /// - /// match node.handle_gossipsub_message(&message_data, &message_id, &propagation_source).await { - /// Ok(()) => println!("Message processed successfully"), - /// Err(e) => println!("Failed to process message: {}", e), - /// } - /// ``` - /// - /// # Message Validation - /// - /// Messages are validated by: - /// 1. Checking the URL format and timestamp freshness - /// 2. Verifying the cryptographic signature - /// 3. Confirming the node's ownership of its small ID - /// - /// Invalid messages are rejected and not propagated further in the network. - #[instrument(level = "debug", skip_all)] - pub async fn handle_gossipsub_message( - &mut self, - message_data: Bytes, - message_id: &gossipsub::MessageId, - propagation_source: &PeerId, - ) -> Result<(), AtomaP2pNodeError> { - debug!( - target = "atoma-p2p", - event = "gossipsub_message", - message_id = %message_id, - propagation_source = %propagation_source, - "Received gossipsub message" - ); - if propagation_source == self.swarm.local_peer_id() { - debug!( - target = "atoma-p2p", - event = "gossipsub_message_from_self", - "Gossipsub message from self" - ); - // Do not re-publish the node's own message, just return `Ok(()) - return Ok(()); - } - // Directly deserialize SignedNodeMessage using new method - let signed_node_message = SignedNodeMessage::deserialize_with_signature(&message_data)?; - let signature_len = signed_node_message.signature.len(); - debug!( - target = "atoma-p2p", - event = "gossipsub_message_data", - message_id = %message_id, - propagation_source = %propagation_source, - "Received gossipsub message data" - ); - let node_message = &signed_node_message.node_message; - let node_message_hash = blake3::hash(&message_data[signature_len..]); - let message_acceptance = match validate_signed_node_message( - node_message, - node_message_hash.as_bytes(), - &signed_node_message.signature, - &self.state_manager_sender, - ) - .await - { - Ok(()) => gossipsub::MessageAcceptance::Accept, - Err(e) => { - error!( - target = "atoma-p2p", - event = "gossipsub_message_validation_error", - error = %e, - "Failed to validate gossipsub message" - ); - // NOTE: We should reject the message if it fails to validate - // as it means the node is not being following the current protocol - if let AtomaP2pNodeError::UrlParseError(_) = e { - // We remove the peer from the gossipsub topic, because it is not a valid URL and therefore cannot be reached - // by clients for processing OpenAI api compatible AI requests, so these peers are not useful for the network - self.swarm - .behaviour_mut() - .gossipsub - .remove_explicit_peer(propagation_source); - } - gossipsub::MessageAcceptance::Reject - } - }; - // Report the message validation result to the gossipsub protocol - let is_in_mempool = self - .swarm - .behaviour_mut() - .gossipsub - .report_message_validation_result(message_id, propagation_source, message_acceptance); - if is_in_mempool { - debug!( - target = "atoma-p2p", - event = "gossipsub_message_in_mempool", - message_id = %message_id, - propagation_source = %propagation_source, - "Gossipsub message already in the mempool, no need to take further actions" - ); - return Ok(()); - } - // If the current peer is a client, we need to store the public URL in the state manager - if self.is_client { - let node_message = signed_node_message.node_message; - let event = AtomaP2pEvent::NodeMetricsRegistrationEvent { - public_url: node_message.node_metadata.node_public_url, - node_small_id: node_message.node_metadata.node_small_id, - timestamp: node_message.node_metadata.timestamp, - country: node_message.node_metadata.country, - node_metrics: node_message.node_metrics, - }; - self.state_manager_sender.send((event, None)).map_err(|e| { - error!( - target = "atoma-p2p", - event = "gossipsub_message_state_manager_error", - error = %e, - "Failed to send event to state manager" - ); - AtomaP2pNodeError::StateManagerError(e) - })?; - } - - Ok(()) - } - /// Handles the publishing of new usage metrics to the P2P network. /// /// This method performs the following operations: From 5c705700a3d97a6504c0b675483792adb7f5b8fa Mon Sep 17 00:00:00 2001 From: chad Date: Tue, 4 Mar 2025 14:08:18 -0500 Subject: [PATCH 4/6] refactor: create handlers for events --- atoma-p2p/src/handlers.rs | 80 ++++++++++++++++++++++++++------------- atoma-p2p/src/service.rs | 24 +++++------- 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/atoma-p2p/src/handlers.rs b/atoma-p2p/src/handlers.rs index 805e9799..ca74efe1 100644 --- a/atoma-p2p/src/handlers.rs +++ b/atoma-p2p/src/handlers.rs @@ -1,39 +1,61 @@ +use crate::errors::AtomaP2pNodeError; use crate::metrics::{ - KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_CONNECTIONS, TOTAL_DIALS_ATTEMPTED, - TOTAL_DIALS_FAILED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, - TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, - TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, TOTAL_OUTGOING_CONNECTIONS, + KAD_ROUTING_TABLE_SIZE, TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, + TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, TOTAL_GOSSIPSUB_SUBSCRIPTIONS, + TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, }; +use crate::service::{AtomaP2pBehaviour, AtomaP2pBehaviourEvent, StateManagerEvent}; +use crate::types::SerializeWithSignature; +use crate::types::SignedNodeMessage; +use crate::utils::validate_signed_node_message; +use crate::AtomaP2pEvent; +use bytes::Bytes; +use flume::Sender; +use libp2p::metrics::Metrics; +use libp2p::metrics::Recorder; use libp2p::{gossipsub, swarm::SwarmEvent}; +use libp2p::{kad, mdns, PeerId, Swarm}; use opentelemetry::KeyValue; +use tracing::{debug, error, instrument}; -pub fn handle_p2p_event(event: SwarmEvent) { +/// # Panics +/// +/// This function will panic if: +/// - `peer_id` is `None` when unwrapping in the `Dialing` and `OutgoingConnectionError` events +#[allow(clippy::too_many_lines)] +pub async fn handle_p2p_event( + swarm: &mut Swarm, + state_manager_sender: &Sender, + event: SwarmEvent, + metrics: &mut Metrics, + is_client: bool, +) { match event { SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Message { message_id, message, propagation_source, })) => { - match self - .handle_gossipsub_message(message.data.into(), &message_id, &propagation_source) - .await + match handle_gossipsub_message( + swarm, + state_manager_sender, + message.data.into(), + &message_id, + &propagation_source, + is_client, + ) + .await { Ok(()) => { TOTAL_GOSSIPSUB_MESSAGES_FORWARDED.add( 1, - &[KeyValue::new( - "peerId", - self.swarm.local_peer_id().to_base58(), - )], + &[KeyValue::new("peerId", swarm.local_peer_id().to_base58())], ); } Err(e) => { TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED.add( 1, - &[KeyValue::new( - "peerId", - self.swarm.local_peer_id().to_base58(), - )], + &[KeyValue::new("peerId", swarm.local_peer_id().to_base58())], ); error!( target = "atoma-p2p", @@ -98,13 +120,16 @@ pub fn handle_p2p_event(event: SwarmEvent) { multiaddr = %multiaddr, "Mdns discovered peer" ); - self.swarm + swarm .behaviour_mut() .kademlia .add_address(&peer_id, multiaddr); } // Record discovery metrics - TOTAL_MDNS_DISCOVERIES.add(peer_count, &[KeyValue::new("peerId", peer_id.clone())]); + TOTAL_MDNS_DISCOVERIES.add( + peer_count, + &[KeyValue::new("peerId", swarm.local_peer_id().to_base58())], + ); } SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Mdns(mdns::Event::Expired( expired_peers, @@ -123,7 +148,7 @@ pub fn handle_p2p_event(event: SwarmEvent) { multiaddr = %multiaddr, "Mdns expired peer" ); - self.swarm + swarm .behaviour_mut() .kademlia .remove_address(&peer_id, &multiaddr); @@ -337,10 +362,12 @@ pub fn handle_p2p_event(event: SwarmEvent) { /// Invalid messages are rejected and not propagated further in the network. #[instrument(level = "debug", skip_all)] pub async fn handle_gossipsub_message( - &mut self, + swarm: &mut Swarm, + state_manager_sender: &Sender, message_data: Bytes, message_id: &gossipsub::MessageId, propagation_source: &PeerId, + is_client: bool, ) -> Result<(), AtomaP2pNodeError> { debug!( target = "atoma-p2p", @@ -349,7 +376,7 @@ pub async fn handle_gossipsub_message( propagation_source = %propagation_source, "Received gossipsub message" ); - if propagation_source == self.swarm.local_peer_id() { + if propagation_source == swarm.local_peer_id() { debug!( target = "atoma-p2p", event = "gossipsub_message_from_self", @@ -374,7 +401,7 @@ pub async fn handle_gossipsub_message( node_message, node_message_hash.as_bytes(), &signed_node_message.signature, - &self.state_manager_sender, + state_manager_sender, ) .await { @@ -391,7 +418,7 @@ pub async fn handle_gossipsub_message( if let AtomaP2pNodeError::UrlParseError(_) = e { // We remove the peer from the gossipsub topic, because it is not a valid URL and therefore cannot be reached // by clients for processing OpenAI api compatible AI requests, so these peers are not useful for the network - self.swarm + swarm .behaviour_mut() .gossipsub .remove_explicit_peer(propagation_source); @@ -400,8 +427,7 @@ pub async fn handle_gossipsub_message( } }; // Report the message validation result to the gossipsub protocol - let is_in_mempool = self - .swarm + let is_in_mempool = swarm .behaviour_mut() .gossipsub .report_message_validation_result(message_id, propagation_source, message_acceptance); @@ -416,7 +442,7 @@ pub async fn handle_gossipsub_message( return Ok(()); } // If the current peer is a client, we need to store the public URL in the state manager - if self.is_client { + if is_client { let node_message = signed_node_message.node_message; let event = AtomaP2pEvent::NodeMetricsRegistrationEvent { public_url: node_message.node_metadata.node_public_url, @@ -425,7 +451,7 @@ pub async fn handle_gossipsub_message( country: node_message.node_metadata.country, node_metrics: node_message.node_metrics, }; - self.state_manager_sender.send((event, None)).map_err(|e| { + state_manager_sender.send((event, None)).map_err(|e| { error!( target = "atoma-p2p", event = "gossipsub_message_state_manager_error", diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index 137bce02..4a2d3bed 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -3,17 +3,13 @@ use crate::{ errors::AtomaP2pNodeError, handlers::handle_p2p_event, metrics::{ - NetworkMetrics, KAD_ROUTING_TABLE_SIZE, PEERS_CONNECTED, TOTAL_CONNECTIONS, - TOTAL_DIALS_ATTEMPTED, TOTAL_DIALS_FAILED, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, - TOTAL_GOSSIPSUB_MESSAGES_FORWARDED, TOTAL_GOSSIPSUB_PUBLISHES, - TOTAL_GOSSIPSUB_SUBSCRIPTIONS, TOTAL_INCOMING_CONNECTIONS, - TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, - TOTAL_OUTGOING_CONNECTIONS, + NetworkMetrics, PEERS_CONNECTED, TOTAL_CONNECTIONS, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, + TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_INCOMING_CONNECTIONS, TOTAL_OUTGOING_CONNECTIONS, }, stack_leader::{StackLeaderCodec, StackLeaderProtocol}, timer::usage_metrics_timer_task, types::{AtomaP2pEvent, NodeMessage, SerializeWithSignature, SignedNodeMessage}, - utils::{extract_gossipsub_metrics, validate_signed_node_message}, + utils::extract_gossipsub_metrics, }; use bytes::{BufMut, Bytes, BytesMut}; use flume::Sender; @@ -22,11 +18,11 @@ use libp2p::{ gossipsub::{self}, identify, identity, kad, mdns, noise, request_response::{self, ProtocolSupport}, - swarm::{NetworkBehaviour, SwarmEvent}, + swarm::NetworkBehaviour, tcp, yamux, PeerId, StreamProtocol, Swarm, SwarmBuilder, }; use libp2p::{ - metrics::{Metrics, Recorder, Registry}, + metrics::{Metrics, Registry}, Multiaddr, }; use opentelemetry::KeyValue; @@ -64,11 +60,11 @@ pub type StateManagerEvent = (AtomaP2pEvent, Option>); /// This struct implements the `NetworkBehaviour` trait and coordinates three main networking components /// for peer discovery, message broadcasting, and distributed routing. #[derive(NetworkBehaviour)] -struct AtomaP2pBehaviour { +pub struct AtomaP2pBehaviour { /// Handles publish-subscribe messaging across the P2P network. /// Used for broadcasting node addresses and other network messages using a gossip protocol /// that ensures efficient message propagation. - gossipsub: gossipsub::Behaviour, + pub gossipsub: gossipsub::Behaviour, /// Provides a way to identify the node and its capabilities. /// Used to discover nodes in the network and to share information about the node, @@ -78,7 +74,7 @@ struct AtomaP2pBehaviour { /// Provides distributed hash table (DHT) functionality for peer discovery and routing. /// Helps maintain network connectivity in larger, distributed deployments by implementing /// the Kademlia protocol with a memory-based storage backend. - kademlia: kad::Behaviour, + pub kademlia: kad::Behaviour, /// Enables automatic peer discovery on local networks using multicast DNS. /// Particularly useful for development and local testing environments where nodes @@ -522,7 +518,7 @@ impl AtomaP2pNode { ) -> Result<(), AtomaP2pNodeError> { // Create a metrics update interval let mut metrics_interval = tokio::time::interval(METRICS_UPDATE_INTERVAL); - let metrics = Metrics::new(&mut self.metrics_registry); + let mut metrics = Metrics::new(&mut self.metrics_registry); let peer_id = self.swarm.local_peer_id().to_base58(); loop { @@ -548,7 +544,7 @@ impl AtomaP2pNode { } event = self.swarm.select_next_some() => { - handle_p2p_event(event); + handle_p2p_event(&mut self.swarm, &self.state_manager_sender, event, &mut metrics, self.is_client).await; } Some(usage_metrics) = self.usage_metrics_rx.recv() => { if let Err(e) = self.handle_new_usage_metrics_event(usage_metrics) { From 2a75015b32e705bfb683c76d1dea73f567e1aaac Mon Sep 17 00:00:00 2001 From: chad Date: Thu, 6 Mar 2025 23:54:39 -0500 Subject: [PATCH 5/6] feat: add stack leader / request response --- .vscode/settings.json | 41 +++---- Cargo.lock | 13 +++ atoma-bin/atoma_node.rs | 4 +- atoma-p2p-tester/src/main.rs | 5 +- atoma-p2p/Cargo.toml | 3 + atoma-p2p/src/config.rs | 3 + atoma-p2p/src/errors.rs | 10 ++ atoma-p2p/src/handlers.rs | 50 ++++++++- atoma-p2p/src/lib.rs | 2 +- atoma-p2p/src/service.rs | 43 ++++++-- atoma-p2p/src/stack_leader.rs | 141 ------------------------ atoma-p2p/src/stack_request_response.rs | 81 ++++++++++++++ 12 files changed, 218 insertions(+), 178 deletions(-) delete mode 100644 atoma-p2p/src/stack_leader.rs create mode 100644 atoma-p2p/src/stack_request_response.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index a4653015..eede8944 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,22 +1,23 @@ { - "rust-analyzer.check.command": "clippy", - "rust-analyzer.check.extraArgs": [ - "--", - "-D", - "warnings", - "-W", - "clippy::pedantic", - "-W", - "clippy::nursery", - "-W", - "clippy::style", - "-W", - "clippy::complexity", - "-W", - "clippy::perf", - "-W", - "clippy::suspicious", - "-W", - "clippy::correctness" - ] + "rust-analyzer.check.command": "clippy", + "rust-analyzer.check.extraArgs": [ + "--", + "-D", + "warnings", + "-W", + "clippy::pedantic", + "-W", + "clippy::nursery", + "-W", + "clippy::style", + "-W", + "clippy::complexity", + "-W", + "clippy::perf", + "-W", + "clippy::suspicious", + "-W", + "clippy::correctness" + ], + "idf.pythonInstallPath": "/usr/local/bin/python3" } diff --git a/Cargo.lock b/Cargo.lock index f4c22a05..6ebb8b6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -901,6 +901,7 @@ dependencies = [ name = "atoma-p2p" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "blake3", "bytes", @@ -918,6 +919,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "sqlx", "sui-keys", "sui-sdk", "sysinfo 0.33.1", @@ -1658,6 +1660,15 @@ dependencies = [ "toml 0.5.11", ] +[[package]] +name = "cbor4ii" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.2.15" @@ -5011,12 +5022,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "548fe44a80ff275d400f1b26b090d441d83ef73efabbeb6415f4ce37e5aed865" dependencies = [ "async-trait", + "cbor4ii", "futures", "futures-bounded", "libp2p-core", "libp2p-identity", "libp2p-swarm", "rand 0.8.5", + "serde", "smallvec", "tracing", ] diff --git a/atoma-bin/atoma_node.rs b/atoma-bin/atoma_node.rs index e32ecf31..057bec2f 100644 --- a/atoma-bin/atoma_node.rs +++ b/atoma-bin/atoma_node.rs @@ -196,7 +196,9 @@ async fn main() -> Result<()> { let p2p_node_service_handle = spawn_with_shutdown( async move { let p2p_node = - AtomaP2pNode::start(config.p2p, Arc::new(keystore), p2p_event_sender, false)?; + AtomaP2pNode::start(config.p2p, Arc::new(keystore), p2p_event_sender, false) + .await + .map_err(|e| anyhow::anyhow!("Failed to start P2P node: {}", e))?; let pinned_future = Box::pin(p2p_node.run(p2p_node_service_shutdown_receiver)); pinned_future.await }, diff --git a/atoma-p2p-tester/src/main.rs b/atoma-p2p-tester/src/main.rs index 70f4ca11..f02c5e36 100644 --- a/atoma-p2p-tester/src/main.rs +++ b/atoma-p2p-tester/src/main.rs @@ -62,10 +62,11 @@ async fn main() -> Result<(), Box> { // Create and start the P2P node let keystore: FileBasedKeystore = FileBasedKeystore::new(&PathBuf::from(&config.sui.sui_keystore_path())) - .context("Failed to create keystore")?; + .with_context(|| "Failed to create keystore")?; let node = AtomaP2pNode::start(config.p2p, Arc::new(keystore), atoma_p2p_sender, false) - .context("Failed to start P2P node")?; + .await + .with_context(|| "Failed to start P2P node")?; let atoma_p2p_node_handle = spawn_with_shutdown(node.run(shutdown_receiver.clone()), shutdown_sender.clone()); diff --git a/atoma-p2p/Cargo.toml b/atoma-p2p/Cargo.toml index f0706e7a..e5873dfc 100644 --- a/atoma-p2p/Cargo.toml +++ b/atoma-p2p/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } blake3 = { workspace = true } bytes = { workspace = true } @@ -27,6 +28,7 @@ libp2p = { workspace = true, features = [ "noise", "metrics", "rsa", + "cbor", ] } fastcrypto = { workspace = true } flume = { workspace = true } @@ -39,6 +41,7 @@ sui-keys = { workspace = true } sui-sdk = { workspace = true } serde_json = { workspace = true } sysinfo = { workspace = true } +sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/atoma-p2p/src/config.rs b/atoma-p2p/src/config.rs index 56db7ecc..3e3cd98e 100644 --- a/atoma-p2p/src/config.rs +++ b/atoma-p2p/src/config.rs @@ -47,6 +47,9 @@ pub struct AtomaP2pNodeConfig { /// of the form (`serving_engine`, `metrics_endpoint`) /// (e.g. `"meta-llama/Llama-3.2-3B-Instruct" => ("vllm", "http://chat-completions:8000/metrics")`) pub metrics_endpoints: HashMap, + + /// Database connection URL for `PostgreSQL` + pub database_url: String, } impl AtomaP2pNodeConfig { diff --git a/atoma-p2p/src/errors.rs b/atoma-p2p/src/errors.rs index 2f986502..66595b32 100644 --- a/atoma-p2p/src/errors.rs +++ b/atoma-p2p/src/errors.rs @@ -62,4 +62,14 @@ pub enum AtomaP2pNodeError { PublishError(String), #[error("DNS resolver error: `{0}`")] DnsError(#[from] std::io::Error), + #[error("Failed to connect to database: `{0}`")] + DatabaseConnectionError(#[from] sqlx::Error), + #[error("External error: `{0}`")] + External(String), +} + +impl From for AtomaP2pNodeError { + fn from(err: anyhow::Error) -> Self { + Self::External(err.to_string()) + } } diff --git a/atoma-p2p/src/handlers.rs b/atoma-p2p/src/handlers.rs index ad76c236..cb9d0eec 100644 --- a/atoma-p2p/src/handlers.rs +++ b/atoma-p2p/src/handlers.rs @@ -5,6 +5,7 @@ use crate::metrics::{ TOTAL_INVALID_GOSSIPSUB_MESSAGES_RECEIVED, TOTAL_MDNS_DISCOVERIES, }; use crate::service::{AtomaP2pBehaviour, AtomaP2pBehaviourEvent, StateManagerEvent}; +use crate::stack_request_response::StackLeader; use crate::types::SerializeWithSignature; use crate::types::SignedNodeMessage; use crate::utils::validate_signed_node_message; @@ -14,9 +15,9 @@ use flume::Sender; use libp2p::metrics::Metrics; use libp2p::metrics::Recorder; use libp2p::{gossipsub, swarm::SwarmEvent}; -use libp2p::{kad, mdns, PeerId, Swarm}; +use libp2p::{kad, mdns, request_response, PeerId, Swarm}; use opentelemetry::KeyValue; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, instrument, trace}; /// # Panics /// @@ -29,6 +30,7 @@ pub async fn handle_p2p_event( event: SwarmEvent, metrics: &mut Metrics, is_client: bool, + stack_leader: &StackLeader, ) { match event { SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::Gossipsub(gossipsub::Event::Message { @@ -286,8 +288,50 @@ pub async fn handle_p2p_event( ); metrics.record(&kad_event); } + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::StackLeaderRequestResponse( + request_response::Event::Message { message, .. }, + )) => match message { + request_response::Message::Request { + request, channel, .. + } => { + trace!( + target = "atoma-p2p", + event = "stack_leader_request", + "Stack leader request" + ); + let stack_leader_response = stack_leader.can_proceed(&request).await; + if let Err(e) = swarm + .behaviour_mut() + .stack_leader_request_response + .send_response(channel, stack_leader_response) + { + error!( + target = "atoma-p2p", + event = "stack_leader_response_error", + error = ?e, + "Failed to send stack leader response" + ); + } + } + request_response::Message::Response { .. } => { + trace!( + target = "atoma-p2p", + event = "stack_leader_response", + "Stack leader response" + ); + } + }, + SwarmEvent::Behaviour(AtomaP2pBehaviourEvent::StackLeaderRequestResponse( + request_response::Event::ResponseSent { .. }, + )) => { + trace!( + target = "atoma-p2p", + event = "stack_leader_response_sent", + "Stack leader response sent" + ); + } swarm_event => { - tracing::debug!( + debug!( target = "atoma-p2p", event = "swarm_event", swarm_event = ?swarm_event, diff --git a/atoma-p2p/src/lib.rs b/atoma-p2p/src/lib.rs index 0977cf87..f3656033 100644 --- a/atoma-p2p/src/lib.rs +++ b/atoma-p2p/src/lib.rs @@ -5,7 +5,7 @@ pub mod errors; pub mod handlers; pub mod metrics; pub mod service; -pub mod stack_leader; +pub mod stack_request_response; pub mod timer; pub mod types; pub mod utils; diff --git a/atoma-p2p/src/service.rs b/atoma-p2p/src/service.rs index bd7a28e0..24f4efa7 100644 --- a/atoma-p2p/src/service.rs +++ b/atoma-p2p/src/service.rs @@ -6,7 +6,7 @@ use crate::{ NetworkMetrics, PEERS_CONNECTED, TOTAL_CONNECTIONS, TOTAL_FAILED_GOSSIPSUB_PUBLISHES, TOTAL_GOSSIPSUB_PUBLISHES, TOTAL_INCOMING_CONNECTIONS, TOTAL_OUTGOING_CONNECTIONS, }, - stack_leader::{StackLeaderCodec, StackLeaderProtocol}, + stack_request_response::{NodeComputeRequest, StackLeader, StackLeaderResponse}, timer::usage_metrics_timer_task, types::{AtomaP2pEvent, NodeMessage, SerializeWithSignature, SignedNodeMessage}, utils::extract_gossipsub_metrics, @@ -26,6 +26,7 @@ use libp2p::{ Multiaddr, }; use opentelemetry::KeyValue; +use sqlx::PgPool; use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use std::time::Duration; @@ -45,6 +46,9 @@ const METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(15); /// The protocol name for the Kademlia DHT const IPFS_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0"); +/// The protocol name for the Stack Leader +const STACK_LEADER_PROTO_NAME: StreamProtocol = StreamProtocol::new("/atoma/stack-leader/0.0.1"); + // Well connected nodes to bootstrap the network (see https://docs.ipfs.tech/concepts/public-utilities/#amino-dht-bootstrappers) const BOOTSTRAP_NODES: [&str; 4] = [ "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", @@ -83,7 +87,8 @@ pub struct AtomaP2pBehaviour { /// Provides a way to request-response messages across the P2P network. /// Used for requesting compute units from the stack leader. - stack_leader_request_response: request_response::Behaviour, + pub stack_leader_request_response: + request_response::cbor::Behaviour, } /// A P2P node implementation for the Atoma network that handles peer discovery, @@ -122,6 +127,9 @@ pub struct AtomaP2pNode { /// Add registry field metrics_registry: Registry, + + /// Add stack leader + stack_leader: StackLeader, } impl AtomaP2pNode { @@ -199,7 +207,7 @@ impl AtomaP2pNode { /// - Peer connections are authenticated /// - The node validates all incoming messages #[instrument(level = "debug", skip_all)] - pub fn start( + pub async fn start( config: AtomaP2pNodeConfig, keystore: Arc, state_manager_sender: Sender, @@ -280,8 +288,8 @@ impl AtomaP2pNode { key.public(), )); - let stack_leader_request_response = request_response::Behaviour::new( - vec![(StackLeaderProtocol::default(), ProtocolSupport::Full)], + let stack_leader_request_response = request_response::cbor::Behaviour::new( + [(STACK_LEADER_PROTO_NAME, ProtocolSupport::Full)], request_response::Config::default(), ); @@ -397,10 +405,10 @@ impl AtomaP2pNode { for peer_id in BOOTSTRAP_NODES { match peer_id.parse::() { Ok(peer_id) => { - swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, "/dnsaddr/bootstrap.libp2p.io".parse()?); + swarm.behaviour_mut().kademlia.add_address( + &peer_id, + "/213.130.147.75/config.port_listening_port".parse()?, + ); debug!( target = "atoma-p2p", event = "dialed_bootstrap_node", @@ -420,6 +428,20 @@ impl AtomaP2pNode { } } + // Initialize the StackLeader with database connection + let stack_leader = { + let db_pool = PgPool::connect(&config.database_url).await.map_err(|e| { + error!( + target = "atoma-p2p", + event = "database_connection_error", + error = %e, + "Failed to connect to database" + ); + AtomaP2pNodeError::DatabaseConnectionError(e) + })?; + StackLeader::new(db_pool) + }; + Ok(Self { keystore, swarm, @@ -429,6 +451,7 @@ impl AtomaP2pNode { is_client, network_metrics, metrics_registry, + stack_leader, }) } @@ -544,7 +567,7 @@ impl AtomaP2pNode { } event = self.swarm.select_next_some() => { - handle_p2p_event(&mut self.swarm, &self.state_manager_sender, event, &mut metrics, self.is_client).await; + handle_p2p_event(&mut self.swarm, &self.state_manager_sender, event, &mut metrics, self.is_client, &self.stack_leader).await; } Some(usage_metrics) = self.usage_metrics_rx.recv() => { if let Err(e) = self.handle_new_usage_metrics_event(usage_metrics) { diff --git a/atoma-p2p/src/stack_leader.rs b/atoma-p2p/src/stack_leader.rs deleted file mode 100644 index 09e0fe1a..00000000 --- a/atoma-p2p/src/stack_leader.rs +++ /dev/null @@ -1,141 +0,0 @@ -use async_trait::async_trait; -use bytes::{BufMut, BytesMut}; -use futures::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use libp2p::request_response::Codec; -use serde::{Deserialize, Serialize}; -use sui_sdk::types::crypto::Signature; - -#[derive(Clone, Debug)] -pub struct StackLeaderProtocol { - _version: u8, -} - -impl Default for StackLeaderProtocol { - fn default() -> Self { - Self { _version: 1 } - } -} - -impl AsRef for StackLeaderProtocol { - fn as_ref(&self) -> &'static str { - "/atoma/stack-leader/0.0.1" - } -} - -#[derive(Clone, Default)] -pub struct StackLeaderCodec(); - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct StackSmallId(u64); - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct NodeSmallId(u64); - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct StackAvailableComputeUnitsRequest { - pub stack_small_id: StackSmallId, - pub node_small_id: NodeSmallId, - pub num_compute_units: u64, - pub timestamp: u64, - pub signature: Signature, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct StackAvailableComputeUnitsResponse { - pub is_permissed: bool, - pub stack_small_id: StackSmallId, - pub timestamp: u64, - pub signature: Signature, - pub remaining_available_compute_units: u64, -} - -#[async_trait] -impl Codec for StackLeaderCodec { - type Request = StackAvailableComputeUnitsRequest; - type Response = StackAvailableComputeUnitsResponse; - type Protocol = StackLeaderProtocol; - - async fn read_request( - &mut self, - _protocol: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - // Read the length prefix (4 bytes) - let mut length_bytes = [0u8; 4]; - io.read_exact(&mut length_bytes).await?; - let length = u32::from_be_bytes(length_bytes) as usize; - - // Read the serialized data - let mut data = vec![0u8; length]; - io.read_exact(&mut data).await?; - - // Deserialize the request - ciborium::de::from_reader(&data[..]) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string())) - } - - async fn read_response( - &mut self, - _protocol: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - // Read the length prefix (4 bytes) - let mut length_bytes = [0u8; 4]; - io.read_exact(&mut length_bytes).await?; - let length = u32::from_be_bytes(length_bytes) as usize; - - // Read the serialized data - let mut data = vec![0u8; length]; - io.read_exact(&mut data).await?; - - // Deserialize the response - ciborium::de::from_reader(&data[..]) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string())) - } - - async fn write_request( - &mut self, - _protocol: &Self::Protocol, - io: &mut T, - request: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let mut buffer = BytesMut::new(); - ciborium::into_writer(&request, (&mut buffer).writer()) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; - let bytes = buffer.freeze(); - let length = u32::try_from(bytes.len()) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; - io.write_all(&length.to_be_bytes()).await?; - io.write_all(&bytes).await?; - Ok(()) - } - - async fn write_response( - &mut self, - _protocol: &Self::Protocol, - io: &mut T, - response: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let mut buffer = BytesMut::new(); - ciborium::into_writer(&response, (&mut buffer).writer()) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; - let bytes = buffer.freeze(); - let length = u32::try_from(bytes.len()) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?; - io.write_all(&length.to_be_bytes()).await?; - io.write_all(&bytes).await?; - Ok(()) - } -} diff --git a/atoma-p2p/src/stack_request_response.rs b/atoma-p2p/src/stack_request_response.rs new file mode 100644 index 00000000..ad9bf729 --- /dev/null +++ b/atoma-p2p/src/stack_request_response.rs @@ -0,0 +1,81 @@ +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use sqlx::Row; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct NodeComputeRequest { + pub node_id: u64, + pub stack_small_id: i64, + pub requested_num_compute_units: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StackLeaderResponse { + pub can_proceed: bool, +} + +pub struct StackLeader { + db: PgPool, +} + +impl StackLeader { + /// Creates a new `StackLeaderResponse` based on the compute unit request and available capacity + /// + /// # Arguments + /// * `request` - The `NodeComputeRequest` containing requested compute units + /// * `available_compute_units` - Total remaining compute units available in the stack + /// + /// # Returns + /// Returns a `StackLeaderResponse` indicating whether the request can proceed based on available capacity + #[must_use] + pub const fn new(db: PgPool) -> Self { + Self { db } + } + + /// Retrieves the available compute units for a specific stack from the database + /// + /// # Arguments + /// * `db` - Database connection or reference + /// * `stack_small_id` - The unique identifier for the stack + /// + /// # Returns + /// Result containing the available compute units or a database error + /// + /// # Errors + /// Returns a `sqlx::Error` if the database query fails or if the stack is not found + #[allow(clippy::cast_sign_loss)] + pub async fn get_stack_available_compute_units( + &self, + node_compute_request: &NodeComputeRequest, + ) -> Result { + // Query the database for the available compute units for this stack + let mut tx = self.db.begin().await?; + + let row = + sqlx::query("SELECT available_compute_units FROM stacks WHERE stack_small_id = $1") + .bind(node_compute_request.stack_small_id) + .fetch_one(&mut *tx) + .await?; + + let compute_units_i64: i64 = row.get("available_compute_units"); + let available_compute_units = compute_units_i64 as u64; + tx.commit().await?; + + Ok(available_compute_units) + } + + pub async fn can_proceed( + &self, + node_compute_request: &NodeComputeRequest, + ) -> StackLeaderResponse { + self.get_stack_available_compute_units(node_compute_request) + .await + .map_or_else( + |_| StackLeaderResponse { can_proceed: false }, + |available_compute_units| StackLeaderResponse { + can_proceed: available_compute_units + >= node_compute_request.requested_num_compute_units, + }, + ) + } +} From 84adb0cdbba830517d7e0ada45eef75386864f94 Mon Sep 17 00:00:00 2001 From: chad Date: Fri, 14 Mar 2025 13:17:13 +0530 Subject: [PATCH 6/6] build: update security deny.toml --- deny.toml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/deny.toml b/deny.toml index db381e15..70e594a5 100644 --- a/deny.toml +++ b/deny.toml @@ -26,6 +26,14 @@ ignore = [ "RUSTSEC-2024-0370", # ring is unmaintained but is a critical dependency through sui-sdk and libp2p "RUSTSEC-2025-0007", + # protobuf stack overflow vulnerability + "RUSTSEC-2024-0437", + # ring AES functions may panic when overflow checking is enabled + "RUSTSEC-2025-0009", + # Ring versions prior to 0.17 are unmaintained + "RUSTSEC-2025-0010", + # paste is no longer maintained + "RUSTSEC-2024-0436", ] # Threshold for security vulnerabilities, any vulnerability with a CVSS score # lower than the range specified will be ignored. Note that ignored advisories