Skip to content

Commit 1820d41

Browse files
update aircompressor to 3.5
1 parent 7190ab6 commit 1820d41

File tree

3 files changed

+72
-15
lines changed

3 files changed

+72
-15
lines changed

parquet-hadoop/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@
143143
</dependency>
144144
<dependency>
145145
<groupId>io.airlift</groupId>
146-
<artifactId>aircompressor</artifactId>
147-
<version>2.0.2</version>
146+
<artifactId>aircompressor-v3</artifactId>
147+
<version>3.5</version>
148148
</dependency>
149149
<dependency>
150150
<groupId>commons-pool</groupId>

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCompressor.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,53 @@
1818
*/
1919
package org.apache.parquet.hadoop.codec;
2020

21-
import io.airlift.compress.lz4.Lz4Compressor;
21+
import io.airlift.compress.v3.lz4.Lz4Compressor;
2222
import java.io.IOException;
2323
import java.nio.ByteBuffer;
2424

2525
public class Lz4RawCompressor extends NonBlockedCompressor {
2626

27-
private Lz4Compressor compressor = new Lz4Compressor();
27+
private final Lz4Compressor compressor = Lz4Compressor.create();
28+
29+
/** Reused for direct buffers; lazily allocated and grown when needed. */
30+
private byte[] inputBuf;
31+
/** Reused for direct buffers; lazily allocated and grown when needed. */
32+
private byte[] outputBuf;
2833

2934
@Override
3035
protected int maxCompressedLength(int byteSize) {
31-
return io.airlift.compress.lz4.Lz4RawCompressor.maxCompressedLength(byteSize);
36+
return compressor.maxCompressedLength(byteSize);
3237
}
3338

3439
@Override
3540
protected int compress(ByteBuffer uncompressed, ByteBuffer compressed) throws IOException {
36-
compressor.compress(uncompressed, compressed);
37-
int compressedSize = compressed.position();
38-
compressed.limit(compressedSize);
39-
compressed.rewind();
41+
int startPos = compressed.position();
42+
int inputLen = uncompressed.remaining();
43+
int maxOut = compressor.maxCompressedLength(inputLen);
44+
45+
final int compressedSize;
46+
if (uncompressed.hasArray() && compressed.hasArray()) {
47+
int inputOffset = uncompressed.arrayOffset() + uncompressed.position();
48+
int outputOffset = compressed.arrayOffset() + compressed.position();
49+
compressedSize = compressor.compress(
50+
uncompressed.array(), inputOffset, inputLen,
51+
compressed.array(), outputOffset, maxOut);
52+
// Advance positions to match the direct-buffer path (where get/put do this)
53+
uncompressed.position(uncompressed.position() + inputLen);
54+
} else {
55+
if (inputBuf == null || inputBuf.length < inputLen) {
56+
inputBuf = new byte[inputLen];
57+
}
58+
if (outputBuf == null || outputBuf.length < maxOut) {
59+
outputBuf = new byte[maxOut];
60+
}
61+
uncompressed.get(inputBuf, 0, inputLen);
62+
compressedSize = compressor.compress(inputBuf, 0, inputLen, outputBuf, 0, maxOut);
63+
compressed.put(outputBuf, 0, compressedSize);
64+
}
65+
66+
compressed.limit(startPos + compressedSize);
67+
compressed.position(startPos);
4068
return compressedSize;
4169
}
4270
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,19 @@
1818
*/
1919
package org.apache.parquet.hadoop.codec;
2020

21-
import io.airlift.compress.lz4.Lz4Decompressor;
21+
import io.airlift.compress.v3.lz4.Lz4Decompressor;
2222
import java.io.IOException;
2323
import java.nio.ByteBuffer;
2424
import org.apache.hadoop.io.compress.DirectDecompressor;
2525

2626
public class Lz4RawDecompressor extends NonBlockedDecompressor implements DirectDecompressor {
2727

28-
private Lz4Decompressor decompressor = new Lz4Decompressor();
28+
private final Lz4Decompressor decompressor = Lz4Decompressor.create();
29+
30+
/** Reused for direct buffers; lazily allocated and grown when needed. */
31+
private byte[] inputBuf;
32+
/** Reused for direct buffers; lazily allocated and grown when needed. */
33+
private byte[] outputBuf;
2934

3035
@Override
3136
protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLength) throws IOException {
@@ -36,10 +41,34 @@ protected int maxUncompressedLength(ByteBuffer compressed, int maxUncompressedLe
3641

3742
@Override
3843
protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
39-
decompressor.decompress(compressed, uncompressed);
40-
int uncompressedSize = uncompressed.position();
41-
uncompressed.limit(uncompressedSize);
42-
uncompressed.rewind();
44+
int startPos = uncompressed.position();
45+
int compressedLen = compressed.remaining();
46+
int maxOut = uncompressed.remaining();
47+
48+
final int uncompressedSize;
49+
if (compressed.hasArray() && uncompressed.hasArray()) {
50+
int inputOffset = compressed.arrayOffset() + compressed.position();
51+
int outputOffset = uncompressed.arrayOffset() + uncompressed.position();
52+
uncompressedSize = decompressor.decompress(
53+
compressed.array(), inputOffset, compressedLen,
54+
uncompressed.array(), outputOffset, maxOut);
55+
// Advance positions to match the direct-buffer path (where get/put do this)
56+
compressed.position(compressed.position() + compressedLen);
57+
} else {
58+
if (inputBuf == null || inputBuf.length < compressedLen) {
59+
inputBuf = new byte[compressedLen];
60+
}
61+
if (outputBuf == null || outputBuf.length < maxOut) {
62+
outputBuf = new byte[maxOut];
63+
}
64+
compressed.get(inputBuf, 0, compressedLen);
65+
uncompressedSize = decompressor.decompress(
66+
inputBuf, 0, compressedLen, outputBuf, 0, maxOut);
67+
uncompressed.put(outputBuf, 0, uncompressedSize);
68+
}
69+
70+
uncompressed.limit(startPos + uncompressedSize);
71+
uncompressed.position(startPos);
4372
return uncompressedSize;
4473
}
4574

0 commit comments

Comments (0)