Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion invokeai/app/run_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ def get_app():

def run_app() -> None:
"""The main entrypoint for the app."""
import asyncio
import sys
import threading
import traceback

from invokeai.frontend.cli.arg_parser import InvokeAIArgs

# Parse the CLI arguments before doing anything else, which ensures CLI args correctly override settings from other
Expand Down Expand Up @@ -100,4 +105,41 @@ def run_app() -> None:
for hdlr in logger.handlers:
uvicorn_logger.addHandler(hdlr)

loop.run_until_complete(server.serve())
try:
loop.run_until_complete(server.serve())
except KeyboardInterrupt:
logger.info("InvokeAI shutting down...")
# Gracefully shut down services (e.g. model download and install managers) so that any
# active work is completed or cleanly cancelled before the process exits.
from invokeai.app.api.dependencies import ApiDependencies

ApiDependencies.shutdown()

# Cancel any pending asyncio tasks (e.g. socket.io ping tasks) so that loop.close() does
# not emit "Task was destroyed but it is pending!" warnings for each one.
pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

# Shut down the asyncio default thread executor. asyncio.to_thread() (used e.g. in the
# session queue for SQLite operations during generation) creates non-daemon threads via the
# event loop's default ThreadPoolExecutor. Without this call those threads remain alive and
# cause threading._shutdown() to hang indefinitely after the process's main code finishes.
loop.run_until_complete(loop.shutdown_default_executor())
loop.close()

# After graceful shutdown, log any non-daemon threads that are still alive. These are the
# threads that will cause Python's threading._shutdown() to block, preventing the process
# from exiting cleanly. This helps identify threads that need to be fixed or joined.
frames = sys._current_frames()
for thread in threading.enumerate():
if thread.daemon or thread is threading.main_thread():
continue
frame = frames.get(thread.ident)
stack = "".join(traceback.format_stack(frame)) if frame else "(no frame available)"
logger.warning(
f"Non-daemon thread still alive after shutdown: {thread.name!r} "
f"(ident={thread.ident})\nStack trace:\n{stack}"
)
2 changes: 1 addition & 1 deletion invokeai/app/services/download/download_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def stop(self, *args: Any, **kwargs: Any) -> None:
"""Stop the download worker threads."""
with self._lock:
if not self._worker_pool:
raise Exception("Attempt to stop the download service before it was started")
return
self._accept_download_requests = False # reject attempts to add new jobs to queue
queued_jobs = [x for x in self.list_jobs() if x.status == DownloadJobStatus.WAITING]
active_jobs = [x for x in self.list_jobs() if x.status == DownloadJobStatus.RUNNING]
Expand Down
4 changes: 4 additions & 0 deletions invokeai/app/services/events/events_fastapievents.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def stop(self, *args, **kwargs):
self._loop.call_soon_threadsafe(self._queue.put_nowait, None)

def dispatch(self, event: EventBase) -> None:
if self._loop.is_closed():
# The event loop was closed during shutdown. Events can no longer be dispatched;
# silently drop this one so the generation thread can wind down cleanly.
return
self._loop.call_soon_threadsafe(self._queue.put_nowait, event)

async def _dispatch_from_queue(self, stop_event: threading.Event):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def start(self, invoker: Optional[Invoker] = None) -> None:
def stop(self, invoker: Optional[Invoker] = None) -> None:
"""Stop the installer thread; after this the object can be deleted and garbage collected."""
if not self._running:
raise Exception("Attempt to stop the install service before it was started")
return
self._logger.debug("calling stop_event.set()")
self._stop_event.set()
self._clear_pending_jobs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def start(self, invoker: Invoker) -> None:
self._thread = Thread(
name="session_processor",
target=self._process,
daemon=True,
kwargs={
"stop_event": self._stop_event,
"poll_now_event": self._poll_now_event,
Expand All @@ -366,6 +367,14 @@ def start(self, invoker: Invoker) -> None:

def stop(self, *args, **kwargs) -> None:
self._stop_event.set()
# Cancel any in-progress generation so that long-running nodes (e.g. denoising) stop at
# the next step boundary instead of running to completion. Without this, the generation
# thread may still be executing CUDA operations when Python teardown begins, which can
# cause a C++ std::terminate() crash ("terminate called without an active exception").
self._cancel_event.set()
# Wake the thread if it is sleeping in poll_now_event.wait() or blocked in resume_event.wait() (paused).
self._poll_now_event.set()
self._resume_event.set()

def _poll_now(self) -> None:
self._poll_now_event.set()
Expand Down
147 changes: 147 additions & 0 deletions tests/test_asyncio_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""
Tests that verify the fix for the two-Ctrl+C shutdown hang.

Root cause: asyncio.to_thread() (used during generation for SQLite session queue operations)
creates non-daemon threads via the event loop's default ThreadPoolExecutor. When the event
loop is interrupted by KeyboardInterrupt without calling loop.shutdown_default_executor() and
loop.close(), those non-daemon threads remain alive and cause threading._shutdown() to block.

The fix in run_app.py:
1. Cancels all pending asyncio tasks (e.g. socket.io ping tasks) to avoid "Task was destroyed
but it is pending!" warnings when loop.close() is called.
2. Calls loop.run_until_complete(loop.shutdown_default_executor()) followed by loop.close()
after ApiDependencies.shutdown(), so all executor threads are cleaned up before the process
begins its Python-level teardown.
"""

from tests.dangerously_run_function_in_subprocess import dangerously_run_function_in_subprocess


def test_asyncio_to_thread_creates_nondaemon_thread():
    """Demonstrate the raw symptom behind the two-Ctrl+C hang: after run_until_complete()
    returns, asyncio.to_thread() has left a non-daemon executor thread alive."""

    def test_func():
        import asyncio
        import threading

        async def use_thread():
            await asyncio.to_thread(lambda: None)

        event_loop = asyncio.new_event_loop()
        event_loop.run_until_complete(use_thread())
        # Deliberately skip shutdown_default_executor() and loop.close(): the leftover
        # executor thread is exactly what this test asserts on.
        main = threading.main_thread()
        leftover = [t for t in threading.enumerate() if not (t.daemon or t is main)]
        # At least one non-daemon executor thread must still be alive.
        if not leftover:
            raise AssertionError("Expected a non-daemon thread but found none")
        print("ok")

    stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func)
    assert returncode == 0, _stderr
    assert stdout.strip() == "ok"


def test_shutdown_default_executor_cleans_up_nondaemon_threads():
    """Verify the fix applied in run_app.py: shutdown_default_executor() followed by
    loop.close() leaves no non-daemon threads from asyncio.to_thread() behind."""

    def test_func():
        import asyncio
        import threading

        async def use_thread():
            await asyncio.to_thread(lambda: None)

        event_loop = asyncio.new_event_loop()
        event_loop.run_until_complete(use_thread())

        # Apply the fix: drain the default executor, then close the loop.
        event_loop.run_until_complete(event_loop.shutdown_default_executor())
        event_loop.close()

        main = threading.main_thread()
        survivors = [t for t in threading.enumerate() if not (t.daemon or t is main)]
        if survivors:
            raise AssertionError(f"Expected no non-daemon threads but found: {[t.name for t in survivors]}")
        print("ok")

    stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func)
    assert returncode == 0, _stderr
    assert stdout.strip() == "ok"


def test_shutdown_default_executor_works_after_simulated_keyboard_interrupt():
    """Verify that the executor-shutdown fix still works when run_until_complete() was
    previously interrupted, mirroring run_app.py's except KeyboardInterrupt flow."""

    def test_func():
        import asyncio
        import threading

        async def use_thread_then_raise():
            await asyncio.to_thread(lambda: None)
            raise KeyboardInterrupt

        def lingering_threads():
            # Non-daemon threads other than the main thread.
            main = threading.main_thread()
            return [t for t in threading.enumerate() if not (t.daemon or t is main)]

        event_loop = asyncio.new_event_loop()
        try:
            event_loop.run_until_complete(use_thread_then_raise())
        except KeyboardInterrupt:
            pass

        # The buggy state must hold here: an executor thread is still alive.
        if not lingering_threads():
            raise AssertionError("Expected a non-daemon thread before fix")

        # Apply the fix (what run_app.py now does).
        event_loop.run_until_complete(event_loop.shutdown_default_executor())
        event_loop.close()

        remaining = lingering_threads()
        if remaining:
            raise AssertionError(f"Non-daemon threads remain after fix: {[t.name for t in remaining]}")
        print("ok")

    stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func)
    assert returncode == 0, _stderr
    assert stdout.strip() == "ok"


def test_cancel_pending_tasks_suppresses_destroyed_task_warnings():
    """Verify that cancelling still-pending tasks before loop.close() prevents the
    'Task was destroyed but it is pending!' warning (e.g. from socket.io ping tasks)."""

    def test_func():
        import asyncio

        async def long_running():
            await asyncio.sleep(1)  # simulates a socket.io ping task

        async def start_background_task():
            asyncio.create_task(long_running())
            await asyncio.to_thread(lambda: None)
            raise KeyboardInterrupt

        event_loop = asyncio.new_event_loop()
        try:
            event_loop.run_until_complete(start_background_task())
        except KeyboardInterrupt:
            pass

        # Apply the task-cancellation fix: cancel everything still pending, then let
        # the cancellations settle before tearing the loop down.
        unfinished = [t for t in asyncio.all_tasks(event_loop) if not t.done()]
        for pending_task in unfinished:
            pending_task.cancel()
        if unfinished:
            event_loop.run_until_complete(asyncio.gather(*unfinished, return_exceptions=True))

        event_loop.run_until_complete(event_loop.shutdown_default_executor())
        event_loop.close()
        print("ok")

    stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func)
    assert returncode == 0, _stderr
    assert stdout.strip() == "ok"
    # The "Task was destroyed but it is pending!" message appears on stderr when tasks are NOT
    # cancelled before loop.close(). After the fix it must be absent.
    assert "Task was destroyed but it is pending" not in _stderr