diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c96d151..8c4165ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## [Unreleased] +- Added HTTP Sink request retries with delivery guarantee support (`sink.delivery-guarantee`). +- Added AIMD rate limiting strategy for HTTP Sink backpressure management. +- Added new HTTP Sink configuration options: `gid.connector.http.sink.success-codes`, `gid.connector.http.sink.retry-codes`, and `gid.connector.http.sink.ignored-response-codes`. - Amend to not log HTTP request response and header values by default. - Added http 2 support. diff --git a/README.md b/README.md index e4bfe9ef..0961be34 100644 --- a/README.md +++ b/README.md @@ -451,17 +451,25 @@ is provided. ## HTTP status code handler ### Sink table -You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table. -By default all 400 and 500 response codes will be interpreted as error code. - -This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are: -- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404. -Many status codes can be defined in one value, where each code should be separated with comma, for example: -`401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors. -An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors. -- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list. - Many status codes can be defined in one value, where each code should be separated with comma, for example: - `401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes. +You can configure HTTP status code handling for HTTP sink table and enable automatic retries with delivery guarantees. + +#### Retries and delivery guarantee +HTTP Sink supports automatic retries when `sink.delivery-guarantee` is set to `at-least-once`. Failed requests will be automatically retried based on the configured status codes. +- When `sink.delivery-guarantee` is `at-least-once`: Failed requests are retried automatically using AIMD (Additive Increase Multiplicative Decrease) rate limiting strategy. +- When `sink.delivery-guarantee` is `none` (default): Failed requests are logged but not retried. + +The sink categorizes HTTP responses into groups: +- Success codes (`gid.connector.http.sink.success-codes`): Expected successful responses. +- Retry codes (`gid.connector.http.sink.retry-codes`): Transient errors that trigger automatic retries when using `at-least-once` delivery guarantee. +- Ignored responses (`gid.connector.http.sink.ignored-response-codes`): Responses whose content is ignored but treated as successful. +- Error codes: Any response code not classified in the above groups. + +Parameters support whitelisting and blacklisting: `2XX,404,!203` means all codes from 200-299, plus 404, except 203. + +#### Legacy error code configuration +For backward compatibility, you can use the legacy properties: +- `gid.connector.http.sink.error.code` - HTTP status codes treated as errors (supports masks like `3XX, 4XX, 5XX`). +- `gid.connector.http.sink.error.code.exclude` - HTTP codes to exclude from the error list. ### Source table The source table categorizes HTTP responses into three groups based on status codes: @@ -612,6 +620,7 @@ be requested if the current time is later than the cached token expiry time minu | url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. | | insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. | | sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. | +| sink.delivery-guarantee | optional | Defines the delivery semantic for the HTTP sink. Accepted enumerations are 'at-least-once', and 'none' (actually 'none' is the same as 'at-most-once'. 'exactly-once' semantic is not supported. | | sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. | | sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. | | sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. | @@ -619,6 +628,9 @@ be requested if the current time is later than the cached token expiry time minu | gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. | | gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. | | gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. | +| gid.connector.http.sink.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. | +| gid.connector.http.sink.retry-codes | optional | Comma separated http codes considered as transient errors that will trigger retries. Use [1-5]XX for groups and '!' character for excluding. Only used when `sink.delivery-guarantee` is set to `at-least-once`. | +| gid.connector.http.sink.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. | | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. | | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | @@ -736,9 +748,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json ### HTTP TableLookup Source - Check other `//TODO`'s. -### HTTP Sink -- Make `HttpSink` retry the failed requests. Currently, it does not retry those at all, only adds their count to the `numRecordsSendErrors` metric. It should be thoroughly thought over how to do it efficiently and then implemented. - ### [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join
diff --git a/src/main/java/com/getindata/connectors/http/BatchHttpStatusCodeValidationFailedException.java b/src/main/java/com/getindata/connectors/http/BatchHttpStatusCodeValidationFailedException.java new file mode 100644 index 00000000..840e4014 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/BatchHttpStatusCodeValidationFailedException.java @@ -0,0 +1,17 @@ +package com.getindata.connectors.http; + +import java.util.List; + +import lombok.Getter; + +import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; + +@Getter +public class BatchHttpStatusCodeValidationFailedException extends Exception { + List failedRequests; + + public BatchHttpStatusCodeValidationFailedException(String message, List failedRequests) { + super(message); + this.failedRequests = failedRequests; + } +} diff --git a/src/main/java/com/getindata/connectors/http/HttpSink.java b/src/main/java/com/getindata/connectors/http/HttpSink.java index 23faf100..8dc42b5a 100644 --- a/src/main/java/com/getindata/connectors/http/HttpSink.java +++ b/src/main/java/com/getindata/connectors/http/HttpSink.java @@ -3,6 +3,7 @@ import java.util.Properties; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.ElementConverter; import com.getindata.connectors.http.internal.HeaderPreprocessor; @@ -41,6 +42,7 @@ public class HttpSink extends HttpSinkInternal { long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, + DeliveryGuarantee deliveryGuarantee, String endpointUrl, HttpPostRequestCallback httpPostRequestCallback, HeaderPreprocessor headerPreprocessor, @@ -54,6 +56,7 @@ public class HttpSink extends HttpSinkInternal { maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, + deliveryGuarantee, endpointUrl, httpPostRequestCallback, headerPreprocessor, diff --git a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java index 208bcf01..7c2f581a 100644 --- a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java +++ b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java @@ -4,8 +4,10 @@ import java.util.Properties; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.Preconditions; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; @@ -71,6 +73,8 @@ public class HttpSinkBuilder extends private final Properties properties = new Properties(); + private DeliveryGuarantee deliveryGuarantee; + // Mandatory field private String endpointUrl; @@ -92,6 +96,17 @@ public class HttpSinkBuilder extends this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR; } + /** + * @param deliveryGuarantee HTTP Sink delivery guarantee + * @return {@link HttpSinkBuilder} itself + */ + public HttpSinkBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { + Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE, + "Only at-least-once and none delivery guarantees are supported."); + this.deliveryGuarantee = deliveryGuarantee; + return this; + } + /** * @param endpointUrl the URL of the endpoint * @return {@link HttpSinkBuilder} itself @@ -181,6 +196,7 @@ public HttpSink build() { Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B), Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS), Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B), + Optional.ofNullable(deliveryGuarantee).orElse(DeliveryGuarantee.NONE), endpointUrl, httpPostRequestCallback, headerPreprocessor, diff --git a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java index b5637377..5f3811d2 100644 --- a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java +++ b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java @@ -1,31 +1,62 @@ package com.getindata.connectors.http.internal; import java.util.List; +import java.util.stream.Collectors; import lombok.Data; import lombok.NonNull; import lombok.ToString; +import com.getindata.connectors.http.internal.config.ResponseItemStatus; import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; /** * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted - * to write, divided into two lists — successful and failed ones. + * to write. */ @Data @ToString public class SinkHttpClientResponse { /** - * A list of successfully written requests. + * A list of requests along with write status. */ @NonNull - private final List successfulRequests; + private final List requests; - /** - * A list of requests that {@link SinkHttpClient} failed to write. - */ - @NonNull - private final List failedRequests; + public List getSuccessfulRequests() { + return requests.stream() + .filter(r -> r.getStatus().equals(ResponseItemStatus.SUCCESS)) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getFailedRequests() { + return requests.stream() + .filter(r -> r.getStatus().equals(ResponseItemStatus.FAILURE)) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getTemporalRequests() { + return requests.stream() + .filter(r -> r.getStatus().equals(ResponseItemStatus.TEMPORAL)) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getIgnoredRequests() { + return requests.stream() + .filter(r -> r.getStatus().equals(ResponseItemStatus.IGNORE)) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + @Data + @ToString + public static class ResponseItem { + private final HttpRequest request; + private final ResponseItemStatus status; + } } diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index c6e436ad..9c2ec036 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -19,6 +19,7 @@ public final class HttpConnectorConfigConstants { */ public static final String GID_CONNECTOR_HTTP = "gid.connector.http."; private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup."; + private static final String SINK_PREFIX = GID_CONNECTOR_HTTP + "sink."; /** * A property prefix for http connector header properties @@ -45,9 +46,14 @@ public final class HttpConnectorConfigConstants { public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type"; // --------- Error code handling configuration --------- - public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude"; - public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code"; + public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = SINK_PREFIX + "error.code.exclude"; + public static final String HTTP_ERROR_SINK_CODES_LIST = SINK_PREFIX + "error.code"; + + public static final String SINK_SUCCESS_CODES = SINK_PREFIX + "success-codes"; + public static final String SINK_RETRY_CODES = SINK_PREFIX + "retry-codes"; + public static final String SINK_IGNORE_RESPONSE_CODES = SINK_PREFIX + "ignored-response-codes"; + // ----------------------------------------------------- public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER = diff --git a/src/main/java/com/getindata/connectors/http/internal/config/ResponseItemStatus.java b/src/main/java/com/getindata/connectors/http/internal/config/ResponseItemStatus.java new file mode 100644 index 00000000..b84db3dc --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/config/ResponseItemStatus.java @@ -0,0 +1,18 @@ +package com.getindata.connectors.http.internal.config; + +public enum ResponseItemStatus { + SUCCESS("success"), + TEMPORAL("temporal"), + IGNORE("ignore"), + FAILURE("failure"); + + private final String status; + + ResponseItemStatus(String status) { + this.status = status; + } + + public String getStatus() { + return status; + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index de37faac..b95e7bb9 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -5,6 +5,7 @@ import java.util.Collections; import java.util.Properties; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.AsyncSinkBase; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -61,6 +62,8 @@ public class HttpSinkInternal extends AsyncSinkBase httpPostRequestCallback, HeaderPreprocessor headerPreprocessor, @@ -94,9 +98,9 @@ protected HttpSinkInternal( maxTimeInBufferMS, maxRecordSizeInBytes ); - Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl), "The endpoint URL must be set when initializing HTTP Sink."); + this.deliveryGuarantee = deliveryGuarantee; this.endpointUrl = endpointUrl; this.httpPostRequestCallback = Preconditions.checkNotNull( @@ -132,6 +136,7 @@ public StatefulSinkWriter> cr getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), + deliveryGuarantee, endpointUrl, sinkHttpClientBuilder.build( properties, @@ -159,6 +164,7 @@ public StatefulSinkWriter> re getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), + deliveryGuarantee, endpointUrl, sinkHttpClientBuilder.build( properties, diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index d17e9213..ca62aeed 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -1,5 +1,6 @@ package com.getindata.connectors.http.internal.sink; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -10,14 +11,23 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy; +import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy; import org.apache.flink.metrics.Counter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import com.getindata.connectors.http.BatchHttpStatusCodeValidationFailedException; import com.getindata.connectors.http.internal.SinkHttpClient; +import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; +import com.getindata.connectors.http.internal.config.ResponseItemStatus; +import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; import com.getindata.connectors.http.internal.utils.ThreadUtils; /** @@ -32,6 +42,9 @@ @Slf4j public class HttpSinkWriter extends AsyncSinkWriter { + private static final int AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE = 10; + private static final double AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR = 0.99D; + private static final String HTTP_SINK_WRITER_THREAD_POOL_SIZE = "4"; /** @@ -45,6 +58,8 @@ public class HttpSinkWriter extends AsyncSinkWriter elementConverter, Sink.InitContext context, @@ -54,13 +69,26 @@ public HttpSinkWriter( long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, + DeliveryGuarantee deliveryGuarantee, String endpointUrl, SinkHttpClient sinkHttpClient, Collection> bufferedRequestStates, Properties properties) { - - super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, - maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates); + super( + elementConverter, + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + .setMaxInFlightRequests(maxInFlightRequests) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(maxTimeInBufferMS) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .setRateLimitingStrategy( + buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize)) + .build(), + bufferedRequestStates); + this.deliveryGuarantee = deliveryGuarantee; this.endpointUrl = endpointUrl; this.sinkHttpClient = sinkHttpClient; @@ -79,7 +107,19 @@ public HttpSinkWriter( "http-sink-writer-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER)); } - // TODO: Reintroduce retries by adding backoff policy + private static RateLimitingStrategy buildRateLimitingStrategy( + int maxInFlightRequests, int maxBatchSize) { + return CongestionControlRateLimitingStrategy.builder() + .setMaxInFlightRequests(maxInFlightRequests) + .setInitialMaxInFlightMessages(maxBatchSize) + .setScalingStrategy( + AIMDScalingStrategy.builder(maxBatchSize * maxInFlightRequests) + .setIncreaseRate(AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE) + .setDecreaseFactor(AIMD_RATE_LIMITING_STRATEGY_DECREASE_FACTOR) + .build()) + .build(); + } + @Override protected void submitRequestEntries( List requestEntries, @@ -87,37 +127,88 @@ protected void submitRequestEntries( var future = sinkHttpClient.putRequests(requestEntries, endpointUrl); future.whenCompleteAsync((response, err) -> { if (err != null) { - int failedRequestsNumber = requestEntries.size(); - log.error( - "Http Sink fatally failed to write all {} requests", - failedRequestsNumber); - numRecordsSendErrorsCounter.inc(failedRequestsNumber); - - // TODO: Make `HttpSinkInternal` retry the failed requests. - // Currently, it does not retry those at all, only adds their count - // to the `numRecordsSendErrors` metric. It is due to the fact we do not have - // a clear image how we want to do it, so it would be both efficient and correct. - //requestResult.accept(requestEntries); - } else if (response.getFailedRequests().size() > 0) { - int failedRequestsNumber = response.getFailedRequests().size(); - log.error("Http Sink failed to write and will retry {} requests", - failedRequestsNumber); - numRecordsSendErrorsCounter.inc(failedRequestsNumber); - - // TODO: Make `HttpSinkInternal` retry the failed requests. Currently, - // it does not retry those at all, only adds their count to the - // `numRecordsSendErrors` metric. It is due to the fact we do not have - // a clear image how we want to do it, so it would be both efficient and correct. - - //requestResult.accept(response.getFailedRequests()); - //} else { - //requestResult.accept(Collections.emptyList()); - //} + handleFullyFailedRequest(err, requestEntries, requestResult); + } else { + List failedRequests = response.getFailedRequests(); + List ignoredRequests = response.getIgnoredRequests(); + List temporalRequests = response.getTemporalRequests(); + + if (!failedRequests.isEmpty()) { + numRecordsSendErrorsCounter.inc(failedRequests.size()); + log.error( + "failed requests: {}, throwing BatchHttpStatusCodeValidationFailedException from sink", + failedRequests + ); + getFatalExceptionCons().accept(new BatchHttpStatusCodeValidationFailedException( + String.format("Received %d fatal response codes", failedRequests.size()), failedRequests) + ); + } + + if (!ignoredRequests.isEmpty()) { + log.info("Ignoring {} requests", ignoredRequests.size()); + } + + if (!temporalRequests.isEmpty()) { + numRecordsSendErrorsCounter.inc(temporalRequests.size()); + if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { + log.warn("Retrying {} requests", temporalRequests.size()); + handlePartiallyFailedRequest(response, requestEntries, requestResult); + } else { + log.warn( + "Http Sink failed to write {} requests but will continue due to {} DeliveryGuarantee", + temporalRequests.size(), + deliveryGuarantee + ); + requestResult.accept(Collections.emptyList()); + } + } else { + requestResult.accept(Collections.emptyList()); + } } - requestResult.accept(Collections.emptyList()); }, sinkWriterThreadPool); } + private void handleFullyFailedRequest(Throwable err, + List requestEntries, + Consumer> requestResult) { + int failedRequestsNumber = requestEntries.size(); + numRecordsSendErrorsCounter.inc(failedRequestsNumber); + + if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { + // Retry all requests. + log.error("Http Sink fatally failed to write and will retry {} requests", failedRequestsNumber, err); + requestResult.accept(requestEntries); + } else if (deliveryGuarantee == DeliveryGuarantee.NONE) { + // Do not retry failed requests. + log.error( + "Http Sink fatally failed to write {} requests but will continue due to {} DeliveryGuarantee", + failedRequestsNumber, + deliveryGuarantee, + err + ); + requestResult.accept(Collections.emptyList()); + } else { + throw new UnsupportedOperationException( + "Unsupported delivery guarantee: " + deliveryGuarantee); + } + } + + private void handlePartiallyFailedRequest(SinkHttpClientResponse response, + List requestEntries, + Consumer> requestResult) { + // Assumption: the order of response.requests is the same as requestEntries. + // See com.getindata.connectors.http.internal.sink.httpclient. + // JavaNetSinkHttpClient#putRequests where requests are submitted sequentially and + // then their futures are joined sequentially too. + List failedRequestEntries = new ArrayList<>(); + for (int i = 0; i < response.getRequests().size(); ++i) { + if (response.getRequests().get(i).getStatus().equals(ResponseItemStatus.TEMPORAL)) { + failedRequestEntries.add(requestEntries.get(i)); + } + } + requestResult.accept(failedRequestEntries); + } + @Override protected long getSizeInBytes(HttpSinkRequestEntry s) { return s.getSizeInBytes(); diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 7e4c19ff..649995f4 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -3,6 +3,7 @@ import java.net.http.HttpClient; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -11,16 +12,18 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.ConfigurationException; import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; +import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; +import com.getindata.connectors.http.internal.config.ResponseItemStatus; import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; -import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker; +import com.getindata.connectors.http.internal.status.HttpCodesParser; +import com.getindata.connectors.http.internal.status.HttpResponseChecker; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; /** @@ -34,12 +37,12 @@ public class JavaNetSinkHttpClient implements SinkHttpClient { private final Map headerMap; - private final HttpStatusCodeChecker statusCodeChecker; - private final HttpPostRequestCallback httpPostRequestCallback; private final RequestSubmitter requestSubmitter; + private final HttpResponseChecker responseChecker; + public JavaNetSinkHttpClient( Properties properties, HttpPostRequestCallback httpPostRequestCallback, @@ -53,17 +56,7 @@ public JavaNetSinkHttpClient( headerPreprocessor ); - // TODO Inject this via constructor when implementing a response processor. - // Processor will be injected and it will wrap statusChecker implementation. - ComposeHttpStatusCodeCheckerConfig checkerConfig = - ComposeHttpStatusCodeCheckerConfig.builder() - .properties(properties) - .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST) - .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST) - .build(); - - this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); - + this.responseChecker = createHttpResponseChecker(properties); this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(this.headerMap); this.requestSubmitter = requestSubmitterFactory.createSubmitter( properties, @@ -71,12 +64,76 @@ public JavaNetSinkHttpClient( ); } + public static HttpResponseChecker createHttpResponseChecker(Properties properties) { + try { + String deprecatedIgnoreExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, + "" + ); + String deprecatedErrorExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, + "" + ); + + if (deprecatedIgnoreExpr.replace(',',' ').trim().isEmpty() + && deprecatedErrorExpr.replace(',',' ').trim().isEmpty()) { + return createHttpResponseCheckerWithDefaults(properties); + } else { + return createBackwardsCompatibleResponseChecker(properties); + } + } catch (ConfigurationException e) { + throw new IllegalStateException(e); + } + } + + private static HttpResponseChecker createHttpResponseCheckerWithDefaults(Properties properties) + throws ConfigurationException { + String ignoreCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.SINK_IGNORE_RESPONSE_CODES, + "" + ); + String retryCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.SINK_RETRY_CODES, + "500,503,504" + ); + String successCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.SINK_SUCCESS_CODES, + "1XX,2XX,3XX" + ); + + return new HttpResponseChecker(successCodeExpr, retryCodeExpr, ignoreCodeExpr); + } + + private static HttpResponseChecker createBackwardsCompatibleResponseChecker(Properties properties) + throws ConfigurationException { + String ignoreCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, + "" + ); + String errorCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, + "4XX,5XX" + ); + + //backwards compatibility + var ignoreErrorCodes = HttpCodesParser.parse(ignoreCodeExpr); + var errorCodes = HttpCodesParser.parse(errorCodeExpr); + var retryCodes = HttpCodesParser.parse("500,503,504"); + + var successCodes = new HashSet<>(HttpCodesParser.parse("1XX,2XX,3XX,4XX,5XX")); + successCodes.removeAll(retryCodes); + successCodes.removeAll(errorCodes); + return new HttpResponseChecker(successCodes, retryCodes, ignoreErrorCodes); + } + @Override public CompletableFuture putRequests( List requestEntries, String endpointUrl) { return submitRequests(requestEntries, endpointUrl) - .thenApply(responses -> prepareSinkHttpClientResponse(responses, endpointUrl)); + .thenApply(responses -> + prepareSinkHttpClientResponse(responses, endpointUrl) + ); } private CompletableFuture> submitRequests( @@ -92,26 +149,29 @@ private CompletableFuture> submitRequests( private SinkHttpClientResponse prepareSinkHttpClientResponse( List responses, String endpointUrl) { - var successfulResponses = new ArrayList(); - var failedResponses = new ArrayList(); + var responseItems = new ArrayList(); for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); var optResponse = response.getResponse(); - httpPostRequestCallback.call( - optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); + httpPostRequestCallback.call(optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - // TODO Add response processor here and orchestrate it with statusCodeChecker. - if (optResponse.isEmpty() || - statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { - failedResponses.add(sinkRequestEntry); + final ResponseItemStatus status; + if (optResponse.isEmpty() || responseChecker.isTemporalError(optResponse.get())) { + status = ResponseItemStatus.TEMPORAL; + } else if (responseChecker.isIgnoreCode(optResponse.get())) { + status = ResponseItemStatus.IGNORE; + } else if (responseChecker.isSuccessful(optResponse.get())) { + status = ResponseItemStatus.SUCCESS; } else { - successfulResponses.add(sinkRequestEntry); + status = ResponseItemStatus.FAILURE; } + + responseItems.add(new ResponseItem(sinkRequestEntry, status)); } - return new SinkHttpClientResponse(successfulResponses, failedResponses); + return new SinkHttpClientResponse(responseItems); } @VisibleForTesting diff --git a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java deleted file mode 100644 index 015c068c..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java +++ /dev/null @@ -1,161 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; - -import lombok.AccessLevel; -import lombok.Builder; -import lombok.Data; -import lombok.RequiredArgsConstructor; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; - -/** - * An implementation of {@link HttpStatusCodeChecker} that checks Http Status code against - * white list, concrete value or {@link HttpResponseCodeType} - */ -public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker { - - private static final Set DEFAULT_ERROR_CODES = - Set.of( - new TypeStatusCodeChecker(HttpResponseCodeType.CLIENT_ERROR), - new TypeStatusCodeChecker(HttpResponseCodeType.SERVER_ERROR) - ); - - private static final int MIN_HTTP_STATUS_CODE = 100; - - /** - * Set of {@link HttpStatusCodeChecker} for white listed status codes. - */ - private final Set excludedCodes; - - /** - * Set of {@link HttpStatusCodeChecker} that check status code againts value match or {@link - * HttpResponseCodeType} match. - */ - private final Set errorCodes; - - public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) { - excludedCodes = prepareWhiteList(config); - errorCodes = prepareErrorCodes(config); - } - - /** - * Checks whether given status code is considered as a error code. - * This implementation checks if status code matches any single value mask like "404" - * or http type mask such as "4XX". Code that matches one of those masks and is not on a - * white list will be considered as error code. - * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. - */ - public boolean isErrorCode(int statusCode) { - - Preconditions.checkArgument( - statusCode >= MIN_HTTP_STATUS_CODE, - String.format( - "Provided invalid Http status code %s," - + " status code should be equal or bigger than %d.", - statusCode, - MIN_HTTP_STATUS_CODE) - ); - - boolean isWhiteListed = excludedCodes.stream() - .anyMatch(check -> check.isWhiteListed(statusCode)); - - return !isWhiteListed - && errorCodes.stream() - .anyMatch(httpStatusCodeChecker -> httpStatusCodeChecker.isErrorCode(statusCode)); - } - - private Set prepareErrorCodes( - ComposeHttpStatusCodeCheckerConfig config) { - - Properties properties = config.getProperties(); - String errorCodePrefix = config.getErrorCodePrefix(); - - String errorCodes = - properties.getProperty(errorCodePrefix, ""); - - if (StringUtils.isNullOrWhitespaceOnly(errorCodes)) { - return DEFAULT_ERROR_CODES; - } else { - String[] splitCodes = errorCodes.split(HttpConnectorConfigConstants.PROP_DELIM); - return prepareErrorCodes(splitCodes); - } - } - - /** - * Process given array of status codes and assign them to - * {@link SingleValueHttpStatusCodeChecker} for full codes such as 100, 404 etc. or to - * {@link TypeStatusCodeChecker} for codes that were constructed with "XX" mask - */ - private Set prepareErrorCodes(String[] statusCodes) { - - Set errorCodes = new HashSet<>(); - for (String sCode : statusCodes) { - if (!StringUtils.isNullOrWhitespaceOnly(sCode)) { - String trimCode = sCode.toUpperCase().trim(); - Preconditions.checkArgument( - trimCode.length() == 3, - "Status code should contain three characters. Provided [%s]", - trimCode); - - // at this point we have trim, upper case 3 character status code. - if (isTypeCode(trimCode)) { - int code = Integer.parseInt(trimCode.replace("X", "")); - errorCodes.add(new TypeStatusCodeChecker(HttpResponseCodeType.getByCode(code))); - } else { - errorCodes.add( - new SingleValueHttpStatusCodeChecker(Integer.parseInt(trimCode)) - ); - } - } - } - return (errorCodes.isEmpty()) ? DEFAULT_ERROR_CODES : errorCodes; - } - - private Set prepareWhiteList( - ComposeHttpStatusCodeCheckerConfig config) { - - Properties properties = config.getProperties(); - String whiteListPrefix = config.getWhiteListPrefix(); - - return Arrays.stream( - properties.getProperty(whiteListPrefix, "") - .split(HttpConnectorConfigConstants.PROP_DELIM)) - .filter(sCode -> !StringUtils.isNullOrWhitespaceOnly(sCode)) - .map(String::trim) - .mapToInt(Integer::parseInt) - .mapToObj(WhiteListHttpStatusCodeChecker::new) - .collect(Collectors.toSet()); - } - - /** - * This method checks if "code" param matches "digit + XX" mask. This method expects that - * provided string will be 3 elements long, trim and upper case. - * - * @param code to check if it contains XX on second ant third position. Parameter is expected to - * be 3 characters long, trim and uppercase. - * @return true if string matches "anything + XX" and false if not. - */ - private boolean isTypeCode(final String code) { - return code.charAt(1) == 'X' && code.charAt(2) == 'X'; - } - - @Data - @Builder - @RequiredArgsConstructor(access = AccessLevel.PRIVATE) - public static class ComposeHttpStatusCodeCheckerConfig { - - private final String whiteListPrefix; - - private final String errorCodePrefix; - - private final Properties properties; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java index 6b59c4bf..adc1580c 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java @@ -13,16 +13,28 @@ public class HttpResponseChecker { private final Set successCodes; private final Set temporalErrorCodes; + private final Set ignoreCodes; - HttpResponseChecker(@NonNull String successCodeExpr, @NonNull String temporalErrorCodeExpr) - throws ConfigurationException { - this(HttpCodesParser.parse(successCodeExpr), HttpCodesParser.parse(temporalErrorCodeExpr)); + public HttpResponseChecker( + @NonNull String successCodeExpr, + @NonNull String temporalErrorCodeExpr, + @NonNull String ignoreCodeExpr + ) throws ConfigurationException { + this( + HttpCodesParser.parse(successCodeExpr), + HttpCodesParser.parse(temporalErrorCodeExpr), + HttpCodesParser.parse(ignoreCodeExpr) + ); } - public HttpResponseChecker(@NonNull Set successCodes, @NonNull Set temporalErrorCodes) - throws ConfigurationException { + public HttpResponseChecker( + @NonNull Set successCodes, + @NonNull Set temporalErrorCodes, + @NonNull Set ignoreCodes + ) throws ConfigurationException { this.successCodes = successCodes; this.temporalErrorCodes = temporalErrorCodes; + this.ignoreCodes = ignoreCodes; validate(); } @@ -31,7 +43,7 @@ public boolean isSuccessful(HttpResponse response) { } public boolean isSuccessful(int httpStatusCode) { - return successCodes.contains(httpStatusCode); + return successCodes.contains(httpStatusCode) || ignoreCodes.contains(httpStatusCode); } public boolean isTemporalError(HttpResponse response) { @@ -42,12 +54,33 @@ public boolean isTemporalError(int httpStatusCode) { return temporalErrorCodes.contains(httpStatusCode); } + public boolean isIgnoreCode(HttpResponse response) { + return isIgnoreCode(response.statusCode()); + } + + public boolean isIgnoreCode(int httpStatusCode) { + return ignoreCodes.contains(httpStatusCode); + } + + public boolean isErrorCode(HttpResponse response) { + return isErrorCode(response.statusCode()); + } + + public boolean isErrorCode(int httpStatusCode) { + return !isTemporalError(httpStatusCode) && !isSuccessful(httpStatusCode); + } + private void validate() throws ConfigurationException { - if (successCodes.isEmpty()) { - throw new ConfigurationException("Success code list can not be empty"); + if (successCodes.isEmpty() && ignoreCodes.isEmpty()) { + throw new ConfigurationException("Success and ignore code lists can not be empty"); } - var intersection = new HashSet<>(successCodes); - intersection.retainAll(temporalErrorCodes); + HashSet intersection = new HashSet<>(temporalErrorCodes); + + HashSet combinedSuccessIgnoreCodes = new HashSet<>(); + combinedSuccessIgnoreCodes.addAll(successCodes); + combinedSuccessIgnoreCodes.addAll(ignoreCodes); + + intersection.retainAll(combinedSuccessIgnoreCodes); if (!intersection.isEmpty()) { throw new ConfigurationException("Http codes " + intersection + " can not be used as both success and retry codes"); diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseCodeType.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseCodeType.java deleted file mode 100644 index 71f174eb..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseCodeType.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import java.util.HashMap; -import java.util.Map; - -/** - * This enum represents HTTP response code types, grouped by "hundreds" digit. - */ -public enum HttpResponseCodeType { - - INFO(1), - SUCCESS(2), - REDIRECTION(3), - CLIENT_ERROR(4), - SERVER_ERROR(5); - - private static final Map map; - - static { - map = new HashMap<>(); - for (HttpResponseCodeType httpResponseCodeType : HttpResponseCodeType.values()) { - map.put(httpResponseCodeType.httpTypeCode, httpResponseCodeType); - } - } - - private final int httpTypeCode; - - HttpResponseCodeType(int httpTypeCode) { - this.httpTypeCode = httpTypeCode; - } - - /** - * @param statusCode Http status code to get the {@link HttpResponseCodeType} instance for. - * @return a {@link HttpResponseCodeType} instance based on http type code, for example {@code - * HttpResponseCodeType.getByCode(1)} will return {@link HttpResponseCodeType#INFO} type. - */ - public static HttpResponseCodeType getByCode(int statusCode) { - return map.get(statusCode); - } - - /** - * @return a "hundreds" digit that represents given {@link HttpResponseCodeType} instance. - * For example {@code HttpResponseCodeType.INFO.getHttpTypeCode()} will return 1 since HTTP - * information repossess have status codes in range 100 - 199. - */ - public int getHttpTypeCode() { - return this.httpTypeCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java deleted file mode 100644 index 6af0344c..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -/** - * Base interface for all classes that would validate HTTP status - * code whether it is an error or not. - */ -public interface HttpStatusCodeChecker { - - /** - * Validates http status code wheter it is considered as error code. The logic for - * what status codes are considered as "errors" depends on the concreted implementation - * @param statusCode http status code to assess. - * @return true if statusCode is considered as Error and false if not. - */ - boolean isErrorCode(int statusCode); -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java deleted file mode 100644 index b52951ed..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import lombok.EqualsAndHashCode; -import lombok.RequiredArgsConstructor; - -/** - * An implementation of {@link HttpStatusCodeChecker} that validates status code against - * constant value. - */ -@RequiredArgsConstructor -@EqualsAndHashCode -public class SingleValueHttpStatusCodeChecker implements HttpStatusCodeChecker { - - /** - * A reference http status code to compare with. - */ - private final int errorCode; - - /** - * Validates given statusCode against constant value. - * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. - */ - @Override - public boolean isErrorCode(int statusCode) { - return errorCode == statusCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java deleted file mode 100644 index df942879..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import lombok.EqualsAndHashCode; - -/** - * Implementation of {@link HttpStatusCodeChecker} that verifies if given Http status code - * belongs to specific HTTP code type family. For example if it any of 100's 200's or 500's code. - */ -@EqualsAndHashCode -public class TypeStatusCodeChecker implements HttpStatusCodeChecker { - - /** - * First digit from HTTP status code that describes a type of code, - * for example 1 for all 100's, 5 for all 500's. - */ - private final int httpTypeCode; - - /** - * Creates TypeStatusCodeChecker for given {@link HttpResponseCodeType} - * - * @param httpResponseCodeType {@link HttpResponseCodeType} for this {@link - * TypeStatusCodeChecker} instance. - */ - public TypeStatusCodeChecker(HttpResponseCodeType httpResponseCodeType) { - this.httpTypeCode = httpResponseCodeType.getHttpTypeCode(); - } - - /** - * Checks whether given status code belongs to Http code status type. - * For example: - *
{@code
-     *    TypeStatusCodeChecker checker =  new TypeStatusCodeChecker(5);
-     *    checker.isErrorCode(505); <- will return true.
-     *    }
-     * 
- * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. - */ - @Override - public boolean isErrorCode(int statusCode) { - return statusCode / 100 == httpTypeCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java deleted file mode 100644 index 2aa65c65..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import lombok.EqualsAndHashCode; -import lombok.RequiredArgsConstructor; - -/** - * Class that implements logic of a "white list" against single constant value. - */ -@RequiredArgsConstructor -@EqualsAndHashCode -public class WhiteListHttpStatusCodeChecker { - - private final int whiteListCode; - - /** - * Checks if given statusCode is considered as "white listed" - * @param statusCode status code to check. - * @return true if given statusCode is white listed and false if not. - */ - public boolean isWhiteListed(int statusCode) { - return whiteListCode == statusCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 81a75674..2afd2097 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -9,11 +9,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -61,7 +59,7 @@ public class JavaNetHttpPollingClient implements PollingClient { private final ObjectMapper objectMapper; private final HttpPostRequestCallback httpPostRequestCallback; private final HttpLookupConfig options; - private final Set ignoredErrorCodes; + private final HttpResponseChecker responseChecker; private final boolean continueOnError; public JavaNetHttpPollingClient( @@ -77,17 +75,16 @@ public JavaNetHttpPollingClient( this.options = options; var config = options.getReadableConfig(); - this.ignoredErrorCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES)); + var ignoreCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES)); var errorCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_RETRY_CODES)); - var successCodes = new HashSet(); - successCodes.addAll(HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_SUCCESS_CODES))); - successCodes.addAll(ignoredErrorCodes); + var successCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_SUCCESS_CODES)); this.continueOnError = config.get(SOURCE_LOOKUP_CONTINUE_ON_ERROR); + this.responseChecker = new HttpResponseChecker(successCodes, errorCodes, ignoreCodes); this.httpClient = HttpClientWithRetry.builder() .httpClient(httpClient) .retryConfig(RetryConfigProvider.create(config)) - .responseChecker(new HttpResponseChecker(successCodes, errorCodes)) + .responseChecker(responseChecker) .build(); } @@ -213,7 +210,7 @@ private HttpRowDataWrapper processHttpResponse( var responseBody = response.body(); log.debug("Received status code [{}] for RestTableSource request", response.statusCode()); - if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) { + if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || responseChecker.isIgnoreCode(response))) { return HttpRowDataWrapper.builder() .data(Collections.emptyList()) .httpCompletionState(HttpCompletionState.SUCCESS) @@ -291,8 +288,4 @@ private List deserializeArray(byte[] rawBytes) throws IOException { } return result; } - - private boolean ignoreResponse(HttpResponse response) { - return ignoredErrorCodes.contains(response.statusCode()); - } } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java index f634e1d6..acae15e1 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java @@ -25,6 +25,7 @@ import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient; import com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; +import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.DELIVERY_GUARANTEE; import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD; import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL; @@ -125,6 +126,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { HttpSinkBuilder builder = HttpSink .builder() + .setDeliveryGuarantee(tableOptions.get(DELIVERY_GUARANTEE)) .setEndpointUrl(tableOptions.get(URL)) .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .setHttpPostRequestCallback(httpPostRequestCallback) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java index b87b6eb7..79036051 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java @@ -2,6 +2,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.base.DeliveryGuarantee; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SINK_REQUEST_CALLBACK_IDENTIFIER; @@ -24,4 +25,12 @@ public class HttpDynamicSinkConnectorOptions { ConfigOptions.key(SINK_REQUEST_CALLBACK_IDENTIFIER) .stringType() .defaultValue(Slf4jHttpPostRequestCallbackFactory.IDENTIFIER); + + public static final ConfigOption DELIVERY_GUARANTEE = + ConfigOptions.key("sink.delivery-guarantee") + .enumType(DeliveryGuarantee.class) + .defaultValue(DeliveryGuarantee.NONE) + .withDescription("Defines the delivery semantic for the HTTP sink. " + + "Accepted enumerations are 'at-least-once', and 'none'. " + + "'exactly-once' semantic is not supported."); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java index 549ff650..8bb01e34 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactory.java @@ -5,6 +5,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory; import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator; import org.apache.flink.table.connector.sink.DynamicTableSink; @@ -82,6 +83,7 @@ public Set> optionalOptions() { var options = super.optionalOptions(); options.add(INSERT_METHOD); options.add(REQUEST_CALLBACK_IDENTIFIER); + options.add(DELIVERY_GUARANTEE); return options; } @@ -96,5 +98,11 @@ private void validateHttpSinkOptions(ReadableConfig tableOptions) )); } }); + tableOptions.getOptional(DELIVERY_GUARANTEE).ifPresent(deliveryGuarantee -> { + if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { + throw new IllegalArgumentException("'exactly-once' semantic is not supported. " + + "It is expected to be either 'none' or 'at-least-once."); + } + }); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java index cbf33c2f..d0e1dc3a 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java @@ -15,12 +15,14 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension; import org.junit.jupiter.api.AfterEach; @@ -30,6 +32,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.*; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.getindata.connectors.http.HttpSink; @@ -189,18 +192,43 @@ public List testConnection( } @Test - public void testServerErrorConnection() throws Exception { + public void testNoRetryWithNoneConnection() throws Exception { wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) .inScenario("Retry Scenario") .whenScenarioStateIs(STARTED) .willReturn(serverError()) .willSetStateTo("Cause Success")); - wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) - .inScenario("Retry Scenario") + + var source = env.fromCollection(List.of(messages.get(0))); + var httpSink = HttpSink.builder() + .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") + .setElementConverter( + (s, _context) -> + new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setDeliveryGuarantee(DeliveryGuarantee.NONE) + .build(); + source.sinkTo(httpSink); + + env.execute("Http Sink none delivery guarantee retry"); + + assertEquals(1, SendErrorsTestReporterFactory.getCount()); + } + + @Test + public void testRetryableServerErrorConnection() throws Exception { + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(serviceUnavailable()) + .willSetStateTo("Cause Success")); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") .whenScenarioStateIs("Cause Success") - .willReturn(aResponse().withStatus(200)) + .willReturn(aResponse().withStatus(200).withBody("msg1")) .willSetStateTo("Cause Success")); var source = env.fromCollection(List.of(messages.get(0))); @@ -208,19 +236,68 @@ public void testServerErrorConnection() throws Exception { .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") .setElementConverter( (s, _context) -> - new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) + new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build(); + source.sinkTo(httpSink); + env.execute("Http Sink test failed connection"); + + assertEquals(1, SendErrorsTestReporterFactory.getCount()); + + var postedRequests = wireMockServer + .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); + assertEquals(2, postedRequests.size()); + assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString()); + } + + @Test + public void testMixedRetryable() throws Exception { + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(serviceUnavailable()) + .willSetStateTo("Cause Success")); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs("Cause Success") + .willReturn(aResponse().withStatus(200).withBody("msg1")) + .willSetStateTo("Cause Success")); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":1}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(aResponse().withStatus(200).withBody("msg2")) + .willSetStateTo(STARTED)); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":1}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs("Cause Success") + .willReturn(aResponse().withStatus(200).withBody("msg2")) + .willSetStateTo("Cause Success")); + + var source = env.fromCollection(List.of(messages.get(0), messages.get(1))); + var httpSink = HttpSink.builder() + .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") + .setElementConverter( + (s, _context) -> + new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); source.sinkTo(httpSink); env.execute("Http Sink test failed connection"); assertEquals(1, SendErrorsTestReporterFactory.getCount()); - // TODO: reintroduce along with the retries - // var postedRequests = wireMockServer - // .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); - // assertEquals(2, postedRequests.size()); - // assertEquals(postedRequests.get(0).getBodyAsString(), - // postedRequests.get(1).getBodyAsString()); + + var postedRequests = wireMockServer + .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); + assertEquals(3, postedRequests.size()); } @Test @@ -249,7 +326,7 @@ public void testFailedConnection() throws Exception { .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .build(); source.sinkTo(httpSink); - env.execute("Http Sink test failed connection"); + assertThrows(JobExecutionException.class, () -> env.execute("Http Sink test failed connection")); assertEquals(1, SendErrorsTestReporterFactory.getCount()); // var postedRequests = wireMockServer diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java index db1975ed..d9bf6b75 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java @@ -1,8 +1,8 @@ package com.getindata.connectors.http.internal.sink; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.metrics.Counter; @@ -20,6 +21,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; @@ -27,6 +29,8 @@ import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; +import com.getindata.connectors.http.internal.SinkHttpClientResponse.ResponseItem; +import com.getindata.connectors.http.internal.config.ResponseItemStatus; @Slf4j @ExtendWith(MockitoExtension.class) @@ -58,41 +62,259 @@ public void setUp() { when(metricGroup.getNumRecordsSendErrorsCounter()).thenReturn(errorCounter); when(metricGroup.getIOMetricGroup()).thenReturn(operatorIOMetricGroup); when(context.metricGroup()).thenReturn(metricGroup); + } + private void createHttpSinkWriter(DeliveryGuarantee deliveryGuarantee) { Collection> stateBuffer = new ArrayList<>(); this.httpSinkWriter = new HttpSinkWriter<>( - elementConverter, - context, - 10, - 10, - 100, - 10, - 10, - 10, - "http://localhost/client", - httpClient, - stateBuffer, - new Properties()); + elementConverter, + context, + 10, + 10, + 100, + 10, + 10, + 10, + deliveryGuarantee, + "http://localhost/client", + httpClient, + stateBuffer, + new Properties()); + } + + @Test + public void testErrorMetricWhenAllRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new Exception("Test Exception")); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = + httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(2); + } + + @Test + public void testErrorMetricWhenFailureRequestsOccur() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, ResponseItemStatus.SUCCESS), + new ResponseItem(null, ResponseItemStatus.IGNORE), + new ResponseItem(null, ResponseItemStatus.TEMPORAL), + new ResponseItem(null, ResponseItemStatus.FAILURE)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + HttpSinkRequestEntry request3 = new HttpSinkRequestEntry("PUT", "lorem".getBytes()); + HttpSinkRequestEntry request4 = new HttpSinkRequestEntry("PUT", "ipsum".getBytes()); + Consumer> requestResult = + httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); + + List requestEntries = Arrays.asList(request1, request2, request3, request4); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(1); + } + + @Test + public void testRetryWhenAllRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new Exception("Test Exception")); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(2); + assertEquals(2, entriesToRetry.size()); + } + + + @Test + public void testRetryWhenAPartOfRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, ResponseItemStatus.TEMPORAL), + new ResponseItem(null, ResponseItemStatus.SUCCESS)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(1); + assertEquals(1, entriesToRetry.size()); + } + + @Test + public void testTemporalRequestsWithNoneGuaranteeDoNotRetry() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, ResponseItemStatus.TEMPORAL), + new ResponseItem(null, ResponseItemStatus.SUCCESS)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(1); + assertEquals(0, entriesToRetry.size()); + } + + @Test + public void testAllSuccessfulRequests() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, ResponseItemStatus.SUCCESS), + new ResponseItem(null, ResponseItemStatus.SUCCESS)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + assertEquals(0, entriesToRetry.size()); + } + + @Test + public void testIgnoredRequests() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, ResponseItemStatus.IGNORE), + new ResponseItem(null, ResponseItemStatus.SUCCESS)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + assertEquals(0, entriesToRetry.size()); + } + + @Test + public void testFailureRequestsWithAtLeastOnceGuarantee() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new ResponseItem(null, ResponseItemStatus.FAILURE), + new ResponseItem(null, ResponseItemStatus.SUCCESS)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = + httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(1); } @Test - public void testErrorMetric() throws InterruptedException { + public void testUnsupportedDeliveryGuaranteeThrowsException() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.EXACTLY_ONCE); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Test Exception")); when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); - HttpSinkRequestEntry request = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); Consumer> requestResult = httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); - List requestEntries = Collections.singletonList(request); + List requestEntries = Arrays.asList(request1, request2); this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); // would be good to use Countdown Latch instead sleep... Thread.sleep(2000); - verify(errorCounter).inc(requestEntries.size()); + verify(errorCounter).inc(2); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerDeprecationTest.java similarity index 75% rename from src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java rename to src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerDeprecationTest.java index 23baaead..4ed955f1 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerDeprecationTest.java @@ -18,10 +18,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; +import com.getindata.connectors.http.internal.status.HttpResponseChecker; +import static com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient.createHttpResponseChecker; -class ComposeHttpStatusCodeCheckerTest { +class ComposeHttpStatusCodeCheckerDeprecationTest { private static final String STRING_CODES = "403, 100,200, 300, , 303 ,200"; @@ -33,7 +33,7 @@ class ComposeHttpStatusCodeCheckerTest { .boxed() .collect(Collectors.toList()); - private ComposeHttpStatusCodeChecker codeChecker; + private HttpResponseChecker codeChecker; @BeforeAll public static void beforeAll() { @@ -53,14 +53,11 @@ private static Stream propertiesArguments() { @MethodSource("propertiesArguments") public void shouldPassOnDefault(Properties properties) { - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); - + codeChecker = prepareCheckerConfig(properties); assertAll(() -> { assertThat(codeChecker.isErrorCode(100)).isFalse(); assertThat(codeChecker.isErrorCode(200)).isFalse(); - assertThat(codeChecker.isErrorCode(500)).isTrue(); + assertThat(codeChecker.isTemporalError(500)).isTrue(); assertThat(codeChecker.isErrorCode(501)).isTrue(); assertThat(codeChecker.isErrorCode(400)).isTrue(); assertThat(codeChecker.isErrorCode(404)).isTrue(); @@ -79,9 +76,7 @@ public void shouldParseWhiteList() { "1XX, 2XX, 3XX, 4XX, 5XX" ); - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + codeChecker = prepareCheckerConfig(properties); assertAll(() -> { CODES.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isFalse()); @@ -102,9 +97,7 @@ public void shouldParseErrorCodeList() { HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, STRING_CODES); - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + codeChecker = prepareCheckerConfig(properties); assertAll(() -> CODES.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isTrue())); } @@ -118,9 +111,7 @@ public void shouldParseErrorCodeRange() { List codes = List.of(100, 110, 200, 220); - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + codeChecker = prepareCheckerConfig(properties); assertAll(() -> { codes.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isTrue()); @@ -140,11 +131,9 @@ public void shouldThrowOnInvalidCodeRange(String listCode) { properties.setProperty( HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, listCode); - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - assertThrows( Exception.class, - () -> new ComposeHttpStatusCodeChecker(checkerConfig) + () -> prepareCheckerConfig(properties) ); } @@ -161,11 +150,7 @@ private static Properties prepareErrorCodeProperties(String errorCodeList, Strin return properties; } - private ComposeHttpStatusCodeCheckerConfig prepareCheckerConfig(Properties properties) { - return ComposeHttpStatusCodeCheckerConfig.builder() - .properties(properties) - .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST) - .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST) - .build(); + private HttpResponseChecker prepareCheckerConfig(Properties properties) { + return createHttpResponseChecker(properties); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java index 4cc27052..6dd41397 100644 --- a/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java @@ -23,13 +23,19 @@ class HttpResponseCheckerTest { @Test void failWhenTheSameCodeIsMarkedSuccessAndError() { assertThrows(ConfigurationException.class, - () -> new HttpResponseChecker(Set.of(404), Set.of(404))); + () -> new HttpResponseChecker(Set.of(404), Set.of(404), emptySet())); + } + + @Test + void failWhenTheSameCodeIsMarkedIgnoreAndError() { + assertThrows(ConfigurationException.class, + () -> new HttpResponseChecker(Set.of(200), Set.of(404), Set.of(404))); } @Test void failWhenSuccessListIsEmpty() { assertThrows(ConfigurationException.class, - () -> new HttpResponseChecker(emptySet(), Set.of(500))); + () -> new HttpResponseChecker(emptySet(), Set.of(500), emptySet())); } private static Stream testData() { @@ -50,7 +56,7 @@ private static Stream testData() { @ParameterizedTest @MethodSource("testData") void verifyCodes(InputArgs inputArgs) throws ConfigurationException { - var checker = new HttpResponseChecker("2XX,404,!202", "4XX,!404,500,501,502,!409"); + var checker = new HttpResponseChecker("2XX,!202", "4XX,!404,500,501,502,!409", "404"); var response = inputArgs.getResponse(); switch (inputArgs.getCodeType()) { @@ -81,6 +87,67 @@ private void assertTemporalError(HttpResponseChecker checker, HttpResponse re private void assertError(HttpResponseChecker checker, HttpResponse response) { assertFalse(checker.isSuccessful(response)); assertFalse(checker.isTemporalError(response)); + assertTrue(checker.isErrorCode(response)); + } + + @Test + void testIsIgnoreCode() throws ConfigurationException { + var checker = new HttpResponseChecker("2XX", "5XX", "404"); + var response404 = mock(HttpResponse.class); + when(response404.statusCode()).thenReturn(404); + + assertTrue(checker.isIgnoreCode(response404)); + assertTrue(checker.isSuccessful(response404)); // Ignore codes are considered successful + assertFalse(checker.isTemporalError(response404)); + assertFalse(checker.isErrorCode(response404)); + } + + @Test + void testIsErrorCode() throws ConfigurationException { + var checker = new HttpResponseChecker("2XX", "5XX", "404"); + var response400 = mock(HttpResponse.class); + when(response400.statusCode()).thenReturn(400); + + assertTrue(checker.isErrorCode(response400)); + assertFalse(checker.isSuccessful(response400)); + assertFalse(checker.isTemporalError(response400)); + assertFalse(checker.isIgnoreCode(response400)); + } + + @Test + void testNullSuccessCodeExprThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(null, "5XX", "404")); + } + + @Test + void testNullTemporalErrorCodeExprThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker("2XX", null, "404")); + } + + @Test + void testNullIgnoreCodeExprThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker("2XX", "5XX", null)); + } + + @Test + void testNullSuccessCodesSetThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(null, Set.of(500), Set.of(404))); + } + + @Test + void testNullTemporalErrorCodesSetThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(Set.of(200), null, Set.of(404))); + } + + @Test + void testNullIgnoreCodesSetThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(Set.of(200), Set.of(500), null)); } @RequiredArgsConstructor diff --git a/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java index 269ee87b..6f6d0c4d 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java @@ -91,4 +91,44 @@ public void nonexistentOptionsTest() { assertThrows(ValidationException.class, () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); } + + @Test + public void invalidSinkDeliveryGuaranteeOptionTests() { + final String invalidOptionCreateSql = + String.format( + "CREATE TABLE http (\n" + + " id bigint\n" + + ") with (\n" + + " 'connector' = '%s',\n" + + " 'url' = '%s',\n" + + " 'format' = 'json',\n" + + " 'sink.delivery-guarantee' = 'invalid'\n" + + ")", + HttpDynamicTableSinkFactory.IDENTIFIER, + "http://localhost/" + ); + tEnv.executeSql(invalidOptionCreateSql); + assertThrows(ValidationException.class, + () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); + } + + @Test + public void exactlyOnceDeliveryGuaranteeNotSupported() { + final String exactlyOnceCreateSql = + String.format( + "CREATE TABLE http (\n" + + " id bigint\n" + + ") with (\n" + + " 'connector' = '%s',\n" + + " 'url' = '%s',\n" + + " 'format' = 'json',\n" + + " 'sink.delivery-guarantee' = 'exactly-once'\n" + + ")", + HttpDynamicTableSinkFactory.IDENTIFIER, + "http://localhost/" + ); + tEnv.executeSql(exactlyOnceCreateSql); + assertThrows(ValidationException.class, + () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); + } }