|
19 | 19 | package org.apache.parquet.hadoop; |
20 | 20 |
|
21 | 21 | import com.github.luben.zstd.Zstd; |
| 22 | +import java.io.BufferedReader; |
| 23 | +import java.io.FileInputStream; |
22 | 24 | import java.io.IOException; |
| 25 | +import java.io.InputStream; |
| 26 | +import java.io.InputStreamReader; |
23 | 27 | import java.nio.ByteBuffer; |
24 | 28 | import java.nio.ByteOrder; |
25 | | -import java.util.Random; |
| 29 | +import java.nio.file.Files; |
| 30 | +import java.nio.file.Path; |
| 31 | +import java.nio.file.Paths; |
| 32 | +import java.util.ArrayList; |
| 33 | +import java.util.List; |
| 34 | +import java.util.zip.GZIPInputStream; |
26 | 35 | import org.apache.parquet.bytes.ByteBufferInputStream; |
27 | 36 | import org.apache.parquet.bytes.HeapByteBufferAllocator; |
28 | 37 | import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble; |
|
42 | 51 | */ |
43 | 52 | public class EncodingCompressionBenchmark { |
44 | 53 |
|
45 | | - private static final int NUM_VALUES = 1_000_000; |
| 54 | + private static final int TILE_FACTOR = 6; // 15K * 6 = 90K values |
46 | 55 | private static final int WARMUP = 10; |
47 | 56 | private static final int MEASURED = 30; |
48 | 57 |
|
49 | | - // Datasets |
50 | | - private static double[] doubleDecimal; |
51 | | - private static double[] doubleInteger; |
52 | | - private static double[] doubleMixed; |
53 | | - private static float[] floatDecimal; |
54 | | - private static float[] floatInteger; |
55 | | - private static float[] floatMixed; |
| 58 | + private static final String CSV_DIR = "parquet-hadoop/src/test/resources"; |
| 59 | + private static final String DOUBLE_CSV = "alp_spotify1_expect.csv.gz"; |
| 60 | + private static final String FLOAT_CSV = "alp_float_spotify1_expect.csv.gz"; |
56 | 61 |
|
57 | | - @BeforeClass |
58 | | - public static void setup() { |
59 | | - Random rng = new Random(42); |
60 | | - |
61 | | - doubleDecimal = new double[NUM_VALUES]; |
62 | | - for (int i = 0; i < NUM_VALUES; i++) { |
63 | | - doubleDecimal[i] = Math.round(rng.nextDouble() * 10000) / 100.0; |
64 | | - } |
| 62 | + private static final String[] COLUMNS = { |
| 63 | + "valence", "danceability", "energy", "loudness", "speechiness", |
| 64 | + "acousticness", "instrumentalness", "liveness", "tempo" |
| 65 | + }; |
65 | 66 |
|
66 | | - doubleInteger = new double[NUM_VALUES]; |
67 | | - for (int i = 0; i < NUM_VALUES; i++) { |
68 | | - doubleInteger[i] = (double) rng.nextInt(100000); |
69 | | - } |
| 67 | + private static double[][] doubleColumns; |
| 68 | + private static float[][] floatColumns; |
70 | 69 |
|
71 | | - doubleMixed = new double[NUM_VALUES]; |
72 | | - for (int i = 0; i < NUM_VALUES; i++) { |
73 | | - doubleMixed[i] = Math.round(rng.nextDouble() * 10000) / 100.0; |
74 | | - } |
75 | | - for (int i = 0; i < NUM_VALUES; i += 50) { |
76 | | - doubleMixed[i] = Double.NaN; |
77 | | - } |
78 | | - |
79 | | - floatDecimal = new float[NUM_VALUES]; |
80 | | - for (int i = 0; i < NUM_VALUES; i++) { |
81 | | - floatDecimal[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; |
82 | | - } |
| 70 | + @BeforeClass |
| 71 | + public static void setup() throws IOException { |
| 72 | + Path csvDir = findCsvDir(); |
83 | 73 |
|
84 | | - floatInteger = new float[NUM_VALUES]; |
85 | | - for (int i = 0; i < NUM_VALUES; i++) { |
86 | | - floatInteger[i] = (float) rng.nextInt(100000); |
| 74 | + double[][] rawDoubles = loadDoubleCsv(csvDir.resolve(DOUBLE_CSV)); |
| 75 | + doubleColumns = new double[rawDoubles.length][]; |
| 76 | + for (int c = 0; c < rawDoubles.length; c++) { |
| 77 | + doubleColumns[c] = tile(rawDoubles[c], rawDoubles[c].length * TILE_FACTOR); |
87 | 78 | } |
88 | 79 |
|
89 | | - floatMixed = new float[NUM_VALUES]; |
90 | | - for (int i = 0; i < NUM_VALUES; i++) { |
91 | | - floatMixed[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; |
92 | | - } |
93 | | - for (int i = 0; i < NUM_VALUES; i += 50) { |
94 | | - floatMixed[i] = Float.NaN; |
| 80 | + float[][] rawFloats = loadFloatCsv(csvDir.resolve(FLOAT_CSV)); |
| 81 | + floatColumns = new float[rawFloats.length][]; |
| 82 | + for (int c = 0; c < rawFloats.length; c++) { |
| 83 | + floatColumns[c] = tile(rawFloats[c], rawFloats[c].length * TILE_FACTOR); |
95 | 84 | } |
96 | 85 | } |
97 | 86 |
|
98 | 87 | @Test |
99 | 88 | public void measureThroughput() throws IOException { |
100 | 89 | System.out.println(); |
101 | | - System.out.println("=== Encoding/Compression Benchmark (1M values, " + MEASURED + " iters) ==="); |
| 90 | + System.out.printf("=== Encoding/Compression Benchmark (%d values per column, Spotify dataset, %d iters) ===%n", |
| 91 | + doubleColumns[0].length, MEASURED); |
102 | 92 | System.out.println(); |
103 | 93 |
|
104 | 94 | String hdr = String.format( |
105 | | - "%-35s %10s %10s %10s %10s %8s", "Dataset / Encoding", "Enc MB/s", "Dec MB/s", "Raw KB", "Comp KB", "Ratio"); |
| 95 | + "%-35s %10s %10s %10s %10s %8s", "Column / Encoding", "Enc MB/s", "Dec MB/s", "Raw KB", "Comp KB", "Ratio"); |
106 | 96 | String sep = "-".repeat(hdr.length()); |
107 | 97 |
|
108 | | - // --- Double datasets --- |
| 98 | + // --- Double columns --- |
109 | 99 | System.out.println("=== DOUBLE (8 bytes/value) ==="); |
110 | 100 | System.out.println(hdr); |
111 | 101 | System.out.println(sep); |
112 | 102 |
|
113 | | - benchAllDouble("double_decimal", doubleDecimal); |
114 | | - System.out.println(); |
115 | | - benchAllDouble("double_integer", doubleInteger); |
116 | | - System.out.println(); |
117 | | - benchAllDouble("double_mixed(2%exc)", doubleMixed); |
118 | | - System.out.println(); |
| 103 | + for (int c = 0; c < doubleColumns.length; c++) { |
| 104 | + benchAllDouble(COLUMNS[c], doubleColumns[c]); |
| 105 | + System.out.println(); |
| 106 | + } |
119 | 107 |
|
120 | | - // --- Float datasets --- |
| 108 | + // --- Float columns --- |
121 | 109 | System.out.println("=== FLOAT (4 bytes/value) ==="); |
122 | 110 | System.out.println(hdr); |
123 | 111 | System.out.println(sep); |
124 | 112 |
|
125 | | - benchAllFloat("float_decimal", floatDecimal); |
126 | | - System.out.println(); |
127 | | - benchAllFloat("float_integer", floatInteger); |
128 | | - System.out.println(); |
129 | | - benchAllFloat("float_mixed(2%exc)", floatMixed); |
130 | | - System.out.println(); |
| 113 | + for (int c = 0; c < floatColumns.length; c++) { |
| 114 | + benchAllFloat(COLUMNS[c], floatColumns[c]); |
| 115 | + System.out.println(); |
| 116 | + } |
131 | 117 | } |
132 | 118 |
|
133 | 119 | // ---- Double benchmarks ---- |
@@ -390,4 +376,96 @@ private static void bssDecodeFloatsFromRaw(byte[] encoded, int numValues) throws |
390 | 376 | reader.readFloat(); |
391 | 377 | } |
392 | 378 | } |
| 379 | + |
| 380 | + // ---- CSV loading and tiling ---- |
| 381 | + |
| 382 | + private static Path findCsvDir() throws IOException { |
| 383 | + Path dir = Paths.get("").toAbsolutePath(); |
| 384 | + for (int i = 0; i < 3; i++) { |
| 385 | + Path candidate = dir.resolve(CSV_DIR); |
| 386 | + if (Files.isDirectory(candidate) && Files.exists(candidate.resolve(DOUBLE_CSV))) { |
| 387 | + return candidate; |
| 388 | + } |
| 389 | + dir = dir.getParent(); |
| 390 | + if (dir == null) break; |
| 391 | + } |
| 392 | + throw new IOException("Cannot find CSV directory '" + CSV_DIR |
| 393 | + + "'. Run from the parquet-java project root."); |
| 394 | + } |
| 395 | + |
| 396 | + private static double[][] loadDoubleCsv(Path csvPath) throws IOException { |
| 397 | + try (InputStream is = new GZIPInputStream(new FileInputStream(csvPath.toFile()))) { |
| 398 | + BufferedReader br = new BufferedReader(new InputStreamReader(is)); |
| 399 | + String header = br.readLine(); |
| 400 | + int numCols = header.split(",").length; |
| 401 | + |
| 402 | + List<double[]> rows = new ArrayList<>(); |
| 403 | + String line; |
| 404 | + while ((line = br.readLine()) != null) { |
| 405 | + String[] parts = line.split(","); |
| 406 | + double[] row = new double[numCols]; |
| 407 | + for (int i = 0; i < numCols; i++) { |
| 408 | + row[i] = Double.parseDouble(parts[i]); |
| 409 | + } |
| 410 | + rows.add(row); |
| 411 | + } |
| 412 | + |
| 413 | + double[][] columns = new double[numCols][rows.size()]; |
| 414 | + for (int r = 0; r < rows.size(); r++) { |
| 415 | + double[] row = rows.get(r); |
| 416 | + for (int c = 0; c < numCols; c++) { |
| 417 | + columns[c][r] = row[c]; |
| 418 | + } |
| 419 | + } |
| 420 | + return columns; |
| 421 | + } |
| 422 | + } |
| 423 | + |
| 424 | + private static float[][] loadFloatCsv(Path csvPath) throws IOException { |
| 425 | + try (InputStream is = new GZIPInputStream(new FileInputStream(csvPath.toFile()))) { |
| 426 | + BufferedReader br = new BufferedReader(new InputStreamReader(is)); |
| 427 | + String header = br.readLine(); |
| 428 | + int numCols = header.split(",").length; |
| 429 | + |
| 430 | + List<float[]> rows = new ArrayList<>(); |
| 431 | + String line; |
| 432 | + while ((line = br.readLine()) != null) { |
| 433 | + String[] parts = line.split(","); |
| 434 | + float[] row = new float[numCols]; |
| 435 | + for (int i = 0; i < numCols; i++) { |
| 436 | + row[i] = Float.parseFloat(parts[i]); |
| 437 | + } |
| 438 | + rows.add(row); |
| 439 | + } |
| 440 | + |
| 441 | + float[][] columns = new float[numCols][rows.size()]; |
| 442 | + for (int r = 0; r < rows.size(); r++) { |
| 443 | + float[] row = rows.get(r); |
| 444 | + for (int c = 0; c < numCols; c++) { |
| 445 | + columns[c][r] = row[c]; |
| 446 | + } |
| 447 | + } |
| 448 | + return columns; |
| 449 | + } |
| 450 | + } |
| 451 | + |
| 452 | + private static double[] tile(double[] source, int targetSize) { |
| 453 | + double[] result = new double[targetSize]; |
| 454 | + int len = source.length; |
| 455 | + for (int i = 0; i < targetSize; i++) { |
| 456 | + int copyIdx = i / len; // 0 for first copy, 1 for second, etc. |
| 457 | + result[i] = source[i % len] + copyIdx; |
| 458 | + } |
| 459 | + return result; |
| 460 | + } |
| 461 | + |
| 462 | + private static float[] tile(float[] source, int targetSize) { |
| 463 | + float[] result = new float[targetSize]; |
| 464 | + int len = source.length; |
| 465 | + for (int i = 0; i < targetSize; i++) { |
| 466 | + int copyIdx = i / len; |
| 467 | + result[i] = source[i % len] + copyIdx; |
| 468 | + } |
| 469 | + return result; |
| 470 | + } |
393 | 471 | } |
0 commit comments