feat(grpc-servicer): support upstream vLLM KV event subscriptions #742
smfirmin wants to merge 2 commits into lightseekorg:main
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
Adds documentation and dependencies; implements a new VllmKvEventBridge (ZeroMQ → gRPC) with per-rank replay, buffering, and sequence normalization; exposes SubscribeKvEvents RPC and servicer integration; adds async Python tests; and updates Rust kv_event_monitor stream-processing to detect restarts/gaps and handle specific gRPC statuses.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant vllm as Upstream vLLM
    participant zmq as ZeroMQ Publisher
    participant bridge as VllmKvEventBridge
    participant grpc as gRPC Servicer
    participant client as Client
    participant gateway as Model Gateway
    vllm->>zmq: Publish KV events (BlockStored / BlockRemoved / AllBlocksCleared)
    zmq->>bridge: SUB stream (per-rank), REQ/REP for replay
    bridge->>bridge: Translate → KvCacheEvent, assign global seq, buffer
    client->>grpc: SubscribeKvEvents(start_sequence)
    grpc->>bridge: subscribe(start_sequence)
    bridge-->>grpc: Replay buffered batches and stream new batches
    grpc-->>client: Stream KvEventBatch responses
    bridge->>gateway: Surface restart/gap signals
    gateway->>gateway: handle_batch → apply / skip / reset state
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Highlights
Code Review
This pull request introduces support for consuming vLLM KV cache events, a significant feature that enables more advanced cache-aware routing. The implementation includes a Python-based bridge to consume ZMQ events from vLLM and expose them over gRPC, as well as hardening of the Rust-based consumer to handle backend restarts and unsupported event layouts. The changes are well-structured, include comprehensive tests for the new functionality, and provide clear documentation for operators. My review found one opportunity for improvement to enhance robustness in the event translation logic.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/vllm/kv_events.py`:
- Line 1: The module-level pragma "# mypy: ignore-errors" in
grpc_servicer.smg_grpc_servicer.vllm.kv_events disables type checking for the
whole file; remove that line and instead add targeted "# type: ignore" comments
only on the specific imports, calls, or expressions that fail mypy (for example
problematic imports or functions inside kv_events such as any external-client
initializations or dynamic attribute uses), run mypy to see remaining errors,
and progressively add precise ignores or proper type annotations (or stubs) for
the identified symbols to restore type checks for the rest of the module.
- Line 82: The loop pairing event.block_hashes with token_chunks should use
strict validation to detect mismatches; update the zip call in the loop that
iterates over event.block_hashes and token_chunks (the one consuming the output
of _chunk_token_ids) to pass strict=True so any unexpected length mismatch
raises an error instead of silently truncating.
In `@grpc_servicer/smg_grpc_servicer/vllm/servicer.py`:
- Around line 349-350: The servicer.shutdown() method (which awaits
kv_event_bridge.shutdown()) is never invoked during server teardown, causing the
bridge task and ZMQ sockets to leak; update the gRPC server lifecycle (e.g., in
the function that starts/awaits the server such as your server serve/start
function or main) to call and await servicer.shutdown() in the shutdown
path—wrap the server run in try/finally or register a graceful shutdown callback
so that await servicer.shutdown() runs when stopping the server (ensure you
reference servicer.shutdown() and kv_event_bridge.shutdown() and await the call
to fully close resources).
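The suggested lifecycle fix can be sketched as follows. This is a hypothetical model, not the real smg_grpc_servicer API: `Servicer` and `serve` are stand-ins showing only the try/finally shape.

```python
import asyncio

# Hypothetical lifecycle sketch: the function that runs the gRPC server
# wraps its wait loop in try/finally so servicer.shutdown() (which would
# await kv_event_bridge.shutdown()) always runs on teardown.

class Servicer:
    def __init__(self) -> None:
        self.closed = False

    async def shutdown(self) -> None:
        # Real code would await self.kv_event_bridge.shutdown() here,
        # stopping the bridge task and closing ZMQ sockets.
        self.closed = True

async def serve() -> Servicer:
    servicer = Servicer()
    try:
        # Real code: start the grpc.aio server, then
        # await server.wait_for_termination()
        await asyncio.sleep(0)
    finally:
        # Runs on normal exit, cancellation, or error.
        await servicer.shutdown()
    return servicer

servicer = asyncio.run(serve())
assert servicer.closed  # bridge resources released on teardown
```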
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 67a9c6e8-37b6-46b4-8f6c-d8a31d7c3645
📒 Files selected for processing (9)
- docs/getting-started/grpc-workers.md
- docs/getting-started/index.md
- grpc_servicer/README.md
- grpc_servicer/pyproject.toml
- grpc_servicer/smg_grpc_servicer/vllm/kv_events.py
- grpc_servicer/smg_grpc_servicer/vllm/servicer.py
- grpc_servicer/tests/test_kv_events.py
- model_gateway/src/core/kv_event_monitor.rs
- model_gateway/tests/routing/test_openai_routing.rs
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 05997b645f
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/vllm/kv_events.py`:
- Around line 258-263: The code silently drops batches when a publisher's
per-rank sequence resets because rank_seq <= self._last_rank_sequence.get(rank,
-1) simply continues; update the logic in the block around rank_seq,
_last_rank_sequence, _decoder.decode and _append_batch to detect sequence
regression (rank_seq < last_seq) and handle it explicitly: either reset the
stored state for that rank (e.g., delete or set last_seq to -1 and replay/apply
the incoming batch) or raise/propagate an error/notification so the bridge can
fail-closed or request replay. Ensure the chosen behavior updates
_last_rank_sequence appropriately after calling _append_batch (or before if
replaying) and does not silently drop valid post-restart batches.
- Around line 188-203: The wait predicate currently treats a caught-up
subscriber (next_seq == self._next_sequence_number) as needing replay, causing
_normalize_requested_sequence_locked to rewind and the loop to resend the full
buffer; change the predicate to only consider next_seq beyond the known next
sequence (use next_seq > self._next_sequence_number instead of >=) or otherwise
ensure equality does not make the predicate true, and also ensure
_normalize_requested_sequence_locked does not rewind when next_seq ==
self._next_sequence_number (only rewind when requested sequence is strictly less
than the oldest allowed). Update the expressions using self._buffer,
self._next_sequence_number, next_seq and _normalize_requested_sequence_locked
accordingly so caught-up subscribers simply wait for new data.
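A minimal sketch of the corrected predicate, with illustrative names (the real method operates on the bridge's internal state):

```python
# Illustrative: equality means the subscriber is caught up and should just
# wait for new data; only a sequence beyond the next-to-assign value needs
# normalization/rewind handling.
def needs_normalization(next_seq: int, next_sequence_number: int) -> bool:
    return next_seq > next_sequence_number  # strictly greater, not >=

caught_up = needs_normalization(10, 10)  # → False: wait for new batches
ahead = needs_normalization(12, 10)      # → True: requested future data
behind = needs_normalization(7, 10)      # → False: ordinary replay path
```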
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2b08c8b9-cf24-4955-8f2e-fe3574bf0ef0
📒 Files selected for processing (1)
grpc_servicer/smg_grpc_servicer/vllm/kv_events.py
♻️ Duplicate comments (1)
grpc_servicer/smg_grpc_servicer/vllm/kv_events.py (1)

Lines 318-333: ⚠️ Potential issue | 🔴 Critical: Fail closed on rank-sequence regression instead of appending over stale history.

When rank_seq < last_rank_seq, this path only logs and then appends the new batch into the existing buffer. Active subscribers therefore continue from a mixed pre-restart + post-restart history, which can reconstruct the wrong KV state unless the first post-restart batch fully clears everything. This needs a reset/fatal path, not ordinary ingestion.

🛡️ Minimal safe fix

```diff
 async def _ingest_rank_batch(self, rank: int, rank_seq: int, payload: bytes) -> None:
     last_rank_seq = self._last_rank_sequence.get(rank, -1)
     if rank_seq == last_rank_seq:
         return
     if rank_seq < last_rank_seq:
-        logger.warning(
-            "KV event publisher sequence regressed for rank %s: previous=%s received=%s. "
-            "Assuming publisher restart and accepting new batches from this rank",
-            rank,
-            last_rank_seq,
-            rank_seq,
-        )
+        raise RuntimeError(
+            "KV event publisher sequence regressed for rank "
+            f"{rank}: previous={last_rank_seq} received={rank_seq}"
+        )
     batch = self._decoder.decode(payload)
     await self._append_batch(batch)
     self._last_rank_sequence[rank] = rank_seq
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@grpc_servicer/smg_grpc_servicer/vllm/kv_events.py` around lines 318-333: the code currently treats a rank_seq regression in _ingest_rank_batch as non-fatal and continues appending, which can corrupt state; change the regression path to fail-closed by logging an error and raising an exception (e.g., RuntimeError) instead of decoding/appending. Specifically, in _ingest_rank_batch detect rank_seq < last_rank_seq, call logger.error with context (rank, last_rank_seq, rank_seq), then raise a fatal error so callers stop processing; do not call self._decoder.decode, self._append_batch, or update self._last_rank_sequence for that case. This uses the existing symbols _ingest_rank_batch, _last_rank_sequence, _decoder, and _append_batch to locate and change the logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@grpc_servicer/smg_grpc_servicer/vllm/kv_events.py`:
- Around line 318-333: The code currently treats a rank_seq regression in
_ingest_rank_batch as non-fatal and continues appending, which can corrupt
state; change the regression path to fail-closed by logging an error and raising
an exception (e.g., RuntimeError) instead of decoding/appending. Specifically,
in _ingest_rank_batch detect rank_seq < last_rank_seq and call logger.error with
context (rank, last_rank_seq, rank_seq) then raise a fatal error so callers stop
processing; do not call self._decoder.decode, self._append_batch, or update
self._last_rank_sequence for that case. This uses the existing symbols
_ingest_rank_batch, _last_rank_sequence, _decoder, and _append_batch to locate
and change the logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a7a01521-c1b4-4eea-b2e2-4f863ea99ecb
📒 Files selected for processing (2)
- grpc_servicer/smg_grpc_servicer/vllm/kv_events.py
- grpc_servicer/tests/test_kv_events.py
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 04affa6524
Hi @smfirmin, this PR has merge conflicts that must be resolved before it can be merged. Please rebase your branch:

```shell
git fetch origin main
git rebase origin/main
# resolve any conflicts, then:
git push --force-with-lease
```
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/pyproject.toml`:
- Around line 14-15: The dependency "msgspec" in pyproject.toml has no version
constraint; update the dependency entry for msgspec to include a minimum version
(e.g., "msgspec>=X.Y.Z") to match the style used for "pyzmq>=25.0.0" and prevent
accidental incompatible upgrades—check the latest stable msgspec release and set
that as the minimum, then run your dependency install/check to verify
compatibility.
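A pinned entry might look like the following. The minimum version shown is an assumption for illustration; set it to the latest stable msgspec release after verifying compatibility:

```toml
[project]
dependencies = [
    "pyzmq>=25.0.0",
    "msgspec>=0.18.0",  # illustrative floor; match the latest stable release
]
```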
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 11d8a066-b72f-43a4-8126-f94632f9ebab
📒 Files selected for processing (2)
- grpc_servicer/README.md
- grpc_servicer/pyproject.toml
Hi @smfirmin, the DCO sign-off check has failed. All commits must include a `Signed-off-by` line. To fix existing commits:

```shell
# Sign off the last N commits (replace N with the number of unsigned commits)
git rebase HEAD~N --signoff
git push --force-with-lease
```

To sign off future commits automatically:
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/pyproject.toml`:
- Around line 14-15: The TOML dependency list has a syntax error: the string for
the msgspec dependency is missing its closing double quote; locate the
dependency entry containing "msgspec>=0.20.0, and add the missing closing quote
so it becomes a valid TOML string (matching the format used for
"pyzmq>=25.0.0"), ensuring commas and quotes are correct for all dependency
lines.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 68321d8b-c2d7-443a-b2ae-3c41432e9b21
📒 Files selected for processing (1)
grpc_servicer/pyproject.toml
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 16658c7646
Force-pushed: 16658c7 to d482fb9
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 21f18509d3
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ec068062b3
```python
        next_seq,
        oldest,
    )
    return oldest
```
Return terminal error for expired replay sequence requests
When a client asks for a sequence older than the replay buffer, this branch rewinds to oldest and silently drops the missing range instead of surfacing a reset/error. In the current monitor flow, that produces a permanent reconnect loop: handle_batch reports a gap (received > last_seq + 1), subscription_loop reconnects with the same last_seq, and the bridge rewinds again, so KV updates never recover after a long disconnect or backlog overrun.
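One way to make this branch fail closed instead of rewinding, sketched with hypothetical names (the real bridge would surface this as a terminal gRPC status such as OUT_OF_RANGE so the client performs a full reset rather than reconnecting with the same last_seq):

```python
class ReplayBufferExpired(Exception):
    """Requested start sequence has been evicted from the replay buffer."""

def normalize_requested_sequence(next_seq: int, oldest: int, next_to_assign: int) -> int:
    # Caught up (or ahead): nothing to replay, wait for new data.
    if next_seq >= next_to_assign:
        return next_to_assign
    # Expired: fail closed rather than rewinding and silently dropping
    # the missing range of events.
    if next_seq < oldest:
        raise ReplayBufferExpired(
            f"requested sequence {next_seq} is older than oldest buffered {oldest}"
        )
    return next_seq

expired = False
try:
    normalize_requested_sequence(1, oldest=3, next_to_assign=10)
except ReplayBufferExpired:
    expired = True
```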
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: be03d6d1fd
Hi @smfirmin, the DCO sign-off check has failed. All commits must include a `Signed-off-by` line. To fix existing commits:

```shell
# Sign off the last N commits (replace N with the number of unsigned commits)
git rebase HEAD~N --signoff
git push --force-with-lease
```

To sign off future commits automatically:
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ebf18881a0
Force-pushed: 4af2c0b to a2d4535
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a2d4535039
Signed-off-by: Sydney Firmin <sydney.firmin@oracle.com>
Force-pushed: a2d4535 to 50ffe81
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: db41cd2208
Description
Summary
This PR adds SMG support for consuming vLLM KV cache events over SubscribeKvEvents through smg-grpc-servicer, with no source changes required in the vllm repo.
It also hardens the integration around two important edge cases:
Finally, it documents the remaining protocol limitation and stabilizes one unrelated flaky routing test fixture.
What changed
Added vLLM KV event bridge support in smg-grpc-servicer so SMG can subscribe to KV cache events from upstream vLLM when vLLM is started with --kv-events-config.
Added fail-closed handling for unsupported BlockStored layouts that cannot be translated losslessly.
Mapped those unsupported layouts to gRPC FAILED_PRECONDITION instead of allowing corrupted KV state.
Updated the Rust KV event monitor to:
Added tests for:
Added operator docs explaining:
Stabilized test_openai_router_circuit_breaker by replacing an environment-dependent invalid hostname with a deterministic closed localhost port.
Upstream vLLM compatibility
This works with upstream vLLM as-is. No vllm code changes are required.
To enable KV event streaming, run the stock vLLM gRPC server with --kv-events-config.
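For reference, enabling event publishing on the vLLM side looks roughly like this. The JSON keys follow vLLM's KVEventsConfig in recent releases but should be verified against the vLLM documentation for your version; the model placeholder and endpoint are illustrative:

```shell
vllm serve <model> \
  --kv-events-config '{"enable_kv_cache_events": true, "publisher": "zmq", "endpoint": "tcp://*:5557"}'
```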
Residual limitation
Restart recovery is currently sequence-based.
SMG can detect a restart when the first fresh batch after reconnect has a lower sequence number than the last applied batch. If a restarted backend catches back up before SMG reconnects, the current protocol has no producer epoch/generation identifier, so that restart is not distinguishable from normal continuation.
Unsupported null-block/sparse BlockStored layouts also remain fail-closed until the stream carries enough per-block token range metadata for lossless translation.
Testing
Ran:
- grpc_servicer/smg_grpc_servicer/vllm/servicer.py
- routing::test_openai_routing::test_openai_router_circuit_breaker -- --exact --nocapture
Summary by CodeRabbit
New Features
Documentation
Bug Fixes
Tests
Chores