From 94221f09fcd8feac6c773173a0247516213695a3 Mon Sep 17 00:00:00 2001 From: Jeff Huang Date: Wed, 18 Mar 2026 21:39:41 -0500 Subject: [PATCH 1/2] fix(py): Handle channel.closed callback correctly when stream tasks are cancelled --- .../genkit/src/genkit/_core/_action.py | 14 +++++- .../genkit/tests/genkit/core/action_test.py | 49 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/py/packages/genkit/src/genkit/_core/_action.py b/py/packages/genkit/src/genkit/_core/_action.py index 4b985f90a1..4fc7cc02d2 100644 --- a/py/packages/genkit/src/genkit/_core/_action.py +++ b/py/packages/genkit/src/genkit/_core/_action.py @@ -467,7 +467,19 @@ def send_chunk(c: ChunkT) -> None: channel.set_close_future(asyncio.create_task(resp)) result_future: asyncio.Future[OutputT] = asyncio.Future() - channel.closed.add_done_callback(lambda _: result_future.set_result(channel.closed.result().response)) + + def _propagate_closed_to_result(_: asyncio.Future[ActionResponse[OutputT]]) -> None: + if result_future.done(): + return + closed = channel.closed + if closed.cancelled(): + result_future.cancel() + elif (exc := closed.exception()) is not None: + result_future.set_exception(exc) + else: + result_future.set_result(closed.result().response) + + channel.closed.add_done_callback(_propagate_closed_to_result) return StreamResponse(stream=channel, response=result_future) diff --git a/py/packages/genkit/tests/genkit/core/action_test.py b/py/packages/genkit/tests/genkit/core/action_test.py index 79610a4d04..53bdd2cc3e 100644 --- a/py/packages/genkit/tests/genkit/core/action_test.py +++ b/py/packages/genkit/tests/genkit/core/action_test.py @@ -5,8 +5,10 @@ """Tests for the action module.""" +import asyncio import json from typing import cast +from unittest.mock import patch import pytest @@ -177,6 +179,53 @@ async def foo( assert chunks == ['1', '2'] +@pytest.mark.asyncio +async def test_stream_cancellation_does_not_crash_callback() -> None: + """ACC-562: When stream task is cancelled, the channel.closed callback must not crash. + + Previously the callback called channel.closed.result() which raises CancelledError + when the future was cancelled. The fix propagates cancellation to result_future + instead of crashing. + """ + block = asyncio.Event() + + async def blocking_stream( + _input: str, + ctx: ActionRunContext, + ) -> int: + ctx.send_chunk('started') + await block.wait() # Never completes unless we set it + return 42 + + action = Action(name='blocking', kind=ActionKind.CUSTOM, fn=blocking_stream) + + captured_task: asyncio.Task | None = None + original_create_task = asyncio.create_task + + def capturing_create_task(coro: object) -> asyncio.Task: + nonlocal captured_task + task = original_create_task(coro) + captured_task = task + return task + + with patch('genkit._core._action.asyncio.create_task', side_effect=capturing_create_task): + result = action.stream('x') + + # Consume the first chunk so the action progresses to block.wait() + async for _ in result.stream: + break + + assert captured_task is not None + captured_task.cancel() + + # Give callbacks time to run + await asyncio.sleep(0.05) + + # Awaiting response should raise CancelledError, not crash in callback + with pytest.raises(asyncio.CancelledError): + await result.response + + def test_parse_plugin_name_from_action_name() -> None: """Parse plugin name from the action name.""" assert parse_plugin_name_from_action_name('foo') is None From fbac4b9284da013a69ecce0ba3338190fd078814 Mon Sep 17 00:00:00 2001 From: huangjeff5 <64040981+huangjeff5@users.noreply.github.com> Date: Wed, 18 Mar 2026 21:50:53 -0500 Subject: [PATCH 2/2] Update py/packages/genkit/tests/genkit/core/action_test.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- py/packages/genkit/tests/genkit/core/action_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/py/packages/genkit/tests/genkit/core/action_test.py b/py/packages/genkit/tests/genkit/core/action_test.py index 53bdd2cc3e..703801f955 100644 --- a/py/packages/genkit/tests/genkit/core/action_test.py +++ b/py/packages/genkit/tests/genkit/core/action_test.py @@ -218,12 +218,9 @@ def capturing_create_task(coro: object) -> asyncio.Task: assert captured_task is not None captured_task.cancel() - # Give callbacks time to run - await asyncio.sleep(0.05) - # Awaiting response should raise CancelledError, not crash in callback with pytest.raises(asyncio.CancelledError): - await result.response + await asyncio.wait_for(result.response, timeout=1.0) def test_parse_plugin_name_from_action_name() -> None: