Skip to content

fix(py): Handle stream task cancellation without crashing callback#4953

Draft
huangjeff5 wants to merge 2 commits intomainfrom
jh-fastapi-fix
Draft

fix(py): Handle stream task cancellation without crashing callback#4953
huangjeff5 wants to merge 2 commits intomainfrom
jh-fastapi-fix

Conversation

@huangjeff5
Copy link
Contributor

@huangjeff5 huangjeff5 commented Mar 19, 2026

When the stream task is cancelled (e.g., client disconnects during streaming), the channel.closed done callback previously called result() on a cancelled future, which raised CancelledError and crashed the callback.

Fix: Add _propagate_closed_to_result that checks cancelled/exception/result before acting. When channel.closed is cancelled, cancel result_future instead of calling result(). Propagates cancellation cleanly to consumers.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request primarily focuses on enhancing the robustness of asynchronous stream handling within the genkit library. While the pull request title and description refer to a JSON serialization issue, the actual code changes address a different concern: preventing crashes in stream channel callbacks when an asyncio.Future is cancelled. The updated implementation ensures that cancellations and exceptions are gracefully propagated, improving the stability of streaming operations. A new test case has been added to validate this improved cancellation handling.

Highlights

  • Stream Cancellation Handling: Improved the handling of stream task cancellations in _action.py to prevent callbacks from crashing when an asyncio.Future is cancelled, ensuring more robust asynchronous operations.
  • Robust Callback Logic: Implemented a new callback function, _propagate_closed_to_result, which correctly propagates cancellation, exceptions, or results from the channel.closed future to the result_future.
  • New Test Case: Added a dedicated asynchronous test, test_stream_cancellation_does_not_crash_callback, to validate the fix for stream cancellation scenarios and confirm that the system behaves as expected under cancellation.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@pytest.mark.asyncio
async def test_stream_cancellation_does_not_crash_callback() -> None:
"""ACC-562: When stream task is cancelled, the channel.closed callback must not crash.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix comment format

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses a potential crash when a streaming action is cancelled. The fix correctly propagates the cancellation status from the underlying task to the result future, preventing hangs or unhandled exceptions. A new asynchronous test case has been added to verify this fix by simulating a stream cancellation and asserting the correct CancelledError is raised. The changes appear correct and improve the robustness of streaming actions.

It's worth noting that the pull request title and description refer to a JSON serialization issue in FastAPI/Flask, which does not seem to match the content of the changes. It would be beneficial to update them to accurately reflect the work done on stream cancellation handling.

@huangjeff5 huangjeff5 changed the title HttpErrorWireFormat JSON serialization in FastAPI/Flask fix(py): Handle stream task cancellation without crashing callback Mar 19, 2026
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants