
refactor(gateway): simplify and optimize OpenAI streaming internals#738

Open
slin1237 wants to merge 2 commits into main from slin/oai-refactor-8

Conversation

slin1237 (Collaborator) commented Mar 12, 2026

Summary

  • Eliminate hot-path allocations (JSON clone, String alloc) on every forwarded streaming event
  • Extract effective_tool_call_limit() to replace 6 identical inline calculations
  • Add ingest_event() to bypass SSE format+reparse round-trip in accumulator
  • Unify duplicated fallback response builders and SSE send patterns

What changed

  • mcp_utils.rs: add effective_tool_call_limit() shared helper
  • tool_handler.rs: use ingest_event() instead of format+reparse
  • accumulator.rs: add ingest_event() fast path; unify build_fallback_response and build_fallback_response_snapshot into assemble_fallback
  • common.rs: fast path in ChunkProcessor::push_chunk that skips char-by-char processing when no \r is present
  • streaming.rs: pre-computed boolean flags replace event_type.to_string(); move parsed instead of cloning; remove an unused import
  • tool_loop.rs: add local send_sse_event helper replacing 7 inline instances; use effective_tool_call_limit()
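As a sketch, the shared helper's likely shape, assuming clamp-with-fallback semantics; the real body and the actual value of DEFAULT_MAX_ITERATIONS in mcp_utils.rs may differ:

```rust
// Assumed value for illustration; the real constant lives in the gateway crate.
const DEFAULT_MAX_ITERATIONS: usize = 10;

/// Effective cap on tool-call iterations: an explicit limit is clamped to
/// the default ceiling; no limit falls back to the default.
pub fn effective_tool_call_limit(max_tool_calls: Option<usize>) -> usize {
    max_tool_calls
        .map(|n| n.min(DEFAULT_MAX_ITERATIONS))
        .unwrap_or(DEFAULT_MAX_ITERATIONS)
}
```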

Why

Every forwarded streaming event was incurring an unnecessary Value::clone() and a String allocation for the event type. The accumulator was formatting an SSE block only to immediately re-parse it. These micro-allocations add up on high-throughput streaming paths. The duplicated limit calculation and SSE formatting patterns also made the code harder to maintain.
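The clone elimination described above can be sketched with plain std types; the Event struct and names here are illustrative stand-ins, not the gateway's real types:

```rust
// Before: the event type was turned into an owned String and the parsed
// value cloned so both could be inspected and forwarded. After: compute
// cheap booleans from borrows first, then move the value onward.
struct Event {
    event_type: &'static str,
    payload: String,
}

fn forward(parsed: Event) -> (bool, bool, Event) {
    // Read-only checks while `parsed` is still only borrowed...
    let is_in_progress = parsed.event_type == "response.in_progress";
    let is_args_done = parsed.event_type == "response.function_call_arguments.done";
    // ...then hand off ownership with no clone and no String allocation.
    (is_in_progress, is_args_done, parsed)
}
```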

Test plan

  • cargo check -p smg compiles cleanly with no warnings
  • Existing MCP tool loop tests pass (streaming + non-streaming)
  • Manual test: streaming responses with MCP tool interception work end-to-end
  • Manual test: simple passthrough streaming (no MCP) unaffected

Summary by CodeRabbit

  • Performance

    • Faster streaming and event forwarding with reduced re-parsing and a quicker chunk-processing fast path.
    • Centralized enforcement of tool-call limits for consistent behavior.
  • New Features

    • Unified server-sent-event emission for more consistent, reliable tool event delivery.
    • Earlier "in-progress" notifications for tool interactions.
    • New direct event ingest path to reduce formatting overhead.
  • Bug Fixes

    • More consistent final-response assembly and fallback handling to improve result reliability.

Eliminate hot-path allocations and deduplicate shared patterns across
the MCP tool loop and streaming response modules.

What changed:
- mcp_utils.rs: add `effective_tool_call_limit()` to replace 6 identical
  inline limit calculations across openai and grpc routers
- tool_handler.rs: call `accumulator.ingest_event()` instead of
  formatting an SSE block string then re-parsing it
- accumulator.rs: add `ingest_event()` fast path; unify
  `build_fallback_response` and `build_fallback_response_snapshot` into
  shared `assemble_fallback` helper
- common.rs: add fast-path in `ChunkProcessor::push_chunk` that skips
  char-by-char CRLF normalization when no `\r` is present
- streaming.rs: replace `event_type.to_string()` with pre-computed
  boolean flags; eliminate `parsed.clone()` by reordering the
  `is_in_progress` check before moving ownership; remove unused
  `DEFAULT_MAX_ITERATIONS` import
- tool_loop.rs: add local `send_sse_event` helper and replace 7 inline
  SSE format+send instances; use `effective_tool_call_limit()`

Why:
Every forwarded streaming event was incurring an unnecessary JSON Value
deep-clone and a String allocation for the event type. The accumulator
was formatting an SSE block only to immediately re-parse it. These
micro-allocations add up on high-throughput streaming paths. The
duplicated limit calculation and SSE formatting patterns also made the
code harder to maintain.

How:
- Booleans computed before mutation eliminate borrow-checker workarounds
- `ingest_event` bypasses the format→parse_sse_block→parse_json cycle
- Ownership move replaces clone for the parsed JSON Value
- `contains('\r')` guard lets the common case (no CRLF) use push_str
- Shared helpers replace copy-pasted patterns
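The CRLF fast path can be sketched as follows; this is a simplified stand-in for the real ChunkProcessor, which holds more state:

```rust
// Chunks without '\r' are appended in one shot; only chunks containing
// CR go through char-by-char CRLF normalization.
struct ChunkProcessor {
    pending: String,
}

impl ChunkProcessor {
    fn push_chunk(&mut self, chunk: &str) {
        if !chunk.contains('\r') {
            // Common case: no CR at all, append wholesale.
            self.pending.push_str(chunk);
            return;
        }
        // Slow path: normalize "\r\n" and lone '\r' to '\n'.
        let mut chars = chunk.chars().peekable();
        while let Some(c) = chars.next() {
            if c == '\r' {
                if chars.peek() == Some(&'\n') {
                    chars.next();
                }
                self.pending.push('\n');
            } else {
                self.pending.push(c);
            }
        }
    }
}
```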

Signed-off-by: Simolin <simolin@users.noreply.github.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
github-actions bot added the model-gateway (Model gateway crate changes) and openai (OpenAI router changes) labels on Mar 12, 2026
coderabbitai bot commented Mar 12, 2026

📝 Walkthrough

Centralizes tool-call limit computation, adds unified SSE emission and pre-split event ingestion, consolidates accumulator fallback construction, exposes tool-loop state fields, and refactors MCP-related streaming transformations and CRLF chunk fast-paths.

Changes

  • Tool call limit helper (model_gateway/src/routers/mcp_utils.rs): adds pub fn effective_tool_call_limit(max_tool_calls: Option<usize>) -> usize to clamp/fall back tool-call limits against DEFAULT_MAX_ITERATIONS.
  • SSE & tool loop (model_gateway/src/routers/openai/mcp/tool_loop.rs): introduces the send_sse_event helper, replaces ad-hoc SSE formatting, exposes ToolLoopState fields publicly, and uses effective_tool_call_limit for iteration caps.
  • Tool handler ingestion (model_gateway/src/routers/openai/mcp/tool_handler.rs): switches from ingest_block(...) to ingest_event(event_name, data) to avoid format+reparse when ingesting events.
  • Accumulator fallback & ingest API (model_gateway/src/routers/openai/responses/accumulator.rs): adds pub fn ingest_event(&mut self, event_name: Option<&str>, data: &str) and consolidates fallback construction into a new private assemble_fallback(...) helper used by the snapshot and into_final_response paths.
  • Streaming event transformations (model_gateway/src/routers/openai/responses/streaming.rs): adds an is_mcp_args_done parameter to apply_event_transformations_inplace, centralizes MCP remapping logic, reuses pre-parsed JSON for forwarding, and replaces inline DEFAULT_MAX_ITERATIONS uses with effective_tool_call_limit.
  • Chunk buffer optimization (model_gateway/src/routers/openai/responses/common.rs): adds a fast path in ChunkProcessor::push_chunk that appends chunks without \r directly to the pending buffer, bypassing CRLF normalization.
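The format+reparse round-trip that ingest_event avoids can be sketched like this; this is a toy accumulator, and the real one builds a response object rather than collecting tuples:

```rust
struct Accumulator {
    events: Vec<(Option<String>, String)>,
}

impl Accumulator {
    /// New fast path: take the already-split event name and data directly.
    fn ingest_event(&mut self, event_name: Option<&str>, data: &str) {
        self.events
            .push((event_name.map(str::to_owned), data.to_owned()));
    }

    /// Old path: accept a formatted SSE block and re-split it, which is
    /// pure overhead when the caller already had the parts separately.
    fn ingest_block(&mut self, block: &str) {
        let mut name = None;
        let mut data = "";
        for line in block.lines() {
            if let Some(n) = line.strip_prefix("event: ") {
                name = Some(n);
            } else if let Some(d) = line.strip_prefix("data: ") {
                data = d;
            }
        }
        self.ingest_event(name, data);
    }
}
```

Both paths end at the same ingestion point; the fast path simply skips building and re-splitting the intermediate block string.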

Sequence Diagram(s)

sequenceDiagram
    rect rgba(200,230,201,0.5)
    participant Client
    end
    rect rgba(187,222,251,0.5)
    participant ToolLoop
    participant ToolHandler
    participant Accumulator
    participant Streaming
    participant mcp_utils
    end

    Client->>ToolLoop: start tool loop request
    ToolLoop->>mcp_utils: effective_tool_call_limit(max_tool_calls)
    mcp_utils-->>ToolLoop: limit
    ToolLoop->>ToolHandler: emit SSE events (send_sse_event)
    ToolHandler->>Accumulator: ingest_event(event_name, data)
    Accumulator-->>ToolHandler: ack / assembled output
    ToolHandler->>Streaming: forward pre-parsed JSON / transformed events
    Streaming-->>ToolLoop: in_progress / completion events
    ToolLoop->>Client: SSE stream (events)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

mcp

Suggested reviewers

  • CatherineSue
  • key4ng

Poem

🐰 I hopped through streams and clamped each call,
Emitted tidy SSEs, no needless brawl,
Events now lean, fallbacks neatly spun,
Hops are swift — the MCP race is won! ✨

🚥 Pre-merge checks: 3 passed
  • Description Check: Passed (check skipped; CodeRabbit's high-level summary is enabled)
  • Title Check: Passed (the title clearly and concisely captures the PR's main objective)
  • Docstring Coverage: Passed (100.00% coverage; required threshold is 80.00%)


@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the performance and maintainability of the OpenAI streaming internals within the MCP (Model Context Protocol) gateway. It targets and resolves several micro-allocations and redundant code patterns that previously impacted high-throughput streaming paths, resulting in a more efficient and streamlined codebase.

Highlights

  • Performance Optimization: Eliminated hot-path allocations, such as JSON cloning and String allocations, on every forwarded streaming event.
  • Code Reusability: Extracted effective_tool_call_limit() into a shared helper function, replacing six identical inline calculations.
  • Streaming Efficiency: Added ingest_event() to bypass the SSE format and re-parse round-trip within the accumulator.
  • Code Unification: Unified duplicated fallback response builders and SSE send patterns into common helpers.
Changelog
  • model_gateway/src/routers/mcp_utils.rs
    • Added effective_tool_call_limit() shared helper.
  • model_gateway/src/routers/openai/mcp/tool_handler.rs
    • Updated to use ingest_event() instead of format+reparse.
  • model_gateway/src/routers/openai/mcp/tool_loop.rs
    • Added local send_sse_event helper, replacing seven inline instances.
    • Updated to use effective_tool_call_limit().
  • model_gateway/src/routers/openai/responses/accumulator.rs
    • Added ingest_event() fast path.
    • Unified build_fallback_response and build_fallback_response_snapshot into assemble_fallback.
  • model_gateway/src/routers/openai/responses/common.rs
    • Implemented a fast-path in ChunkProcessor::push_chunk to skip char-by-char processing when no \r is present.
  • model_gateway/src/routers/openai/responses/streaming.rs
    • Replaced event_type.to_string() with pre-computed boolean flags.
    • Modified to move parsed instead of cloning it.
    • Removed an unused import.
    • Updated to use effective_tool_call_limit.

@gemini-code-assist

Warning

Gemini encountered an error creating the review. You can try again by commenting /gemini review.

coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/routers/openai/mcp/tool_loop.rs`:
- Around line 37-47: Duplicate send_sse_event helpers exist (this file's
send_sse_event and the one in streaming.rs); extract the shared logic into a
single private utility (e.g., a new module or helper function referenced by both
modules) and replace both local implementations with calls to that shared
function to reduce duplication while preserving current behavior and signature
(keep the parameters tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
event_name: &str, payload: &Value and return bool).

In `@model_gateway/src/routers/openai/responses/streaming.rs`:
- Around line 169-181: The is_args_done branch currently always remaps events to
McpEvent::CALL_ARGUMENTS_DONE and rewrites item_id, but it must only do this for
MCP/exposed tools; update the logic to consult the same guard used in
is_output_item_event (ctx.session.has_exposed_tool(&tool_name)) by resolving the
tool name for the given item_id/output_index before transforming. To implement
this: when handling is_output_item_event (or wherever output_index→tool mapping
is known) record a mapping from item_id (or output_index) to tool_name in the
StreamingToolHandler (or another handler-scoped map), expose that map to the
code handling is_args_done, and in the is_args_done branch look up the tool_name
for parsed_data["item_id"] (or parsed_data["output_index"]) and only set
parsed_data["type"] = McpEvent::CALL_ARGUMENTS_DONE and rewrite the fc_* → mcp_*
id when ctx.session.filter(|s| s.has_exposed_tool(&tool_name)) returns true;
leave regular function tool events unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 5c817b02-4614-4616-8507-a7c6c7e8481f

📥 Commits

Reviewing files that changed from the base of the PR and between 8d6689b and 34cd4af.

📒 Files selected for processing (6)
  • model_gateway/src/routers/mcp_utils.rs
  • model_gateway/src/routers/openai/mcp/tool_handler.rs
  • model_gateway/src/routers/openai/mcp/tool_loop.rs
  • model_gateway/src/routers/openai/responses/accumulator.rs
  • model_gateway/src/routers/openai/responses/common.rs
  • model_gateway/src/routers/openai/responses/streaming.rs

Comment on lines +37 to +47
/// Send an SSE event to the client channel.
/// Returns false if client disconnected.
#[inline]
fn send_sse_event(
    tx: &mpsc::UnboundedSender<Result<Bytes, io::Error>>,
    event_name: &str,
    payload: &Value,
) -> bool {
    let block = format!("event: {event_name}\ndata: {payload}\n\n");
    tx.send(Ok(Bytes::from(block))).is_ok()
}

🧹 Nitpick | 🔵 Trivial

LGTM! Clean SSE helper consolidating inline formatting.

The helper correctly formats the SSE block and returns connection status. The #[inline] hint is appropriate for this small, hot-path function.

Note: An identical send_sse_event helper exists in streaming.rs (lines 205-212). The duplication is acceptable given both are module-private, but could be consolidated into a shared utility if more call sites emerge.


coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/routers/openai/responses/streaming.rs`:
- Around line 133-137: The code allocates a new String for tool_name
unnecessarily; change the declaration to keep it as a &str by removing
.to_string() and using the &str returned from item.get("name").and_then(|v|
v.as_str()).unwrap_or(""), i.e. bind tool_name as &str (retain the same variable
name) so downstream consumers that accept &str need no change and the
micro-allocation is eliminated.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1cd16fe9-1201-4772-8417-af330af18ae9

📥 Commits

Reviewing files that changed from the base of the PR and between 34cd4af and 1bd2d71.

📒 Files selected for processing (1)
  • model_gateway/src/routers/openai/responses/streaming.rs

chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1bd2d71654

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +366 to 367
if is_mcp_args_done
&& !send_buffered_arguments(


P1 Badge Preserve argument events for non-MCP function calls

Guarding send_buffered_arguments behind is_mcp_args_done means mixed-tool streams (requests that include MCP tools plus regular function tools) no longer emit any reconstructed argument payload for regular function calls, even though this path still suppresses response.function_call_arguments.delta events earlier in forward_streaming_event. In upstream streams where function_call_arguments.done does not carry the full arguments field, clients will lose tool arguments entirely for non-MCP calls, which is a behavioral regression from the previous logic that always emitted buffered arguments.


…ls only

The is_args_done branch in apply_event_transformations_inplace was
unconditionally transforming all function_call_arguments.done events
to MCP format (remapping type to mcp_call_arguments.done, rewriting
fc_* item IDs to mcp_*). This is incorrect when a request includes
both MCP tools and regular function tools — regular function tool
events should pass through unchanged.

What changed:
- streaming.rs: add is_mcp_args_done flag computed by looking up the
  tool name from handler.pending_calls and checking
  ctx.session.has_exposed_tool()
- Guard send_buffered_arguments (synthetic MCP delta) on the flag
- Guard the is_args_done transformation branch on the flag
- Guard map_event_name (fc→mcp event name remapping) on the flag
- Add is_mcp_args_done parameter to apply_event_transformations_inplace

Why:
When a request mixes MCP tools with regular function tools, the model
may call either type. Regular function_call events must flow to the
client unmodified. The output_item transformation was already guarded
by has_exposed_tool, but the arguments_done path was not.

Signed-off-by: Simolin <simolin@users.noreply.github.com>
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
slin1237 force-pushed the slin/oai-refactor-8 branch from 1bd2d71 to 549f927 on Mar 12, 2026 at 05:41
coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@model_gateway/src/routers/openai/responses/streaming.rs`:
- Around line 144-146: When rewriting output items you currently reduce the
resolved response format to the boolean is_mcp_args_done which loses whether the
tool is a WebSearchCall vs MCP and causes mixed event/id families; change the
data you thread through the arguments path to carry the resolved ResponseFormat
(or a tiny enum like TargetFamily {Mcp, WebSearch}) instead of a boolean. Update
the place that computes (new_type, id_prefix) from ResponseFormat (where
id_prefix is set to "ws_" or "mcp_") to pass that ResponseFormat/TargetFamily
into the code that sets is_mcp_args_done and into all downstream uses that build
McpEvent::CALL_ARGUMENTS_* and mcp_* ids; then switch those places to consult
the carried ResponseFormat/TargetFamily to choose the correct McpEvent variant,
ItemType and id_prefix so the event family stays consistent for the tool
throughout the stream.
- Around line 352-361: The stream handling currently only protects the `.done`
path with is_mcp_args_done, but forward_streaming_event still drops every
function_call.arguments.delta; update forward_streaming_event (and the
is_mcp_args_done guard usage) to only buffer/synthesize deltas for MCP-exposed
tools: use extract_output_index(&parsed_data).and_then(|idx|
handler.pending_calls.iter().find(|c| c.output_index==idx)) and
ctx.session.is_some_and(|s| s.has_exposed_tool(&call.name)) to detect MCP calls,
and for non-MCP calls let the original function_call.arguments.delta pass
through unchanged (do not consume/replace it and do not set mapped_output_index)
so ordinary function tools continue to receive incremental argument deltas.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 20eda41c-6d51-4f6a-b40f-0d77da0544a1

📥 Commits

Reviewing files that changed from the base of the PR and between 1bd2d71 and 549f927.

📒 Files selected for processing (1)
  • model_gateway/src/routers/openai/responses/streaming.rs

Comment on lines +144 to +146
let (new_type, id_prefix) = match response_format {
ResponseFormat::WebSearchCall => (ItemType::WEB_SEARCH_CALL, "ws_"),
_ => (ItemType::MCP_CALL, "mcp_"),

⚠️ Potential issue | 🟠 Major

is_mcp_args_done loses the tool's actual response format.

The output-item rewrite can classify an exposed tool as web_search_call with a ws_ id, but this boolean still drives the arguments path through McpEvent::CALL_ARGUMENTS_* and mcp_*. That makes the same tool switch event/id families mid-stream for ResponseFormat::WebSearchCall.

Please carry the resolved response format, or a small enum with the target id prefix/event family, through the args path instead of reducing it to a boolean.

Also applies to: 171-177, 354-357


Comment on lines +352 to +361
// Determine if this arguments_done event is for an MCP-exposed tool.
// Regular function tool events should pass through without MCP transformation.
let is_mcp_args_done = event_name == Some(FunctionCallEvent::ARGUMENTS_DONE)
&& extract_output_index(&parsed_data)
.and_then(|idx| handler.pending_calls.iter().find(|c| c.output_index == idx))
.is_some_and(|call| ctx.session.is_some_and(|s| s.has_exposed_tool(&call.name)));

// Handle function_call_arguments.done - send buffered args only for MCP tools
let mut mapped_output_index: Option<usize> = None;
if event_name == Some(FunctionCallEvent::ARGUMENTS_DONE)
if is_mcp_args_done

⚠️ Potential issue | 🟠 Major

Preserve function_call.arguments.delta for pass-through tools.

This new guard only fixes the buffered .done path. forward_streaming_event() still drops every function_call.arguments.delta before it reaches this branch, so a stream that includes both intercepted MCP tools and ordinary function tools will never forward incremental arguments for the ordinary tool.

Buffer/synthesize deltas only for exposed MCP tools, and let non-MCP deltas pass through unchanged.


chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 549f927481


Comment on lines +361 to 362
if is_mcp_args_done
&& !send_buffered_arguments(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Restore buffered arguments for non-MCP done events

response.function_call_arguments.delta events are still dropped unconditionally, but this change now calls send_buffered_arguments only when is_mcp_args_done is true. In mixed-tool streams (MCP tools plus regular function tools), a regular function call that streams args via deltas and sends a ...arguments.done payload without a full arguments field will now lose its arguments entirely, which is a regression from the previous behavior that reconstructed args for all done events.


slin1237 added a commit that referenced this pull request Mar 12, 2026
…ages API

Add the first message-specific pipeline stage (Stage 1: Preparation) and
the utility functions it needs to convert Anthropic Messages API types
into the internal chat template format.

What changed:
- New message_utils.rs with conversion functions:
  - process_messages(): top-level orchestrator parallel to process_chat_messages()
  - process_message_content_format(): converts InputMessage to Vec<Value> JSON
  - convert_user_message(): handles user messages, splits ToolResult into
    separate "tool" role messages
  - convert_assistant_message(): extracts text, tool_calls, reasoning_content
  - extract_chat_tools(): filters Custom tools and converts to chat::Tool
  - convert_message_tool_choice(): maps Messages ToolChoice to chat ToolChoice
  - extract_tool_result_text(): helper for ToolResult content extraction
  - 7 unit tests covering all major conversion paths
- New MessagePreparationStage (git-cp'd from ChatPreparationStage):
  - Same structure as ChatPreparationStage (impl method pattern)
  - Resolves tokenizer, converts/filters tools, processes messages,
    tokenizes, builds tool constraints, creates stop decoder
  - Multimodal processing postponed (marked with async for future .await)
- Made process_tool_call_arguments pub(crate) in chat_utils.rs for reuse
- Updated delegating PreparationStage to use Display-based error messages
- Removed stale #[expect(dead_code)] from messages_request_arc (now used)

Why:
This is PR 2 in the Messages API gRPC pipeline series. PR 1 (#739) added
type scaffolding. This PR adds the preparation stage that converts
Messages API requests into the shared internal format, enabling the
existing request building and response processing stages to work with
Messages API requests in follow-up PRs.

How:
Follows the same architecture as chat: reuses shared utilities
(resolve_tokenizer, filter_tools_by_tool_choice, generate_tool_constraints,
create_stop_decoder, process_tool_call_arguments) and only replaces the
message-specific conversion layer (process_content_format → process_message_content_format).
Uses git-cp to preserve file history from chat/preparation.rs for reviewability.

Refs: #738
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
slin1237 added a commit that referenced this pull request Mar 12, 2026
slin1237 added a commit that referenced this pull request Mar 12, 2026
slin1237 added a commit that referenced this pull request Mar 12, 2026
@mergify

mergify bot commented Mar 18, 2026

Hi @slin1237, this PR has merge conflicts that must be resolved before it can be merged. Please rebase your branch:

git fetch origin main
git rebase origin/main
# resolve any conflicts, then:
git push --force-with-lease

@mergify mergify bot added the needs-rebase PR has merge conflicts that need to be resolved label Mar 18, 2026

Labels

model-gateway Model gateway crate changes needs-rebase PR has merge conflicts that need to be resolved openai OpenAI router changes
