feat(go): add go deserialization support via io streams #3374
ayush00git wants to merge 30 commits into apache:main
|
Hey @chaokunyang |
|
hey @ayush00git, looked through this and the main issue i see is in DeserializeFromReader:

    func (f *Fory) DeserializeFromReader(r io.Reader, v any) error {
        defer f.resetReadState()
        f.readCtx.buffer.ResetWithReader(r, 0) // this wipes the prefetch window every time
        // ...
    }

so if fill() reads ahead past the first object boundary (which it will), those bytes [...]

    for {
        var msg Msg
        f.DeserializeFromReader(conn, &msg) // bytes after first object get thrown away
    }

if you look at how he handles this for c++/python, the Buffer is constructed [...] the go version probably needs something similar: a stream reader type that owns the [...]

Happy to discuss if I'm misreading the flow here
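The read-ahead loss described in this comment can be reproduced with the standard library alone. The sketch below is not Fory code: `bufio.Reader` stands in for the prefetching buffer, a newline-terminated record stands in for one serialized object, and the names `readRecord`, `freshBufferPerCall`, and `sharedBuffer` are hypothetical.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readRecord stands in for "decode one object": it reads one
// newline-terminated record through the given buffered reader.
func readRecord(br *bufio.Reader) (string, error) {
	line, err := br.ReadString('\n')
	return strings.TrimSuffix(line, "\n"), err
}

// freshBufferPerCall wraps the source in a new bufio.Reader for every record.
// Whatever the previous wrapper read ahead is dropped along with its buffer.
func freshBufferPerCall(input string, want int) []string {
	var src io.Reader = strings.NewReader(input)
	var out []string
	for i := 0; i < want; i++ {
		rec, err := readRecord(bufio.NewReader(src))
		if err != nil {
			break
		}
		out = append(out, rec)
	}
	return out
}

// sharedBuffer keeps one bufio.Reader alive across records, so read-ahead
// bytes stay available for the next call.
func sharedBuffer(input string, want int) []string {
	br := bufio.NewReader(strings.NewReader(input))
	var out []string
	for i := 0; i < want; i++ {
		rec, err := readRecord(br)
		if err != nil {
			break
		}
		out = append(out, rec)
	}
	return out
}

func main() {
	const input = "a\nb\nc\n"
	fmt.Println("fresh buffer per call:", freshBufferPerCall(input, 3))
	fmt.Println("shared buffer:        ", sharedBuffer(input, 3))
}
```

With a fresh wrapper per call, the first wrapper pulls the whole input into its own buffer, so later calls find the source already drained. Keeping a single buffered reader alive is the same idea as a stream reader type that owns the buffer.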
|
Hiii @Zakir032002 |
|
hey @ayush00git, one more thing: ReadBinary and ReadBytes return a direct slice into [...] the problem is fill() compacts the buffer in-place: [...] so if someone reads a []byte field and holds onto that slice, then the next [...] in stream mode you probably want to copy before returning instead of aliasing: [...] in-memory path stays as is.
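The aliasing hazard raised here is easy to demonstrate in isolation. The following is a minimal sketch with a hypothetical `window` type, not the PR's `ByteBuffer`; `compactAndAppend` only imitates what an in-place compaction step might do.

```go
package main

import "fmt"

// window is a hypothetical sliding buffer: data holds buffered bytes and
// readIdx marks the first unread byte.
type window struct {
	data    []byte
	readIdx int
}

// readAliased returns a sub-slice that shares memory with w.data.
func (w *window) readAliased(n int) []byte {
	b := w.data[w.readIdx : w.readIdx+n]
	w.readIdx += n
	return b
}

// readCopied returns an independent copy of the next n bytes.
func (w *window) readCopied(n int) []byte {
	b := make([]byte, n)
	copy(b, w.data[w.readIdx:w.readIdx+n])
	w.readIdx += n
	return b
}

// compactAndAppend imitates an in-place refill: unread bytes are moved to the
// front of the same backing array, then new bytes are appended after them.
func (w *window) compactAndAppend(more []byte) {
	n := copy(w.data, w.data[w.readIdx:])
	w.data = append(w.data[:n], more...)
	w.readIdx = 0
}

// demo reads "hello" both ways and reports what each slice holds after the
// buffer has been compacted.
func demo() (aliased, copied string) {
	w := &window{data: []byte("helloWORLD")}
	a := w.readAliased(5)
	w.compactAndAppend([]byte("!"))
	aliased = string(a) // overwritten by the bytes that were moved forward

	w = &window{data: []byte("helloWORLD")}
	c := w.readCopied(5)
	w.compactAndAppend([]byte("!"))
	copied = string(c) // unaffected by compaction
	return aliased, copied
}

func main() {
	aliased, copied := demo()
	fmt.Printf("aliased=%q copied=%q\n", aliased, copied)
}
```

The copy costs one allocation per read, which is presumably why the comment suggests paying it only in stream mode and leaving the in-memory path aliased.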
|
also noticed: [...] easiest fix is probably just routing the multi-byte case through [...]

Happy to discuss if I'm misreading the flow here
|
Hey @Zakir032002 |
|
Hii @Zakir032002
But i think you misunderstood the [...]

    if len(b.data)-readIdx >= 5 {
        // ...
    }

If we are near a chunk boundary (less than 5 bytes remaining in the buffer), the execution completely skips [...]
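One way to picture the guard quoted in this reply is a varint reader with two paths: a fast path when at least 5 bytes are already buffered, and a byte-at-a-time path that may refill from the reader otherwise. The sketch below is an assumption about the general shape, not the PR's code. `streamBuf`, `readByte`, and `readVarUint32` are hypothetical names, and the encoding is plain unsigned LEB128 as produced by `encoding/binary`.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"testing/iotest"
)

var errVarintTooLong = errors.New("varint longer than 5 bytes")

// streamBuf is a hypothetical stand-in for a reader-backed byte buffer.
type streamBuf struct {
	r       io.Reader
	data    []byte
	readIdx int
}

// readByte returns the next byte, refilling from r once the buffer is drained.
func (b *streamBuf) readByte() (byte, error) {
	if b.readIdx >= len(b.data) {
		tmp := make([]byte, 64)
		n, err := io.ReadAtLeast(b.r, tmp, 1)
		if err != nil {
			return 0, err
		}
		b.data, b.readIdx = tmp[:n], 0
	}
	c := b.data[b.readIdx]
	b.readIdx++
	return c, nil
}

// readVarUint32 decodes one unsigned LEB128 value of at most 5 bytes.
func (b *streamBuf) readVarUint32() (uint32, error) {
	var v uint32
	if len(b.data)-b.readIdx >= 5 {
		// Fast path: the longest possible encoding is already buffered,
		// so no refill and no per-byte error check is needed.
		for shift := uint(0); shift < 35; shift += 7 {
			c := b.data[b.readIdx]
			b.readIdx++
			v |= uint32(c&0x7f) << shift
			if c < 0x80 {
				return v, nil
			}
		}
		return 0, errVarintTooLong
	}
	// Slow path: fewer than 5 bytes buffered (near a chunk boundary),
	// so every byte goes through readByte, which can pull from the reader.
	for shift := uint(0); shift < 35; shift += 7 {
		c, err := b.readByte()
		if err != nil {
			return 0, err
		}
		v |= uint32(c&0x7f) << shift
		if c < 0x80 {
			return v, nil
		}
	}
	return 0, errVarintTooLong
}

func encode(vals []uint32) []byte {
	var out []byte
	var tmp [binary.MaxVarintLen32]byte
	for _, v := range vals {
		n := binary.PutUvarint(tmp[:], uint64(v))
		out = append(out, tmp[:n]...)
	}
	return out
}

func decode(r io.Reader, count int) ([]uint32, error) {
	b := &streamBuf{r: r}
	out := make([]uint32, 0, count)
	for i := 0; i < count; i++ {
		v, err := b.readVarUint32()
		if err != nil {
			return out, err
		}
		out = append(out, v)
	}
	return out, nil
}

// decodeWhole lets the buffer fill in one read, so later values use the fast path.
func decodeWhole(vals []uint32) ([]uint32, error) {
	return decode(bytes.NewReader(encode(vals)), len(vals))
}

// decodeFragmented delivers one byte per Read, so every value uses the slow path.
func decodeFragmented(vals []uint32) ([]uint32, error) {
	return decode(iotest.OneByteReader(bytes.NewReader(encode(vals))), len(vals))
}

func main() {
	vals := []uint32{1, 300, 1 << 28, 7, 4000000000}
	fmt.Println(decodeWhole(vals))
	fmt.Println(decodeFragmented(vals))
}
```

Both paths should decode the same values; the only difference is whether a refill can happen in the middle of a value.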
…le stateful deserialization
|
I've added the |
|
Hii @chaokunyang |
|
Please take #3307 as a reference to finish the remaining work. And create a Deserialize helper method in tests, then use that instead of [...] Then run benchmarks/go to compare with asf/main to ensure your code changes don't introduce any performance regression.
|
Hey @chaokunyang |
|
Hii @chaokunyang |
go/fory/fory.go
Outdated
    // It maintains the ByteBuffer and ReadContext state across multiple Deserialize calls,
    // preventing data loss from prefetched buffers and preserving TypeResolver metadata
    // (Meta Sharing) across object boundaries.
    type StreamReader struct {
create a stream.go and put it there
go/fory/fory.go
Outdated
    // NewStreamReader creates a new StreamReader that reads from the provided io.Reader.
    // The StreamReader owns the buffer and maintains state across sequential Deserialize calls.
    func (f *Fory) NewStreamReader(r io.Reader) *StreamReader {
It looks really strange that the StreamReader is coupled with Fory.
Please read the Fory C++ implementation in detail and refine this PR, especially the API surface.
yaa, in c++ the stream reader acted as a standalone type which had nothing to do with deserialization; it just knows how to fetch bytes when needed. let me implement the same for go
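A rough sketch of what such a standalone type could look like in Go follows. This is an illustration of the decoupling idea only, not the PR's `StreamReader` and not the C++ `ForyInputStream`. The names `byteSource`, `ensure`, `next`, and `decodeMessage` are hypothetical, and the one-byte length prefix is a made-up framing chosen to keep the example short.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// byteSource is a hypothetical standalone stream reader. It owns the buffer
// and knows how to fetch bytes on demand, but nothing about decoding.
type byteSource struct {
	r       io.Reader
	buf     []byte
	readIdx int
}

// ensure makes at least n unread bytes available, reading from r as needed.
func (s *byteSource) ensure(n int) error {
	for len(s.buf)-s.readIdx < n {
		// Compact: drop consumed bytes, keep unread ones at the front.
		s.buf = append(s.buf[:0], s.buf[s.readIdx:]...)
		s.readIdx = 0
		chunk := make([]byte, 16)
		m, err := s.r.Read(chunk)
		s.buf = append(s.buf, chunk[:m]...)
		if err != nil && len(s.buf) < n {
			if err == io.EOF && len(s.buf) > 0 {
				return io.ErrUnexpectedEOF
			}
			return err
		}
	}
	return nil
}

// next returns a copy of the next n bytes and advances past them.
func (s *byteSource) next(n int) ([]byte, error) {
	if err := s.ensure(n); err != nil {
		return nil, err
	}
	out := make([]byte, n)
	copy(out, s.buf[s.readIdx:])
	s.readIdx += n
	return out, nil
}

// decodeMessage is the "deserializer": it takes the source as a parameter
// instead of owning it. Framing here is one length byte plus payload.
func decodeMessage(s *byteSource) (string, error) {
	hdr, err := s.next(1)
	if err != nil {
		return "", err
	}
	body, err := s.next(int(hdr[0]))
	if err != nil {
		return "", err
	}
	return string(body), nil
}

// decodeAll reads messages from one shared source until the input ends.
func decodeAll(input string) []string {
	src := &byteSource{r: strings.NewReader(input)}
	var out []string
	for {
		msg, err := decodeMessage(src)
		if err != nil {
			return out
		}
		out = append(out, msg)
	}
}

func main() {
	fmt.Println(decodeAll("\x02hi\x05world\x03foo"))
}
```

Because the source outlives each `decodeMessage` call, bytes fetched past one message boundary are still there for the next call.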
|
@chaokunyang |
Why?
To enable stream-based deserialization in Fory's Go library, allowing direct reads from `io.Reader` without pre-buffering the entire payload. This improves efficiency for network and file-based transport and brings the Go implementation to feature parity with the Python and C++ libraries.

What does this PR do?

1. Stream Infrastructure in `go/fory/buffer.go`
Enhanced `ByteBuffer` to support `io.Reader` with an internal sliding window and automatic filling.
- `reader io.Reader` and `minCap int` fields.
- `fill(n int, err *Error) bool` for on-demand data fetching and buffer compaction.
- `CheckReadable(n)` and `Skip(n)`: memory-safe routines that pull from the underlying stream when necessary to avoid out-of-bounds panics.
- `ReadBinary` and `ReadBytes` copy slices when streaming, to prevent silent data corruption on compaction.
- `Read*` methods (fixed-size, varint, tagged) fetch data from the reader safely if it is not cached.

2. Stateful StreamReader in `go/fory/stream.go`
Added the `StreamReader` feature to support true, stateful sequential stream reads.
- `StreamReader`, which persists the buffered byte window and TypeResolver metadata (Meta Sharing) across multiple object decodes on the same stream, decoupled from `Fory` to mirror the C++ `ForyInputStream` implementation.
- `fory.DeserializeFromStream(sr, target)` method to process continuous streamed data.
- `DeserializeFromReader` method as an API for simple one-off stream object reads.

3. Stream-Safe Deserialization Paths
Updated internal deserialization pipelines in `struct.go` and `type_def.go` to be stream-safe:
- `CheckReadable` bounds-checking in the `struct.go` fast paths for fixed-size primitives.
- `skipTypeDef` in `type_def.go` uses bounds-checked `Skip()` rather than unbounded `readerIndex` overrides.

4. Comprehensive Stream Tests
- `oneByteReader` wrapper (`go/fory/test_helper_test.go`) that artificially feeds the deserialization engine exactly 1 byte at a time.
- Test files (`struct_test.go`, `primitive_test.go`, `slice_primitive_test.go`, etc.) run all standard tests through this aggressive 1-byte fragmented stream reader via a new `testDeserialize` helper to verify stream robustness.

Related issues
Closes #3302
Does this PR introduce any user-facing change?
New APIs: `NewStreamReader`, `DeserializeFromStream`, `DeserializeFromReader`, `NewByteBufferFromReader`.

Benchmark
The added bounds checks did not cause performance regressions in the `structSerializer` fast paths. Tested using `benchstat` against the `main` branch.
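For readers unfamiliar with the 1-byte fragmentation technique listed under the stream tests, here is a minimal sketch of the idea. The PR's actual `oneByteReader` and `testDeserialize` are not shown on this page, so the type below is only a guess at the general shape, and `readUint32` and `roundTrip` are hypothetical helpers standing in for a real decoder. The standard library offers a similar wrapper as `iotest.OneByteReader`.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// oneByteReader hands out at most one byte per Read call, which forces any
// consumer to cope with the worst possible fragmentation.
type oneByteReader struct{ r io.Reader }

func (o oneByteReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	return o.r.Read(p[:1])
}

// readUint32 decodes a little-endian uint32. io.ReadFull keeps calling Read
// until all 4 bytes have arrived, so short reads are handled correctly.
func readUint32(r io.Reader) (uint32, error) {
	var b [4]byte
	if _, err := io.ReadFull(r, b[:]); err != nil {
		return 0, err
	}
	return binary.LittleEndian.Uint32(b[:]), nil
}

// roundTrip encodes v and decodes it again, optionally through the
// one-byte-at-a-time wrapper.
func roundTrip(v uint32, fragmented bool) (uint32, error) {
	var enc [4]byte
	binary.LittleEndian.PutUint32(enc[:], v)
	var r io.Reader = bytes.NewReader(enc[:])
	if fragmented {
		r = oneByteReader{r}
	}
	return readUint32(r)
}

func main() {
	whole, _ := roundTrip(0xDEADBEEF, false)
	frag, _ := roundTrip(0xDEADBEEF, true)
	fmt.Printf("whole=%#x fragmented=%#x\n", whole, frag)
}
```

A decoder that assumes a single `Read` fills its buffer would pass the unfragmented case and fail the fragmented one, which is the class of bug this kind of wrapper is meant to surface.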