Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased]

- allow format options to be applied to the http response decoding.

## [0.24.0] - 2025-11-26

- Add UNABLE_TO_DESERIALIZE_RESPONSE http-completion-state. If you have used
Expand Down
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,12 @@ POST, PUT and GET operations. This query creator allows you to issue json reques
your own custom http connector. The mappings from columns to the json request are supplied in the query creator configuration
parameters `gid.connector.http.request.query-param-fields`, `gid.connector.http.request.body-fields` and `gid.connector.http.request.url-map`.

### Format considerations

#### For http requests
In order to use custom format, user has to specify option `'lookup-request.format' = 'customFormatName'`, where `customFormatName` is the identifier of custom format factory.

Additionally, it is possible to pass query format options from table's DDL.
Additionally, it is possible to pass custom query format options from table's DDL.
This can be done by using option like so: `'lookup-request.format.customFormatName.customFormatProperty' = 'propertyValue'`, for example
`'lookup-request.format.customFormatName.fail-on-missing-field' = 'true'`.

Expand All @@ -166,6 +169,14 @@ DynamicTableFactory.Context context, ReadableConfig formatOptions)` method in `R
With default configuration, Flink-Json format is used for `GenericGetQueryCreator`, all options defined in [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/)
can be passed through table DDL. For example `'lookup-request.format.json.fail-on-missing-field' = 'true'`. In this case, format identifier is `json`.

#### For http responses
Specify your format options at the top level. For example:
```roomsql
'format' = 'json',
'json.ignore-parse-errors' = 'true',
```


#### Timeouts
Lookup Source is guarded by two timeout timers. First one is specified by Flink's AsyncIO operator that executes `AsyncTableFunction`.
The default value of this timer is set to 3 minutes and can be changed via `table.exec.async-lookup.timeout` [option](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-async-lookup-timeout).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,26 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);

ReadableConfig readable = helper.getOptions();
helper.validateExcept(
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
HttpConnectorConfigConstants.GID_CONNECTOR_HTTP,
LOOKUP_REQUEST_FORMAT.key()
);
validateHttpLookupSourceOptions(readable);

// Discover and validate the decoding format first - this validates format-specific options
DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT
);

// Validate connector options, excluding:
// - "table.*" (Flink execution config options)
// - "gid.connector.http.*" (dynamic connector-specific properties)
// - LOOKUP_REQUEST_FORMAT (custom lookup format option)
// Format options are already validated by discoverDecodingFormat() above
helper.validateExcept(
"table.",
HttpConnectorConfigConstants.GID_CONNECTOR_HTTP,
LOOKUP_REQUEST_FORMAT.key()
);
validateHttpLookupSourceOptions(readable);

HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);

ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Collection<RowData> lookup(RowData keyRow) {
HttpRowDataWrapper httpRowDataWrapper = client.pull(keyRow);
Collection<RowData> httpCollector = httpRowDataWrapper.getData();

int physicalArity=-1;
int physicalArity = -1;

GenericRowData producedRow = null;
if (httpRowDataWrapper.shouldIgnore()) {
Expand All @@ -103,10 +103,15 @@ public Collection<RowData> lookup(RowData keyRow) {
}
}
// if we did not get the physical arity from the http response physical row then get it from the
// producedDataType. which is set when we have metadata
if (physicalArity == -1 && producedDataType != null ) {
List<LogicalType> childrenLogicalTypes=producedDataType.getLogicalType().getChildren();
physicalArity=childrenLogicalTypes.size()-metadataArity;
// producedDataType. which is set when we have metadata or when there's no data
if (physicalArity == -1) {
if (producedDataType != null) {
List<LogicalType> childrenLogicalTypes = producedDataType.getLogicalType().getChildren();
physicalArity = childrenLogicalTypes.size() - metadataArity;
} else {
// If producedDataType is null and we have no data, return the same way as ignore.
return Collections.emptyList();
}
}
// if there was no data, create an empty producedRow
if (producedRow == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,11 @@ private void assertResultsForSpec(TestSpec spec, Collection<Row> rows) {
if (spec.badStatus) {
assertEnrichedRowsNoDataBadStatus(rows);
} else if (spec.deserError) {
assertEnrichedRowsDeserException(rows);
if (spec.ignoreParseErrors) {
assertEnrichedRowsNoDataGoodStatus(rows);
} else {
assertEnrichedRowsDeserException(rows);
}
} else if (spec.connectionError) {
assertEnrichedRowsException(rows);
} else if (spec.useMetadata) {
Expand Down Expand Up @@ -1063,6 +1067,33 @@ private void assertEnrichedRowsNoDataBadStatus(Collection<Row> collectedRows ) {
);
}

private void assertEnrichedRowsNoDataGoodStatus(Collection<Row> collectedRows ) {

final int rowArity = 10;
// validate every row and its column.

assertAll(() -> {
assertThat(collectedRows.size()).isEqualTo(4);
int intElement = 0;
for (Row row : collectedRows) {
intElement++;
assertThat(row.getArity()).isEqualTo(rowArity);
// "id" and "id2" columns should be different for every row.
assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement));
assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1));
assertThat(row.getField("uuid")).isNull();
assertThat(row.getField("isActive")).isNull();
assertThat(row.getField("balance")).isNull();
// metadata
assertThat(row.getField("errStr")).isNull();
assertThat(row.getField("headers")).isNotNull();
assertThat(row.getField("statusCode")).isEqualTo(200);
assertEquals(row.getField("completionState"), HttpCompletionState.SUCCESS.name());
}
}
);
}

private void assertEnrichedRowsDeserException(Collection<Row> collectedRows ) {

final int rowArity = 10;
Expand Down Expand Up @@ -1181,7 +1212,8 @@ private void setUpServerBodyStub(
private void setUpServerBodyStub(
String methodName,
WireMockServer wireMockServer,
List<StringValuePattern> matchingJsonPaths, boolean isDeserErr) {
List<StringValuePattern> matchingJsonPaths,
boolean isDeserErr) {
setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths, null, null, null, isDeserErr);
}

Expand Down Expand Up @@ -1235,7 +1267,6 @@ private void setUpServerBodyStub(
wireMockServer.addStubMapping(stubMapping);
}

// Prototype parameterizedTest
@ParameterizedTest
@MethodSource("testSpecProvider")
void testHttpLookupJoinParameterized(TestSpec spec) throws Exception {
Expand Down Expand Up @@ -1316,16 +1347,19 @@ static Collection<TestSpec> testSpecProvider() {
for (String method : Arrays.asList("GET", "POST", "PUT")) {
for (boolean asyncFlag : Arrays.asList(false, true)) {
for (boolean continueOnError : Arrays.asList(false, true)) {
specs.add(TestSpec.builder()
.testName("HTTP Lookup Join With Metadata Deserialization Error")
.methodName(method)
.useMetadata(true)
.maxRows(4)
.useAsync(asyncFlag)
.deserError(true)
.continueOnError(continueOnError)
.build()
);
for (boolean ignoreParseErrors : Arrays.asList(false, true)) {
specs.add(TestSpec.builder()
.testName("HTTP Lookup Join With Metadata Deserialization Error")
.methodName(method)
.useMetadata(true)
.maxRows(4)
.useAsync(asyncFlag)
.deserError(true)
.ignoreParseErrors(ignoreParseErrors)
.continueOnError(continueOnError)
.build()
);
}
}
}
}
Expand Down Expand Up @@ -1367,6 +1401,7 @@ private static class TestSpec {
final int maxRows;
final boolean useAsync;
final boolean continueOnError;
final boolean ignoreParseErrors;

@Override
public String toString() {
Expand Down Expand Up @@ -1447,7 +1482,9 @@ private String createLookupTableSql(TestSpec spec) {
sql.append(") WITH (")
.append("'format' = 'json',")
.append("'connector' = 'rest-lookup',");

if (spec.ignoreParseErrors) {
sql.append("'json.ignore-parse-errors' = 'true',");
}
if (!StringUtils.isNullOrWhitespaceOnly(spec.methodName)) {
sql.append("'lookup-method' = '").append(spec.methodName).append("',");
}
Expand Down
Loading