diff --git a/rust/fory-core/src/buffer.rs b/rust/fory-core/src/buffer.rs index 4cab51acf1..66526b403c 100644 --- a/rust/fory-core/src/buffer.rs +++ b/rust/fory-core/src/buffer.rs @@ -18,6 +18,7 @@ use crate::error::Error; use crate::float16::float16; use crate::meta::buffer_rw_string::read_latin1_simd; +use crate::stream::ForyStreamBuf; use byteorder::{ByteOrder, LittleEndian}; use std::cmp::max; @@ -506,6 +507,7 @@ impl<'a> Writer<'a> { pub struct Reader<'a> { pub(crate) bf: &'a [u8], pub(crate) cursor: usize, + pub(crate) stream: Option>, } #[allow(clippy::needless_lifetimes)] @@ -514,7 +516,44 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn new(bf: &[u8]) -> Reader<'_> { - Reader { bf, cursor: 0 } + Reader { + bf, + cursor: 0, + stream: None, + } + } + + #[inline(always)] + fn repin_stream_slice(&mut self) { + if let Some(stream) = self.stream.as_ref() { + // SAFETY: + // - stream.buffer is owned by ForyStreamBuf + // - pointer remains valid until next fill_buffer() resize + // - repin always called immediately after fill_buffer() + self.bf = unsafe { std::slice::from_raw_parts(stream.data(), stream.size()) }; + } + } + + /// Construct a stream-backed `Reader`. + pub fn from_stream(mut stream: crate::stream::ForyStreamBuf) -> Reader<'static> { + // ensure initial cursor alignment + let _ = stream.set_reader_index(0); + + let boxed = Box::new(stream); + + // pin slice to current stream buffer window + let bf = unsafe { std::slice::from_raw_parts(boxed.data(), boxed.size()) }; + + Reader { + bf, + cursor: 0, + stream: Some(boxed), + } + } + + #[inline(always)] + pub fn is_stream_backed(&self) -> bool { + self.stream.is_some() } #[inline(always)] @@ -551,29 +590,63 @@ impl<'a> Reader<'a> { self.cursor } - #[inline(always)] - fn value_at(&self, index: usize) -> Result { - match self.bf.get(index) { - None => Err(Error::buffer_out_of_bound( - index, - self.bf.len(), - self.bf.len(), - )), - Some(v) => Ok(*v), + /// Fill stream buffer up to `target_size` total bytes, then re-pin `bf`. + /// Returns `false` if stream is None OR fill failed. + fn fill_to(&mut self, target_size: usize) -> bool { + let stream = match self.stream.as_mut() { + Some(s) => s, + None => return false, + }; + // intentional: fill_buffer validates; set_reader_index only syncs read_pos + if stream.set_reader_index(self.cursor).is_err() { + return false; + } + + let n = target_size.saturating_sub(self.cursor); + if n == 0 { + self.repin_stream_slice(); + return self.bf.len() >= target_size; + } + + if stream.fill_buffer(n).is_err() { + return false; } + + self.repin_stream_slice(); + self.bf.len() >= target_size } + /// Ensure `self.cursor + n` bytes are available. + /// fast path: target <= size_ → return true + /// stream path: call fill_to(target), check again. #[inline(always)] - fn check_bound(&self, n: usize) -> Result<(), Error> { - let end = self + fn ensure_readable(&mut self, n: usize) -> Result<(), Error> { + let target = self .cursor .checked_add(n) .ok_or_else(|| Error::buffer_out_of_bound(self.cursor, n, self.bf.len()))?; - if end > self.bf.len() { - Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())) - } else { - Ok(()) + + if target > self.bf.len() && !self.fill_to(target) { + return Err(Error::buffer_out_of_bound(self.cursor, n, self.bf.len())); } + + Ok(()) + } + + #[inline(always)] + fn value_at(&mut self, index: usize) -> Result { + if index >= self.bf.len() && !self.fill_to(index + 1) { + return Err(Error::buffer_out_of_bound(index, 1, self.bf.len())); + } + + Ok(unsafe { *self.bf.get_unchecked(index) }) + } + /// stream fill on miss. Changing to `&mut self` is the single + /// change that gives ALL 27 existing read methods stream support + /// without touching them individually — they all call this. + #[inline(always)] + fn check_bound(&mut self, n: usize) -> Result<(), Error> { + self.ensure_readable(n) } #[inline(always)] @@ -606,8 +679,15 @@ impl<'a> Reader<'a> { } } - pub fn set_cursor(&mut self, cursor: usize) { + /// `stream_->reader_index(reader_index_)` when stream-backed. + pub fn set_cursor(&mut self, cursor: usize) -> Result<(), Error> { self.cursor = cursor; + + if let Some(ref mut stream) = self.stream { + stream.set_reader_index(cursor)?; + } + + Ok(()) } // ============ BOOL (TypeId = 1) ============ @@ -723,6 +803,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint32(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 5 { + return self.read_varuint32_stream(); + } let b0 = self.value_at(self.cursor)? as u32; if b0 < 0x80 { self.move_next(1); @@ -770,6 +853,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint64(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 9 { + return self.read_varuint64_stream(); + } let b0 = self.value_at(self.cursor)? as u64; if b0 < 0x80 { self.move_next(1); @@ -1000,6 +1086,9 @@ impl<'a> Reader<'a> { #[inline(always)] pub fn read_varuint36small(&mut self) -> Result { + if self.stream.is_some() && self.bf.len().saturating_sub(self.cursor) < 8 { + return self.read_varuint36small_stream(); + } // Keep this API panic-free even if cursor is externally set past buffer end. self.check_bound(0)?; let start = self.cursor; @@ -1046,6 +1135,83 @@ impl<'a> Reader<'a> { } Ok(result) } + + /// Byte-by-byte varuint32 decode for stream-backed path. + fn read_varuint32_stream(&mut self) -> Result { + let mut result = 0u32; + + for i in 0..5 { + let b = self.value_at(self.cursor)? as u32; + self.cursor += 1; + + if i == 4 && (b & 0xF0) != 0 { + return Err(Error::encode_error("var_uint32 overflow")); + } + + result |= (b & 0x7F) << (i * 7); + + if (b & 0x80) == 0 { + return Ok(result); + } + } + + Err(Error::encode_error("Invalid var_uint32 encoding")) + } + + /// Byte-by-byte varuint64 decode for stream-backed path. + fn read_varuint64_stream(&mut self) -> Result { + let mut result = 0u64; + + for i in 0..8u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + + result |= (b & 0x7F) << (i * 7); + + if (b & 0x80) == 0 { + return Ok(result); + } + } + + // 9th byte contains full 8 bits + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + + result |= b << 56; + + Ok(result) + } + + /// Byte-by-byte varuint36small decode for stream-backed path. + fn read_varuint36small_stream(&mut self) -> Result { + let mut result = 0u64; + + for i in 0..4u64 { + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + + result |= (b & 0x7F) << (i * 7); + + if (b & 0x80) == 0 { + return Ok(result); + } + } + + let b = self.value_at(self.cursor)? as u64; + self.cursor += 1; + + if b >= (1 << 8) { + return Err(Error::encode_error("var_uint36small overflow")); + } + + result |= b << 28; + + if result >= (1u64 << 36) { + return Err(Error::encode_error("var_uint36small overflow")); + } + + Ok(result) + } } #[allow(clippy::needless_lifetimes)] diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs index d4e2359b33..b4ba9510b9 100644 --- a/rust/fory-core/src/fory.rs +++ b/rust/fory-core/src/fory.rs @@ -23,9 +23,11 @@ use crate::resolver::context::{ContextCache, ReadContext, WriteContext}; use crate::resolver::type_resolver::TypeResolver; use crate::serializer::ForyDefault; use crate::serializer::{Serializer, StructSerializer}; +use crate::stream::ForyStreamBuf; use crate::types::config_flags::{IS_CROSS_LANGUAGE_FLAG, IS_NULL_FLAG}; use crate::types::{RefMode, SIZE_OF_REF_AND_TYPE}; use std::cell::UnsafeCell; +use std::io::Read; use std::mem; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::OnceLock; @@ -990,13 +992,69 @@ impl Fory { reader: &mut Reader, ) -> Result { self.with_read_context(|context| { - let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; - let mut new_reader = Reader::new(outlive_buffer); - new_reader.set_cursor(reader.cursor); - context.attach_reader(new_reader); + if reader.is_stream_backed() { + // STREAM PATH — single attach + let stream = mem::take(&mut reader.stream) + .expect("is_stream_backed was true but stream is None"); + let cursor = reader.cursor; + let mut stream_reader = Reader::from_stream(*stream); + let _ = stream_reader.set_cursor(cursor); + context.attach_reader(stream_reader); + + let result = self.deserialize_with_context(context); + let returned = context.detach_reader(); + + reader.cursor = returned.cursor; + reader.stream = returned.stream; + + if let Some(ref mut s) = reader.stream { + let _ = s.set_reader_index(reader.cursor); + s.shrink_buffer(); + reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) }; + reader.cursor = s.reader_index(); + } + result + } else { + // IN-MEMORY PATH — unchanged fast path + let outlive_buffer = unsafe { mem::transmute::<&[u8], &[u8]>(reader.bf) }; + let mut new_reader = Reader::new(outlive_buffer); + let _ = new_reader.set_cursor(reader.cursor); + context.attach_reader(new_reader); + + let result = self.deserialize_with_context(context); + let end = context.detach_reader().get_cursor(); + let _ = reader.set_cursor(end); + result + } + }) + } + + /// Deserializes a single value of type `T` from any `Read` source. + /// + /// Equivalent of C++ `fory.deserialize(Buffer(ForyInputStream(source)))`. + /// Internally wraps the source in a [`crate::stream::ForyStreamBuf`] and calls + /// [`deserialize_from`](Self::deserialize_from). + /// + /// For deserializing **multiple values sequentially** from one stream + /// (e.g. a network socket or pipe), create the reader once and reuse it: + /// + /// ```rust,ignore + /// use fory_core::{Fory, Reader}; + /// use fory_core::stream::ForyStreamBuf; + /// + /// let fory = Fory::default(); + /// let mut reader = Reader::from_stream(ForyStreamBuf::new(my_socket)); + /// let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + /// let second: String = fory.deserialize_from(&mut reader).unwrap(); + /// ``` + pub fn deserialize_from_stream( + &self, + source: impl Read + Send + 'static, + ) -> Result { + self.with_read_context(|context| { + context.attach_reader(Reader::from_stream(ForyStreamBuf::new(source))); let result = self.deserialize_with_context(context); - let end = context.detach_reader().get_cursor(); - reader.set_cursor(end); + context.detach_reader(); result }) } diff --git a/rust/fory-core/src/lib.rs b/rust/fory-core/src/lib.rs index 976a760af6..bee8b59b49 100644 --- a/rust/fory-core/src/lib.rs +++ b/rust/fory-core/src/lib.rs @@ -185,6 +185,7 @@ pub mod meta; pub mod resolver; pub mod row; pub mod serializer; +pub mod stream; pub mod types; pub use float16::float16 as Float16; pub mod util; @@ -201,3 +202,4 @@ pub use crate::resolver::type_resolver::{TypeInfo, TypeResolver}; pub use crate::serializer::weak::{ArcWeak, RcWeak}; pub use crate::serializer::{read_data, write_data, ForyDefault, Serializer, StructSerializer}; pub use crate::types::{RefFlag, RefMode, TypeId}; +pub use stream::ForyStreamBuf; diff --git a/rust/fory-core/src/stream.rs b/rust/fory-core/src/stream.rs new file mode 100644 index 0000000000..b4530ccd07 --- /dev/null +++ b/rust/fory-core/src/stream.rs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Error; +use std::io::{self, Read}; + +const DEFAULT_BUFFER_SIZE: usize = 4096; + +/// Growable internal buffer backed by any [`Read`] source. +/// +/// Bytes are pulled from the source on demand via [`fill_buffer`]. +/// The buffer grows automatically and can be compacted via [`shrink_buffer`]. +/// +/// # Buffer size limit +/// The internal buffer is capped at `u32::MAX` bytes (~4 GiB). +/// Requesting more than this returns [`Error::buffer_out_of_bound`]. +pub struct ForyStreamBuf { + source: Box, + buffer: Vec, + /// Bytes available from source: `buffer[0..valid_len]` + valid_len: usize, + /// Current read position: `buffer[read_pos..valid_len]` is unread + read_pos: usize, + initial_buffer_size: usize, +} + +impl ForyStreamBuf { + pub fn new(source: R) -> Self { + Self::with_capacity(source, DEFAULT_BUFFER_SIZE) + } + + pub fn with_capacity(source: R, buffer_size: usize) -> Self { + let cap = buffer_size.max(1); + Self { + source: Box::new(source), + buffer: vec![0u8; cap], + valid_len: 0, + read_pos: 0, + initial_buffer_size: cap, + } + } + + /// Pulls bytes from the source until at least `min_fill_size` unread bytes + /// are available. Returns `Err` on EOF, I/O error, or 4 GiB overflow. + pub fn fill_buffer(&mut self, min_fill_size: usize) -> Result<(), Error> { + if min_fill_size == 0 || self.remaining() >= min_fill_size { + return Ok(()); + } + + let need = min_fill_size - self.remaining(); + + let required = self + .valid_len + .checked_add(need) + .filter(|&r| r <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + + if required > self.buffer.len() { + let new_cap = (self.buffer.len() * 2).max(required); + self.buffer.resize(new_cap, 0); + } + + while self.remaining() < min_fill_size { + let writable = self.buffer.len() - self.valid_len; + if writable == 0 { + let new_cap = self + .buffer + .len() + .checked_mul(2) + .and_then(|n| n.checked_add(1)) + .filter(|&n| n <= u32::MAX as usize) + .ok_or_else(|| { + Error::buffer_out_of_bound(self.read_pos, min_fill_size, self.remaining()) + })?; + self.buffer.resize(new_cap, 0); + } + + match self.source.read(&mut self.buffer[self.valid_len..]) { + Ok(0) => { + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + Ok(n) => self.valid_len += n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(_) => { + return Err(Error::buffer_out_of_bound( + self.read_pos, + min_fill_size, + self.remaining(), + )); + } + } + } + Ok(()) + } + + /// Moves the read cursor backward by `size` bytes. + /// Returns `Err` if `size > read_pos`. + pub fn rewind(&mut self, size: usize) -> Result<(), Error> { + if size > self.read_pos { + return Err(Error::buffer_out_of_bound( + self.read_pos, + size, + self.valid_len, + )); + } + self.read_pos -= size; + Ok(()) + } + + /// Advances the read cursor by `size` bytes without reading from source. + /// Returns `Err` if `size > remaining()`. + pub fn consume(&mut self, size: usize) -> Result<(), Error> { + if size > self.remaining() { + return Err(Error::buffer_out_of_bound( + self.read_pos, + size, + self.remaining(), + )); + } + self.read_pos += size; + Ok(()) + } + + /// Raw pointer to the start of the internal buffer window. + /// + /// # Safety + /// Valid until the next [`fill_buffer`] call that causes reallocation. + /// Always re-derive this pointer after any `fill_buffer` call. + #[inline(always)] + pub(crate) fn data(&self) -> *const u8 { + self.buffer.as_ptr() + } + + /// Total bytes fetched from source. + #[inline(always)] + pub fn size(&self) -> usize { + self.valid_len + } + + /// Current read cursor position. + #[inline(always)] + pub fn reader_index(&self) -> usize { + self.read_pos + } + + /// Sets the read cursor to `index`. Returns `Err` if `index > valid_len`. + #[inline(always)] + pub(crate) fn set_reader_index(&mut self, index: usize) -> Result<(), Error> { + if index > self.valid_len { + return Err(Error::buffer_out_of_bound(index, 0, self.valid_len)); + } + self.read_pos = index; + Ok(()) + } + + /// Unread bytes currently available. + #[inline(always)] + pub fn remaining(&self) -> usize { + self.valid_len.saturating_sub(self.read_pos) + } + + /// Compacts consumed bytes and optionally reduces buffer capacity. + /// + /// **Phase 1:** Always moves unread bytes to offset 0. + /// + /// **Phase 2:** Shrinks capacity only when it has grown beyond + /// `initial_buffer_size` and current utilization is low (≤ 25%). + pub fn shrink_buffer(&mut self) { + let remaining = self.remaining(); + + if self.read_pos > 0 { + if remaining > 0 { + self.buffer.copy_within(self.read_pos..self.valid_len, 0); + } + self.read_pos = 0; + self.valid_len = remaining; + } + + let current_capacity = self.buffer.len(); + if current_capacity <= self.initial_buffer_size { + return; + } + + let target_capacity = if remaining == 0 { + self.initial_buffer_size + } else if remaining <= current_capacity / 4 { + let doubled = remaining.saturating_mul(2).max(1); + self.initial_buffer_size.max(doubled) + } else { + current_capacity + }; + + if target_capacity < current_capacity { + // Reduce logical size but keep allocation to avoid allocator churn + self.buffer.truncate(target_capacity); + } + } +} diff --git a/rust/tests/tests/test_stream.rs b/rust/tests/tests/test_stream.rs new file mode 100644 index 0000000000..447a291dcd --- /dev/null +++ b/rust/tests/tests/test_stream.rs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(test)] +mod stream_tests { + use fory_core::buffer::Reader; + use fory_core::stream::ForyStreamBuf; + use fory_core::Fory; + use std::fmt::Debug; + use std::io::{Cursor, Read}; + + /// Reader that returns exactly one byte per read call. + /// This stresses the streaming deserializer. + struct OneByte(Cursor>); + + impl Read for OneByte { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if buf.is_empty() { + return Ok(0); + } + + let mut one = [0u8; 1]; + + match self.0.read(&mut one)? { + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + /// Helper that verifies both in-memory and streaming paths produce identical results. + fn deserialize_helper(fory: &Fory, bytes: &[u8]) -> T + where + T: fory_core::Serializer + fory_core::ForyDefault + PartialEq + Debug, + { + let expected: T = fory + .deserialize(bytes) + .expect("in-memory deserialize failed"); + + let actual: T = fory + .deserialize_from_stream(OneByte(Cursor::new(bytes.to_vec()))) + .expect("stream deserialize failed"); + + assert_eq!( + expected, actual, + "stream and in-memory deserialization results differ" + ); + + expected + } + + // ── Primitive and String ──────────────────────────────────────────────── + + #[test] + fn test_primitive_and_string_round_trip() { + let fory = Fory::default(); + + let bytes = fory.serialize(&-9876543212345i64).unwrap(); + assert_eq!(deserialize_helper::(&fory, &bytes), -9876543212345i64); + + let bytes = fory.serialize(&"stream-hello-世界".to_string()).unwrap(); + assert_eq!( + deserialize_helper::(&fory, &bytes), + "stream-hello-世界" + ); + } + + #[test] + fn test_additional_primitives() { + let fory = Fory::default(); + + let bytes = fory.serialize(&true).unwrap(); + assert!(deserialize_helper::(&fory, &bytes)); + + let bytes = fory.serialize(&-42i32).unwrap(); + assert_eq!(deserialize_helper::(&fory, &bytes), -42i32); + + let bytes = fory.serialize(&std::f64::consts::PI).unwrap(); + assert_eq!( + deserialize_helper::(&fory, &bytes), + std::f64::consts::PI + ); + } + + // ── Large values exercising multi-byte varint paths ───────────────────── + + #[test] + fn test_varuint64_boundary_round_trip() { + let fory = Fory::default(); + + for val in [i64::MAX, i64::MIN, 1i64 << 56, -(1i64 << 56), i64::MAX - 1] { + let bytes = fory.serialize(&val).unwrap(); + assert_eq!( + deserialize_helper::(&fory, &bytes), + val, + "round-trip failed for {}", + val + ); + } + } + + #[test] + fn test_varuint36small_boundary_round_trip() { + let fory = Fory::default(); + + let large_vec: Vec = (0..500).collect(); + let bytes = fory.serialize(&large_vec).unwrap(); + + assert_eq!(deserialize_helper::>(&fory, &bytes), large_vec); + } + + // ── Vec round-trip ───────────────────────────────────────────────────── + + #[test] + fn test_vec_round_trip() { + let fory = Fory::default(); + let vec = vec![1i32, 2, 3, 5, 8]; + + let bytes = fory.serialize(&vec).unwrap(); + + assert_eq!(deserialize_helper::>(&fory, &bytes), vec); + } + + // ── Struct round-trip ────────────────────────────────────────────────── + + #[derive(Debug, PartialEq, fory_derive::ForyObject)] + struct Point { + x: i32, + y: i32, + } + + #[test] + fn test_struct_round_trip() { + let mut fory = Fory::default(); + fory.register::(1).unwrap(); + + let point = Point { x: 42, y: -7 }; + let bytes = fory.serialize(&point).unwrap(); + + assert_eq!(deserialize_helper::(&fory, &bytes), point); + } + + // ── Sequential multi-object stream decode ────────────────────────────── + + #[test] + fn test_sequential_stream_reads() { + let fory = Fory::default(); + + let mut bytes = Vec::new(); + + fory.serialize_to(&mut bytes, &12345i32).unwrap(); + fory.serialize_to(&mut bytes, &"next-value".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &99i64).unwrap(); + + let mut reader = Reader::from_stream(ForyStreamBuf::new(OneByte(Cursor::new(bytes)))); + + let first: i32 = fory.deserialize_from(&mut reader).unwrap(); + let second: String = fory.deserialize_from(&mut reader).unwrap(); + let third: i64 = fory.deserialize_from(&mut reader).unwrap(); + + assert_eq!(first, 12345); + assert_eq!(second, "next-value"); + assert_eq!(third, 99); + } + + // ── Truncated stream must return Err ─────────────────────────────────── + + #[test] + fn test_truncated_stream_returns_error() { + let fory = Fory::default(); + + let mut bytes = fory.serialize(&"hello world".to_string()).unwrap(); + bytes.pop(); + + let result: Result = fory.deserialize_from_stream(Cursor::new(bytes)); + + assert!(result.is_err()); + } + + // ── shrink_buffer compaction behavior ────────────────────────────────── + + #[test] + fn test_shrink_between_sequential_reads() { + let fory = Fory::default(); + + let mut bytes = Vec::new(); + + fory.serialize_to(&mut bytes, &42i32).unwrap(); + fory.serialize_to(&mut bytes, &"shrink-test".to_string()) + .unwrap(); + fory.serialize_to(&mut bytes, &100i64).unwrap(); + + let mut reader = + Reader::from_stream(ForyStreamBuf::with_capacity(OneByte(Cursor::new(bytes)), 4)); + + assert_eq!(fory.deserialize_from::(&mut reader).unwrap(), 42); + assert_eq!( + fory.deserialize_from::(&mut reader).unwrap(), + "shrink-test" + ); + assert_eq!(fory.deserialize_from::(&mut reader).unwrap(), 100); + } + + // ── ForyStreamBuf unit tests ─────────────────────────────────────────── + + mod buf_tests { + use fory_core::stream::ForyStreamBuf; + use std::io::{Cursor, Read}; + + struct OneByteCursor(Cursor>); + + impl Read for OneByteCursor { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if buf.is_empty() { + return Ok(0); + } + + let mut one = [0u8; 1]; + + match self.0.read(&mut one)? { + 0 => Ok(0), + _ => { + buf[0] = one[0]; + Ok(1) + } + } + } + } + + #[test] + fn test_rewind_ok() { + let mut s = + ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(vec![1, 2, 3, 4, 5])), 2); + + s.fill_buffer(4).unwrap(); + s.consume(3).unwrap(); + + assert_eq!(s.reader_index(), 3); + + s.rewind(2).unwrap(); + + assert_eq!(s.reader_index(), 1); + } + + #[test] + fn test_rewind_err_on_overrun() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![1, 2])); + s.fill_buffer(2).unwrap(); + s.consume(1).unwrap(); + + assert!(s.rewind(2).is_err()); + } + + #[test] + fn test_consume_err_on_overrun() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![1])); + s.fill_buffer(1).unwrap(); + + assert!(s.consume(2).is_err()); + } + + #[test] + fn test_short_read_returns_error() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![1, 2, 3])); + assert!(s.fill_buffer(4).is_err()); + } + + #[test] + fn test_sequential_fill() { + let data: Vec = (0u8..=9).collect(); + let mut s = ForyStreamBuf::with_capacity(OneByteCursor(Cursor::new(data)), 2); + + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + + s.consume(3).unwrap(); + + s.fill_buffer(3).unwrap(); + assert!(s.remaining() >= 3); + } + + #[test] + fn test_shrink_phase1_compacts() { + let mut s = ForyStreamBuf::new(Cursor::new(vec![0u8; 8])); + s.fill_buffer(8).unwrap(); + s.consume(6).unwrap(); + + s.shrink_buffer(); + + assert_eq!(s.reader_index(), 0); + assert_eq!(s.remaining(), 2); + } + } +}