From 0cbe7c76de312c9f342d0a24634e082c28e7ef51 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 26 Feb 2026 19:30:57 -0500 Subject: [PATCH 1/2] Fix: Kill the server with one keyboard interrupt (#94) * Initial plan * Handle KeyboardInterrupt in run_app to allow single Ctrl+C shutdown Co-authored-by: lstein <111189+lstein@users.noreply.github.com> * Force os._exit(0) on KeyboardInterrupt to avoid hanging on background threads Co-authored-by: lstein <111189+lstein@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lstein <111189+lstein@users.noreply.github.com> Fix graceful shutdown to wait for download/install worker threads (#102) * Initial plan * Replace os._exit(0) with ApiDependencies.shutdown() on KeyboardInterrupt Instead of immediately force-exiting the process on CTRL+C, call ApiDependencies.shutdown() to gracefully stop the download and install manager services, allowing active work to complete or cancel cleanly before the process exits. Co-authored-by: lstein <111189+lstein@users.noreply.github.com> * Make stop() idempotent in download and model install services When CTRL+C is pressed, uvicorn's graceful shutdown triggers the FastAPI lifespan which calls ApiDependencies.shutdown(), then a KeyboardInterrupt propagates from run_until_complete() hitting the except block which tries to call ApiDependencies.shutdown() a second time. Change both stop() methods to return silently (instead of raising) when the service is not running. This handles: - Double-shutdown: lifespan already stopped the services - Early interrupt: services were never fully started Co-authored-by: lstein <111189+lstein@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lstein <111189+lstein@users.noreply.github.com> Fix shutdown hang on session processor thread lock (#108) * Initial plan * Fix shutdown hang: wake session processor thread on stop() and mark daemon Co-authored-by: lstein <111189+lstein@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lstein <111189+lstein@users.noreply.github.com> --- invokeai/app/run_app.py | 10 +++++++++- invokeai/app/services/download/download_default.py | 2 +- .../services/model_install/model_install_default.py | 2 +- .../session_processor/session_processor_default.py | 4 ++++ 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/invokeai/app/run_app.py b/invokeai/app/run_app.py index 76c2ffdad51..f032ff9ab7a 100644 --- a/invokeai/app/run_app.py +++ b/invokeai/app/run_app.py @@ -100,4 +100,12 @@ def run_app() -> None: for hdlr in logger.handlers: uvicorn_logger.addHandler(hdlr) - loop.run_until_complete(server.serve()) + try: + loop.run_until_complete(server.serve()) + except KeyboardInterrupt: + logger.info("InvokeAI shutting down...") + # Gracefully shut down services (e.g. model download and install managers) so that any + # active work is completed or cleanly cancelled before the process exits. + from invokeai.app.api.dependencies import ApiDependencies + + ApiDependencies.shutdown() diff --git a/invokeai/app/services/download/download_default.py b/invokeai/app/services/download/download_default.py index 9b5fda5620d..c21ffde5a1b 100644 --- a/invokeai/app/services/download/download_default.py +++ b/invokeai/app/services/download/download_default.py @@ -88,7 +88,7 @@ def stop(self, *args: Any, **kwargs: Any) -> None: """Stop the download worker threads.""" with self._lock: if not self._worker_pool: - raise Exception("Attempt to stop the download service before it was started") + return self._accept_download_requests = False # reject attempts to add new jobs to queue queued_jobs = [x for x in self.list_jobs() if x.status == DownloadJobStatus.WAITING] active_jobs = [x for x in self.list_jobs() if x.status == DownloadJobStatus.RUNNING] diff --git a/invokeai/app/services/model_install/model_install_default.py b/invokeai/app/services/model_install/model_install_default.py index 714ae9329a4..f20a1784be8 100644 --- a/invokeai/app/services/model_install/model_install_default.py +++ b/invokeai/app/services/model_install/model_install_default.py @@ -330,7 +330,7 @@ def start(self, invoker: Optional[Invoker] = None) -> None: def stop(self, invoker: Optional[Invoker] = None) -> None: """Stop the installer thread; after this the object can be deleted and garbage collected.""" if not self._running: - raise Exception("Attempt to stop the install service before it was started") + return self._logger.debug("calling stop_event.set()") self._stop_event.set() self._clear_pending_jobs() diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index 6c320eabda5..6eee3495808 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -355,6 +355,7 @@ def start(self, invoker: Invoker) -> None: self._thread = Thread( name="session_processor", target=self._process, + daemon=True, kwargs={ "stop_event": self._stop_event, "poll_now_event": self._poll_now_event, @@ -366,6 +367,9 @@ def start(self, invoker: Invoker) -> None: def stop(self, *args, **kwargs) -> None: self._stop_event.set() + # Wake the thread if it is sleeping in poll_now_event.wait() or blocked in resume_event.wait() (paused). + self._poll_now_event.set() + self._resume_event.set() def _poll_now(self) -> None: self._poll_now_event.set() From 448eb34e835d61cf2e518cbb5a4522a2535ce286 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Mon, 2 Mar 2026 11:31:41 -0500 Subject: [PATCH 2/2] Fix: shut down asyncio executor on KeyboardInterrupt to prevent post-generation hang (#112) Fix: cancel pending asyncio tasks before loop.close() to suppress destroyed-task warnings Fix: suppress stack trace when dispatching events after event loop is closed on shutdown Fix: cancel in-progress generation on stop() to prevent core dump during mid-flight Ctrl+C Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: lstein <111189+lstein@users.noreply.github.com> --- invokeai/app/run_app.py | 34 ++++ .../services/events/events_fastapievents.py | 4 + .../session_processor_default.py | 5 + tests/test_asyncio_shutdown.py | 147 ++++++++++++++++++ 4 files changed, 190 insertions(+) create mode 100644 tests/test_asyncio_shutdown.py diff --git a/invokeai/app/run_app.py b/invokeai/app/run_app.py index f032ff9ab7a..febd4f4d4b1 100644 --- a/invokeai/app/run_app.py +++ b/invokeai/app/run_app.py @@ -9,6 +9,11 @@ def get_app(): def run_app() -> None: """The main entrypoint for the app.""" + import asyncio + import sys + import threading + import traceback + from invokeai.frontend.cli.arg_parser import InvokeAIArgs # Parse the CLI arguments before doing anything else, which ensures CLI args correctly override settings from other @@ -109,3 +114,32 @@ def run_app() -> None: from invokeai.app.api.dependencies import ApiDependencies ApiDependencies.shutdown() + + # Cancel any pending asyncio tasks (e.g. socket.io ping tasks) so that loop.close() does + # not emit "Task was destroyed but it is pending!" warnings for each one. + pending = [t for t in asyncio.all_tasks(loop) if not t.done()] + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + + # Shut down the asyncio default thread executor. asyncio.to_thread() (used e.g. in the + # session queue for SQLite operations during generation) creates non-daemon threads via the + # event loop's default ThreadPoolExecutor. Without this call those threads remain alive and + # cause threading._shutdown() to hang indefinitely after the process's main code finishes. + loop.run_until_complete(loop.shutdown_default_executor()) + loop.close() + + # After graceful shutdown, log any non-daemon threads that are still alive. These are the + # threads that will cause Python's threading._shutdown() to block, preventing the process + # from exiting cleanly. This helps identify threads that need to be fixed or joined. + frames = sys._current_frames() + for thread in threading.enumerate(): + if thread.daemon or thread is threading.main_thread(): + continue + frame = frames.get(thread.ident) + stack = "".join(traceback.format_stack(frame)) if frame else "(no frame available)" + logger.warning( + f"Non-daemon thread still alive after shutdown: {thread.name!r} " + f"(ident={thread.ident})\nStack trace:\n{stack}" + ) diff --git a/invokeai/app/services/events/events_fastapievents.py b/invokeai/app/services/events/events_fastapievents.py index 3c46b37fd68..f44eecc5559 100644 --- a/invokeai/app/services/events/events_fastapievents.py +++ b/invokeai/app/services/events/events_fastapievents.py @@ -28,6 +28,10 @@ def stop(self, *args, **kwargs): self._loop.call_soon_threadsafe(self._queue.put_nowait, None) def dispatch(self, event: EventBase) -> None: + if self._loop.is_closed(): + # The event loop was closed during shutdown. Events can no longer be dispatched; + # silently drop this one so the generation thread can wind down cleanly. + return self._loop.call_soon_threadsafe(self._queue.put_nowait, event) async def _dispatch_from_queue(self, stop_event: threading.Event): diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py index 6eee3495808..bda6ac98e36 100644 --- a/invokeai/app/services/session_processor/session_processor_default.py +++ b/invokeai/app/services/session_processor/session_processor_default.py @@ -367,6 +367,11 @@ def start(self, invoker: Invoker) -> None: def stop(self, *args, **kwargs) -> None: self._stop_event.set() + # Cancel any in-progress generation so that long-running nodes (e.g. denoising) stop at + # the next step boundary instead of running to completion. Without this, the generation + # thread may still be executing CUDA operations when Python teardown begins, which can + # cause a C++ std::terminate() crash ("terminate called without an active exception"). + self._cancel_event.set() # Wake the thread if it is sleeping in poll_now_event.wait() or blocked in resume_event.wait() (paused). self._poll_now_event.set() self._resume_event.set() diff --git a/tests/test_asyncio_shutdown.py b/tests/test_asyncio_shutdown.py new file mode 100644 index 00000000000..066ff937c92 --- /dev/null +++ b/tests/test_asyncio_shutdown.py @@ -0,0 +1,147 @@ +""" +Tests that verify the fix for the two-Ctrl+C shutdown hang. + +Root cause: asyncio.to_thread() (used during generation for SQLite session queue operations) +creates non-daemon threads via the event loop's default ThreadPoolExecutor. When the event +loop is interrupted by KeyboardInterrupt without calling loop.shutdown_default_executor() and +loop.close(), those non-daemon threads remain alive and cause threading._shutdown() to block. + +The fix in run_app.py: +1. Cancels all pending asyncio tasks (e.g. socket.io ping tasks) to avoid "Task was destroyed + but it is pending!" warnings when loop.close() is called. +2. Calls loop.run_until_complete(loop.shutdown_default_executor()) followed by loop.close() + after ApiDependencies.shutdown(), so all executor threads are cleaned up before the process + begins its Python-level teardown. +""" + +from tests.dangerously_run_function_in_subprocess import dangerously_run_function_in_subprocess + + +def test_asyncio_to_thread_creates_nondaemon_thread(): + """Confirm that asyncio.to_thread() leaves a non-daemon thread alive after run_until_complete() + is interrupted - this is the raw symptom that caused the two-Ctrl+C hang.""" + + def test_func(): + import asyncio + import threading + + async def use_thread(): + await asyncio.to_thread(lambda: None) + + loop = asyncio.new_event_loop() + loop.run_until_complete(use_thread()) + # Deliberately do NOT call shutdown_default_executor() or loop.close() + non_daemon = [t for t in threading.enumerate() if not t.daemon and t is not threading.main_thread()] + # There should be at least one non-daemon executor thread still alive + if not non_daemon: + raise AssertionError("Expected a non-daemon thread but found none") + print("ok") + + stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func) + assert returncode == 0, _stderr + assert stdout.strip() == "ok" + + +def test_shutdown_default_executor_cleans_up_nondaemon_threads(): + """Verify that calling shutdown_default_executor() + loop.close() eliminates all non-daemon + threads created by asyncio.to_thread() - this is the fix applied in run_app.py.""" + + def test_func(): + import asyncio + import threading + + async def use_thread(): + await asyncio.to_thread(lambda: None) + + loop = asyncio.new_event_loop() + loop.run_until_complete(use_thread()) + + # Apply the fix + loop.run_until_complete(loop.shutdown_default_executor()) + loop.close() + + non_daemon = [t for t in threading.enumerate() if not t.daemon and t is not threading.main_thread()] + if non_daemon: + raise AssertionError(f"Expected no non-daemon threads but found: {[t.name for t in non_daemon]}") + print("ok") + + stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func) + assert returncode == 0, _stderr + assert stdout.strip() == "ok" + + +def test_shutdown_default_executor_works_after_simulated_keyboard_interrupt(): + """Verify that the fix works even when run_until_complete() was previously interrupted, + matching the exact flow in run_app.py's except KeyboardInterrupt block.""" + + def test_func(): + import asyncio + import threading + + async def use_thread_then_raise(): + await asyncio.to_thread(lambda: None) + raise KeyboardInterrupt + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(use_thread_then_raise()) + except KeyboardInterrupt: + pass + + # At this point a non-daemon thread exists (the bug) + non_daemon_before = [t for t in threading.enumerate() if not t.daemon and t is not threading.main_thread()] + if not non_daemon_before: + raise AssertionError("Expected a non-daemon thread before fix") + + # Apply the fix (what run_app.py now does) + loop.run_until_complete(loop.shutdown_default_executor()) + loop.close() + + non_daemon_after = [t for t in threading.enumerate() if not t.daemon and t is not threading.main_thread()] + if non_daemon_after: + raise AssertionError(f"Non-daemon threads remain after fix: {[t.name for t in non_daemon_after]}") + print("ok") + + stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func) + assert returncode == 0, _stderr + assert stdout.strip() == "ok" + + +def test_cancel_pending_tasks_suppresses_destroyed_task_warnings(): + """Verify that cancelling pending tasks before loop.close() suppresses 'Task was destroyed + but it is pending!' warnings (e.g. from socket.io ping tasks).""" + + def test_func(): + import asyncio + + async def long_running(): + await asyncio.sleep(1) # simulates a socket.io ping task + + async def start_background_task(): + asyncio.create_task(long_running()) + await asyncio.to_thread(lambda: None) + raise KeyboardInterrupt + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(start_background_task()) + except KeyboardInterrupt: + pass + + # Apply the task-cancellation fix + pending = [t for t in asyncio.all_tasks(loop) if not t.done()] + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + + loop.run_until_complete(loop.shutdown_default_executor()) + loop.close() + print("ok") + + stdout, _stderr, returncode = dangerously_run_function_in_subprocess(test_func) + assert returncode == 0, _stderr + assert stdout.strip() == "ok" + # The "Task was destroyed but it is pending!" message appears on stderr when tasks are NOT + # cancelled before loop.close(). After the fix it must be absent. + assert "Task was destroyed but it is pending" not in _stderr