diff --git a/CHANGELOG.md b/CHANGELOG.md index 8aaf31b..4b432a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased] +- Allow format options to be applied to the http response decoding. + ## [0.24.0] - 2025-11-26 - Add UNABLE_TO_DESERIALIZE_RESPONSE http-completion-state. If you have used diff --git a/README.md b/README.md index a6f4d11..e3a57e2 100644 --- a/README.md +++ b/README.md @@ -153,9 +153,12 @@ POST, PUT and GET operations. This query creator allows you to issue json reques your own custom http connector. The mappings from columns to the json request are supplied in the query creator configuration parameters `gid.connector.http.request.query-param-fields`, `gid.connector.http.request.body-fields` and `gid.connector.http.request.url-map`. +### Format considerations + +#### For http requests In order to use custom format, user has to specify option `'lookup-request.format' = 'customFormatName'`, where `customFormatName` is the identifier of custom format factory. -Additionally, it is possible to pass query format options from table's DDL. +Additionally, it is possible to pass custom query format options from table's DDL. This can be done by using option like so: `'lookup-request.format.customFormatName.customFormatProperty' = 'propertyValue'`, for example `'lookup-request.format.customFormatName.fail-on-missing-field' = 'true'`. @@ -166,6 +169,14 @@ DynamicTableFactory.Context context, ReadableConfig formatOptions)` method in `R With default configuration, Flink-Json format is used for `GenericGetQueryCreator`, all options defined in [json-format](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/) can be passed through table DDL. For example `'lookup-request.format.json.fail-on-missing-field' = 'true'`. In this case, format identifier is `json`. +#### For http responses +Specify the response format options at the top level of the table's DDL, without the `lookup-request.format` prefix. 
For example: +```roomsql + 'format' = 'json', + 'json.ignore-parse-errors' = 'true', +``` + + #### Timeouts Lookup Source is guarded by two timeout timers. First one is specified by Flink's AsyncIO operator that executes `AsyncTableFunction`. The default value of this timer is set to 3 minutes and can be changed via `table.exec.async-lookup.timeout` [option](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-async-lookup-timeout). diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java index 7320a25..28f69b4 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java @@ -53,20 +53,26 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) FactoryUtil.createTableFactoryHelper(this, dynamicTableContext); ReadableConfig readable = helper.getOptions(); - helper.validateExcept( - // properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions - "table.", - HttpConnectorConfigConstants.GID_CONNECTOR_HTTP, - LOOKUP_REQUEST_FORMAT.key() - ); - validateHttpLookupSourceOptions(readable); + // Discover and validate the decoding format first - this validates format-specific options DecodingFormat> decodingFormat = helper.discoverDecodingFormat( DeserializationFormatFactory.class, FactoryUtil.FORMAT ); + // Validate connector options, excluding: + // - "table.*" (Flink execution config options) + // - "gid.connector.http.*" (dynamic connector-specific properties) + // - LOOKUP_REQUEST_FORMAT (custom lookup format option) + // Format options are already validated by discoverDecodingFormat() above + helper.validateExcept( + "table.", + HttpConnectorConfigConstants.GID_CONNECTOR_HTTP, + 
LOOKUP_REQUEST_FORMAT.key() + ); + validateHttpLookupSourceOptions(readable); + HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable); ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema(); diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java index 9def454..2ae684c 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java @@ -81,7 +81,7 @@ public Collection lookup(RowData keyRow) { HttpRowDataWrapper httpRowDataWrapper = client.pull(keyRow); Collection httpCollector = httpRowDataWrapper.getData(); - int physicalArity=-1; + int physicalArity = -1; GenericRowData producedRow = null; if (httpRowDataWrapper.shouldIgnore()) { @@ -103,10 +103,15 @@ public Collection lookup(RowData keyRow) { } } // if we did not get the physical arity from the http response physical row then get it from the - // producedDataType. which is set when we have metadata - if (physicalArity == -1 && producedDataType != null ) { - List childrenLogicalTypes=producedDataType.getLogicalType().getChildren(); - physicalArity=childrenLogicalTypes.size()-metadataArity; + // producedDataType. which is set when we have metadata or when there's no data + if (physicalArity == -1) { + if (producedDataType != null) { + List childrenLogicalTypes = producedDataType.getLogicalType().getChildren(); + physicalArity = childrenLogicalTypes.size() - metadataArity; + } else { + // If producedDataType is null and we have no data, return the same way as ignore. 
+ return Collections.emptyList(); + } } // if there was no data, create an empty producedRow if (producedRow == null) { diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java index 9725b9b..64ca4ab 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java @@ -993,7 +993,11 @@ private void assertResultsForSpec(TestSpec spec, Collection rows) { if (spec.badStatus) { assertEnrichedRowsNoDataBadStatus(rows); } else if (spec.deserError) { - assertEnrichedRowsDeserException(rows); + if (spec.ignoreParseErrors) { + assertEnrichedRowsNoDataGoodStatus(rows); + } else { + assertEnrichedRowsDeserException(rows); + } } else if (spec.connectionError) { assertEnrichedRowsException(rows); } else if (spec.useMetadata) { @@ -1063,6 +1067,33 @@ private void assertEnrichedRowsNoDataBadStatus(Collection collectedRows ) { ); } + private void assertEnrichedRowsNoDataGoodStatus(Collection collectedRows ) { + + final int rowArity = 10; + // validate every row and its column. + + assertAll(() -> { + assertThat(collectedRows.size()).isEqualTo(4); + int intElement = 0; + for (Row row : collectedRows) { + intElement++; + assertThat(row.getArity()).isEqualTo(rowArity); + // "id" and "id2" columns should be different for every row. 
+ assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement)); + assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1)); + assertThat(row.getField("uuid")).isNull(); + assertThat(row.getField("isActive")).isNull(); + assertThat(row.getField("balance")).isNull(); + // metadata + assertThat(row.getField("errStr")).isNull(); + assertThat(row.getField("headers")).isNotNull(); + assertThat(row.getField("statusCode")).isEqualTo(200); + assertEquals(row.getField("completionState"), HttpCompletionState.SUCCESS.name()); + } + } + ); + } + private void assertEnrichedRowsDeserException(Collection collectedRows ) { final int rowArity = 10; @@ -1181,7 +1212,8 @@ private void setUpServerBodyStub( private void setUpServerBodyStub( String methodName, WireMockServer wireMockServer, - List matchingJsonPaths, boolean isDeserErr) { + List matchingJsonPaths, + boolean isDeserErr) { setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths, null, null, null, isDeserErr); } @@ -1235,7 +1267,6 @@ private void setUpServerBodyStub( wireMockServer.addStubMapping(stubMapping); } - // Prototype parameterizedTest @ParameterizedTest @MethodSource("testSpecProvider") void testHttpLookupJoinParameterized(TestSpec spec) throws Exception { @@ -1316,16 +1347,19 @@ static Collection testSpecProvider() { for (String method : Arrays.asList("GET", "POST", "PUT")) { for (boolean asyncFlag : Arrays.asList(false, true)) { for (boolean continueOnError : Arrays.asList(false, true)) { - specs.add(TestSpec.builder() - .testName("HTTP Lookup Join With Metadata Deserialization Error") - .methodName(method) - .useMetadata(true) - .maxRows(4) - .useAsync(asyncFlag) - .deserError(true) - .continueOnError(continueOnError) - .build() - ); + for (boolean ignoreParseErrors : Arrays.asList(false, true)) { + specs.add(TestSpec.builder() + .testName("HTTP Lookup Join With Metadata Deserialization Error") + .methodName(method) + .useMetadata(true) + .maxRows(4) + .useAsync(asyncFlag) + 
.deserError(true) + .ignoreParseErrors(ignoreParseErrors) + .continueOnError(continueOnError) + .build() + ); + } } } } @@ -1367,6 +1401,7 @@ private static class TestSpec { final int maxRows; final boolean useAsync; final boolean continueOnError; + final boolean ignoreParseErrors; @Override public String toString() { @@ -1447,7 +1482,9 @@ private String createLookupTableSql(TestSpec spec) { sql.append(") WITH (") .append("'format' = 'json',") .append("'connector' = 'rest-lookup',"); - + if (spec.ignoreParseErrors) { + sql.append("'json.ignore-parse-errors' = 'true',"); + } if (!StringUtils.isNullOrWhitespaceOnly(spec.methodName)) { sql.append("'lookup-method' = '").append(spec.methodName).append("',"); }