From 5e36e54cf6996168881aab922ebeafc92d2c909c Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Thu, 18 Dec 2025 15:20:31 -0500 Subject: [PATCH] Eliminate unnecessary nextChunk logic during reading nextChunk was a remnant from old logic that is no longer used. The current code does not try to read past current frame bytes and there is no need for the logic that loads the next chunk. Renamed chunkedReader to limitedReader for more clarity and added a unit test. --- go/pkg/frame.go | 63 ++------ go/pkg/frame_test.go | 146 +++++++++++++----- java/src/main/java/net/stef/FrameDecoder.java | 15 +- ...{ChunkedReader.java => LimitedReader.java} | 19 +-- .../test/java/net/stef/LimitedReaderTest.java | 69 +++++++++ otelcol/cmd/otelcol/components.go | 3 + 6 files changed, 209 insertions(+), 106 deletions(-) rename java/src/main/java/net/stef/{ChunkedReader.java => LimitedReader.java} (64%) create mode 100644 java/src/test/java/net/stef/LimitedReaderTest.java diff --git a/go/pkg/frame.go b/go/pkg/frame.go index 95a266a7..93623aed 100644 --- a/go/pkg/frame.go +++ b/go/pkg/frame.go @@ -96,19 +96,19 @@ type FrameDecoder struct { frameContentSrc ByteAndBlockReader decompressedContentReader *bufio.Reader decompressor *zstd.Decoder - chunkReader chunkedReader + limitedReader limitedReader flags FrameFlags frameLoaded bool notFirstFrame bool } -type chunkedReader struct { - src ByteAndBlockReader - limit int64 - nextChunk func() error +// limitedReader wraps a ByteAndBlockReader and limits the number of bytes read. +type limitedReader struct { + src ByteAndBlockReader + limit int64 } -func (r *chunkedReader) readByte() (byte, error) { +func (r *limitedReader) ReadByte() (byte, error) { if r.limit <= 0 { return 0, io.EOF } @@ -117,25 +117,7 @@ func (r *chunkedReader) readByte() (byte, error) { return b, err } -func (r *chunkedReader) ReadByte() (byte, error) { -loop: - for { - b, err := r.readByte() - if err == nil { - return b, err - } - if err == io.EOF { - err = r.nextChunk() - if err != nil { - return 0, err - } - goto loop - } - return 0, err - } -} - -func (r *chunkedReader) readBlock(p []byte) (n int, err error) { +func (r *limitedReader) Read(p []byte) (n int, err error) { if r.limit <= 0 { return 0, io.EOF } @@ -147,25 +129,7 @@ func (r *chunkedReader) readBlock(p []byte) (n int, err error) { return } -func (r *chunkedReader) Read(p []byte) (n int, err error) { -loop: - for { - n, err := r.readBlock(p) - if err == nil { - return n, err - } - if err == io.EOF { - err = r.nextChunk() - if err != nil { - return 0, err - } - goto loop - } - return 0, err - } -} - -func (r *chunkedReader) Init(src ByteAndBlockReader) { +func (r *limitedReader) Init(src ByteAndBlockReader) { r.src = src } @@ -176,12 +140,11 @@ const readBufSize = 64 * 1024 func (d *FrameDecoder) Init(src ByteAndBlockReader, compression Compression) error { d.src = src d.compression = compression - d.chunkReader.Init(src) - d.chunkReader.nextChunk = d.nextFrame + d.limitedReader.Init(src) switch d.compression { case CompressionNone: - d.frameContentSrc = &d.chunkReader + d.frameContentSrc = &d.limitedReader case CompressionZstd: var err error @@ -222,11 +185,11 @@ func (d *FrameDecoder) nextFrame() error { if err != nil { return err } - d.chunkReader.limit = int64(compressedSize) + d.limitedReader.limit = int64(compressedSize) if !d.notFirstFrame || d.flags&RestartCompression != 0 { d.notFirstFrame = true - if err := d.decompressor.Reset(&d.chunkReader); err != nil { + if err := d.decompressor.Reset(&d.limitedReader); err != nil { return err } } @@ -234,7 +197,7 @@ func (d *FrameDecoder) nextFrame() error { d.decompressedContentReader.Reset(d.decompressor) } else { compressedSize = uncompressedSize - d.chunkReader.limit = int64(uncompressedSize) + d.limitedReader.limit = int64(uncompressedSize) } d.frameLoaded = true diff --git a/go/pkg/frame_test.go b/go/pkg/frame_test.go index 29061c76..92d63db2 100644 --- a/go/pkg/frame_test.go +++ b/go/pkg/frame_test.go @@ -3,25 +3,28 @@ package pkg import ( "bytes" "io" + "strconv" "strings" "testing" "github.com/stretchr/testify/require" ) -type memChunkReaderWriter struct { +// memReaderWriter is an in-memory implementation of ChunkWriter and ByteAndBlockReader interfaces +// that allows to first write to the buffer and then read from it. +type memReaderWriter struct { buf bytes.Buffer } -func (m *memChunkReaderWriter) ReadByte() (byte, error) { +func (m *memReaderWriter) ReadByte() (byte, error) { return m.buf.ReadByte() } -func (m *memChunkReaderWriter) Read(p []byte) (n int, err error) { +func (m *memReaderWriter) Read(p []byte) (n int, err error) { return m.buf.Read(p) } -func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error { +func (m *memReaderWriter) WriteChunk(header []byte, content []byte) error { _, err := m.buf.Write(header) if err != nil { return err @@ -30,11 +33,11 @@ func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error { return err } -func (m *memChunkReaderWriter) Bytes() []byte { +func (m *memReaderWriter) Bytes() []byte { return m.buf.Bytes() } -func TestLastFrameAndContinue(t *testing.T) { +func testLastFrameAndContinue(t *testing.T, compression Compression) { // This test verifies that it is possible to decode until the end of available // data, get a correct indication that it is the end of the frame and end // of all available data, then once new data becomes available the decoding @@ -43,8 +46,8 @@ func TestLastFrameAndContinue(t *testing.T) { // Encode one frame with some data. encoder := FrameEncoder{} - buf := &memChunkReaderWriter{} - err := encoder.Init(buf, CompressionZstd) + buf := &memReaderWriter{} + err := encoder.Init(buf, compression) require.NoError(t, err) writeStr := []byte(strings.Repeat("hello", 10)) _, err = encoder.Write(writeStr) @@ -55,7 +58,7 @@ func TestLastFrameAndContinue(t *testing.T) { // Now decode that frame. decoder := FrameDecoder{} - err = decoder.Init(buf, CompressionZstd) + err = decoder.Init(buf, compression) require.NoError(t, err) _, err = decoder.Next() require.NoError(t, err) @@ -73,40 +76,113 @@ func TestLastFrameAndContinue(t *testing.T) { require.ErrorIs(t, err, EndOfFrame) require.EqualValues(t, 0, n) - // Try decoding the next frame and make sure we get the EOF from the source byte Reader. - _, err = decoder.Next() + for i := 1; i <= 10; i++ { + // Try decoding the next frame and make sure we get the EOF from the source byte Reader. + _, err = decoder.Next() + require.ErrorIs(t, err, io.EOF) + + // Continue adding to the same source byte buffer using encoder. + + // Open a new frame, write new data and close the frame. + encoder.OpenFrame(0) + writeStr = []byte(strings.Repeat("foo", i)) + _, err = encoder.Write(writeStr) + require.NoError(t, err) + + err = encoder.CloseFrame() + require.NoError(t, err) + + // Try reading again. We should get an EndOfFrame error. + readStr = make([]byte, len(writeStr)) + n, err = decoder.Read(readStr) + require.ErrorIs(t, err, EndOfFrame) + require.EqualValues(t, 0, n) + + // Now try decoding a new frame. This time it should succeed since we added a new frame. + _, err = decoder.Next() + require.NoError(t, err) + + // Read the encoded data. + n, err = decoder.Read(readStr) + require.EqualValues(t, len(writeStr), n) + require.EqualValues(t, writeStr, readStr) + + // Try decoding more, past the end of second frame. + n, err = decoder.Read(readStr) + + // Make sure the error indicates end of the frame. + require.ErrorIs(t, err, EndOfFrame) + require.EqualValues(t, 0, n) + } +} + +func TestLastFrameAndContinue(t *testing.T) { + compressions := []Compression{ + CompressionNone, + CompressionZstd, + } + + for _, compression := range compressions { + t.Run( + strconv.Itoa(int(compression)), func(t *testing.T) { + testLastFrameAndContinue(t, compression) + }, + ) + } +} + +func TestLimitedReader(t *testing.T) { + data := []byte("abcdef") + mem := &memReaderWriter{buf: *bytes.NewBuffer(data)} + var lr limitedReader + lr.Init(mem) + + // Test reading with limit 0 + lr.limit = 0 + buf := make([]byte, 3) + n, err := lr.Read(buf) + require.Equal(t, 0, n) require.ErrorIs(t, err, io.EOF) - // Continue adding to the same source byte buffer using encoder. + // Test ReadByte with limit 0 + lr.limit = 0 + _, err = lr.ReadByte() + require.ErrorIs(t, err, io.EOF) - // Open a new frame, write new data and close the frame. - encoder.OpenFrame(0) - writeStr = []byte(strings.Repeat("foo", 10)) - _, err = encoder.Write(writeStr) + // Reset and test reading less than limit + mem = &memReaderWriter{buf: *bytes.NewBuffer(data)} + lr.Init(mem) + lr.limit = 3 + buf = make([]byte, 2) + n, err = lr.Read(buf) + require.Equal(t, 2, n) require.NoError(t, err) + require.Equal(t, []byte("ab"), buf) + require.Equal(t, int64(1), lr.limit) - err = encoder.CloseFrame() + // Test ReadByte with remaining limit + b, err := lr.ReadByte() require.NoError(t, err) + require.Equal(t, byte('c'), b) + require.Equal(t, int64(0), lr.limit) - // Try reading again. We should get an EndOfFrame error. - readStr = make([]byte, len(writeStr)) - n, err = decoder.Read(readStr) - require.ErrorIs(t, err, EndOfFrame) - require.EqualValues(t, 0, n) + // Test ReadByte at limit 0 after reading + _, err = lr.ReadByte() + require.ErrorIs(t, err, io.EOF) - // Now try decoding a new frame. This time it should succeed since we added a new frame. - _, err = decoder.Next() + // Test reading more than limit + mem = &memReaderWriter{buf: *bytes.NewBuffer(data)} + lr.Init(mem) + lr.limit = 4 + buf = make([]byte, 10) + n, err = lr.Read(buf) + require.Equal(t, 4, n) require.NoError(t, err) + require.Equal(t, []byte("abcd"), buf[:n]) + require.Equal(t, int64(0), lr.limit) - // Read the encoded data. - n, err = decoder.Read(readStr) - require.EqualValues(t, len(writeStr), n) - require.EqualValues(t, writeStr, readStr) - - // Try decoding more, past the end of second frame. - n, err = decoder.Read(readStr) - - // Make sure the error indicates end of the frame. - require.ErrorIs(t, err, EndOfFrame) - require.EqualValues(t, 0, n) + // Test Read after limit exhausted + n, err = lr.Read(buf) + require.Equal(t, 0, n) + require.ErrorIs(t, err, io.EOF) } diff --git a/java/src/main/java/net/stef/FrameDecoder.java b/java/src/main/java/net/stef/FrameDecoder.java index 3beb88d4..5d380ddc 100644 --- a/java/src/main/java/net/stef/FrameDecoder.java +++ b/java/src/main/java/net/stef/FrameDecoder.java @@ -13,7 +13,7 @@ public class FrameDecoder extends InputStream { private int ofs; private InputStream frameContentSrc; private ZstdInputStream decompressor; - private ChunkedReader chunkReader = new ChunkedReader(); + final private LimitedReader limitedReader = new LimitedReader(); private int flags; private boolean frameLoaded; private boolean notFirstFrame; @@ -21,15 +21,14 @@ public class FrameDecoder extends InputStream { public void init(InputStream src, Compression compression) throws IOException { this.src = src; this.compression = compression; - chunkReader.init(src); - chunkReader.setNextChunk(this::nextFrame); + limitedReader.init(src); switch (compression) { case None: - this.frameContentSrc = chunkReader; + this.frameContentSrc = limitedReader; break; case Zstd: - this.decompressor = new ZstdInputStream(chunkReader); + this.decompressor = new ZstdInputStream(limitedReader); this.frameContentSrc = decompressor; break; default: @@ -53,16 +52,16 @@ private void nextFrame() throws IOException { if (compression != Compression.None) { long compressedSize = Serde.readUvarint(src); - chunkReader.setLimit(compressedSize); + limitedReader.setLimit(compressedSize); if (!notFirstFrame || (flags & FrameFlags.RestartCompression)!=0) { notFirstFrame = true; decompressor.close(); - decompressor = new ZstdInputStream(chunkReader); + decompressor = new ZstdInputStream(limitedReader); frameContentSrc = decompressor; } } else { - chunkReader.setLimit(uncompressedSize); + limitedReader.setLimit(uncompressedSize); } frameLoaded = true; diff --git a/java/src/main/java/net/stef/ChunkedReader.java b/java/src/main/java/net/stef/LimitedReader.java similarity index 64% rename from java/src/main/java/net/stef/ChunkedReader.java rename to java/src/main/java/net/stef/LimitedReader.java index d9e68ae4..37601b1b 100644 --- a/java/src/main/java/net/stef/ChunkedReader.java +++ b/java/src/main/java/net/stef/LimitedReader.java @@ -3,10 +3,12 @@ import java.io.IOException; import java.io.InputStream; -public class ChunkedReader extends ByteAndBlockReader { +// LimitedReader is an InputStream wrapper that limits +// the number of bytes that can be read from the underlying +// InputStream. +public class LimitedReader extends ByteAndBlockReader { private InputStream src; private long limit; - private NextChunkCallback nextChunk; public void init(InputStream src) { this.src = src; @@ -16,14 +18,10 @@ public void setLimit(long limit) { this.limit = limit; } - public void setNextChunk(NextChunkCallback nextChunk) { - this.nextChunk = nextChunk; - } - @Override public int read() throws IOException { if (limit <= 0) { - nextChunk.next(); + return -1; } limit--; return src.read(); @@ -32,16 +30,11 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { if (limit <= 0) { - nextChunk.next(); + return -1; } int toRead = (int) Math.min(limit, b.length); int n = src.read(b, off, toRead); limit -= n; return n; } - - @FunctionalInterface - public interface NextChunkCallback { - void next() throws IOException; - } } diff --git a/java/src/test/java/net/stef/LimitedReaderTest.java b/java/src/test/java/net/stef/LimitedReaderTest.java new file mode 100644 index 00000000..be027a0b --- /dev/null +++ b/java/src/test/java/net/stef/LimitedReaderTest.java @@ -0,0 +1,69 @@ +package net.stef; + +import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import static org.junit.jupiter.api.Assertions.*; + +class LimitedReaderTest { + @Test + void testReadSingleBytesWithinLimit() throws IOException { + byte[] data = {1, 2, 3, 4, 5}; + LimitedReader reader = new LimitedReader(); + reader.init(new ByteArrayInputStream(data)); + reader.setLimit(3); + assertEquals(1, reader.read()); + assertEquals(2, reader.read()); + assertEquals(3, reader.read()); + assertEquals(-1, reader.read()); + } + + @Test + void testReadBufferWithinLimit() throws IOException { + byte[] data = {10, 20, 30, 40, 50}; + LimitedReader reader = new LimitedReader(); + reader.init(new ByteArrayInputStream(data)); + reader.setLimit(4); + byte[] buf = new byte[10]; + int n = reader.read(buf, 0, buf.length); + assertEquals(4, n); + assertArrayEquals(new byte[]{10, 20, 30, 40, 0, 0, 0, 0, 0, 0}, buf); + assertEquals(-1, reader.read()); + } + + @Test + void testReadPastLimit() throws IOException { + byte[] data = {1, 2, 3}; + LimitedReader reader = new LimitedReader(); + reader.init(new ByteArrayInputStream(data)); + reader.setLimit(2); + assertEquals(1, reader.read()); + assertEquals(2, reader.read()); + assertEquals(-1, reader.read()); + } + + @Test + void testZeroLimit() throws IOException { + byte[] data = {1, 2, 3}; + LimitedReader reader = new LimitedReader(); + reader.init(new ByteArrayInputStream(data)); + reader.setLimit(0); + assertEquals(-1, reader.read()); + } + + @Test + void testSetLimitMultipleTimes() throws IOException { + byte[] data = {1, 2, 3, 4}; + LimitedReader reader = new LimitedReader(); + reader.init(new ByteArrayInputStream(data)); + reader.setLimit(2); + assertEquals(1, reader.read()); + assertEquals(2, reader.read()); + assertEquals(-1, reader.read()); + reader.setLimit(2); + assertEquals(3, reader.read()); + assertEquals(4, reader.read()); + assertEquals(-1, reader.read()); + } +} + diff --git a/otelcol/cmd/otelcol/components.go b/otelcol/cmd/otelcol/components.go index e1c66712..2460e965 100644 --- a/otelcol/cmd/otelcol/components.go +++ b/otelcol/cmd/otelcol/components.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry" "github.com/splunk/stef/otelcol/internal/stefexporter" "github.com/splunk/stef/otelcol/internal/stefreceiver" @@ -81,5 +82,7 @@ func components() (otelcol.Factories, error) { return otelcol.Factories{}, err } + factories.Telemetry = otelconftelemetry.NewFactory() + return factories, nil }