--- a/OperationOption.java
+++ b/OperationOption.java
```diff
@@ -50,7 +50,17 @@ public enum OperationOption implements SparkOption {
     /**
      * YDB max batch rows for ingestion.
      */
-    BATCH_SIZE("batchsize"),
+    BATCH_ROWS("batch.rows"),
+
+    /**
+     * Limit for batch request size in bytes
+     */
+    BATCH_LIMIT("batch.sizelimit"),
+
+    /**
+     * Count of parallel batch requests per one writer
+     */
+    BATCH_CONCURRENCY("batch.concurrency"),
 
     /**
      * YDB table's primary key, as a comma-delimited list of column names.
```
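The renamed and added options surface directly as Spark write options. A minimal usage sketch follows; the `"ydb"` format name, the table addressing in `save()`, and the chosen values are illustrative assumptions, not something this diff confirms:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class WriteExample {
    // df is assumed to be an existing Dataset<Row> pointed at YDB data.
    static void writeWithBatchOptions(Dataset<Row> df) {
        df.write()
                .format("ydb")                         // assumed format name
                .option("batch.rows", "5000")          // BATCH_ROWS: max rows per batch
                .option("batch.sizelimit", "4194304")  // BATCH_LIMIT: max batch bytes (4 MiB)
                .option("batch.concurrency", "4")      // BATCH_CONCURRENCY: parallel requests per writer
                .mode(SaveMode.Append)
                .save("my_table");                     // assumed table addressing
    }
}
```

Note that the old `batchsize` key is gone; callers relying on it would need to switch to `batch.rows`.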
--- a/YdbDataWriter.java
+++ b/YdbDataWriter.java
```diff
@@ -27,26 +27,34 @@
  * @author zinal
  */
 public abstract class YdbDataWriter implements DataWriter<InternalRow> {
+    static final int MAX_ROWS_COUNT = 10000;
+    static final int MAX_BYTES_SIZE = 10 * 1024 * 1024;
+    static final int CONCURRENCY = 2;
+
     private static final Logger logger = LoggerFactory.getLogger(YdbDataWriter.class);
 
     private final YdbTypes types;
     private final StructType structType;
     private final ValueReader[] readers;
 
-    private final int maxBatchSize;
+    private final int maxRowsCount;
+    private final int maxBytesSize;
     private final int maxConcurrency;
     private final Semaphore semaphore;
 
     private final Map<CompletableFuture<?>, CompletableFuture<?>> writesInFly = new ConcurrentHashMap<>();
     private List<Value<?>> currentBatch = new ArrayList<>();
+    private int currentBatchSize = 0;
     private volatile Status lastError = null;
 
-    public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] readers, int maxBatchSize) {
+    public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] readers,
+            int batchRowsCount, int batchBytesSize, int batchConcurrency) {
         this.types = types;
         this.structType = structType;
         this.readers = readers;
-        this.maxBatchSize = maxBatchSize;
-        this.maxConcurrency = 2;
+        this.maxRowsCount = batchRowsCount;
+        this.maxBytesSize = batchBytesSize;
+        this.maxConcurrency = batchConcurrency;
         this.semaphore = new Semaphore(maxConcurrency);
     }
 
@@ -62,10 +70,11 @@ public void write(InternalRow record) throws IOException {
         Value<?>[] row = new Value<?>[readers.length];
         for (int idx = 0; idx < row.length; ++idx) {
             row[idx] = readers[idx].read(types, record);
+            currentBatchSize += row[idx].toPb().getSerializedSize();
         }
 
         currentBatch.add(structType.newValueUnsafe(row));
-        if (currentBatch.size() >= maxBatchSize) {
+        if (currentBatch.size() >= maxRowsCount || currentBatchSize >= maxBytesSize) {
             writeBatch();
         }
     }
@@ -98,6 +107,7 @@ public void close() throws IOException {
     }
 
     private void writeBatch() {
+        currentBatchSize = 0;
         if (currentBatch.isEmpty()) {
             return;
         }
```
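Taken together, the writer now flushes on whichever limit trips first, row count or accumulated serialized bytes, and the semaphore bounds how many flushed batches may be in flight at once. A minimal, self-contained sketch of that pattern; all names here are illustrative, only the thresholds mirror the defaults introduced above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Sketch of size-and-count bounded batching with capped write concurrency. */
public class BoundedBatcher {
    private final int maxRows;        // cf. MAX_ROWS_COUNT = 10000
    private final int maxBytes;       // cf. MAX_BYTES_SIZE = 10 MiB
    private final Semaphore inFlight; // cf. CONCURRENCY = 2

    private List<byte[]> batch = new ArrayList<>();
    private int batchBytes = 0;

    public BoundedBatcher(int maxRows, int maxBytes, int concurrency) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
        this.inFlight = new Semaphore(concurrency);
    }

    public void add(byte[] row) throws InterruptedException {
        batch.add(row);
        batchBytes += row.length;
        // Flush when either bound is reached, exactly like the write() check above.
        if (batch.size() >= maxRows || batchBytes >= maxBytes) {
            flush();
        }
    }

    public void flush() throws InterruptedException {
        batchBytes = 0; // reset first, mirroring writeBatch() in the diff
        if (batch.isEmpty()) {
            return;
        }
        List<byte[]> toSend = batch;
        batch = new ArrayList<>();
        inFlight.acquire(); // blocks the producer when too many batches are pending
        sendAsync(toSend).whenComplete((r, t) -> inFlight.release());
    }

    // Stand-in for executeWrite(ListValue); a real writer would call YDB here.
    private CompletableFuture<Void> sendAsync(List<byte[]> rows) {
        return CompletableFuture.runAsync(() -> { /* ship rows */ });
    }
}
```

Resetting the byte counter at the top of the flush, as the diff does, keeps the counter consistent even when the method returns early on an empty batch, and the semaphore gives natural backpressure: once `batch.concurrency` requests are outstanding, the writing task blocks instead of buffering unboundedly.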
--- a/YdbWriterFactory.java
+++ b/YdbWriterFactory.java
```diff
@@ -40,13 +40,17 @@ public class YdbWriterFactory implements DataWriterFactory {
 
     private final IngestMethod method;
     private final String autoPkName;
-    private final int maxBatchSize;
+    private final int batchRowsCount;
+    private final int batchBytesLimit;
+    private final int batchConcurrency;
 
     public YdbWriterFactory(YdbTable table, LogicalWriteInfo logical, PhysicalWriteInfo physical) {
         this.table = table;
         this.types = new YdbTypes(logical.options());
         this.method = OperationOption.INGEST_METHOD.readEnum(logical.options(), IngestMethod.BULK_UPSERT);
-        this.maxBatchSize = OperationOption.BATCH_SIZE.readInt(logical.options(), 10000);
+        this.batchRowsCount = OperationOption.BATCH_ROWS.readInt(logical.options(), YdbDataWriter.MAX_ROWS_COUNT);
+        this.batchBytesLimit = OperationOption.BATCH_LIMIT.readInt(logical.options(), YdbDataWriter.MAX_BYTES_SIZE);
+        this.batchConcurrency = OperationOption.BATCH_CONCURRENCY.readInt(logical.options(), YdbDataWriter.CONCURRENCY);
         this.autoPkName = OperationOption.TABLE_AUTOPK_NAME.read(logical.options(), OperationOption.DEFAULT_AUTO_PK);
         this.schema = logical.schema();
     }
@@ -94,7 +98,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
         }
 
         if (method == IngestMethod.BULK_UPSERT) {
-            return new YdbDataWriter(types, structType, readers, maxBatchSize) {
+            return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) {
                 @Override
                 CompletableFuture<Status> executeWrite(ListValue batch) {
                     return table.getCtx().getExecutor().executeBulkUpsert(table.getTablePath(), batch);
@@ -103,7 +107,7 @@ CompletableFuture<Status> executeWrite(ListValue batch) {
         }
 
         String writeQuery = makeBatchSql(method.name(), table.getTablePath(), structType);
-        return new YdbDataWriter(types, structType, readers, maxBatchSize) {
+        return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) {
            @Override
            CompletableFuture<Status> executeWrite(ListValue batch) {
                return table.getCtx().getExecutor().executeDataQuery(writeQuery, Params.of("$input", batch));
```
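For reference, the option-with-default pattern used in the factory, where each `readInt` call falls back to the writer's compile-time constant, boils down to something like the following. This helper is a hypothetical reconstruction; the actual `SparkOption.readInt` implementation is not part of this diff:

```java
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical reconstruction of readInt(options, defaultValue);
// not the connector's actual code.
final class OptionReader {
    static int readInt(CaseInsensitiveStringMap options, String key, int defaultValue) {
        String raw = options.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue; // e.g. YdbDataWriter.MAX_ROWS_COUNT for batch.rows
        }
        return Integer.parseInt(raw.trim());
    }
}
```

Spark's `CaseInsensitiveStringMap` already exposes `getInt(key, defaultValue)` with similar semantics, so the connector-level helper mainly serves to tie the key string to the `OperationOption` enum constant.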