Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3d860e5
feat(python,cpp): add streaming serialization APIs and flush barriers
chaokunyang Mar 3, 2026
0e4ee28
test(python): remove unused local in varuint64 helper
chaokunyang Mar 3, 2026
738de7a
fix(python): initialize serialization mask byte for stream parity
chaokunyang Mar 3, 2026
0cd5593
Refactor StreamWriter-owned streaming buffer and flush state
chaokunyang Mar 3, 2026
6fff7a7
Set WriteContext buffer_ explicitly for stream serialize path
chaokunyang Mar 3, 2026
07d210d
cpp: use single WriteContext buffer for stream serialization
chaokunyang Mar 3, 2026
e788769
cpp: inline simple StreamWriter methods in header
chaokunyang Mar 3, 2026
e72417b
python: route stream dump through wrapped stream writer
chaokunyang Mar 3, 2026
af9f02f
cpp: align stream try_flush threshold checks
chaokunyang Mar 3, 2026
6c24a71
python: rename stream binding API to bind_stream_writer
chaokunyang Mar 3, 2026
2e1a553
python: forward flush barrier through stream writer
chaokunyang Mar 3, 2026
1cab8d0
rename stream abstractions to output/input types
chaokunyang Mar 3, 2026
b6c62c1
fix(python): preserve stream source buffer on blocked try_flush
chaokunyang Mar 3, 2026
a1aead9
refactor(python): remove flush buffer args and use bound output buffer
chaokunyang Mar 3, 2026
5b6603e
refactor(python): bind output stream to same buffer
chaokunyang Mar 3, 2026
36d0c2d
refactor(stream): inline flush threshold checks in output path
chaokunyang Mar 3, 2026
87540d7
style(python): apply formatter output for stream tests
chaokunyang Mar 3, 2026
341af7e
fix output stream rebind backlink and document dump sink contract
chaokunyang Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/fory/serialization/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ void WriteContext::reset() {
has_first_type_info_ = false;
type_info_index_map_active_ = false;
current_dyn_depth_ = 0;
buffer_.clear_output_stream();
output_stream_ = nullptr;
// reset buffer indices for reuse - no memory operations needed
buffer_.writer_index(0);
buffer_.reader_index(0);
Expand Down
68 changes: 55 additions & 13 deletions cpp/fory/serialization/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ class WriteContext {
/// get const reference to internal output buffer.
inline const Buffer &buffer() const { return buffer_; }

/// Set the output stream pointer used by this context's flush helpers.
/// Only the pointer is stored; passing nullptr leaves the context with no
/// stream, in which case the flush helpers in this class do nothing.
inline void set_output_stream(OutputStream *output_stream) {
output_stream_ = output_stream;
}

/// get reference writer for tracking shared references.
inline RefWriter &ref_writer() { return ref_writer_; }

Expand Down Expand Up @@ -167,70 +171,107 @@ class WriteContext {
}
}

/// Report the flush barrier depth of the bound output stream.
///
/// @return 0 when no output stream is bound, otherwise whatever the stream's
///         own flush_barrier_depth() returns.
inline uint32_t flush_barrier_depth() const {
  if (output_stream_ == nullptr) {
    return 0;
  }
  return output_stream_->flush_barrier_depth();
}

/// Forward a barrier entry to the bound output stream.
/// Does nothing when no output stream is bound.
inline void enter_flush_barrier() {
  if (output_stream_ == nullptr) {
    return;
  }
  output_stream_->enter_flush_barrier();
}

/// Forward a barrier exit to the bound output stream.
/// Does nothing when no output stream is bound.
inline void exit_flush_barrier() {
  if (output_stream_ == nullptr) {
    return;
  }
  output_stream_->exit_flush_barrier();
}

/// Ask the bound output stream to try_flush() once the buffer's writer index
/// has passed the threshold.
///
/// Returns without touching the stream when no stream is bound or when the
/// writer index is at most 4096. If the stream reports an error after the
/// attempt, that error is recorded on this context via set_error().
inline void try_flush() {
  // Same value and type as the literal previously used in the comparison.
  constexpr int flush_threshold = 4096;
  if (output_stream_ == nullptr) {
    return;
  }
  if (buffer_.writer_index() <= flush_threshold) {
    return;
  }
  output_stream_->try_flush();
  if (FORY_PREDICT_FALSE(output_stream_->has_error())) {
    set_error(output_stream_->error());
  }
}

/// Ask the bound output stream to force_flush(), regardless of how much data
/// the buffer currently holds.
///
/// Does nothing when no stream is bound. If the stream reports an error after
/// the flush, that error is recorded on this context via set_error().
inline void force_flush() {
  if (output_stream_ != nullptr) {
    output_stream_->force_flush();
    if (FORY_PREDICT_FALSE(output_stream_->has_error())) {
      set_error(output_stream_->error());
    }
  }
}

/// Write a uint8_t value to the buffer.
FORY_ALWAYS_INLINE void write_uint8(uint8_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_uint8(value);
buffer_.write_uint8(value);
}

/// Write an int8_t value to the buffer.
FORY_ALWAYS_INLINE void write_int8(int8_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_int8(value);
buffer_.write_int8(value);
}

/// Write a uint16_t value to the buffer.
FORY_ALWAYS_INLINE void write_uint16(uint16_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_uint16(value);
buffer_.write_uint16(value);
}

/// Write a uint32_t value to the buffer.
FORY_ALWAYS_INLINE void write_uint32(uint32_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_uint32(value);
buffer_.write_uint32(value);
}

/// Write an int64_t value to the buffer.
FORY_ALWAYS_INLINE void write_int64(int64_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_int64(value);
buffer_.write_int64(value);
}

/// Write a uint32_t value as a varint to the buffer.
FORY_ALWAYS_INLINE void write_var_uint32(uint32_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_var_uint32(value);
buffer_.write_var_uint32(value);
}

/// Write an int32_t value as a zigzag varint to the buffer.
/// Delegates to the buffer's write_var_int32().
FORY_ALWAYS_INLINE void write_varint32(int32_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_var_int32(value);
buffer_.write_var_int32(value);
}

/// Write a uint64_t value as a varint to the buffer.
FORY_ALWAYS_INLINE void write_var_uint64(uint64_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_var_uint64(value);
buffer_.write_var_uint64(value);
}

/// Write an int64_t value as a zigzag varint to the buffer.
/// Delegates to the buffer's write_var_int64().
FORY_ALWAYS_INLINE void write_varint64(int64_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_var_int64(value);
buffer_.write_var_int64(value);
}

/// Write a uint64_t value to the buffer using tagged encoding.
FORY_ALWAYS_INLINE void write_tagged_uint64(uint64_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_tagged_uint64(value);
buffer_.write_tagged_uint64(value);
}

/// Write an int64_t value to the buffer using tagged encoding.
FORY_ALWAYS_INLINE void write_tagged_int64(int64_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_tagged_int64(value);
buffer_.write_tagged_int64(value);
}

/// Write a uint64_t value as varuint36small to the buffer.
/// This is the special variable-length encoding used for string headers.
FORY_ALWAYS_INLINE void write_var_uint36_small(uint64_t value) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_var_uint36_small(value);
buffer_.write_var_uint36_small(value);
}

/// Write raw bytes to the buffer.
///
/// @param data Pointer to the bytes to write.
/// @param length Number of bytes passed on to the buffer's write_bytes().
FORY_ALWAYS_INLINE void write_bytes(const void *data, uint32_t length) {
// NOTE(review): the buffer() and buffer_ forms both appear here; presumably
// they are the old and new sides of one diff line. Confirm only one is live.
buffer().write_bytes(data, length);
buffer_.write_bytes(data, length);
}

/// write TypeMeta inline using streaming protocol.
Expand Down Expand Up @@ -329,6 +370,7 @@ class WriteContext {
std::unique_ptr<TypeResolver> type_resolver_;
RefWriter ref_writer_;
uint32_t current_dyn_depth_;
OutputStream *output_stream_ = nullptr;

// Meta sharing state (for streaming inline TypeMeta)
// Maps TypeInfo* to index for reference tracking - uses map size as counter
Expand Down
87 changes: 73 additions & 14 deletions cpp/fory/serialization/fory.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <cstring>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -501,6 +502,52 @@ class Fory : public BaseFory {
return result;
}

/// Serialize an object into an OutputStream.
///
/// The stream is reset, registered on the write context and bound to the
/// context's buffer before serialization starts. After a successful
/// serialization the stream is force-flushed. On every return path the
/// buffer binding is cleared and the context's stream pointer is set back to
/// nullptr before the result is produced.
///
/// @tparam T The type of object to serialize.
/// @param output_stream The output stream.
/// @param obj The object to serialize.
/// @return output_stream.flushed_bytes() on success; otherwise the
///         serialization error, the stream's error, or the write context's
///         error, checked in that order.
template <typename T>
Result<size_t, Error> serialize(OutputStream &output_stream, const T &obj) {
  if (FORY_PREDICT_FALSE(!finalized_)) {
    ensure_finalized();
  }
  WriteContextGuard guard(*write_ctx_);

  // Wire the stream into both the context and its buffer.
  output_stream.reset();
  write_ctx_->set_output_stream(&output_stream);
  Buffer &buffer = write_ctx_->buffer();
  buffer.bind_output_stream(&output_stream);

  auto serialize_result = serialize_impl(obj, buffer);
  const bool serialized = serialize_result.ok();

  // Only a successful serialization is flushed; the unbinding below is shared
  // by the success and failure paths and always runs after the flush.
  if (serialized) {
    output_stream.force_flush();
  }
  buffer.clear_output_stream();
  write_ctx_->set_output_stream(nullptr);

  if (FORY_PREDICT_FALSE(!serialized)) {
    return Unexpected(std::move(serialize_result).error());
  }
  if (FORY_PREDICT_FALSE(output_stream.has_error())) {
    return Unexpected(output_stream.error());
  }
  if (FORY_PREDICT_FALSE(write_ctx_->has_error())) {
    return Unexpected(write_ctx_->take_error());
  }
  return output_stream.flushed_bytes();
}

/// Serialize an object into a std::ostream.
///
/// Wraps the ostream in a StdOutputStream that lives for the duration of the
/// call and hands it to the two-argument serialize() call below.
///
/// @tparam T The type of object to serialize.
/// @param ostream The output stream.
/// @param obj The object to serialize.
/// @return Number of bytes written, or error.
template <typename T>
Result<size_t, Error> serialize(std::ostream &ostream, const T &obj) {
  StdOutputStream stream_adapter(ostream);
  return serialize(stream_adapter, obj);
}

/// Serialize an object to an existing Buffer (fastest path).
///
/// @tparam T The type of object to serialize.
Expand Down Expand Up @@ -627,36 +674,36 @@ class Fory : public BaseFory {
return deserialize_impl<T>(buffer);
}

/// Deserialize an object from a stream reader.
/// Deserialize an object from an input stream.
///
/// This overload obtains the reader-owned Buffer via get_buffer() and
/// continues deserialization on that buffer. A local guard calls
/// shrink_buffer() on the stream when this function's scope ends.
///
/// NOTE(review): this block shows both a StreamReader and an InputStream
/// variant of the signature, guard member and locals; presumably these are
/// the removed and added sides of a rename. Confirm which set is live.
///
/// @tparam T The type of object to deserialize.
/// @param stream_reader Stream reader to read from.
/// @param input_stream Input stream to read from.
/// @return Deserialized object, or error.
template <typename T>
Result<T, Error> deserialize(StreamReader &stream_reader) {
Result<T, Error> deserialize(InputStream &input_stream) {
// Scope guard: its destructor calls shrink_buffer() on the stored pointer.
struct StreamShrinkGuard {
StreamReader *stream_reader = nullptr;
InputStream *input_stream = nullptr;
~StreamShrinkGuard() {
if (stream_reader != nullptr) {
stream_reader->shrink_buffer();
if (input_stream != nullptr) {
input_stream->shrink_buffer();
}
}
};
StreamShrinkGuard shrink_guard{&stream_reader};
Buffer &buffer = stream_reader.get_buffer();
StreamShrinkGuard shrink_guard{&input_stream};
Buffer &buffer = input_stream.get_buffer();
// Delegate to the Buffer-based overload.
return deserialize<T>(buffer);
}

/// Deserialize an object from ForyInputStream.
/// Deserialize an object from StdInputStream.
///
/// Casts the wrapper to its stream base type and forwards to the
/// deserialize<T>() overload taking that base type.
///
/// NOTE(review): both a ForyInputStream/StreamReader and a
/// StdInputStream/InputStream variant appear below; presumably these are the
/// removed and added sides of a rename. Confirm which pair is live.
///
/// @tparam T The type of object to deserialize.
/// @param stream Input stream wrapper to read from.
/// @return Deserialized object, or error.
template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
return deserialize<T>(static_cast<StreamReader &>(stream));
template <typename T> Result<T, Error> deserialize(StdInputStream &stream) {
return deserialize<T>(static_cast<InputStream &>(stream));
}

// ==========================================================================
Expand Down Expand Up @@ -805,6 +852,18 @@ class ThreadSafeFory : public BaseFory {
return fory_handle->serialize(obj);
}

/// Serialize an object to an output stream using a Fory instance acquired
/// from the pool.
///
/// @tparam T The type of object to serialize.
/// @param output_stream The output stream.
/// @param obj The object to serialize.
/// @return Whatever the acquired instance's serialize(output_stream, obj)
///         returns.
template <typename T>
Result<size_t, Error> serialize(OutputStream &output_stream, const T &obj) {
  auto pooled_fory = fory_pool_.acquire();
  return pooled_fory->serialize(output_stream, obj);
}

/// Serialize an object to a std::ostream using a Fory instance acquired from
/// the pool.
///
/// @tparam T The type of object to serialize.
/// @param ostream The output stream.
/// @param obj The object to serialize.
/// @return Whatever the acquired instance's serialize(ostream, obj) returns.
template <typename T>
Result<size_t, Error> serialize(std::ostream &ostream, const T &obj) {
  auto pooled_fory = fory_pool_.acquire();
  return pooled_fory->serialize(ostream, obj);
}

template <typename T>
Result<size_t, Error> serialize_to(Buffer &buffer, const T &obj) {
auto fory_handle = fory_pool_.acquire();
Expand All @@ -830,12 +889,12 @@ class ThreadSafeFory : public BaseFory {
}

/// Deserialize an object from a stream using a Fory instance acquired from
/// the pool; the call is forwarded to that instance's deserialize<T>().
///
/// NOTE(review): both a StreamReader and an InputStream variant of the
/// signature and forwarding call appear below; presumably these are the
/// removed and added sides of a rename. Confirm which pair is live.
template <typename T>
Result<T, Error> deserialize(StreamReader &stream_reader) {
Result<T, Error> deserialize(InputStream &input_stream) {
auto fory_handle = fory_pool_.acquire();
return fory_handle->template deserialize<T>(stream_reader);
return fory_handle->template deserialize<T>(input_stream);
}

/// Deserialize an object from a stream wrapper using a Fory instance acquired
/// from the pool; the call is forwarded to that instance's deserialize<T>().
///
/// NOTE(review): both a ForyInputStream and a StdInputStream signature appear
/// below; presumably these are the removed and added sides of a rename.
/// Confirm which one is live.
template <typename T> Result<T, Error> deserialize(ForyInputStream &stream) {
template <typename T> Result<T, Error> deserialize(StdInputStream &stream) {
auto fory_handle = fory_pool_.acquire();
return fory_handle->template deserialize<T>(stream);
}
Expand Down
13 changes: 13 additions & 0 deletions cpp/fory/serialization/map_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ inline void write_map_data_fast(const MapType &map, WriteContext &ctx,
// If nullability is needed, use the slow path

if (need_write_header) {
ctx.enter_flush_barrier();
// reserve space for header (1 byte) + chunk size (1 byte)
header_offset = ctx.buffer().writer_index();
ctx.write_uint16(0); // Placeholder for header and chunk size
Expand Down Expand Up @@ -174,6 +175,8 @@ inline void write_map_data_fast(const MapType &map, WriteContext &ctx,
pair_counter++;
if (pair_counter == MAX_CHUNK_SIZE) {
write_chunk_size(ctx, header_offset, pair_counter);
ctx.exit_flush_barrier();
ctx.try_flush();
pair_counter = 0;
need_write_header = true;
}
Expand All @@ -182,6 +185,8 @@ inline void write_map_data_fast(const MapType &map, WriteContext &ctx,
// write final chunk size
if (pair_counter > 0) {
write_chunk_size(ctx, header_offset, pair_counter);
ctx.exit_flush_barrier();
ctx.try_flush();
}
}

Expand Down Expand Up @@ -238,6 +243,7 @@ inline void write_map_data_slow(const MapType &map, WriteContext &ctx,
// Finish current chunk if any
if (pair_counter > 0) {
write_chunk_size(ctx, header_offset, pair_counter);
ctx.exit_flush_barrier();
pair_counter = 0;
need_write_header = true;
}
Expand Down Expand Up @@ -394,9 +400,12 @@ inline void write_map_data_slow(const MapType &map, WriteContext &ctx,
// Finish previous chunk if types changed
if (types_changed && pair_counter > 0) {
write_chunk_size(ctx, header_offset, pair_counter);
ctx.exit_flush_barrier();
ctx.try_flush();
pair_counter = 0;
}

ctx.enter_flush_barrier();
// write new chunk header
header_offset = ctx.buffer().writer_index();
ctx.write_uint16(0); // Placeholder for header and chunk size
Expand Down Expand Up @@ -513,6 +522,8 @@ inline void write_map_data_slow(const MapType &map, WriteContext &ctx,
pair_counter++;
if (pair_counter == MAX_CHUNK_SIZE) {
write_chunk_size(ctx, header_offset, pair_counter);
ctx.exit_flush_barrier();
ctx.try_flush();
pair_counter = 0;
need_write_header = true;
current_key_type_info = nullptr;
Expand All @@ -523,6 +534,8 @@ inline void write_map_data_slow(const MapType &map, WriteContext &ctx,
// write final chunk size
if (pair_counter > 0) {
write_chunk_size(ctx, header_offset, pair_counter);
ctx.exit_flush_barrier();
ctx.try_flush();
}
}

Expand Down
Loading
Loading