Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
public enum IngestMethod {
    /** Written through a batch data query; the constant name is handed to the batch SQL builder. */
    UPSERT,

    /** Written through a batch data query; the constant name is handed to the batch SQL builder. */
    REPLACE,

    /**
     * Written through a batch data query; the constant name is handed to the batch SQL builder.
     * The writer factory treats this as the only method whose retries are not idempotent.
     */
    INSERT,

    /** Written through the session's bulk upsert call instead of a batch data query. */
    BULK_UPSERT
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
*/
public enum OperationOption implements SparkOption {
/**
* Use single partition for scanning each table, if true. Default false.
* Number of retries for atomic write operations. Default 10.
*/
SCAN_SINGLE("scan.single"),
WRITE_RETRY_COUNT("write.retry.count"),

/**
* Scan queue depth for each executor. Default 10, minimum 2.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.AlterTableSettings;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.CreateTableSettings;
import tech.ydb.table.settings.DescribeTableSettings;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListValue;

/**
Expand Down Expand Up @@ -88,17 +86,19 @@ public String extractPath(String name) {
return transport.getDatabase() + "/" + name;
}

/**
 * Builds a new {@link SessionRetryContext} on top of the {@code tableClient} in scope.
 *
 * @param retryCount value passed to the builder's {@code maxRetries}
 * @param idempotent value passed to the builder's {@code idempotent} flag
 * @return the freshly built retry context
 */
public SessionRetryContext createRetryCtx(int retryCount, boolean idempotent) {
    // The session creation timeout is fixed at five minutes.
    Duration sessionTimeout = Duration.ofMinutes(5);
    return SessionRetryContext.create(tableClient)
            .maxRetries(retryCount)
            .idempotent(idempotent)
            .sessionCreationTimeout(sessionTimeout)
            .build();
}

/**
 * Sends {@code batch} to the table at {@code tablePath} with a bulk upsert, running the call
 * through {@code retryCtx}.
 *
 * @param tablePath table path handed to the session unchanged
 * @param batch list value passed as the bulk upsert payload
 * @return the future produced by {@code retryCtx.supplyStatus}
 */
public CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue batch) {
    // A single default settings instance is created per call and shared by every
    // invocation of the lambda below.
    final BulkUpsertSettings defaults = new BulkUpsertSettings();
    return retryCtx.supplyStatus(session -> {
        return session.executeBulkUpsert(tablePath, batch, defaults);
    });
}

/**
 * Runs {@code query} with {@code params} through {@code retryCtx}, using the
 * {@code TxControl.serializableRw()} transaction control, and reduces the query result to
 * its status.
 *
 * @param query text of the data query
 * @param params parameters bound to the query
 * @return the future produced by {@code retryCtx.supplyStatus}
 */
public CompletableFuture<Status> executeDataQuery(String query, Params params) {
    return retryCtx.supplyStatus(session -> {
        // The transaction control is requested inside the lambda, once per invocation.
        return session
                .executeDataQuery(query, TxControl.serializableRw(), params)
                .thenApply(result -> result.getStatus());
    });
}

/**
 * Runs the scheme query {@code query} on a session obtained through {@code retryCtx}.
 *
 * @param query text of the scheme query
 * @return the future produced by {@code retryCtx.supplyStatus}
 */
public CompletableFuture<Status> executeSchemeQuery(String query) {
    return retryCtx.supplyStatus(session -> {
        return session.executeSchemeQuery(query);
    });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.table.Session;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.values.ListValue;
import tech.ydb.table.values.StructType;
import tech.ydb.table.values.Value;
Expand All @@ -30,9 +32,11 @@ public abstract class YdbDataWriter implements DataWriter<InternalRow> {
static final int MAX_ROWS_COUNT = 10000;
static final int MAX_BYTES_SIZE = 10 * 1024 * 1024;
static final int CONCURRENCY = 2;
static final int WRITE_RETRY_COUNT = 10;

private static final Logger logger = LoggerFactory.getLogger(YdbDataWriter.class);

private final SessionRetryContext retryCtx;
private final YdbTypes types;
private final StructType structType;
private final ValueReader[] readers;
Expand All @@ -47,8 +51,9 @@ public abstract class YdbDataWriter implements DataWriter<InternalRow> {
private int currentBatchSize = 0;
private volatile Status lastError = null;

public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] readers,
public YdbDataWriter(SessionRetryContext retryCtx, YdbTypes types, StructType structType, ValueReader[] readers,
int batchRowsCount, int batchBytesSize, int batchConcurrency) {
this.retryCtx = retryCtx;
this.types = types;
this.structType = structType;
this.readers = readers;
Expand All @@ -58,7 +63,7 @@ public YdbDataWriter(YdbTypes types, StructType structType, ValueReader[] reader
this.semaphore = new Semaphore(maxConcurrency);
}

abstract CompletableFuture<Status> executeWrite(ListValue batch);
abstract CompletableFuture<Status> executeWrite(Session session, ListValue batch);

@Override
public void write(InternalRow record) throws IOException {
Expand Down Expand Up @@ -121,7 +126,8 @@ private void writeBatch() {
return;
}

CompletableFuture<Status> future = executeWrite(ListValue.of(copy));
ListValue batch = ListValue.of(copy);
CompletableFuture<Status> future = retryCtx.supplyStatus(session -> executeWrite(session, batch));
writesInFly.put(future, future);

future.whenComplete((st, th) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.spark.connector.YdbTable;
import tech.ydb.spark.connector.YdbTypes;
import tech.ydb.spark.connector.common.FieldInfo;
import tech.ydb.spark.connector.common.IngestMethod;
import tech.ydb.spark.connector.common.OperationOption;
import tech.ydb.table.Session;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.query.Params;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.ExecuteDataQuerySettings;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.ListValue;
import tech.ydb.table.values.PrimitiveType;
import tech.ydb.table.values.StructType;
Expand All @@ -43,6 +49,7 @@ public class YdbWriterFactory implements DataWriterFactory {
private final int batchRowsCount;
private final int batchBytesLimit;
private final int batchConcurrency;
private final int retryCount;

public YdbWriterFactory(YdbTable table, LogicalWriteInfo logical, PhysicalWriteInfo physical) {
this.table = table;
Expand All @@ -52,6 +59,7 @@ public YdbWriterFactory(YdbTable table, LogicalWriteInfo logical, PhysicalWriteI
this.batchBytesLimit = OperationOption.BATCH_LIMIT.readInt(logical.options(), YdbDataWriter.MAX_BYTES_SIZE);
this.batchConcurrency = OperationOption.BATCH_CONCURRENCY.readInt(logical.options(), YdbDataWriter.CONCURRENCY);
this.autoPkName = OperationOption.TABLE_AUTOPK_NAME.read(logical.options(), OperationOption.DEFAULT_AUTO_PK);
this.retryCount = OperationOption.WRITE_RETRY_COUNT.readInt(logical.options(), YdbDataWriter.WRITE_RETRY_COUNT);
this.schema = logical.schema();
}

Expand Down Expand Up @@ -97,20 +105,31 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
readers[idx] = columnReadeds.get(structType.getMemberName(idx));
}

boolean idempotent = method != IngestMethod.INSERT;
SessionRetryContext retryCtx = table.getCtx().getExecutor().createRetryCtx(retryCount, idempotent);

if (method == IngestMethod.BULK_UPSERT) {
return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) {
return new YdbDataWriter(retryCtx, types, structType, readers, batchRowsCount, batchBytesLimit,
batchConcurrency) {
private final BulkUpsertSettings settings = new BulkUpsertSettings();

@Override
CompletableFuture<Status> executeWrite(ListValue batch) {
return table.getCtx().getExecutor().executeBulkUpsert(table.getTablePath(), batch);
CompletableFuture<Status> executeWrite(Session session, ListValue batch) {
return session.executeBulkUpsert(table.getTablePath(), batch, settings);
}
};
}

String writeQuery = makeBatchSql(method.name(), table.getTablePath(), structType);
return new YdbDataWriter(types, structType, readers, batchRowsCount, batchBytesLimit, batchConcurrency) {
return new YdbDataWriter(retryCtx, types, structType, readers, batchRowsCount, batchBytesLimit,
batchConcurrency) {
private final String query = makeBatchSql(method.name(), table.getTablePath(), structType);
private final ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings();

@Override
CompletableFuture<Status> executeWrite(ListValue batch) {
return table.getCtx().getExecutor().executeDataQuery(writeQuery, Params.of("$input", batch));
CompletableFuture<Status> executeWrite(Session session, ListValue batch) {
Params prms = Params.of("$input", batch);
return session.executeDataQuery(query, TxControl.serializableRw(), prms, settings)
.thenApply(Result::getStatus);
}
};
}
Expand Down