
Commit 9848246

[FLINK-35280] Migrate HBase Sink to use Async Sink API
1 parent 97b6947 commit 9848246

29 files changed (+1872, -797 lines)


docs/content/docs/connectors/table/hbase.md

Lines changed: 53 additions & 12 deletions
@@ -170,37 +170,78 @@ Connector Options
 <td>Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type.</td>
 </tr>
 <tr>
-<td><h5>sink.buffer-flush.max-size</h5></td>
+<td><h5>sink.flush-buffer.size</h5></td>
 <td>optional</td>
 <td>yes</td>
-<td style="word-wrap: break-word;">2mb</td>
-<td>MemorySize</td>
+<td style="word-wrap: break-word;">2097152</td>
+<td>Long</td>
 <td>Writing option, maximum size in memory of buffered rows for each writing request.
 This can improve performance for writing data to HBase database, but may increase the latency.
-Can be set to <code>'0'</code> to disable it.
 </td>
 </tr>
 <tr>
-<td><h5>sink.buffer-flush.max-rows</h5></td>
+<td><h5>sink.batch.max-size</h5></td>
 <td>optional</td>
 <td>yes</td>
 <td style="word-wrap: break-word;">1000</td>
 <td>Integer</td>
 <td>Writing option, maximum number of rows to buffer for each writing request.
 This can improve performance for writing data to HBase database, but may increase the latency.
-Can be set to <code>'0'</code> to disable it.
 </td>
 </tr>
 <tr>
-<td><h5>sink.buffer-flush.interval</h5></td>
+<td><h5>sink.flush-buffer.timeout</h5></td>
 <td>optional</td>
 <td>yes</td>
-<td style="word-wrap: break-word;">1s</td>
-<td>Duration</td>
-<td>Writing option, the interval to flush any buffered rows.
+<td style="word-wrap: break-word;">1000</td>
+<td>Long</td>
+<td>Writing option, the threshold time in milliseconds for an element to be in the buffer before flushing out.
 This can improve performance for writing data to HBase database, but may increase the latency.
-Can be set to <code>'0'</code> to disable it. Note, both <code>'sink.buffer-flush.max-size'</code> and <code>'sink.buffer-flush.max-rows'</code>
-can be set to <code>'0'</code> with the flush interval set allowing for complete async processing of buffered actions.
+</td>
+</tr>
+<tr>
+<td><h5>sink.requests.max-inflight</h5></td>
+<td>optional</td>
+<td>yes</td>
+<td style="word-wrap: break-word;">1000</td>
+<td>Integer</td>
+<td>Request threshold for uncompleted requests before blocking new write requests.
+</td>
+</tr>
+<tr>
+<td><h5>sink.requests.max-buffered</h5></td>
+<td>optional</td>
+<td>yes</td>
+<td style="word-wrap: break-word;">1000</td>
+<td>Integer</td>
+<td>Maximum number of buffered records before applying backpressure.
+</td>
+</tr>
+<tr>
+<td><h5>sink.request-timeout</h5></td>
+<td>optional</td>
+<td>yes</td>
+<td style="word-wrap: break-word;">10 min</td>
+<td>Duration</td>
+<td>The maximum time to wait for a batch of HBase requests to complete before timing out.
+</td>
+</tr>
+<tr>
+<td><h5>sink.fail-on-timeout</h5></td>
+<td>optional</td>
+<td>yes</td>
+<td style="word-wrap: break-word;">false</td>
+<td>Boolean</td>
+<td>Whether to fail the job when a request times out. If false, timed-out requests will be logged but the job will continue processing. If true, a timeout will cause the job to fail.
+</td>
+</tr>
+<tr>
+<td><h5>sink.max-record-size</h5></td>
+<td>optional</td>
+<td>yes</td>
+<td style="word-wrap: break-word;">1048576</td>
+<td>Long</td>
+<td>The maximum size in bytes of a single record. Records bigger than this will cause the job to fail.
 </td>
 </tr>
 <tr>
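
For orientation, here is a minimal sketch of how the renamed sink options above could be used from a Table API program. The option keys and sample values are the ones documented in this diff; the table name, schema, and ZooKeeper address are hypothetical placeholders.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseAsyncSinkDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'hbase-2.6' matches the IDENTIFIER in HBase2DynamicTableFactory below;
        // the schema and connection settings are illustrative only.
        tEnv.executeSql(
                "CREATE TABLE hbase_sink (\n"
                        + "  rowkey STRING,\n"
                        + "  cf1 ROW<col1 STRING>,\n"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hbase-2.6',\n"
                        + "  'table-name' = 'my_table',\n"
                        + "  'zookeeper.quorum' = 'localhost:2181',\n"
                        + "  'sink.flush-buffer.size' = '2097152',\n"     // bytes
                        + "  'sink.batch.max-size' = '1000',\n"           // rows per batch
                        + "  'sink.flush-buffer.timeout' = '1000',\n"     // milliseconds
                        + "  'sink.requests.max-inflight' = '1000',\n"
                        + "  'sink.requests.max-buffered' = '1000',\n"
                        + "  'sink.request-timeout' = '10 min',\n"
                        + "  'sink.fail-on-timeout' = 'false',\n"
                        + "  'sink.max-record-size' = '1048576'\n"        // bytes
                        + ")");
    }
}
```

The deprecated `sink.buffer-flush.*` keys remain accepted and are mapped onto these Async Sink options by the factory (see getDeprecatedAsyncSinkOptions below).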

flink-connector-hbase-2.6/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -40,6 +40,12 @@ under the License.
 <scope>provided</scope>
 </dependency>
 
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-connector-base</artifactId>
+<scope>provided</scope>
+</dependency>
+
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java</artifactId>

flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java

Lines changed: 91 additions & 43 deletions
@@ -21,7 +21,9 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
+import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
+import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
@@ -30,14 +32,15 @@
 import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
 import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 
 import org.apache.hadoop.conf.Configuration;
 
 import java.time.Duration;
 import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -50,21 +53,23 @@
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_FAIL_ON_TIMEOUT;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_SIZE;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
+import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_REQUEST_TIMEOUT;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
-import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
 import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
 import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
 
 /** HBase connector factory. */
 @Internal
-public class HBase2DynamicTableFactory
-        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+public class HBase2DynamicTableFactory extends AsyncDynamicTableSinkFactory
+        implements DynamicTableSourceFactory {
 
     private static final String IDENTIFIER = "hbase-2.6";
 
@@ -116,21 +121,29 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         TableFactoryHelper helper = createTableFactoryHelper(this, context);
         helper.validateExcept(PROPERTIES_PREFIX);
 
-        final ReadableConfig tableOptions = helper.getOptions();
+        final ReadableConfig config = helper.getOptions();
 
         validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
 
-        String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
-        HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
-        String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-
-        return new HBaseDynamicTableSink(
-                tableName,
-                context.getPhysicalRowDataType(),
-                hbaseConf,
-                hBaseWriteOptions,
-                nullStringLiteral);
+        HBaseDynamicTableSink.HBaseDynamicSinkBuilder builder =
+                HBaseDynamicTableSink.builder()
+                        .setRequestTimeoutMS(config.get(SINK_REQUEST_TIMEOUT).toMillis())
+                        .setMaxRecordSizeInBytes(config.get(SINK_MAX_RECORD_SIZE))
+                        .setFailOnTimeout(config.get(SINK_FAIL_ON_TIMEOUT))
+                        .setTableName(config.get(TABLE_NAME))
+                        .setConfiguration(getHBaseConfiguration(config))
+                        .setNullStringLiteral(config.get(NULL_STRING_LITERAL))
+                        .setPhysicalDataType(context.getPhysicalRowDataType())
+                        .setParallelism(config.get(SINK_PARALLELISM))
+                        .setIgnoreNullValue(config.get(SINK_IGNORE_NULL_VALUE));
+
+        AsyncSinkConfigurationValidator asyncValidator =
+                new AsyncSinkConfigurationValidator(config);
+
+        addAsyncOptionsToBuilder(getDeprecatedAsyncSinkOptions(config), builder);
+        addAsyncOptionsToBuilder(asyncValidator.getValidatedConfigurations(), builder);
+
+        return builder.build();
     }
 
     @Override
@@ -147,42 +160,77 @@ public Set<ConfigOption<?>> requiredOptions() {
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> set = new HashSet<>();
-        set.add(ZOOKEEPER_ZNODE_PARENT);
-        set.add(ZOOKEEPER_QUORUM);
-        set.add(NULL_STRING_LITERAL);
-        set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
-        set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
-        set.add(SINK_BUFFER_FLUSH_INTERVAL);
-        set.add(SINK_PARALLELISM);
-        set.add(SINK_IGNORE_NULL_VALUE);
-        set.add(LOOKUP_ASYNC);
-        set.add(LOOKUP_CACHE_MAX_ROWS);
-        set.add(LOOKUP_CACHE_TTL);
-        set.add(LOOKUP_MAX_RETRIES);
-        set.add(LookupOptions.CACHE_TYPE);
-        set.add(LookupOptions.MAX_RETRIES);
-        set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
-        set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
-        set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
-        set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
-        return set;
+        Stream<ConfigOption<?>> hbaseOptions =
+                Stream.of(
+                        ZOOKEEPER_ZNODE_PARENT,
+                        ZOOKEEPER_QUORUM,
+                        NULL_STRING_LITERAL,
+                        SINK_BUFFER_FLUSH_MAX_SIZE,
+                        SINK_BUFFER_FLUSH_MAX_ROWS,
+                        SINK_BUFFER_FLUSH_INTERVAL,
+                        SINK_PARALLELISM,
+                        SINK_IGNORE_NULL_VALUE,
+                        SINK_MAX_RECORD_SIZE,
+                        SINK_REQUEST_TIMEOUT,
+                        SINK_FAIL_ON_TIMEOUT,
+                        LOOKUP_ASYNC,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES,
+                        LookupOptions.CACHE_TYPE,
+                        LookupOptions.MAX_RETRIES,
+                        LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
+                        LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
+                        LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
+                        LookupOptions.PARTIAL_CACHE_MAX_ROWS);
+        Stream<ConfigOption<?>> asyncOptions = super.optionalOptions().stream();
+
+        return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet());
     }
 
     @Override
     public Set<ConfigOption<?>> forwardOptions() {
-        return Stream.of(
+        Stream<ConfigOption<?>> hbaseOptions =
+                Stream.of(
                         TABLE_NAME,
                         ZOOKEEPER_ZNODE_PARENT,
                         ZOOKEEPER_QUORUM,
                         NULL_STRING_LITERAL,
-                        LOOKUP_CACHE_MAX_ROWS,
-                        LOOKUP_CACHE_TTL,
-                        LOOKUP_MAX_RETRIES,
                         SINK_BUFFER_FLUSH_MAX_SIZE,
                         SINK_BUFFER_FLUSH_MAX_ROWS,
                         SINK_BUFFER_FLUSH_INTERVAL,
-                        SINK_IGNORE_NULL_VALUE)
-                .collect(Collectors.toSet());
+                        SINK_IGNORE_NULL_VALUE,
+                        SINK_MAX_RECORD_SIZE,
+                        SINK_REQUEST_TIMEOUT,
+                        SINK_FAIL_ON_TIMEOUT,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES);
+        Stream<ConfigOption<?>> asyncOptions = super.optionalOptions().stream();
+
+        return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet());
+    }
+
+    private Properties getDeprecatedAsyncSinkOptions(ReadableConfig config) {
+        Properties properties = new Properties();
+        Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_SIZE))
+                .ifPresent(
+                        flushBufferSize ->
+                                properties.put(
+                                        AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(),
+                                        flushBufferSize.getBytes()));
+        Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_ROWS))
+                .ifPresent(
+                        maxBatchSize ->
+                                properties.put(
+                                        AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(),
+                                        maxBatchSize));
+        Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_INTERVAL))
+                .ifPresent(
+                        timeout ->
+                                properties.put(
+                                        AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(),
+                                        timeout.toMillis()));
+        return properties;
     }
 }
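
For clarity, here is a standalone sketch of the legacy-to-async translation that getDeprecatedAsyncSinkOptions performs. The option constants and the getBytes()/toMillis() conversions come from the factory above; the wrapper class, the sample values, and printing the result are assumptions made only for illustration.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;

import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;

public class DeprecatedOptionMappingSketch {
    public static void main(String[] args) {
        // Values a user might still set with the deprecated keys, e.g.
        // 'sink.buffer-flush.max-size' = '2mb', 'sink.buffer-flush.interval' = '1s'.
        Configuration legacy = new Configuration();
        legacy.set(SINK_BUFFER_FLUSH_MAX_SIZE, MemorySize.parse("2mb"));
        legacy.set(SINK_BUFFER_FLUSH_MAX_ROWS, 1000);
        legacy.set(SINK_BUFFER_FLUSH_INTERVAL, Duration.ofSeconds(1));

        // Same shape of Properties that the factory hands to addAsyncOptionsToBuilder(...).
        Properties asyncOptions = new Properties();
        asyncOptions.put(
                AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(),
                legacy.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes()); // 2mb -> 2097152 bytes
        asyncOptions.put(
                AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(),
                legacy.get(SINK_BUFFER_FLUSH_MAX_ROWS)); // 1000 rows per batch
        asyncOptions.put(
                AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(),
                legacy.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); // 1s -> 1000 ms

        System.out.println(asyncOptions);
    }
}
```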

0 commit comments