65 changes: 53 additions & 12 deletions docs/content/docs/connectors/table/hbase.md
@@ -170,37 +170,78 @@ Connector Options
<td>Representation for null values for string fields. The HBase source and sink encode/decode empty bytes as null values for all types except the string type.</td>
</tr>
<tr>
<td><h5>sink.buffer-flush.max-size</h5></td>
<td><h5>sink.flush-buffer.size</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">2mb</td>
<td>MemorySize</td>
<td style="word-wrap: break-word;">2097152</td>
<td>Long</td>
<td>Writing option, maximum size in bytes of buffered rows for each writing request.
This can improve performance for writing data to the HBase database, but may increase latency.
Can be set to <code>'0'</code> to disable it.
</td>
</tr>
<tr>
<td><h5>sink.buffer-flush.max-rows</h5></td>
<td><h5>sink.batch.max-size</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Writing option, maximum number of rows to buffer for each writing request.
This can improve performance for writing data to the HBase database, but may increase latency.
Can be set to <code>'0'</code> to disable it.
</td>
</tr>
<tr>
<td><h5>sink.buffer-flush.interval</h5></td>
<td><h5>sink.flush-buffer.timeout</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">1s</td>
<td>Duration</td>
<td>Writing option, the interval to flush any buffered rows.
<td style="word-wrap: break-word;">1000</td>
<td>Long</td>
<td>Writing option, the maximum time in milliseconds for an element to stay in the buffer before it is flushed out.
This can improve performance for writing data to the HBase database, but may increase latency.
Can be set to <code>'0'</code> to disable it. Note, both <code>'sink.flush-buffer.size'</code> and <code>'sink.batch.max-size'</code>
can be set to <code>'0'</code> with the flush timeout set, allowing for complete asynchronous processing of buffered actions.
</td>
</tr>
<tr>
<td><h5>sink.requests.max-inflight</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Maximum number of in-flight (uncompleted) requests allowed before new write requests are blocked.
</td>
</tr>
<tr>
<td><h5>sink.requests.max-buffered</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>Maximum number of buffered records before applying backpressure.
</td>
</tr>
<tr>
<td><h5>sink.request-timeout</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
<td>The maximum time to wait for a batch of HBase requests to complete before timing out.
</td>
</tr>
<tr>
<td><h5>sink.fail-on-timeout</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to fail the job when a request times out. If false, timed-out requests are logged and the job continues processing; if true, a timeout causes the job to fail.
</td>
</tr>
<tr>
<td><h5>sink.max-record-size</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">1048576</td>
<td>Long</td>
<td>The maximum size in bytes of a single record. Records larger than this will cause the job to fail.
</td>
</tr>
<tr>
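For reference, a minimal usage sketch of the options documented above. It assumes the `hbase-2.6` connector jar is on the classpath and uses a placeholder schema, table name, and ZooKeeper address; the option values shown are simply the documented defaults.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseAsyncSinkOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL exercising the new async sink options; all values are the documented defaults.
        tEnv.executeSql(
                "CREATE TABLE hTable (\n"
                        + "  rowkey INT,\n"
                        + "  family1 ROW<q1 INT>,\n"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hbase-2.6',\n"
                        + "  'table-name' = 'mytable',\n"
                        + "  'zookeeper.quorum' = 'localhost:2181',\n"
                        + "  'sink.flush-buffer.size' = '2097152',\n"
                        + "  'sink.batch.max-size' = '1000',\n"
                        + "  'sink.flush-buffer.timeout' = '1000',\n"
                        + "  'sink.requests.max-inflight' = '1000',\n"
                        + "  'sink.requests.max-buffered' = '1000',\n"
                        + "  'sink.request-timeout' = '10 min',\n"
                        + "  'sink.fail-on-timeout' = 'false',\n"
                        + "  'sink.max-record-size' = '1048576'\n"
                        + ")");
    }
}
```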
6 changes: 6 additions & 0 deletions flink-connector-hbase-2.6/pom.xml
@@ -40,6 +40,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -21,7 +21,9 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
@@ -30,14 +32,15 @@
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;

import org.apache.hadoop.conf.Configuration;

import java.time.Duration;
import java.util.HashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -50,21 +53,23 @@
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_FAIL_ON_TIMEOUT;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_SIZE;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_REQUEST_TIMEOUT;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;

/** HBase connector factory. */
@Internal
public class HBase2DynamicTableFactory
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
public class HBase2DynamicTableFactory extends AsyncDynamicTableSinkFactory
implements DynamicTableSourceFactory {

private static final String IDENTIFIER = "hbase-2.6";

@@ -116,21 +121,29 @@ public DynamicTableSink createDynamicTableSink(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validateExcept(PROPERTIES_PREFIX);

final ReadableConfig tableOptions = helper.getOptions();
final ReadableConfig config = helper.getOptions();

validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

String tableName = tableOptions.get(TABLE_NAME);
Configuration hbaseConf = getHBaseConfiguration(tableOptions);
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);

return new HBaseDynamicTableSink(
tableName,
context.getPhysicalRowDataType(),
hbaseConf,
hBaseWriteOptions,
nullStringLiteral);
HBaseDynamicTableSink.HBaseDynamicSinkBuilder builder =
HBaseDynamicTableSink.builder()
.setRequestTimeoutMS(config.get(SINK_REQUEST_TIMEOUT).toMillis())
.setMaxRecordSizeInBytes(config.get(SINK_MAX_RECORD_SIZE))
.setFailOnTimeout(config.get(SINK_FAIL_ON_TIMEOUT))
.setTableName(config.get(TABLE_NAME))
.setConfiguration(getHBaseConfiguration(config))
.setNullStringLiteral(config.get(NULL_STRING_LITERAL))
.setPhysicalDataType(context.getPhysicalRowDataType())
.setParallelism(config.get(SINK_PARALLELISM))
.setIgnoreNullValue(config.get(SINK_IGNORE_NULL_VALUE));

AsyncSinkConfigurationValidator asyncValidator =
new AsyncSinkConfigurationValidator(config);

addAsyncOptionsToBuilder(getDeprecatedAsyncSinkOptions(config), builder);
addAsyncOptionsToBuilder(asyncValidator.getValidatedConfigurations(), builder);

return builder.build();
}

@Override
@@ -147,42 +160,77 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> set = new HashSet<>();
set.add(ZOOKEEPER_ZNODE_PARENT);
set.add(ZOOKEEPER_QUORUM);
set.add(NULL_STRING_LITERAL);
set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
set.add(SINK_BUFFER_FLUSH_INTERVAL);
set.add(SINK_PARALLELISM);
set.add(SINK_IGNORE_NULL_VALUE);
set.add(LOOKUP_ASYNC);
set.add(LOOKUP_CACHE_MAX_ROWS);
set.add(LOOKUP_CACHE_TTL);
set.add(LOOKUP_MAX_RETRIES);
set.add(LookupOptions.CACHE_TYPE);
set.add(LookupOptions.MAX_RETRIES);
set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
return set;
Stream<ConfigOption<?>> hbaseOptions =
Stream.of(
ZOOKEEPER_ZNODE_PARENT,
ZOOKEEPER_QUORUM,
NULL_STRING_LITERAL,
SINK_BUFFER_FLUSH_MAX_SIZE,
SINK_BUFFER_FLUSH_MAX_ROWS,
SINK_BUFFER_FLUSH_INTERVAL,
SINK_PARALLELISM,
SINK_IGNORE_NULL_VALUE,
SINK_MAX_RECORD_SIZE,
SINK_REQUEST_TIMEOUT,
SINK_FAIL_ON_TIMEOUT,
LOOKUP_ASYNC,
LOOKUP_CACHE_MAX_ROWS,
LOOKUP_CACHE_TTL,
LOOKUP_MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.MAX_RETRIES,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
LookupOptions.PARTIAL_CACHE_MAX_ROWS);
Stream<ConfigOption<?>> asyncOptions = super.optionalOptions().stream();

return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet());
}

@Override
public Set<ConfigOption<?>> forwardOptions() {
return Stream.of(
Stream<ConfigOption<?>> hbaseOptions =
Stream.of(
TABLE_NAME,
ZOOKEEPER_ZNODE_PARENT,
ZOOKEEPER_QUORUM,
NULL_STRING_LITERAL,
LOOKUP_CACHE_MAX_ROWS,
LOOKUP_CACHE_TTL,
LOOKUP_MAX_RETRIES,
SINK_BUFFER_FLUSH_MAX_SIZE,
SINK_BUFFER_FLUSH_MAX_ROWS,
SINK_BUFFER_FLUSH_INTERVAL,
SINK_IGNORE_NULL_VALUE)
.collect(Collectors.toSet());
SINK_IGNORE_NULL_VALUE,
SINK_MAX_RECORD_SIZE,
SINK_REQUEST_TIMEOUT,
SINK_FAIL_ON_TIMEOUT,
LOOKUP_CACHE_MAX_ROWS,
LOOKUP_CACHE_TTL,
LOOKUP_MAX_RETRIES);
Stream<ConfigOption<?>> asyncOptions = super.optionalOptions().stream();

return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet());
}

private Properties getDeprecatedAsyncSinkOptions(ReadableConfig config) {
Properties properties = new Properties();
Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_SIZE))
.ifPresent(
flushBufferSize ->
properties.put(
AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(),
flushBufferSize.getBytes()));
Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_ROWS))
.ifPresent(
maxBatchSize ->
properties.put(
AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(),
maxBatchSize));
Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_INTERVAL))
.ifPresent(
timeout ->
properties.put(
AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(),
timeout.toMillis()));
return properties;
}
}
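Backward-compatibility note: because `optionalOptions()` still registers the legacy buffer-flush keys, existing DDL keeps working; `getDeprecatedAsyncSinkOptions(...)` rewrites them onto the `AsyncSinkConnectorOptions` keys (`FLUSH_BUFFER_SIZE`, `MAX_BATCH_SIZE`, `FLUSH_BUFFER_TIMEOUT`). A minimal sketch with a placeholder schema and addresses:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseDeprecatedSinkOptionsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Legacy keys: translated by getDeprecatedAsyncSinkOptions(...) into
        // 'sink.flush-buffer.size', 'sink.batch.max-size' and 'sink.flush-buffer.timeout'.
        tEnv.executeSql(
                "CREATE TABLE legacyTable (\n"
                        + "  rowkey INT,\n"
                        + "  family1 ROW<q1 INT>,\n"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hbase-2.6',\n"
                        + "  'table-name' = 'mytable',\n"
                        + "  'zookeeper.quorum' = 'localhost:2181',\n"
                        + "  'sink.buffer-flush.max-size' = '2mb',\n"
                        + "  'sink.buffer-flush.max-rows' = '1000',\n"
                        + "  'sink.buffer-flush.interval' = '1s'\n"
                        + ")");
    }
}
```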