Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions cpp/fory/serialization/stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <istream>
#include <map>
#include <memory>
#include <sstream>
#include <streambuf>
#include <string>
#include <utility>
Expand Down Expand Up @@ -218,17 +219,20 @@ TEST(StreamSerializationTest, SequentialDeserializeFromSingleStream) {
auto first = fory.deserialize<int32_t>(stream);
ASSERT_TRUE(first.ok()) << first.error().to_string();
EXPECT_EQ(first.value(), 12345);
EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
const uint32_t first_reader_index = stream.get_buffer().reader_index();
EXPECT_GT(first_reader_index, 0U);

auto second = fory.deserialize<std::string>(stream);
ASSERT_TRUE(second.ok()) << second.error().to_string();
EXPECT_EQ(second.value(), "next-value");
EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
const uint32_t second_reader_index = stream.get_buffer().reader_index();
EXPECT_GT(second_reader_index, first_reader_index);

auto third = fory.deserialize<StreamEnvelope>(stream);
ASSERT_TRUE(third.ok()) << third.error().to_string();
EXPECT_EQ(third.value(), envelope);
EXPECT_EQ(stream.get_buffer().reader_index(), 0U);
const uint32_t third_reader_index = stream.get_buffer().reader_index();
EXPECT_GT(third_reader_index, second_reader_index);

EXPECT_EQ(stream.get_buffer().remaining_size(), 0U);
}
Expand Down Expand Up @@ -310,6 +314,49 @@ TEST(StreamSerializationTest, SerializeToOStreamOverloadParity) {
EXPECT_EQ(out.data(), expected.value());
}

// Round-trips two large StreamEnvelope values through the Buffer owned by a
// StdInputStream and checks that the buffer's reader index is back at zero
// after each struct has been deserialized.
TEST(StreamSerializationTest,
     StructDeserializeFromStreamBackedBufferShrinksAfterEachStruct) {
  auto fory = Fory::builder().xlang(true).track_ref(true).build();
  register_stream_types(fory);

  // One ascending (0..5999) and one descending (6000..1) sequence.
  // NOTE(review): presumably sized so that each serialized struct is larger
  // than the 4096 value passed to StdInputStream below -- confirm.
  constexpr int32_t kValueCount = 6000;
  std::vector<int32_t> ascending;
  ascending.reserve(kValueCount);
  std::vector<int32_t> descending;
  descending.reserve(kValueCount);
  for (int32_t value = 0; value < kValueCount; ++value) {
    ascending.push_back(value);
    descending.push_back(kValueCount - value);
  }

  StreamEnvelope first{
      "first", std::move(ascending), {{"a", 11}, {"b", 22}}, {7, 8}, true,
  };
  StreamEnvelope second{
      "second", std::move(descending), {{"c", 33}, {"d", 44}}, {9, 10}, false,
  };

  // Serialize both envelopes into the same byte vector.
  std::vector<uint8_t> serialized;
  ASSERT_TRUE(fory.serialize_to(serialized, first).ok());
  ASSERT_TRUE(fory.serialize_to(serialized, second).ok());

  // Expose the serialized bytes through an istream and read them via the
  // stream's own Buffer.
  const char *raw_bytes = reinterpret_cast<const char *>(serialized.data());
  std::string payload(raw_bytes, serialized.size());
  std::istringstream source(payload);
  StdInputStream stream(source, 4096);
  Buffer &buffer = stream.get_buffer();

  // First struct: value must match and the reader index must be reset.
  auto first_result = fory.deserialize<StreamEnvelope>(buffer);
  ASSERT_TRUE(first_result.ok()) << first_result.error().to_string();
  EXPECT_EQ(first_result.value(), first);
  EXPECT_EQ(buffer.reader_index(), 0U);

  // Second struct: same expectations on the remaining bytes.
  auto second_result = fory.deserialize<StreamEnvelope>(buffer);
  ASSERT_TRUE(second_result.ok()) << second_result.error().to_string();
  EXPECT_EQ(second_result.value(), second);
  EXPECT_EQ(buffer.reader_index(), 0U);
}

} // namespace test
} // namespace serialization
} // namespace fory
8 changes: 6 additions & 2 deletions cpp/fory/serialization/struct_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2773,7 +2773,7 @@ void read_struct_fields_impl(T &obj, ReadContext &ctx,
// Note: varint bounds checking is done per-byte during reading since
// varint lengths are variable (actual size << max possible size)
if constexpr (varint_count > 0) {
if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
// Stream-backed buffers may not have all varint bytes materialized yet.
// Fall back to per-field readers that propagate stream read errors.
read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
Expand Down Expand Up @@ -2828,7 +2828,7 @@ read_struct_fields_impl_fast(T &obj, ReadContext &ctx,

// Phase 2: Read consecutive varint primitives (int32, int64) if any
if constexpr (varint_count > 0) {
if (FORY_PREDICT_FALSE(buffer.is_stream_backed())) {
if (FORY_PREDICT_FALSE(buffer.has_input_stream())) {
// Stream-backed buffers may not have all varint bytes materialized yet.
// Fall back to per-field readers that propagate stream read errors.
read_remaining_fields<T, fixed_count, total_count>(obj, ctx);
Expand Down Expand Up @@ -3270,6 +3270,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return T{};
}
ctx.buffer().shrink_input_buffer();
return obj;
}

Expand All @@ -3279,6 +3280,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
if (FORY_PREDICT_FALSE(ctx.has_error())) {
return T{};
}
ctx.buffer().shrink_input_buffer();
return obj;
}

Expand All @@ -3290,6 +3292,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
return T{};
}

ctx.buffer().shrink_input_buffer();
return obj;
}

Expand Down Expand Up @@ -3333,6 +3336,7 @@ struct Serializer<T, std::enable_if_t<is_fory_serializable_v<T>>> {
return T{};
}

ctx.buffer().shrink_input_buffer();
return obj;
}

Expand Down
12 changes: 8 additions & 4 deletions cpp/fory/util/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ class Buffer {

FORY_ALWAYS_INLINE bool own_data() const { return own_data_; }

FORY_ALWAYS_INLINE bool is_stream_backed() const {
FORY_ALWAYS_INLINE bool has_input_stream() const {
return input_stream_ != nullptr;
}

FORY_ALWAYS_INLINE bool is_output_stream_backed() const {
FORY_ALWAYS_INLINE bool has_output_stream() const {
return output_stream_ != nullptr;
}

Expand All @@ -135,8 +135,12 @@ class Buffer {
output_stream_ = nullptr;
}

FORY_ALWAYS_INLINE void shrink_stream_buffer() {
if (input_stream_ != nullptr) {
// Best-effort stream buffer compaction entry point.
// Stage 1 guard: avoid calling into stream shrinking for very small progress.
// Stage 2 guard lives in InputStream::shrink_buffer(), which can decide based
// on stream-specific configured buffer size.
FORY_ALWAYS_INLINE void shrink_input_buffer() {
if (FORY_PREDICT_FALSE(input_stream_ != nullptr && reader_index_ > 4096)) {
input_stream_->shrink_buffer();
}
}
Expand Down
33 changes: 31 additions & 2 deletions cpp/fory/util/buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <iostream>
#include <limits>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -330,6 +332,33 @@ TEST(Buffer, StreamSkipAndUnread) {
EXPECT_EQ(view.reader_index(), 2U);
}

// Checks that Buffer::shrink_input_buffer() leaves the reader index untouched
// while fewer bytes than the configured stream buffer size have been consumed,
// and resets it to zero once more than that size has been consumed.
TEST(Buffer, StreamShrinkBufferBestEffortUsesConfiguredBufferSize) {
  constexpr uint32_t kConfiguredBufferSize = 32768;
  constexpr uint32_t kPayloadSize = kConfiguredBufferSize * 2;
  constexpr uint32_t kInitialSkip = 5000;

  std::string payload(kPayloadSize, '\x7');
  std::istringstream source(payload);
  StdInputStream stream(source, kConfiguredBufferSize);
  Buffer reader(stream);
  Error error;

  // Phase 1: consume 5000 bytes, which is less than the configured size.
  reader.skip(kInitialSkip, error);
  ASSERT_TRUE(error.ok()) << error.to_string();
  EXPECT_EQ(reader.reader_index(), kInitialSkip);

  // Shrinking here is expected to be a no-op: the index must not move.
  reader.shrink_input_buffer();
  EXPECT_EQ(reader.reader_index(), kInitialSkip);

  // Phase 2: consume one more configured-buffer-size worth of bytes so the
  // reader index ends up past the configured size.
  reader.skip(kConfiguredBufferSize, error);
  ASSERT_TRUE(error.ok()) << error.to_string();
  ASSERT_GT(reader.reader_index(), kConfiguredBufferSize);

  // Now shrinking must rewind the index and keep only the unread bytes.
  const uint32_t unread_before_shrink = reader.remaining_size();
  reader.shrink_input_buffer();
  EXPECT_EQ(reader.reader_index(), 0U);
  EXPECT_EQ(reader.size(), unread_before_shrink);
}

TEST(Buffer, StreamReadErrorWhenInsufficientData) {
std::vector<uint8_t> raw{0x01, 0x02, 0x03};
OneByteIStream one_byte_stream(raw);
Expand Down Expand Up @@ -398,8 +427,8 @@ TEST(Buffer, OutputStreamRebindDetachesPreviousBufferBacklink) {

first->bind_output_stream(&writer);
second->bind_output_stream(&writer);
EXPECT_FALSE(first->is_output_stream_backed());
EXPECT_TRUE(second->is_output_stream_backed());
EXPECT_FALSE(first->has_output_stream());
EXPECT_TRUE(second->has_output_stream());

writer.enter_flush_barrier();
std::vector<uint8_t> second_payload(5000, 7);
Expand Down
8 changes: 8 additions & 0 deletions cpp/fory/util/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,14 @@ void StdInputStream::shrink_buffer() {
}

const uint32_t read_pos = buffer_->reader_index_;
// Best-effort policy:
// 1) keep a hard 4096-byte floor to avoid tiny frequent compactions;
// 2) for larger configured input buffers, require at least one full initial
// buffer worth of consumed bytes before moving unread data.
if (FORY_PREDICT_TRUE(read_pos <= 4096 || read_pos < initial_buffer_size_)) {
return;
}

const uint32_t remaining = remaining_size();
if (read_pos > 0) {
if (remaining > 0) {
Expand Down
3 changes: 3 additions & 0 deletions cpp/fory/util/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class InputStream : public std::enable_shared_from_this<InputStream> {

virtual Result<void, Error> unread(uint32_t size) = 0;

// Best-effort input-buffer compaction/reclaim hook. Callers may invoke this
// frequently; implementations should return quickly unless configured
// compaction thresholds are met.
virtual void shrink_buffer() = 0;

virtual Buffer &get_buffer() = 0;
Expand Down
4 changes: 4 additions & 0 deletions python/pyfory/_fory.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ def write_buffer_object(self, buffer, buffer_object: BufferObject):
def read_buffer_object(self, buffer) -> Buffer:
if not self.is_peer_out_of_band_enabled:
size = buffer.read_var_uint32()
if buffer.has_input_stream():
return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
Expand All @@ -707,6 +709,8 @@ def read_buffer_object(self, buffer) -> Buffer:
assert self._buffers is not None
return next(self._buffers)
size = buffer.read_var_uint32()
if buffer.has_input_stream():
return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
Expand Down
6 changes: 6 additions & 0 deletions python/pyfory/buffer.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ cdef class Buffer:
raise ValueError("writer_index must be >= 0")
self.c_buffer.writer_index(<uint32_t>value)

# Ask the wrapped C++ buffer to compact its input-stream buffer.
# Thin delegation to c_buffer.shrink_input_buffer(); returns nothing.
cpdef inline void shrink_input_buffer(self):
self.c_buffer.shrink_input_buffer()

# Report whether the wrapped C++ buffer has an input stream attached.
# Returns the result of c_buffer.has_input_stream() unchanged.
cpdef inline c_bool has_input_stream(self):
return self.c_buffer.has_input_stream()

cpdef c_bool own_data(self):
return self.c_buffer.own_data()

Expand Down
12 changes: 10 additions & 2 deletions python/pyfory/cpp/pyfory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,14 @@ class PyInputStream final : public InputStream {
}

const uint32_t read_pos = buffer_->reader_index_;
// Keep Python-backed InputStream shrink behavior aligned with C++:
// best-effort compaction only after both the global floor (4096) and the
// configured stream buffer size threshold are crossed.
if (FORY_PREDICT_TRUE(read_pos <= 4096 ||
read_pos < initial_buffer_size_)) {
return;
}

const uint32_t remaining = remaining_size();
if (read_pos > 0) {
if (remaining > 0) {
Expand Down Expand Up @@ -1491,13 +1499,13 @@ int Fory_PyPrimitiveCollectionReadFromBuffer(PyObject *collection,
"tuple collection size is smaller than requested read size");
return -1;
}
if (!buffer->is_stream_backed() && kind == PythonCollectionKind::List) {
if (!buffer->has_input_stream() && kind == PythonCollectionKind::List) {
return read_primitive_sequence_indexed(
buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
PyList_SET_ITEM(collection, i, item);
});
}
if (!buffer->is_stream_backed() && kind == PythonCollectionKind::Tuple) {
if (!buffer->has_input_stream() && kind == PythonCollectionKind::Tuple) {
return read_primitive_sequence_indexed(
buffer, size, type_id, [collection](Py_ssize_t i, PyObject *item) {
PyTuple_SET_ITEM(collection, i, item);
Expand Down
4 changes: 4 additions & 0 deletions python/pyfory/includes/libutil.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ cdef extern from "fory/util/buffer.h" namespace "fory" nogil:

void skip(uint32_t length, CError& error)

# True when the C++ Buffer has an input stream bound to it.
c_bool has_input_stream() const

# Best-effort compaction of the input-stream buffer on the C++ side.
void shrink_input_buffer()

void copy(uint32_t start, uint32_t nbytes,
uint8_t* out, uint32_t offset) const

Expand Down
4 changes: 4 additions & 0 deletions python/pyfory/serialization.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,8 @@ cdef class Fory:
cdef Buffer buf
if not self.is_peer_out_of_band_enabled:
size = buffer.read_var_uint32()
if buffer.has_input_stream():
return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
Expand All @@ -1639,6 +1641,8 @@ cdef class Fory:
assert self._buffers is not None
return next(self._buffers)
size = buffer.read_var_uint32()
if buffer.has_input_stream():
return buffer.read_bytes(size)
reader_index = buffer.get_reader_index()
buf = buffer.slice(reader_index, size)
buffer.set_reader_index(reader_index + size)
Expand Down
1 change: 1 addition & 0 deletions python/pyfory/struct.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ cdef class DataClassSerializer(Serializer):
self._apply_missing_defaults_slots(obj)
else:
self._apply_missing_defaults_dict(obj.__dict__)
buffer.shrink_input_buffer()
return obj

cdef inline void _read_dict(self, Buffer buffer, object obj):
Expand Down
1 change: 1 addition & 0 deletions python/pyfory/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ def read(self, buffer):
obj_dict[field_name] = value
else:
setattr(obj, field_name, value)
buffer.shrink_input_buffer()
return obj


Expand Down
Loading
Loading