Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## [Unreleased]

- Added HTTP Sink request retries with delivery guarantee support (`sink.delivery-guarantee`).
- Added AIMD rate limiting strategy for HTTP Sink backpressure management.
- Added new HTTP Sink configuration options: `gid.connector.http.sink.success-codes`, `gid.connector.http.sink.retry-codes`, and `gid.connector.http.sink.ignored-response-codes`.
- Amend to not log HTTP request response and header values by default.
- Added http 2 support.

Expand Down
37 changes: 23 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,17 +451,25 @@ is provided.

## HTTP status code handler
### Sink table
You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table.
By default all 400 and 500 response codes will be interpreted as error code.

This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are:
- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
`401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors.
An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors.
- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.
You can configure HTTP status code handling for HTTP sink table and enable automatic retries with delivery guarantees.

#### Retries and delivery guarantee
HTTP Sink supports automatic retries when `sink.delivery-guarantee` is set to `at-least-once`. Failed requests will be automatically retried based on the configured status codes.
Copy link
Collaborator

@davidradl davidradl Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sink.delivery-guarantee is interesting and looks like it is a bringing this connector in line with others. When do we think we will get more that one request issued? I assume we would need more processing for exactly once.

Copy link
Author

@jonathanlehto jonathanlehto Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am lifting a big part of the code from an earlier PR, which was the source of using a delivery guarantee. However, for my personal use case, I do in fact need to guarantee at least once behavior 🙂. The current implementation has an indefinite number of retries. If we don't have the delivery guarantee, we'll need to have something like number of attempts imo. I'm fine with whatever though! I just really need retries!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly once is pretty challenging given that we are using HTTP requests. Imo, it would require coordination with the receiving service to response with some kind of ack in order to enforce it. I don't believe exactly once can be achieved on just the client side with HTTP requests if that makes sense. However, duplicated success message emissions should be really infrequent

- When `sink.delivery-guarantee` is `at-least-once`: Failed requests are retried automatically using AIMD (Additive Increase Multiplicative Decrease) rate limiting strategy.
- When `sink.delivery-guarantee` is `none` (default): Failed requests are logged but not retried.

The sink categorizes HTTP responses into groups:
- Success codes (`gid.connector.http.sink.success-codes`): Expected successful responses.
- Retry codes (`gid.connector.http.sink.retry-codes`): Transient errors that trigger automatic retries when using `at-least-once` delivery guarantee.
- Ignored responses (`gid.connector.http.sink.ignored-response-codes`): Responses whose content is ignored but treated as successful.
- Error codes: Any response code not classified in the above groups.

Parameters support whitelisting and blacklisting: `2XX,404,!203` means all codes from 200-299, plus 404, except 203.

#### Legacy error code configuration
For backward compatibility, you can use the legacy properties:
- `gid.connector.http.sink.error.code` - HTTP status codes treated as errors (supports masks like `3XX, 4XX, 5XX`).
- `gid.connector.http.sink.error.code.exclude` - HTTP codes to exclude from the error list.

### Source table
The source table categorizes HTTP responses into three groups based on status codes:
Expand Down Expand Up @@ -612,13 +620,17 @@ be requested if the current time is later than the cached token expiry time minu
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.delivery-guarantee | optional | Defines the delivery semantic for the HTTP sink. Accepted enumerations are 'at-least-once', and 'none' (actually 'none' is the same as 'at-most-once'. 'exactly-once' semantic is not supported. |
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
| sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. |
| gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. |
| gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. |
| gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. |
| gid.connector.http.sink.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. |
| gid.connector.http.sink.retry-codes | optional | Comma separated http codes considered as transient errors that will trigger retries. Use [1-5]XX for groups and '!' character for excluding. Only used when `sink.delivery-guarantee` is set to `at-least-once`. |
| gid.connector.http.sink.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. |
| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
Expand Down Expand Up @@ -736,9 +748,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
### HTTP TableLookup Source
- Check other `//TODO`'s.

### HTTP Sink
- Make `HttpSink` retry the failed requests. Currently, it does not retry those at all, only adds their count to the `numRecordsSendErrors` metric. It should be thoroughly thought over how to do it efficiently and then implemented.

###
[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join
</br>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.getindata.connectors.http;

import java.util.List;

import lombok.Getter;

import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

@Getter
public class BatchHttpStatusCodeValidationFailedException extends Exception {
List<HttpRequest> failedRequests;

public BatchHttpStatusCodeValidationFailedException(String message, List<HttpRequest> failedRequests) {
super(message);
this.failedRequests = failedRequests;
}
}
3 changes: 3 additions & 0 deletions src/main/java/com/getindata/connectors/http/HttpSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Properties;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
Expand Down Expand Up @@ -41,6 +42,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
Expand All @@ -54,6 +56,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
deliveryGuarantee,
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.util.Properties;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.Preconditions;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
Expand Down Expand Up @@ -71,6 +73,8 @@ public class HttpSinkBuilder<InputT> extends

private final Properties properties = new Properties();

private DeliveryGuarantee deliveryGuarantee;

// Mandatory field
private String endpointUrl;

Expand All @@ -92,6 +96,17 @@ public class HttpSinkBuilder<InputT> extends
this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR;
}

/**
* @param deliveryGuarantee HTTP Sink delivery guarantee
* @return {@link HttpSinkBuilder} itself
*/
public HttpSinkBuilder<InputT> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE,
"Only at-least-once and none delivery guarantees are supported.");
this.deliveryGuarantee = deliveryGuarantee;
return this;
}

/**
* @param endpointUrl the URL of the endpoint
* @return {@link HttpSinkBuilder} itself
Expand Down Expand Up @@ -181,6 +196,7 @@ public HttpSink<InputT> build() {
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
Optional.ofNullable(deliveryGuarantee).orElse(DeliveryGuarantee.NONE),
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,62 @@
package com.getindata.connectors.http.internal;

import java.util.List;
import java.util.stream.Collectors;

import lombok.Data;
import lombok.NonNull;
import lombok.ToString;

import com.getindata.connectors.http.internal.config.ResponseItemStatus;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

/**
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
* to write, divided into two lists &mdash; successful and failed ones.
* to write.
*/
@Data
@ToString
public class SinkHttpClientResponse {

/**
* A list of successfully written requests.
* A list of requests along with write status.
*/
@NonNull
private final List<HttpRequest> successfulRequests;
private final List<ResponseItem> requests;

/**
* A list of requests that {@link SinkHttpClient} failed to write.
*/
@NonNull
private final List<HttpRequest> failedRequests;
public List<HttpRequest> getSuccessfulRequests() {
return requests.stream()
.filter(r -> r.getStatus().equals(ResponseItemStatus.SUCCESS))
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

public List<HttpRequest> getFailedRequests() {
return requests.stream()
.filter(r -> r.getStatus().equals(ResponseItemStatus.FAILURE))
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

public List<HttpRequest> getTemporalRequests() {
return requests.stream()
.filter(r -> r.getStatus().equals(ResponseItemStatus.TEMPORAL))
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

public List<HttpRequest> getIgnoredRequests() {
return requests.stream()
.filter(r -> r.getStatus().equals(ResponseItemStatus.IGNORE))
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

@Data
@ToString
public static class ResponseItem {
private final HttpRequest request;
private final ResponseItemStatus status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class HttpConnectorConfigConstants {
*/
public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";
private static final String SINK_PREFIX = GID_CONNECTOR_HTTP + "sink.";

/**
* A property prefix for http connector header properties
Expand All @@ -45,9 +46,14 @@ public final class HttpConnectorConfigConstants {
public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";

public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = SINK_PREFIX + "error.code.exclude";
public static final String HTTP_ERROR_SINK_CODES_LIST = SINK_PREFIX + "error.code";

public static final String SINK_SUCCESS_CODES = SINK_PREFIX + "success-codes";
public static final String SINK_RETRY_CODES = SINK_PREFIX + "retry-codes";
public static final String SINK_IGNORE_RESPONSE_CODES = SINK_PREFIX + "ignored-response-codes";

// -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.getindata.connectors.http.internal.config;

public enum ResponseItemStatus {
SUCCESS("success"),
TEMPORAL("temporal"),
IGNORE("ignore"),
FAILURE("failure");

private final String status;

ResponseItemStatus(String status) {
this.status = status;
}

public String getStatus() {
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.Properties;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class HttpSinkInternal<InputT> extends AsyncSinkBase<InputT, HttpSinkRequ

private final String endpointUrl;

private final DeliveryGuarantee deliveryGuarantee;

// having Builder instead of an instance of `SinkHttpClient`
// makes it possible to serialize `HttpSink`
private final SinkHttpClientBuilder sinkHttpClientBuilder;
Expand All @@ -79,6 +82,7 @@ protected HttpSinkInternal(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
Expand All @@ -94,9 +98,9 @@ protected HttpSinkInternal(
maxTimeInBufferMS,
maxRecordSizeInBytes
);

Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl),
"The endpoint URL must be set when initializing HTTP Sink.");
this.deliveryGuarantee = deliveryGuarantee;
this.endpointUrl = endpointUrl;
this.httpPostRequestCallback =
Preconditions.checkNotNull(
Expand Down Expand Up @@ -132,6 +136,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
deliveryGuarantee,
endpointUrl,
sinkHttpClientBuilder.build(
properties,
Expand Down Expand Up @@ -159,6 +164,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
deliveryGuarantee,
endpointUrl,
sinkHttpClientBuilder.build(
properties,
Expand Down
Loading
Loading