feat(realtime-api): WebRTC signaling handler and OpenAIRouter integration #748
pallasathena92 wants to merge 1 commit into main
Signed-off-by: yifeliu <yifengliu9@gmail.com>
Summary of Changes
This pull request introduces a complete implementation for WebRTC real-time communication within the model gateway.
📝 Walkthrough
This PR introduces WebRTC realtime signaling support for the /v1/realtime/calls endpoint. It adds request parsing and validation for multipart/SDP content, a bidirectional relay bridge using str0m for WebRTC protocol handling, upstream HTTP integration, and router-level hookup with worker selection and metrics.
Sequence Diagram
sequenceDiagram
participant Client
participant Handler as WebRTC Handler
participant Bridge as WebRTC Bridge
participant Upstream as Upstream Server
participant Registry as Realtime Registry
Client->>Handler: POST /v1/realtime/calls<br/>(multipart SDP or direct SDP)
Handler->>Handler: Parse & validate request<br/>(extract model, SDP, config)
Handler->>Handler: Resolve auth & select worker
Handler->>Bridge: setup(sdp, upstream_url, ...)
activate Bridge
Bridge->>Bridge: Bind UDP sockets<br/>(client & upstream sides)
Bridge->>Bridge: Gather ICE candidates<br/>(loopback + STUN SRFLX)
Bridge->>Upstream: POST SDP offer<br/>(multipart or direct)
Upstream-->>Bridge: SDP answer
Bridge->>Bridge: Create bidirectional RTCs<br/>(configure ICE-lite for client)
deactivate Bridge
Bridge-->>Handler: (bridge, answer_sdp)
Handler-->>Client: 200 OK + SDP answer
par Bridge Relay Loop
Bridge->>Bridge: run() - continuous relay
loop Bidirectional Data Flow
Client->>Bridge: UDP packets (RTP/RTCP)
Bridge->>Bridge: Process client RTC outputs<br/>(channels, data, RTP)
Bridge->>Upstream: Forward RTP/data via UDP
Upstream->>Bridge: UDP packets (RTP/RTCP)
Bridge->>Bridge: Process upstream RTC outputs
Bridge->>Client: Forward RTP/data via UDP
end
Bridge->>Registry: Update connection state
and Cancellation
Registry->>Bridge: Cancel token signal
Bridge->>Bridge: Graceful shutdown
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~70 minutes
🚥 Pre-merge checks: 3 passed
Code Review
This pull request introduces a significant new feature: WebRTC signaling support. The implementation is well-structured, splitting logic between request handling (webrtc.rs) and the core relay logic (webrtc_bridge.rs). The use of str0m for the WebRTC stack is appropriate, and the implementation correctly handles the event loop, message buffering, and RTP packet forwarding. My feedback focuses on improving robustness by handling potential errors from str0m's write operations to prevent silent data loss, and enhancing maintainability by replacing a magic number with a named constant.
if let Some(mut ch) = self.client_rtc.channel(ch_id) {
    let _ = ch.write(data.binary, &data.data);
}
The result of ch.write is ignored. This can lead to silent message dropping if the channel buffer is full. It's better to log an error if writing fails.
if let Some(mut ch) = self.client_rtc.channel(ch_id) {
if let Err(e) = ch.write(data.binary, &data.data) {
warn!(call_id = self.call_id, error = %e, "Failed to write to client data channel, message dropped");
}
}
References
- Instead of silently ignoring potential failures, log them as warnings to aid in debugging.
);
if let Some(mut ch) = self.client_rtc.channel(ch_id) {
    for (binary, data) in pending {
        let _ = ch.write(binary, &data);
The result of ch.write is ignored. This can lead to silent message dropping if the channel buffer is full. It's better to log an error if writing fails.
if let Err(e) = ch.write(binary, &data) {
warn!(call_id = self.call_id, error = %e, "Failed to flush message to client data channel, message dropped");
}
References
- Instead of silently ignoring potential failures, log them as warnings to aid in debugging.
);
if let Some(mut ch) = self.upstream_rtc.channel(ch_id) {
    for (binary, data) in pending {
        let _ = ch.write(binary, &data);
The result of ch.write is ignored. This can lead to silent message dropping if the channel buffer is full. It's better to log an error if writing fails.
if let Err(e) = ch.write(binary, &data) {
warn!(call_id = self.call_id, error = %e, "Failed to flush message to upstream data channel, message dropped");
}
References
- Instead of silently ignoring potential failures, log them as warnings to aid in debugging.
let _ = tx.write_rtp(
    pkt.header.payload_type,
    pkt.seq_no,
    pkt.header.timestamp,
    pkt.timestamp,
    pkt.header.marker,
    pkt.header.ext_vals.clone(),
    true,
    pkt.payload.clone(),
);
The result of tx.write_rtp is ignored. This can fail if str0m's internal buffers are full, leading to silent dropping of RTP packets. It's better to log a warning on failure.
if let Err(e) = tx.write_rtp(
pkt.header.payload_type,
pkt.seq_no,
pkt.header.timestamp,
pkt.timestamp,
pkt.header.marker,
pkt.header.ext_vals.clone(),
true,
pkt.payload.clone(),
) {
warn!(call_id = self.call_id, error = %e, "Failed to forward RTP packet, packet dropped");
}
References
- Instead of silently ignoring potential failures, log them as warnings to aid in debugging.
async fn route_realtime_webrtc(&self, req: Request<Body>, model: &str) -> Response {
    let (parts, body) = req.into_parts();
    let body = match axum::body::to_bytes(body, 10 * 1024 * 1024).await {
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c2ec5af181
);

let call_id = uuid::Uuid::now_v7().to_string();
let stun_server = resolve_stun_server(configured_stun_server.as_deref()).await;
Honor the documented default STUN server
This path disables STUN whenever webrtc_stun_server is unset, because None is forwarded directly into resolve_stun_server. The surrounding config/docs in this repo state that None should mean stun.l.google.com:19302, so default deployments silently skip srflx gathering. In NATed environments, that can cause upstream WebRTC setup to fail unless operators explicitly set --webrtc-stun-server.
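One way to honor the documented default is to resolve the fallback before any DNS work happens. A minimal sketch, assuming a hypothetical `effective_stun_server` helper and the default named in the finding above; the real `resolve_stun_server` signature may differ:

```rust
// Hypothetical helper: fall back to the documented default STUN server
// when the operator leaves --webrtc-stun-server unset. The constant and
// function names are illustrative, not the repo's actual identifiers.
const DEFAULT_STUN_SERVER: &str = "stun.l.google.com:19302";

fn effective_stun_server(configured: Option<&str>) -> Option<&str> {
    match configured {
        // An explicit empty string could serve as a deliberate opt-out.
        Some("") => None,
        Some(s) => Some(s),
        None => Some(DEFAULT_STUN_SERVER),
    }
}

fn main() {
    assert_eq!(effective_stun_server(None), Some("stun.l.google.com:19302"));
    assert_eq!(
        effective_stun_server(Some("stun.example.org:3478")),
        Some("stun.example.org:3478")
    );
    assert_eq!(effective_stun_server(Some("")), None);
}
```

With this shape, default deployments still gather srflx candidates, and operators retain an explicit way to disable STUN.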
// Parse body once to extract model, SDP, and session config.
// For multipart, the model comes from the session JSON body.
// For application/sdp, the model comes from the query parameter.
let parsed = match webrtc::parse_webrtc_request(&parts, &body, model).await {
Avoid body-model routing after router selection
The multipart flow depends on extracting model from the request body here, but router selection has already happened earlier using the query model. In IGW/multi-router setups, /v1/realtime/calls multipart requests without ?model= are selected with an empty model and can be rejected before reaching this parser, so the new multipart path is effectively unreachable in that mode.
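A sketch of one possible fix: fall back to a model extracted from the multipart session JSON when the query parameter is absent, before worker selection runs. All names here are hypothetical; the real parser returns a richer struct:

```rust
// Hypothetical helper: prefer the ?model= query parameter, but fall back
// to the model found in the multipart session JSON so router selection
// never runs with an empty model. Both inputs are assumed to have been
// extracted earlier in the request path.
fn resolve_routing_model<'a>(
    query_model: Option<&'a str>,
    session_model: Option<&'a str>,
) -> Option<&'a str> {
    query_model.filter(|m| !m.is_empty()).or(session_model)
}

fn main() {
    // Direct SDP: model comes from the query parameter.
    assert_eq!(resolve_routing_model(Some("gpt-realtime"), None), Some("gpt-realtime"));
    // Multipart without ?model=: fall back to the session JSON model.
    assert_eq!(resolve_routing_model(None, Some("gpt-realtime")), Some("gpt-realtime"));
    // An empty query parameter is treated as absent.
    assert_eq!(resolve_routing_model(Some(""), Some("gpt-realtime")), Some("gpt-realtime"));
}
```

The design point is that model resolution must complete before the router picks a worker, which for multipart bodies means peeking at the session JSON during routing rather than after it.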
Actionable comments posted: 3
📒 Files selected for processing (6)
- Cargo.toml
- model_gateway/Cargo.toml
- model_gateway/src/routers/openai/realtime/mod.rs
- model_gateway/src/routers/openai/realtime/webrtc.rs
- model_gateway/src/routers/openai/realtime/webrtc_bridge.rs
- model_gateway/src/routers/openai/router.rs
// -- 3. Send offer to OpenAI, get answer ----------------------------
let upstream_answer = send_sdp_to_upstream(
    http_client,
    upstream_url,
    auth_header,
    &upstream_offer.to_sdp_string(),
    session_config,
)
.await?;

upstream_rtc
    .sdp_api()
    .accept_answer(pending, upstream_answer)?;

// -- 4. Create client Rtc (answerer, ICE-lite) ----------------------
// ICE-lite: SMG only responds to the browser's connectivity checks.
// This avoids needing to resolve mDNS candidates the browser may
// advertise, and is the standard mode for SFU/relay servers.
let mut client_rtc = RtcConfig::new()
    .set_rtp_mode(true)
    .set_ice_lite(true)
    .build(Instant::now());
client_rtc.add_local_candidate(Candidate::host(client_candidate, Protocol::Udp)?);

let client_offer = SdpOffer::from_sdp_string(client_sdp_offer_str)?;
let client_answer = client_rtc.sdp_api().accept_offer(client_offer)?;
let client_audio_mid = find_audio_mid(&client_rtc);
Validate the browser offer before opening the upstream call.
Line 234 posts SMG's SDP upstream before Lines 257-258 parse and accept the client offer. Because the earlier request parser only checks that the body starts with v=0, malformed offers can still reach this branch, create an upstream realtime call, and then abort locally without ever spawning a bridge to clean it up. Build/accept the client-side Rtc first and return 400 before touching the worker.
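The reordering can be illustrated with stubbed types; `SetupError`, `accept_client_offer`, and the upstream-call counter are all hypothetical stand-ins for str0m's Rtc/SdpOffer machinery. The point is that the client offer is validated first, so the upstream is never contacted for a malformed offer:

```rust
// Order-of-operations sketch for the suggested fix (stubbed types).
#[derive(Debug, PartialEq)]
enum SetupError {
    BadClientOffer,
}

fn accept_client_offer(sdp: &str) -> Result<(), SetupError> {
    // Stand-in for SdpOffer::from_sdp_string + sdp_api().accept_offer.
    if sdp.starts_with("v=0") {
        Ok(())
    } else {
        Err(SetupError::BadClientOffer)
    }
}

fn setup_bridge(client_sdp: &str, upstream_calls: &mut u32) -> Result<(), SetupError> {
    // Validate and accept the browser's offer first; a failure here maps
    // to a 400 response without ever touching the upstream.
    accept_client_offer(client_sdp)?;
    // Only now POST the offer upstream and accept its answer.
    *upstream_calls += 1;
    Ok(())
}

fn main() {
    let mut upstream_calls = 0;
    assert_eq!(setup_bridge("garbage", &mut upstream_calls), Err(SetupError::BadClientOffer));
    assert_eq!(upstream_calls, 0); // no orphaned upstream call
    assert_eq!(setup_bridge("v=0\r\n", &mut upstream_calls), Ok(()));
    assert_eq!(upstream_calls, 1);
}
```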
} else if self.pending_to_upstream.len() >= 1000 {
    warn!(
        call_id = self.call_id,
        "Pending-to-upstream buffer full, dropping message"
    );
} else {
    trace!(
        call_id = self.call_id,
        "Buffering client event (upstream channel not open)"
    );
    self.pending_to_upstream
        .push((data.binary, data.data.to_vec()));
Cap buffered data-channel bytes, not just message count.
The 1000-message guard does not bound memory: either side can still queue 1000 large SCTP messages before the opposite channel opens. On this internet-facing path that is enough to create very large per-call buffers and memory spikes. Track buffered bytes (and preferably tear the bridge down once the budget is exceeded) instead of only counting messages.
Also applies to: 606-617
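A minimal sketch of a byte-budgeted buffer along the lines suggested above; the constant, struct, and method names are illustrative, not the PR's actual identifiers:

```rust
// Bound buffered bytes rather than message count. One instance per
// direction (pending_to_upstream / pending_to_downstream).
const MAX_PENDING_BUFFER_BYTES: usize = 1 << 20; // e.g. 1 MiB per direction

struct PendingBuffer {
    messages: Vec<(bool, Vec<u8>)>, // (binary flag, payload)
    buffered_bytes: usize,
}

impl PendingBuffer {
    fn new() -> Self {
        Self { messages: Vec::new(), buffered_bytes: 0 }
    }

    /// Returns false when the byte budget would be exceeded; the caller
    /// can then tear the bridge down instead of buffering further.
    fn push(&mut self, binary: bool, data: Vec<u8>) -> bool {
        if self.buffered_bytes + data.len() > MAX_PENDING_BUFFER_BYTES {
            return false;
        }
        self.buffered_bytes += data.len();
        self.messages.push((binary, data));
        true
    }

    /// Drains queued messages once the channel opens, resetting the
    /// byte counter so the budget stays accurate.
    fn drain(&mut self) -> Vec<(bool, Vec<u8>)> {
        self.buffered_bytes = 0;
        std::mem::take(&mut self.messages)
    }
}

fn main() {
    let mut buf = PendingBuffer::new();
    assert!(buf.push(true, vec![0u8; 512 * 1024]));
    assert!(buf.push(true, vec![0u8; 512 * 1024]));
    assert!(!buf.push(true, vec![0u8; 1])); // budget exceeded: signal teardown
    assert_eq!(buf.drain().len(), 2);
    assert!(buf.push(true, vec![0u8; 1])); // counter reset after drain
}
```

A per-message count cap could be kept alongside the byte budget, but the byte budget is what actually bounds per-call memory on this internet-facing path.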
Err(e) => {
    worker.record_outcome(false);
    return match e {
        BridgeSetupError::UpstreamHttp {
            status,
            body,
            content_type,
        } => {
            warn!(call_id, model, %status, "Upstream rejected WebRTC bridge setup ({label})");
            let mut builder = Response::builder().status(status);
            if let Some(ct) = content_type {
                builder = builder.header("Content-Type", ct);
            }
            builder
                .body(axum::body::Body::from(body))
                .unwrap_or_else(|_| StatusCode::BAD_GATEWAY.into_response())
        }
        BridgeSetupError::Other(ref err) => {
            error!(call_id, model, error = %err, "Failed to create WebRTC bridge ({label})");
            StatusCode::BAD_GATEWAY.into_response()
        }
    };
Don't mark every bridge setup failure as a worker failure.
BridgeSetupError::Other also covers malformed client SDP and local setup errors, so this branch will penalize a healthy worker and return a blanket 502 for bad requests that never failed upstream. Split client/local failures from upstream failures before calling worker.record_outcome(false) and map client SDP errors to 400 instead.
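A sketch of splitting the Err arm so only genuine upstream failures count against the worker. The `ClientSdp`/`LocalSetup` variants are hypothetical refinements of today's catch-all `Other`, and the classification predicate is illustrative:

```rust
// Hypothetical refined error type: client and local failures are split
// out of the current catch-all `Other` variant.
#[derive(Debug, PartialEq)]
enum BridgeSetupError {
    UpstreamHttp { status: u16 },
    ClientSdp,  // e.g. malformed client SDP offer
    LocalSetup, // e.g. socket bind failure
}

/// Returns (HTTP status to return, whether to call record_outcome(false)).
fn classify(e: &BridgeSetupError) -> (u16, bool) {
    match e {
        // Upstream rejected us: pass its status through, penalize worker.
        BridgeSetupError::UpstreamHttp { status } => (*status, true),
        // Client's fault: 400, worker stays healthy.
        BridgeSetupError::ClientSdp => (400, false),
        // Genuine local setup fault: 502 and record the failure.
        BridgeSetupError::LocalSetup => (502, true),
    }
}

fn main() {
    assert_eq!(classify(&BridgeSetupError::ClientSdp), (400, false));
    assert_eq!(classify(&BridgeSetupError::UpstreamHttp { status: 503 }), (503, true));
    assert_eq!(classify(&BridgeSetupError::LocalSetup), (502, true));
}
```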
Description
Follows #733. Some changes overlap with PR #733; after it merges into main, I will rebase this branch.
Problem
The WebRTC bridge engine (webrtc_bridge.rs) and router trait (route_realtime_webrtc) exist, but nothing parses incoming signaling requests, resolves authentication, selects a worker, or wires the bridge into OpenAIRouter. POST /v1/realtime/calls returns 501.
Solution
Add webrtc.rs — the signaling handler that parses two content types (multipart/form-data with the model from the session JSON, application/sdp with the model from the query parameter), resolves auth (user header or worker API key fallback), and spawns a WebRtcBridge relay task. A WorkerLoadGuard is moved into the spawned task so the worker's load count stays elevated for the bridge lifetime. Implement route_realtime_webrtc on OpenAIRouter, which stores context: Arc to access webrtc_bind_addr and webrtc_stun_server. After this PR, the feature is fully functional end-to-end.
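The load-guard lifecycle described above can be sketched as a RAII guard; this is a simplified stand-in, and the repo's actual WorkerLoadGuard may differ:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Simplified stand-in for WorkerLoadGuard: bump the worker's load count
// on creation and decrement it on Drop, so moving the guard into the
// spawned bridge task keeps the count elevated for the bridge lifetime.
struct WorkerLoadGuard {
    load: Arc<AtomicUsize>,
}

impl WorkerLoadGuard {
    fn new(load: Arc<AtomicUsize>) -> Self {
        load.fetch_add(1, Ordering::SeqCst);
        Self { load }
    }
}

impl Drop for WorkerLoadGuard {
    fn drop(&mut self) {
        self.load.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let load = Arc::new(AtomicUsize::new(0));
    let guard = WorkerLoadGuard::new(load.clone());
    // A scope standing in for the spawned bridge task that owns the guard.
    {
        let _owned = guard; // guard moved into the "task"
        assert_eq!(load.load(Ordering::SeqCst), 1);
    } // "task" ends, guard dropped
    assert_eq!(load.load(Ordering::SeqCst), 0);
}
```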
Changes
model_gateway/src/routers/openai/realtime/webrtc.rs— parse_webrtc_request() (multipart + direct SDP parsing), handle_realtime_webrtc(), auth resolution, STUN server DNS resolution, bridge spawn with load guard lifecycle. -model_gateway/src/routers/openai/realtime/mod.rs— Add pub mod webrtc;.model_gateway/src/routers/openai/router.rs— Add context: Arc field to OpenAIRouter. Implement route_realtime_webrtc() — body parsing, metrics recording, worker selection, config extraction, handlerTest Plan
Checklist
- cargo +nightly fmt passes
- cargo clippy --all-targets --all-features -- -D warnings passes