Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions docs/content/docs/connectors/datastream/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,6 @@ This may require a different retry strategy compared to other API calls used dur
- *[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*:
this is called once per stream during stream consumer registration, unless the `SELF_MANAGED` consumer lifecycle is configured.

- *[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*:
this is called once per stream during stream consumer deregistration, unless the `SELF_MANAGED` registration strategy is configured.


## Kinesis Consumer

{{< hint warning >}}
Expand Down
8 changes: 0 additions & 8 deletions docs/content/docs/connectors/table/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,6 @@ Connector Options
<td>Duration</td>
<td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO Consumer subscription.</td>
</tr>
<tr>
<td><h5>source.efo.deregister.timeout</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>Only applicable to EFO <code>ReaderType</code>. Timeout for consumer deregistration. When timeout is reached, code will continue as per normal.</td>
</tr>
<tr>
<td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,4 @@ public enum ConsumerLifecycle {
.defaultValue(100)
.withDescription(
"Maximum number of attempts for the exponential backoff retry strategy");

public static final ConfigOption<Duration> EFO_DEREGISTER_CONSUMER_TIMEOUT =
ConfigOptions.key("source.efo.deregister.timeout")
.durationType()
.defaultValue(Duration.ofMillis(10000))
.withDescription(
"Timeout for consumer deregistration. When timeout is reached, code will continue as per normal.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,10 @@
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;

import java.time.Instant;

import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ConsumerLifecycle.JOB_MANAGED;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_TYPE;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ReaderType.EFO;

Expand Down Expand Up @@ -108,39 +104,17 @@ public void registerStreamConsumer() {
}
}

/** De-registers stream consumer from specified stream, if needed. */
/**
* Stream consumer de-registration is intentionally skipped for JOB_MANAGED and SELF_MANAGED
* stream consumer lifecycles.
*
* <p>For the JOB_MANAGED consumer lifecycle, consumer de-registration is skipped to avoid
* race-conditions on subsequent application start up (FLINK-37908).
*
* <p>For the SELF_MANAGED consumer lifecycle, consumer de-registration is deferred to the user.
*/
public void deregisterStreamConsumer() {
if (sourceConfig.get(READER_TYPE) == EFO
&& sourceConfig.get(EFO_CONSUMER_LIFECYCLE) == JOB_MANAGED) {
LOG.info("De-registering stream consumer - {}", consumerArn);
if (consumerArn == null) {
LOG.warn(
"Unable to deregister stream consumer as there was no consumer ARN stored in the StreamConsumerRegistrar. There may be leaked EFO consumers on the Kinesis stream.");
return;
}
kinesisStreamProxy.deregisterStreamConsumer(consumerArn);
LOG.info("De-registered stream consumer - {}", consumerArn);

Instant timeout = Instant.now().plus(sourceConfig.get(EFO_DEREGISTER_CONSUMER_TIMEOUT));
String consumerName = getConsumerNameFromArn(consumerArn);
while (Instant.now().isBefore(timeout)) {
try {
DescribeStreamConsumerResponse response =
kinesisStreamProxy.describeStreamConsumer(streamArn, consumerName);
LOG.info(
"Waiting for stream consumer to be deregistered - {} {} {}",
streamArn,
consumerName,
response.consumerDescription().consumerStatusAsString());

} catch (ResourceNotFoundException e) {
LOG.info("Stream consumer {} has been deregistered", consumerArn);
return;
}
}
LOG.warn(
"Timed out waiting for stream consumer to be deregistered. There may be leaked EFO consumers on the Kinesis stream.");
}
// Do nothing.
}

private String getConsumerNameFromArn(String consumerArn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;

import java.time.Duration;

import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ConsumerLifecycle.JOB_MANAGED;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ConsumerLifecycle.SELF_MANAGED;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_TYPE;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ReaderType.EFO;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ReaderType.POLLING;
Expand Down Expand Up @@ -97,6 +94,25 @@ void testDeregisterStreamConsumerSkippedWhenNotEfo() {
.containsExactly(CONSUMER_NAME);
}

@Test
void testRegisterStreamConsumerSkippedWhenEfo() {
// Given JOB_MANAGED consumer lifecycle
sourceConfiguration.set(READER_TYPE, EFO);
sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED);
// And consumer is registered
sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME);
streamConsumerRegistrar.registerStreamConsumer();
assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN))
.containsExactly(CONSUMER_NAME);

// When deregisterStreamConsumer is called
// Then we skip deregistration of the consumer
assertThatNoException()
.isThrownBy(() -> streamConsumerRegistrar.deregisterStreamConsumer());
assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN))
.containsExactly(CONSUMER_NAME);
}

@Test
void testRegisterStreamConsumerSkippedWhenSelfManaged() {
// Given SELF_MANAGED consumer lifecycle
Expand Down Expand Up @@ -176,43 +192,4 @@ void testRegisterStreamConsumerHandledGracefullyWhenConsumerExists() {
assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN))
.containsExactly(CONSUMER_NAME);
}

@Test
void testDeregisterStreamConsumerWhenJobManaged() {
// Given JOB_MANAGED consumer lifecycle
sourceConfiguration.set(READER_TYPE, EFO);
sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED);
// And consumer is registered
sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME);
streamConsumerRegistrar.registerStreamConsumer();
assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN))
.containsExactly(CONSUMER_NAME);

// When deregisterStreamConsumer is called
streamConsumerRegistrar.deregisterStreamConsumer();

// Then consumer is deregistered
assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).hasSize(0);
}

@Test
void testDeregisterStreamConsumerProceedsWhenTimeoutDeregistering() {
// Given JOB_MANAGED consumer lifecycle
sourceConfiguration.set(READER_TYPE, EFO);
sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED);
sourceConfiguration.set(EFO_DEREGISTER_CONSUMER_TIMEOUT, Duration.ofMillis(50));
// And consumer is registered
sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME);
streamConsumerRegistrar.registerStreamConsumer();
assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN))
.containsExactly(CONSUMER_NAME);
// And consumer is stuck in DELETING
testKinesisStreamProxy.setConsumersCurrentlyDeleting(CONSUMER_NAME);

// When deregisterStreamConsumer is called
streamConsumerRegistrar.deregisterStreamConsumer();

// Then consumer is deregistered
assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).hasSize(0);
}
}