Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/hyperion-proto/src/server_to_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ use rkyv::{Archive, Deserialize, Serialize, with::InlineAsBox};

use crate::ChunkPosition;

#[derive(Archive, Deserialize, Serialize, Clone, PartialEq)]
#[rkyv(derive(Debug))]
pub struct UpdatePlayerPosition {
pub stream: u64,
pub position: ChunkPosition,
}

#[derive(Archive, Deserialize, Serialize, Clone, PartialEq)]
#[rkyv(derive(Debug))]
pub struct UpdatePlayerPositions {
pub stream: Vec<u64>,
pub positions: Vec<ChunkPosition>,
pub updates: Vec<UpdatePlayerPosition>,
}

#[derive(Archive, Deserialize, Serialize, Clone, Copy, PartialEq)]
Expand Down
35 changes: 22 additions & 13 deletions crates/hyperion-proxy/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bytes::Bytes;
use glam::I16Vec2;
use hyperion_proto::ArchivedServerToProxyMessage;
use rustc_hash::FxHashMap;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::egress::Egress;

Expand Down Expand Up @@ -74,18 +74,20 @@ impl BufferedEgress {
pub fn handle_packet(&mut self, message: &ArchivedServerToProxyMessage<'_>) {
match message {
ArchivedServerToProxyMessage::UpdatePlayerPositions(packet) => {
let mut players = Vec::with_capacity(packet.stream.len());

for (stream, position) in packet.stream.iter().zip(packet.positions.iter()) {
let Ok(stream) = rkyv::deserialize::<u64, !>(stream);
let Ok(position) = rkyv::deserialize::<_, !>(position);
let position = I16Vec2::from(position);

players.push(Player {
stream,
chunk_position: position,
});
}
let mut players = packet
.updates
.iter()
.map(|update| {
let Ok(stream) = rkyv::deserialize::<u64, !>(&update.stream);
let Ok(position) = rkyv::deserialize::<_, !>(&update.position);
let position = I16Vec2::from(position);

Player {
stream,
chunk_position: position,
}
})
.collect::<Vec<_>>();

self.player_bvh = Bvh::build(&mut players, ());
}
Expand Down Expand Up @@ -235,6 +237,13 @@ impl BufferedEgress {
return;
};

if channel.pending_connections.is_empty() {
warn!(
"server sent SubscribeChannelPackets but there are no pending subscribers"
);
return;
}

for &stream in &channel.pending_connections {
if stream == exclude {
continue;
Expand Down
7 changes: 4 additions & 3 deletions crates/hyperion/src/egress/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn send_subscribe_channel_packets(
) {
for event in events.read() {
let (entity, uuid, position, pitch, yaw, velocity, &entity_kind, connection_id) =
match query.get(event.0) {
match query.get(event.channel) {
Ok(data) => data,
Err(e) => {
error!("failed to send subscribe channel packets: query failed: {e}");
Expand All @@ -85,7 +85,7 @@ fn send_subscribe_channel_packets(
};

let mut packet_buf;
let minecraft_id = event.0.minecraft_id();
let minecraft_id = event.channel.minecraft_id();

match entity_kind {
EntityKind::Player => {
Expand Down Expand Up @@ -150,7 +150,8 @@ fn send_subscribe_channel_packets(
}

compose.io_buf().send_subscribe_channel_packets(
event.0.into(),
event.channel.into(),
event.requester,
&packet_buf,
connection_id.copied(),
);
Expand Down
27 changes: 14 additions & 13 deletions crates/hyperion/src/egress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::{
Blocks,
net::{
Compose, ConnectionId,
intermediate::{IntermediateServerToProxyMessage, UpdatePlayerPositions},
intermediate::{
IntermediateServerToProxyMessage, UpdatePlayerPosition, UpdatePlayerPositions,
},
},
simulation::Position,
};
Expand All @@ -27,20 +29,19 @@ fn send_chunk_positions(
compose: Res<'_, Compose>,
query: Query<'_, '_, (&ConnectionId, &Position)>,
) {
let count = query.iter().count();
let mut stream = Vec::with_capacity(count);
let mut positions = Vec::with_capacity(count);
let updates = query
.iter()
.map(|(&io, pos)| UpdatePlayerPosition {
stream: io,
position: hyperion_proto::ChunkPosition::from(pos.to_chunk()),
})
.collect::<Vec<_>>();

for (&io, pos) in query.iter() {
stream.push(io);
positions.push(hyperion_proto::ChunkPosition::from(pos.to_chunk()));
}

let packet = UpdatePlayerPositions { stream, positions };

let chunk_positions = IntermediateServerToProxyMessage::UpdatePlayerPositions(packet);
let message = IntermediateServerToProxyMessage::UpdatePlayerPositions(UpdatePlayerPositions {
updates: &updates,
});

compose.io_buf().add_proxy_message(&chunk_positions);
compose.io_buf().add_proxy_message(&message);
}

fn broadcast_chunk_deltas(
Expand Down
35 changes: 22 additions & 13 deletions crates/hyperion/src/net/intermediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ use hyperion_proto::{ChunkPosition, ServerToProxyMessage, UpdateChannelPosition}
use crate::net::{ConnectionId, ProxyId};

#[derive(Clone, PartialEq)]
pub struct UpdatePlayerPositions {
pub stream: Vec<ConnectionId>,
pub positions: Vec<ChunkPosition>,
pub struct UpdatePlayerPosition {
pub stream: ConnectionId,
pub position: ChunkPosition,
}

#[derive(Clone, PartialEq)]
pub struct UpdatePlayerPositions<'a> {
pub updates: &'a [UpdatePlayerPosition],
}

#[derive(Clone, Copy, PartialEq, Eq)]
Expand All @@ -28,6 +33,7 @@ pub struct RemoveChannel {
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SubscribeChannelPackets<'a> {
pub channel_id: u32,
pub receiver: ProxyId,
pub exclude: Option<ConnectionId>,

pub data: &'a [u8],
Expand Down Expand Up @@ -75,7 +81,7 @@ pub struct Shutdown {

#[derive(Clone, PartialEq)]
pub enum IntermediateServerToProxyMessage<'a> {
UpdatePlayerPositions(UpdatePlayerPositions),
UpdatePlayerPositions(UpdatePlayerPositions<'a>),
AddChannel(AddChannel<'a>),
UpdateChannelPositions(UpdateChannelPositions<'a>),
RemoveChannel(RemoveChannel),
Expand Down Expand Up @@ -116,13 +122,16 @@ impl IntermediateServerToProxyMessage<'_> {
Self::UpdatePlayerPositions(message) => {
Some(ServerToProxyMessage::UpdatePlayerPositions(
hyperion_proto::UpdatePlayerPositions {
stream: message
.stream
updates: message
.updates
.iter()
.copied()
.filter_map(filter_map_connection_id)
.filter_map(|update| {
Some(hyperion_proto::UpdatePlayerPosition {
stream: filter_map_connection_id(update.stream)?,
position: update.position,
})
})
.collect::<Vec<_>>(),
positions: message.positions.clone(),
},
))
}
Expand All @@ -144,8 +153,8 @@ impl IntermediateServerToProxyMessage<'_> {
channel_id: message.channel_id,
},
)),
Self::SubscribeChannelPackets(message) => {
Some(ServerToProxyMessage::SubscribeChannelPackets(
Self::SubscribeChannelPackets(message) => (message.receiver == proxy_id).then(|| {
ServerToProxyMessage::SubscribeChannelPackets(
hyperion_proto::SubscribeChannelPackets {
channel_id: message.channel_id,
exclude: message
Expand All @@ -154,8 +163,8 @@ impl IntermediateServerToProxyMessage<'_> {
.unwrap_or_default(),
data: message.data,
},
))
}
)
}),
Self::BroadcastGlobal(message) => Some(ServerToProxyMessage::BroadcastGlobal(
hyperion_proto::BroadcastGlobal {
exclude: message
Expand Down
2 changes: 2 additions & 0 deletions crates/hyperion/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,12 +687,14 @@ impl IoBuf {
pub(crate) fn send_subscribe_channel_packets(
&self,
channel: ChannelId,
receiver: ProxyId,
packets: &[u8],
exclude: Option<ConnectionId>,
) {
self.add_proxy_message(&IntermediateServerToProxyMessage::SubscribeChannelPackets(
intermediate::SubscribeChannelPackets {
channel_id: channel.inner(),
receiver,
exclude,
data: packets,
},
Expand Down
30 changes: 21 additions & 9 deletions crates/hyperion/src/net/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ async fn handle_proxy_messages(
match result {
ArchivedProxyToServerMessage::PlayerConnect(message) => {
let Ok(stream) = rkyv::deserialize::<u64, !>(&message.stream);
let connection_id = ConnectionId::new(stream, proxy_id);

let (sender, receiver) = packet_channel::channel(DEFAULT_FRAGMENT_SIZE);
if player_packet_sender.insert(stream, sender).is_some() {
Expand All @@ -105,14 +106,20 @@ async fn handle_proxy_messages(
receiver,
))
.id();
world
let already_exists = world
.get_resource_mut::<StreamLookup>()
.expect("StreamLookup resource should exist")
.insert(stream, player);
.insert(connection_id, player)
.is_some();

if already_exists {
error!("StreamLookup contains duplicate connection id");
}
});
}
ArchivedProxyToServerMessage::PlayerDisconnect(message) => {
let Ok(stream) = rkyv::deserialize::<u64, !>(&message.stream);
let connection_id = ConnectionId::new(stream, proxy_id);

if player_packet_sender.remove(&stream).is_none() {
error!(
Expand All @@ -121,17 +128,21 @@ async fn handle_proxy_messages(
}

command_channel.push(move |world: &mut World| {
let player = world
let Some(player) = world
.get_resource_mut::<StreamLookup>()
.expect("StreamLookup resource should exist")
.remove(&stream)
.expect("player from PlayerDisconnect must exist in the stream lookup map");
.remove(&connection_id)
else {
error!("player from PlayerDisconnect must exist in the stream lookup map");
return;
};

world.despawn(player);
});
}
ArchivedProxyToServerMessage::PlayerPackets(message) => {
let Ok(stream) = rkyv::deserialize::<u64, !>(&message.stream);
let connection_id = ConnectionId::new(stream, proxy_id);

let Some(sender) = player_packet_sender.get_mut(&stream) else {
error!(
Expand All @@ -158,9 +169,7 @@ async fn handle_proxy_messages(
let compose = world
.get_resource::<Compose>()
.expect("Compose resource should exist");
compose
.io_buf()
.shutdown(ConnectionId::new(stream, proxy_id));
compose.io_buf().shutdown(connection_id);
});
}
}
Expand All @@ -183,7 +192,10 @@ async fn handle_proxy_messages(
let channels = channels
.into_iter()
.filter_map(|channel_id| match Entity::from_id(channel_id, world) {
Ok(channel) => Some(RequestSubscribeChannelPackets(channel)),
Ok(channel) => Some(RequestSubscribeChannelPackets {
channel,
requester: proxy_id,
}),
Err(e) => {
error!(
"RequestSubscribeChannelPackets: channel id is invalid: {e}"
Expand Down
12 changes: 7 additions & 5 deletions crates/hyperion/src/simulation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use valence_text::IntoText;

use crate::{
Global,
net::{Compose, ConnectionId},
net::{Compose, ConnectionId, ProxyId},
simulation::{
command::CommandPlugin,
entity_kind::EntityKind,
Expand All @@ -49,8 +49,8 @@ pub mod util;

#[derive(Resource, Default, Debug, Deref, DerefMut)]
pub struct StreamLookup {
/// The UUID of all players
inner: FxHashMap<u64, Entity>,
/// The connection id of all players
inner: FxHashMap<ConnectionId, Entity>,
}

#[derive(Component, Default, Debug, Deref, DerefMut)]
Expand Down Expand Up @@ -723,9 +723,11 @@ impl Plugin for SimPlugin {
}

/// Event sent when the proxy requests packets to send to a player who has subscribed to a channel.
/// This event stores the channel entity.
#[derive(Event)]
pub struct RequestSubscribeChannelPackets(pub Entity);
pub struct RequestSubscribeChannelPackets {
pub channel: Entity,
pub requester: ProxyId,
}

#[derive(Component)]
pub struct Visible;
Expand Down
Loading