From 6f21e457607df552196f7960ae56e99096d7fd52 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Mon, 22 Sep 2025 11:06:28 +0100 Subject: [PATCH] Added options to customize batch request size --- .../connector/common/OperationOption.java | 12 ++++++++++- .../spark/connector/write/YdbDataWriter.java | 20 ++++++++++++++----- .../connector/write/YdbWriterFactory.java | 12 +++++++---- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java b/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java index 2d4d5e6..04fae5c 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java +++ b/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java @@ -50,7 +50,17 @@ public enum OperationOption implements SparkOption { /** * YDB max batch rows for ingestion. */ - BATCH_SIZE("batchsize"), + BATCH_ROWS("batch.rows"), + + /** + * Limit for batch request size, in bytes. + */ + BATCH_LIMIT("batch.sizelimit"), + + /** + * Number of parallel batch requests per writer. + */ + BATCH_CONCURRENCY("batch.concurrency"), /** * YDB table's primary key, as a comma-delimited list of column names. 
diff --git a/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java b/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java index f8b58a8..5673c51 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java +++ b/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java @@ -27,26 +27,34 @@ * @author zinal */ public abstract class YdbDataWriter implements DataWriter { + static final int MAX_ROWS_COUNT = 10000; + static final int MAX_BYTES_SIZE = 10 * 1024 * 1024; + static final int CONCURRENCY = 2; + private static final Logger logger = LoggerFactory.getLogger(YdbDataWriter.class); private final YdbTypes types; private final StructType structType; private final ValueReader[] readers; - private final int maxBatchSize; + private final int maxRowsCount; + private final int maxBytesSize; private final int maxConcurrency; private final Semaphore semaphore; private final Map, CompletableFuture> writesInFly = new ConcurrentHashMap<>(); private List> currentBatch = new ArrayList<>(); + private int currentBatchSize = 0; private volatile Status lastError = null; - public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] readers, int maxBatchSize) { + public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] readers, + int batchRowsCount, int batchBytesSize, int batchConcurrency) { this.types = types; this.structType = structType; this.readers = readers; - this.maxBatchSize = maxBatchSize; - this.maxConcurrency = 2; + this.maxRowsCount = batchRowsCount; + this.maxBytesSize = batchBytesSize; + this.maxConcurrency = batchConcurrency; this.semaphore = new Semaphore(maxConcurrency); } @@ -62,10 +70,11 @@ public void write(InternalRow record) throws IOException { Value[] row = new Value[readers.length]; for (int idx = 0; idx < row.length; ++idx) { row[idx] = readers[idx].read(types, record); + currentBatchSize += row[idx].toPb().getSerializedSize(); } 
currentBatch.add(structType.newValueUnsafe(row)); - if (currentBatch.size() >= maxBatchSize) { + if (currentBatch.size() >= maxRowsCount || currentBatchSize >= maxBytesSize) { writeBatch(); } } @@ -98,6 +107,7 @@ public void close() throws IOException { } private void writeBatch() { + currentBatchSize = 0; if (currentBatch.isEmpty()) { return; } diff --git a/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java b/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java index 6e9a2b7..99b4fc4 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java +++ b/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java @@ -40,13 +40,17 @@ public class YdbWriterFactory implements DataWriterFactory { private final IngestMethod method; private final String autoPkName; - private final int maxBatchSize; + private final int batchRowsCount; + private final int batchBytesLimit; + private final int batchConcurrency; public YdbWriterFactory(YdbTable table, LogicalWriteInfo logical, PhysicalWriteInfo physical) { this.table = table; this.types = new YdbTypes(logical.options()); this.method = OperationOption.INGEST_METHOD.readEnum(logical.options(), IngestMethod.BULK_UPSERT); - this.maxBatchSize = OperationOption.BATCH_SIZE.readInt(logical.options(), 10000); + this.batchRowsCount = OperationOption.BATCH_ROWS.readInt(logical.options(), YdbDataWriter.MAX_ROWS_COUNT); + this.batchBytesLimit = OperationOption.BATCH_LIMIT.readInt(logical.options(), YdbDataWriter.MAX_BYTES_SIZE); + this.batchConcurrency = OperationOption.BATCH_CONCURRENCY.readInt(logical.options(), YdbDataWriter.CONCURRENCY); this.autoPkName = OperationOption.TABLE_AUTOPK_NAME.read(logical.options(), OperationOption.DEFAULT_AUTO_PK); this.schema = logical.schema(); } @@ -94,7 +98,7 @@ public DataWriter createWriter(int partitionId, long taskId) { } if (method == IngestMethod.BULK_UPSERT) { - return new YdbDataWriter(types, 
structType, readers, maxBatchSize) { + return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) { @Override CompletableFuture executeWrite(ListValue batch) { return table.getCtx().getExecutor().executeBulkUpsert(table.getTablePath(), batch); @@ -103,7 +107,7 @@ CompletableFuture executeWrite(ListValue batch) { } String writeQuery = makeBatchSql(method.name(), table.getTablePath(), structType); - return new YdbDataWriter(types, structType, readers, maxBatchSize) { + return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) { @Override CompletableFuture executeWrite(ListValue batch) { return table.getCtx().getExecutor().executeDataQuery(writeQuery, Params.of("$input", batch));