From eb7f6a81daccc614d24b9e247d4af4e2c67e0bf4 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 4 Mar 2026 11:16:27 +0800 Subject: [PATCH 1/5] shrink stream buffers after struct deserialization --- cpp/fory/serialization/stream_test.cc | 44 ++++++++++++++++++++++ cpp/fory/serialization/struct_serializer.h | 4 ++ cpp/fory/util/buffer.h | 2 +- python/pyfory/buffer.pxi | 3 ++ python/pyfory/includes/libutil.pxd | 2 + python/pyfory/struct.pxi | 1 + python/pyfory/struct.py | 1 + python/pyfory/tests/test_stream.py | 20 ++++++++++ 8 files changed, 76 insertions(+), 1 deletion(-) diff --git a/cpp/fory/serialization/stream_test.cc b/cpp/fory/serialization/stream_test.cc index 98783a37e7..7985587c00 100644 --- a/cpp/fory/serialization/stream_test.cc +++ b/cpp/fory/serialization/stream_test.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -310,6 +311,49 @@ TEST(StreamSerializationTest, SerializeToOStreamOverloadParity) { EXPECT_EQ(out.data(), expected.value()); } +TEST(StreamSerializationTest, + StructDeserializeFromStreamBackedBufferShrinksAfterEachStruct) { + auto fory = Fory::builder().xlang(true).track_ref(true).build(); + register_stream_types(fory); + + std::vector first_values; + std::vector second_values; + first_values.reserve(6000); + second_values.reserve(6000); + for (int32_t i = 0; i < 6000; ++i) { + first_values.push_back(i); + second_values.push_back(6000 - i); + } + + StreamEnvelope first{ + "first", std::move(first_values), {{"a", 11}, {"b", 22}}, {7, 8}, true, + }; + StreamEnvelope second{ + "second", std::move(second_values), {{"c", 33}, {"d", 44}}, {9, 10}, + false, + }; + + std::vector bytes; + ASSERT_TRUE(fory.serialize_to(bytes, first).ok()); + ASSERT_TRUE(fory.serialize_to(bytes, second).ok()); + + std::string payload(reinterpret_cast(bytes.data()), + bytes.size()); + std::istringstream source(payload); + StdInputStream stream(source, 4096); + Buffer &buffer = stream.get_buffer(); + + auto first_result = fory.deserialize(buffer); + ASSERT_TRUE(first_result.ok()) << first_result.error().to_string(); + EXPECT_EQ(first_result.value(), first); + EXPECT_EQ(buffer.reader_index(), 0U); + + auto second_result = fory.deserialize(buffer); + ASSERT_TRUE(second_result.ok()) << second_result.error().to_string(); + EXPECT_EQ(second_result.value(), second); + EXPECT_EQ(buffer.reader_index(), 0U); +} + } // namespace test } // namespace serialization } // namespace fory diff --git a/cpp/fory/serialization/struct_serializer.h b/cpp/fory/serialization/struct_serializer.h index ccde0e480a..e4ceb6bb50 100644 --- a/cpp/fory/serialization/struct_serializer.h +++ b/cpp/fory/serialization/struct_serializer.h @@ -3270,6 +3270,7 @@ struct Serializer>> { if (FORY_PREDICT_FALSE(ctx.has_error())) { return T{}; } + ctx.buffer().shrink_stream_buffer(); return obj; } @@ -3279,6 +3280,7 @@ struct Serializer>> { if (FORY_PREDICT_FALSE(ctx.has_error())) { return T{}; } + ctx.buffer().shrink_stream_buffer(); return obj; } @@ -3290,6 +3292,7 @@ struct Serializer>> { return T{}; } + ctx.buffer().shrink_stream_buffer(); return obj; } @@ -3333,6 +3336,7 @@ struct Serializer>> { return T{}; } + ctx.buffer().shrink_stream_buffer(); return obj; } diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h index a44de2d759..9c80fb4396 100644 --- a/cpp/fory/util/buffer.h +++ b/cpp/fory/util/buffer.h @@ -136,7 +136,7 @@ class Buffer { } FORY_ALWAYS_INLINE void shrink_stream_buffer() { - if (input_stream_ != nullptr) { + if (FORY_PREDICT_FALSE(input_stream_ != nullptr && reader_index_ > 4096)) { input_stream_->shrink_buffer(); } } diff --git a/python/pyfory/buffer.pxi b/python/pyfory/buffer.pxi index d9c66e2b5b..6d950ef324 100644 --- a/python/pyfory/buffer.pxi +++ b/python/pyfory/buffer.pxi @@ -244,6 +244,9 @@ cdef class Buffer: raise ValueError("writer_index must be >= 0") self.c_buffer.writer_index(value) + cpdef inline void shrink_stream_buffer(self): + self.c_buffer.shrink_stream_buffer() + cpdef c_bool own_data(self): return self.c_buffer.own_data() diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd index b0d8914e33..56af95aef5 100644 --- a/python/pyfory/includes/libutil.pxd +++ b/python/pyfory/includes/libutil.pxd @@ -205,6 +205,8 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil: void skip(uint32_t length, CError& error) + void shrink_stream_buffer() + void copy(uint32_t start, uint32_t nbytes, uint8_t* out, uint32_t offset) const diff --git a/python/pyfory/struct.pxi b/python/pyfory/struct.pxi index 659f28121e..26c2786823 100644 --- a/python/pyfory/struct.pxi +++ b/python/pyfory/struct.pxi @@ -384,6 +384,7 @@ cdef class DataClassSerializer(Serializer): self._apply_missing_defaults_slots(obj) else: self._apply_missing_defaults_dict(obj.__dict__) + buffer.shrink_stream_buffer() return obj cdef inline void _read_dict(self, Buffer buffer, object obj): diff --git a/python/pyfory/struct.py b/python/pyfory/struct.py index cb61db3429..c3c5e38a74 100644 --- a/python/pyfory/struct.py +++ b/python/pyfory/struct.py @@ -572,6 +572,7 @@ def read(self, buffer): obj_dict[field_name] = value else: setattr(obj, field_name, value) + buffer.shrink_stream_buffer() return obj diff --git a/python/pyfory/tests/test_stream.py b/python/pyfory/tests/test_stream.py index 3276c40542..43ec54d3e6 100644 --- a/python/pyfory/tests/test_stream.py +++ b/python/pyfory/tests/test_stream.py @@ -16,6 +16,7 @@ # under the License. from dataclasses import dataclass +import io import pytest @@ -183,6 +184,25 @@ def test_stream_deserialize_multiple_objects_from_single_stream(xlang): assert reader.get_reader_index() == reader.size() +@pytest.mark.parametrize("xlang", [False, True]) +def test_stream_backed_buffer_struct_deserialize_shrinks_each_struct(xlang): + fory = pyfory.Fory(xlang=xlang, ref=True) + fory.register(StreamStructValue) + first = StreamStructValue(101, "first", list(range(6000))) + second = StreamStructValue(202, "second", list(range(6000, 0, -1))) + + payload = fory.dumps(first) + fory.dumps(second) + reader = Buffer.from_stream(io.BytesIO(payload), 4096) + + first_result = fory.deserialize(reader) + assert first_result == first + assert reader.get_reader_index() == 0 + + second_result = fory.deserialize(reader) + assert second_result == second + assert reader.get_reader_index() == 0 + + @pytest.mark.parametrize("xlang", [False, True]) def test_stream_deserialize_truncated_error(xlang): fory = pyfory.Fory(xlang=xlang, ref=True) From 9a5ab0e7dbbeab3676de42692978224bdfaaf5b1 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 4 Mar 2026 11:19:08 +0800 Subject: [PATCH 2/5] Rename shrink_stream_buffer to shrink_input_buffer --- cpp/fory/serialization/struct_serializer.h | 8 ++++---- cpp/fory/util/buffer.h | 2 +- python/pyfory/buffer.pxi | 4 ++-- python/pyfory/includes/libutil.pxd | 2 +- python/pyfory/struct.pxi | 2 +- python/pyfory/struct.py | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/fory/serialization/struct_serializer.h b/cpp/fory/serialization/struct_serializer.h index e4ceb6bb50..927fa52113 100644 --- a/cpp/fory/serialization/struct_serializer.h +++ b/cpp/fory/serialization/struct_serializer.h @@ -3270,7 +3270,7 @@ struct Serializer>> { if (FORY_PREDICT_FALSE(ctx.has_error())) { return T{}; } - ctx.buffer().shrink_stream_buffer(); + ctx.buffer().shrink_input_buffer(); return obj; } @@ -3280,7 +3280,7 @@ struct Serializer>> { if (FORY_PREDICT_FALSE(ctx.has_error())) { return T{}; } - ctx.buffer().shrink_stream_buffer(); + ctx.buffer().shrink_input_buffer(); return obj; } @@ -3292,7 +3292,7 @@ struct Serializer>> { return T{}; } - ctx.buffer().shrink_stream_buffer(); + ctx.buffer().shrink_input_buffer(); return obj; } @@ -3336,7 +3336,7 @@ struct Serializer>> { return T{}; } - ctx.buffer().shrink_stream_buffer(); + ctx.buffer().shrink_input_buffer(); return obj; } diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h index 9c80fb4396..df12100522 100644 --- a/cpp/fory/util/buffer.h +++ b/cpp/fory/util/buffer.h @@ -135,7 +135,7 @@ class Buffer { output_stream_ = nullptr; } - FORY_ALWAYS_INLINE void shrink_stream_buffer() { + FORY_ALWAYS_INLINE void shrink_input_buffer() { if (FORY_PREDICT_FALSE(input_stream_ != nullptr && reader_index_ > 4096)) { input_stream_->shrink_buffer(); } diff --git a/python/pyfory/buffer.pxi b/python/pyfory/buffer.pxi index 6d950ef324..d60392eb61 100644 --- a/python/pyfory/buffer.pxi +++ b/python/pyfory/buffer.pxi @@ -244,8 +244,8 @@ cdef class Buffer: raise ValueError("writer_index must be >= 0") self.c_buffer.writer_index(value) - cpdef inline void shrink_stream_buffer(self): - self.c_buffer.shrink_stream_buffer() + cpdef inline void shrink_input_buffer(self): + self.c_buffer.shrink_input_buffer() cpdef c_bool own_data(self): return self.c_buffer.own_data() diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd index 56af95aef5..097ed3ae88 100644 --- a/python/pyfory/includes/libutil.pxd +++ b/python/pyfory/includes/libutil.pxd @@ -205,7 +205,7 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil: void skip(uint32_t length, CError& error) - void shrink_stream_buffer() + void shrink_input_buffer() void copy(uint32_t start, uint32_t nbytes, uint8_t* out, uint32_t offset) const diff --git a/python/pyfory/struct.pxi b/python/pyfory/struct.pxi index 26c2786823..6da7e7f9f3 100644 --- a/python/pyfory/struct.pxi +++ b/python/pyfory/struct.pxi @@ -384,7 +384,7 @@ cdef class DataClassSerializer(Serializer): self._apply_missing_defaults_slots(obj) else: self._apply_missing_defaults_dict(obj.__dict__) - buffer.shrink_stream_buffer() + buffer.shrink_input_buffer() return obj cdef inline void _read_dict(self, Buffer buffer, object obj): diff --git a/python/pyfory/struct.py b/python/pyfory/struct.py index c3c5e38a74..bc24dea931 100644 --- a/python/pyfory/struct.py +++ b/python/pyfory/struct.py @@ -572,7 +572,7 @@ def read(self, buffer): obj_dict[field_name] = value else: setattr(obj, field_name, value) - buffer.shrink_stream_buffer() + buffer.shrink_input_buffer() return obj From 68bb14d5435fe83b2cef7eb8da2ce691f5213387 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 4 Mar 2026 11:31:48 +0800 Subject: [PATCH 3/5] Rename stream-backed checks to input/output stream helpers --- cpp/fory/serialization/struct_serializer.h | 4 ++-- cpp/fory/util/buffer.h | 4 ++-- cpp/fory/util/buffer_test.cc | 4 ++-- python/pyfory/cpp/pyfory.cc | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/fory/serialization/struct_serializer.h b/cpp/fory/serialization/struct_serializer.h index 927fa52113..4fe76cff2b 100644 --- a/cpp/fory/serialization/struct_serializer.h +++ b/cpp/fory/serialization/struct_serializer.h @@ -2773,7 +2773,7 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx, // Note: varint bounds checking is done per-byte during reading since // varint lengths are variable (actual size << max possible size) if constexpr (varint_count > 0) { - if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) { + if (FORY_PREDICT_FALSE(buffer.has_input_stream())) { // Stream-backed buffers may not have all varint bytes materialized yet. // Fall back to per-field readers that propagate stream read errors. read_remaining_fields(obj, ctx); @@ -2828,7 +2828,7 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx, // Phase 2: Read consecutive varint primitives (int32, int64) if any if constexpr (varint_count > 0) { - if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) { + if (FORY_PREDICT_FALSE(buffer.has_input_stream())) { // Stream-backed buffers may not have all varint bytes materialized yet. // Fall back to per-field readers that propagate stream read errors. read_remaining_fields(obj, ctx); diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h index df12100522..0143c58b69 100644 --- a/cpp/fory/util/buffer.h +++ b/cpp/fory/util/buffer.h @@ -106,11 +106,11 @@ class Buffer { FORY_ALWAYS_INLINE bool own_data() const { return own_data_; } - FORY_ALWAYS_INLINE bool is_stream_backed() const { + FORY_ALWAYS_INLINE bool has_input_stream() const { return input_stream_ != nullptr; } - FORY_ALWAYS_INLINE bool is_output_stream_backed() const { + FORY_ALWAYS_INLINE bool has_output_stream() const { return output_stream_ != nullptr; } diff --git a/cpp/fory/util/buffer_test.cc b/cpp/fory/util/buffer_test.cc index 7f32c06850..b227505767 100644 --- a/cpp/fory/util/buffer_test.cc +++ b/cpp/fory/util/buffer_test.cc @@ -398,8 +398,8 @@ TEST(Buffer, OutputStreamRebindDetachesPreviousBufferBacklink) { first->bind_output_stream(&writer); second->bind_output_stream(&writer); - EXPECT_FALSE(first->is_output_stream_backed()); - EXPECT_TRUE(second->is_output_stream_backed()); + EXPECT_FALSE(first->has_output_stream()); + EXPECT_TRUE(second->has_output_stream()); writer.enter_flush_barrier(); std::vector second_payload(5000, 7); diff --git a/python/pyfory/cpp/pyfory.cc b/python/pyfory/cpp/pyfory.cc index 13c28103b4..217ff20d98 100644 --- a/python/pyfory/cpp/pyfory.cc +++ b/python/pyfory/cpp/pyfory.cc @@ -1491,13 +1491,13 @@ int Fory_PyPrimitiveCollectionReadFromBuffer(PyObject *collection, "tuple collection size is smaller than requested read size"); return -1; } - if (!buffer->is_stream_backed() && kind == PythonCollectionKind::List) { + if (!buffer->has_input_stream() && kind == PythonCollectionKind::List) { return read_primitive_sequence_indexed( buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) { PyList_SET_ITEM(collection, i, item); }); } - if (!buffer->is_stream_backed() && kind == PythonCollectionKind::Tuple) { + if (!buffer->has_input_stream() && kind == PythonCollectionKind::Tuple) { return read_primitive_sequence_indexed( buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) { PyTuple_SET_ITEM(collection, i, item); From af0d7a6e81471dc31b34244abd85cadaffe08053 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 4 Mar 2026 11:46:07 +0800 Subject: [PATCH 4/5] Make input stream shrink_buffer best-effort --- cpp/fory/serialization/stream_test.cc | 9 ++++++--- cpp/fory/util/buffer.h | 4 ++++ cpp/fory/util/buffer_test.cc | 29 +++++++++++++++++++++++++++ cpp/fory/util/stream.cc | 8 ++++++++ cpp/fory/util/stream.h | 3 +++ python/pyfory/cpp/pyfory.cc | 8 ++++++++ 6 files changed, 58 insertions(+), 3 deletions(-) diff --git a/cpp/fory/serialization/stream_test.cc b/cpp/fory/serialization/stream_test.cc index 7985587c00..0c73bb7aca 100644 --- a/cpp/fory/serialization/stream_test.cc +++ b/cpp/fory/serialization/stream_test.cc @@ -219,17 +219,20 @@ TEST(StreamSerializationTest, SequentialDeserializeFromSingleStream) { auto first = fory.deserialize(stream); ASSERT_TRUE(first.ok()) << first.error().to_string(); EXPECT_EQ(first.value(), 12345); - EXPECT_EQ(stream.get_buffer().reader_index(), 0U); + const uint32_t first_reader_index = stream.get_buffer().reader_index(); + EXPECT_GT(first_reader_index, 0U); auto second = fory.deserialize(stream); ASSERT_TRUE(second.ok()) << second.error().to_string(); EXPECT_EQ(second.value(), "next-value"); - EXPECT_EQ(stream.get_buffer().reader_index(), 0U); + const uint32_t second_reader_index = stream.get_buffer().reader_index(); + EXPECT_GT(second_reader_index, first_reader_index); auto third = fory.deserialize(stream); ASSERT_TRUE(third.ok()) << third.error().to_string(); EXPECT_EQ(third.value(), envelope); - EXPECT_EQ(stream.get_buffer().reader_index(), 0U); + const uint32_t third_reader_index = stream.get_buffer().reader_index(); + EXPECT_GT(third_reader_index, second_reader_index); EXPECT_EQ(stream.get_buffer().remaining_size(), 0U); } diff --git a/cpp/fory/util/buffer.h b/cpp/fory/util/buffer.h index 0143c58b69..a9dc0e1d2d 100644 --- a/cpp/fory/util/buffer.h +++ b/cpp/fory/util/buffer.h @@ -135,6 +135,10 @@ class Buffer { output_stream_ = nullptr; } + // Best-effort stream buffer compaction entry point. + // Stage 1 guard: avoid calling into stream shrinking for very small progress. + // Stage 2 guard lives in InputStream::shrink_buffer(), which can decide based + // on stream-specific configured buffer size. FORY_ALWAYS_INLINE void shrink_input_buffer() { if (FORY_PREDICT_FALSE(input_stream_ != nullptr && reader_index_ > 4096)) { input_stream_->shrink_buffer(); diff --git a/cpp/fory/util/buffer_test.cc b/cpp/fory/util/buffer_test.cc index b227505767..97043a4fd7 100644 --- a/cpp/fory/util/buffer_test.cc +++ b/cpp/fory/util/buffer_test.cc @@ -19,6 +19,8 @@ #include #include +#include +#include #include #include @@ -330,6 +332,33 @@ TEST(Buffer, StreamSkipAndUnread) { EXPECT_EQ(view.reader_index(), 2U); } +TEST(Buffer, StreamShrinkBufferBestEffortUsesConfiguredBufferSize) { + constexpr uint32_t kConfiguredBufferSize = 32768; + constexpr uint32_t kPayloadSize = kConfiguredBufferSize * 2; + std::string payload(kPayloadSize, '\x7'); + std::istringstream source(payload); + StdInputStream stream(source, kConfiguredBufferSize); + Buffer reader(stream); + Error error; + + reader.skip(5000, error); + ASSERT_TRUE(error.ok()) << error.to_string(); + EXPECT_EQ(reader.reader_index(), 5000U); + + // Below configured input buffer size, shrink should be a no-op. + reader.shrink_input_buffer(); + EXPECT_EQ(reader.reader_index(), 5000U); + + reader.skip(kConfiguredBufferSize, error); + ASSERT_TRUE(error.ok()) << error.to_string(); + ASSERT_GT(reader.reader_index(), kConfiguredBufferSize); + + const uint32_t remaining_before = reader.remaining_size(); + reader.shrink_input_buffer(); + EXPECT_EQ(reader.reader_index(), 0U); + EXPECT_EQ(reader.size(), remaining_before); +} + TEST(Buffer, StreamReadErrorWhenInsufficientData) { std::vector raw{0x01, 0x02, 0x03}; OneByteIStream one_byte_stream(raw); diff --git a/cpp/fory/util/stream.cc b/cpp/fory/util/stream.cc index 0d06ec7dee..d694c3f842 100644 --- a/cpp/fory/util/stream.cc +++ b/cpp/fory/util/stream.cc @@ -204,6 +204,14 @@ void StdInputStream::shrink_buffer() { } const uint32_t read_pos = buffer_->reader_index_; + // Best-effort policy: + // 1) keep a hard 4096-byte floor to avoid tiny frequent compactions; + // 2) for larger configured input buffers, require at least one full initial + // buffer worth of consumed bytes before moving unread data. + if (FORY_PREDICT_TRUE(read_pos <= 4096 || read_pos < initial_buffer_size_)) { + return; + } + const uint32_t remaining = remaining_size(); if (read_pos > 0) { if (remaining > 0) { diff --git a/cpp/fory/util/stream.h b/cpp/fory/util/stream.h index c97ceafe04..0927adc270 100644 --- a/cpp/fory/util/stream.h +++ b/cpp/fory/util/stream.h @@ -133,6 +133,9 @@ class InputStream : public std::enable_shared_from_this { virtual Result unread(uint32_t size) = 0; + // Best-effort input-buffer compaction/reclaim hook. Callers may invoke this + // frequently; implementations should return quickly unless configured + // compaction thresholds are met. virtual void shrink_buffer() = 0; virtual Buffer &get_buffer() = 0; diff --git a/python/pyfory/cpp/pyfory.cc b/python/pyfory/cpp/pyfory.cc index 217ff20d98..0f35f8a640 100644 --- a/python/pyfory/cpp/pyfory.cc +++ b/python/pyfory/cpp/pyfory.cc @@ -366,6 +366,14 @@ class PyInputStream final : public InputStream { } const uint32_t read_pos = buffer_->reader_index_; + // Keep Python-backed InputStream shrink behavior aligned with C++: + // best-effort compaction only after both the global floor (4096) and the + // configured stream buffer size threshold are crossed. + if (FORY_PREDICT_TRUE(read_pos <= 4096 || + read_pos < initial_buffer_size_)) { + return; + } + const uint32_t remaining = remaining_size(); if (read_pos > 0) { if (remaining > 0) { From 2ebd3d38ffb39c78658c0391ac05c0d95bc0efe2 Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Wed, 4 Mar 2026 12:05:41 +0800 Subject: [PATCH 5/5] Avoid stream buffer aliasing in read_buffer_object --- python/pyfory/_fory.py | 4 ++++ python/pyfory/buffer.pxi | 3 +++ python/pyfory/includes/libutil.pxd | 2 ++ python/pyfory/serialization.pyx | 4 ++++ python/pyfory/tests/test_stream.py | 34 ++++++++++++++++++++++++++++++ 5 files changed, 47 insertions(+) diff --git a/python/pyfory/_fory.py b/python/pyfory/_fory.py index 2ccb8115b2..b57f639da2 100644 --- a/python/pyfory/_fory.py +++ b/python/pyfory/_fory.py @@ -698,6 +698,8 @@ def write_buffer_object(self, buffer, buffer_object: BufferObject): def read_buffer_object(self, buffer) -> Buffer: if not self.is_peer_out_of_band_enabled: size = buffer.read_var_uint32() + if buffer.has_input_stream(): + return buffer.read_bytes(size) reader_index = buffer.get_reader_index() buf = buffer.slice(reader_index, size) buffer.set_reader_index(reader_index + size) @@ -707,6 +709,8 @@ def read_buffer_object(self, buffer) -> Buffer: assert self._buffers is not None return next(self._buffers) size = buffer.read_var_uint32() + if buffer.has_input_stream(): + return buffer.read_bytes(size) reader_index = buffer.get_reader_index() buf = buffer.slice(reader_index, size) buffer.set_reader_index(reader_index + size) diff --git a/python/pyfory/buffer.pxi b/python/pyfory/buffer.pxi index d60392eb61..2b98353f48 100644 --- a/python/pyfory/buffer.pxi +++ b/python/pyfory/buffer.pxi @@ -247,6 +247,9 @@ cdef class Buffer: cpdef inline void shrink_input_buffer(self): self.c_buffer.shrink_input_buffer() + cpdef inline c_bool has_input_stream(self): + return self.c_buffer.has_input_stream() + cpdef c_bool own_data(self): return self.c_buffer.own_data() diff --git a/python/pyfory/includes/libutil.pxd b/python/pyfory/includes/libutil.pxd index 097ed3ae88..3371b0c023 100644 --- a/python/pyfory/includes/libutil.pxd +++ b/python/pyfory/includes/libutil.pxd @@ -205,6 +205,8 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil: void skip(uint32_t length, CError& error) + c_bool has_input_stream() const + void shrink_input_buffer() void copy(uint32_t start, uint32_t nbytes, diff --git a/python/pyfory/serialization.pyx b/python/pyfory/serialization.pyx index 4cfe64894a..dea890fbc0 100644 --- a/python/pyfory/serialization.pyx +++ b/python/pyfory/serialization.pyx @@ -1630,6 +1630,8 @@ cdef class Fory: cdef Buffer buf if not self.is_peer_out_of_band_enabled: size = buffer.read_var_uint32() + if buffer.has_input_stream(): + return buffer.read_bytes(size) reader_index = buffer.get_reader_index() buf = buffer.slice(reader_index, size) buffer.set_reader_index(reader_index + size) @@ -1639,6 +1641,8 @@ cdef class Fory: assert self._buffers is not None return next(self._buffers) size = buffer.read_var_uint32() + if buffer.has_input_stream(): + return buffer.read_bytes(size) reader_index = buffer.get_reader_index() buf = buffer.slice(reader_index, size) buffer.set_reader_index(reader_index + size) diff --git a/python/pyfory/tests/test_stream.py b/python/pyfory/tests/test_stream.py index 43ec54d3e6..efa28d6228 100644 --- a/python/pyfory/tests/test_stream.py +++ b/python/pyfory/tests/test_stream.py @@ -17,6 +17,7 @@ from dataclasses import dataclass import io +import pickle import pytest @@ -112,6 +113,12 @@ class StreamStructValue: values: list +@dataclass +class StreamPickleBufferValue: + idx: int + payload: pickle.PickleBuffer + + @pytest.mark.parametrize("xlang", [False, True]) def test_stream_roundtrip_primitives_and_strings(xlang): fory = pyfory.Fory(xlang=xlang, ref=True) @@ -203,6 +210,33 @@ def test_stream_backed_buffer_struct_deserialize_shrinks_each_struct(xlang): assert reader.get_reader_index() == 0 +def test_stream_backed_buffer_pickle_buffer_not_corrupted_after_next_struct(): + fory = pyfory.Fory(xlang=False, ref=True, strict=False) + fory.register(StreamPickleBufferValue) + first_payload = b"a" * 7000 + second_payload = b"b" * 7000 + first = StreamPickleBufferValue(101, pickle.PickleBuffer(first_payload)) + second = StreamPickleBufferValue(202, pickle.PickleBuffer(second_payload)) + + writer = Buffer.allocate(32768) + fory.serialize(first, writer) + fory.serialize(second, writer) + stream_data = writer.get_bytes(0, writer.get_writer_index()) + reader = Buffer.from_stream(io.BytesIO(stream_data), 4096) + + first_result = fory.deserialize(reader) + assert first_result.idx == first.idx + assert bytes(first_result.payload.raw()) == first_payload + + second_result = fory.deserialize(reader) + assert second_result.idx == second.idx + assert bytes(second_result.payload.raw()) == second_payload + + # Ensure previously returned zero-copy-like payloads remain stable even + # after later stream reads trigger shrink logic. + assert bytes(first_result.payload.raw()) == first_payload + + @pytest.mark.parametrize("xlang", [False, True]) def test_stream_deserialize_truncated_error(xlang): fory = pyfory.Fory(xlang=xlang, ref=True)