Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/rx/data_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ impl DataTracker {
}

pub(crate) fn restore_from_state(&mut self, state: &SocketHandoverState) {
debug_assert!(self.additional_tsn_blocks.is_empty());
debug_assert!(self.duplicates.is_empty());
debug_assert!(!self.seen_packet);

self.last_cumulative_acked_tsn = Tsn(state.rx.last_cumulative_acked_tsn);
self.seen_packet = state.rx.seen_packet;
}
Expand Down
34 changes: 34 additions & 0 deletions src/tx/outstanding_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,34 @@ impl OutstandingData {
}
}

pub(crate) fn is_consistent(&self) -> bool {
let mut actual_unacked_payload_bytes = 0;
let mut actual_unacked_items = 0;
let mut actual_combined_to_be_retransmitted = BTreeSet::new();

let mut tsn = self.last_cumulative_tsn_ack;
for item in &self.outstanding_data {
tsn += 1;
if item.is_outstanding() {
actual_unacked_payload_bytes +=
round_up_to_4!(self.data_chunk_header_size + item.data.payload.len());
actual_unacked_items += 1;
}

if item.should_be_retransmitted() {
actual_combined_to_be_retransmitted.insert(tsn);
}
}

let mut combined_to_be_retransmitted = BTreeSet::new();
combined_to_be_retransmitted.extend(self.to_be_retransmitted.iter());
combined_to_be_retransmitted.extend(self.to_be_fast_retransmitted.iter());

actual_unacked_payload_bytes == self.unacked_bytes
&& actual_unacked_items == self.unacked_items
&& actual_combined_to_be_retransmitted == combined_to_be_retransmitted
}

// Note: This may discard unsent messages - call `get_unsent_messages_to_discard`.
pub fn handle_sack(
&mut self,
Expand Down Expand Up @@ -240,6 +268,7 @@ impl OutstandingData {
is_in_fast_recovery,
&mut ack_info,
);
debug_assert!(self.is_consistent());
ack_info
}

Expand Down Expand Up @@ -440,6 +469,7 @@ impl OutstandingData {
// subsequent Fast Retransmit. However, as they are marked for retransmission, they will
// be retransmitted later on as soon as cwnd allows."
self.to_be_retransmitted.append(&mut tsns);
debug_assert!(self.is_consistent());
chunks
}

Expand Down Expand Up @@ -489,6 +519,7 @@ impl OutstandingData {
for tsn in tsns_to_expire {
self.abandon_all_for(tsn);
}
debug_assert!(self.is_consistent());
}

pub fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -560,9 +591,11 @@ impl OutstandingData {
item.data.mid
);
self.abandon_all_for(tsn);
debug_assert!(self.is_consistent());
return None;
}

debug_assert!(self.is_consistent());
Some(tsn)
}

Expand Down Expand Up @@ -635,6 +668,7 @@ impl OutstandingData {
for tsn in &tsns_to_nack {
self.nack_chunk(*tsn, true, false);
}
debug_assert!(self.is_consistent());
}

/// Creates a FORWARD-TSN chunk.
Expand Down
13 changes: 12 additions & 1 deletion src/tx/retransmission_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl RetransmissionQueue {
}
}

fn is_consistent(&self) -> bool {
self.outstanding_data.is_consistent()
}

fn start_t3_rtx_if_outstanding_data(&mut self, now: SocketTime) {
// Note: Can't use `unacked_bytes` as that one doesn't count chunks to be retransmitted.
if self.outstanding_data.is_empty() {
Expand Down Expand Up @@ -396,6 +400,7 @@ impl RetransmissionQueue {
let reset_error_counter = ack_info.bytes_acked > 0;

self.start_t3_rtx_if_outstanding_data(now);
debug_assert!(self.is_consistent());

HandleSackResult::Valid { rtt, reset_error_counter }
}
Expand Down Expand Up @@ -453,6 +458,7 @@ impl RetransmissionQueue {
self.unacked_bytes(),
old_unacked_bytes
);
debug_assert!(self.is_consistent());
true
}

Expand Down Expand Up @@ -508,6 +514,8 @@ impl RetransmissionQueue {
old_unacked_bytes
);

debug_assert!(self.is_consistent());

to_be_sent
}

Expand Down Expand Up @@ -599,6 +607,7 @@ impl RetransmissionQueue {
old_rwnd
);
}
debug_assert!(self.is_consistent());

to_be_sent
}
Expand Down Expand Up @@ -682,7 +691,9 @@ impl RetransmissionQueue {
}

self.outstanding_data.expire_outstanding_chunks(now);
self.outstanding_data.should_send_forward_tsn()
let ret = self.outstanding_data.should_send_forward_tsn();
debug_assert!(self.is_consistent());
ret
}

pub fn create_forward_tsn(&mut self) -> Chunk {
Expand Down
50 changes: 50 additions & 0 deletions src/tx/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::types::Ssn;
use crate::types::StreamKey;
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::ops::AddAssign;
use std::ops::SubAssign;
Expand Down Expand Up @@ -182,6 +183,11 @@ impl<'a> OutgoingStream<'a> {
items: VecDeque::new(),
}
}

fn is_consistent(&self) -> bool {
let bytes: usize = self.items.iter().map(|i| i.remaining_size).sum();
bytes == self.buffered_amount.value
}
}

pub struct SendQueue {
Expand Down Expand Up @@ -222,6 +228,42 @@ impl SendQueue {
}
}

fn is_consistent(&self) -> bool {
let mut total_buffered_amount = 0;
let mut expected_active_streams = HashSet::new();

for (stream_id, stream) in &self.streams {
if !stream.is_consistent() {
return false;
}
total_buffered_amount += stream.buffered_amount.value;

let bytes_to_send = if stream.pause_state == PauseState::Paused
|| stream.pause_state == PauseState::Resetting
{
0
} else {
stream.items.front().map_or(0, |i| i.remaining_size)
};

if bytes_to_send > 0 {
expected_active_streams.insert(*stream_id);
}
}

if total_buffered_amount != self.buffered_amount.value {
return false;
}

let actual_active_streams: HashSet<StreamId> = self.scheduler.active_streams().collect();

if expected_active_streams != actual_active_streams {
return false;
}

true
}

pub fn enable_message_interleaving(&mut self, enable: bool) {
if enable != self.enable_message_interleaving {
self.enable_message_interleaving = enable;
Expand Down Expand Up @@ -305,6 +347,7 @@ impl SendQueue {
let priority = self.enable_message_interleaving.then_some(stream.priority);
self.scheduler.set_bytes_remaining(stream_id, stream.items[0].remaining_size, priority);
}
debug_assert!(self.is_consistent());
}

pub fn produce(&mut self, now: SocketTime, max_size: usize) -> Option<DataToSend> {
Expand Down Expand Up @@ -375,6 +418,7 @@ impl SendQueue {
item.remaining_offset += size;
item.remaining_size -= size;
}
debug_assert!(self.is_consistent());
Some(data)
}

Expand Down Expand Up @@ -404,6 +448,7 @@ impl SendQueue {
priority,
);
}
debug_assert!(self.is_consistent());
}

pub fn prepare_reset_stream(&mut self, stream_id: StreamId) {
Expand Down Expand Up @@ -454,6 +499,7 @@ impl SendQueue {
} else {
stream.pause_state = PauseState::Pending;
}
debug_assert!(self.is_consistent());
}

pub fn has_streams_ready_to_be_reset(&self) -> bool {
Expand Down Expand Up @@ -484,6 +530,7 @@ impl SendQueue {
}
}
});
debug_assert!(self.is_consistent());
}

pub fn rollback_reset_streams(&mut self) {
Expand All @@ -496,6 +543,7 @@ impl SendQueue {
}
}
});
debug_assert!(self.is_consistent());
}

pub fn reset(&mut self) {
Expand All @@ -514,6 +562,7 @@ impl SendQueue {
self.scheduler.set_bytes_remaining(*stream_id, item_size, priority);
}
});
debug_assert!(self.is_consistent());
}

pub fn buffered_amount(&self, stream_id: StreamId) -> usize {
Expand Down Expand Up @@ -556,6 +605,7 @@ impl SendQueue {
)
});
stream.priority = priority;
debug_assert!(self.is_consistent());
}

pub fn get_priority(&self, stream_id: StreamId) -> u16 {
Expand Down
26 changes: 26 additions & 0 deletions src/tx/stream_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl StreamScheduler {
if self.current_stream == Some(stream_id) {
self.current_stream = None;
}
debug_assert!(self.is_consistent());
return;
}

Expand All @@ -118,6 +119,7 @@ impl StreamScheduler {
active_stream.bytes_remaining = bytes_remaining;
active_stream.next_vt =
calculate_vt(active_stream, min(active_stream.bytes_remaining, self.max_payload_bytes));
debug_assert!(self.is_consistent());
}

/// Given space for `max_size` bytes, returns which stream that data should be produced from,
Expand Down Expand Up @@ -165,6 +167,30 @@ impl StreamScheduler {
self.current_stream = None;
}
}
debug_assert!(self.is_consistent());
}

fn is_consistent(&self) -> bool {
for (stream_id, stream) in &self.active_streams {
if stream.bytes_remaining == 0 {
log::error!("Stream {} is active but has 0 bytes remaining", stream_id);
return false;
}
if stream.next_vt < stream.start_vt {
log::error!(
"Stream {} has next_vt {} < start_vt {}",
stream_id,
stream.next_vt,
stream.start_vt
);
return false;
}
}
true
}

pub fn active_streams(&self) -> impl Iterator<Item = StreamId> + '_ {
self.active_streams.keys().cloned()
}
}

Expand Down