build() {
+ return new HBaseSink<>(
+ elementConverter,
+ Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
+ Optional.ofNullable(getMaxInFlightRequests())
+ .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
+ Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
+ Optional.ofNullable(getMaxBatchSizeInBytes())
+ .orElse(DEFAULT_MAX_BATCH_SIZE_IN_BYTES),
+ Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
+ Optional.ofNullable(getMaxRecordSizeInBytes())
+ .orElse(DEFAULT_MAX_RECORD_SIZE_IN_BYTES),
+ Optional.ofNullable(requestTimeoutMS).orElse(DEFAULT_MAX_REQUEST_TIMEOUT_MS),
+ Optional.ofNullable(failOnTimeout).orElse(DEFAULT_FAIL_ON_TIMEOUT),
+ tableName,
+ configuration);
+ }
+}
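For orientation, here is a minimal sketch of how the sink produced by this builder could be wired into a DataStream job. The builder entry point and setter names (HBaseSink.builder(), setTableName, setConfiguration, setElementConverter) are assumptions for illustration and are not shown in this excerpt; only the fallback to the DEFAULT_* constants above is taken from the patch.

// Hypothetical usage sketch; builder entry point and setter names are assumed.
import org.apache.flink.connector.hbase.sink.HBaseSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

class HBaseSinkUsageSketch {
    static void attach(DataStream<String> stream) {
        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zookeeper-host:2181");

        HBaseSink<String> sink =
                HBaseSink.<String>builder()              // assumed entry point
                        .setTableName("my_table")        // assumed setter
                        .setConfiguration(hbaseConf)     // assumed setter
                        .setElementConverter(            // assumed setter; element -> Put on that row
                                (element, ctx) ->
                                        new Put(Bytes.toBytes(element))
                                                .addColumn(
                                                        Bytes.toBytes("cf"),
                                                        Bytes.toBytes("q"),
                                                        Bytes.toBytes(element)))
                        .build();                        // unset options fall back to the DEFAULT_* values above

        stream.sinkTo(sink);
    }
}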
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkException.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkException.java
new file mode 100644
index 00000000..3dbf91ee
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkException.java
@@ -0,0 +1,40 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Exception thrown by {@link HBaseWriter}, wrapping the underlying HBase {@link Throwable}. */
+@PublicEvolving
+public class HBaseSinkException extends RuntimeException {
+
+ public HBaseSinkException(String message) {
+ super(message);
+ }
+
+ public HBaseSinkException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+
+ /** Exception thrown during HBase sink initialization. */
+ public static class HBaseSinkInitException extends HBaseSinkException {
+ private static final String ERROR_MESSAGE =
+ "Exception while trying to initialize HBase sink.";
+
+ public HBaseSinkInitException(Throwable throwable) {
+ super(ERROR_MESSAGE, throwable);
+ }
+
+ public HBaseSinkInitException(String message) {
+ super(message);
+ }
+ }
+
+ /** Exception thrown when trying to persist HBase mutations. */
+ public static class HBaseSinkMutationException extends HBaseSinkException {
+ private static final String ERROR_MESSAGE =
+ "Exception while trying to persist records in HBase sink, not retrying.";
+
+ public HBaseSinkMutationException(Throwable throwable) {
+ super(ERROR_MESSAGE, throwable);
+ }
+ }
+}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
deleted file mode 100644
index fbe8dcd9..00000000
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.hbase.sink;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.StringUtils;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * The sink function for HBase.
- *
- * <p>This class leverage {@link BufferedMutator} to buffer multiple {@link
- * org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to cluster. The
- * buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes}, {@code
- * bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.
- */
-@Internal
-public class HBaseSinkFunction<T> extends RichSinkFunction<T>
- implements CheckpointedFunction, BufferedMutator.ExceptionListener {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);
-
- private final String hTableName;
- private final byte[] serializedConfig;
-
- private final long bufferFlushMaxSizeInBytes;
- private final long bufferFlushMaxMutations;
- private final long bufferFlushIntervalMillis;
- private final HBaseMutationConverter<T> mutationConverter;
-
- private transient Connection connection;
- private transient DeduplicatedMutator mutator;
-
- private transient ScheduledExecutorService executor;
- private transient ScheduledFuture<?> scheduledFuture;
- private transient AtomicLong numPendingRequests;
-
- private transient volatile boolean closed = false;
-
- /**
- * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable}
- * was thrown.
- *
- * <p>Errors will be checked and rethrown before processing each input element, and when the
- * sink is closed.
- */
- private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
-
- public HBaseSinkFunction(
- String hTableName,
- org.apache.hadoop.conf.Configuration conf,
- HBaseMutationConverter<T> mutationConverter,
- long bufferFlushMaxSizeInBytes,
- long bufferFlushMaxMutations,
- long bufferFlushIntervalMillis) {
- this.hTableName = hTableName;
- // Configuration is not serializable
- this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
- this.mutationConverter = mutationConverter;
- this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
- this.bufferFlushMaxMutations = bufferFlushMaxMutations;
- this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- LOG.info("start open ...");
- org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
- try {
- this.mutationConverter.open();
- this.numPendingRequests = new AtomicLong(0);
-
- if (null == connection) {
- this.connection = ConnectionFactory.createConnection(config);
- }
-
- TableName tableName = TableName.valueOf(hTableName);
- if (!connection.getAdmin().tableExists(tableName)) {
- throw new TableNotFoundException(tableName);
- }
-
- // create a parameter instance, set the table name and custom listener reference.
- BufferedMutatorParams params = new BufferedMutatorParams(tableName).listener(this);
- if (bufferFlushMaxSizeInBytes > 0) {
- params.writeBufferSize(bufferFlushMaxSizeInBytes);
- }
- this.mutator =
- new DeduplicatedMutator(
- (int) bufferFlushMaxMutations, connection.getBufferedMutator(params));
-
- if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
- this.executor =
- Executors.newScheduledThreadPool(
- 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
- this.scheduledFuture =
- this.executor.scheduleWithFixedDelay(
- () -> {
- if (closed) {
- return;
- }
- try {
- flush();
- } catch (Exception e) {
- // fail the sink and skip the rest of the items
- // if the failure handler decides to throw an exception
- failureThrowable.compareAndSet(null, e);
- }
- },
- bufferFlushIntervalMillis,
- bufferFlushIntervalMillis,
- TimeUnit.MILLISECONDS);
- }
- } catch (TableNotFoundException tnfe) {
- LOG.error("The table " + hTableName + " not found ", tnfe);
- throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
- } catch (IOException ioe) {
- LOG.error("Exception while creating connection to HBase.", ioe);
- throw new RuntimeException("Cannot create connection to HBase.", ioe);
- }
- LOG.info("end open.");
- }
-
- private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
- // create default configuration from current runtime env (`hbase-site.xml` in classpath)
- // first,
- // and overwrite configuration using serialized configuration from client-side env
- // (`hbase-site.xml` in classpath).
- // user params from client-side have the highest priority
- org.apache.hadoop.conf.Configuration runtimeConfig =
- HBaseConfigurationUtil.deserializeConfiguration(
- serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
-
- // do validation: check key option(s) in final runtime configuration
- if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
- LOG.error(
- "Can not connect to HBase without {} configuration",
- HConstants.ZOOKEEPER_QUORUM);
- throw new IOException(
- "Check HBase configuration failed, lost: '"
- + HConstants.ZOOKEEPER_QUORUM
- + "'!");
- }
-
- return runtimeConfig;
- }
-
- private void checkErrorAndRethrow() {
- Throwable cause = failureThrowable.get();
- if (cause != null) {
- throw new RuntimeException("An error occurred in HBaseSink.", cause);
- }
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void invoke(T value, Context context) throws Exception {
- checkErrorAndRethrow();
-
- mutator.mutate(mutationConverter.convertToMutation(value));
-
- // flush when the buffer number of mutations greater than the configured max size.
- if (bufferFlushMaxMutations > 0
- && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
- flush();
- } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) {
- flush();
- }
- }
-
- private void flush() throws IOException {
- // DeduplicatedMutator is thread-safe
- mutator.flush();
- numPendingRequests.set(0);
- checkErrorAndRethrow();
- }
-
- @Override
- public void close() throws Exception {
- closed = true;
-
- if (mutator != null) {
- try {
- mutator.close();
- } catch (IOException e) {
- LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
- }
- this.mutator = null;
- }
-
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException e) {
- LOG.warn("Exception occurs while closing HBase Connection.", e);
- }
- this.connection = null;
- }
-
- if (scheduledFuture != null) {
- scheduledFuture.cancel(false);
- if (executor != null) {
- executor.shutdownNow();
- }
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- while (numPendingRequests.get() != 0) {
- flush();
- }
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- // nothing to do.
- }
-
- @Override
- public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator)
- throws RetriesExhaustedWithDetailsException {
- // fail the sink and skip the rest of the items
- // if the failure handler decides to throw an exception
- failureThrowable.compareAndSet(null, exception);
- }
-
- /**
- * Thread-safe class, grouped mutations by rows and keep the latest mutation. For more info, see
- * HBASE-8626.
- */
- private static class DeduplicatedMutator {
-
- private final BufferedMutator mutator;
- private final Map<ByteBuffer, Mutation> mutations;
-
- DeduplicatedMutator(int size, BufferedMutator mutator) {
- this.mutator = mutator;
- this.mutations = new HashMap<>(size);
- }
-
- synchronized void mutate(Mutation current) {
- ByteBuffer key = ByteBuffer.wrap(current.getRow());
- Mutation old = mutations.get(key);
- if (old == null || current.getTimeStamp() >= old.getTimeStamp()) {
- mutations.put(key, current);
- }
- }
-
- synchronized void flush() throws IOException {
- mutator.mutate(new ArrayList<>(mutations.values()));
- mutator.flush();
- mutations.clear();
- }
-
- synchronized void close() throws IOException {
- mutator.mutate(new ArrayList<>(mutations.values()));
- mutator.close();
- mutations.clear();
- }
- }
-}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java
new file mode 100644
index 00000000..dce79825
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java
@@ -0,0 +1,30 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+import org.apache.flink.connector.hbase.util.HBaseMutationSerialization;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Serializer class for state in HBase sink. */
+@Internal
+public class HBaseStateSerializer extends AsyncSinkWriterStateSerializer<SerializableMutation> {
+ @Override
+ protected void serializeRequestToStream(SerializableMutation request, DataOutputStream out)
+ throws IOException {
+ HBaseMutationSerialization.serialize(request.get(), out);
+ }
+
+ @Override
+ protected SerializableMutation deserializeRequestFromStream(
+ long requestSize, DataInputStream in) throws IOException {
+ return new SerializableMutation(HBaseMutationSerialization.deserialize(in));
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java
new file mode 100644
index 00000000..06d90c44
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java
@@ -0,0 +1,249 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
+import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * Sink writer created for {@link HBaseSink} to write {@link SerializableMutation} elements to
+ * HBase. More details can be found in the JavaDocs for {@link HBaseSink}.
+ *
+ * <p>More details on the internals of this sink writer can be found in the JavaDocs of {@link
+ * AsyncSinkWriter}.
+ */
+@Internal
+public class HBaseWriter<InputT> extends AsyncSinkWriter<InputT, SerializableMutation> {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseWriter.class);
+
+ private final AsyncConnection connection;
+ private final AsyncTable<AdvancedScanResultConsumer> table;
+ private final SinkWriterMetricGroup metrics;
+ private final Counter numRecordsOutErrorsCounter;
+ private final HBaseWriterAsyncHandler hBaseWriterAsyncHandler;
+
+ /** This constructor can be removed once the connector is rebased onto Flink 2.0. */
+ public HBaseWriter(
+ ElementConverter<InputT, SerializableMutation> elementConverter,
+ Sink.InitContext context,
+ Collection<BufferedRequestState<SerializableMutation>> states,
+ int maxBatchSize,
+ long maxBatchSizeInBytes,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes,
+ long requestTimeoutMS,
+ boolean failOnTimeout,
+ String tableName,
+ Configuration configuration) {
+ super(
+ elementConverter,
+ context,
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(maxBatchSize)
+ .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+ .setMaxInFlightRequests(maxInFlightRequests)
+ .setMaxBufferedRequests(maxBufferedRequests)
+ .setMaxTimeInBufferMS(maxTimeInBufferMS)
+ .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+ .setRequestTimeoutMS(requestTimeoutMS)
+ .setFailOnTimeout(failOnTimeout)
+ .setRateLimitingStrategy(
+ buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize))
+ .build(),
+ states);
+
+ this.connection = createClient(configuration);
+ this.table = connection.getTable(TableName.valueOf(tableName));
+ this.metrics = context.metricGroup();
+ this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+ this.hBaseWriterAsyncHandler = new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter);
+ }
+
+ public HBaseWriter(
+ ElementConverter<InputT, SerializableMutation> elementConverter,
+ WriterInitContext context,
+ Collection<BufferedRequestState<SerializableMutation>> states,
+ int maxBatchSize,
+ long maxBatchSizeInBytes,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long maxTimeInBufferMS,
+ long maxRecordSizeInBytes,
+ long requestTimeoutMS,
+ boolean failOnTimeout,
+ String tableName,
+ Configuration configuration) {
+ super(
+ elementConverter,
+ context,
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(maxBatchSize)
+ .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
+ .setMaxInFlightRequests(maxInFlightRequests)
+ .setMaxBufferedRequests(maxBufferedRequests)
+ .setMaxTimeInBufferMS(maxTimeInBufferMS)
+ .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+ .setRequestTimeoutMS(requestTimeoutMS)
+ .setFailOnTimeout(failOnTimeout)
+ .setRateLimitingStrategy(
+ buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize))
+ .build(),
+ states);
+
+ this.connection = createClient(configuration);
+ this.table = connection.getTable(TableName.valueOf(tableName));
+ this.metrics = context.metricGroup();
+ this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+ this.hBaseWriterAsyncHandler = new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter);
+ }
+
+ /**
+ * Submits a batch of mutation requests to HBase asynchronously.
+ *
+ * <p>This method performs the following operations:
+ *
+ * <ul>
+ *   <li>Deduplicates the request entries to ensure only the latest mutation per row is sent
+ *   <li>Extracts {@link Mutation} from {@link SerializableMutation} entries
+ *   <li>Submits the batch to HBase asynchronously
+ *   <li>Handles failures by collecting failed mutations for retry
+ * </ul>
+ */
+ @Override
+ protected void submitRequestEntries(
+ List<SerializableMutation> requestEntries,
+ ResultHandler<SerializableMutation> resultHandler) {
+ // Requests have to be deduplicated to ensure correct behavior.
+ List<SerializableMutation> requestEntriesDeduplicated =
+ deduplicateRequestEntries(requestEntries);
+
+ // Convert SerializableMutations to Mutations
+ List<Mutation> mutations =
+ requestEntriesDeduplicated.stream()
+ .map(SerializableMutation::get)
+ .collect(Collectors.toList());
+
+ // Handle failed requests to retry them later. It's possible that some mutations failed
+ // while others did not.
+ List<CompletableFuture<Void>> futures = table.batch(mutations);
+ hBaseWriterAsyncHandler.handleWriteFutures(
+ futures, requestEntriesDeduplicated, resultHandler);
+ }
+
+ @Override
+ protected long getSizeInBytes(SerializableMutation requestEntry) {
+ return requestEntry.get().heapSize();
+ }
+
+ @Override
+ public void close() {
+ if (!connection.isClosed()) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Groups mutations by row and keeps only the latest mutation per row. See HBASE-8626 for details.
+ *
+ * @param requestEntries entries to save
+ * @return deduplicated entries with the latest mutation only for each affected row
+ */
+ private static List<SerializableMutation> deduplicateRequestEntries(
+ List<SerializableMutation> requestEntries) {
+ Map<ByteBuffer, SerializableMutation> requestEntriesMap = new HashMap<>();
+ for (SerializableMutation requestEntry : requestEntries) {
+ ByteBuffer key = ByteBuffer.wrap(requestEntry.get().getRow());
+ SerializableMutation old = requestEntriesMap.get(key);
+ if (old == null || requestEntry.get().getTimestamp() >= old.get().getTimestamp()) {
+ requestEntriesMap.put(key, requestEntry);
+ }
+ }
+
+ return new ArrayList<>(requestEntriesMap.values());
+ }
+
+ /** Builds a congestion control rate limiting strategy using AIMD algorithm. */
+ private static RateLimitingStrategy buildRateLimitingStrategy(
+ int maxInFlightRequests, int maxBatchSize) {
+ return CongestionControlRateLimitingStrategy.builder()
+ .setMaxInFlightRequests(maxInFlightRequests)
+ .setInitialMaxInFlightMessages(maxBatchSize)
+ .setScalingStrategy(
+ AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests).build())
+ .build();
+ }
+
+ /**
+ * Creates an asynchronous HBase client connection using the provided configuration.
+ *
+ * <p>This method merges the runtime HBase configuration with the provided configuration and
+ * validates that the ZooKeeper quorum is properly configured before establishing the
+ * connection.
+ *
+ * @param configuration the HBase configuration to use for connection
+ * @return an asynchronous connection to the HBase cluster
+ * @throws HBaseSinkException.HBaseSinkInitException if ZooKeeper quorum is not configured or
+ * connection fails
+ */
+ private static AsyncConnection createClient(Configuration configuration)
+ throws HBaseSinkException.HBaseSinkInitException {
+ Configuration runtimeConfig = HBaseConfigurationUtil.getHBaseConfiguration();
+ runtimeConfig.addResource(configuration);
+
+ if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+ LOG.error(
+ "Can not connect to HBase without '{}' configuration",
+ HConstants.ZOOKEEPER_QUORUM);
+ throw new HBaseSinkException.HBaseSinkInitException(
+ "Can not connect to HBase without '"
+ + HConstants.ZOOKEEPER_QUORUM
+ + "' configuration");
+ }
+
+ try {
+ return ConnectionFactory.createAsyncConnection(runtimeConfig).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new HBaseSinkException.HBaseSinkInitException(e);
+ }
+ }
+}
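To make the per-row deduplication concrete, here is a small standalone sketch that restates the keep-latest-per-row rule applied by deduplicateRequestEntries above (it mirrors the logic rather than calling the private method):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

class DeduplicationSketch {
    static List<Mutation> keepLatestPerRow(List<Mutation> mutations) {
        Map<ByteBuffer, Mutation> byRow = new HashMap<>();
        for (Mutation mutation : mutations) {
            ByteBuffer row = ByteBuffer.wrap(mutation.getRow());
            Mutation previous = byRow.get(row);
            // A later (or equal) timestamp replaces the previous mutation for the same row,
            // so one batch never carries two competing writes for a single row (HBASE-8626).
            if (previous == null || mutation.getTimestamp() >= previous.getTimestamp()) {
                byRow.put(row, mutation);
            }
        }
        return new ArrayList<>(byRow.values());
    }

    public static void main(String[] args) {
        Put older = new Put(Bytes.toBytes("row-1"), 1L);
        Put newer = new Put(Bytes.toBytes("row-1"), 2L);
        // Only `newer` survives: same row key, higher timestamp.
        System.out.println(keepLatestPerRow(Arrays.asList(older, newer)).size()); // prints 1
    }
}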
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java
new file mode 100644
index 00000000..01d32afb
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and for driving the {@link
+ * ResultHandler} to decide which requests can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+ private final Counter numRecordsOutErrorsCounter;
+
+ public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+ this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
+ }
+
+ /**
+ * For a given list of HBase write futures, this method will asynchronously analyze their
+ * results, and using the provided {@link ResultHandler}, it will instruct {@link
+ * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry some mutations. In case
+ * of errors which should not be retried, the Flink job will stop with an error.
+ *
+ * @param futures list of HBase write futures
+ * @param processedMutationsInOrder list of mutations with their indices matching that of the
+ * futures
+ * @param resultHandler result handler to manage retries and exceptions
+ */
+ public void handleWriteFutures(
+ List<CompletableFuture<Void>> futures,
+ List<SerializableMutation> processedMutationsInOrder,
+ ResultHandler<SerializableMutation> resultHandler) {
+ Preconditions.checkArgument(
+ futures.size() == processedMutationsInOrder.size(),
+ "Different number of HBase futures was supplied than mutations.");
+
+ ConcurrentLinkedQueue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();
+
+ // Handle each future separately and store failures.
+ CompletableFuture<?>[] handledFutures = new CompletableFuture[futures.size()];
+ for (int i = 0; i < futures.size(); i++) {
+ final int index = i;
+ handledFutures[index] =
+ futures.get(index)
+ .exceptionally(
+ throwable -> {
+ failedMutations.add(
+ new FailedMutation(
+ processedMutationsInOrder.get(index),
+ throwable));
+ return null;
+ });
+ }
+
+ // Exceptions are already handled here, so it's safe to use `thenRun()`.
+ CompletableFuture.allOf(handledFutures)
+ .thenRun(
+ () -> {
+ handleFailedRequests(failedMutations, resultHandler);
+ });
+ }
+
+ /**
+ * Handles mutations that failed to write to HBase.
+ *
+ * <p>This method increments the error counter and schedules the failed mutations for retry
+ * through the result handler. If the exception should not be retried, the job will fail instead
+ * with an exception.
+ *
+ * @param failedMutations the list of mutations that failed to write
+ * @param resultHandler the handler responsible for retry logic
+ */
+ private void handleFailedRequests(
+ Collection<FailedMutation> failedMutations,
+ ResultHandler<SerializableMutation> resultHandler) {
+ if (failedMutations.isEmpty()) {
+ resultHandler.complete();
+ return;
+ }
+
+ numRecordsOutErrorsCounter.inc(failedMutations.size());
+
+ for (FailedMutation failedMutation : failedMutations) {
+ LOG.warn("Mutation failed with exception", failedMutation.getThrowable());
+
+ if (isHBaseExceptionFatal(failedMutation.getThrowable())) {
+ resultHandler.completeExceptionally(
+ new HBaseSinkException.HBaseSinkMutationException(
+ failedMutation.getThrowable()));
+ return;
+ }
+ }
+
+ resultHandler.retryForEntries(
+ failedMutations.stream()
+ .map(FailedMutation::getWrappedMutation)
+ .collect(Collectors.toList()));
+ }
+
+ /**
+ * Checks whether an HBase exception is fatal or can be retried. Keeps a set of visited
+ * exceptions to prevent infinite recursion on cyclic cause chains.
+ */
+ private boolean isHBaseExceptionFatal(Throwable throwable, Set<Throwable> visited) {
+ if (throwable == null || !visited.add(throwable)) {
+ // Null or already visited
+ return false;
+ }
+
+ if (throwable instanceof DoNotRetryIOException) {
+ return true;
+ }
+
+ return isHBaseExceptionFatal(throwable.getCause(), visited);
+ }
+
+ private boolean isHBaseExceptionFatal(Throwable throwable) {
+ return isHBaseExceptionFatal(throwable, new HashSet<>());
+ }
+
+ /** Container class for a failed mutation also including the exception thrown. */
+ private static final class FailedMutation {
+ private final SerializableMutation mutation;
+ private final Throwable throwable;
+
+ private FailedMutation(SerializableMutation mutation, Throwable throwable) {
+ this.mutation = mutation;
+ this.throwable = throwable;
+ }
+
+ public SerializableMutation getWrappedMutation() {
+ return mutation;
+ }
+
+ public Throwable getThrowable() {
+ return throwable;
+ }
+ }
+}
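The retry decision above hinges on finding a DoNotRetryIOException anywhere in the cause chain. A compact restatement of that rule, outside the handler and purely for illustration (TableNotFoundException is used only because it extends DoNotRetryIOException):

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableNotFoundException;

class FatalClassificationSketch {
    // Walk the cause chain, guarding against cycles, and treat any DoNotRetryIOException as fatal.
    static boolean isFatal(Throwable throwable) {
        Set<Throwable> visited = new HashSet<>();
        while (throwable != null && visited.add(throwable)) {
            if (throwable instanceof DoNotRetryIOException) {
                return true;
            }
            throwable = throwable.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        // TableNotFoundException extends DoNotRetryIOException: fatal, the job fails.
        System.out.println(isFatal(new RuntimeException(new TableNotFoundException("missing")))); // true
        // A transient failure stays retryable and is handed back via retryForEntries().
        System.out.println(isFatal(new RuntimeException("transient"))); // false
    }
}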
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationElementConverter.java
similarity index 82%
rename from flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
rename to flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationElementConverter.java
index 5796d5c3..e80c54bd 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationElementConverter.java
@@ -18,6 +18,9 @@
package org.apache.flink.connector.hbase.sink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.hbase.sink.WritableMetadata.TimeToLiveMetadata;
import org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata;
import org.apache.flink.connector.hbase.util.HBaseSerde;
@@ -31,10 +34,10 @@
import java.util.List;
/**
- * An implementation of {@link HBaseMutationConverter} which converts {@link RowData} into {@link
+ * An implementation of {@link ElementConverter} which converts {@link RowData} into {@link
* Mutation}.
*/
-public class RowDataToMutationConverter implements HBaseMutationConverter<RowData> {
+public class RowDataToMutationElementConverter implements ElementConverter<RowData, Mutation> {
private static final long serialVersionUID = 1L;
private final HBaseTableSchema schema;
@@ -44,7 +47,7 @@ public class RowDataToMutationConverter implements HBaseMutationConverter metadataKeys,
@@ -58,12 +61,13 @@ public RowDataToMutationConverter(
}
@Override
- public void open() {
+ public void open(Sink.InitContext context) {
+ ElementConverter.super.open(context);
this.serde = new HBaseSerde(schema, nullStringLiteral, ignoreNullValue);
}
@Override
- public Mutation convertToMutation(RowData record) {
+ public Mutation apply(RowData record, SinkWriter.Context context) {
Long timestamp = timestampMetadata.read(record);
Long timeToLive = timeToLiveMetadata.read(record);
RowKind kind = record.getRowKind();
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java
new file mode 100644
index 00000000..9e8b6a41
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java
@@ -0,0 +1,27 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+import java.io.Serializable;
+
+/**
+ * This class is used by {@link HBaseSink} and {@link HBaseWriter} to wrap HBase {@link Mutation}
+ * objects to be able to serialize them.
+ */
+@Internal
+public class SerializableMutation implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private transient Mutation mutation;
+
+ public SerializableMutation(Mutation mutation) {
+ this.mutation = mutation;
+ }
+
+ /** Get the wrapped mutation object. */
+ public Mutation get() {
+ return mutation;
+ }
+}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java
new file mode 100644
index 00000000..49975721
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java
@@ -0,0 +1,36 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * This is a helper class used to wrap an {@link ElementConverter} supplied by the user that
+ * converts the input data to {@link Mutation}. With this class, the elements will be seamlessly
+ * converted to internal {@link SerializableMutation} objects that can be serialized by the sink.
+ */
+@Internal
+public class WrappedElementConverter<InputT>
+ implements ElementConverter<InputT, SerializableMutation> {
+ private static final long serialVersionUID = 1L;
+
+ private final ElementConverter<InputT, Mutation> originalElementConverter;
+
+ public WrappedElementConverter(ElementConverter<InputT, Mutation> originalElementConverter) {
+ this.originalElementConverter = originalElementConverter;
+ }
+
+ @Override
+ public void open(Sink.InitContext context) {
+ ElementConverter.super.open(context);
+ originalElementConverter.open(context);
+ }
+
+ @Override
+ public SerializableMutation apply(InputT element, SinkWriter.Context context) {
+ return new SerializableMutation(originalElementConverter.apply(element, context));
+ }
+}
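A short sketch of how this wrapper is meant to compose with a user-supplied converter. The String-to-Put lambda is a simplified stand-in, a real SinkWriter.Context would be provided by the running sink, and the sketch assumes it lives in the same package as the classes above.

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

class WrappedConverterSketch {
    static SerializableMutation demo(SinkWriter.Context context) {
        // User-facing converter: input element -> HBase Mutation.
        ElementConverter<String, Mutation> userConverter =
                (element, ctx) -> new Put(Bytes.toBytes(element));

        // Sink-internal view: the same element now yields a SerializableMutation,
        // which HBaseStateSerializer above knows how to checkpoint.
        ElementConverter<String, SerializableMutation> internal =
                new WrappedElementConverter<>(userConverter);

        SerializableMutation wrapped = internal.apply("row-key", context);
        return wrapped; // wrapped.get() is the original Put
    }
}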
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
index d760c034..9425e261 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
@@ -27,6 +27,10 @@
import java.time.Duration;
+import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE;
+import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT;
+import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BATCH_SIZE;
+
/** Options for the HBase connector. */
@PublicEvolving
public class HBaseConnectorOptions {
@@ -60,33 +64,63 @@ public class HBaseConnectorOptions {
"Representation for null values for string fields. HBase source and "
+ "sink encodes/decodes empty bytes as null values for all types except string type.");
+ @Deprecated
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_SIZE =
ConfigOptions.key("sink.buffer-flush.max-size")
.memoryType()
- .defaultValue(MemorySize.parse("2mb"))
+ .noDefaultValue()
.withDescription(
"Writing option, maximum size in memory of buffered rows for each "
+ "writing request. This can improve performance for writing data to HBase database, "
- + "but may increase the latency. Can be set to '0' to disable it. ");
+ + "but may increase the latency. "
+ + String.format(
+ "This is a deprecated key and will be mapped to %s.",
+ FLUSH_BUFFER_SIZE.key()));
+ @Deprecated
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
- .defaultValue(1000)
+ .noDefaultValue()
.withDescription(
"Writing option, maximum number of rows to buffer for each writing request. "
+ "This can improve performance for writing data to HBase database, but may increase the latency. "
- + "Can be set to '0' to disable it.");
+ + String.format(
+ "This is a deprecated key and will be mapped to %s.",
+ MAX_BATCH_SIZE.key()));
+ @Deprecated
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
- .defaultValue(Duration.ofSeconds(1))
+ .noDefaultValue()
.withDescription(
"Writing option, the interval to flush any buffered rows. "
+ "This can improve performance for writing data to HBase database, but may increase the latency. "
- + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
- + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
+ + String.format(
+ "This is a deprecated key and will be mapped to %s.",
+ FLUSH_BUFFER_TIMEOUT.key()));
+
+ public static final ConfigOption<Long> SINK_MAX_RECORD_SIZE =
+ ConfigOptions.key("sink.max-record-size")
+ .longType()
+ .defaultValue(1048576L)
+ .withDescription(
+ "The maximum size in bytes of a single record. Records bigger than this will cause the job to fail.");
+
+ public static final ConfigOption<Duration> SINK_REQUEST_TIMEOUT =
+ ConfigOptions.key("sink.request-timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(10))
+ .withDescription(
+ "The maximum time to wait for a batch of HBase requests to complete before timing out.");
+
+ public static final ConfigOption<Boolean> SINK_FAIL_ON_TIMEOUT =
+ ConfigOptions.key("sink.fail-on-timeout")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to fail the job when a request times out. If false, timed-out requests will be logged but the job will continue processing. If true, a timeout will cause the job to fail.");
public static final ConfigOption<Boolean> SINK_IGNORE_NULL_VALUE =
ConfigOptions.key("sink.ignore-null-value")
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 482644fd..39743e61 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -20,7 +20,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.types.DataType;
@@ -31,11 +30,6 @@
import java.util.Map;
import java.util.Properties;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
@@ -83,18 +77,6 @@ public static void validatePrimaryKey(DataType dataType, int[] primaryKeyIndexes
}
}
- public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions) {
- HBaseWriteOptions.Builder builder = HBaseWriteOptions.builder();
- builder.setBufferFlushIntervalMillis(
- tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
- builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS));
- builder.setBufferFlushMaxSizeInBytes(
- tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
- builder.setIgnoreNullValue(tableOptions.get(SINK_IGNORE_NULL_VALUE));
- builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
- return builder.build();
- }
-
/** config HBase Configuration. */
public static Configuration getHBaseConfiguration(ReadableConfig tableOptions) {
// create default configuration from current runtime env (`hbase-site.xml` in classpath)
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
index b6679bdc..15710a19 100644
--- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
@@ -40,8 +40,6 @@ public class HBaseConfigurationUtil {
private static final Logger LOG = LoggerFactory.getLogger(HBaseConfigurationUtil.class);
- public static final String ENV_HBASE_CONF_DIR = "HBASE_CONF_DIR";
-
public static Configuration getHBaseConfiguration() {
// Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from
@@ -177,39 +175,4 @@ private static <T extends Writable> void deserializeWritable(T writable, byte[]
DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
writable.readFields(dataInputStream);
}
-
- public static org.apache.hadoop.conf.Configuration createHBaseConf() {
- org.apache.hadoop.conf.Configuration hbaseClientConf = HBaseConfiguration.create();
-
- String hbaseConfDir = System.getenv(ENV_HBASE_CONF_DIR);
-
- if (hbaseConfDir != null) {
- if (new File(hbaseConfDir).exists()) {
- String coreSite = hbaseConfDir + "/core-site.xml";
- String hdfsSite = hbaseConfDir + "/hdfs-site.xml";
- String hbaseSite = hbaseConfDir + "/hbase-site.xml";
- if (new File(coreSite).exists()) {
- hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(coreSite));
- LOG.info("Adding " + coreSite + " to hbase configuration");
- }
- if (new File(hdfsSite).exists()) {
- hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hdfsSite));
- LOG.info("Adding " + hdfsSite + " to hbase configuration");
- }
- if (new File(hbaseSite).exists()) {
- hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hbaseSite));
- LOG.info("Adding " + hbaseSite + " to hbase configuration");
- }
- } else {
- LOG.warn(
- "HBase config directory '{}' not found, cannot load HBase configuration.",
- hbaseConfDir);
- }
- } else {
- LOG.warn(
- "{} env variable not found, cannot load HBase configuration.",
- ENV_HBASE_CONF_DIR);
- }
- return hbaseClientConf;
- }
}
diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java
new file mode 100644
index 00000000..d297a9cb
--- /dev/null
+++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java
@@ -0,0 +1,45 @@
+package org.apache.flink.connector.hbase.util;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Internal utility class for serializing and deserializing HBase mutations.
+ *
+ * <p>This class provides methods to convert HBase {@link Mutation} objects to and from their
+ * Protocol Buffer representations for transmission over the wire or storage. It supports the
+ * following HBase mutation types: {@link Put} and {@link Delete}.
+ */
+@Internal
+public class HBaseMutationSerialization {
+ public static void serialize(Mutation mutation, OutputStream out) throws IOException {
+ ClientProtos.MutationProto.MutationType type;
+ if (mutation instanceof Put) {
+ type = ClientProtos.MutationProto.MutationType.PUT;
+ } else if (mutation instanceof Delete) {
+ type = ClientProtos.MutationProto.MutationType.DELETE;
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown HBase mutation type, cannot serialize: %s",
+ mutation.getClass()));
+ }
+
+ ClientProtos.MutationProto proto = ProtobufUtil.toMutation(type, mutation);
+ proto.writeDelimitedTo(out);
+ }
+
+ public static Mutation deserialize(InputStream in) throws IOException {
+ ClientProtos.MutationProto proto = ClientProtos.MutationProto.parseDelimitedFrom(in);
+ return ProtobufUtil.toMutation(proto);
+ }
+}
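A minimal round-trip sketch of the delimited protobuf encoding implemented above, using only the two methods defined in this class:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.flink.connector.hbase.util.HBaseMutationSerialization;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

class MutationSerializationSketch {
    public static void main(String[] args) throws IOException {
        Put put = new Put(Bytes.toBytes("row-1"));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));

        // Serialize: the proto is written length-delimited, so several mutations can share one stream.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        HBaseMutationSerialization.serialize(put, out);

        // Deserialize: yields a Mutation (a Put here) with the same row and cells.
        Mutation restored =
                HBaseMutationSerialization.deserialize(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(Bytes.toString(restored.getRow())); // row-1
    }
}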
diff --git a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandlerTest.java b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandlerTest.java
new file mode 100644
index 00000000..1f750f69
--- /dev/null
+++ b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandlerTest.java
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link HBaseWriterAsyncHandler}. */
+class HBaseWriterAsyncHandlerTest {
+
+ private final Counter counter = new TestCounter();
+ private final TestResultHandler resultHandler = new TestResultHandler();
+ private final HBaseWriterAsyncHandler handler = new HBaseWriterAsyncHandler(counter);
+
+ @Test
+ void testHBaseWriteAsyncHandlerEmpty() {
+ handler.handleWriteFutures(Collections.emptyList(), Collections.emptyList(), resultHandler);
+
+ assertThat(counter.getCount()).isEqualTo(0);
+ assertThat(resultHandler.getComplete()).isTrue();
+ }
+
+ @Test
+ void testHBaseWriteAsyncHandlerSuccessful() {
+ List<SerializableMutation> mutations =
+ IntStream.range(0, 500)
+ .mapToObj(__ -> generateMutation())
+ .collect(Collectors.toList());
+ List<CompletableFuture<Void>> futures =
+ mutations.stream()
+ .map(m -> new CompletableFuture<Void>())
+ .collect(Collectors.toList());
+
+ handler.handleWriteFutures(futures, mutations, resultHandler);
+ futures.forEach(f -> f.complete(null));
+
+ assertThat(counter.getCount()).isEqualTo(0);
+ assertThat(resultHandler.getComplete()).isTrue();
+ }
+
+ /** Half the mutations will throw an exception that can be retried. */
+ @Test
+ void testHBaseWriteAsyncHandlerException() {
+ List<SerializableMutation> allMutations =
+ IntStream.range(0, 500)
+ .mapToObj(__ -> generateMutation())
+ .collect(Collectors.toList());
+ List<CompletableFuture<Void>> futures =
+ IntStream.range(0, 500)
+ .mapToObj(__ -> new CompletableFuture<Void>())
+ .collect(Collectors.toList());
+
+ handler.handleWriteFutures(futures, allMutations, resultHandler);
+
+ List<SerializableMutation> failedMutations = new ArrayList<>();
+ for (int i = 0; i < 500; i++) {
+ if (i % 2 == 0) {
+ futures.get(i).complete(null);
+ } else {
+ futures.get(i)
+ .completeExceptionally(
+ new HBaseSinkException.HBaseSinkMutationException(
+ new RuntimeException("test")));
+ failedMutations.add(allMutations.get(i));
+ }
+ }
+
+ assertThat(resultHandler.getEntriesRetried()).hasSameElementsAs(failedMutations);
+ assertThat(counter.getCount()).isEqualTo(250);
+ }
+
+ /** Exactly one mutation fails with an exception that cannot be retried; the job should fail. */
+ @Test
+ void testHBaseWriteAsyncHandlerUnrecoverableException() {
+ List<SerializableMutation> mutations =
+ IntStream.range(0, 500)
+ .mapToObj(__ -> generateMutation())
+ .collect(Collectors.toList());
+ List<CompletableFuture<Void>> futures =
+ IntStream.range(0, 500)
+ .mapToObj(__ -> new CompletableFuture<Void>())
+ .collect(Collectors.toList());
+
+ handler.handleWriteFutures(futures, mutations, resultHandler);
+ for (int i = 0; i < futures.size(); i++) {
+ if (i == 250) {
+ futures.get(i)
+ .completeExceptionally(
+ new HBaseSinkException.HBaseSinkMutationException(
+ new DoNotRetryIOException("test")));
+ } else {
+ futures.get(i).complete(null);
+ }
+ }
+
+ assertThat(counter.getCount()).isEqualTo(1);
+ assertThat(resultHandler.getException())
+ .isExactlyInstanceOf(HBaseSinkException.HBaseSinkMutationException.class)
+ .hasRootCauseExactlyInstanceOf(DoNotRetryIOException.class)
+ .hasMessage(
+ "Exception while trying to persist records in HBase sink, not retrying.");
+ }
+
+ private SerializableMutation generateMutation() {
+ return new SerializableMutation(new Put(Bytes.toBytes(UUID.randomUUID().toString())));
+ }
+
+ /** Test class to verify usage of {@link ResultHandler}. */
+ static final class TestResultHandler implements ResultHandler<SerializableMutation> {
+ private Boolean complete = false;
+ private Exception exception = null;
+ private List<SerializableMutation> entriesRetried = null;
+
+ @Override
+ public void complete() {
+ complete = true;
+ exception = null;
+ entriesRetried = null;
+ }
+
+ @Override
+ public void completeExceptionally(Exception e) {
+ exception = e;
+ complete = false;
+ entriesRetried = null;
+ }
+
+ @Override
+ public void retryForEntries(List<SerializableMutation> requestEntriesToRetry) {
+ entriesRetried = requestEntriesToRetry;
+ complete = false;
+ exception = null;
+ }
+
+ public Boolean getComplete() {
+ return complete;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public List<SerializableMutation> getEntriesRetried() {
+ return entriesRetried;
+ }
+ }
+
+ /** Test class to verify metrics. */
+ static final class TestCounter implements Counter {
+ private long countValue;
+
+ public TestCounter() {
+ this.countValue = 0;
+ }
+
+ @Override
+ public void inc() {
+ countValue++;
+ }
+
+ @Override
+ public void inc(long n) {
+ countValue += n;
+ }
+
+ @Override
+ public void dec() {
+ countValue--;
+ }
+
+ @Override
+ public void dec(long n) {
+ countValue -= n;
+ }
+
+ @Override
+ public long getCount() {
+ return countValue;
+ }
+ }
+}
diff --git a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql
index 2c1c9d05..0385509b 100644
--- a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql
+++ b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql
@@ -32,8 +32,8 @@ CREATE TABLE MyHBaseSink (
'connector' = '$HBASE_CONNECTOR',
'table-name' = 'sink',
'zookeeper.quorum' = 'hbase:2181',
- 'sink.buffer-flush.max-rows' = '1',
- 'sink.buffer-flush.interval' = '2s'
+ 'sink.batch.max-size' = '1',
+ 'sink.flush-buffer.timeout' = '2000'
);
INSERT INTO MyHBaseSink
diff --git a/flink-sql-connector-hbase-2.6/pom.xml b/flink-sql-connector-hbase-2.6/pom.xml
index b3e96ac5..350c3dd3 100644
--- a/flink-sql-connector-hbase-2.6/pom.xml
+++ b/flink-sql-connector-hbase-2.6/pom.xml
@@ -83,6 +83,7 @@ under the License.
org.apache.commons:commons-crypto
org.apache.commons:commons-lang3
io.dropwizard.metrics:metrics-core
+ commons-io:commons-io
org.apache.hbase:hbase-metrics*
diff --git a/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE b/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE
index 3c079c5f..8b437be6 100644
--- a/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,7 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+ - commons-io:commons-io:2.11.0
- commons-codec:commons-codec:1.15
- io.dropwizard.metrics:metrics-core:3.2.6
- org.apache.commons:commons-crypto:1.1.0
diff --git a/pom.xml b/pom.xml
index 5bac97c0..f393ecf7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,6 +159,12 @@ under the License.
${flink.version}