diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
index e2bbe35480b..23f5d65fbb5 100644
--- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
@@ -23,6 +23,7 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
@@ -493,6 +494,16 @@ public ArrowBuf retain() {
return retain(1);
}
+ /**
+ * No-op implementation of the touch hook: nothing is recorded and this
+ * buffer is handed back unchanged.
+ *
+ * @return this buffer
+ */
+ @Override
+ public ByteBuf touch() {
+ return this;
+ }
+
+ /**
+ * No-op implementation of the touch hook taking a hint: the hint is not
+ * used and this buffer is handed back unchanged.
+ *
+ * @param hint ignored by this implementation
+ * @return this buffer
+ */
+ @Override
+ public ByteBuf touch(Object hint) {
+ return this;
+ }
+
@Override
public long getLong(int index) {
chk(index, 8);
@@ -505,6 +516,17 @@ public float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}
+  /**
+   * Reads the eight bytes at the specified absolute {@code index} through
+   * {@link PlatformDependent#getLong(long)} and returns that value with its
+   * byte order reversed.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 8)}
+   * @return the byte-swapped 64-bit value
+   */
+  @Override
+  public long getLongLE(int index) {
+    chk(index, 8);
+    return Long.reverseBytes(PlatformDependent.getLong(addr(index)));
+  }
+
@Override
public double getDouble(int index) {
return Double.longBitsToDouble(getLong(index));
@@ -527,6 +549,17 @@ public int getInt(int index) {
return v;
}
+  /**
+   * Reads the four bytes at the specified absolute {@code index} through
+   * {@link PlatformDependent#getInt(long)} and returns that value with its
+   * byte order reversed.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 4)}
+   * @return the byte-swapped 32-bit value
+   */
+  @Override
+  public int getIntLE(int index) {
+    chk(index, 4);
+    return Integer.reverseBytes(PlatformDependent.getInt(addr(index)));
+  }
+
@Override
public int getUnsignedShort(int index) {
return getShort(index) & 0xFFFF;
@@ -535,10 +568,44 @@ public int getUnsignedShort(int index) {
@Override
public short getShort(int index) {
chk(index, 2);
- short v = PlatformDependent.getShort(addr(index));
+ final short v = PlatformDependent.getShort(addr(index));
return v;
}
+  /**
+   * Reads the two bytes at the specified absolute {@code index} through
+   * {@link PlatformDependent#getShort(long)} and returns that value with its
+   * bytes swapped.
+   *
+   * <p>The index is validated with {@code chk(index, 2)} before raw memory is
+   * read, exactly as the other accessors of this class do; without that check
+   * an out-of-range index would reach the unchecked memory read.
+   *
+   * @param index absolute position in this buffer
+   * @return the byte-swapped 16-bit value
+   */
+  @Override
+  public short getShortLE(int index) {
+    chk(index, 2);
+    final short v = PlatformDependent.getShort(addr(index));
+    return Short.reverseBytes(v);
+  }
+
+  /**
+   * Reads an unsigned 24-bit value at the specified absolute {@code index}.
+   * The byte at {@code index} supplies bits 16-23 and the short read at
+   * {@code index + 1} supplies bits 0-15.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 3)}
+   * @return the assembled value, always in the range of 24 unsigned bits
+   */
+  @Override
+  public int getUnsignedMedium(int index) {
+    chk(index, 3);
+    final long base = addr(index);
+    final int upperBits = (PlatformDependent.getByte(base) & 0xff) << 16;
+    final int lowerBits = PlatformDependent.getShort(base + 1) & 0xffff;
+    return upperBits | lowerBits;
+  }
+
+  /**
+   * Reads an unsigned 24-bit value at the specified absolute {@code index}.
+   * The byte at {@code index} supplies bits 0-7 and the byte-swapped short
+   * read at {@code index + 1} supplies bits 8-23.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 3)}
+   * @return the assembled value, always in the range of 24 unsigned bits
+   */
+  @Override
+  public int getUnsignedMediumLE(int index) {
+    chk(index, 3);
+    final long base = addr(index);
+    final int lowerBits = PlatformDependent.getByte(base) & 0xff;
+    final short swapped = Short.reverseBytes(PlatformDependent.getShort(base + 1));
+    final int upperBits = (swapped & 0xffff) << 8;
+    return lowerBits | upperBits;
+  }
+
@Override
public ArrowBuf setShort(int index, int value) {
chk(index, 2);
@@ -546,6 +613,44 @@ public ArrowBuf setShort(int index, int value) {
return this;
}
+  /**
+   * Narrows {@code value} to a short, swaps its two bytes and stores the
+   * result at the specified absolute {@code index} through
+   * {@link PlatformDependent#putShort(long, short)}.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 2)}
+   * @param value value whose low 16 bits are written
+   * @return this buffer
+   */
+  @Override
+  public ByteBuf setShortLE(int index, int value) {
+    chk(index, 2);
+    final long target = addr(index);
+    final short swapped = Short.reverseBytes((short) value);
+    PlatformDependent.putShort(target, swapped);
+    return this;
+  }
+
+  /**
+   * Stores a 24-bit value at the specified absolute {@code index}: bits
+   * 16-23 of {@code value} go to the byte at {@code index}, and the low 16
+   * bits are written as a short at {@code index + 1}.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 3)}
+   * @param value value whose low 24 bits are written
+   * @return this buffer
+   */
+  @Override
+  public ByteBuf setMedium(int index, int value) {
+    chk(index, 3);
+    final long base = addr(index);
+    final byte topByte = (byte) (value >>> 16);
+    final short lowShort = (short) value;
+    PlatformDependent.putByte(base, topByte);
+    PlatformDependent.putShort(base + 1, lowShort);
+    return this;
+  }
+
+
+  /**
+   * Stores a 24-bit value at the specified absolute {@code index}: the low
+   * 8 bits of {@code value} go to the byte at {@code index}, and bits 8-23
+   * are byte-swapped and written as a short at {@code index + 1}.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 3)}
+   * @param value value whose low 24 bits are written
+   * @return this buffer
+   */
+  @Override
+  public ByteBuf setMediumLE(int index, int value) {
+    chk(index, 3);
+    final long base = addr(index);
+    final byte lowByte = (byte) value;
+    final short swappedRest = Short.reverseBytes((short) (value >>> 8));
+    PlatformDependent.putByte(base, lowByte);
+    PlatformDependent.putShort(base + 1, swappedRest);
+    return this;
+  }
+
@Override
public ArrowBuf setInt(int index, int value) {
chk(index, 4);
@@ -553,6 +658,17 @@ public ArrowBuf setInt(int index, int value) {
return this;
}
+  /**
+   * Reverses the byte order of {@code value} and stores the result at the
+   * specified absolute {@code index} through
+   * {@link PlatformDependent#putInt(long, int)}.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 4)}
+   * @param value 32-bit value to write
+   * @return this buffer
+   */
+  @Override
+  public ByteBuf setIntLE(int index, int value) {
+    chk(index, 4);
+    final long target = addr(index);
+    final int swapped = Integer.reverseBytes(value);
+    PlatformDependent.putInt(target, swapped);
+    return this;
+  }
+
@Override
public ArrowBuf setLong(int index, long value) {
chk(index, 8);
@@ -560,6 +676,17 @@ public ArrowBuf setLong(int index, long value) {
return this;
}
+  /**
+   * Reverses the byte order of {@code value} and stores the result at the
+   * specified absolute {@code index} through
+   * {@link PlatformDependent#putLong(long, long)}.
+   *
+   * @param index absolute position in this buffer, validated with {@code chk(index, 8)}
+   * @param value 64-bit value to write
+   * @return this buffer
+   */
+  @Override
+  public ByteBuf setLongLE(int index, long value) {
+    chk(index, 8);
+    final long target = addr(index);
+    final long swapped = Long.reverseBytes(value);
+    PlatformDependent.putLong(target, swapped);
+    return this;
+  }
+
@Override
public ArrowBuf setChar(int index, int value) {
chk(index, 2);
@@ -668,16 +795,46 @@ protected short _getShort(int index) {
return getShort(index);
}
+  /**
+   * Internal hook that forwards to the public accessor.
+   *
+   * @see #getShortLE(int)
+   */
+  @Override
+  protected short _getShortLE(int index) {
+    final short v = getShortLE(index);
+    return v;
+  }
+
@Override
protected int _getInt(int index) {
return getInt(index);
}
+  /**
+   * Internal hook that forwards to the public accessor.
+   *
+   * @see #getIntLE(int)
+   */
+  @Override
+  protected int _getIntLE(int index) {
+    final int v = getIntLE(index);
+    return v;
+  }
+
+  /**
+   * Internal hook that forwards to the public accessor.
+   *
+   * @see #getUnsignedMedium(int)
+   */
+  @Override
+  protected int _getUnsignedMedium(int index) {
+    final int v = getUnsignedMedium(index);
+    return v;
+  }
+
+  /**
+   * Internal hook that forwards to the public accessor.
+   *
+   * @see #getUnsignedMediumLE(int)
+   */
+  @Override
+  protected int _getUnsignedMediumLE(int index) {
+    final int v = getUnsignedMediumLE(index);
+    return v;
+  }
+
@Override
protected long _getLong(int index) {
return getLong(index);
}
+  /**
+   * Internal hook that forwards to the public accessor.
+   *
+   * @see #getLongLE(int)
+   */
+  @Override
+  protected long _getLongLE(int index) {
+    final long v = getLongLE(index);
+    return v;
+  }
+
@Override
protected void _setByte(int index, int value) {
setByte(index, value);
@@ -688,21 +845,45 @@ protected void _setShort(int index, int value) {
setShort(index, value);
}
+ /**
+ * Internal hook that forwards to the public setter; the setter's return
+ * value is discarded.
+ *
+ * @see #setShortLE(int, int)
+ */
+ @Override
+ protected void _setShortLE(int index, int value) {
+ setShortLE(index, value);
+ }
+
@Override
protected void _setMedium(int index, int value) {
setMedium(index, value);
}
+ /**
+ * Internal hook that forwards to the public setter; the setter's return
+ * value is discarded.
+ *
+ * @see #setMediumLE(int, int)
+ */
+ @Override
+ protected void _setMediumLE(int index, int value) {
+ setMediumLE(index, value);
+ }
+
@Override
protected void _setInt(int index, int value) {
setInt(index, value);
}
+ /**
+ * Internal hook that forwards to the public setter; the setter's return
+ * value is discarded.
+ *
+ * @see #setIntLE(int, int)
+ */
+ @Override
+ protected void _setIntLE(int index, int value) {
+ setIntLE(index, value);
+ }
+
@Override
protected void _setLong(int index, long value) {
setLong(index, value);
}
+ /**
+ * Internal hook that forwards to the public setter; the setter's return
+ * value is discarded.
+ *
+ * NOTE(review): this hook is declared {@code public} while the other
+ * {@code _set*LE} hooks in this class are {@code protected} - confirm
+ * whether the wider visibility is intentional.
+ *
+ * @see #setLongLE(int, long)
+ */
+ @Override
+ public void _setLongLE(int index, long value) {
+ setLongLE(index, value);
+ }
+
@Override
public ArrowBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
udle.getBytes(index + offset, dst, dstIndex, length);
@@ -716,16 +897,13 @@ public ArrowBuf getBytes(int index, OutputStream out, int length) throws IOExcep
}
@Override
- protected int _getUnsignedMedium(int index) {
- final long addr = addr(index);
- return (PlatformDependent.getByte(addr) & 0xff) << 16 |
- (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
- PlatformDependent.getByte(addr + 2) & 0xff;
+ public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+ return udle.getBytes(index + offset, out, length);
}
@Override
- public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
- return udle.getBytes(index + offset, out, length);
+ public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+ return udle.getBytes(index + offset, out, position, length);
}
@Override
@@ -776,6 +954,11 @@ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOEx
return udle.setBytes(index + offset, in, length);
}
+  /**
+   * Forwards the positional file-channel read to {@code udle}, shifting
+   * {@code index} by this buffer's {@code offset}.
+   *
+   * @param index position in this buffer
+   * @param in source channel
+   * @param position position passed through to the underlying call
+   * @param length length passed through to the underlying call
+   * @return the value returned by the underlying call
+   * @throws IOException if the underlying call throws it
+   */
+  @Override
+  public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+    final int shiftedIndex = index + offset;
+    return udle.setBytes(shiftedIndex, in, position, length);
+  }
+
@Override
public byte getByte(int index) {
chk(index, 1);
diff --git a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
index a5683adccbc..f0bc84cdc2d 100644
--- a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java
@@ -23,9 +23,12 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
+import io.netty.util.ByteProcessor;
+
/**
* This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override
* some behaviors and make
@@ -128,6 +131,16 @@ protected short _getShort(int index) {
return buffer.getShort(index);
}
+  /**
+   * Forwards to {@code getShortLE} on the wrapped {@code buffer} with the same index.
+   */
+  @Override
+  public short getShortLE(int index) {
+    final short v = buffer.getShortLE(index);
+    return v;
+  }
+
+  /**
+   * Internal hook; forwards to {@code getShortLE} on the wrapped {@code buffer}.
+   */
+  @Override
+  protected short _getShortLE(int index) {
+    final short v = buffer.getShortLE(index);
+    return v;
+  }
+
@Override
public int getUnsignedMedium(int index) {
return _getUnsignedMedium(index);
@@ -138,6 +151,16 @@ protected int _getUnsignedMedium(int index) {
return buffer.getUnsignedMedium(index);
}
+  /**
+   * Forwards to {@code getUnsignedMediumLE} on the wrapped {@code buffer} with the same index.
+   */
+  @Override
+  public int getUnsignedMediumLE(int index) {
+    final int v = buffer.getUnsignedMediumLE(index);
+    return v;
+  }
+
+  /**
+   * Internal hook; forwards to {@code getUnsignedMediumLE} on the wrapped {@code buffer}.
+   */
+  @Override
+  protected int _getUnsignedMediumLE(int index) {
+    final int v = buffer.getUnsignedMediumLE(index);
+    return v;
+  }
+
@Override
public int getInt(int index) {
return _getInt(index);
@@ -148,6 +171,16 @@ protected int _getInt(int index) {
return buffer.getInt(index);
}
+  /**
+   * Forwards to {@code getIntLE} on the wrapped {@code buffer} with the same index.
+   */
+  @Override
+  public int getIntLE(int index) {
+    final int v = buffer.getIntLE(index);
+    return v;
+  }
+
+  /**
+   * Internal hook; forwards to {@code getIntLE} on the wrapped {@code buffer}.
+   */
+  @Override
+  protected int _getIntLE(int index) {
+    final int v = buffer.getIntLE(index);
+    return v;
+  }
+
@Override
public long getLong(int index) {
return _getLong(index);
@@ -158,6 +191,16 @@ protected long _getLong(int index) {
return buffer.getLong(index);
}
+  /**
+   * Forwards to {@code getLongLE} on the wrapped {@code buffer} with the same index.
+   */
+  @Override
+  public long getLongLE(int index) {
+    final long v = buffer.getLongLE(index);
+    return v;
+  }
+
+  /**
+   * Internal hook; forwards to {@code getLongLE} on the wrapped {@code buffer}.
+   */
+  @Override
+  protected long _getLongLE(int index) {
+    final long v = buffer.getLongLE(index);
+    return v;
+  }
+
@Override
public abstract ByteBuf copy(int index, int length);
@@ -206,6 +249,17 @@ protected void _setShort(int index, int value) {
buffer.setShort(index, value);
}
+ /**
+ * Forwards the write to {@code setShortLE} on the wrapped {@code buffer}.
+ *
+ * @return this wrapper (not the wrapped buffer), so calls can be chained on it
+ */
+ @Override
+ public ByteBuf setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ return this;
+ }
+
+ /**
+ * Internal hook; forwards the write to {@code setShortLE} on the wrapped {@code buffer}.
+ */
+ @Override
+ protected void _setShortLE(int index, int value) {
+ buffer.setShortLE(index, value);
+ }
+
@Override
public ByteBuf setMedium(int index, int value) {
_setMedium(index, value);
@@ -217,6 +271,17 @@ protected void _setMedium(int index, int value) {
buffer.setMedium(index, value);
}
+ /**
+ * Forwards the write to {@code setMediumLE} on the wrapped {@code buffer}.
+ *
+ * @return this wrapper (not the wrapped buffer), so calls can be chained on it
+ */
+ @Override
+ public ByteBuf setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ return this;
+ }
+
+ /**
+ * Internal hook; forwards the write to {@code setMediumLE} on the wrapped {@code buffer}.
+ */
+ @Override
+ protected void _setMediumLE(int index, int value) {
+ buffer.setMediumLE(index, value);
+ }
+
@Override
public ByteBuf setInt(int index, int value) {
_setInt(index, value);
@@ -228,6 +293,17 @@ protected void _setInt(int index, int value) {
buffer.setInt(index, value);
}
+ /**
+ * Forwards the write to {@code setIntLE} on the wrapped {@code buffer}.
+ *
+ * @return this wrapper (not the wrapped buffer), so calls can be chained on it
+ */
+ @Override
+ public ByteBuf setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ return this;
+ }
+
+ /**
+ * Internal hook; forwards the write to {@code setIntLE} on the wrapped {@code buffer}.
+ */
+ @Override
+ protected void _setIntLE(int index, int value) {
+ buffer.setIntLE(index, value);
+ }
+
@Override
public ByteBuf setLong(int index, long value) {
_setLong(index, value);
@@ -239,6 +315,17 @@ protected void _setLong(int index, long value) {
buffer.setLong(index, value);
}
+ /**
+ * Forwards the write to {@code setLongLE} on the wrapped {@code buffer}.
+ *
+ * @return this wrapper (not the wrapped buffer), so calls can be chained on it
+ */
+ @Override
+ public ByteBuf setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ return this;
+ }
+
+ /**
+ * Internal hook; forwards the write to {@code setLongLE} on the wrapped {@code buffer}.
+ */
+ @Override
+ protected void _setLongLE(int index, long value) {
+ buffer.setLongLE(index, value);
+ }
+
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
buffer.setBytes(index, src, srcIndex, length);
@@ -257,6 +344,12 @@ public ByteBuf setBytes(int index, ByteBuffer src) {
return this;
}
+  /**
+   * Forwards the positional file-channel read to the wrapped {@code buffer}
+   * with all arguments unchanged.
+   *
+   * @return the value returned by the wrapped buffer's call
+   * @throws IOException if the wrapped buffer's call throws it
+   */
+  @Override
+  public int setBytes(int index, FileChannel in, long position, int length)
+      throws IOException {
+    final int result = buffer.setBytes(index, in, position, length);
+    return result;
+  }
+
@Override
public ByteBuf getBytes(int index, OutputStream out, int length)
throws IOException {
@@ -282,6 +375,13 @@ public int setBytes(int index, ScatteringByteChannel in, int length)
return buffer.setBytes(index, in, length);
}
+
+  /**
+   * Forwards the positional file-channel write to the wrapped {@code buffer}
+   * with all arguments unchanged.
+   *
+   * @return the value returned by the wrapped buffer's call
+   * @throws IOException if the wrapped buffer's call throws it
+   */
+  @Override
+  public int getBytes(int index, FileChannel out, long position, int length)
+      throws IOException {
+    final int result = buffer.getBytes(index, out, position, length);
+    return result;
+  }
+
@Override
public int nioBufferCount() {
return buffer.nioBufferCount();
@@ -298,12 +398,12 @@ public ByteBuffer internalNioBuffer(int index, int length) {
}
@Override
- public int forEachByte(int index, int length, ByteBufProcessor processor) {
+ public int forEachByte(int index, int length, ByteProcessor processor) {
return buffer.forEachByte(index, length, processor);
}
@Override
- public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
+ public int forEachByteDesc(int index, int length, ByteProcessor processor) {
return buffer.forEachByteDesc(index, length, processor);
}
@@ -312,6 +412,18 @@ public final int refCnt() {
return unwrap().refCnt();
}
+ /**
+ * Calls {@code touch()} on the buffer returned by {@code unwrap()} and
+ * discards that call's result.
+ *
+ * @return this wrapper
+ */
+ @Override
+ public final ByteBuf touch() {
+ unwrap().touch();
+ return this;
+ }
+
+ /**
+ * Calls {@code touch(hint)} on the buffer returned by {@code unwrap()} and
+ * discards that call's result.
+ *
+ * @param hint passed through unchanged to the unwrapped buffer
+ * @return this wrapper
+ */
+ @Override
+ public final ByteBuf touch(Object hint) {
+ unwrap().touch(hint);
+ return this;
+ }
+
@Override
public final ByteBuf retain() {
unwrap().retain();
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
index 5bd6b9fe379..ed2184aadab 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java
@@ -219,6 +219,15 @@ public long getLimit() {
return allocationLimit.get();
}
+ /**
+ * Returns the initial reservation of this accountant, i.e. the current
+ * value of its {@code reservation} field.
+ *
+ * @return reservation in bytes.
+ */
+ public long getInitReservation() {
+ return reservation;
+ }
+
/**
* Set the maximum amount of memory that can be allocated in the this Accountant before failing
* an allocation.
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
index b8b5283423c..94102992139 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java
@@ -18,8 +18,8 @@
package org.apache.arrow.memory;
+import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.ExpandableByteBuf;
@@ -32,7 +32,7 @@
* otherwise non-expandable
* ArrowBufs to be expandable.
*/
-public class ArrowByteBufAllocator implements ByteBufAllocator {
+public class ArrowByteBufAllocator extends AbstractByteBufAllocator {
private static final int DEFAULT_BUFFER_SIZE = 4096;
private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16;
@@ -142,8 +142,17 @@ public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
throw fail();
}
+ /**
+ * Heap buffers are not supported: this always throws the exception produced
+ * by {@code fail()} and both arguments are ignored.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+ throw fail();
+ }
+
+  /**
+   * Satisfies a direct-buffer request by delegating to
+   * {@code buffer(initialCapacity, maxCapacity)} of this allocator.
+   *
+   * @param initialCapacity passed through unchanged
+   * @param maxCapacity passed through unchanged
+   * @return the buffer produced by {@code buffer(int, int)}
+   */
+  @Override
+  protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+    final ByteBuf allocated = buffer(initialCapacity, maxCapacity);
+    return allocated;
+  }
+
private RuntimeException fail() {
throw new UnsupportedOperationException("Allocator doesn't support heap-based memory.");
}
-
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index b23a6e4bd85..a5da50e6278 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -91,6 +91,13 @@ public interface BufferAllocator extends AutoCloseable {
*/
public long getLimit();
+ /**
+ * Returns the initial reservation of this allocator.
+ *
+ * @return reservation in bytes.
+ */
+ public long getInitReservation();
+
/**
* Set the maximum amount of memory this allocator is allowed to allocate.
*
diff --git a/java/pom.xml b/java/pom.xml
index 0a0f2e0ce8f..384ef56882f 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -32,7 +32,7 @@
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector;
+
+/**
+ * Implemented by vectors whose initial capacity can be set from a value
+ * count together with a density hint.
+ */
+public interface DensityAwareVector {
+ /**
+ * Sets the initial capacity from a value count and a density.
+ *
+ * @param valueCount number of values to provision for
+ * @param density density hint. NOTE(review): the repeated-value (list) vector
+ *        documents density as the average number of inner values per
+ *        position - confirm the meaning for other implementers
+ */
+ void setInitialCapacity(int valueCount, double density);
+
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/NullableDecimalVector.java b/java/vector/src/main/java/org/apache/arrow/vector/NullableDecimalVector.java
index dcc551094ae..69bcd33fc6e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/NullableDecimalVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/NullableDecimalVector.java
@@ -18,6 +18,7 @@
package org.apache.arrow.vector;
+import com.google.common.base.Preconditions;
import io.netty.buffer.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.complex.impl.DecimalReaderImpl;
@@ -199,6 +200,61 @@ public void set(int index, ArrowBuf buffer) {
valueBuffer.setBytes(index * TYPE_WIDTH, buffer, 0, TYPE_WIDTH);
}
+  /**
+   * Set the decimal element at given index to the provided array of bytes.
+   * Decimal is now implemented as Little Endian. This API allows the user
+   * to pass a decimal value in the form of byte array in BE byte order.
+   *
+   * <p>Consumers of Arrow code can use this API instead of first swapping
+   * the source bytes (doing a write and read) and then finally writing to
+   * ArrowBuf of decimal vector.
+   *
+   * <p>The bytes are written in reverse order. If the array is shorter than
+   * {@code TYPE_WIDTH} (16, the length of the decimal type) the remaining
+   * high-order bytes are filled with 0xFF when {@code value[0]} is negative
+   * and with 0x00 otherwise. An empty array stores all-zero bytes.
+   *
+   * <p>The length is validated before anything is modified, so a rejected
+   * array leaves both the validity bit and the value bytes of the slot
+   * untouched.
+   *
+   * @param index position of element
+   * @param value array of bytes containing decimal in big endian byte order.
+   * @throws IllegalArgumentException if {@code value} is longer than {@code TYPE_WIDTH} bytes
+   */
+  public void setBigEndian(int index, byte[] value) {
+    final int length = value.length;
+    if (length > TYPE_WIDTH) {
+      throw new IllegalArgumentException("Invalid decimal value length. Valid length in [1 - 16], got " + length);
+    }
+
+    BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+    final int startIndex = index * TYPE_WIDTH;
+
+    if (length == 0) {
+      valueBuffer.setZero(startIndex, TYPE_WIDTH);
+      return;
+    }
+
+    // Reverse the big endian input: its last byte becomes the first stored byte.
+    for (int i = 0; i < length; i++) {
+      valueBuffer.setByte(startIndex + i, value[length - 1 - i]);
+    }
+
+    // Fill the remaining high-order bytes according to the sign of the first input byte.
+    if (length < TYPE_WIDTH) {
+      final byte pad = (byte) (value[0] < 0 ? 0xFF : 0x00);
+      for (int i = length; i < TYPE_WIDTH; i++) {
+        valueBuffer.setByte(startIndex + i, pad);
+      }
+    }
+  }
+
/**
* Set the element at the given index to the given value.
*
@@ -266,6 +322,16 @@ public void setSafe(int index, ArrowBuf buffer) {
set(index, buffer);
}
+ /**
+ * Same as {@link #setBigEndian(int, byte[])} except that it handles the
+ * case when index is greater than or equal to existing
+ * value capacity {@link #getValueCapacity()}.
+ *
+ * @param index position of element
+ * @param value array of bytes containing decimal in big endian byte order.
+ */
+ public void setBigEndianSafe(int index, byte[] value) {
+ // handleSafe must run first so the slot exists before it is written.
+ handleSafe(index);
+ setBigEndian(index, value);
+ }
+
/**
* Same as {@link #set(int, int, ArrowBuf)} except that it handles the
* case when index is greater than or equal to existing
@@ -424,4 +490,4 @@ public void copyValueSafe(int fromIndex, int toIndex) {
to.copyFromSafe(fromIndex, toIndex, NullableDecimalVector.this);
}
}
-}
\ No newline at end of file
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/VariableWidthVector.java
index 04c00b7c834..7182fa8476c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VariableWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VariableWidthVector.java
@@ -18,7 +18,7 @@
package org.apache.arrow.vector;
-public interface VariableWidthVector extends ValueVector {
+public interface VariableWidthVector extends ValueVector, DensityAwareVector {
/**
* Allocate a new memory space for this vector. Must be called prior to using the ValueVector.
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
index db0ff86df47..c777618fdfb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
@@ -20,6 +20,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.vector.DensityAwareVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.Types.MinorType;
@@ -33,7 +34,7 @@
*
* This class implements common functionality of composite vectors.
*/
-public abstract class AbstractContainerVector implements ValueVector {
+public abstract class AbstractContainerVector implements ValueVector, DensityAwareVector {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
protected final String name;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 4648d078949..c4414871eb8 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -25,10 +25,13 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BaseValueVector;
+import org.apache.arrow.vector.DensityAwareVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.BaseNullableVariableWidthVector;
+import org.apache.arrow.vector.BaseNullableFixedWidthVector;
import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.CallBack;
@@ -106,6 +109,7 @@ protected void reallocOffsetBuffer() {
long newAllocationSize = baseSize * 2L;
newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+ newAllocationSize = Math.max(newAllocationSize, 1);
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer");
@@ -134,7 +138,53 @@ public FieldVector getDataVector() {
@Override
public void setInitialCapacity(int numRecords) {
offsetAllocationSizeInBytes = (numRecords + 1) * OFFSET_WIDTH;
- vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+ // Only inner vectors of the nullable fixed-/variable-width base types get the
+ // default repeat-per-record multiplier; any other inner vector is sized to
+ // numRecords directly.
+ if (vector instanceof BaseNullableVariableWidthVector || vector instanceof BaseNullableFixedWidthVector) {
+ vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+ } else {
+ vector.setInitialCapacity(numRecords);
+ }
+ }
+
+  /**
+   * Density-aware version of setInitialCapacity() for ListVector. Callers use
+   * it when they want explicit, conservative control over the memory
+   * provisioned for the inner data vector, for example when a fixed amount of
+   * memory is reserved for a record batch. In that situation sizing every
+   * vector with setInitialCapacity(x) can still lead to OOM or related
+   * problems, because the default multiplier pushes the inner vector's
+   * requirement beyond what was needed.
+   *
+   * @param numRecords value count
+   * @param density density of ListVector, i.e. the average size of list per
+   *                position in the List vector. A density of 10 implies each
+   *                position holds a list of 10 values. A density of 0.1
+   *                implies that out of 10 positions, 1 position has a list
+   *                of size 1 and the remaining positions are null (no lists)
+   *                or empty lists.
+   * @throws OversizedAllocationException if {@code numRecords * density} is
+   *         2,000,000,000 or more
+   */
+  @Override
+  public void setInitialCapacity(int numRecords, double density) {
+    final double requestedInnerValues = numRecords * density;
+    if (requestedInnerValues >= 2_000_000_000) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
+    }
+    offsetAllocationSizeInBytes = (numRecords + 1) * OFFSET_WIDTH;
+
+    // A computed inner capacity of exactly zero is bumped to one.
+    final int truncated = (int) requestedInnerValues;
+    final int innerValueCapacity = truncated == 0 ? 1 : truncated;
+
+    if (!(vector instanceof DensityAwareVector)) {
+      vector.setInitialCapacity(innerValueCapacity);
+      return;
+    }
+    ((DensityAwareVector) vector).setInitialCapacity(innerValueCapacity, density);
}
@Override
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
index 6713b1c7871..408ee930321 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
@@ -222,6 +222,7 @@ private void reallocValidityBuffer() {
long newAllocationSize = baseSize * 2L;
newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+ newAllocationSize = Math.max(newAllocationSize, 1);
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer");
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index afe86a692c3..33698ca61a0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -31,12 +31,7 @@
import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.vector.AddOrGetResult;
-import org.apache.arrow.vector.BufferBacked;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.ZeroVector;
-import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.impl.ComplexCopier;
import org.apache.arrow.vector.complex.impl.UnionListReader;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
@@ -102,6 +97,58 @@ public void initializeChildrenFromFields(List