diff --git a/connector/src/main/java/tech/ydb/spark/connector/common/IngestMethod.java b/connector/src/main/java/tech/ydb/spark/connector/common/IngestMethod.java index 9c37ecf..3e59fba 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/common/IngestMethod.java +++ b/connector/src/main/java/tech/ydb/spark/connector/common/IngestMethod.java @@ -8,5 +8,6 @@ public enum IngestMethod { UPSERT, REPLACE, + INSERT, BULK_UPSERT; } diff --git a/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java b/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java index 04fae5c..1519800 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java +++ b/connector/src/main/java/tech/ydb/spark/connector/common/OperationOption.java @@ -8,9 +8,9 @@ */ public enum OperationOption implements SparkOption { /** - * Use single partition for scanning each table, if true. Default false. + * Number of retries for atomic write operations. Default 10. */ - SCAN_SINGLE("scan.single"), + WRITE_RETRY_COUNT("write.retry.count"), /** * Scan queue depth for each executor. Default 10, minimum 2. diff --git a/connector/src/main/java/tech/ydb/spark/connector/impl/YdbExecutor.java b/connector/src/main/java/tech/ydb/spark/connector/impl/YdbExecutor.java index c88c801..1afe9ae 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/impl/YdbExecutor.java +++ b/connector/src/main/java/tech/ydb/spark/connector/impl/YdbExecutor.java @@ -25,13 +25,11 @@ import tech.ydb.table.SessionRetryContext; import tech.ydb.table.TableClient; import tech.ydb.table.description.TableDescription; -import tech.ydb.table.query.Params; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.settings.AlterTableSettings; import tech.ydb.table.settings.BulkUpsertSettings; import tech.ydb.table.settings.CreateTableSettings; import tech.ydb.table.settings.DescribeTableSettings; -import tech.ydb.table.transaction.TxControl; import tech.ydb.table.values.ListValue; /** @@ -88,17 +86,19 @@ public String extractPath(String name) { return transport.getDatabase() + "/" + name; } + public SessionRetryContext createRetryCtx(int retryCount, boolean idempotent) { + return SessionRetryContext.create(tableClient) + .sessionCreationTimeout(Duration.ofMinutes(5)) + .idempotent(idempotent) + .maxRetries(retryCount) + .build(); + } + public CompletableFuture executeBulkUpsert(String tablePath, ListValue batch) { BulkUpsertSettings settings = new BulkUpsertSettings(); return retryCtx.supplyStatus(s -> s.executeBulkUpsert(tablePath, batch, settings)); } - public CompletableFuture executeDataQuery(String query, Params params) { - return retryCtx.supplyStatus( - s -> s.executeDataQuery(query, TxControl.serializableRw(), params).thenApply(Result::getStatus) - ); - } - public CompletableFuture executeSchemeQuery(String query) { return retryCtx.supplyStatus(s -> s.executeSchemeQuery(query)); } diff --git a/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java b/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java index 5673c51..9c9d7d0 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java +++ b/connector/src/main/java/tech/ydb/spark/connector/write/YdbDataWriter.java @@ -17,6 +17,8 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.spark.connector.YdbTypes; +import tech.ydb.table.Session; +import tech.ydb.table.SessionRetryContext; import tech.ydb.table.values.ListValue; import tech.ydb.table.values.StructType; import tech.ydb.table.values.Value; @@ -30,9 +32,11 @@ public abstract class YdbDataWriter implements DataWriter { static final int MAX_ROWS_COUNT = 10000; static final int MAX_BYTES_SIZE = 10 * 1024 * 1024; static final int CONCURRENCY = 2; + static final int WRITE_RETRY_COUNT = 10; private static final Logger logger = LoggerFactory.getLogger(YdbDataWriter.class); + private final SessionRetryContext retryCtx; private final YdbTypes types; private final StructType structType; private final ValueReader[] readers; @@ -47,8 +51,9 @@ public abstract class YdbDataWriter implements DataWriter { private int currentBatchSize = 0; private volatile Status lastError = null; - public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] readers, + public YdbDataWriter(SessionRetryContext retryCtx, YdbTypes types, StructType structType, ValueReader[] readers, int batchRowsCount, int batchBytesSize, int batchConcurrency) { + this.retryCtx = retryCtx; this.types = types; this.structType = structType; this.readers = readers; @@ -58,7 +63,7 @@ public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] reader this.semaphore = new Semaphore(maxConcurrency); } - abstract CompletableFuture executeWrite(ListValue batch); + abstract CompletableFuture executeWrite(Session session, ListValue batch); @Override public void write(InternalRow record) throws IOException { @@ -121,7 +126,8 @@ private void writeBatch() { return; } - CompletableFuture future = executeWrite(ListValue.of(copy)); + ListValue batch = ListValue.of(copy); + CompletableFuture future = retryCtx.supplyStatus(session -> executeWrite(session, batch)); writesInFly.put(future, future); future.whenComplete((st, th) -> { diff --git a/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java b/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java index 99b4fc4..e615a38 100644 --- a/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java +++ b/connector/src/main/java/tech/ydb/spark/connector/write/YdbWriterFactory.java @@ -14,13 +14,19 @@ import org.slf4j.LoggerFactory; import scala.collection.Iterator; +import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.spark.connector.YdbTable; import tech.ydb.spark.connector.YdbTypes; import tech.ydb.spark.connector.common.FieldInfo; import tech.ydb.spark.connector.common.IngestMethod; import tech.ydb.spark.connector.common.OperationOption; +import tech.ydb.table.Session; +import tech.ydb.table.SessionRetryContext; import tech.ydb.table.query.Params; +import tech.ydb.table.settings.BulkUpsertSettings; +import tech.ydb.table.settings.ExecuteDataQuerySettings; +import tech.ydb.table.transaction.TxControl; import tech.ydb.table.values.ListValue; import tech.ydb.table.values.PrimitiveType; import tech.ydb.table.values.StructType; @@ -43,6 +49,7 @@ public class YdbWriterFactory implements DataWriterFactory { private final int batchRowsCount; private final int batchBytesLimit; private final int batchConcurrency; + private final int retryCount; public YdbWriterFactory(YdbTable table, LogicalWriteInfo logical, PhysicalWriteInfo physical) { this.table = table; @@ -52,6 +59,7 @@ public YdbWriterFactory(YdbTable table, LogicalWriteInfo logical, PhysicalWriteI this.batchBytesLimit = OperationOption.BATCH_LIMIT.readInt(logical.options(), YdbDataWriter.MAX_BYTES_SIZE); this.batchConcurrency = OperationOption.BATCH_CONCURRENCY.readInt(logical.options(), YdbDataWriter.CONCURRENCY); this.autoPkName = OperationOption.TABLE_AUTOPK_NAME.read(logical.options(), OperationOption.DEFAULT_AUTO_PK); + this.retryCount = OperationOption.WRITE_RETRY_COUNT.readInt(logical.options(), YdbDataWriter.WRITE_RETRY_COUNT); this.schema = logical.schema(); } @@ -97,20 +105,31 @@ public DataWriter createWriter(int partitionId, long taskId) { readers[idx] = columnReadeds.get(structType.getMemberName(idx)); } + boolean idempotent = method != IngestMethod.INSERT; + SessionRetryContext retryCtx = table.getCtx().getExecutor().createRetryCtx(retryCount, idempotent); + if (method == IngestMethod.BULK_UPSERT) { - return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) { + return new YdbDataWriter(retryCtx, types, structType, readers, batchRowsCount, batchBytesLimit, + batchConcurrency) { + private final BulkUpsertSettings settings = new BulkUpsertSettings(); + @Override - CompletableFuture executeWrite(ListValue batch) { - return table.getCtx().getExecutor().executeBulkUpsert(table.getTablePath(), batch); + CompletableFuture executeWrite(Session session, ListValue batch) { + return session.executeBulkUpsert(table.getTablePath(), batch, settings); } }; } - String writeQuery = makeBatchSql(method.name(), table.getTablePath(), structType); - return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) { + return new YdbDataWriter(retryCtx, types, structType, readers, batchRowsCount, batchBytesLimit, + batchConcurrency) { + private final String query = makeBatchSql(method.name(), table.getTablePath(), structType); + private final ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings(); + @Override - CompletableFuture executeWrite(ListValue batch) { - return table.getCtx().getExecutor().executeDataQuery(writeQuery, Params.of("$input", batch)); + CompletableFuture executeWrite(Session session, ListValue batch) { + Params prms = Params.of("$input", batch); + return session.executeDataQuery(query, TxControl.serializableRw(), prms, settings) + .thenApply(Result::getStatus); } }; }