fix: reject OpenAI/generate requests while sleeping; handle missing KV cache without crashing engine #4497
lvhan028 wants to merge 3 commits into InternLM:main
Pull request overview
This PR improves robustness around engine sleep/wakeup by rejecting inference requests while sleeping and preventing PyTorch-engine inference from crashing when KV/state cache engines are missing (e.g., after sleep or partial wakeup).
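As a rough illustration of the "reject while sleeping" behavior described above, the sketch below gates requests on a sleep flag and offloads the blocking sleep/wakeup work to a thread. `SleepGate`, its methods, and the 503 status code are hypothetical names and values chosen for this example; they are not taken from the PR or from lmdeploy.

```python
import asyncio
from typing import Callable, Optional


class SleepGate:
    """Hypothetical guard that rejects requests while the engine sleeps."""

    def __init__(self) -> None:
        self.is_sleeping = False

    async def sleep(self, blocking_sleep: Callable[[], None]) -> None:
        # Set the flag first so requests arriving during the offloaded
        # call are already rejected.
        self.is_sleeping = True
        await asyncio.to_thread(blocking_sleep)

    async def wakeup(self, blocking_wakeup: Callable[[], None]) -> None:
        # Clear the flag only after the backend has finished waking up.
        await asyncio.to_thread(blocking_wakeup)
        self.is_sleeping = False

    def check(self) -> Optional[dict]:
        """Return an error payload while sleeping, otherwise None."""
        if self.is_sleeping:
            # The status code is an assumption made for this sketch.
            return {"status": 503, "message": "engine is sleeping"}
        return None
```

A request handler would call `check()` before starting generation and return the payload as-is when it is not `None`.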
Changes:
- Make sleep/wakeup flows async end-to-end (OpenAI server → AsyncEngine → backend engines/executors), with thread offloading where appropriate.
- Add an epoch-stamping mechanism to drop work that bound a session before a stop-all/abort-all event, avoiding races during sleep.
- Convert missing cache situations into structured internal-engine errors instead of uncaught exceptions, and propagate them through the engine loop.
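The epoch-stamping idea in the list above can be sketched as follows. This is a minimal model under stated assumptions, not the PR's implementation: `EpochTracker` and its methods are hypothetical, and only the `Session.epoch` field name comes from the PR summary.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Session:
    session_id: int
    # Epoch observed when the request bound this session; None if unstamped.
    epoch: Optional[int] = None


class EpochTracker:
    """Hypothetical helper: a counter bumped on every stop-all event."""

    def __init__(self) -> None:
        self.epoch = 0

    def stamp(self, session: Session) -> None:
        # Called when a request binds the session (e.g. in the HTTP layer).
        session.epoch = self.epoch

    def stop_all(self) -> None:
        # Any session stamped before this call becomes stale.
        self.epoch += 1

    def is_stale(self, session: Session) -> bool:
        # Unstamped sessions are never treated as stale in this sketch.
        return session.epoch is not None and session.epoch != self.epoch
```

In this model, generation checks `is_stale(session)` before doing any work and drops the request if a stop-all happened after the session was stamped, which closes the race between an in-flight request and a sleep call.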
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| lmdeploy/turbomind/turbomind.py | Makes TurboMind sleep/wakeup async via asyncio.to_thread wrappers. |
| lmdeploy/serve/openai/api_server.py | Rejects inference requests while sleeping; stamps session epoch; makes /sleep and /wakeup await engine operations. |
| lmdeploy/serve/managers/session_manager.py | Adds Session.epoch tracking and logs epoch on abort. |
| lmdeploy/serve/core/async_engine.py | Adds stale-session dropping via epoch; makes sleep/wakeup await backend; adjusts metrics increments for new abort/error paths. |
| lmdeploy/pytorch/engine/mp_engine/base.py | Converts MP engine sleep/wakeup RPC calls to async. |
| lmdeploy/pytorch/engine/mp_engine/base_worker.py | Converts MP worker sleep/wakeup to async and awaits engine methods. |
| lmdeploy/pytorch/engine/model_agent/agent.py | Introduces CacheNotReadyError, guards cache usage, and converts cache-missing failures into batched outputs with an engine error message. |
| lmdeploy/pytorch/engine/model_agent/__init__.py | Exposes CacheNotReadyError from the model_agent package. |
| lmdeploy/pytorch/engine/executor/uni_executor.py | Adds async sleep/wakeup plumbing for the single-device executor. |
| lmdeploy/pytorch/engine/executor/ray_executor.py | Makes sleep/wakeup async and offloads blocking RPC calls to threads. |
| lmdeploy/pytorch/engine/executor/mp_executor.py | Adds async sleep/wakeup implemented via collective_rpc_async. |
| lmdeploy/pytorch/engine/executor/base.py | Updates executor interface: wakeup is now async. |
| lmdeploy/pytorch/engine/executor/base_worker.py | Updates worker wrapper interface: wakeup is now async. |
| lmdeploy/pytorch/engine/engine.py | Makes engine sleep/wakeup async and awaits executor implementations. |
| lmdeploy/pytorch/engine/engine_loop.py | Treats engine_error_msg in BatchedOutputs as an internal engine error and finishes running requests accordingly. |
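The agent.py and engine_loop.py rows describe turning a missing cache into a structured error instead of an uncaught exception. A minimal sketch of that pattern follows. The names `CacheNotReadyError`, `BatchedOutputs`, and `engine_error_msg` come from the table; the field layout, `forward_step`, `finish_reason`, and the returned strings are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional


class CacheNotReadyError(RuntimeError):
    """Raised when the KV/state cache engine is unavailable."""


@dataclass
class BatchedOutputs:
    # Simplified stand-in for the real output container.
    token_ids: List[int] = field(default_factory=list)
    engine_error_msg: Optional[str] = None


def forward_step(cache_engine: Optional[object], inputs: List[int]) -> BatchedOutputs:
    """Hypothetical forward step that guards cache usage."""
    try:
        if cache_engine is None:
            raise CacheNotReadyError("KV cache engine is not initialized")
        # Placeholder for real inference work.
        return BatchedOutputs(token_ids=[x + 1 for x in inputs])
    except CacheNotReadyError as err:
        # Convert the failure into data so the engine loop keeps running.
        return BatchedOutputs(engine_error_msg=str(err))


def finish_reason(out: BatchedOutputs) -> str:
    """Hypothetical engine-loop check on the batched output."""
    if out.engine_error_msg is not None:
        return "internal_engine_error"
    return "ok"
```

The point of the design is that the loop sees an error message on an ordinary output object and can finish the running requests with an internal-engine error, rather than dying on an exception raised inside the model agent.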
    def _if_session_stale(self, session: Session,
                          input_token_len: int) -> GenOut | None:
        """If session is stamped ``http_bind_epoch`` by api_server and
Docstring refers to http_bind_epoch, but the stamped attribute is session.epoch (set in api_server.VariableInterface.get_session). Consider updating the wording to avoid suggesting a non-existent field/name.
Suggested change:
    -     """If session is stamped ``http_bind_epoch`` by api_server and
    +     """If api_server stamped the session's ``epoch`` and
          else:
    -         return session_mgr.get(session_id)
    +         session = session_mgr.get(session_id)
    +         # Stamp epoch for ``stop_all_session`` / ``abort_all`` coordination in ``AsyncEngine.generate``.
This comment mentions coordination with abort_all, but the implementation uses AsyncEngine.stop_all_session() (which calls session_mgr.async_abort_all()). Consider renaming in the comment to the actual API names to reduce confusion.
Suggested change:
    -     # Stamp epoch for ``stop_all_session`` / ``abort_all`` coordination in ``AsyncEngine.generate``.
    +     # Stamp epoch for coordination with ``AsyncEngine.stop_all_session()`` /
    +     # ``session_mgr.async_abort_all()`` in ``AsyncEngine.generate``.