diff --git a/flytekit/utils/asyn.py b/flytekit/utils/asyn.py index 0ae25e9bb7..167d62e3e0 100644 --- a/flytekit/utils/asyn.py +++ b/flytekit/utils/asyn.py @@ -11,6 +11,7 @@ async def async_add(a: int, b: int) -> int: import asyncio import atexit +import contextvars import functools import os import threading @@ -95,8 +96,40 @@ def run_sync(self, coro_func: Callable[..., Awaitable[T]], *args, **kwargs) -> T """ This should be called from synchronous functions to run an async function. """ - name = threading.current_thread().name + f"PID:{os.getpid()}" coro = coro_func(*args, **kwargs) + + # Detect if we're inside a running event loop (e.g., async web framework). + # In that case, the _TaskRunner approach can fail with + # "Cannot run the event loop while another loop is running", + # so we run the coroutine on a fresh thread with its own event loop. + try: + asyncio.get_running_loop() + except RuntimeError: + running_loop = False + else: + running_loop = True + + if running_loop: + result: T | None = None + exception: BaseException | None = None + ctx = contextvars.copy_context() + + def _run(): + nonlocal result, exception + try: + result = ctx.run(asyncio.run, coro) + except BaseException as e: + exception = e + + thread = threading.Thread(target=_run, daemon=True) + thread.start() + thread.join() + + if exception is not None: + raise exception + return result # type: ignore[return-value] + + name = threading.current_thread().name + f"PID:{os.getpid()}" if name not in self._runner_map: if len(self._runner_map) > 500: logger.warning( diff --git a/tests/flytekit/unit/utils/test_asyn.py b/tests/flytekit/unit/utils/test_asyn.py index 69d27f827a..9b332e23f7 100644 --- a/tests/flytekit/unit/utils/test_asyn.py +++ b/tests/flytekit/unit/utils/test_asyn.py @@ -168,3 +168,40 @@ async def runner_2(): loop_manager.run_sync(runner_2) assert mock_handler.call_count == 2 + + +def test_run_sync_from_running_event_loop(): + """Test that run_sync works when called from within a running event loop. + + This simulates the scenario where FlyteRemote.execute() is called from + an async web framework (e.g., Flask with Gunicorn) that already has a + running event loop. + """ + + async def async_double(x: int) -> int: + await asyncio.sleep(0.001) + return x * 2 + + async def outer(): + # We're inside a running event loop here. + # run_sync should detect this and use a fresh thread instead of _TaskRunner. + result = run_sync(async_double, x=21) + assert result == 42 + return result + + # Run the outer coroutine which will call run_sync from within a running loop + result = asyncio.run(outer()) + assert result == 42 + + +def test_run_sync_from_running_event_loop_with_exception(): + """Test that exceptions propagate correctly when run_sync is called from a running loop.""" + + async def async_fail(): + raise ValueError("test error from async") + + async def outer(): + with pytest.raises(ValueError, match="test error from async"): + run_sync(async_fail) + + asyncio.run(outer())