Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 13 additions & 50 deletions go/pkg/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,19 @@ type FrameDecoder struct {
frameContentSrc ByteAndBlockReader
decompressedContentReader *bufio.Reader
decompressor *zstd.Decoder
chunkReader chunkedReader
limitedReader limitedReader
flags FrameFlags
frameLoaded bool
notFirstFrame bool
}

type chunkedReader struct {
src ByteAndBlockReader
limit int64
nextChunk func() error
// limitedReader wraps a ByteAndBlockReader and limits the number of bytes read.
// Once the remaining limit reaches zero, reads report io.EOF; callers may set
// limit again to allow further reading from the same source.
type limitedReader struct {
	// src is the underlying reader, attached via Init.
	src ByteAndBlockReader
	// limit is the remaining byte budget; reads return io.EOF when it is <= 0.
	limit int64
}

func (r *chunkedReader) readByte() (byte, error) {
func (r *limitedReader) ReadByte() (byte, error) {
if r.limit <= 0 {
return 0, io.EOF
}
Expand All @@ -117,25 +117,7 @@ func (r *chunkedReader) readByte() (byte, error) {
return b, err
}

func (r *chunkedReader) ReadByte() (byte, error) {
loop:
for {
b, err := r.readByte()
if err == nil {
return b, err
}
if err == io.EOF {
err = r.nextChunk()
if err != nil {
return 0, err
}
goto loop
}
return 0, err
}
}

func (r *chunkedReader) readBlock(p []byte) (n int, err error) {
func (r *limitedReader) Read(p []byte) (n int, err error) {
if r.limit <= 0 {
return 0, io.EOF
}
Expand All @@ -147,25 +129,7 @@ func (r *chunkedReader) readBlock(p []byte) (n int, err error) {
return
}

func (r *chunkedReader) Read(p []byte) (n int, err error) {
loop:
for {
n, err := r.readBlock(p)
if err == nil {
return n, err
}
if err == io.EOF {
err = r.nextChunk()
if err != nil {
return 0, err
}
goto loop
}
return 0, err
}
}

func (r *chunkedReader) Init(src ByteAndBlockReader) {
// Init attaches the underlying source reader. It does not modify the read
// limit; callers assign the limit field separately.
func (r *limitedReader) Init(source ByteAndBlockReader) {
	r.src = source
}

Expand All @@ -176,12 +140,11 @@ const readBufSize = 64 * 1024
func (d *FrameDecoder) Init(src ByteAndBlockReader, compression Compression) error {
d.src = src
d.compression = compression
d.chunkReader.Init(src)
d.chunkReader.nextChunk = d.nextFrame
d.limitedReader.Init(src)

switch d.compression {
case CompressionNone:
d.frameContentSrc = &d.chunkReader
d.frameContentSrc = &d.limitedReader

case CompressionZstd:
var err error
Expand Down Expand Up @@ -222,19 +185,19 @@ func (d *FrameDecoder) nextFrame() error {
if err != nil {
return err
}
d.chunkReader.limit = int64(compressedSize)
d.limitedReader.limit = int64(compressedSize)

if !d.notFirstFrame || d.flags&RestartCompression != 0 {
d.notFirstFrame = true
if err := d.decompressor.Reset(&d.chunkReader); err != nil {
if err := d.decompressor.Reset(&d.limitedReader); err != nil {
return err
}
}

d.decompressedContentReader.Reset(d.decompressor)
} else {
compressedSize = uncompressedSize
d.chunkReader.limit = int64(uncompressedSize)
d.limitedReader.limit = int64(uncompressedSize)
}

d.frameLoaded = true
Expand Down
146 changes: 111 additions & 35 deletions go/pkg/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,28 @@ package pkg
import (
"bytes"
"io"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

type memChunkReaderWriter struct {
// memReaderWriter is an in-memory implementation of the ChunkWriter and
// ByteAndBlockReader interfaces. It lets a test first write data into the
// buffer and then read that data back.
type memReaderWriter struct {
	// buf accumulates written chunks and serves subsequent reads.
	buf bytes.Buffer
}

func (m *memChunkReaderWriter) ReadByte() (byte, error) {
// ReadByte reads and returns a single byte from the in-memory buffer.
func (m *memReaderWriter) ReadByte() (byte, error) {
	b, err := m.buf.ReadByte()
	return b, err
}

func (m *memChunkReaderWriter) Read(p []byte) (n int, err error) {
// Read copies up to len(p) bytes from the in-memory buffer into p,
// returning the number of bytes copied.
func (m *memReaderWriter) Read(p []byte) (n int, err error) {
	n, err = m.buf.Read(p)
	return
}

func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error {
func (m *memReaderWriter) WriteChunk(header []byte, content []byte) error {
_, err := m.buf.Write(header)
if err != nil {
return err
Expand All @@ -30,11 +33,11 @@ func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error {
return err
}

func (m *memChunkReaderWriter) Bytes() []byte {
// Bytes exposes the raw contents currently held in the buffer.
func (m *memReaderWriter) Bytes() []byte {
	contents := m.buf.Bytes()
	return contents
}

func TestLastFrameAndContinue(t *testing.T) {
func testLastFrameAndContinue(t *testing.T, compression Compression) {
// This test verifies that it is possible to decode until the end of available
// data, get a correct indication that it is the end of the frame and end
// of all available data, then once new data becomes available the decoding
Expand All @@ -43,8 +46,8 @@ func TestLastFrameAndContinue(t *testing.T) {

// Encode one frame with some data.
encoder := FrameEncoder{}
buf := &memChunkReaderWriter{}
err := encoder.Init(buf, CompressionZstd)
buf := &memReaderWriter{}
err := encoder.Init(buf, compression)
require.NoError(t, err)
writeStr := []byte(strings.Repeat("hello", 10))
_, err = encoder.Write(writeStr)
Expand All @@ -55,7 +58,7 @@ func TestLastFrameAndContinue(t *testing.T) {

// Now decode that frame.
decoder := FrameDecoder{}
err = decoder.Init(buf, CompressionZstd)
err = decoder.Init(buf, compression)
require.NoError(t, err)
_, err = decoder.Next()
require.NoError(t, err)
Expand All @@ -73,40 +76,113 @@ func TestLastFrameAndContinue(t *testing.T) {
require.ErrorIs(t, err, EndOfFrame)
require.EqualValues(t, 0, n)

// Try decoding the next frame and make sure we get the EOF from the source byte Reader.
_, err = decoder.Next()
for i := 1; i <= 10; i++ {
// Try decoding the next frame and make sure we get the EOF from the source byte Reader.
_, err = decoder.Next()
require.ErrorIs(t, err, io.EOF)

// Continue adding to the same source byte buffer using encoder.

// Open a new frame, write new data and close the frame.
encoder.OpenFrame(0)
writeStr = []byte(strings.Repeat("foo", i))
_, err = encoder.Write(writeStr)
require.NoError(t, err)

err = encoder.CloseFrame()
require.NoError(t, err)

// Try reading again. We should get an EndOfFrame error.
readStr = make([]byte, len(writeStr))
n, err = decoder.Read(readStr)
require.ErrorIs(t, err, EndOfFrame)
require.EqualValues(t, 0, n)

// Now try decoding a new frame. This time it should succeed since we added a new frame.
_, err = decoder.Next()
require.NoError(t, err)

// Read the encoded data.
n, err = decoder.Read(readStr)
require.EqualValues(t, len(writeStr), n)
require.EqualValues(t, writeStr, readStr)

// Try decoding more, past the end of second frame.
n, err = decoder.Read(readStr)

// Make sure the error indicates end of the frame.
require.ErrorIs(t, err, EndOfFrame)
require.EqualValues(t, 0, n)
}
}

func TestLastFrameAndContinue(t *testing.T) {
compressions := []Compression{
CompressionNone,
CompressionZstd,
}

for _, compression := range compressions {
t.Run(
strconv.Itoa(int(compression)), func(t *testing.T) {
testLastFrameAndContinue(t, compression)
},
)
}
}

func TestLimitedReader(t *testing.T) {
data := []byte("abcdef")
mem := &memReaderWriter{buf: *bytes.NewBuffer(data)}
var lr limitedReader
lr.Init(mem)

// Test reading with limit 0
lr.limit = 0
buf := make([]byte, 3)
n, err := lr.Read(buf)
require.Equal(t, 0, n)
require.ErrorIs(t, err, io.EOF)

// Continue adding to the same source byte buffer using encoder.
// Test ReadByte with limit 0
lr.limit = 0
_, err = lr.ReadByte()
require.ErrorIs(t, err, io.EOF)

// Open a new frame, write new data and close the frame.
encoder.OpenFrame(0)
writeStr = []byte(strings.Repeat("foo", 10))
_, err = encoder.Write(writeStr)
// Reset and test reading less than limit
mem = &memReaderWriter{buf: *bytes.NewBuffer(data)}
lr.Init(mem)
lr.limit = 3
buf = make([]byte, 2)
n, err = lr.Read(buf)
require.Equal(t, 2, n)
require.NoError(t, err)
require.Equal(t, []byte("ab"), buf)
require.Equal(t, int64(1), lr.limit)

err = encoder.CloseFrame()
// Test ReadByte with remaining limit
b, err := lr.ReadByte()
require.NoError(t, err)
require.Equal(t, byte('c'), b)
require.Equal(t, int64(0), lr.limit)

// Try reading again. We should get an EndOfFrame error.
readStr = make([]byte, len(writeStr))
n, err = decoder.Read(readStr)
require.ErrorIs(t, err, EndOfFrame)
require.EqualValues(t, 0, n)
// Test ReadByte at limit 0 after reading
_, err = lr.ReadByte()
require.ErrorIs(t, err, io.EOF)

// Now try decoding a new frame. This time it should succeed since we added a new frame.
_, err = decoder.Next()
// Test reading more than limit
mem = &memReaderWriter{buf: *bytes.NewBuffer(data)}
lr.Init(mem)
lr.limit = 4
buf = make([]byte, 10)
n, err = lr.Read(buf)
require.Equal(t, 4, n)
require.NoError(t, err)
require.Equal(t, []byte("abcd"), buf[:n])
require.Equal(t, int64(0), lr.limit)

// Read the encoded data.
n, err = decoder.Read(readStr)
require.EqualValues(t, len(writeStr), n)
require.EqualValues(t, writeStr, readStr)

// Try decoding more, past the end of second frame.
n, err = decoder.Read(readStr)

// Make sure the error indicates end of the frame.
require.ErrorIs(t, err, EndOfFrame)
require.EqualValues(t, 0, n)
// Test Read after limit exhausted
n, err = lr.Read(buf)
require.Equal(t, 0, n)
require.ErrorIs(t, err, io.EOF)
}
15 changes: 7 additions & 8 deletions java/src/main/java/net/stef/FrameDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,22 @@ public class FrameDecoder extends InputStream {
private int ofs;
private InputStream frameContentSrc;
private ZstdInputStream decompressor;
private ChunkedReader chunkReader = new ChunkedReader();
final private LimitedReader limitedReader = new LimitedReader();
private int flags;
private boolean frameLoaded;
private boolean notFirstFrame;

public void init(InputStream src, Compression compression) throws IOException {
this.src = src;
this.compression = compression;
chunkReader.init(src);
chunkReader.setNextChunk(this::nextFrame);
limitedReader.init(src);

switch (compression) {
case None:
this.frameContentSrc = chunkReader;
this.frameContentSrc = limitedReader;
break;
case Zstd:
this.decompressor = new ZstdInputStream(chunkReader);
this.decompressor = new ZstdInputStream(limitedReader);
this.frameContentSrc = decompressor;
break;
default:
Expand All @@ -53,16 +52,16 @@ private void nextFrame() throws IOException {
if (compression != Compression.None) {
long compressedSize = Serde.readUvarint(src);

chunkReader.setLimit(compressedSize);
limitedReader.setLimit(compressedSize);

if (!notFirstFrame || (flags & FrameFlags.RestartCompression)!=0) {
notFirstFrame = true;
decompressor.close();
decompressor = new ZstdInputStream(chunkReader);
decompressor = new ZstdInputStream(limitedReader);
frameContentSrc = decompressor;
}
} else {
chunkReader.setLimit(uncompressedSize);
limitedReader.setLimit(uncompressedSize);
}

frameLoaded = true;
Expand Down
Loading
Loading