diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c96d15..5e113ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,8 @@ ## [Unreleased] +- Ability to specify http versions for http lookups. - Amend to not log HTTP request response and header values by default. -- Added http 2 support. ## [0.22.0] - 2025-10-03 diff --git a/README.md b/README.md index e4bfe9e..53f74a5 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,8 @@ The HTTP TableLookup connector that allows for pulling data from external system Please use [releases](https://github.com/getindata/flink-http-connector/releases) instead of the `main` branch in order to get a stable set of binaries. The goal for HTTP TableLookup connector was to use it in Flink SQL statement as a standard table that can be later joined with other stream using pure SQL Flink. - -Currently, HTTP source connector supports only Lookup Joins (TableLookup) [1] in Table/SQL API. -`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)). -Note that the connector will work with both http 1.1 and http 2 endpoints. + +`HttpSink` supports both Streaming API (when using [HttpSink](src/main/java/com/getindata/connectors/http/internal/sink/HttpSink.java) built using [HttpSinkBuilder](src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkBuilder.java)) and the Table API (using connector created in [HttpDynamicTableSinkFactory](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicTableSinkFactory.java)). ## Updating the connector In case of updating http-connector please see [Breaking changes](#breaking-changes) section. @@ -581,6 +579,7 @@ be requested if the current time is later than the cached token expiry time minu | gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued | | gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. | | gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | +| gid.connector.http.source.lookup.http-version | optional | Version of HTTP to use for lookup http requests. The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2 respectively. This option may be required as HTTP_1_1, if the endpoint is HTTP 1.1, because some http 1.1 endpoints reject HTTP Version 2 calls, with 'Invalid HTTP request received' and 'HTTP/2 upgrade not supported'. | | gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | | gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index c6e436a..67f4f01 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -59,6 +59,9 @@ public final class HttpConnectorConfigConstants { public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER = SOURCE_LOOKUP_PREFIX + "query-creator"; + public static final String SOURCE_LOOKUP_QUERY_HTTP_VERSION = + SOURCE_LOOKUP_PREFIX + "http-version"; + // -------------- HTTPS security settings -------------- public static final String ALLOW_SELF_SIGNED = GID_CONNECTOR_HTTP + "security.cert.server.allowSelfSigned"; diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java index 1287d44..19852f6 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java @@ -124,7 +124,7 @@ private HttpRequest buildHttpRequest(List reqeustBatch, UR Builder requestBuilder = java.net.http.HttpRequest .newBuilder() .uri(endpointUri) - .version(Version.HTTP_2) + .version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override .timeout(Duration.ofSeconds(httpRequestTimeOutSeconds)) .method(method, publisher); diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java index 684f2b2..7fd23ed 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java @@ -62,7 +62,7 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp Builder requestBuilder = java.net.http.HttpRequest .newBuilder() .uri(endpointUri) - .version(Version.HTTP_2) + .version(Version.HTTP_1_1) // consider allowing users to specify http 2 to override .timeout(Duration.ofSeconds(httpRequestTimeOutSeconds)) .method(requestEntry.method, BodyPublishers.ofByteArray(requestEntry.element)); diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java index b005931..ae36be8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java @@ -5,7 +5,6 @@ import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpRequest.Builder; -import java.time.Duration; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -42,10 +41,11 @@ public BodyBasedRequestFactory( */ @Override protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) { - return HttpRequest.newBuilder() + HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo); + builder .uri(constructUri(lookupQueryInfo)) - .method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery())) - .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds)); + .method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery())); + return builder; } @Override diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/GetRequestFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/GetRequestFactory.java index d5f2811..eee6c20 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/GetRequestFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/GetRequestFactory.java @@ -4,7 +4,6 @@ import java.net.URISyntaxException; import java.net.http.HttpRequest; import java.net.http.HttpRequest.Builder; -import java.time.Duration; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -48,10 +47,11 @@ protected Logger getLogger() { */ @Override protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) { - return HttpRequest.newBuilder() + HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo); + builder .uri(constructGetUri(lookupQueryInfo)) - .GET() - .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds)); + .GET(); + return builder; } URI constructGetUri(LookupQueryInfo lookupQueryInfo) { diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java index 3cf44e8..c352488 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java @@ -39,6 +39,17 @@ public class HttpLookupConnectorOptions { .stringType() .noDefaultValue(); + public static final ConfigOption LOOKUP_HTTP_VERSION = + ConfigOptions.key(SOURCE_LOOKUP_QUERY_HTTP_VERSION) + .stringType() + .noDefaultValue() + .withDescription("Version of HTTP to use for lookup HTTP requests. " + + "The valid values are HTTP_1_1 and HTTP_2, which specify HTTP 1.1 or 2" + + " respectively. This option may be required as HTTP_1_1, if the" + + " endpoint is http 1.1, because some http 1.1 endpoints reject HTTP" + + " Version 2 calls, with 'Invalid HTTP request received' and " + + " 'HTTP/2 upgrade not supported'."); + public static final ConfigOption LOOKUP_REQUEST_FORMAT = ConfigOptions.key("lookup-request.format") .stringType() diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java index e7e3f79..cfbad01 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java @@ -1,7 +1,9 @@ package com.getindata.connectors.http.internal.table.lookup; +import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.Builder; +import java.time.Duration; import java.util.Arrays; import java.util.Map; @@ -38,6 +40,7 @@ public abstract class RequestFactoryBase implements HttpRequestFactory { */ private final String[] headersAndValues; private final HttpLookupConfig options; + final HttpClient.Version httpVersion; public RequestFactoryBase( LookupQueryCreator lookupQueryCreator, @@ -65,6 +68,12 @@ public RequestFactoryBase( DEFAULT_REQUEST_TIMEOUT_SECONDS ) ); + String httpVersionFromConfig = options.getReadableConfig().get(HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION); + if (httpVersionFromConfig == null) { + httpVersion = null; + } else { + httpVersion = HttpClient.Version.valueOf(httpVersionFromConfig); + } } @Override @@ -88,7 +97,14 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) { * @param lookupQuery lookup query used for request query parameters or body. * @return {@link HttpRequest.Builder} for given lookupQuery. */ - protected abstract Builder setUpRequestMethod(LookupQueryInfo lookupQuery); + protected Builder setUpRequestMethod(LookupQueryInfo lookupQuery) { + HttpRequest.Builder builder = HttpRequest.newBuilder() + .timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds)); + if (httpVersion !=null) { + builder.version(httpVersion); + } + return builder; + } protected static StringBuilder resolvePathParameters(LookupQueryInfo lookupQueryInfo, StringBuilder resolvedUrl) { diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java index 756af45..f65b94c 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactoryTest.java @@ -1,33 +1,56 @@ package com.getindata.connectors.http.internal.table.lookup; + import java.net.URI; +import java.net.http.HttpClient; import java.util.Collection; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import org.apache.flink.configuration.Configuration; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import static org.assertj.core.api.Assertions.assertThat; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_HTTP_VERSION; + public class BodyBasedRequestFactoryTest { @ParameterizedTest @MethodSource("configProvider") void testconstructUri(TestSpec testSpec) throws Exception { - LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url, + Set configs = new HashSet(); + + Configuration configuration= new Configuration(); + Configuration configuration_http11 = new Configuration(); + Configuration configuration_http2 = new Configuration(); + + configuration_http2.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_2)); + configuration_http11.setString(LOOKUP_HTTP_VERSION, String.valueOf(HttpClient.Version.HTTP_1_1)); + + configs.add(configuration); + configs.add(configuration_http11); + configs.add(configuration_http2); + + for(Configuration config: configs) { + LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url, testSpec.bodyBasedUrlQueryParams, testSpec.pathBasedUrlParams); - HttpLookupConfig httpLookupConfig = HttpLookupConfig.builder() + HttpLookupConfig httpLookupConfig = HttpLookupConfig.builder() .lookupMethod(testSpec.lookupMethod) .url(testSpec.url) .useAsync(false) + .readableConfig(config) .build(); - BodyBasedRequestFactory bodyBasedRequestFactory = + BodyBasedRequestFactory bodyBasedRequestFactory = new BodyBasedRequestFactory("test", null, null, httpLookupConfig); - URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo); - assertThat(uri.toString()).isEqualTo(testSpec.expected); + URI uri = bodyBasedRequestFactory.constructUri(lookupQueryInfo); + assertThat(uri.toString()).isEqualTo(testSpec.expected); + } } private static class TestSpec { diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java new file mode 100644 index 0000000..6522fc6 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallbackTest.java @@ -0,0 +1,21 @@ +package com.getindata.connectors.http.internal.table.lookup; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpRequest; + +import org.junit.jupiter.api.Test; + +class Slf4JHttpLookupPostRequestCallbackTest { + @Test + public void testNullResponseDoesNotError() throws URISyntaxException { + HttpRequest httpRequest = HttpRequest.newBuilder() + .method("GET", HttpRequest.BodyPublishers.ofString("foo")) + .uri(new URI("http://testing123")).build(); + HttpLookupSourceRequestEntry requestEntry = + new HttpLookupSourceRequestEntry(httpRequest, new LookupQueryInfo("")); + Slf4JHttpLookupPostRequestCallback slf4JHttpLookupPostRequestCallback = + new Slf4JHttpLookupPostRequestCallback(); + slf4JHttpLookupPostRequestCallback.call(null, requestEntry, "aaa", null); + } +}