Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion flytekit/utils/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async def async_add(a: int, b: int) -> int:

import asyncio
import atexit
import contextvars
import functools
import os
import threading
Expand Down Expand Up @@ -95,8 +96,40 @@ def run_sync(self, coro_func: Callable[..., Awaitable[T]], *args, **kwargs) -> T
"""
This should be called from synchronous functions to run an async function.
"""
name = threading.current_thread().name + f"PID:{os.getpid()}"
coro = coro_func(*args, **kwargs)

# Detect if we're inside a running event loop (e.g., async web framework).
# In that case, the _TaskRunner approach can fail with
# "Cannot run the event loop while another loop is running",
# so we run the coroutine on a fresh thread with its own event loop.
try:
asyncio.get_running_loop()
except RuntimeError:
running_loop = False
else:
running_loop = True

if running_loop:
result: T | None = None
exception: BaseException | None = None
ctx = contextvars.copy_context()

def _run():
nonlocal result, exception
try:
result = ctx.run(asyncio.run, coro)
except BaseException as e:
exception = e

thread = threading.Thread(target=_run, daemon=True)
thread.start()
thread.join()

if exception is not None:
raise exception
return result # type: ignore[return-value]

name = threading.current_thread().name + f"PID:{os.getpid()}"
if name not in self._runner_map:
if len(self._runner_map) > 500:
logger.warning(
Expand Down
37 changes: 37 additions & 0 deletions tests/flytekit/unit/utils/test_asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,40 @@ async def runner_2():
loop_manager.run_sync(runner_2)

assert mock_handler.call_count == 2


def test_run_sync_from_running_event_loop():
"""Test that run_sync works when called from within a running event loop.

This simulates the scenario where FlyteRemote.execute() is called from
an async web framework (e.g., Flask with Gunicorn) that already has a
running event loop.
"""

async def async_double(x: int) -> int:
await asyncio.sleep(0.001)
return x * 2

async def outer():
# We're inside a running event loop here.
# run_sync should detect this and use a fresh thread instead of _TaskRunner.
result = run_sync(async_double, x=21)
assert result == 42
return result

# Run the outer coroutine which will call run_sync from within a running loop
result = asyncio.run(outer())
assert result == 42


def test_run_sync_from_running_event_loop_with_exception():
"""Test that exceptions propagate correctly when run_sync is called from a running loop."""

async def async_fail():
raise ValueError("test error from async")

async def outer():
with pytest.raises(ValueError, match="test error from async"):
run_sync(async_fail)

asyncio.run(outer())
Loading