Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ public class DenseStorageConstants {
*/
public static final int PACKED_QUEUE_SIZE = LARGE_THRESHOLD * 1024;
/**
* Size of the "large array queue". Somewhat arbitrary but should be large-ish. We have arbitrarily chosen 100K
* here. 10K might also be reasonable.
* Size of the "large array queue" in terms of number of slots in the large array. We have arbitrarily chosen 1K
* here.
*/
public static final int LARGE_ARRAY_QUEUE_SIZE = 100_000;
public static final int LARGE_ARRAY_QUEUE_SIZE = 1024;
/**
* The threshold on the total length of the arrays held in the "large array queue"; reaching or exceeding it
* triggers a flush of the queue. We have arbitrarily chosen 1M here.
*/
public static final int LARGE_ARRAY_CONTENT_LENGTH_FLUSH_THRESHOLD = LARGE_ARRAY_QUEUE_SIZE * LARGE_THRESHOLD;
/**
* This sentinel value is used to indicate that the next value being read is not bytes packed into a byte block but
* rather its own byte array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public static Pair<DenseStorageWriter, DenseStorageReader> create(final boolean
// Queue of large arrays awaiting flush; allocated with LARGE_ARRAY_QUEUE_SIZE slots.
private byte[][] largeArrayBuffer = new byte[DenseStorageConstants.LARGE_ARRAY_QUEUE_SIZE][];
// Reset to 0 whenever a fresh buffer is allocated. NOTE(review): presumably the index of the first slot not yet
// handed off by flush() -- confirm against flush(), which is not fully visible here.
private int largeArrayBegin = 0;
// Index of the next free slot in largeArrayBuffer; incremented after each array is stored.
private int largeArrayCurrent = 0;
// Sum of the lengths of the arrays stored since the buffer was last replaced.
private int largeArrayContentLength = 0;

private final ByteSlice controlWordByteSlice = new ByteSlice(new byte[4], 0, 4);

Expand Down Expand Up @@ -223,14 +224,19 @@ private void addBytes(ByteSlice bs) {
}

/**
 * Queues {@code largeArray} in the large array buffer. Before storing it, if the buffer has no free slot left or
 * the accumulated content length has reached the flush threshold, calls {@link #flush()} and starts over with a
 * freshly allocated buffer and zeroed counters.
 *
 * @param largeArray the array to queue; its length is added to the running content length
 */
private void addLargeArray(byte[] largeArray) {
if (largeArrayCurrent == largeArrayBuffer.length) {
// The large array buffer will allocate a new buffer when it is full (of course), or when its content length
// reaches or exceeds a threshold. This should allow the garbage collector to collect old large array pieces
// sooner.
if (largeArrayCurrent == largeArrayBuffer.length
|| largeArrayContentLength >= DenseStorageConstants.LARGE_ARRAY_CONTENT_LENGTH_FLUSH_THRESHOLD) {
flush();
// Replace the buffer rather than reuse it, so this writer drops its references to the arrays queued so far.
largeArrayBuffer = new byte[DenseStorageConstants.LARGE_ARRAY_QUEUE_SIZE][];
largeArrayBegin = 0;
largeArrayCurrent = 0;
largeArrayContentLength = 0;
}
// Store the array in the next free slot and advance past it.
largeArrayBuffer[largeArrayCurrent] = largeArray;
++largeArrayCurrent;
// Running total of queued array lengths; it is compared against the flush threshold on the next call.
// NOTE(review): this is an int accumulator -- a single array with a length near Integer.MAX_VALUE could wrap it
// negative and suppress the early flush; confirm that array sizes are bounded by the caller.
largeArrayContentLength += largeArray.length;
}

private void flush() {
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/io/deephaven/csv/CsvReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,39 @@ public void bug251(boolean concurrent) {
.hasRootCauseMessage("Cell did not have closing quote character");
}

/**
 * Parses a single-column input whose cells alternate between a short string and a string several times longer
 * than {@code LARGE_THRESHOLD}, with the aim of exercising the large string queue and its early spill threshold.
 */
@Test
public void exerciseLargeThreshold() throws CsvReaderException {
    final String shortCell = "ss";

    // Build the long cell: LARGE_THRESHOLD * 4 + 111 copies of 'L'.
    final int longCellLength = DenseStorageConstants.LARGE_THRESHOLD * 4 + 111;
    final StringBuilder longCellBuilder = new StringBuilder();
    while (longCellBuilder.length() < longCellLength) {
        longCellBuilder.append('L');
    }
    final String longCell = longCellBuilder.toString();

    // One (short, long) pair per slot of the large array queue, laid out back to back.
    final String[] cells = new String[2 * DenseStorageConstants.LARGE_ARRAY_QUEUE_SIZE];
    for (int slot = 0; slot < cells.length; slot += 2) {
        cells[slot] = shortCell;
        cells[slot + 1] = longCell;
    }

    // Header line followed by one cell per line.
    final String input = "Values\n" + String.join("\n", cells);
    final ColumnSet expected = ColumnSet.of(Column.ofRefs("Values", cells));

    CsvTestUtil.invokeTests(CsvTestUtil.defaultCsvSpecs(), input, expected);
}


@Test
public void validates() {
final String lengthyMessage = "CsvSpecs failed validation for the following reasons: "
Expand Down