diff --git a/crates/hyperion-proto/src/server_to_proxy.rs b/crates/hyperion-proto/src/server_to_proxy.rs index a20ddcb2..df5a62de 100644 --- a/crates/hyperion-proto/src/server_to_proxy.rs +++ b/crates/hyperion-proto/src/server_to_proxy.rs @@ -2,11 +2,17 @@ use rkyv::{Archive, Deserialize, Serialize, with::InlineAsBox}; use crate::ChunkPosition; +#[derive(Archive, Deserialize, Serialize, Clone, PartialEq)] +#[rkyv(derive(Debug))] +pub struct UpdatePlayerPosition { + pub stream: u64, + pub position: ChunkPosition, +} + #[derive(Archive, Deserialize, Serialize, Clone, PartialEq)] #[rkyv(derive(Debug))] pub struct UpdatePlayerPositions { - pub stream: Vec, - pub positions: Vec, + pub updates: Vec, } #[derive(Archive, Deserialize, Serialize, Clone, Copy, PartialEq)] diff --git a/crates/hyperion-proxy/src/cache.rs b/crates/hyperion-proxy/src/cache.rs index 11fb8566..75d829fe 100644 --- a/crates/hyperion-proxy/src/cache.rs +++ b/crates/hyperion-proxy/src/cache.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use glam::I16Vec2; use hyperion_proto::ArchivedServerToProxyMessage; use rustc_hash::FxHashMap; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::egress::Egress; @@ -74,18 +74,20 @@ impl BufferedEgress { pub fn handle_packet(&mut self, message: &ArchivedServerToProxyMessage<'_>) { match message { ArchivedServerToProxyMessage::UpdatePlayerPositions(packet) => { - let mut players = Vec::with_capacity(packet.stream.len()); - - for (stream, position) in packet.stream.iter().zip(packet.positions.iter()) { - let Ok(stream) = rkyv::deserialize::(stream); - let Ok(position) = rkyv::deserialize::<_, !>(position); - let position = I16Vec2::from(position); - - players.push(Player { - stream, - chunk_position: position, - }); - } + let mut players = packet + .updates + .iter() + .map(|update| { + let Ok(stream) = rkyv::deserialize::(&update.stream); + let Ok(position) = rkyv::deserialize::<_, !>(&update.position); + let position = I16Vec2::from(position); + + Player { + stream, + chunk_position: position, + } + }) + .collect::>(); self.player_bvh = Bvh::build(&mut players, ()); } @@ -235,6 +237,13 @@ impl BufferedEgress { return; }; + if channel.pending_connections.is_empty() { + warn!( + "server sent SubscribeChannelPackets but there are no pending subscribers" + ); + return; + } + for &stream in &channel.pending_connections { if stream == exclude { continue; diff --git a/crates/hyperion/src/egress/channel.rs b/crates/hyperion/src/egress/channel.rs index db27d3d3..39a240e2 100644 --- a/crates/hyperion/src/egress/channel.rs +++ b/crates/hyperion/src/egress/channel.rs @@ -76,7 +76,7 @@ fn send_subscribe_channel_packets( ) { for event in events.read() { let (entity, uuid, position, pitch, yaw, velocity, &entity_kind, connection_id) = - match query.get(event.0) { + match query.get(event.channel) { Ok(data) => data, Err(e) => { error!("failed to send subscribe channel packets: query failed: {e}"); @@ -85,7 +85,7 @@ fn send_subscribe_channel_packets( }; let mut packet_buf; - let minecraft_id = event.0.minecraft_id(); + let minecraft_id = event.channel.minecraft_id(); match entity_kind { EntityKind::Player => { @@ -150,7 +150,8 @@ fn send_subscribe_channel_packets( } compose.io_buf().send_subscribe_channel_packets( - event.0.into(), + event.channel.into(), + event.requester, &packet_buf, connection_id.copied(), ); diff --git a/crates/hyperion/src/egress/mod.rs b/crates/hyperion/src/egress/mod.rs index 898d556f..4fd006c3 100644 --- a/crates/hyperion/src/egress/mod.rs +++ b/crates/hyperion/src/egress/mod.rs @@ -6,7 +6,9 @@ use crate::{ Blocks, net::{ Compose, ConnectionId, - intermediate::{IntermediateServerToProxyMessage, UpdatePlayerPositions}, + intermediate::{ + IntermediateServerToProxyMessage, UpdatePlayerPosition, UpdatePlayerPositions, + }, }, simulation::Position, }; @@ -27,20 +29,19 @@ fn send_chunk_positions( compose: Res<'_, Compose>, query: Query<'_, '_, (&ConnectionId, &Position)>, ) { - let count = query.iter().count(); - let mut stream = Vec::with_capacity(count); - let mut positions = Vec::with_capacity(count); + let updates = query + .iter() + .map(|(&io, pos)| UpdatePlayerPosition { + stream: io, + position: hyperion_proto::ChunkPosition::from(pos.to_chunk()), + }) + .collect::>(); - for (&io, pos) in query.iter() { - stream.push(io); - positions.push(hyperion_proto::ChunkPosition::from(pos.to_chunk())); - } - - let packet = UpdatePlayerPositions { stream, positions }; - - let chunk_positions = IntermediateServerToProxyMessage::UpdatePlayerPositions(packet); + let message = IntermediateServerToProxyMessage::UpdatePlayerPositions(UpdatePlayerPositions { + updates: &updates, + }); - compose.io_buf().add_proxy_message(&chunk_positions); + compose.io_buf().add_proxy_message(&message); } fn broadcast_chunk_deltas( diff --git a/crates/hyperion/src/net/intermediate.rs b/crates/hyperion/src/net/intermediate.rs index d1a21047..42d1ed47 100644 --- a/crates/hyperion/src/net/intermediate.rs +++ b/crates/hyperion/src/net/intermediate.rs @@ -3,9 +3,14 @@ use hyperion_proto::{ChunkPosition, ServerToProxyMessage, UpdateChannelPosition} use crate::net::{ConnectionId, ProxyId}; #[derive(Clone, PartialEq)] -pub struct UpdatePlayerPositions { - pub stream: Vec, - pub positions: Vec, +pub struct UpdatePlayerPosition { + pub stream: ConnectionId, + pub position: ChunkPosition, +} + +#[derive(Clone, PartialEq)] +pub struct UpdatePlayerPositions<'a> { + pub updates: &'a [UpdatePlayerPosition], } #[derive(Clone, Copy, PartialEq, Eq)] @@ -28,6 +33,7 @@ pub struct RemoveChannel { #[derive(Clone, Copy, PartialEq, Eq)] pub struct SubscribeChannelPackets<'a> { pub channel_id: u32, + pub receiver: ProxyId, pub exclude: Option, pub data: &'a [u8], @@ -75,7 +81,7 @@ pub struct Shutdown { #[derive(Clone, PartialEq)] pub enum IntermediateServerToProxyMessage<'a> { - UpdatePlayerPositions(UpdatePlayerPositions), + UpdatePlayerPositions(UpdatePlayerPositions<'a>), AddChannel(AddChannel<'a>), UpdateChannelPositions(UpdateChannelPositions<'a>), RemoveChannel(RemoveChannel), @@ -116,13 +122,16 @@ impl IntermediateServerToProxyMessage<'_> { Self::UpdatePlayerPositions(message) => { Some(ServerToProxyMessage::UpdatePlayerPositions( hyperion_proto::UpdatePlayerPositions { - stream: message - .stream + updates: message + .updates .iter() - .copied() - .filter_map(filter_map_connection_id) + .filter_map(|update| { + Some(hyperion_proto::UpdatePlayerPosition { + stream: filter_map_connection_id(update.stream)?, + position: update.position, + }) + }) .collect::>(), - positions: message.positions.clone(), }, )) } @@ -144,8 +153,8 @@ impl IntermediateServerToProxyMessage<'_> { channel_id: message.channel_id, }, )), - Self::SubscribeChannelPackets(message) => { - Some(ServerToProxyMessage::SubscribeChannelPackets( + Self::SubscribeChannelPackets(message) => (message.receiver == proxy_id).then(|| { + ServerToProxyMessage::SubscribeChannelPackets( hyperion_proto::SubscribeChannelPackets { channel_id: message.channel_id, exclude: message @@ -154,8 +163,8 @@ impl IntermediateServerToProxyMessage<'_> { .unwrap_or_default(), data: message.data, }, - )) - } + ) + }), Self::BroadcastGlobal(message) => Some(ServerToProxyMessage::BroadcastGlobal( hyperion_proto::BroadcastGlobal { exclude: message diff --git a/crates/hyperion/src/net/mod.rs b/crates/hyperion/src/net/mod.rs index 7742c503..9d355f2d 100644 --- a/crates/hyperion/src/net/mod.rs +++ b/crates/hyperion/src/net/mod.rs @@ -687,12 +687,14 @@ impl IoBuf { pub(crate) fn send_subscribe_channel_packets( &self, channel: ChannelId, + receiver: ProxyId, packets: &[u8], exclude: Option, ) { self.add_proxy_message(&IntermediateServerToProxyMessage::SubscribeChannelPackets( intermediate::SubscribeChannelPackets { channel_id: channel.inner(), + receiver, exclude, data: packets, }, diff --git a/crates/hyperion/src/net/proxy.rs b/crates/hyperion/src/net/proxy.rs index 18570532..059c136b 100644 --- a/crates/hyperion/src/net/proxy.rs +++ b/crates/hyperion/src/net/proxy.rs @@ -87,6 +87,7 @@ async fn handle_proxy_messages( match result { ArchivedProxyToServerMessage::PlayerConnect(message) => { let Ok(stream) = rkyv::deserialize::(&message.stream); + let connection_id = ConnectionId::new(stream, proxy_id); let (sender, receiver) = packet_channel::channel(DEFAULT_FRAGMENT_SIZE); if player_packet_sender.insert(stream, sender).is_some() { @@ -105,14 +106,20 @@ async fn handle_proxy_messages( receiver, )) .id(); - world + let already_exists = world .get_resource_mut::() .expect("StreamLookup resource should exist") - .insert(stream, player); + .insert(connection_id, player) + .is_some(); + + if already_exists { + error!("StreamLookup contains duplicate connection id"); + } }); } ArchivedProxyToServerMessage::PlayerDisconnect(message) => { let Ok(stream) = rkyv::deserialize::(&message.stream); + let connection_id = ConnectionId::new(stream, proxy_id); if player_packet_sender.remove(&stream).is_none() { error!( @@ -121,17 +128,21 @@ async fn handle_proxy_messages( } command_channel.push(move |world: &mut World| { - let player = world + let Some(player) = world .get_resource_mut::() .expect("StreamLookup resource should exist") - .remove(&stream) - .expect("player from PlayerDisconnect must exist in the stream lookup map"); + .remove(&connection_id) + else { + error!("player from PlayerDisconnect must exist in the stream lookup map"); + return; + }; world.despawn(player); }); } ArchivedProxyToServerMessage::PlayerPackets(message) => { let Ok(stream) = rkyv::deserialize::(&message.stream); + let connection_id = ConnectionId::new(stream, proxy_id); let Some(sender) = player_packet_sender.get_mut(&stream) else { error!( @@ -158,9 +169,7 @@ async fn handle_proxy_messages( let compose = world .get_resource::() .expect("Compose resource should exist"); - compose - .io_buf() - .shutdown(ConnectionId::new(stream, proxy_id)); + compose.io_buf().shutdown(connection_id); }); } } @@ -183,7 +192,10 @@ async fn handle_proxy_messages( let channels = channels .into_iter() .filter_map(|channel_id| match Entity::from_id(channel_id, world) { - Ok(channel) => Some(RequestSubscribeChannelPackets(channel)), + Ok(channel) => Some(RequestSubscribeChannelPackets { + channel, + requester: proxy_id, + }), Err(e) => { error!( "RequestSubscribeChannelPackets: channel id is invalid: {e}" diff --git a/crates/hyperion/src/simulation/mod.rs b/crates/hyperion/src/simulation/mod.rs index 387ef825..c0bd785e 100644 --- a/crates/hyperion/src/simulation/mod.rs +++ b/crates/hyperion/src/simulation/mod.rs @@ -23,7 +23,7 @@ use valence_text::IntoText; use crate::{ Global, - net::{Compose, ConnectionId}, + net::{Compose, ConnectionId, ProxyId}, simulation::{ command::CommandPlugin, entity_kind::EntityKind, @@ -49,8 +49,8 @@ pub mod util; #[derive(Resource, Default, Debug, Deref, DerefMut)] pub struct StreamLookup { - /// The UUID of all players - inner: FxHashMap, + /// The connection id of all players + inner: FxHashMap, } #[derive(Component, Default, Debug, Deref, DerefMut)] @@ -723,9 +723,11 @@ impl Plugin for SimPlugin { } /// Event sent when the proxy requests packets to send to a player who has subscribed to a channel. -/// This event stores the channel entity. #[derive(Event)] -pub struct RequestSubscribeChannelPackets(pub Entity); +pub struct RequestSubscribeChannelPackets { + pub channel: Entity, + pub requester: ProxyId, +} #[derive(Component)] pub struct Visible;