From dbc944d87ea255ab4f101c1b06f5a5dda19e20c8 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Fri, 23 Jan 2026 20:33:07 +0100 Subject: [PATCH] Add consistency checks These were present in the C++ implementation, but not carried over to this implementation, as they could only be added when the implementation was complete. Now it's getting there, so they can be added. --- src/rx/data_tracker.rs | 4 +++ src/tx/outstanding_data.rs | 34 +++++++++++++++++++++++ src/tx/retransmission_queue.rs | 13 ++++++++- src/tx/send_queue.rs | 50 ++++++++++++++++++++++++++++++++++ src/tx/stream_scheduler.rs | 26 ++++++++++++++++++ 5 files changed, 126 insertions(+), 1 deletion(-) diff --git a/src/rx/data_tracker.rs b/src/rx/data_tracker.rs index ad1a2b6..20e726b 100644 --- a/src/rx/data_tracker.rs +++ b/src/rx/data_tracker.rs @@ -388,6 +388,10 @@ impl DataTracker { } pub(crate) fn restore_from_state(&mut self, state: &SocketHandoverState) { + debug_assert!(self.additional_tsn_blocks.is_empty()); + debug_assert!(self.duplicates.is_empty()); + debug_assert!(!self.seen_packet); + self.last_cumulative_acked_tsn = Tsn(state.rx.last_cumulative_acked_tsn); self.seen_packet = state.rx.seen_packet; } diff --git a/src/tx/outstanding_data.rs b/src/tx/outstanding_data.rs index e54551b..a877474 100644 --- a/src/tx/outstanding_data.rs +++ b/src/tx/outstanding_data.rs @@ -212,6 +212,34 @@ impl OutstandingData { } } + pub(crate) fn is_consistent(&self) -> bool { + let mut actual_unacked_payload_bytes = 0; + let mut actual_unacked_items = 0; + let mut actual_combined_to_be_retransmitted = BTreeSet::new(); + + let mut tsn = self.last_cumulative_tsn_ack; + for item in &self.outstanding_data { + tsn += 1; + if item.is_outstanding() { + actual_unacked_payload_bytes += + round_up_to_4!(self.data_chunk_header_size + item.data.payload.len()); + actual_unacked_items += 1; + } + + if item.should_be_retransmitted() { + actual_combined_to_be_retransmitted.insert(tsn); + } + } + + let mut combined_to_be_retransmitted = BTreeSet::new(); + combined_to_be_retransmitted.extend(self.to_be_retransmitted.iter()); + combined_to_be_retransmitted.extend(self.to_be_fast_retransmitted.iter()); + + actual_unacked_payload_bytes == self.unacked_bytes + && actual_unacked_items == self.unacked_items + && actual_combined_to_be_retransmitted == combined_to_be_retransmitted + } + // Note: This may discard unsent messages - call `get_unsent_messages_to_discard`. pub fn handle_sack( &mut self, @@ -240,6 +268,7 @@ impl OutstandingData { is_in_fast_recovery, &mut ack_info, ); + debug_assert!(self.is_consistent()); ack_info } @@ -440,6 +469,7 @@ impl OutstandingData { // subsequent Fast Retransmit. However, as they are marked for retransmission, they will // be retransmitted later on as soon as cwnd allows." self.to_be_retransmitted.append(&mut tsns); + debug_assert!(self.is_consistent()); chunks } @@ -489,6 +519,7 @@ impl OutstandingData { for tsn in tsns_to_expire { self.abandon_all_for(tsn); } + debug_assert!(self.is_consistent()); } pub fn is_empty(&self) -> bool { @@ -560,9 +591,11 @@ impl OutstandingData { item.data.mid ); self.abandon_all_for(tsn); + debug_assert!(self.is_consistent()); return None; } + debug_assert!(self.is_consistent()); Some(tsn) } @@ -635,6 +668,7 @@ impl OutstandingData { for tsn in &tsns_to_nack { self.nack_chunk(*tsn, true, false); } + debug_assert!(self.is_consistent()); } /// Creates a FORWARD-TSN chunk. diff --git a/src/tx/retransmission_queue.rs b/src/tx/retransmission_queue.rs index e40e8cc..637a349 100644 --- a/src/tx/retransmission_queue.rs +++ b/src/tx/retransmission_queue.rs @@ -139,6 +139,10 @@ impl RetransmissionQueue { } } + fn is_consistent(&self) -> bool { + self.outstanding_data.is_consistent() + } + fn start_t3_rtx_if_outstanding_data(&mut self, now: SocketTime) { // Note: Can't use `unacked_bytes` as that one doesn't count chunks to be retransmitted. if self.outstanding_data.is_empty() { @@ -396,6 +400,7 @@ impl RetransmissionQueue { let reset_error_counter = ack_info.bytes_acked > 0; self.start_t3_rtx_if_outstanding_data(now); + debug_assert!(self.is_consistent()); HandleSackResult::Valid { rtt, reset_error_counter } } @@ -453,6 +458,7 @@ impl RetransmissionQueue { self.unacked_bytes(), old_unacked_bytes ); + debug_assert!(self.is_consistent()); true } @@ -508,6 +514,8 @@ impl RetransmissionQueue { old_unacked_bytes ); + debug_assert!(self.is_consistent()); + to_be_sent } @@ -599,6 +607,7 @@ impl RetransmissionQueue { old_rwnd ); } + debug_assert!(self.is_consistent()); to_be_sent } @@ -682,7 +691,9 @@ impl RetransmissionQueue { } self.outstanding_data.expire_outstanding_chunks(now); - self.outstanding_data.should_send_forward_tsn() + let ret = self.outstanding_data.should_send_forward_tsn(); + debug_assert!(self.is_consistent()); + ret } pub fn create_forward_tsn(&mut self) -> Chunk { diff --git a/src/tx/send_queue.rs b/src/tx/send_queue.rs index 6c0dbc9..13ff475 100644 --- a/src/tx/send_queue.rs +++ b/src/tx/send_queue.rs @@ -32,6 +32,7 @@ use crate::types::Ssn; use crate::types::StreamKey; use std::cell::RefCell; use std::collections::HashMap; +use std::collections::HashSet; use std::collections::VecDeque; use std::ops::AddAssign; use std::ops::SubAssign; @@ -182,6 +183,11 @@ impl<'a> OutgoingStream<'a> { items: VecDeque::new(), } } + + fn is_consistent(&self) -> bool { + let bytes: usize = self.items.iter().map(|i| i.remaining_size).sum(); + bytes == self.buffered_amount.value + } } pub struct SendQueue { @@ -222,6 +228,42 @@ impl SendQueue { } } + fn is_consistent(&self) -> bool { + let mut total_buffered_amount = 0; + let mut expected_active_streams = HashSet::new(); + + for (stream_id, stream) in &self.streams { + if !stream.is_consistent() { + return false; + } + total_buffered_amount += stream.buffered_amount.value; + + let bytes_to_send = if stream.pause_state == PauseState::Paused + || stream.pause_state == PauseState::Resetting + { + 0 + } else { + stream.items.front().map_or(0, |i| i.remaining_size) + }; + + if bytes_to_send > 0 { + expected_active_streams.insert(*stream_id); + } + } + + if total_buffered_amount != self.buffered_amount.value { + return false; + } + + let actual_active_streams: HashSet = self.scheduler.active_streams().collect(); + + if expected_active_streams != actual_active_streams { + return false; + } + + true + } + pub fn enable_message_interleaving(&mut self, enable: bool) { if enable != self.enable_message_interleaving { self.enable_message_interleaving = enable; @@ -305,6 +347,7 @@ impl SendQueue { let priority = self.enable_message_interleaving.then_some(stream.priority); self.scheduler.set_bytes_remaining(stream_id, stream.items[0].remaining_size, priority); } + debug_assert!(self.is_consistent()); } pub fn produce(&mut self, now: SocketTime, max_size: usize) -> Option { @@ -375,6 +418,7 @@ impl SendQueue { item.remaining_offset += size; item.remaining_size -= size; } + debug_assert!(self.is_consistent()); Some(data) } @@ -404,6 +448,7 @@ impl SendQueue { priority, ); } + debug_assert!(self.is_consistent()); } pub fn prepare_reset_stream(&mut self, stream_id: StreamId) { @@ -454,6 +499,7 @@ impl SendQueue { } else { stream.pause_state = PauseState::Pending; } + debug_assert!(self.is_consistent()); } pub fn has_streams_ready_to_be_reset(&self) -> bool { @@ -484,6 +530,7 @@ impl SendQueue { } } }); + debug_assert!(self.is_consistent()); } pub fn rollback_reset_streams(&mut self) { @@ -496,6 +543,7 @@ impl SendQueue { } } }); + debug_assert!(self.is_consistent()); } pub fn reset(&mut self) { @@ -514,6 +562,7 @@ impl SendQueue { self.scheduler.set_bytes_remaining(*stream_id, item_size, priority); } }); + debug_assert!(self.is_consistent()); } pub fn buffered_amount(&self, stream_id: StreamId) -> usize { @@ -556,6 +605,7 @@ impl SendQueue { ) }); stream.priority = priority; + debug_assert!(self.is_consistent()); } pub fn get_priority(&self, stream_id: StreamId) -> u16 { diff --git a/src/tx/stream_scheduler.rs b/src/tx/stream_scheduler.rs index bee830e..989c6ea 100644 --- a/src/tx/stream_scheduler.rs +++ b/src/tx/stream_scheduler.rs @@ -105,6 +105,7 @@ impl StreamScheduler { if self.current_stream == Some(stream_id) { self.current_stream = None; } + debug_assert!(self.is_consistent()); return; } @@ -118,6 +119,7 @@ impl StreamScheduler { active_stream.bytes_remaining = bytes_remaining; active_stream.next_vt = calculate_vt(active_stream, min(active_stream.bytes_remaining, self.max_payload_bytes)); + debug_assert!(self.is_consistent()); } /// Given space for `max_size` bytes, returns which stream that data should be produced from, @@ -165,6 +167,30 @@ impl StreamScheduler { self.current_stream = None; } } + debug_assert!(self.is_consistent()); + } + + fn is_consistent(&self) -> bool { + for (stream_id, stream) in &self.active_streams { + if stream.bytes_remaining == 0 { + log::error!("Stream {} is active but has 0 bytes remaining", stream_id); + return false; + } + if stream.next_vt < stream.start_vt { + log::error!( + "Stream {} has next_vt {} < start_vt {}", + stream_id, + stream.next_vt, + stream.start_vt + ); + return false; + } + } + true + } + + pub fn active_streams(&self) -> impl Iterator + '_ { + self.active_streams.keys().cloned() } }