Skip to content

Conversation

@Nikita-Shupletsov
Copy link
Contributor

@Nikita-Shupletsov Nikita-Shupletsov commented Feb 7, 2026

Cherry-pick of #21365.

This PR fixes a bug when KS doesn't close stores if the shutdown was
triggered during rebalance where an active tasks gets converted to a
standby one and put into pendingTasksToInit

  • Added logic to close pending tasks to init.
  • Made standby task closure similar to the one for active tasks.
  • Added a separate method for getting standby tasks from task registry.
  • Added an integration test that reproduces the issue.

Reviewers: Matthias J. Sax matthias@confluent.io

Conflicts:
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
Additional changes:
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java

This PR fixes a bug when KS doesn't close stores if the shutdown was
triggered during rebalance where an active tasks gets converted to a
standby one and put into pendingTasksToInit

* Added logic to close pending tasks to init.
* Made standby task closure similar to the one for active tasks.
* Added a separate method for getting standby tasks from task registry.
* Added an integration test that reproduces the issue.

Reviewers: Matthias J. Sax <matthias@confluent.io>

---------

Co-authored-by: Matthias J. Sax <mjsax@apache.org>
 Conflicts:
	streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@Nikita-Shupletsov
Copy link
Contributor Author

cherry pick of #21365

// and starting the shutdown directly
// We don't want to let the rebalance finish before we trigger the shutdown, because we want the stream thread to stop before it gets to moving pending tasks from task registry to state updater.
streams1.close(new KafkaStreams.CloseOptions().leaveGroup(true));
streams2.close(new KafkaStreams.CloseOptions().leaveGroup(true));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work as expected (that's why it was change in newer release). It only works for static members.

In 4.0, there is an internal config that we should use instead.

@mjsax mjsax merged commit 7041ed3 into apache:4.1 Feb 10, 2026
18 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants