vbabenkoru commented Oct 30, 2025

Purpose of the change

Fix a race condition that occurs occasionally (with roughly 1-2% probability per partition). The connector creates a dummy consumer to seek to the right cursor position, closes it, and immediately afterwards creates the real consumer. If the previous consumer has not yet been fully released on the broker side, the broker responds with "Exclusive consumer is already connected", which causes the job to restart. In our case we were subscribing to thousands of topics, so the job kept restarting for hours until it reached an attempt in which none of the topics hit the race.
I believe this may be a regression from #59. The reason we have to create a separate consumer to seek is described in PIP-194: there doesn't appear to be a way to create a consumer with the cursor already set, and if we create the consumer and then call seek, some messages may still leak through in between. Maybe StreamNative knows of another way, but PIP-194 does not seem to have been adopted or implemented, so we have to seek before creating the real consumer.

Brief change log

  • Add a retry with a 2-second delay when creating a consumer for a split, to handle a race condition from creating two consumers in quick succession.
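
For illustration, a retry of this kind could look roughly like the sketch below. It is not the code from this PR: the class name, the method name `retryWithDelay`, and the `retryable` predicate are all hypothetical. In the connector, the predicate would presumably match the Pulsar client's consumer-busy error (likely `PulsarClientException.ConsumerBusyException`, though that mapping is an assumption here), and the delay would be the 2 seconds mentioned above.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical helper; names and signature are illustrative only. */
final class ConsumerRetrySketch {

    /**
     * Calls {@code action} up to {@code maxAttempts} times. After a failure
     * accepted by {@code retryable}, sleeps {@code delayMillis} and tries again.
     * Non-retryable failures and the final failure are rethrown to the caller.
     */
    static <T> T retryWithDelay(
            Callable<T> action,
            int maxAttempts,
            long delayMillis,
            Predicate<Exception> retryable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                if (!retryable.test(e)) {
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    // Give the broker time to release the previous consumer.
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    private ConsumerRetrySketch() {}
}
```

A fixed delay keeps the change small; whether a bounded backoff would be a better fit is an open question.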

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality
guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

  • Manually verified by running the Pulsar connector on a production-scale Flink cluster with thousands of topics.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for
convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)


boring-cyborg bot commented Oct 30, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)


vbabenkoru commented Oct 30, 2025

FWIW, I picked 2 seconds arbitrarily, and that was enough to fix the issue for us, but I'm looking for input on the best way to handle this.
I'm also open to suggestions about what kind of tests could help cover it. I'm not sure there is an easy way to simulate Pulsar client errors.
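
One possible direction, sketched under assumptions: put an in-memory fake in place of the broker, so that a test can reproduce the "close, then immediately reconnect" sequence deterministically. Everything below is hypothetical (`FakeExclusiveBroker`, its methods, and the manually advanced clock), and it only models the release lag described in the PR. It does not use or imitate any real Pulsar API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for exclusive-subscription bookkeeping. */
final class FakeExclusiveBroker {
    private final long releaseLagMillis;
    private long nowMillis = 0;

    // Subscription name -> time at which the previous consumer counts as released.
    // Long.MAX_VALUE means a consumer is currently connected.
    private final Map<String, Long> busyUntil = new HashMap<>();

    FakeExclusiveBroker(long releaseLagMillis) {
        this.releaseLagMillis = releaseLagMillis;
    }

    /** Connects an exclusive consumer, or fails if the previous one is not yet released. */
    void connect(String subscription) {
        Long until = busyUntil.get(subscription);
        if (until != null && nowMillis < until) {
            throw new IllegalStateException("Exclusive consumer is already connected");
        }
        busyUntil.put(subscription, Long.MAX_VALUE);
    }

    /** Closes the consumer; the fake keeps it "connected" for the configured lag. */
    void close(String subscription) {
        busyUntil.put(subscription, nowMillis + releaseLagMillis);
    }

    /** Advances the fake clock; no real sleeping is involved. */
    void advance(long millis) {
        nowMillis += millis;
    }
}
```

A test could connect and close the dummy consumer, assert that an immediate second `connect` is rejected, advance the clock past the lag, and assert that the next `connect` succeeds. The retry logic would then be exercised against the fake rather than a real cluster.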

vbabenkoru changed the title from "Fix a race condition in consumer creation by adding a retry with delay" to "[FLINK-38600] Fix a race condition in consumer creation by adding a retry with delay" on Oct 31, 2025