From a51bcae8e31f4e974a103e5ad020602cf83d1006 Mon Sep 17 00:00:00 2001 From: Jiayi-Wang-db Date: Thu, 23 Oct 2025 14:05:17 +0200 Subject: [PATCH 1/7] Avoid flushing data to cloud when exception is thrown. --- .../org/apache/parquet/hadoop/InternalParquetRecordWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index f296286800..309df6fe93 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -140,8 +140,9 @@ public void close() throws IOException, InterruptedException { } finalMetadata.putAll(finalWriteContext.getExtraMetaData()); parquetFileWriter.end(finalMetadata); + AutoCloseables.uncheckedClose(parquetFileWriter); } finally { - AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter); + AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore); closed = true; } } From 52a7a416e2e4b740aa6c1dd1a8b1c4171f272a01 Mon Sep 17 00:00:00 2001 From: Jiayi-Wang-db Date: Thu, 23 Oct 2025 14:07:00 +0200 Subject: [PATCH 2/7] Added unit test --- .../parquet/hadoop/TestParquetWriter.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index d73079c92f..d2c4124239 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -780,4 +780,44 @@ public void testParquetWriterBuilderCanNotConfigurePathAndFile() throws IOExcept "Cannot set both path and file", IllegalStateException.class, (Callable>) () -> ExampleParquetWriter.builder(path).withFile(outputFile).build()); } + + @Test + public void testNoFlushAfterException() throws Exception { + final File testDir = temp.newFile(); + testDir.delete(); + + final Path file = new Path(testDir.getAbsolutePath(), "test.parquet"); + + MessageType schema = Types.buildMessage() + .required(BINARY) + .named("binary_field") + .required(INT32) + .named("int32_field") + .named("test_schema_abort"); + Configuration conf = new Configuration(); + + try (ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) + .withAllocator(allocator) + .withType(schema) + .build()) { + + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup() + .append("binary_field", "hello") + .append("int32_field", 123)); + + Field internalWriterField = ParquetWriter.class.getDeclaredField("writer"); + internalWriterField.setAccessible(true); + Object internalWriter = internalWriterField.get(writer); + + Field abortedField = internalWriter.getClass().getDeclaredField("aborted"); + abortedField.setAccessible(true); + abortedField.setBoolean(internalWriter, true); + writer.close(); + } + + // After closing, check whether file exists or is empty + FileSystem fs = file.getFileSystem(conf); + assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0); + } } From d4b1efa10792606df77424a56837089fe0513af7 Mon Sep 17 00:00:00 2001 From: Jiayi-Wang-db Date: Fri, 24 Oct 2025 11:40:47 +0200 Subject: [PATCH 3/7] Update ParquetFileWriter.java --- 
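Note for context: the core idea in this patch is that once a failure is observed while finishing the file, the writer should close without flushing, so partially written data never gets pushed to remote storage. Below is a minimal, self-contained sketch of that pattern, assuming simplified hypothetical names (AbortAwareWriter, a plain OutputStream sink) rather than the real ParquetFileWriter/PositionOutputStream types; it only illustrates the flush-skipping behaviour, not the actual Parquet footer logic.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Sketch: a writer that refuses to flush buffered bytes once it has been aborted.
    final class AbortAwareWriter implements AutoCloseable {
      private final OutputStream out; // stand-in for the real position output stream
      private boolean aborted = false;
      private boolean closed = false;

      AbortAwareWriter(OutputStream out) {
        this.out = out;
      }

      void write(byte[] data) throws IOException {
        out.write(data);
      }

      // Mark the writer as aborted so close() will not flush incomplete data.
      void abort() {
        aborted = true;
      }

      // Try to write the footer; on failure, mark the writer aborted and rethrow.
      void end(byte[] footer) throws IOException {
        try {
          out.write(footer);
        } catch (IOException e) {
          abort();
          throw e;
        } finally {
          close();
        }
      }

      @Override
      public void close() throws IOException {
        if (closed) {
          return;
        }
        try {
          if (!aborted) {
            // Only push buffered bytes for a successful write. Whether the
            // underlying stream should be closed at all on abort depends on its
            // semantics, since many streams also flush on close().
            out.flush();
            out.close();
          }
        } finally {
          closed = true;
        }
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (AbortAwareWriter writer = new AbortAwareWriter(sink)) {
          writer.write("row group bytes".getBytes());
          writer.abort(); // simulate a failure before the footer is written
        }
        // With an in-memory sink the effect is invisible; with a buffered remote
        // stream, skipping flush()/close() is what keeps incomplete bytes from
        // being uploaded.
        System.out.println("bytes buffered locally: " + sink.size());
      }
    }

The design choice here is to suppress the final flush rather than delete the partially written object, which keeps the failure path cheap and avoids needing delete permissions on the output location.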
.../java/org/apache/parquet/hadoop/ParquetFileWriter.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 4d17a1d6e4..7863ec2f4e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -113,7 +113,7 @@ public class ParquetFileWriter implements AutoCloseable { public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII); public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata"; public static final int CURRENT_VERSION = 1; - + // File creation modes public static enum Mode { CREATE, @@ -173,6 +173,7 @@ public static enum Mode { // set when end is called private ParquetMetadata footer = null; + private boolean aborted; private boolean closed; private final CRC32 crc; @@ -1812,6 +1813,8 @@ public void end(Map extraMetaData) throws IOException { LOG.debug("{}: end", out.getPos()); this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); serializeFooter(footer, out, fileEncryptor, metadataConverter); + } catch (Exception e) { + aborted = true; } finally { close(); } @@ -1823,7 +1826,7 @@ public void close() throws IOException { return; } try (PositionOutputStream temp = out) { - temp.flush(); + if (!aborted) temp.flush(); if (crcAllocator != null) { crcAllocator.close(); } From f2333185197a3118481455842fdb28cce4c64b3f Mon Sep 17 00:00:00 2001 From: Jiayi-Wang-db Date: Fri, 24 Oct 2025 12:33:08 +0200 Subject: [PATCH 4/7] fix --- .../hadoop/InternalParquetRecordWriter.java | 7 +++++-- .../parquet/hadoop/ParquetFileWriter.java | 21 ++++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index 309df6fe93..41b068d01a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -129,6 +129,7 @@ public void close() throws IOException, InterruptedException { if (!closed) { try { if (aborted) { + parquetFileWriter.abort(); return; } flushRowGroupToStore(); @@ -140,9 +141,11 @@ public void close() throws IOException, InterruptedException { } finalMetadata.putAll(finalWriteContext.getExtraMetaData()); parquetFileWriter.end(finalMetadata); - AutoCloseables.uncheckedClose(parquetFileWriter); + } catch (Exception e) { + parquetFileWriter.abort(); + throw e; } finally { - AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore); + AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter); closed = true; } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 7863ec2f4e..f6fc2a7e5a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -1813,24 +1813,35 @@ public void end(Map extraMetaData) throws IOException { LOG.debug("{}: end", out.getPos()); this.footer = new 
ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); serializeFooter(footer, out, fileEncryptor, metadataConverter); - } catch (Exception e) { - aborted = true; + } catch (IOException e) { + abort(); + throw e; } finally { close(); } } + /* Mark the writer as aborted to avoid flushing incomplete data to the cloud. */ + public void abort() { + aborted = true; + } + @Override public void close() throws IOException { if (closed) { return; } - try (PositionOutputStream temp = out) { - if (!aborted) temp.flush(); + + try { + if (!aborted && out != null) { + out.flush(); + } + } catch (IOException e) { + throw e; + } finally { if (crcAllocator != null) { crcAllocator.close(); } - } finally { closed = true; } } From 897f96a2f1e0a1f82881cd19de9c0f539766bb9f Mon Sep 17 00:00:00 2001 From: Jiayi-Wang-db Date: Fri, 24 Oct 2025 12:34:17 +0200 Subject: [PATCH 5/7] lint --- .../main/java/org/apache/parquet/hadoop/ParquetFileWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index f6fc2a7e5a..07109b24cf 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -113,7 +113,7 @@ public class ParquetFileWriter implements AutoCloseable { public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII); public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata"; public static final int CURRENT_VERSION = 1; - + // File creation modes public static enum Mode { CREATE, From 3a8e343c8b91bf68cf9187cdf0ae93558c74c014 Mon Sep 17 00:00:00 2001 From: Jiayi-Wang-db Date: Thu, 30 Oct 2025 16:59:26 +0100 Subject: [PATCH 6/7] fix more public functions --- .../parquet/hadoop/ParquetFileWriter.java | 859 ++++++++++-------- .../parquet/hadoop/TestParquetWriter.java | 2 + 2 files changed, 462 insertions(+), 399 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 07109b24cf..65bc42e296 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -336,6 +336,34 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, long ro ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } + @FunctionalInterface + interface IOCallable { + T call() throws IOException; + } + + private T withAbortOnFailure(IOCallable action) throws IOException { + try { + return action.call(); + } catch (IOException e) { + aborted = true; + throw e; + } + } + + @FunctionalInterface + interface IORunnable { + void run() throws IOException; + } + + private void withAbortOnFailure(IORunnable action) throws IOException { + try { + action.run(); + } catch (IOException e) { + aborted = true; + throw e; + } + } + /** * @param file OutputFile to create or overwrite * @param schema the schema of the data @@ -566,13 +594,15 @@ private ParquetFileWriter( * @throws IOException if there is an error while writing */ public void start() throws IOException { - state = state.start(); - LOG.debug("{}: start", out.getPos()); - byte[] magic = MAGIC; - if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) { - magic = EFMAGIC; 
- } - out.write(magic); + withAbortOnFailure(() -> { + state = state.start(); + LOG.debug("{}: start", out.getPos()); + byte[] magic = MAGIC; + if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) { + magic = EFMAGIC; + } + out.write(magic); + }); } public InternalFileEncryptor getEncryptor() { @@ -586,19 +616,21 @@ public InternalFileEncryptor getEncryptor() { * @throws IOException if there is an error while writing */ public void startBlock(long recordCount) throws IOException { - state = state.startBlock(); - LOG.debug("{}: start block", out.getPos()); - // out.write(MAGIC); // TODO: add a magic delimiter + withAbortOnFailure(() -> { + state = state.startBlock(); + LOG.debug("{}: start block", out.getPos()); + // out.write(MAGIC); // TODO: add a magic delimiter - alignment.alignForRowGroup(out); + alignment.alignForRowGroup(out); - currentBlock = new BlockMetaData(); - currentRecordCount = recordCount; + currentBlock = new BlockMetaData(); + currentRecordCount = recordCount; - currentColumnIndexes = new ArrayList<>(); - currentOffsetIndexes = new ArrayList<>(); + currentColumnIndexes = new ArrayList<>(); + currentOffsetIndexes = new ArrayList<>(); - currentBloomFilters = new HashMap<>(); + currentBloomFilters = new HashMap<>(); + }); } /** @@ -611,28 +643,30 @@ public void startBlock(long recordCount) throws IOException { */ public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName) throws IOException { - state = state.startColumn(); - encodingStatsBuilder.clear(); - currentEncodings = new HashSet(); - currentChunkPath = ColumnPath.get(descriptor.getPath()); - currentChunkType = descriptor.getPrimitiveType(); - currentChunkCodec = compressionCodecName; - currentChunkValueCount = valueCount; - currentChunkFirstDataPage = -1; - compressedLength = 0; - uncompressedLength = 0; - // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one - currentStatistics = null; - currentSizeStatistics = SizeStatistics.newBuilder( - descriptor.getPrimitiveType(), - descriptor.getMaxRepetitionLevel(), - descriptor.getMaxDefinitionLevel()) - .build(); - currentGeospatialStatistics = - GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()).build(); - - columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); - offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + withAbortOnFailure(() -> { + state = state.startColumn(); + encodingStatsBuilder.clear(); + currentEncodings = new HashSet(); + currentChunkPath = ColumnPath.get(descriptor.getPath()); + currentChunkType = descriptor.getPrimitiveType(); + currentChunkCodec = compressionCodecName; + currentChunkValueCount = valueCount; + currentChunkFirstDataPage = -1; + compressedLength = 0; + uncompressedLength = 0; + // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one + currentStatistics = null; + currentSizeStatistics = SizeStatistics.newBuilder( + descriptor.getPrimitiveType(), + descriptor.getMaxRepetitionLevel(), + descriptor.getMaxDefinitionLevel()) + .build(); + currentGeospatialStatistics = + GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()).build(); + + columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); + offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + }); } /** @@ -642,45 +676,49 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio * 
@throws IOException if there is an error while writing */ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - writeDictionaryPage(dictionaryPage, null, null); + withAbortOnFailure(() -> { + writeDictionaryPage(dictionaryPage, null, null); + }); } public void writeDictionaryPage( DictionaryPage dictionaryPage, BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException { - state = state.write(); - LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); - currentChunkDictionaryPageOffset = out.getPos(); - int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size()); - if (pageWriteChecksumEnabled) { - crc.reset(); - crcUpdate(dictionaryPage.getBytes()); - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - (int) crc.getValue(), - out, - headerBlockEncryptor, - AAD); - } else { - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - out, - headerBlockEncryptor, - AAD); - } - long headerSize = out.getPos() - currentChunkDictionaryPageOffset; - this.uncompressedLength += uncompressedSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted - encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); - currentEncodings.add(dictionaryPage.getEncoding()); + withAbortOnFailure(() -> { + state = state.write(); + LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); + currentChunkDictionaryPageOffset = out.getPos(); + int uncompressedSize = dictionaryPage.getUncompressedSize(); + int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size()); + if (pageWriteChecksumEnabled) { + crc.reset(); + crcUpdate(dictionaryPage.getBytes()); + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + (int) crc.getValue(), + out, + headerBlockEncryptor, + AAD); + } else { + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + out, + headerBlockEncryptor, + AAD); + } + long headerSize = out.getPos() - currentChunkDictionaryPageOffset; + this.uncompressedLength += uncompressedSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize); + dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted + encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); + currentEncodings.add(dictionaryPage.getEncoding()); + }); } /** @@ -872,22 +910,24 @@ public void writeDataPage( byte[] pageHeaderAAD, SizeStatistics sizeStatistics) throws IOException { - long beforeHeader = out.getPos(); - innerWriteDataPage( - valueCount, - uncompressedPageSize, - bytes, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - metadataBlockEncryptor, - pageHeaderAAD, - sizeStatistics); - offsetIndexBuilder.add( 
- toIntWithCheck(out.getPos() - beforeHeader, "page"), - rowCount, - sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + withAbortOnFailure(() -> { + long beforeHeader = out.getPos(); + innerWriteDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + pageHeaderAAD, + sizeStatistics); + offsetIndexBuilder.add( + toIntWithCheck(out.getPos() - beforeHeader, "page"), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + }); } private void innerWriteDataPage( @@ -979,51 +1019,53 @@ public void writeDataPage( byte[] pageHeaderAAD, SizeStatistics sizeStatistics) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (currentChunkFirstDataPage < 0) { - currentChunkFirstDataPage = beforeHeader; - } - LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); - int compressedPageSize = toIntWithCheck(bytes.size(), "page"); - if (pageWriteChecksumEnabled) { - crc.reset(); - crcUpdate(bytes); - metadataConverter.writeDataPageV1Header( - uncompressedPageSize, - compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - (int) crc.getValue(), - out, - metadataBlockEncryptor, - pageHeaderAAD); - } else { - metadataConverter.writeDataPageV1Header( - uncompressedPageSize, - compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out, - metadataBlockEncryptor, - pageHeaderAAD); - } - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); - bytes.writeAllTo(out); + withAbortOnFailure(() -> { + state = state.write(); + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } + LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); + int compressedPageSize = toIntWithCheck(bytes.size(), "page"); + if (pageWriteChecksumEnabled) { + crc.reset(); + crcUpdate(bytes); + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, + compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + (int) crc.getValue(), + out, + metadataBlockEncryptor, + pageHeaderAAD); + } else { + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, + compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out, + metadataBlockEncryptor, + pageHeaderAAD); + } + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); + bytes.writeAllTo(out); - mergeColumnStatistics(statistics, sizeStatistics); + mergeColumnStatistics(statistics, sizeStatistics); - encodingStatsBuilder.addDataEncoding(valuesEncoding); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); + encodingStatsBuilder.addDataEncoding(valuesEncoding); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + }); } /** @@ -1298,76 +1340,78 @@ public void writeDataPageV2( byte[] pageHeaderAAD, SizeStatistics sizeStatistics) throws IOException { - state = state.write(); 
- int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels"); - int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); + withAbortOnFailure(() -> { + state = state.write(); + int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels"); + int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); - int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page"); + int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page"); - int uncompressedSize = - toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); + int uncompressedSize = + toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); - long beforeHeader = out.getPos(); - if (currentChunkFirstDataPage < 0) { - currentChunkFirstDataPage = beforeHeader; - } - - if (pageWriteChecksumEnabled) { - crc.reset(); - if (repetitionLevels.size() > 0) { - crcUpdate(repetitionLevels); - } - if (definitionLevels.size() > 0) { - crcUpdate(definitionLevels); + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; } - if (bytes.size() > 0) { - crcUpdate(bytes); + + if (pageWriteChecksumEnabled) { + crc.reset(); + if (repetitionLevels.size() > 0) { + crcUpdate(repetitionLevels); + } + if (definitionLevels.size() > 0) { + crcUpdate(definitionLevels); + } + if (bytes.size() > 0) { + crcUpdate(bytes); + } + metadataConverter.writeDataPageV2Header( + uncompressedSize, + compressedSize, + valueCount, + nullCount, + rowCount, + dataEncoding, + rlByteLength, + dlByteLength, + compressed, + (int) crc.getValue(), + out, + metadataBlockEncryptor, + pageHeaderAAD); + } else { + metadataConverter.writeDataPageV2Header( + uncompressedSize, + compressedSize, + valueCount, + nullCount, + rowCount, + dataEncoding, + rlByteLength, + dlByteLength, + compressed, + out, + metadataBlockEncryptor, + pageHeaderAAD); } - metadataConverter.writeDataPageV2Header( - uncompressedSize, - compressedSize, - valueCount, - nullCount, - rowCount, - dataEncoding, - rlByteLength, - dlByteLength, - compressed, - (int) crc.getValue(), - out, - metadataBlockEncryptor, - pageHeaderAAD); - } else { - metadataConverter.writeDataPageV2Header( - uncompressedSize, - compressedSize, - valueCount, - nullCount, - rowCount, - dataEncoding, - rlByteLength, - dlByteLength, - compressed, - out, - metadataBlockEncryptor, - pageHeaderAAD); - } - long headersSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedSize + headersSize; - this.compressedLength += compressedSize + headersSize; + long headersSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedSize + headersSize; + this.compressedLength += compressedSize + headersSize; - mergeColumnStatistics(statistics, sizeStatistics); + mergeColumnStatistics(statistics, sizeStatistics); - currentEncodings.add(dataEncoding); - encodingStatsBuilder.addDataEncoding(dataEncoding); + currentEncodings.add(dataEncoding); + encodingStatsBuilder.addDataEncoding(dataEncoding); - BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out); + BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out); - offsetIndexBuilder.add( - toIntWithCheck(out.getPos() - beforeHeader, "page"), - rowCount, - sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + offsetIndexBuilder.add( + toIntWithCheck(out.getPos() - beforeHeader, "page"), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + }); } private void crcUpdate(BytesInput bytes) { @@ -1458,58 +1502,60 @@ void writeColumnChunk( int columnOrdinal, byte[] fileAAD) throws IOException { - startColumn(descriptor, valueCount, compressionCodecName); - - state = state.write(); - if (dictionaryPage != null) { - byte[] dictonaryPageHeaderAAD = null; - if (null != headerBlockEncryptor) { - dictonaryPageHeaderAAD = AesCipher.createModuleAAD( - fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1); + withAbortOnFailure(() -> { + startColumn(descriptor, valueCount, compressionCodecName); + + state = state.write(); + if (dictionaryPage != null) { + byte[] dictonaryPageHeaderAAD = null; + if (null != headerBlockEncryptor) { + dictonaryPageHeaderAAD = AesCipher.createModuleAAD( + fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1); + } + writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD); } - writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD); - } - if (bloomFilter != null) { - // write bloom filter if one of data pages is not dictionary encoded - boolean isWriteBloomFilter = false; - for (Encoding encoding : dataEncodings) { - // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2 - if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { - isWriteBloomFilter = true; - break; + if (bloomFilter != null) { + // write bloom filter if one of data pages is not dictionary encoded + boolean isWriteBloomFilter = false; + for (Encoding encoding : dataEncodings) { + // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2 + if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { + isWriteBloomFilter = true; + break; + } + } + if (isWriteBloomFilter) { + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + } else { + LOG.info( + "No need to write bloom filter because column {} data pages are all encoded as dictionary.", + descriptor.getPath()); } } - if (isWriteBloomFilter) { - currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); - } else { - LOG.info( - "No need to write bloom filter because column {} data pages are all encoded as dictionary.", - descriptor.getPath()); + LOG.debug("{}: write data pages", out.getPos()); + long headersSize = bytes.size() - compressedTotalPageSize; + this.uncompressedLength += uncompressedTotalPageSize + headersSize; + this.compressedLength += compressedTotalPageSize + headersSize; + LOG.debug("{}: write data pages content", out.getPos()); + currentChunkFirstDataPage = out.getPos(); + bytes.writeAllTo(out); + encodingStatsBuilder.addDataEncodings(dataEncodings); + if (rlEncodings.isEmpty()) { + encodingStatsBuilder.withV2Pages(); } - } - LOG.debug("{}: write data pages", out.getPos()); - long headersSize = bytes.size() - compressedTotalPageSize; - this.uncompressedLength += uncompressedTotalPageSize + headersSize; - this.compressedLength += compressedTotalPageSize + headersSize; - LOG.debug("{}: write data pages content", out.getPos()); - currentChunkFirstDataPage = out.getPos(); - bytes.writeAllTo(out); - 
encodingStatsBuilder.addDataEncodings(dataEncodings); - if (rlEncodings.isEmpty()) { - encodingStatsBuilder.withV2Pages(); - } - currentEncodings.addAll(rlEncodings); - currentEncodings.addAll(dlEncodings); - currentEncodings.addAll(dataEncodings); - currentStatistics = totalStats; - currentSizeStatistics = totalSizeStats; - currentGeospatialStatistics = totalGeospatialStats; + currentEncodings.addAll(rlEncodings); + currentEncodings.addAll(dlEncodings); + currentEncodings.addAll(dataEncodings); + currentStatistics = totalStats; + currentSizeStatistics = totalSizeStats; + currentGeospatialStatistics = totalGeospatialStats; - this.columnIndexBuilder = columnIndexBuilder; - this.offsetIndexBuilder = offsetIndexBuilder; + this.columnIndexBuilder = columnIndexBuilder; + this.offsetIndexBuilder = offsetIndexBuilder; - endColumn(); + endColumn(); + }); } /** @@ -1531,34 +1577,36 @@ public void invalidateStatistics(Statistics totalStatistics) { * @throws IOException if there is an error while writing */ public void endColumn() throws IOException { - state = state.endColumn(); - LOG.debug("{}: end column", out.getPos()); - if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { - currentColumnIndexes.add(null); - } else { - currentColumnIndexes.add(columnIndexBuilder.build()); - } - currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); - currentBlock.addColumn(ColumnChunkMetaData.get( - currentChunkPath, - currentChunkType, - currentChunkCodec, - encodingStatsBuilder.build(), - currentEncodings, - currentStatistics, - currentChunkFirstDataPage, - currentChunkDictionaryPageOffset, - currentChunkValueCount, - compressedLength, - uncompressedLength, - currentSizeStatistics, - currentGeospatialStatistics)); - this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); - this.uncompressedLength = 0; - this.compressedLength = 0; - this.currentChunkDictionaryPageOffset = 0; - columnIndexBuilder = null; - offsetIndexBuilder = null; + withAbortOnFailure(() -> { + state = state.endColumn(); + LOG.debug("{}: end column", out.getPos()); + if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { + currentColumnIndexes.add(null); + } else { + currentColumnIndexes.add(columnIndexBuilder.build()); + } + currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); + currentBlock.addColumn(ColumnChunkMetaData.get( + currentChunkPath, + currentChunkType, + currentChunkCodec, + encodingStatsBuilder.build(), + currentEncodings, + currentStatistics, + currentChunkFirstDataPage, + currentChunkDictionaryPageOffset, + currentChunkValueCount, + compressedLength, + uncompressedLength, + currentSizeStatistics, + currentGeospatialStatistics)); + this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); + this.uncompressedLength = 0; + this.compressedLength = 0; + this.currentChunkDictionaryPageOffset = 0; + columnIndexBuilder = null; + offsetIndexBuilder = null; + }); } /** @@ -1567,22 +1615,24 @@ public void endColumn() throws IOException { * @throws IOException if there is an error while writing */ public void endBlock() throws IOException { - if (currentRecordCount == 0) { - throw new ParquetEncodingException("End block with zero record"); - } + withAbortOnFailure(() -> { + if (currentRecordCount == 0) { + throw new ParquetEncodingException("End block with zero record"); + } - state = state.endBlock(); - LOG.debug("{}: end block", 
out.getPos()); - currentBlock.setRowCount(currentRecordCount); - currentBlock.setOrdinal(blocks.size()); - blocks.add(currentBlock); - columnIndexes.add(currentColumnIndexes); - offsetIndexes.add(currentOffsetIndexes); - bloomFilters.add(currentBloomFilters); - currentColumnIndexes = null; - currentOffsetIndexes = null; - currentBloomFilters = null; - currentBlock = null; + state = state.endBlock(); + LOG.debug("{}: end block", out.getPos()); + currentBlock.setRowCount(currentRecordCount); + currentBlock.setOrdinal(blocks.size()); + blocks.add(currentBlock); + columnIndexes.add(currentColumnIndexes); + offsetIndexes.add(currentOffsetIndexes); + bloomFilters.add(currentBloomFilters); + currentColumnIndexes = null; + currentOffsetIndexes = null; + currentBloomFilters = null; + currentBlock = null; + }); } /** @@ -1599,9 +1649,11 @@ public void appendFile(Configuration conf, Path file) throws IOException { } public void appendFile(InputFile file) throws IOException { - try (ParquetFileReader reader = ParquetFileReader.open(file)) { - reader.appendTo(this); - } + withAbortOnFailure(() -> { + try (ParquetFileReader reader = ParquetFileReader.open(file)) { + reader.appendTo(this); + } + }); } /** @@ -1620,9 +1672,11 @@ public void appendRowGroups(FSDataInputStream file, List rowGroup public void appendRowGroups(SeekableInputStream file, List rowGroups, boolean dropColumns) throws IOException { - for (BlockMetaData block : rowGroups) { - appendRowGroup(file, block, dropColumns); - } + withAbortOnFailure(() -> { + for (BlockMetaData block : rowGroups) { + appendRowGroup(file, block, dropColumns); + } + }); } /** @@ -1640,83 +1694,85 @@ public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, boole public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns) throws IOException { - startBlock(rowGroup.getRowCount()); - - Map columnsToCopy = new HashMap(); - for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { - columnsToCopy.put(chunk.getPath().toDotString(), chunk); - } - - List columnsInOrder = new ArrayList(); + withAbortOnFailure(() -> { + startBlock(rowGroup.getRowCount()); - for (ColumnDescriptor descriptor : schema.getColumns()) { - String path = ColumnPath.get(descriptor.getPath()).toDotString(); - ColumnChunkMetaData chunk = columnsToCopy.remove(path); - if (chunk != null) { - columnsInOrder.add(chunk); - } else { - throw new IllegalArgumentException( - String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup)); + Map columnsToCopy = new HashMap(); + for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { + columnsToCopy.put(chunk.getPath().toDotString(), chunk); } - } - - // complain if some columns would be dropped and that's not okay - if (!dropColumns && !columnsToCopy.isEmpty()) { - throw new IllegalArgumentException(String.format( - "Columns cannot be copied (missing from target schema): %s", - String.join(", ", columnsToCopy.keySet()))); - } - // copy the data for all chunks - long start = -1; - long length = 0; - long blockUncompressedSize = 0L; - for (int i = 0; i < columnsInOrder.size(); i += 1) { - ColumnChunkMetaData chunk = columnsInOrder.get(i); + List columnsInOrder = new ArrayList(); - // get this chunk's start position in the new file - long newChunkStart = out.getPos() + length; - - // add this chunk to be copied with any previous chunks - if (start < 0) { - // no previous chunk included, start at this chunk's starting pos - start = chunk.getStartingPos(); + for (ColumnDescriptor descriptor 
: schema.getColumns()) { + String path = ColumnPath.get(descriptor.getPath()).toDotString(); + ColumnChunkMetaData chunk = columnsToCopy.remove(path); + if (chunk != null) { + columnsInOrder.add(chunk); + } else { + throw new IllegalArgumentException( + String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup)); + } } - length += chunk.getTotalSize(); - - if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { - // not contiguous. do the copy now. - copy(from, out, start, length); - // reset to start at the next column chunk - start = -1; - length = 0; + + // complain if some columns would be dropped and that's not okay + if (!dropColumns && !columnsToCopy.isEmpty()) { + throw new IllegalArgumentException(String.format( + "Columns cannot be copied (missing from target schema): %s", + String.join(", ", columnsToCopy.keySet()))); } - // TODO: column/offset indexes are not copied - // (it would require seeking to the end of the file for each row groups) - currentColumnIndexes.add(null); - currentOffsetIndexes.add(null); + // copy the data for all chunks + long start = -1; + long length = 0; + long blockUncompressedSize = 0L; + for (int i = 0; i < columnsInOrder.size(); i += 1) { + ColumnChunkMetaData chunk = columnsInOrder.get(i); - Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); - currentBlock.addColumn(ColumnChunkMetaData.get( - chunk.getPath(), - chunk.getPrimitiveType(), - chunk.getCodec(), - chunk.getEncodingStats(), - chunk.getEncodings(), - chunk.getStatistics(), - offsets.firstDataPageOffset, - offsets.dictionaryPageOffset, - chunk.getValueCount(), - chunk.getTotalSize(), - chunk.getTotalUncompressedSize())); + // get this chunk's start position in the new file + long newChunkStart = out.getPos() + length; - blockUncompressedSize += chunk.getTotalUncompressedSize(); - } + // add this chunk to be copied with any previous chunks + if (start < 0) { + // no previous chunk included, start at this chunk's starting pos + start = chunk.getStartingPos(); + } + length += chunk.getTotalSize(); + + if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { + // not contiguous. do the copy now. 
+ copy(from, out, start, length); + // reset to start at the next column chunk + start = -1; + length = 0; + } - currentBlock.setTotalByteSize(blockUncompressedSize); + // TODO: column/offset indexes are not copied + // (it would require seeking to the end of the file for each row groups) + currentColumnIndexes.add(null); + currentOffsetIndexes.add(null); + + Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); + currentBlock.addColumn(ColumnChunkMetaData.get( + chunk.getPath(), + chunk.getPrimitiveType(), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + offsets.firstDataPageOffset, + offsets.dictionaryPageOffset, + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize())); + + blockUncompressedSize += chunk.getTotalUncompressedSize(); + } - endBlock(); + currentBlock.setTotalByteSize(blockUncompressedSize); + + endBlock(); + }); } /** @@ -1736,36 +1792,42 @@ public void appendColumnChunk( ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException { - long start = chunk.getStartingPos(); - long length = chunk.getTotalSize(); - long newChunkStart = out.getPos(); + withAbortOnFailure(() -> { + long start = chunk.getStartingPos(); + long length = chunk.getTotalSize(); + long newChunkStart = out.getPos(); - if (offsetIndex != null && newChunkStart != start) { - offsetIndex = - OffsetIndexBuilder.getBuilder().fromOffsetIndex(offsetIndex).build(newChunkStart - start); - } + OffsetIndex effectiveOffsetIndex = offsetIndex; - copy(from, out, start, length); + if (effectiveOffsetIndex != null && newChunkStart != start) { + effectiveOffsetIndex = OffsetIndexBuilder.getBuilder() + .fromOffsetIndex(effectiveOffsetIndex) + .build(newChunkStart - start); + } - currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); - currentColumnIndexes.add(columnIndex); - currentOffsetIndexes.add(offsetIndex); + copy(from, out, start, length); - Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); - currentBlock.addColumn(ColumnChunkMetaData.get( - chunk.getPath(), - chunk.getPrimitiveType(), - chunk.getCodec(), - chunk.getEncodingStats(), - chunk.getEncodings(), - chunk.getStatistics(), - offsets.firstDataPageOffset, - offsets.dictionaryPageOffset, - chunk.getValueCount(), - chunk.getTotalSize(), - chunk.getTotalUncompressedSize())); + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + currentColumnIndexes.add(columnIndex); + currentOffsetIndexes.add(effectiveOffsetIndex); + + Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); + currentBlock.addColumn(ColumnChunkMetaData.get( + chunk.getPath(), + chunk.getPrimitiveType(), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + offsets.firstDataPageOffset, + offsets.dictionaryPageOffset, + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize())); - currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); + currentBlock.setTotalByteSize( + currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); + }); } // Buffers for the copy function. 
@@ -1805,23 +1867,22 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long * @throws IOException if there is an error while writing */ public void end(Map extraMetaData) throws IOException { - try { - state = state.end(); - serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor); - serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor); - serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor); - LOG.debug("{}: end", out.getPos()); - this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out, fileEncryptor, metadataConverter); - } catch (IOException e) { - abort(); - throw e; - } finally { - close(); - } + withAbortOnFailure(() -> { + try { + state = state.end(); + serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor); + serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor); + serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor); + LOG.debug("{}: end", out.getPos()); + this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); + serializeFooter(footer, out, fileEncryptor, metadataConverter); + } finally { + close(); + } + }); } - /* Mark the writer as aborted to avoid flushing incomplete data to the cloud. */ + /* Mark the writer as aborted to avoid flushing incomplete data. */ public void abort() { aborted = true; } @@ -1833,15 +1894,15 @@ public void close() throws IOException { } try { - if (!aborted && out != null) { - out.flush(); + if (!aborted) { + try (PositionOutputStream temp = out) { + temp.flush(); + } } - } catch (IOException e) { - throw e; - } finally { if (crcAllocator != null) { crcAllocator.close(); } + } finally { closed = true; } } @@ -2288,11 +2349,11 @@ static ParquetMetadata mergeFooters( * @throws IOException if there is an error while getting the current stream's position */ public long getPos() throws IOException { - return out.getPos(); + return withAbortOnFailure(() -> out.getPos()); } public long getNextRowGroupSize() throws IOException { - return alignment.nextRowGroupSize(out); + return withAbortOnFailure(() -> alignment.nextRowGroupSize(out)); } /** diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index d2c4124239..9399d69ca1 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -44,6 +44,7 @@ import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -52,6 +53,7 @@ import net.openhft.hashing.LongHashFunction; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.HeapByteBufferAllocator; From fa193d851f101b43c164ec1e35be3df168ebf27e Mon Sep 17 00:00:00 2001 From: Jiayi-Wang-db Date: Fri, 31 Oct 2025 12:19:40 +0100 Subject: [PATCH 7/7] spotless --- .../parquet/hadoop/ParquetFileWriter.java | 32 +++++++++++-------- .../parquet/hadoop/TestParquetWriter.java | 6 ++-- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 65bc42e296..82f4577b83 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -654,15 +654,16 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio currentChunkFirstDataPage = -1; compressedLength = 0; uncompressedLength = 0; - // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one + // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed + // one currentStatistics = null; currentSizeStatistics = SizeStatistics.newBuilder( descriptor.getPrimitiveType(), descriptor.getMaxRepetitionLevel(), descriptor.getMaxDefinitionLevel()) .build(); - currentGeospatialStatistics = - GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()).build(); + currentGeospatialStatistics = GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()) + .build(); columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); @@ -715,7 +716,9 @@ public void writeDictionaryPage( this.uncompressedLength += uncompressedSize + headerSize; this.compressedLength += compressedPageSize + headerSize; LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted + dictionaryPage + .getBytes() + .writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); currentEncodings.add(dictionaryPage.getEncoding()); }); @@ -1345,7 +1348,8 @@ public void writeDataPageV2( int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels"); int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); - int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page"); + int compressedSize = + toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page"); int uncompressedSize = toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); @@ -1519,7 +1523,8 @@ void writeColumnChunk( // write bloom filter if one of data pages is not dictionary encoded boolean isWriteBloomFilter = false; for (Encoding encoding : dataEncodings) { - // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2 + // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in + // parquet v2 if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { isWriteBloomFilter = true; break; @@ -1739,7 +1744,8 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boo } length += chunk.getTotalSize(); - if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { + if ((i + 1) == columnsInOrder.size() + || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { // not contiguous. do the copy now. 
copy(from, out, start, length); // reset to start at the next column chunk @@ -1800,9 +1806,9 @@ public void appendColumnChunk( OffsetIndex effectiveOffsetIndex = offsetIndex; if (effectiveOffsetIndex != null && newChunkStart != start) { - effectiveOffsetIndex = OffsetIndexBuilder.getBuilder() - .fromOffsetIndex(effectiveOffsetIndex) - .build(newChunkStart - start); + effectiveOffsetIndex = OffsetIndexBuilder.getBuilder() + .fromOffsetIndex(effectiveOffsetIndex) + .build(newChunkStart - start); } copy(from, out, start, length); @@ -1825,8 +1831,7 @@ public void appendColumnChunk( chunk.getTotalSize(), chunk.getTotalUncompressedSize())); - currentBlock.setTotalByteSize( - currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); + currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); }); } @@ -1874,7 +1879,8 @@ public void end(Map extraMetaData) throws IOException { serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor); serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor); LOG.debug("{}: end", out.getPos()); - this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); + this.footer = + new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); serializeFooter(footer, out, fileEncryptor, metadataConverter); } finally { close(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 9399d69ca1..9a69ee478a 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -798,15 +798,13 @@ public void testNoFlushAfterException() throws Exception { .named("test_schema_abort"); Configuration conf = new Configuration(); - try (ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) + try (ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) .withAllocator(allocator) .withType(schema) .build()) { SimpleGroupFactory f = new SimpleGroupFactory(schema); - writer.write(f.newGroup() - .append("binary_field", "hello") - .append("int32_field", 123)); + writer.write(f.newGroup().append("binary_field", "hello").append("int32_field", 123)); Field internalWriterField = ParquetWriter.class.getDeclaredField("writer"); internalWriterField.setAccessible(true);