From da913df9475694e4851ae8ec827a93753a257b27 Mon Sep 17 00:00:00 2001 From: Corey Kosak Date: Wed, 14 Jan 2026 21:25:46 -0500 Subject: [PATCH] fix: DH-21412: flush largeArray queue sooner --- .../densestorage/DenseStorageConstants.java | 11 +++++-- .../csv/densestorage/DenseStorageWriter.java | 8 ++++- .../java/io/deephaven/csv/CsvReaderTest.java | 33 +++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/deephaven/csv/densestorage/DenseStorageConstants.java b/src/main/java/io/deephaven/csv/densestorage/DenseStorageConstants.java index 4f7a58fa..cc849098 100644 --- a/src/main/java/io/deephaven/csv/densestorage/DenseStorageConstants.java +++ b/src/main/java/io/deephaven/csv/densestorage/DenseStorageConstants.java @@ -19,10 +19,15 @@ public class DenseStorageConstants { */ public static final int PACKED_QUEUE_SIZE = LARGE_THRESHOLD * 1024; /** - * Size of the "large array queue". Somewhat arbitrary but should be large-ish. We have arbitrarily chosen 100K - * here. 10K might also be reasonable. + * Size of the "large array queue" in terms of number of slots in the large array. We have arbitrarily chosen 1K + * here. */ - public static final int LARGE_ARRAY_QUEUE_SIZE = 100_000; + public static final int LARGE_ARRAY_QUEUE_SIZE = 1024; + /** + * The threshold that will trigger a flush of the "large array queue" when it is exceeded. We have arbitrarily + * chosen 1M here. + */ + public static final int LARGE_ARRAY_CONTENT_LENGTH_FLUSH_THRESHOLD = LARGE_ARRAY_QUEUE_SIZE * LARGE_THRESHOLD; /** * This sentinel value is used to indicate that the next value being read is not bytes packed into a byte block but * rather its own byte array. diff --git a/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java b/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java index 1fdcdabd..f04bd0d8 100644 --- a/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java +++ b/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java @@ -162,6 +162,7 @@ public static Pair create(final boolean private byte[][] largeArrayBuffer = new byte[DenseStorageConstants.LARGE_ARRAY_QUEUE_SIZE][]; private int largeArrayBegin = 0; private int largeArrayCurrent = 0; + private int largeArrayContentLength = 0; private final ByteSlice controlWordByteSlice = new ByteSlice(new byte[4], 0, 4); @@ -223,14 +224,19 @@ private void addBytes(ByteSlice bs) { } private void addLargeArray(byte[] largeArray) { - if (largeArrayCurrent == largeArrayBuffer.length) { + // The large array buffer will allocate a new buffer when it is full (of course), or when its content length + // exceeds a threshold. This should allow the garbage collector to collect old large array pieces sooner. + if (largeArrayCurrent == largeArrayBuffer.length + || largeArrayContentLength >= DenseStorageConstants.LARGE_ARRAY_CONTENT_LENGTH_FLUSH_THRESHOLD) { flush(); largeArrayBuffer = new byte[DenseStorageConstants.LARGE_ARRAY_QUEUE_SIZE][]; largeArrayBegin = 0; largeArrayCurrent = 0; + largeArrayContentLength = 0; } largeArrayBuffer[largeArrayCurrent] = largeArray; ++largeArrayCurrent; + largeArrayContentLength += largeArray.length; } private void flush() { diff --git a/src/test/java/io/deephaven/csv/CsvReaderTest.java b/src/test/java/io/deephaven/csv/CsvReaderTest.java index 6aedc6f6..c2e17374 100644 --- a/src/test/java/io/deephaven/csv/CsvReaderTest.java +++ b/src/test/java/io/deephaven/csv/CsvReaderTest.java @@ -326,6 +326,39 @@ public void bug251(boolean concurrent) { .hasRootCauseMessage("Cell did not have closing quote character"); } + /** + * Alternates input between small strings and large strings, in order to exercise the large string queue and its + * early spill threshold. + */ + @Test + public void exerciseLargeThreshold() throws CsvReaderException { + final String smallString = "ss"; + + final String largeString; + { + final int numCharsInBigCell = DenseStorageConstants.LARGE_THRESHOLD * 4 + 111; + final StringBuilder bcb = new StringBuilder(); + for (int i = 0; i < numCharsInBigCell; ++i) { + bcb.append('L'); + } + largeString = bcb.toString(); + } + + // Make a buffer that alternates small strings and large strings + List expectedValues = new ArrayList<>(); + for (int i = 0; i != DenseStorageConstants.LARGE_ARRAY_QUEUE_SIZE; ++i) { + expectedValues.add(smallString); + expectedValues.add(largeString); + } + final String input = "Values\n" + String.join("\n", expectedValues); + + final ColumnSet expected = + ColumnSet.of( + Column.ofRefs("Values", expectedValues.toArray(new String[0]))); + CsvTestUtil.invokeTests(CsvTestUtil.defaultCsvSpecs(), input, expected); + } + + @Test public void validates() { final String lengthyMessage = "CsvSpecs failed validation for the following reasons: "