Skip to content

Conversation

@Nikita-Shupletsov
Copy link
Contributor

@Nikita-Shupletsov Nikita-Shupletsov commented Feb 6, 2026

Cherry-pick of #21365.

This PR fixes a bug when KS doesn't close stores if the shutdown was triggered during rebalance where an active tasks gets converted to a standby one and put into pendingTasksToInit

  • Added logic to close pending tasks to init.
  • Made standby task closure similar to the one for active tasks.
  • Added a separate method for getting standby tasks from task registry.
  • Added an integration test that reproduces the issue.

Conflicts:
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
Additional changes:
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java

Reviewers: Matthias J. Sax matthias@confluent.io

This PR fixes a bug when KS doesn't close stores if the shutdown was
triggered during rebalance where an active tasks gets converted to a
standby one and put into pendingTasksToInit

* Added logic to close pending tasks to init.
* Made standby task closure similar to the one for active tasks.
* Added a separate method for getting standby tasks from task registry.
* Added an integration test that reproduces the issue.

Reviewers: Matthias J. Sax <matthias@confluent.io>

---------

Co-authored-by: Matthias J. Sax <mjsax@apache.org>
@Nikita-Shupletsov
Copy link
Contributor Author

cherry pick of #21365

@viktorsomogyi
Copy link
Contributor

viktorsomogyi commented Feb 9, 2026

@Nikita-Shupletsov it seems like there are compilation errors:

Error:  /home/runner/work/kafka/kafka/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java:25:  error: cannot find symbol
Error:  /home/runner/work/kafka/kafka/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java:163:  error: package CloseOptions does not exist
Error:  /home/runner/work/kafka/kafka/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceTaskClosureIntegrationTest.java:163:  error: cannot find symbol

Since tomorrow would be the code freeze for 4.0.2, would you please take a look?

// and starting the shutdown directly
// We don't want to let the rebalance finish before we trigger the shutdown, because we want the stream thread to stop before it gets to moving pending tasks from task registry to state updater.
streams1.close(new KafkaStreams.CloseOptions().leaveGroup(true));
streams2.close(new KafkaStreams.CloseOptions().leaveGroup(true));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work as expected (that's why it was change in newer release). It only works for static members.

In 4.0, there is an internal config that we should use instead.

@mjsax mjsax added the streams label Feb 9, 2026
@mjsax mjsax merged commit 4ed0cf8 into apache:4.0 Feb 10, 2026
11 checks passed
@Nikita-Shupletsov
Copy link
Contributor Author

@viktorsomogyi, @mjsax
thanks a lot folks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants