From 9848246bfb171a85ea6c33cfbc8de10878badd63 Mon Sep 17 00:00:00 2001 From: mateczagany Date: Thu, 7 Aug 2025 10:58:58 +0200 Subject: [PATCH 1/2] [FLINK-35280] Migrate HBase Sink to use Async Sink API --- docs/content/docs/connectors/table/hbase.md | 65 +++- flink-connector-hbase-2.6/pom.xml | 6 + .../hbase2/HBase2DynamicTableFactory.java | 134 +++++--- .../hbase2/sink/HBaseDynamicTableSink.java | 191 +++++++++-- .../hbase2/HBaseConnectorFailureITCase.java | 180 ++++++++++ .../hbase2/HBaseConnectorITCase.java | 175 ++++++++-- .../hbase2/HBaseDynamicTableFactoryTest.java | 176 ++++++---- .../util/HBaseTestingClusterAutoStarter.java | 2 +- flink-connector-hbase-base/pom.xml | 6 + .../hbase/options/HBaseWriteOptions.java | 183 ----------- .../hbase/sink/HBaseMutationConverter.java | 45 --- .../flink/connector/hbase/sink/HBaseSink.java | 168 ++++++++++ .../hbase/sink/HBaseSinkBuilder.java | 124 +++++++ .../hbase/sink/HBaseSinkException.java | 40 +++ .../hbase/sink/HBaseSinkFunction.java | 308 ------------------ .../hbase/sink/HBaseStateSerializer.java | 30 ++ .../connector/hbase/sink/HBaseWriter.java | 249 ++++++++++++++ .../hbase/sink/HBaseWriterAsyncHandler.java | 156 +++++++++ ...=> RowDataToMutationElementConverter.java} | 14 +- .../hbase/sink/SerializableMutation.java | 27 ++ .../hbase/sink/WrappedElementConverter.java | 36 ++ .../hbase/table/HBaseConnectorOptions.java | 48 ++- .../table/HBaseConnectorOptionsUtil.java | 18 - .../hbase/util/HBaseConfigurationUtil.java | 37 --- .../util/HBaseMutationSerialization.java | 45 +++ .../sink/HBaseWriterAsyncHandlerTest.java | 195 +++++++++++ .../src/test/resources/hbase_e2e.sql | 4 +- flink-sql-connector-hbase-2.6/pom.xml | 1 + pom.xml | 6 + 29 files changed, 1872 insertions(+), 797 deletions(-) create mode 100644 flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorFailureITCase.java delete mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java delete mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseMutationConverter.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSink.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkBuilder.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkException.java delete mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java rename flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/{RowDataToMutationConverter.java => RowDataToMutationElementConverter.java} (82%) create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java create mode 100644 flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java create mode 
100644 flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandlerTest.java diff --git a/docs/content/docs/connectors/table/hbase.md b/docs/content/docs/connectors/table/hbase.md index 126c612f..4ed4a889 100644 --- a/docs/content/docs/connectors/table/hbase.md +++ b/docs/content/docs/connectors/table/hbase.md @@ -170,37 +170,78 @@ Connector Options Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type. -
-        sink.buffer-flush.max-size
+        sink.flush-buffer.size
         optional
         yes
-        2mb
-        MemorySize
+        2097152
+        Long
         Writing option, maximum size in memory of buffered rows for each writing request.
         This can improve performance for writing data to HBase database, but may increase the latency.
-        Can be set to '0' to disable it.

-        sink.buffer-flush.max-rows
+        sink.batch.max-size
         optional
         yes
         1000
         Integer
         Writing option, maximum number of rows to buffer for each writing request.
         This can improve performance for writing data to HBase database, but may increase the latency.
-        Can be set to '0' to disable it.

-        sink.buffer-flush.interval
+        sink.flush-buffer.timeout
         optional
         yes
-        1s
-        Duration
-        Writing option, the interval to flush any buffered rows.
+        1000
+        Long
+        Writing option, the maximum time in milliseconds an element may stay in the buffer before it is flushed.
         This can improve performance for writing data to HBase database, but may increase the latency.
-        Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows'
-        can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.

+        sink.requests.max-inflight
+        optional
+        yes
+        1000
+        Integer
+        Maximum number of uncompleted in-flight requests before the sink blocks new write requests.

+        sink.requests.max-buffered
+        optional
+        yes
+        1000
+        Integer
+        Maximum number of buffered records before applying backpressure.

+        sink.request-timeout
+        optional
+        yes
+        10 min
+        Duration
+        The maximum time to wait for a batch of HBase requests to complete before timing out.

+        sink.fail-on-timeout
+        optional
+        yes
+        false
+        Boolean
+        Whether to fail the job when a request times out. If false, timed-out requests will be logged but the job will continue processing. If true, a timeout will cause the job to fail.

+        sink.max-record-size
+ optional + yes + 1048576 + Long + The maximum size in bytes of a single record. Records bigger than this will cause the job to fail. diff --git a/flink-connector-hbase-2.6/pom.xml b/flink-connector-hbase-2.6/pom.xml index 82d1637b..85432d05 100644 --- a/flink-connector-hbase-2.6/pom.xml +++ b/flink-connector-hbase-2.6/pom.xml @@ -40,6 +40,12 @@ under the License. provided + + org.apache.flink + flink-connector-base + provided + + org.apache.flink flink-streaming-java diff --git a/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java index b16cab92..de60e835 100644 --- a/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java +++ b/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java @@ -21,7 +21,9 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.connector.hbase.options.HBaseWriteOptions; +import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory; +import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions; +import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink; import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource; @@ -30,7 +32,6 @@ import org.apache.flink.table.connector.source.lookup.LookupOptions; import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache; import org.apache.flink.table.connector.source.lookup.cache.LookupCache; -import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; @@ -38,6 +39,8 @@ import java.time.Duration; import java.util.HashSet; +import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -50,21 +53,23 @@ import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_FAIL_ON_TIMEOUT; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_SIZE; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_REQUEST_TIMEOUT; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX; import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration; -import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey; import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; /** HBase connector factory. */ @Internal -public class HBase2DynamicTableFactory - implements DynamicTableSourceFactory, DynamicTableSinkFactory { +public class HBase2DynamicTableFactory extends AsyncDynamicTableSinkFactory + implements DynamicTableSourceFactory { private static final String IDENTIFIER = "hbase-2.6"; @@ -116,21 +121,29 @@ public DynamicTableSink createDynamicTableSink(Context context) { TableFactoryHelper helper = createTableFactoryHelper(this, context); helper.validateExcept(PROPERTIES_PREFIX); - final ReadableConfig tableOptions = helper.getOptions(); + final ReadableConfig config = helper.getOptions(); validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); - String tableName = tableOptions.get(TABLE_NAME); - Configuration hbaseConf = getHBaseConfiguration(tableOptions); - HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions); - String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); - - return new HBaseDynamicTableSink( - tableName, - context.getPhysicalRowDataType(), - hbaseConf, - hBaseWriteOptions, - nullStringLiteral); + HBaseDynamicTableSink.HBaseDynamicSinkBuilder builder = + HBaseDynamicTableSink.builder() + .setRequestTimeoutMS(config.get(SINK_REQUEST_TIMEOUT).toMillis()) + .setMaxRecordSizeInBytes(config.get(SINK_MAX_RECORD_SIZE)) + .setFailOnTimeout(config.get(SINK_FAIL_ON_TIMEOUT)) + .setTableName(config.get(TABLE_NAME)) + .setConfiguration(getHBaseConfiguration(config)) + .setNullStringLiteral(config.get(NULL_STRING_LITERAL)) + .setPhysicalDataType(context.getPhysicalRowDataType()) + .setParallelism(config.get(SINK_PARALLELISM)) + .setIgnoreNullValue(config.get(SINK_IGNORE_NULL_VALUE)); + + AsyncSinkConfigurationValidator asyncValidator = + new AsyncSinkConfigurationValidator(config); + + addAsyncOptionsToBuilder(getDeprecatedAsyncSinkOptions(config), builder); + addAsyncOptionsToBuilder(asyncValidator.getValidatedConfigurations(), builder); + + return builder.build(); } @Override @@ -147,42 +160,77 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { - Set> set = new HashSet<>(); - set.add(ZOOKEEPER_ZNODE_PARENT); - set.add(ZOOKEEPER_QUORUM); - set.add(NULL_STRING_LITERAL); - set.add(SINK_BUFFER_FLUSH_MAX_SIZE); - set.add(SINK_BUFFER_FLUSH_MAX_ROWS); - set.add(SINK_BUFFER_FLUSH_INTERVAL); - set.add(SINK_PARALLELISM); - set.add(SINK_IGNORE_NULL_VALUE); - set.add(LOOKUP_ASYNC); - set.add(LOOKUP_CACHE_MAX_ROWS); - set.add(LOOKUP_CACHE_TTL); - set.add(LOOKUP_MAX_RETRIES); - set.add(LookupOptions.CACHE_TYPE); - set.add(LookupOptions.MAX_RETRIES); - set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS); - set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE); - set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY); - set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS); - return set; + Stream> hbaseOptions = + Stream.of( + ZOOKEEPER_ZNODE_PARENT, + ZOOKEEPER_QUORUM, + NULL_STRING_LITERAL, + SINK_BUFFER_FLUSH_MAX_SIZE, + SINK_BUFFER_FLUSH_MAX_ROWS, + SINK_BUFFER_FLUSH_INTERVAL, + SINK_PARALLELISM, + SINK_IGNORE_NULL_VALUE, + SINK_MAX_RECORD_SIZE, + SINK_REQUEST_TIMEOUT, + SINK_FAIL_ON_TIMEOUT, + LOOKUP_ASYNC, + 
LOOKUP_CACHE_MAX_ROWS, + LOOKUP_CACHE_TTL, + LOOKUP_MAX_RETRIES, + LookupOptions.CACHE_TYPE, + LookupOptions.MAX_RETRIES, + LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, + LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, + LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, + LookupOptions.PARTIAL_CACHE_MAX_ROWS); + Stream> asyncOptions = super.optionalOptions().stream(); + + return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet()); } @Override public Set> forwardOptions() { - return Stream.of( + Stream> hbaseOptions = + Stream.of( TABLE_NAME, ZOOKEEPER_ZNODE_PARENT, ZOOKEEPER_QUORUM, NULL_STRING_LITERAL, - LOOKUP_CACHE_MAX_ROWS, - LOOKUP_CACHE_TTL, - LOOKUP_MAX_RETRIES, SINK_BUFFER_FLUSH_MAX_SIZE, SINK_BUFFER_FLUSH_MAX_ROWS, SINK_BUFFER_FLUSH_INTERVAL, - SINK_IGNORE_NULL_VALUE) - .collect(Collectors.toSet()); + SINK_IGNORE_NULL_VALUE, + SINK_MAX_RECORD_SIZE, + SINK_REQUEST_TIMEOUT, + SINK_FAIL_ON_TIMEOUT, + LOOKUP_CACHE_MAX_ROWS, + LOOKUP_CACHE_TTL, + LOOKUP_MAX_RETRIES); + Stream> asyncOptions = super.optionalOptions().stream(); + + return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet()); + } + + private Properties getDeprecatedAsyncSinkOptions(ReadableConfig config) { + Properties properties = new Properties(); + Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_SIZE)) + .ifPresent( + flushBufferSize -> + properties.put( + AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(), + flushBufferSize.getBytes())); + Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_ROWS)) + .ifPresent( + maxBatchSize -> + properties.put( + AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(), + maxBatchSize)); + Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_INTERVAL)) + .ifPresent( + timeout -> + properties.put( + AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(), + timeout.toMillis())); + return properties; } } diff --git a/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java b/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java index fa8ab78c..abda9ab3 100644 --- a/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java +++ b/flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java @@ -20,20 +20,26 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.connector.hbase.options.HBaseWriteOptions; -import org.apache.flink.connector.hbase.sink.HBaseSinkFunction; -import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.hbase.sink.HBaseSinkBuilder; +import org.apache.flink.connector.hbase.sink.RowDataToMutationElementConverter; +import org.apache.flink.connector.hbase.sink.SerializableMutation; import org.apache.flink.connector.hbase.sink.WritableMetadata; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.SinkV2Provider; import 
org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; + +import javax.annotation.Nullable; import java.util.Collections; import java.util.List; @@ -41,49 +47,80 @@ /** HBase table sink implementation. */ @Internal -public class HBaseDynamicTableSink implements DynamicTableSink, SupportsWritingMetadata { +public class HBaseDynamicTableSink extends AsyncDynamicTableSink + implements SupportsWritingMetadata { + private final Long maxRecordSizeInBytes; + private final Long requestTimeoutMS; + private final Boolean failOnTimeout; private final String tableName; private final HBaseTableSchema hbaseTableSchema; - private final Configuration hbaseConf; - private final HBaseWriteOptions writeOptions; + private final Configuration configuration; private final String nullStringLiteral; private final DataType physicalDataType; + private final Integer parallelism; + private final Boolean ignoreNullValue; /** Metadata that is appended at the end of a physical sink row. */ private List metadataKeys; + public static HBaseDynamicSinkBuilder builder() { + return new HBaseDynamicSinkBuilder(); + } + public HBaseDynamicTableSink( + @Nullable Integer maxBatchSize, + @Nullable Integer maxInFlightRequests, + @Nullable Integer maxBufferedRequests, + @Nullable Long maxBufferSizeInBytes, + @Nullable Long maxTimeInBufferMS, + Long maxRecordSizeInBytes, + Long requestTimeoutMS, + Boolean failOnTimeout, + Integer parallelism, + Boolean ignoreNullValue, String tableName, DataType physicalDataType, - Configuration hbaseConf, - HBaseWriteOptions writeOptions, + Configuration configuration, String nullStringLiteral) { + super( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS); + this.maxRecordSizeInBytes = maxRecordSizeInBytes; + this.requestTimeoutMS = requestTimeoutMS; + this.failOnTimeout = failOnTimeout; + this.parallelism = parallelism; + this.ignoreNullValue = ignoreNullValue; this.tableName = tableName; this.physicalDataType = physicalDataType; this.hbaseTableSchema = HBaseTableSchema.fromDataType(physicalDataType); this.metadataKeys = Collections.emptyList(); - this.hbaseConf = hbaseConf; - this.writeOptions = writeOptions; + this.configuration = configuration; this.nullStringLiteral = nullStringLiteral; } @Override - public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - HBaseSinkFunction sinkFunction = - new HBaseSinkFunction<>( - tableName, - hbaseConf, - new RowDataToMutationConverter( - hbaseTableSchema, - physicalDataType, - metadataKeys, - nullStringLiteral, - writeOptions.isIgnoreNullValue()), - writeOptions.getBufferFlushMaxSizeInBytes(), - writeOptions.getBufferFlushMaxRows(), - writeOptions.getBufferFlushIntervalMillis()); - return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism()); + public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + ElementConverter elementConverter = + new RowDataToMutationElementConverter( + hbaseTableSchema, + physicalDataType, + metadataKeys, + nullStringLiteral, + ignoreNullValue); + HBaseSinkBuilder builder = + new HBaseSinkBuilder() + .setTableName(tableName) + .setConfiguration(configuration) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .setRequestTimeoutMS(requestTimeoutMS) + 
.setFailOnTimeout(failOnTimeout)
+                        .setElementConverter(elementConverter);
+        addAsyncOptionsToSinkBuilder(builder);
+        return SinkV2Provider.of(builder.build(), parallelism);
     }
     @Override
@@ -111,7 +148,20 @@ public void applyWritableMetadata(List metadataKeys, DataType consumedDa
     @Override
     public DynamicTableSink copy() {
         return new HBaseDynamicTableSink(
-                tableName, physicalDataType, hbaseConf, writeOptions, nullStringLiteral);
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                requestTimeoutMS,
+                failOnTimeout,
+                parallelism,
+                ignoreNullValue,
+                tableName,
+                physicalDataType,
+                configuration,
+                nullStringLiteral);
     }
     @Override
@@ -126,18 +176,93 @@ public HBaseTableSchema getHBaseTableSchema() {
         return this.hbaseTableSchema;
     }
-    @VisibleForTesting
-    public HBaseWriteOptions getWriteOptions() {
-        return writeOptions;
-    }
     @VisibleForTesting
     public Configuration getConfiguration() {
-        return this.hbaseConf;
+        return this.configuration;
     }
     @VisibleForTesting
     public String getTableName() {
         return this.tableName;
     }
+
+    /** Class for building HBaseDynamicTableSink. */
+    @Internal
+    public static class HBaseDynamicSinkBuilder
+            extends AsyncDynamicTableSinkBuilder {
+
+        private Long maxRecordSizeInBytes = null;
+        private Long requestTimeoutMS = null;
+        private Boolean failOnTimeout = null;
+        private String tableName = null;
+        private Configuration configuration = null;
+        private String nullStringLiteral = null;
+        private DataType physicalDataType = null;
+        private Integer parallelism = null;
+        private Boolean ignoreNullValue = null;
+
+        @Override
+        public AsyncDynamicTableSink build() {
+            return new HBaseDynamicTableSink(
+                    getMaxBatchSize(),
+                    getMaxInFlightRequests(),
+                    getMaxBufferedRequests(),
+                    getMaxBufferSizeInBytes(),
+                    getMaxTimeInBufferMS(),
+                    maxRecordSizeInBytes,
+                    requestTimeoutMS,
+                    failOnTimeout,
+                    parallelism,
+                    ignoreNullValue,
+                    tableName,
+                    physicalDataType,
+                    configuration,
+                    nullStringLiteral);
+        }
+
+        public HBaseDynamicSinkBuilder setMaxRecordSizeInBytes(Long maxRecordSizeInBytes) {
+            this.maxRecordSizeInBytes = maxRecordSizeInBytes;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setRequestTimeoutMS(Long requestTimeoutMS) {
+            this.requestTimeoutMS = requestTimeoutMS;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setFailOnTimeout(Boolean failOnTimeout) {
+            this.failOnTimeout = failOnTimeout;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setConfiguration(Configuration configuration) {
+            this.configuration = configuration;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setNullStringLiteral(String nullStringLiteral) {
+            this.nullStringLiteral = nullStringLiteral;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setPhysicalDataType(DataType physicalDataType) {
+            this.physicalDataType = physicalDataType;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setParallelism(Integer parallelism) {
+            this.parallelism = parallelism;
+            return this;
+        }
+
+        public HBaseDynamicSinkBuilder setIgnoreNullValue(Boolean ignoreNullValue) {
+            this.ignoreNullValue = ignoreNullValue;
+            return this;
+        }
+    }
 }
diff --git a/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorFailureITCase.java b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorFailureITCase.java
new file mode 100644
index
00000000..117b5fd4 --- /dev/null +++ b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorFailureITCase.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.hbase2; + +import org.apache.flink.connector.hbase2.util.HBaseTestBase; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CollectionUtil; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** IT cases that simulate failures/timeouts for HBase. */ +class HBaseConnectorFailureITCase extends HBaseTestBase { + + @AfterEach + public void afterEach() throws IOException { + if (hbaseTestingUtility.getMiniHBaseCluster().getNumLiveRegionServers() == 0) { + hbaseTestingUtility.getMiniHBaseCluster().startRegionServer(); + } + } + + /** This should fail on timeout because `fail-on-timeout` is set to true. 
*/ + @Test + void testTableSinkWithStoppedRegionServerShouldFail() throws HBaseIOException { + hbaseTestingUtility.getMiniHBaseCluster().stopRegionServer(0); + + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + // register values table for source + String dataId = + TestValuesTableFactory.registerData( + Collections.singletonList(Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")))); + tEnv.executeSql( + "CREATE TABLE source_table (" + + " rowkey INT," + + " family1 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '" + + dataId + + "'," + + " 'changelog-mode'='I,UA,UB,D'" + + ")"); + + // register HBase table for sink + tEnv.executeSql( + "CREATE TABLE sink_table (" + + " rowkey INT," + + " family1 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-2.6'," + + " 'sink.request-timeout' = '100'," + + " 'sink.fail-on-timeout' = 'true'," + + " 'table-name' = '" + + TEST_TABLE_4 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + assertThatThrownBy( + () -> + tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table") + .await()) + .isExactlyInstanceOf(ExecutionException.class) + .hasCauseExactlyInstanceOf(TableException.class) + .hasRootCauseExactlyInstanceOf(TimeoutException.class); + } + + /** This should not fail on timeout because `fail-on-timeout` is set to false. */ + @Test + void testTableSinkWithStoppedRegionServerShouldNotFail() throws Exception { + hbaseTestingUtility.getMiniHBaseCluster().stopRegionServer(0); + + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + // register values table for source + String dataId = + TestValuesTableFactory.registerData( + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")), + Row.ofKind(RowKind.DELETE, 1, Row.of("Hello2")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello1")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello2")), + Row.ofKind(RowKind.INSERT, 2, Row.of("Hello3")), + Row.ofKind(RowKind.DELETE, 2, Row.of("Hello3")), + Row.ofKind(RowKind.INSERT, 1, Row.of("Hello3")))); + tEnv.executeSql( + "CREATE TABLE source_table (" + + " rowkey INT," + + " family1 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '" + + dataId + + "'," + + " 'changelog-mode'='I,UA,UB,D'" + + ")"); + + // register HBase table for sink + tEnv.executeSql( + "CREATE TABLE sink_table (" + + " rowkey INT," + + " family1 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-2.6'," + + " 'sink.request-timeout' = '100'," + + " 'sink.fail-on-timeout' = 'false'," + + " 'table-name' = '" + + TEST_TABLE_4 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + // Restart region server to ensure that requests will be tried again after timeout + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + executor.schedule( + () -> { + try { + hbaseTestingUtility.getMiniHBaseCluster().startRegionServer(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + 3, + TimeUnit.SECONDS); + + tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table").await(); + + TableResult result = tEnv.executeSql("SELECT * FROM 
sink_table"); + + List actual = CollectionUtil.iteratorToList(result.collect()); + assertThat(actual).isEqualTo(Collections.singletonList(Row.of(1, Row.of("Hello3")))); + } +} diff --git a/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java index bc89c1b5..da63d882 100644 --- a/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java +++ b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java @@ -21,10 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.hbase.sink.HBaseSinkFunction; -import org.apache.flink.connector.hbase.sink.RowDataToMutationConverter; -import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.connector.hbase.sink.HBaseSinkException; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase2.source.AbstractTableInputFormat; import org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat; @@ -35,7 +32,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.test.util.TestBaseUtils; @@ -43,10 +39,11 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.CollectionUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -59,6 +56,7 @@ import java.util.List; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -66,6 +64,7 @@ import static org.apache.flink.table.api.Expressions.$; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.fail; /** IT cases for HBase connector (including source and sink). 
*/ class HBaseConnectorITCase extends HBaseTestBase { @@ -338,6 +337,115 @@ void testTableSink() throws Exception { TestBaseUtils.compareResultAsText(results, String.join("", expected)); } + @Test + void testTableSinkWithFailedMutation() { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + String dataId = + TestValuesTableFactory.registerData( + Collections.singletonList( + Row.ofKind( + RowKind.INSERT, + 1, + Row.of("Hello1"), + Row.of("Hello1", 200L)))); + tEnv.executeSql( + "CREATE TABLE source_table (" + + " rowkey INT," + + " family1 ROW," + + " family2 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '" + + dataId + + "'," + + " 'changelog-mode'='I,UA,UB,D'" + + ")"); + + // register HBase table for sink + tEnv.executeSql( + "CREATE TABLE sink_table (" + + " rowkey INT," + + " family1 ROW," + + " family2 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-2.6'," + + " 'table-name' = '" + + TEST_TABLE_4 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + assertThatThrownBy( + () -> + tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table") + .await()) + .isExactlyInstanceOf(ExecutionException.class) + .satisfies( + ex -> { + Throwable cause = ex; + while (cause != null) { + if (cause instanceof HBaseSinkException) { + return; + } + cause = cause.getCause(); + } + fail("Expected to find HBaseSinkException in cause chain"); + }) + .hasRootCauseExactlyInstanceOf(NoSuchColumnFamilyException.class); + } + + /** The INSERT statement should fail as the max record size is too low. 
*/ + @Test + void testTableSinkWithRecordSizeTooBig() { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + String dataId = + TestValuesTableFactory.registerData( + Collections.singletonList(Row.ofKind(RowKind.INSERT, 1, Row.of("Hello1")))); + tEnv.executeSql( + "CREATE TABLE source_table (" + + " rowkey INT," + + " family1 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'data-id' = '" + + dataId + + "'," + + " 'changelog-mode'='I,UA,UB,D'" + + ")"); + + tEnv.executeSql( + "CREATE TABLE sink_table (" + + " rowkey INT," + + " family1 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-2.6'," + + " 'sink.max-record-size' = '1'," + + " 'table-name' = '" + + TEST_TABLE_4 + + "'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); + + assertThatThrownBy( + () -> + tEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table") + .await()) + .isExactlyInstanceOf(ExecutionException.class) + .hasRootCauseExactlyInstanceOf(IllegalArgumentException.class); + } + @Test void testTableSinkWithChangelog() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -635,37 +743,40 @@ void testTableInputFormatTableExistence() throws IOException { } @Test - void testHBaseSinkFunctionTableExistence() throws Exception { - org.apache.hadoop.conf.Configuration hbaseConf = - HBaseConfigurationUtil.getHBaseConfiguration(); - hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getZookeeperQuorum()); - hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase"); + void testHBaseSinkFunctionTableExistence() { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); - HBaseTableSchema tableSchema = new HBaseTableSchema(); - tableSchema.addColumn(FAMILY1, F1COL1, byte[].class); + tEnv.executeSql( + "CREATE TABLE hbase_table (" + + " rowkey INT," + + " family1 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-2.6'," + + " 'table-name' = 'TEST-TABLE-DOES-NOT-EXIST'," + + " 'zookeeper.quorum' = '" + + getZookeeperQuorum() + + "'" + + ")"); - HBaseSinkFunction sinkFunction = - new HBaseSinkFunction<>( - TEST_NOT_EXISTS_TABLE, - hbaseConf, - new RowDataToMutationConverter( - tableSchema, - tableSchema.convertToDataType(), - Collections.emptyList(), - "null", - false), - 2 * 1024 * 1024, - 1000, - 1000); - - assertThatThrownBy(() -> sinkFunction.open(new Configuration())) + assertThatThrownBy( + () -> + tEnv.executeSql("INSERT INTO hbase_table SELECT * FROM hbase_table") + .await()) .getRootCause() .isExactlyInstanceOf(TableNotFoundException.class); - - sinkFunction.close(); } + /** + * Moving to Async Sink API changed the Sink class to a yielding operator, which cannot be + * chained to legacy sources according to {@link + * org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator}. It's not feasible to stop + * processing the source table while the sink is still processing records if the operators are + * not chained. 
+ */ @Test + @Disabled void testTableSinkDisabledBufferFlush() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); @@ -676,8 +787,8 @@ void testTableSinkDisabledBufferFlush() throws Exception { + " family1 ROW" + ") WITH (" + " 'connector' = 'hbase-2.6'," - + " 'sink.buffer-flush.max-size' = '0'," - + " 'sink.buffer-flush.max-rows' = '0'," + + " 'sink.batch.max-size' = '1'," + + " 'sink.requests.max-inflight' = '1'," + " 'table-name' = '" + TEST_TABLE_7 + "'," diff --git a/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java index 004219d2..a75e0067 100644 --- a/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java +++ b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java @@ -19,7 +19,6 @@ package org.apache.flink.connector.hbase2; import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.connector.hbase.options.HBaseWriteOptions; import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction; import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; import org.apache.flink.connector.hbase.util.HBaseTableSchema; @@ -30,7 +29,7 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider; @@ -41,7 +40,6 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext; -import org.apache.commons.collections.IteratorUtils; import org.apache.hadoop.hbase.HConstants; import org.junit.jupiter.api.Test; @@ -49,6 +47,20 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BATCH_SIZE; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BUFFERED_REQUESTS; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_IN_FLIGHT_REQUESTS; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_FAIL_ON_TIMEOUT; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE; +import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_SIZE; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_REQUEST_TIMEOUT; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.BOOLEAN; import static org.apache.flink.table.api.DataTypes.DATE; @@ -60,6 +72,12 @@ import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.api.DataTypes.TIME; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.PARTIAL_CACHE_MAX_ROWS; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.assertj.core.api.Assertions.assertThat; @@ -112,7 +130,7 @@ void testTableSourceFactory() { ((LookupFunctionProvider) lookupProvider).createLookupFunction(); assertThat(tableFunction).isInstanceOf(HBaseRowDataLookupFunction.class); assertThat(((HBaseRowDataLookupFunction) tableFunction).getHTableName()) - .isEqualTo("testHBastTable"); + .isEqualTo("testHBaseTable"); HBaseTableSchema hbaseSchema = hbaseSource.getHBaseTableSchema(); assertThat(hbaseSchema.getRowKeyIndex()).isEqualTo(2); @@ -135,12 +153,12 @@ void testTableSourceFactory() { void testLookupOptions() { ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING())); Map options = getAllOptions(); - options.put("lookup.cache", "PARTIAL"); - options.put("lookup.partial-cache.expire-after-access", "15213s"); - options.put("lookup.partial-cache.expire-after-write", "18213s"); - options.put("lookup.partial-cache.max-rows", "10000"); - options.put("lookup.partial-cache.cache-missing-key", "false"); - options.put("lookup.max-retries", "15513"); + options.put(CACHE_TYPE.key(), "PARTIAL"); + options.put(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "15213s"); + options.put(PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "18213s"); + options.put(PARTIAL_CACHE_MAX_ROWS.key(), "10000"); + options.put(PARTIAL_CACHE_CACHE_MISSING_KEY.key(), "false"); + options.put(MAX_RETRIES.key(), "15513"); DynamicTableSource source = createTableSource(schema, options); HBaseDynamicTableSource hbaseSource = (HBaseDynamicTableSource) source; @@ -176,9 +194,9 @@ FAMILY3, ROW(FIELD(COL2, BOOLEAN()), FIELD(COL3, STRING()))), DynamicTableSink sink = createTableSink(schema, getAllOptions()); assertThat(sink).isInstanceOf(HBaseDynamicTableSink.class); - HBaseDynamicTableSink hbaseSink = (HBaseDynamicTableSink) sink; + HBaseDynamicTableSink actualSink = (HBaseDynamicTableSink) sink; - HBaseTableSchema hbaseSchema = hbaseSink.getHBaseTableSchema(); + HBaseTableSchema hbaseSchema = actualSink.getHBaseTableSchema(); assertThat(hbaseSchema.getRowKeyIndex()).isZero(); assertThat(hbaseSchema.getRowKeyDataType()).contains(STRING()); @@ -200,68 +218,104 @@ 
FAMILY3, ROW(FIELD(COL2, BOOLEAN()), FIELD(COL3, STRING()))), expectedConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/flink"); expectedConfiguration.set("hbase.security.authentication", "kerberos"); - org.apache.hadoop.conf.Configuration actualConfiguration = hbaseSink.getConfiguration(); - - assertThat(IteratorUtils.toList(actualConfiguration.iterator())) - .isEqualTo(IteratorUtils.toList(expectedConfiguration.iterator())); - // verify tableName - assertThat(hbaseSink.getTableName()).isEqualTo("testHBastTable"); - - HBaseWriteOptions expectedWriteOptions = - HBaseWriteOptions.builder() - .setBufferFlushMaxRows(1000) - .setBufferFlushIntervalMillis(1000) - .setBufferFlushMaxSizeInBytes(2 * 1024 * 1024) - .build(); - HBaseWriteOptions actualWriteOptions = hbaseSink.getWriteOptions(); - assertThat(actualWriteOptions).isEqualTo(expectedWriteOptions); + assertThat(actualSink.getTableName()).isEqualTo("testHBaseTable"); + + HBaseDynamicTableSink expectedSink = + (HBaseDynamicTableSink) + HBaseDynamicTableSink.builder() + .setPhysicalDataType(schema.toPhysicalRowDataType()) + .setConfiguration(expectedConfiguration) + .build(); + assertThat(actualSink) + .usingRecursiveComparison() + .comparingOnlyFields("configuration") + .isEqualTo(expectedSink); } @Test - void testBufferFlushOptions() { + void testAsyncOptions() { Map options = getAllOptions(); - options.put("sink.buffer-flush.max-size", "10mb"); - options.put("sink.buffer-flush.max-rows", "100"); - options.put("sink.buffer-flush.interval", "10s"); + options.put(MAX_BATCH_SIZE.key(), "100"); + options.put(MAX_IN_FLIGHT_REQUESTS.key(), "62"); + options.put(MAX_BUFFERED_REQUESTS.key(), "72"); + options.put(FLUSH_BUFFER_SIZE.key(), String.valueOf(10 * 1024 * 1024)); + options.put(FLUSH_BUFFER_TIMEOUT.key(), "10000"); + options.put(SINK_REQUEST_TIMEOUT.key(), "5 s"); + options.put(SINK_FAIL_ON_TIMEOUT.key(), "true"); + options.put(SINK_IGNORE_NULL_VALUE.key(), "true"); + options.put(SINK_MAX_RECORD_SIZE.key(), "6123"); ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING())); - DynamicTableSink sink = createTableSink(schema, options); - HBaseWriteOptions expected = - HBaseWriteOptions.builder() - .setBufferFlushMaxRows(100) - .setBufferFlushIntervalMillis(10 * 1000) - .setBufferFlushMaxSizeInBytes(10 * 1024 * 1024) - .build(); - HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions(); - assertThat(actual).isEqualTo(expected); + DynamicTableSink actualSink = createTableSink(schema, options); + + HBaseDynamicTableSink expectedSink = + (HBaseDynamicTableSink) + HBaseDynamicTableSink.builder() + .setMaxBatchSize(100) + .setMaxInFlightRequests(62) + .setMaxBufferedRequests(72) + .setMaxBufferSizeInBytes(10 * 1024 * 1024) + .setMaxTimeInBufferMS(10 * 1000) + .setRequestTimeoutMS(5L * 1000L) + .setMaxRecordSizeInBytes(6123L) + .setFailOnTimeout(true) + .setIgnoreNullValue(true) + .setPhysicalDataType(schema.toPhysicalRowDataType()) + .build(); + assertThat(actualSink) + .usingRecursiveComparison() + .comparingOnlyFields( + "maxBatchSize", + "maxInFlightRequests", + "maxBufferedRequests", + "maxBufferSizeInBytes", + "maxTimeInBufferMS", + "requestTimeoutMS", + "maxRecordSizeInBytes", + "failOnTimeout", + "ignoreNullValue") + .isEqualTo(expectedSink); } @Test - void testSinkIgnoreNullValueOptions() { + void testDeprecatedAsyncOptions() { Map options = getAllOptions(); - options.put("sink.ignore-null-value", "true"); + options.put(SINK_BUFFER_FLUSH_MAX_SIZE.key(), "5"); + 
options.put(SINK_BUFFER_FLUSH_MAX_ROWS.key(), "6"); + options.put(SINK_BUFFER_FLUSH_INTERVAL.key(), "7"); ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING())); - DynamicTableSink sink = createTableSink(schema, options); - HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions(); - assertThat(actual.isIgnoreNullValue()).isTrue(); + DynamicTableSink actualSink = createTableSink(schema, options); + + HBaseDynamicTableSink expectedSink = + (HBaseDynamicTableSink) + HBaseDynamicTableSink.builder() + .setMaxBufferSizeInBytes(5) + .setMaxBatchSize(6) + .setMaxTimeInBufferMS(7L) + .setPhysicalDataType(schema.toPhysicalRowDataType()) + .build(); + assertThat(actualSink) + .usingRecursiveComparison() + .comparingOnlyFields("maxBufferSizeInBytes", "maxBatchSize", "maxTimeInBufferMS") + .isEqualTo(expectedSink); } @Test void testParallelismOptions() { Map options = getAllOptions(); - options.put("sink.parallelism", "2"); + options.put(SINK_PARALLELISM.key(), "2"); ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING())); DynamicTableSink sink = createTableSink(schema, options); assertThat(sink).isInstanceOf(HBaseDynamicTableSink.class); HBaseDynamicTableSink hbaseSink = (HBaseDynamicTableSink) sink; - SinkFunctionProvider provider = - (SinkFunctionProvider) + SinkV2Provider provider = + (SinkV2Provider) hbaseSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); assertThat(provider.getParallelism()).contains(2); } @@ -269,7 +323,7 @@ void testParallelismOptions() { @Test void testLookupAsync() { Map options = getAllOptions(); - options.put("lookup.async", "true"); + options.put(LOOKUP_ASYNC.key(), "true"); ResolvedSchema schema = ResolvedSchema.of( Column.physical(ROWKEY, STRING()), @@ -287,27 +341,7 @@ void testLookupAsync() { ((AsyncLookupFunctionProvider) lookupProvider).createAsyncLookupFunction(); assertThat(asyncTableFunction).isInstanceOf(HBaseRowDataAsyncLookupFunction.class); assertThat(((HBaseRowDataAsyncLookupFunction) asyncTableFunction).getHTableName()) - .isEqualTo("testHBastTable"); - } - - @Test - void testDisabledBufferFlushOptions() { - Map options = getAllOptions(); - options.put("sink.buffer-flush.max-size", "0"); - options.put("sink.buffer-flush.max-rows", "0"); - options.put("sink.buffer-flush.interval", "0"); - - ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, STRING())); - - DynamicTableSink sink = createTableSink(schema, options); - HBaseWriteOptions expected = - HBaseWriteOptions.builder() - .setBufferFlushMaxRows(0) - .setBufferFlushIntervalMillis(0) - .setBufferFlushMaxSizeInBytes(0) - .build(); - HBaseWriteOptions actual = ((HBaseDynamicTableSink) sink).getWriteOptions(); - assertThat(actual).isEqualTo(expected); + .isEqualTo("testHBaseTable"); } @Test @@ -367,7 +401,7 @@ void testTypeWithUnsupportedPrecision() { private Map getAllOptions() { Map options = new HashMap<>(); options.put("connector", "hbase-2.6"); - options.put("table-name", "testHBastTable"); + options.put("table-name", "testHBaseTable"); options.put("zookeeper.quorum", "localhost:2181"); options.put("zookeeper.znode.parent", "/flink"); options.put("properties.hbase.security.authentication", "kerberos"); diff --git a/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestingClusterAutoStarter.java b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestingClusterAutoStarter.java index 676d8853..eef0c401 100644 --- 
a/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestingClusterAutoStarter.java +++ b/flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/util/HBaseTestingClusterAutoStarter.java @@ -68,7 +68,7 @@ public class HBaseTestingClusterAutoStarter { private static final Range HADOOP_VERSION_RANGE = Range.between("2.8.0", "3.0.3", VersionUtil::compareVersions); - private static HBaseTestingUtility hbaseTestingUtility; + protected static HBaseTestingUtility hbaseTestingUtility; private static Admin admin = null; private static List createdTables = new ArrayList<>(); diff --git a/flink-connector-hbase-base/pom.xml b/flink-connector-hbase-base/pom.xml index fb2244e8..62005196 100644 --- a/flink-connector-hbase-base/pom.xml +++ b/flink-connector-hbase-base/pom.xml @@ -40,6 +40,12 @@ under the License. provided + + org.apache.flink + flink-connector-base + provided + + org.apache.flink flink-streaming-java diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java deleted file mode 100644 index 94627834..00000000 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseWriteOptions.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.hbase.options; - -import org.apache.flink.annotation.Internal; - -import org.apache.hadoop.hbase.client.ConnectionConfiguration; - -import java.io.Serializable; -import java.util.Objects; - -/** Options for HBase writing. 
*/ -@Internal -public class HBaseWriteOptions implements Serializable { - - private static final long serialVersionUID = 1L; - - private final long bufferFlushMaxSizeInBytes; - private final long bufferFlushMaxRows; - private final long bufferFlushIntervalMillis; - private final boolean ignoreNullValue; - private final Integer parallelism; - - private HBaseWriteOptions( - long bufferFlushMaxSizeInBytes, - long bufferFlushMaxMutations, - long bufferFlushIntervalMillis, - boolean ignoreNullValue, - Integer parallelism) { - this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes; - this.bufferFlushMaxRows = bufferFlushMaxMutations; - this.bufferFlushIntervalMillis = bufferFlushIntervalMillis; - this.ignoreNullValue = ignoreNullValue; - this.parallelism = parallelism; - } - - public long getBufferFlushMaxSizeInBytes() { - return bufferFlushMaxSizeInBytes; - } - - public long getBufferFlushMaxRows() { - return bufferFlushMaxRows; - } - - public long getBufferFlushIntervalMillis() { - return bufferFlushIntervalMillis; - } - - public boolean isIgnoreNullValue() { - return ignoreNullValue; - } - - public Integer getParallelism() { - return parallelism; - } - - @Override - public String toString() { - return "HBaseWriteOptions{" - + "bufferFlushMaxSizeInBytes=" - + bufferFlushMaxSizeInBytes - + ", bufferFlushMaxRows=" - + bufferFlushMaxRows - + ", bufferFlushIntervalMillis=" - + bufferFlushIntervalMillis - + ", ignoreNullValue=" - + ignoreNullValue - + ", parallelism=" - + parallelism - + '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - HBaseWriteOptions that = (HBaseWriteOptions) o; - return bufferFlushMaxSizeInBytes == that.bufferFlushMaxSizeInBytes - && bufferFlushMaxRows == that.bufferFlushMaxRows - && bufferFlushIntervalMillis == that.bufferFlushIntervalMillis - && ignoreNullValue == that.ignoreNullValue - && parallelism == that.parallelism; - } - - @Override - public int hashCode() { - return Objects.hash( - bufferFlushMaxSizeInBytes, - bufferFlushMaxRows, - bufferFlushIntervalMillis, - parallelism); - } - - /** Creates a builder for {@link HBaseWriteOptions}. */ - public static Builder builder() { - return new Builder(); - } - - /** Builder for {@link HBaseWriteOptions}. */ - public static class Builder { - - private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; - private long bufferFlushMaxRows = 0; - private long bufferFlushIntervalMillis = 0; - private boolean ignoreNullValue; - private Integer parallelism; - - /** - * Optional. Sets when to flush a buffered request based on the memory size of rows - * currently added. Default to 2mb. - */ - public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes) { - this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes; - return this; - } - - /** - * Optional. Sets when to flush buffered request based on the number of rows currently - * added. Defaults to not set, i.e. won't flush based on the number of buffered rows. - */ - public Builder setBufferFlushMaxRows(long bufferFlushMaxRows) { - this.bufferFlushMaxRows = bufferFlushMaxRows; - return this; - } - - /** - * Optional. Sets a flush interval flushing buffered requesting if the interval passes, in - * milliseconds. Defaults to not set, i.e. won't flush based on flush interval. 
- */ - public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis) { - this.bufferFlushIntervalMillis = bufferFlushIntervalMillis; - return this; - } - - /** - * Optional. Sets whether ignore null value or not. By defaults, null value will be writing. - */ - public Builder setIgnoreNullValue(boolean ignoreNullValue) { - this.ignoreNullValue = ignoreNullValue; - return this; - } - - /** - * Optional. Defines the parallelism of the HBase sink operator. By default, the parallelism - * is determined by the framework using the same parallelism of the upstream chained - * operator. - */ - public Builder setParallelism(Integer parallelism) { - this.parallelism = parallelism; - return this; - } - - /** Creates a new instance of {@link HBaseWriteOptions}. */ - public HBaseWriteOptions build() { - return new HBaseWriteOptions( - bufferFlushMaxSizeInBytes, - bufferFlushMaxRows, - bufferFlushIntervalMillis, - ignoreNullValue, - parallelism); - } - } -} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseMutationConverter.java deleted file mode 100644 index d89bd4ca..00000000 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseMutationConverter.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.hbase.sink; - -import org.apache.flink.annotation.Internal; - -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; - -import java.io.Serializable; - -/** - * A converter used to converts the input record into HBase {@link Mutation}. - * - * @param type of input record. - */ -@Internal -public interface HBaseMutationConverter extends Serializable { - - /** Initialization method for the function. It is called once before conversion method. */ - void open(); - - /** - * Converts the input record into HBase {@link Mutation}. A mutation can be a {@link Put} or - * {@link Delete}. 
- */ - Mutation convertToMutation(T record); -} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSink.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSink.java new file mode 100644 index 00000000..93d25716 --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSink.java @@ -0,0 +1,168 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Collection; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A sink implementation for writing data to Apache HBase tables using asynchronous batching. + * + *

This sink extends {@link AsyncSinkBase} to provide efficient, batched writes to HBase tables. + * It buffers incoming records and writes them in configurable batches to optimize throughput and + * reduce the number of round trips to HBase. + * + *

The sink supports configurable batching parameters including batch size limits, time-based + * flushing, and request timeout handling. It uses an {@link ElementConverter} to transform input + * records of type {@code T} into HBase mutations. + * + *

Note: It is recommended to use {@link HBaseSinkBuilder} to create instances of this + * class rather than calling the constructor directly. The builder provides a more convenient and + * readable way to configure the sink: + * + *

{@code
+ * HBaseSink sink = new HBaseSinkBuilder()
+ *     .setTableName("my_table")
+ *     .setElementConverter(myElementConverter)
+ *     .setMaxBatchSize(100)
+ *     .setMaxInFlightRequests(5)
+ *     .setMaxBufferedRequests(1000)
+ *     .setMaxBatchSizeInBytes(1048576L)  // 1MB
+ *     .setMaxTimeInBufferMS(5000L)       // 5 seconds
+ *     .setMaxRecordSizeInBytes(10485760L) // 10MB
+ *     .setRequestTimeoutMS(30000L)       // 30 seconds
+ *     .setFailOnTimeout(false)
+ *     .setConfiguration(hbaseConfiguration)
+ *     .build();
+ * }
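For orientation, a minimal sketch of wiring such a sink into a DataStream job; the table name, column family, qualifier, and sample records are illustrative only, and the supplied HBase configuration must point at a reachable cluster (the writer fails at startup if hbase.zookeeper.quorum is not set):

import org.apache.flink.connector.hbase.sink.HBaseSink;
import org.apache.flink.connector.hbase.sink.HBaseSinkBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseAsyncSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the sink with an inline converter that turns each String into a Put.
        HBaseSink<String> sink =
                new HBaseSinkBuilder<String>()
                        .setTableName("my_table")
                        .setElementConverter(
                                (element, ctx) ->
                                        new Put(Bytes.toBytes(element))
                                                .addColumn(
                                                        Bytes.toBytes("cf"),
                                                        Bytes.toBytes("q"),
                                                        Bytes.toBytes(element)))
                        .setConfiguration(HBaseConfiguration.create())
                        .build();

        env.fromElements("row-1", "row-2", "row-3").sinkTo(sink);
        env.execute("hbase-async-sink-example");
    }
}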
+ * + * @param The type of input elements to be written to HBase + */ +@PublicEvolving +public class HBaseSink extends AsyncSinkBase { + + /** + * The converter used to transform input elements into HBase mutations wrapped by {@link + * SerializableMutation}. + */ + private final ElementConverter elementConverter; + + /** The name of the HBase table to write to. */ + private final String tableName; + + /** + * Serialized Hadoop configuration for HBase connection settings. Stored as a byte array to + * ensure that the sink is serializable. + */ + private final byte[] serializedHadoopConfiguration; + + public HBaseSink( + ElementConverter elementConverter, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + long requestTimeoutMS, + boolean failOnTimeout, + String tableName, + Configuration configuration) { + super( + elementConverter, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInBytes, + requestTimeoutMS, + failOnTimeout); + + checkNotNull(tableName, "HBase sink table name must not be null."); + checkArgument(!tableName.isEmpty(), "HBase sink table name must not be empty."); + checkNotNull(configuration, "HBase configuration must not be null."); + + this.elementConverter = elementConverter; + this.tableName = tableName; + this.serializedHadoopConfiguration = + HBaseConfigurationUtil.serializeConfiguration(configuration); + } + + /** This can be removed once rebased to Flink 2.0. */ + public SinkWriter createWriter(InitContext context) { + return new HBaseWriter<>( + elementConverter, + context, + Collections.emptyList(), + getMaxBatchSize(), + getMaxBatchSizeInBytes(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + getRequestTimeoutMS(), + getFailOnTimeout(), + tableName, + HBaseConfigurationUtil.deserializeConfiguration( + serializedHadoopConfiguration, null)); + } + + @Override + public SinkWriter createWriter(WriterInitContext writerInitContext) { + return new HBaseWriter<>( + elementConverter, + writerInitContext, + Collections.emptyList(), + getMaxBatchSize(), + getMaxBatchSizeInBytes(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + getRequestTimeoutMS(), + getFailOnTimeout(), + tableName, + HBaseConfigurationUtil.deserializeConfiguration( + serializedHadoopConfiguration, null)); + } + + @Override + public StatefulSinkWriter> restoreWriter( + WriterInitContext context, + Collection> recoveredState) { + return new HBaseWriter<>( + elementConverter, + context, + recoveredState, + getMaxBatchSize(), + getMaxBatchSizeInBytes(), + getMaxInFlightRequests(), + getMaxBufferedRequests(), + getMaxTimeInBufferMS(), + getMaxRecordSizeInBytes(), + getRequestTimeoutMS(), + getFailOnTimeout(), + tableName, + HBaseConfigurationUtil.deserializeConfiguration( + serializedHadoopConfiguration, null)); + } + + @Override + public SimpleVersionedSerializer> + getWriterStateSerializer() { + return new HBaseStateSerializer(); + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkBuilder.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkBuilder.java new file mode 100644 index 00000000..d57eda67 --- /dev/null +++ 
b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkBuilder.java @@ -0,0 +1,124 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; + +import java.util.Optional; + +/** + * A builder class for constructing {@link HBaseSink} instances with a fluent API. + * + *

This builder provides a convenient way to configure and create HBase sinks with proper default + * values for all configuration parameters. It extends {@link AsyncSinkBaseBuilder} to inherit + * common async sink configuration options while adding HBase-specific settings. + * + *

The builder ensures that all required parameters are set before building the sink and provides + * sensible defaults for optional parameters: + * + *

    + *
  • maxBatchSize: 1000 records + *
  • maxInFlightRequests: 50 concurrent requests + *
  • maxBufferedRequests: 10000 records + *
  • maxBatchSizeInBytes: 2MB + *
  • maxTimeInBufferMS: 1 second + *
  • maxRecordSizeInBytes: 1MB + *
  • requestTimeoutMS: 60 seconds + *
  • failOnTimeout: false + *
+ * + *

Example usage: + * + *

{@code
+ * HBaseSink sink = new HBaseSinkBuilder()
+ *     .setTableName("my_table")
+ *     .setElementConverter(myElementConverter)
+ *     .setConfiguration(hbaseConfiguration)
+ *     .setMaxBatchSize(100)
+ *     .setMaxInFlightRequests(5)
+ *     .setMaxBufferedRequests(1000)
+ *     .setMaxBatchSizeInBytes(2097152L)  // 2MB
+ *     .setMaxTimeInBufferMS(5000L)       // 5 seconds
+ *     .setMaxRecordSizeInBytes(1048576L) // 1MB
+ *     .setRequestTimeoutMS(30000L)       // 30 seconds
+ *     .setFailOnTimeout(false)
+ *     .build();
+ * }
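A user-supplied ElementConverter only has to map an input record to an HBase Mutation; the builder wraps it internally so that the sink works on serializable entries. A hypothetical converter for a simple key/value tuple could look like the following sketch (the column family "cf" and qualifier "v" are placeholders):

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative converter: row key from the first tuple field, cell value from the second. */
public class KeyValueToPutConverter implements ElementConverter<Tuple2<String, String>, Mutation> {

    private static final byte[] FAMILY = Bytes.toBytes("cf");
    private static final byte[] QUALIFIER = Bytes.toBytes("v");

    @Override
    public Mutation apply(Tuple2<String, String> element, SinkWriter.Context context) {
        return new Put(Bytes.toBytes(element.f0))
                .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(element.f1));
    }
}

Such a converter would be passed to setElementConverter(new KeyValueToPutConverter()) like any other converter; the wrapping into SerializableMutation happens transparently.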
+ * + * @param The type of input elements to be written to HBase + */ +@PublicEvolving +public class HBaseSinkBuilder + extends AsyncSinkBaseBuilder> { + + private static final Integer DEFAULT_MAX_BATCH_SIZE = 1000; + private static final Integer DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50; + private static final Integer DEFAULT_MAX_BUFFERED_REQUESTS = 10000; + private static final Long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 2L * 1024L * 1024L; + private static final Long DEFAULT_MAX_TIME_IN_BUFFER_MS = 1000L; + private static final Long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 1024L * 1024L; + private static final Long DEFAULT_MAX_REQUEST_TIMEOUT_MS = 60L * 1000L; + private static final Boolean DEFAULT_FAIL_ON_TIMEOUT = false; + + private String tableName; + private Configuration configuration; + private ElementConverter elementConverter; + private Long requestTimeoutMS = null; + private Boolean failOnTimeout = null; + + public HBaseSinkBuilder() {} + + public HBaseSinkBuilder setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public HBaseSinkBuilder setConfiguration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + public HBaseSinkBuilder setRequestTimeoutMS(Long requestTimeoutMS) { + this.requestTimeoutMS = requestTimeoutMS; + return this; + } + + public HBaseSinkBuilder setFailOnTimeout(Boolean failOnTimeout) { + this.failOnTimeout = failOnTimeout; + return this; + } + + /** + * Set up the converter to use when converting the input elements to a {@link Mutation} object. + * Since {@link Mutation} objects don't implement {@link java.io.Serializable}, this will + * internally wrap the passed {@link ElementConverter} to create a {@link SerializableMutation} + * for the Sink. + */ + public HBaseSinkBuilder setElementConverter( + ElementConverter elementConverter) { + this.elementConverter = new WrappedElementConverter<>(elementConverter); + return this; + } + + @Override + public HBaseSink build() { + return new HBaseSink<>( + elementConverter, + Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE), + Optional.ofNullable(getMaxInFlightRequests()) + .orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS), + Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS), + Optional.ofNullable(getMaxBatchSizeInBytes()) + .orElse(DEFAULT_MAX_BATCH_SIZE_IN_BYTES), + Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS), + Optional.ofNullable(getMaxRecordSizeInBytes()) + .orElse(DEFAULT_MAX_RECORD_SIZE_IN_BYTES), + Optional.ofNullable(requestTimeoutMS).orElse(DEFAULT_MAX_REQUEST_TIMEOUT_MS), + Optional.ofNullable(failOnTimeout).orElse(DEFAULT_FAIL_ON_TIMEOUT), + tableName, + configuration); + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkException.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkException.java new file mode 100644 index 00000000..3dbf91ee --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkException.java @@ -0,0 +1,40 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.PublicEvolving; + +/** Exception that can be thrown by {@link HBaseWriter} wrapping the HBase {@link Throwable}. 
*/ +@PublicEvolving +public class HBaseSinkException extends RuntimeException { + + public HBaseSinkException(String message) { + super(message); + } + + public HBaseSinkException(String message, Throwable throwable) { + super(message, throwable); + } + + /** Exception thrown during HBase sink initialization. */ + public static class HBaseSinkInitException extends HBaseSinkException { + private static final String ERROR_MESSAGE = + "Exception while trying to initialize HBase sink."; + + public HBaseSinkInitException(Throwable throwable) { + super(ERROR_MESSAGE, throwable); + } + + public HBaseSinkInitException(String message) { + super(message); + } + } + + /** Exception thrown when trying to persist HBase mutations. */ + public static class HBaseSinkMutationException extends HBaseSinkException { + private static final String ERROR_MESSAGE = + "Exception while trying to persist records in HBase sink, not retrying."; + + public HBaseSinkMutationException(Throwable throwable) { + super(ERROR_MESSAGE, throwable); + } + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java deleted file mode 100644 index fbe8dcd9..00000000 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.hbase.sink; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.StringUtils; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.BufferedMutatorParams; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -/** - * The sink function for HBase. - * - *

This class leverage {@link BufferedMutator} to buffer multiple {@link - * org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to cluster. The - * buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes}, {@code - * bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}. - */ -@Internal -public class HBaseSinkFunction extends RichSinkFunction - implements CheckpointedFunction, BufferedMutator.ExceptionListener { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class); - - private final String hTableName; - private final byte[] serializedConfig; - - private final long bufferFlushMaxSizeInBytes; - private final long bufferFlushMaxMutations; - private final long bufferFlushIntervalMillis; - private final HBaseMutationConverter mutationConverter; - - private transient Connection connection; - private transient DeduplicatedMutator mutator; - - private transient ScheduledExecutorService executor; - private transient ScheduledFuture scheduledFuture; - private transient AtomicLong numPendingRequests; - - private transient volatile boolean closed = false; - - /** - * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable} - * was thrown. - * - *

Errors will be checked and rethrown before processing each input element, and when the - * sink is closed. - */ - private final AtomicReference failureThrowable = new AtomicReference<>(); - - public HBaseSinkFunction( - String hTableName, - org.apache.hadoop.conf.Configuration conf, - HBaseMutationConverter mutationConverter, - long bufferFlushMaxSizeInBytes, - long bufferFlushMaxMutations, - long bufferFlushIntervalMillis) { - this.hTableName = hTableName; - // Configuration is not serializable - this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); - this.mutationConverter = mutationConverter; - this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes; - this.bufferFlushMaxMutations = bufferFlushMaxMutations; - this.bufferFlushIntervalMillis = bufferFlushIntervalMillis; - } - - @Override - public void open(Configuration parameters) throws Exception { - LOG.info("start open ..."); - org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration(); - try { - this.mutationConverter.open(); - this.numPendingRequests = new AtomicLong(0); - - if (null == connection) { - this.connection = ConnectionFactory.createConnection(config); - } - - TableName tableName = TableName.valueOf(hTableName); - if (!connection.getAdmin().tableExists(tableName)) { - throw new TableNotFoundException(tableName); - } - - // create a parameter instance, set the table name and custom listener reference. - BufferedMutatorParams params = new BufferedMutatorParams(tableName).listener(this); - if (bufferFlushMaxSizeInBytes > 0) { - params.writeBufferSize(bufferFlushMaxSizeInBytes); - } - this.mutator = - new DeduplicatedMutator( - (int) bufferFlushMaxMutations, connection.getBufferedMutator(params)); - - if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) { - this.executor = - Executors.newScheduledThreadPool( - 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher")); - this.scheduledFuture = - this.executor.scheduleWithFixedDelay( - () -> { - if (closed) { - return; - } - try { - flush(); - } catch (Exception e) { - // fail the sink and skip the rest of the items - // if the failure handler decides to throw an exception - failureThrowable.compareAndSet(null, e); - } - }, - bufferFlushIntervalMillis, - bufferFlushIntervalMillis, - TimeUnit.MILLISECONDS); - } - } catch (TableNotFoundException tnfe) { - LOG.error("The table " + hTableName + " not found ", tnfe); - throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe); - } catch (IOException ioe) { - LOG.error("Exception while creating connection to HBase.", ioe); - throw new RuntimeException("Cannot create connection to HBase.", ioe); - } - LOG.info("end open."); - } - - private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException { - // create default configuration from current runtime env (`hbase-site.xml` in classpath) - // first, - // and overwrite configuration using serialized configuration from client-side env - // (`hbase-site.xml` in classpath). 
- // user params from client-side have the highest priority - org.apache.hadoop.conf.Configuration runtimeConfig = - HBaseConfigurationUtil.deserializeConfiguration( - serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration()); - - // do validation: check key option(s) in final runtime configuration - if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) { - LOG.error( - "Can not connect to HBase without {} configuration", - HConstants.ZOOKEEPER_QUORUM); - throw new IOException( - "Check HBase configuration failed, lost: '" - + HConstants.ZOOKEEPER_QUORUM - + "'!"); - } - - return runtimeConfig; - } - - private void checkErrorAndRethrow() { - Throwable cause = failureThrowable.get(); - if (cause != null) { - throw new RuntimeException("An error occurred in HBaseSink.", cause); - } - } - - @SuppressWarnings("rawtypes") - @Override - public void invoke(T value, Context context) throws Exception { - checkErrorAndRethrow(); - - mutator.mutate(mutationConverter.convertToMutation(value)); - - // flush when the buffer number of mutations greater than the configured max size. - if (bufferFlushMaxMutations > 0 - && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) { - flush(); - } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes == 0) { - flush(); - } - } - - private void flush() throws IOException { - // DeduplicatedMutator is thread-safe - mutator.flush(); - numPendingRequests.set(0); - checkErrorAndRethrow(); - } - - @Override - public void close() throws Exception { - closed = true; - - if (mutator != null) { - try { - mutator.close(); - } catch (IOException e) { - LOG.warn("Exception occurs while closing HBase BufferedMutator.", e); - } - this.mutator = null; - } - - if (connection != null) { - try { - connection.close(); - } catch (IOException e) { - LOG.warn("Exception occurs while closing HBase Connection.", e); - } - this.connection = null; - } - - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - if (executor != null) { - executor.shutdownNow(); - } - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - while (numPendingRequests.get() != 0) { - flush(); - } - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - // nothing to do. - } - - @Override - public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) - throws RetriesExhaustedWithDetailsException { - // fail the sink and skip the rest of the items - // if the failure handler decides to throw an exception - failureThrowable.compareAndSet(null, exception); - } - - /** - * Thread-safe class, grouped mutations by rows and keep the latest mutation. For more info, see - * HBASE-8626. 
- */ - private static class DeduplicatedMutator { - - private final BufferedMutator mutator; - private final Map mutations; - - DeduplicatedMutator(int size, BufferedMutator mutator) { - this.mutator = mutator; - this.mutations = new HashMap<>(size); - } - - synchronized void mutate(Mutation current) { - ByteBuffer key = ByteBuffer.wrap(current.getRow()); - Mutation old = mutations.get(key); - if (old == null || current.getTimeStamp() >= old.getTimeStamp()) { - mutations.put(key, current); - } - } - - synchronized void flush() throws IOException { - mutator.mutate(new ArrayList<>(mutations.values())); - mutator.flush(); - mutations.clear(); - } - - synchronized void close() throws IOException { - mutator.mutate(new ArrayList<>(mutations.values())); - mutator.close(); - mutations.clear(); - } - } -} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java new file mode 100644 index 00000000..dce79825 --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java @@ -0,0 +1,30 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; +import org.apache.flink.connector.hbase.util.HBaseMutationSerialization; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** Serializer class for state in HBase sink. */ +@Internal +public class HBaseStateSerializer extends AsyncSinkWriterStateSerializer { + @Override + protected void serializeRequestToStream(SerializableMutation request, DataOutputStream out) + throws IOException { + HBaseMutationSerialization.serialize(request.get(), out); + } + + @Override + protected SerializableMutation deserializeRequestFromStream( + long requestSize, DataInputStream in) throws IOException { + return new SerializableMutation(HBaseMutationSerialization.deserialize(in)); + } + + @Override + public int getVersion() { + return 1; + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java new file mode 100644 index 00000000..06d90c44 --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java @@ -0,0 +1,249 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.ResultHandler; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; +import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; +import org.apache.flink.metrics.Counter; +import 
org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * Sink writer created for {@link HBaseSink} to write {@link SerializableMutation} elements to + * HBase. More details can be found in the JavaDocs for {@link HBaseSink}. + * + *

More details on the internals of this sink writer can be found in the JavaDocs of {@link + * AsyncSinkWriter}. + */ +@Internal +public class HBaseWriter extends AsyncSinkWriter { + private static final Logger LOG = LoggerFactory.getLogger(HBaseWriter.class); + + private final AsyncConnection connection; + private final AsyncTable table; + private final SinkWriterMetricGroup metrics; + private final Counter numRecordsOutErrorsCounter; + private final HBaseWriterAsyncHandler hBaseWriterAsyncHandler; + + /** This can be removed once rebased to Flink 2.0. */ + public HBaseWriter( + ElementConverter elementConverter, + Sink.InitContext context, + Collection> states, + int maxBatchSize, + long maxBatchSizeInBytes, + int maxInFlightRequests, + int maxBufferedRequests, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + long requestTimeoutMS, + boolean failOnTimeout, + String tableName, + Configuration configuration) { + super( + elementConverter, + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + .setMaxInFlightRequests(maxInFlightRequests) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(maxTimeInBufferMS) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .setRequestTimeoutMS(requestTimeoutMS) + .setFailOnTimeout(failOnTimeout) + .setRateLimitingStrategy( + buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize)) + .build(), + states); + + this.connection = createClient(configuration); + this.table = connection.getTable(TableName.valueOf(tableName)); + this.metrics = context.metricGroup(); + this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter(); + this.hBaseWriterAsyncHandler = new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter); + } + + public HBaseWriter( + ElementConverter elementConverter, + WriterInitContext context, + Collection> states, + int maxBatchSize, + long maxBatchSizeInBytes, + int maxInFlightRequests, + int maxBufferedRequests, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + long requestTimeoutMS, + boolean failOnTimeout, + String tableName, + Configuration configuration) { + super( + elementConverter, + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + .setMaxInFlightRequests(maxInFlightRequests) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(maxTimeInBufferMS) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .setRequestTimeoutMS(requestTimeoutMS) + .setFailOnTimeout(failOnTimeout) + .setRateLimitingStrategy( + buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize)) + .build(), + states); + + this.connection = createClient(configuration); + this.table = connection.getTable(TableName.valueOf(tableName)); + this.metrics = context.metricGroup(); + this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter(); + this.hBaseWriterAsyncHandler = new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter); + } + + /** + * Submits a batch of mutation requests to HBase asynchronously. + * + *

This method performs the following operations: + * + *

    + *
  1. Deduplicates the request entries to ensure only the latest mutation per row is sent + *
  2. Extracts {@link Mutation} from {@link SerializableMutation} entries + *
  3. Submits the batch to HBase asynchronously + *
  4. Handles failures by collecting failed mutations for retry + *
+ */ + @Override + protected void submitRequestEntries( + List requestEntries, + ResultHandler resultHandler) { + // Requests have to be deduplicated to ensure correct behavior. + List requestEntriesDeduplicated = + deduplicateRequestEntries(requestEntries); + + // Convert WrappedMutations to Mutations + List mutations = + requestEntriesDeduplicated.stream() + .map(SerializableMutation::get) + .collect(Collectors.toList()); + + // Handle failed requests to retry them later. It's possible that some mutations failed + // while others did not. + List> futures = table.batch(mutations); + hBaseWriterAsyncHandler.handleWriteFutures( + futures, requestEntriesDeduplicated, resultHandler); + } + + @Override + protected long getSizeInBytes(SerializableMutation requestEntry) { + return requestEntry.get().heapSize(); + } + + @Override + public void close() { + if (!connection.isClosed()) { + try { + connection.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Group mutations and keep the latest mutation only. Please see HBASE-8626 for more information. + * + * @param requestEntries entries to save + * @return deduplicated entries with the latest mutation only for each affected row + */ + private static List deduplicateRequestEntries( + List requestEntries) { + Map requestEntriesMap = new HashMap<>(); + for (SerializableMutation requestEntry : requestEntries) { + ByteBuffer key = ByteBuffer.wrap(requestEntry.get().getRow()); + SerializableMutation old = requestEntriesMap.get(key); + if (old == null || requestEntry.get().getTimestamp() >= old.get().getTimestamp()) { + requestEntriesMap.put(key, requestEntry); + } + } + + return new ArrayList<>(requestEntriesMap.values()); + } + + /** Builds a congestion control rate limiting strategy using AIMD algorithm. */ + private static RateLimitingStrategy buildRateLimitingStrategy( + int maxInFlightRequests, int maxBatchSize) { + return CongestionControlRateLimitingStrategy.builder() + .setMaxInFlightRequests(maxInFlightRequests) + .setInitialMaxInFlightMessages(maxBatchSize) + .setScalingStrategy( + AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests).build()) + .build(); + } + + /** + * Creates an asynchronous HBase client connection using the provided configuration. + * + *

This method merges the runtime HBase configuration with the provided configuration and + * validates that the ZooKeeper quorum is properly configured before establishing the + * connection. + * + * @param configuration the HBase configuration to use for connection + * @return an asynchronous connection to the HBase cluster + * @throws HBaseSinkException.HBaseSinkInitException if ZooKeeper quorum is not configured or + * connection fails + */ + private static AsyncConnection createClient(Configuration configuration) + throws HBaseSinkException.HBaseSinkInitException { + Configuration runtimeConfig = HBaseConfigurationUtil.getHBaseConfiguration(); + runtimeConfig.addResource(configuration); + + if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) { + LOG.error( + "Can not connect to HBase without '{}' configuration", + HConstants.ZOOKEEPER_QUORUM); + throw new HBaseSinkException.HBaseSinkInitException( + "Can not connect to HBase without '" + + HConstants.ZOOKEEPER_QUORUM + + "' configuration"); + } + + try { + return ConnectionFactory.createAsyncConnection(runtimeConfig).get(); + } catch (InterruptedException | ExecutionException e) { + throw new HBaseSinkException.HBaseSinkInitException(e); + } + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java new file mode 100644 index 00000000..01d32afb --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java @@ -0,0 +1,156 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.sink.writer.ResultHandler; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.Mutation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; + +/** + * This class is responsible for managing the async calls to HBase and managing the {@link + * ResultHandler} to decide which request can be retried. + */ +@Internal +public class HBaseWriterAsyncHandler { + private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class); + + private final Counter numRecordsOutErrorsCounter; + + public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) { + this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter; + } + + /** + * For a given list of HBase write futures, this method will asynchronously analyze their + * result, and using the provided {@link ResultHandler}, it will instruct {@link + * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry some mutations. In case + * of errors which should not be retried, the Flink job will stop with an error. 
+ * + * @param futures list of HBase write futures + * @param processedMutationsInOrder list of mutations with their indices matching that of the + * futures + * @param resultHandler result handler to manage retries and exceptions + */ + public void handleWriteFutures( + List> futures, + List processedMutationsInOrder, + ResultHandler resultHandler) { + Preconditions.checkArgument( + futures.size() == processedMutationsInOrder.size(), + "Different number of HBase futures was supplied than mutations."); + + ConcurrentLinkedQueue failedMutations = new ConcurrentLinkedQueue<>(); + + // Handle each future separately and store failures. + CompletableFuture[] handledFutures = new CompletableFuture[futures.size()]; + for (int i = 0; i < futures.size(); i++) { + final int index = i; + handledFutures[index] = + futures.get(index) + .exceptionally( + throwable -> { + failedMutations.add( + new FailedMutation( + processedMutationsInOrder.get(index), + throwable)); + return null; + }); + } + + // Exceptions are already handled here, so it's safe to use `thenRun()`. + CompletableFuture.allOf(handledFutures) + .thenRun( + () -> { + handleFailedRequests(failedMutations, resultHandler); + }); + } + + /** + * Handles mutations that failed to write to HBase. + * + *

This method increments the error counter and schedules the failed mutations for retry + * through the result handler. If the exception should not be retried, the job will fail instead + * with an exception. + * + * @param failedMutations the list of mutations that failed to write + * @param resultHandler the handler responsible for retry logic + */ + private void handleFailedRequests( + Collection failedMutations, + ResultHandler resultHandler) { + if (failedMutations.isEmpty()) { + resultHandler.complete(); + return; + } + + numRecordsOutErrorsCounter.inc(failedMutations.size()); + + for (FailedMutation failedMutation : failedMutations) { + LOG.warn("Mutation failed with exception", failedMutation.getThrowable()); + + if (isHBaseExceptionFatal(failedMutation.getThrowable())) { + resultHandler.completeExceptionally( + new HBaseSinkException.HBaseSinkMutationException( + failedMutation.getThrowable())); + return; + } + } + + resultHandler.retryForEntries( + failedMutations.stream() + .map(FailedMutation::getWrappedMutation) + .collect(Collectors.toList())); + } + + /** + * Check if HBase exception is fatal or could be retried. Also keeps a set of visited exceptions + * to make sure prevent infinite recursion. + */ + private boolean isHBaseExceptionFatal(Throwable throwable, Set visited) { + if (throwable == null || !visited.add(throwable)) { + // Null or already visited + return false; + } + + if (throwable instanceof DoNotRetryIOException) { + return true; + } + + return isHBaseExceptionFatal(throwable.getCause(), visited); + } + + private boolean isHBaseExceptionFatal(Throwable throwable) { + return isHBaseExceptionFatal(throwable, new HashSet<>()); + } + + /** Container class for a failed mutation also including the exception thrown. */ + private static final class FailedMutation { + private final SerializableMutation mutation; + private final Throwable throwable; + + private FailedMutation(SerializableMutation mutation, Throwable throwable) { + this.mutation = mutation; + this.throwable = throwable; + } + + public SerializableMutation getWrappedMutation() { + return mutation; + } + + public Throwable getThrowable() { + return throwable; + } + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationElementConverter.java similarity index 82% rename from flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java rename to flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationElementConverter.java index 5796d5c3..e80c54bd 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationConverter.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/RowDataToMutationElementConverter.java @@ -18,6 +18,9 @@ package org.apache.flink.connector.hbase.sink; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.connector.hbase.sink.WritableMetadata.TimeToLiveMetadata; import org.apache.flink.connector.hbase.sink.WritableMetadata.TimestampMetadata; import org.apache.flink.connector.hbase.util.HBaseSerde; @@ -31,10 +34,10 @@ import java.util.List; /** - * An implementation of {@link HBaseMutationConverter} which 
converts {@link RowData} into {@link + * An implementation of {@link ElementConverter} which converts {@link RowData} into {@link * Mutation}. */ -public class RowDataToMutationConverter implements HBaseMutationConverter { +public class RowDataToMutationElementConverter implements ElementConverter { private static final long serialVersionUID = 1L; private final HBaseTableSchema schema; @@ -44,7 +47,7 @@ public class RowDataToMutationConverter implements HBaseMutationConverter metadataKeys, @@ -58,12 +61,13 @@ public RowDataToMutationConverter( } @Override - public void open() { + public void open(Sink.InitContext context) { + ElementConverter.super.open(context); this.serde = new HBaseSerde(schema, nullStringLiteral, ignoreNullValue); } @Override - public Mutation convertToMutation(RowData record) { + public Mutation apply(RowData record, SinkWriter.Context context) { Long timestamp = timestampMetadata.read(record); Long timeToLive = timeToLiveMetadata.read(record); RowKind kind = record.getRowKind(); diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java new file mode 100644 index 00000000..9e8b6a41 --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java @@ -0,0 +1,27 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.Internal; + +import org.apache.hadoop.hbase.client.Mutation; + +import java.io.Serializable; + +/** + * This class is used by {@link HBaseSink} and {@link HBaseWriter} to wrap HBase {@link Mutation} + * objects to be able to serialize them. + */ +@Internal +public class SerializableMutation implements Serializable { + private static final long serialVersionUID = 1L; + + private transient Mutation mutation; + + public SerializableMutation(Mutation mutation) { + this.mutation = mutation; + } + + /** Get the wrapped mutation object. */ + public Mutation get() { + return mutation; + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java new file mode 100644 index 00000000..49975721 --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java @@ -0,0 +1,36 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.apache.hadoop.hbase.client.Mutation; + +/** + * This is a helper class used to wrap an {@link ElementConverter} supplied by the user that + * converts the input data to {@link Mutation}. With this class, the elements will be seamlessly + * converted to internal {@link SerializableMutation} objects that can be serialized by the sink. 
+ */ +@Internal +public class WrappedElementConverter + implements ElementConverter { + private static final long serialVersionUID = 1L; + + private final ElementConverter originalElementConverter; + + public WrappedElementConverter(ElementConverter originalElementConverter) { + this.originalElementConverter = originalElementConverter; + } + + @Override + public void open(Sink.InitContext context) { + ElementConverter.super.open(context); + originalElementConverter.open(context); + } + + @Override + public SerializableMutation apply(InputT element, SinkWriter.Context context) { + return new SerializableMutation(originalElementConverter.apply(element, context)); + } +} diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java index d760c034..9425e261 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java @@ -27,6 +27,10 @@ import java.time.Duration; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT; +import static org.apache.flink.connector.base.table.AsyncSinkConnectorOptions.MAX_BATCH_SIZE; + /** Options for the HBase connector. */ @PublicEvolving public class HBaseConnectorOptions { @@ -60,33 +64,63 @@ public class HBaseConnectorOptions { "Representation for null values for string fields. HBase source and " + "sink encodes/decodes empty bytes as null values for all types except string type."); + @Deprecated public static final ConfigOption SINK_BUFFER_FLUSH_MAX_SIZE = ConfigOptions.key("sink.buffer-flush.max-size") .memoryType() - .defaultValue(MemorySize.parse("2mb")) + .noDefaultValue() .withDescription( "Writing option, maximum size in memory of buffered rows for each " + "writing request. This can improve performance for writing data to HBase database, " - + "but may increase the latency. Can be set to '0' to disable it. "); + + "but may increase the latency. " + + String.format( + "This is a deprecated key and will be mapped to %s.", + FLUSH_BUFFER_SIZE.key())); + @Deprecated public static final ConfigOption SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions.key("sink.buffer-flush.max-rows") .intType() - .defaultValue(1000) + .noDefaultValue() .withDescription( "Writing option, maximum number of rows to buffer for each writing request. " + "This can improve performance for writing data to HBase database, but may increase the latency. " - + "Can be set to '0' to disable it."); + + String.format( + "This is a deprecated key and will be mapped to %s.", + MAX_BATCH_SIZE.key())); + @Deprecated public static final ConfigOption SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval") .durationType() - .defaultValue(Duration.ofSeconds(1)) + .noDefaultValue() .withDescription( "Writing option, the interval to flush any buffered rows. " + "This can improve performance for writing data to HBase database, but may increase the latency. " - + "Can be set to '0' to disable it. 
Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' " - + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions."); + + String.format( + "This is a deprecated key and will be mapped to %s.", + FLUSH_BUFFER_TIMEOUT.key())); + + public static final ConfigOption SINK_MAX_RECORD_SIZE = + ConfigOptions.key("sink.max-record-size") + .longType() + .defaultValue(1048576L) + .withDescription( + "The maximum size in bytes of a single record. Records bigger than this will cause the job to fail."); + + public static final ConfigOption SINK_REQUEST_TIMEOUT = + ConfigOptions.key("sink.request-timeout") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "The maximum time to wait for a batch of HBase requests to complete before timing out."); + + public static final ConfigOption SINK_FAIL_ON_TIMEOUT = + ConfigOptions.key("sink.fail-on-timeout") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to fail the job when a request times out. If false, timed-out requests will be logged but the job will continue processing. If true, a timeout will cause the job to fail."); public static final ConfigOption SINK_IGNORE_NULL_VALUE = ConfigOptions.key("sink.ignore-null-value") diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java index 482644fd..39743e61 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.connector.hbase.options.HBaseWriteOptions; import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.table.types.DataType; @@ -31,11 +30,6 @@ import java.util.Map; import java.util.Properties; -import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL; -import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS; -import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE; -import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE; -import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT; @@ -83,18 +77,6 @@ public static void validatePrimaryKey(DataType dataType, int[] primaryKeyIndexes } } - public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions) { - HBaseWriteOptions.Builder builder = HBaseWriteOptions.builder(); - builder.setBufferFlushIntervalMillis( - tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); - builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS)); - builder.setBufferFlushMaxSizeInBytes( - tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes()); - builder.setIgnoreNullValue(tableOptions.get(SINK_IGNORE_NULL_VALUE)); - 
builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null)); - return builder.build(); - } - /** config HBase Configuration. */ public static Configuration getHBaseConfiguration(ReadableConfig tableOptions) { // create default configuration from current runtime env (`hbase-site.xml` in classpath) diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java index b6679bdc..15710a19 100644 --- a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java @@ -40,8 +40,6 @@ public class HBaseConfigurationUtil { private static final Logger LOG = LoggerFactory.getLogger(HBaseConfigurationUtil.class); - public static final String ENV_HBASE_CONF_DIR = "HBASE_CONF_DIR"; - public static Configuration getHBaseConfiguration() { // Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from @@ -177,39 +175,4 @@ private static void deserializeWritable(T writable, byte[] DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream); writable.readFields(dataInputStream); } - - public static org.apache.hadoop.conf.Configuration createHBaseConf() { - org.apache.hadoop.conf.Configuration hbaseClientConf = HBaseConfiguration.create(); - - String hbaseConfDir = System.getenv(ENV_HBASE_CONF_DIR); - - if (hbaseConfDir != null) { - if (new File(hbaseConfDir).exists()) { - String coreSite = hbaseConfDir + "/core-site.xml"; - String hdfsSite = hbaseConfDir + "/hdfs-site.xml"; - String hbaseSite = hbaseConfDir + "/hbase-site.xml"; - if (new File(coreSite).exists()) { - hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(coreSite)); - LOG.info("Adding " + coreSite + " to hbase configuration"); - } - if (new File(hdfsSite).exists()) { - hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hdfsSite)); - LOG.info("Adding " + hdfsSite + " to hbase configuration"); - } - if (new File(hbaseSite).exists()) { - hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hbaseSite)); - LOG.info("Adding " + hbaseSite + " to hbase configuration"); - } - } else { - LOG.warn( - "HBase config directory '{}' not found, cannot load HBase configuration.", - hbaseConfDir); - } - } else { - LOG.warn( - "{} env variable not found, cannot load HBase configuration.", - ENV_HBASE_CONF_DIR); - } - return hbaseClientConf; - } } diff --git a/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java new file mode 100644 index 00000000..d297a9cb --- /dev/null +++ b/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java @@ -0,0 +1,45 @@ +package org.apache.flink.connector.hbase.util; + +import org.apache.flink.annotation.Internal; + +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Internal utility class for serializing and 
deserializing HBase mutations. + * + * <p>
This class provides methods to convert HBase {@link Mutation} objects to and from their + * Protocol Buffer representations for transmission over the wire or storage. It supports the + * following HBase mutation types: {@link Put} and {@link Delete}. + */ +@Internal +public class HBaseMutationSerialization { + public static void serialize(Mutation mutation, OutputStream out) throws IOException { + ClientProtos.MutationProto.MutationType type; + if (mutation instanceof Put) { + type = ClientProtos.MutationProto.MutationType.PUT; + } else if (mutation instanceof Delete) { + type = ClientProtos.MutationProto.MutationType.DELETE; + } else { + throw new IllegalArgumentException( + String.format( + "Unknown HBase mutation type, cannot serialize: %s", + mutation.getClass())); + } + + ClientProtos.MutationProto proto = ProtobufUtil.toMutation(type, mutation); + proto.writeDelimitedTo(out); + } + + public static Mutation deserialize(InputStream in) throws IOException { + ClientProtos.MutationProto proto = ClientProtos.MutationProto.parseDelimitedFrom(in); + return ProtobufUtil.toMutation(proto); + } +} diff --git a/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandlerTest.java b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandlerTest.java new file mode 100644 index 00000000..1f750f69 --- /dev/null +++ b/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandlerTest.java @@ -0,0 +1,195 @@ +package org.apache.flink.connector.hbase.sink; + +import org.apache.flink.connector.base.sink.writer.ResultHandler; +import org.apache.flink.metrics.Counter; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link HBaseWriterAsyncHandler}. */ +class HBaseWriterAsyncHandlerTest { + + private final Counter counter = new TestCounter(); + private final TestResultHandler resultHandler = new TestResultHandler(); + private final HBaseWriterAsyncHandler handler = new HBaseWriterAsyncHandler(counter); + + @Test + void testHBaseWriteAsyncHandlerEmpty() { + handler.handleWriteFutures(Collections.emptyList(), Collections.emptyList(), resultHandler); + + assertThat(counter.getCount()).isEqualTo(0); + assertThat(resultHandler.getComplete()).isTrue(); + } + + @Test + void testHBaseWriteAsyncHandlerSuccessful() { + List<SerializableMutation> mutations = + IntStream.range(0, 500) + .mapToObj(__ -> generateMutation()) + .collect(Collectors.toList()); + List<CompletableFuture<Void>> futures = + mutations.stream() + .map(m -> new CompletableFuture<Void>()) + .collect(Collectors.toList()); + + handler.handleWriteFutures(futures, mutations, resultHandler); + futures.forEach(f -> f.complete(null)); + + assertThat(counter.getCount()).isEqualTo(0); + assertThat(resultHandler.getComplete()).isTrue(); + } + + /** Half the mutations will throw an exception that can be retried. 
*/ + @Test + void testHBaseWriteAsyncHandlerException() { + List<SerializableMutation> allMutations = + IntStream.range(0, 500) + .mapToObj(__ -> generateMutation()) + .collect(Collectors.toList()); + List<CompletableFuture<Void>> futures = + IntStream.range(0, 500) + .mapToObj(__ -> new CompletableFuture<Void>()) + .collect(Collectors.toList()); + + handler.handleWriteFutures(futures, allMutations, resultHandler); + + List<SerializableMutation> failedMutations = new ArrayList<>(); + for (int i = 0; i < 500; i++) { + if (i % 2 == 0) { + futures.get(i).complete(null); + } else { + futures.get(i) + .completeExceptionally( + new HBaseSinkException.HBaseSinkMutationException( + new RuntimeException("test"))); + failedMutations.add(allMutations.get(i)); + } + } + + assertThat(resultHandler.getEntriesRetried()).hasSameElementsAs(failedMutations); + assertThat(counter.getCount()).isEqualTo(250); + } + + /** Exactly one mutation will throw an exception that cannot be retried; the job should fail. */ + @Test + void testHBaseWriteAsyncHandlerUnrecoverableException() { + List<SerializableMutation> mutations = + IntStream.range(0, 500) + .mapToObj(__ -> generateMutation()) + .collect(Collectors.toList()); + List<CompletableFuture<Void>> futures = + IntStream.range(0, 500) + .mapToObj(__ -> new CompletableFuture<Void>()) + .collect(Collectors.toList()); + + handler.handleWriteFutures(futures, mutations, resultHandler); + for (int i = 0; i < futures.size(); i++) { + if (i == 250) { + futures.get(i) + .completeExceptionally( + new HBaseSinkException.HBaseSinkMutationException( + new DoNotRetryIOException("test"))); + } else { + futures.get(i).complete(null); + } + } + + assertThat(counter.getCount()).isEqualTo(1); + assertThat(resultHandler.getException()) + .isExactlyInstanceOf(HBaseSinkException.HBaseSinkMutationException.class) + .hasRootCauseExactlyInstanceOf(DoNotRetryIOException.class) + .hasMessage( + "Exception while trying to persist records in HBase sink, not retrying."); + } + + private SerializableMutation generateMutation() { + return new SerializableMutation(new Put(Bytes.toBytes(UUID.randomUUID().toString()))); + } + + /** Test class to verify usage of {@link ResultHandler}. */ + static final class TestResultHandler implements ResultHandler<SerializableMutation> { + private Boolean complete = false; + private Exception exception = null; + private List<SerializableMutation> entriesRetried = null; + + @Override + public void complete() { + complete = true; + exception = null; + entriesRetried = null; + } + + @Override + public void completeExceptionally(Exception e) { + exception = e; + complete = false; + entriesRetried = null; + } + + @Override + public void retryForEntries(List<SerializableMutation> requestEntriesToRetry) { + entriesRetried = requestEntriesToRetry; + complete = false; + exception = null; + } + + public Boolean getComplete() { + return complete; + } + + public Exception getException() { + return exception; + } + + public List<SerializableMutation> getEntriesRetried() { + return entriesRetried; + } + } + + /** Test class to verify metrics. 
*/ + static final class TestCounter implements Counter { + private long countValue; + + public TestCounter() { + this.countValue = 0; + } + + @Override + public void inc() { + countValue++; + } + + @Override + public void inc(long n) { + countValue += n; + } + + @Override + public void dec() { + countValue--; + } + + @Override + public void dec(long n) { + countValue -= n; + } + + @Override + public long getCount() { + return countValue; + } + } +} diff --git a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql index 2c1c9d05..0385509b 100644 --- a/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql +++ b/flink-connector-hbase-e2e-tests/src/test/resources/hbase_e2e.sql @@ -32,8 +32,8 @@ CREATE TABLE MyHBaseSink ( 'connector' = '$HBASE_CONNECTOR', 'table-name' = 'sink', 'zookeeper.quorum' = 'hbase:2181', - 'sink.buffer-flush.max-rows' = '1', - 'sink.buffer-flush.interval' = '2s' + 'sink.batch.max-size' = '1', + 'sink.flush-buffer.timeout' = '2000' ); INSERT INTO MyHBaseSink diff --git a/flink-sql-connector-hbase-2.6/pom.xml b/flink-sql-connector-hbase-2.6/pom.xml index b3e96ac5..350c3dd3 100644 --- a/flink-sql-connector-hbase-2.6/pom.xml +++ b/flink-sql-connector-hbase-2.6/pom.xml @@ -83,6 +83,7 @@ under the License. <include>org.apache.commons:commons-crypto</include> <include>org.apache.commons:commons-lang3</include> <include>io.dropwizard.metrics:metrics-core</include> + <include>commons-io:commons-io</include> <include>org.apache.hbase:hbase-metrics*</include> diff --git a/pom.xml b/pom.xml index 5bac97c0..f393ecf7 100644 --- a/pom.xml +++ b/pom.xml @@ -159,6 +159,12 @@ under the License. <version>${flink.version}</version> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${flink.version}</version> + </dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> From 2c9473bceb9b42fdcebbe5dd48b59d71fafbde94 Mon Sep 17 00:00:00 2001 From: mateczagany Date: Thu, 11 Sep 2025 09:53:40 +0200 Subject: [PATCH 2/2] [FLINK-35280] Add commons-io to NOTICE file --- flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE b/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE index 3c079c5f..8b437be6 100644 --- a/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE +++ b/flink-sql-connector-hbase-2.6/src/main/resources/META-INF/NOTICE @@ -6,6 +6,7 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) + - commons-io:commons-io:2.11.0 - commons-codec:commons-codec:1.15 - io.dropwizard.metrics:metrics-core:3.2.6 - org.apache.commons:commons-crypto:1.1.0
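
As a quick sanity check of the HBaseMutationSerialization utility added by this patch, the snippet below round-trips a Put through the new delimited protobuf encoding. It is a minimal sketch, not part of the patch: only serialize(Mutation, OutputStream) and deserialize(InputStream) come from the new file, while the class name MutationRoundTripExample, the row key, and the column family/qualifier are illustrative assumptions.

import org.apache.flink.connector.hbase.util.HBaseMutationSerialization;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** Minimal sketch: round-trips a Put through the delimited protobuf encoding used by the sink. */
public class MutationRoundTripExample {
    public static void main(String[] args) throws IOException {
        // Illustrative row key, column family and qualifier.
        Put put = new Put(Bytes.toBytes("row-1"));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));

        // serialize() writes a length-delimited MutationProto, so several mutations can be
        // appended to the same stream and read back in order.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        HBaseMutationSerialization.serialize(put, out);

        Mutation restored =
                HBaseMutationSerialization.deserialize(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(Bytes.toString(restored.getRow())); // prints "row-1"
    }
}

Within the sink itself this encoding presumably backs SerializableMutation and HBaseStateSerializer, but the snippet relies only on the two public methods shown in the patch.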