diff --git a/go/pkg/frame.go b/go/pkg/frame.go
index 95a266a7..93623aed 100644
--- a/go/pkg/frame.go
+++ b/go/pkg/frame.go
@@ -96,19 +96,19 @@ type FrameDecoder struct {
 	frameContentSrc           ByteAndBlockReader
 	decompressedContentReader *bufio.Reader
 	decompressor              *zstd.Decoder
-	chunkReader               chunkedReader
+	limitedReader             limitedReader
 	flags                     FrameFlags
 	frameLoaded               bool
 	notFirstFrame             bool
 }
 
-type chunkedReader struct {
-	src       ByteAndBlockReader
-	limit     int64
-	nextChunk func() error
+// limitedReader wraps a ByteAndBlockReader and limits the number of bytes read.
+type limitedReader struct {
+	src   ByteAndBlockReader
+	limit int64
 }
 
-func (r *chunkedReader) readByte() (byte, error) {
+func (r *limitedReader) ReadByte() (byte, error) {
 	if r.limit <= 0 {
 		return 0, io.EOF
 	}
@@ -117,25 +117,7 @@ func (r *chunkedReader) readByte() (byte, error) {
 	return b, err
 }
 
-func (r *chunkedReader) ReadByte() (byte, error) {
-loop:
-	for {
-		b, err := r.readByte()
-		if err == nil {
-			return b, err
-		}
-		if err == io.EOF {
-			err = r.nextChunk()
-			if err != nil {
-				return 0, err
-			}
-			goto loop
-		}
-		return 0, err
-	}
-}
-
-func (r *chunkedReader) readBlock(p []byte) (n int, err error) {
+func (r *limitedReader) Read(p []byte) (n int, err error) {
 	if r.limit <= 0 {
 		return 0, io.EOF
 	}
@@ -147,25 +129,7 @@ func (r *chunkedReader) readBlock(p []byte) (n int, err error) {
 	return
 }
 
-func (r *chunkedReader) Read(p []byte) (n int, err error) {
-loop:
-	for {
-		n, err := r.readBlock(p)
-		if err == nil {
-			return n, err
-		}
-		if err == io.EOF {
-			err = r.nextChunk()
-			if err != nil {
-				return 0, err
-			}
-			goto loop
-		}
-		return 0, err
-	}
-}
-
-func (r *chunkedReader) Init(src ByteAndBlockReader) {
+func (r *limitedReader) Init(src ByteAndBlockReader) {
 	r.src = src
 }
 
@@ -176,12 +140,11 @@ const readBufSize = 64 * 1024
 func (d *FrameDecoder) Init(src ByteAndBlockReader, compression Compression) error {
 	d.src = src
 	d.compression = compression
-	d.chunkReader.Init(src)
-	d.chunkReader.nextChunk = d.nextFrame
+	d.limitedReader.Init(src)
 
 	switch d.compression {
 	case CompressionNone:
-		d.frameContentSrc = &d.chunkReader
+		d.frameContentSrc = &d.limitedReader
 
 	case CompressionZstd:
 		var err error
@@ -222,11 +185,11 @@ func (d *FrameDecoder) nextFrame() error {
 		if err != nil {
 			return err
 		}
-		d.chunkReader.limit = int64(compressedSize)
+		d.limitedReader.limit = int64(compressedSize)
 
 		if !d.notFirstFrame || d.flags&RestartCompression != 0 {
 			d.notFirstFrame = true
-			if err := d.decompressor.Reset(&d.chunkReader); err != nil {
+			if err := d.decompressor.Reset(&d.limitedReader); err != nil {
 				return err
 			}
 		}
@@ -234,7 +197,7 @@
 		d.decompressedContentReader.Reset(d.decompressor)
 	} else {
 		compressedSize = uncompressedSize
-		d.chunkReader.limit = int64(uncompressedSize)
+		d.limitedReader.limit = int64(uncompressedSize)
 	}
 
 	d.frameLoaded = true
diff --git a/go/pkg/frame_test.go b/go/pkg/frame_test.go
index 29061c76..92d63db2 100644
--- a/go/pkg/frame_test.go
+++ b/go/pkg/frame_test.go
@@ -3,25 +3,28 @@ package pkg
 import (
 	"bytes"
 	"io"
+	"strconv"
 	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/require"
 )
 
-type memChunkReaderWriter struct {
+// memReaderWriter is an in-memory implementation of the ChunkWriter and ByteAndBlockReader interfaces
+// that allows writing to the buffer first and then reading from it.
+type memReaderWriter struct {
 	buf bytes.Buffer
 }
 
-func (m *memChunkReaderWriter) ReadByte() (byte, error) {
+func (m *memReaderWriter) ReadByte() (byte, error) {
 	return m.buf.ReadByte()
 }
 
-func (m *memChunkReaderWriter) Read(p []byte) (n int, err error) {
+func (m *memReaderWriter) Read(p []byte) (n int, err error) {
 	return m.buf.Read(p)
 }
 
-func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error {
+func (m *memReaderWriter) WriteChunk(header []byte, content []byte) error {
 	_, err := m.buf.Write(header)
 	if err != nil {
 		return err
@@ -30,11 +33,11 @@ func (m *memChunkReaderWriter) WriteChunk(header []byte, content []byte) error {
 	_, err = m.buf.Write(content)
 	return err
 }
 
-func (m *memChunkReaderWriter) Bytes() []byte {
+func (m *memReaderWriter) Bytes() []byte {
 	return m.buf.Bytes()
 }
 
-func TestLastFrameAndContinue(t *testing.T) {
+func testLastFrameAndContinue(t *testing.T, compression Compression) {
 	// This test verifies that it is possible to decode until the end of available
 	// data, get a correct indication that it is the end of the frame and end
 	// of all available data, then once new data becomes available the decoding
@@ -43,8 +46,8 @@
 
 	// Encode one frame with some data.
 	encoder := FrameEncoder{}
-	buf := &memChunkReaderWriter{}
-	err := encoder.Init(buf, CompressionZstd)
+	buf := &memReaderWriter{}
+	err := encoder.Init(buf, compression)
 	require.NoError(t, err)
 	writeStr := []byte(strings.Repeat("hello", 10))
 	_, err = encoder.Write(writeStr)
@@ -55,7 +58,7 @@
 
 	// Now decode that frame.
 	decoder := FrameDecoder{}
-	err = decoder.Init(buf, CompressionZstd)
+	err = decoder.Init(buf, compression)
 	require.NoError(t, err)
 	_, err = decoder.Next()
 	require.NoError(t, err)
@@ -73,40 +76,113 @@
 	require.ErrorIs(t, err, EndOfFrame)
 	require.EqualValues(t, 0, n)
 
-	// Try decoding the next frame and make sure we get the EOF from the source byte Reader.
-	_, err = decoder.Next()
+	for i := 1; i <= 10; i++ {
+		// Try decoding the next frame and make sure we get the EOF from the source byte Reader.
+		_, err = decoder.Next()
+		require.ErrorIs(t, err, io.EOF)
+
+		// Continue adding to the same source byte buffer using the encoder.
+
+		// Open a new frame, write new data and close the frame.
+		encoder.OpenFrame(0)
+		writeStr = []byte(strings.Repeat("foo", i))
+		_, err = encoder.Write(writeStr)
+		require.NoError(t, err)
+
+		err = encoder.CloseFrame()
+		require.NoError(t, err)
+
+		// Try reading again. We should get an EndOfFrame error.
+		readStr = make([]byte, len(writeStr))
+		n, err = decoder.Read(readStr)
+		require.ErrorIs(t, err, EndOfFrame)
+		require.EqualValues(t, 0, n)
+
+		// Now try decoding a new frame. This time it should succeed since we added a new frame.
+		_, err = decoder.Next()
+		require.NoError(t, err)
+
+		// Read the encoded data.
+		n, err = decoder.Read(readStr)
+		require.EqualValues(t, len(writeStr), n)
+		require.EqualValues(t, writeStr, readStr)
+
+		// Try decoding more, past the end of the current frame.
+		n, err = decoder.Read(readStr)
+
+		// Make sure the error indicates end of the frame.
+		require.ErrorIs(t, err, EndOfFrame)
+		require.EqualValues(t, 0, n)
+	}
+}
+
+func TestLastFrameAndContinue(t *testing.T) {
+	compressions := []Compression{
+		CompressionNone,
+		CompressionZstd,
+	}
+
+	for _, compression := range compressions {
+		t.Run(
+			strconv.Itoa(int(compression)), func(t *testing.T) {
+				testLastFrameAndContinue(t, compression)
+			},
+		)
+	}
+}
+
+func TestLimitedReader(t *testing.T) {
+	data := []byte("abcdef")
+	mem := &memReaderWriter{buf: *bytes.NewBuffer(data)}
+	var lr limitedReader
+	lr.Init(mem)
+
+	// Test reading with limit 0
+	lr.limit = 0
+	buf := make([]byte, 3)
+	n, err := lr.Read(buf)
+	require.Equal(t, 0, n)
 	require.ErrorIs(t, err, io.EOF)
 
-	// Continue adding to the same source byte buffer using encoder.
+	// Test ReadByte with limit 0
+	lr.limit = 0
+	_, err = lr.ReadByte()
+	require.ErrorIs(t, err, io.EOF)
 
-	// Open a new frame, write new data and close the frame.
-	encoder.OpenFrame(0)
-	writeStr = []byte(strings.Repeat("foo", 10))
-	_, err = encoder.Write(writeStr)
+	// Reset and test reading less than limit
+	mem = &memReaderWriter{buf: *bytes.NewBuffer(data)}
+	lr.Init(mem)
+	lr.limit = 3
+	buf = make([]byte, 2)
+	n, err = lr.Read(buf)
+	require.Equal(t, 2, n)
 	require.NoError(t, err)
+	require.Equal(t, []byte("ab"), buf)
+	require.Equal(t, int64(1), lr.limit)
 
-	err = encoder.CloseFrame()
+	// Test ReadByte with remaining limit
+	b, err := lr.ReadByte()
 	require.NoError(t, err)
+	require.Equal(t, byte('c'), b)
+	require.Equal(t, int64(0), lr.limit)
 
-	// Try reading again. We should get an EndOfFrame error.
-	readStr = make([]byte, len(writeStr))
-	n, err = decoder.Read(readStr)
-	require.ErrorIs(t, err, EndOfFrame)
-	require.EqualValues(t, 0, n)
+	// Test ReadByte at limit 0 after reading
+	_, err = lr.ReadByte()
+	require.ErrorIs(t, err, io.EOF)
 
-	// Now try decoding a new frame. This time it should succeed since we added a new frame.
-	_, err = decoder.Next()
+	// Test reading more than limit
+	mem = &memReaderWriter{buf: *bytes.NewBuffer(data)}
+	lr.Init(mem)
+	lr.limit = 4
+	buf = make([]byte, 10)
+	n, err = lr.Read(buf)
+	require.Equal(t, 4, n)
 	require.NoError(t, err)
+	require.Equal(t, []byte("abcd"), buf[:n])
+	require.Equal(t, int64(0), lr.limit)
 
-	// Read the encoded data.
-	n, err = decoder.Read(readStr)
-	require.EqualValues(t, len(writeStr), n)
-	require.EqualValues(t, writeStr, readStr)
-
-	// Try decoding more, past the end of second frame.
-	n, err = decoder.Read(readStr)
-
-	// Make sure the error indicates end of the frame.
-	require.ErrorIs(t, err, EndOfFrame)
-	require.EqualValues(t, 0, n)
+	// Test Read after limit exhausted
+	n, err = lr.Read(buf)
+	require.Equal(t, 0, n)
+	require.ErrorIs(t, err, io.EOF)
 }
diff --git a/java/src/main/java/net/stef/FrameDecoder.java b/java/src/main/java/net/stef/FrameDecoder.java
index 3beb88d4..5d380ddc 100644
--- a/java/src/main/java/net/stef/FrameDecoder.java
+++ b/java/src/main/java/net/stef/FrameDecoder.java
@@ -13,7 +13,7 @@ public class FrameDecoder extends InputStream {
     private int ofs;
     private InputStream frameContentSrc;
     private ZstdInputStream decompressor;
-    private ChunkedReader chunkReader = new ChunkedReader();
+    private final LimitedReader limitedReader = new LimitedReader();
     private int flags;
     private boolean frameLoaded;
     private boolean notFirstFrame;
@@ -21,15 +21,14 @@
     public void init(InputStream src, Compression compression) throws IOException {
         this.src = src;
         this.compression = compression;
-        chunkReader.init(src);
-        chunkReader.setNextChunk(this::nextFrame);
+        limitedReader.init(src);
 
         switch (compression) {
             case None:
-                this.frameContentSrc = chunkReader;
+                this.frameContentSrc = limitedReader;
                 break;
             case Zstd:
-                this.decompressor = new ZstdInputStream(chunkReader);
+                this.decompressor = new ZstdInputStream(limitedReader);
                 this.frameContentSrc = decompressor;
                 break;
             default:
@@ -53,16 +52,16 @@ private void nextFrame() throws IOException {
 
         if (compression != Compression.None) {
             long compressedSize = Serde.readUvarint(src);
-            chunkReader.setLimit(compressedSize);
+            limitedReader.setLimit(compressedSize);
 
             if (!notFirstFrame || (flags & FrameFlags.RestartCompression)!=0) {
                 notFirstFrame = true;
                 decompressor.close();
-                decompressor = new ZstdInputStream(chunkReader);
+                decompressor = new ZstdInputStream(limitedReader);
                 frameContentSrc = decompressor;
             }
         } else {
-            chunkReader.setLimit(uncompressedSize);
+            limitedReader.setLimit(uncompressedSize);
         }
 
         frameLoaded = true;
diff --git a/java/src/main/java/net/stef/ChunkedReader.java b/java/src/main/java/net/stef/LimitedReader.java
similarity index 64%
rename from java/src/main/java/net/stef/ChunkedReader.java
rename to java/src/main/java/net/stef/LimitedReader.java
index d9e68ae4..37601b1b 100644
--- a/java/src/main/java/net/stef/ChunkedReader.java
+++ b/java/src/main/java/net/stef/LimitedReader.java
@@ -3,10 +3,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-public class ChunkedReader extends ByteAndBlockReader {
+// LimitedReader is an InputStream wrapper that limits
+// the number of bytes that can be read from the underlying
+// InputStream.
+public class LimitedReader extends ByteAndBlockReader {
     private InputStream src;
     private long limit;
-    private NextChunkCallback nextChunk;
 
     public void init(InputStream src) {
         this.src = src;
@@ -16,14 +18,10 @@ public void setLimit(long limit) {
         this.limit = limit;
     }
 
-    public void setNextChunk(NextChunkCallback nextChunk) {
-        this.nextChunk = nextChunk;
-    }
-
     @Override
     public int read() throws IOException {
         if (limit <= 0) {
-            nextChunk.next();
+            return -1;
         }
         limit--;
         return src.read();
@@ -32,16 +30,11 @@ public int read() throws IOException {
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
         if (limit <= 0) {
-            nextChunk.next();
+            return -1;
         }
         int toRead = (int) Math.min(limit, b.length);
         int n = src.read(b, off, toRead);
         limit -= n;
         return n;
     }
-
-    @FunctionalInterface
-    public interface NextChunkCallback {
-        void next() throws IOException;
-    }
 }
diff --git a/java/src/test/java/net/stef/LimitedReaderTest.java b/java/src/test/java/net/stef/LimitedReaderTest.java
new file mode 100644
index 00000000..be027a0b
--- /dev/null
+++ b/java/src/test/java/net/stef/LimitedReaderTest.java
@@ -0,0 +1,69 @@
+package net.stef;
+
+import org.junit.jupiter.api.Test;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import static org.junit.jupiter.api.Assertions.*;
+
+class LimitedReaderTest {
+    @Test
+    void testReadSingleBytesWithinLimit() throws IOException {
+        byte[] data = {1, 2, 3, 4, 5};
+        LimitedReader reader = new LimitedReader();
+        reader.init(new ByteArrayInputStream(data));
+        reader.setLimit(3);
+        assertEquals(1, reader.read());
+        assertEquals(2, reader.read());
+        assertEquals(3, reader.read());
+        assertEquals(-1, reader.read());
+    }
+
+    @Test
+    void testReadBufferWithinLimit() throws IOException {
+        byte[] data = {10, 20, 30, 40, 50};
+        LimitedReader reader = new LimitedReader();
+        reader.init(new ByteArrayInputStream(data));
+        reader.setLimit(4);
+        byte[] buf = new byte[10];
+        int n = reader.read(buf, 0, buf.length);
+        assertEquals(4, n);
+        assertArrayEquals(new byte[]{10, 20, 30, 40, 0, 0, 0, 0, 0, 0}, buf);
+        assertEquals(-1, reader.read());
+    }
+
+    @Test
+    void testReadPastLimit() throws IOException {
+        byte[] data = {1, 2, 3};
+        LimitedReader reader = new LimitedReader();
+        reader.init(new ByteArrayInputStream(data));
+        reader.setLimit(2);
+        assertEquals(1, reader.read());
+        assertEquals(2, reader.read());
+        assertEquals(-1, reader.read());
+    }
+
+    @Test
+    void testZeroLimit() throws IOException {
+        byte[] data = {1, 2, 3};
+        LimitedReader reader = new LimitedReader();
+        reader.init(new ByteArrayInputStream(data));
+        reader.setLimit(0);
+        assertEquals(-1, reader.read());
+    }
+
+    @Test
+    void testSetLimitMultipleTimes() throws IOException {
+        byte[] data = {1, 2, 3, 4};
+        LimitedReader reader = new LimitedReader();
+        reader.init(new ByteArrayInputStream(data));
+        reader.setLimit(2);
+        assertEquals(1, reader.read());
+        assertEquals(2, reader.read());
+        assertEquals(-1, reader.read());
+        reader.setLimit(2);
+        assertEquals(3, reader.read());
+        assertEquals(4, reader.read());
+        assertEquals(-1, reader.read());
+    }
+}
+
diff --git a/otelcol/cmd/otelcol/components.go b/otelcol/cmd/otelcol/components.go
index e1c66712..2460e965 100644
--- a/otelcol/cmd/otelcol/components.go
+++ b/otelcol/cmd/otelcol/components.go
@@ -31,6 +31,7 @@ import (
 	"go.opentelemetry.io/collector/processor"
 	"go.opentelemetry.io/collector/receiver"
 	"go.opentelemetry.io/collector/receiver/otlpreceiver"
+	"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
"github.com/splunk/stef/otelcol/internal/stefexporter" "github.com/splunk/stef/otelcol/internal/stefreceiver" @@ -81,5 +82,7 @@ func components() (otelcol.Factories, error) { return otelcol.Factories{}, err } + factories.Telemetry = otelconftelemetry.NewFactory() + return factories, nil }