Add logging request level statistics from engine#693
scottjlee wants to merge 30 commits into lightseekorg:main from
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request enhances the observability of the model gateway by introducing a system for collecting and logging request-level statistics from various backend engines. It establishes new gRPC definitions for these statistics, integrates them into core response messages, and provides a unified, configurable mechanism for processing and emitting these performance metrics. This offers deeper insights into request performance and behavior, particularly for streaming and aborted requests, aiding in debugging and performance analysis.
Code Review
This pull request introduces a new feature to collect and emit request-level statistics across different backend engines (SGLang, vLLM, TRTLLM). The changes involve updating .proto definitions to include RequestStats in GenerateResponse and AbortResponse, modifying gRPC client implementations to propagate these statistics, and adding a configuration option to enable/disable this feature. New data structures (RequestStatsFieldMapping, UnifiedRequestStats, RequestStatsEvent) are introduced in the observability module to normalize and emit these statistics. Response collection and streaming logic in the Harmony and regular gRPC routers are updated to process and log these RequestStats when enabled. Review comments suggest refactoring duplicated logging logic in RequestStatsEvent::emit for better maintainability and simplifying match statements with the ? operator in several response processing functions. Additionally, a duplicate and unused collect_stream_responses function in chat_utils.rs should be removed.
```rust
impl Event for RequestStatsEvent<'_> {
    #[inline]
    fn emit(&self) {
        let request_received_timestamp_s =
            format_optional_f64(self.stats.request_received_timestamp_s);
        let first_token_generated_timestamp_s =
            format_optional_f64(self.stats.first_token_generated_timestamp_s);
        let request_finished_timestamp_s =
            format_optional_f64(self.stats.request_finished_timestamp_s);
        let cache_hit_rate = format_optional_f64(self.stats.cache_hit_rate);
        let spec_decoding_acceptance_rate =
            format_optional_f64(self.stats.spec_decoding_acceptance_rate);
        let http_status_code = format_optional_u16(self.http_status_code);
        let error_message = self
            .error_message
            .or(self.stats.error_message.as_deref())
            .unwrap_or("None");

        if is_otel_enabled() {
            event!(
                Level::INFO,
                request_id = %self.request_id,
                model = %self.model,
                router_backend = %self.router_backend,
                http_status_code = %http_status_code,
                error_message = %error_message,
                engine = %self.stats.engine,
                request_received_timestamp_s = %request_received_timestamp_s,
                first_token_generated_timestamp_s = %first_token_generated_timestamp_s,
                request_finished_timestamp_s = %request_finished_timestamp_s,
                cache_hit_rate = %cache_hit_rate,
                spec_decoding_acceptance_rate = %spec_decoding_acceptance_rate,
                prompt_tokens = self.stats.prompt_tokens,
                completion_tokens = self.stats.completion_tokens,
                cached_tokens = self.stats.cached_tokens,
                "request_stats"
            );
        } else {
            debug!(
                request_id = %self.request_id,
                model = %self.model,
                router_backend = %self.router_backend,
                http_status_code = %http_status_code,
                error_message = %error_message,
                engine = %self.stats.engine,
                request_received_timestamp_s = %request_received_timestamp_s,
                first_token_generated_timestamp_s = %first_token_generated_timestamp_s,
                request_finished_timestamp_s = %request_finished_timestamp_s,
                cache_hit_rate = %cache_hit_rate,
                spec_decoding_acceptance_rate = %spec_decoding_acceptance_rate,
                prompt_tokens = self.stats.prompt_tokens,
                completion_tokens = self.stats.completion_tokens,
                cached_tokens = self.stats.cached_tokens,
                "request_stats"
            );
        }
    }
}
```
The `emit` function contains a large block of duplicated code for handling the `is_otel_enabled()` cases. This can be simplified by determining the log level first and then using a single `event!` macro call. This will make the code more maintainable and less prone to errors if new fields are added.
```rust
impl Event for RequestStatsEvent<'_> {
    #[inline]
    fn emit(&self) {
        let request_received_timestamp_s =
            format_optional_f64(self.stats.request_received_timestamp_s);
        let first_token_generated_timestamp_s =
            format_optional_f64(self.stats.first_token_generated_timestamp_s);
        let request_finished_timestamp_s =
            format_optional_f64(self.stats.request_finished_timestamp_s);
        let cache_hit_rate = format_optional_f64(self.stats.cache_hit_rate);
        let spec_decoding_acceptance_rate =
            format_optional_f64(self.stats.spec_decoding_acceptance_rate);
        let http_status_code = format_optional_u16(self.http_status_code);
        let error_message = self
            .error_message
            .or(self.stats.error_message.as_deref())
            .unwrap_or("None");
        let level = if is_otel_enabled() { Level::INFO } else { Level::DEBUG };
        event!(
            level,
            request_id = %self.request_id,
            model = %self.model,
            router_backend = %self.router_backend,
            http_status_code = %http_status_code,
            error_message = %error_message,
            engine = %self.stats.engine,
            request_received_timestamp_s = %request_received_timestamp_s,
            first_token_generated_timestamp_s = %first_token_generated_timestamp_s,
            request_finished_timestamp_s = %request_finished_timestamp_s,
            cache_hit_rate = %cache_hit_rate,
            spec_decoding_acceptance_rate = %spec_decoding_acceptance_rate,
            prompt_tokens = self.stats.prompt_tokens,
            completion_tokens = self.stats.completion_tokens,
            cached_tokens = self.stats.cached_tokens,
            "request_stats"
        );
    }
}
```

References
- The `emit` function contains duplicated logic for handling `is_otel_enabled()` cases. This rule suggests extracting duplicated logic into a shared helper function to improve maintainability and reduce redundancy.
```rust
let response_collection::CollectedResponses {
    completes: all_responses,
    request_stats,
} = match response_collection::collect_responses(
    execution_result,
    request_logprobs,
    self.enable_request_statistics,
)
.await
{
    Ok(collected) => collected,
    Err(err) => return Err(err),
};
```
This match statement can be simplified by using the `?` operator, as the error types are compatible. This will make the code more concise and readable.
```diff
-let response_collection::CollectedResponses {
-    completes: all_responses,
-    request_stats,
-} = match response_collection::collect_responses(
-    execution_result,
-    request_logprobs,
-    self.enable_request_statistics,
-)
-.await
-{
-    Ok(collected) => collected,
-    Err(err) => return Err(err),
-};
+let response_collection::CollectedResponses {
+    completes: all_responses,
+    request_stats,
+} = response_collection::collect_responses(
+    execution_result,
+    request_logprobs,
+    self.enable_request_statistics,
+)
+.await?;
```
```rust
pub(crate) async fn collect_stream_responses(
    stream: &mut ProtoStream,
    worker_name: &str,
    enable_request_statistics: bool,
) -> Result<CollectedStreamResponses, Response> {
    let mut all_responses = Vec::new();
    let mut stream_request_stats = Vec::new();

    while let Some(response) = stream.next().await {
        match response {
            Ok(gen_response) => {
                match gen_response.into_response() {
                    ProtoResponseVariant::Complete(complete) => {
                        all_responses.push(complete);
                    }
                    ProtoResponseVariant::Error(err) => {
                        error!(function = "collect_stream_responses", worker = %worker_name, error = %err.message(), "Worker generation error");
                        // Don't mark as completed - let Drop send abort for error cases
                        return Err(error::internal_error(
                            "worker_generation_failed",
                            format!("{} generation failed: {}", worker_name, err.message()),
                        ));
                    }
                    ProtoResponseVariant::Chunk(_chunk) => {
                        // Streaming chunk - no action needed
                    }
                    ProtoResponseVariant::RequestStats(request_stats) => {
                        if enable_request_statistics {
                            stream_request_stats.push(request_stats);
                        }
                    }
                    ProtoResponseVariant::None => {
                        // Empty response - no action needed
                    }
                }
            }
            Err(e) => {
                error!(function = "collect_stream_responses", worker = %worker_name, error = ?e, "Worker stream error");
                // Don't mark as completed - let Drop send abort for error cases
                return Err(error::internal_error(
                    "worker_stream_failed",
                    format!("{worker_name} stream failed: {e}"),
                ));
            }
        }
    }

    let request_stats = if enable_request_statistics {
        collect_request_stats(&all_responses, &stream_request_stats)
    } else {
        None
    };

    Ok(CollectedStreamResponses {
        completes: all_responses,
        request_stats,
    })
}
```
This collect_stream_responses function appears to be a duplicate of the one defined in model_gateway/src/routers/grpc/common/response_collection.rs. The function in this file also seems to be unused. To improve maintainability and avoid confusion, please remove this duplicated function.
References
- The `collect_stream_responses` function is a duplicate of one defined elsewhere. This rule advises against unnecessary duplication to improve maintainability and avoid confusion.
CatherineSue
left a comment
Hey Scott, appreciate the effort on this — the idea of collecting request-level statistics from engines is genuinely useful and I can see the thought that went into supporting multiple backends. I couldn't help but take a look out of interest, and I noticed quite a few issues with the current design that I think are worth addressing before this goes further, even as a WIP.
I've left detailed line-by-line comments below, but here's the high-level summary:
Architectural concerns

- Stats collection is deeply entangled with abort/lifecycle — these are orthogonal concerns that shouldn't be coupled
- `enable_request_statistics: bool` is threaded through 15+ function signatures — this creates a massive API surface change for what should be an observability-only concern. The boolean-parameter-threading pattern is a well-known anti-pattern; consider a middleware/interceptor approach instead
- The proto changes couple stats into the streaming response union and the abort response — stats are metadata, not a response variant. This breaks the single-responsibility of those messages
- ~430 lines added to `proto_wrapper.rs` (already a large file) with three near-identical mapper implementations and four overlapping collection functions

Code quality concerns

- Multiple instances of `match { Ok(x) => x, Err(e) => return Err(e) }`, which is literally the `?` operator
- `send_sse_or_abort` takes 8 mutable reference parameters — a clear sign the function is doing too many things
- `Box<dyn Error>` used as error type throughout instead of proper error enums
- `format_optional_f64` allocates a `String` to represent `None` as the literal string `"None"` — use tracing's native `Option` support
- Copy-paste `abort()` methods across three gRPC client backends instead of a shared trait
- Two identical `CollectedResponses`/`CollectedStreamResponses` structs in different modules
- No tests for any of the new functionality
- Unrelated changes mixed in (MCP label constant, format string changes, `finalized_analysis` removal)
Suggested alternative approach

Rather than threading a boolean through every function, consider:
- A stats-collecting stream wrapper that transparently intercepts `ProtoStream` and collects stats — zero changes to existing function signatures
- Keep stats out of the proto response oneof — use the dedicated `GetRequestStats` RPC you already defined, or collect stats from the `Complete` messages you already receive
- Keep abort as a pure lifecycle operation — don't overload it with stats collection
```protobuf
rpc GetLoads(GetLoadsRequest) returns (GetLoadsResponse);

// Get request-level statistics for a completed request
rpc GetRequestStats(GetRequestStatsRequest) returns (GetRequestStatsResponse);
```
This RPC is defined but never called anywhere in the gateway code. If the plan is to use it later, it shouldn't be in this PR. If it's the intended mechanism for stats collection, then the in-band RequestStats variant in the streaming response (line 201) is redundant — you'd be collecting stats two different ways.
Pick one approach: either poll via this RPC after completion, or receive stats in-band. Having both is confusing and creates maintenance burden.
```protobuf
GenerateStreamChunk chunk = 2;
GenerateComplete complete = 3;
GenerateError error = 4;
RequestStats request_stats = 5;
```
Adding RequestStats as a variant in the GenerateResponse oneof is a fundamental design issue.
The oneof currently has a clean semantic: chunk | complete | error. These are response lifecycle states. RequestStats is metadata about the response, not a response itself. Mixing these concerns in the same union means every consumer of the stream now needs to handle a variant that has nothing to do with the generation lifecycle.
The Complete message already carries prompt_tokens, completion_tokens, and cached_tokens. For the additional fields (timestamps, cache hit rate, etc.), consider either:
- Extending `GenerateComplete` with optional stats fields, or
- Using the `GetRequestStats` RPC you defined at line 34 (which is currently unused)
```protobuf
  string request_id = 1;
}

message RequestStats {
```
The field numbering mixes optional and non-optional inconsistently. prompt_tokens (field 8), completion_tokens (field 9), and cached_tokens (field 10) are non-optional uint32, meaning they default to 0 on the wire. But semantically, "0 tokens" and "unknown" are different. If this message is meant to represent stats that may or may not be available from a given engine, all numeric fields should be optional.
Also, response_sent_timestamp_s (field 5) is defined here but never populated anywhere in the gateway code.
```protobuf
message AbortResponse {
  bool success = 1;
  string message = 2;
  RequestStats request_stats = 3;
```
This is the core of the problem with this PR's design: abort is a lifecycle operation, not a stats-collection mechanism.
An abort says "stop processing this request." The response should confirm whether the abort succeeded — which it already does with success and message. Piggybacking stats onto the abort response couples two orthogonal concerns:
- It forces abort callers to understand and handle stats even when they don't care
- It means stats are only available if you abort — what about requests that complete naturally?
- It changes the abort contract from "fire and forget" to "fire and parse stats"
This then cascades into the gateway code where abort_request() changes from Result<(), tonic::Status> to Result<proto::AbortResponse, Box<dyn Error>>, which is a breaking API change across all three backends.
```rust
pub async fn abort(
    &mut self,
    reason: String,
) -> Result<proto::AbortResponse, Box<dyn std::error::Error + Send + Sync>> {
```
Box<dyn std::error::Error + Send + Sync> as the error type is a step backwards from the existing tonic::Status. The original abort_request returned Result<(), tonic::Status> — callers knew exactly what errors to expect. Now they get an opaque box that could contain anything.
If you need a richer error type, define a proper error enum. Don't erase the type information.
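For illustration, a typed error enum for this call might look like the sketch below. The variant names are assumptions, not from the PR, and the `Grpc` variant would wrap `tonic::Status` in the real code rather than a `String`.

```rust
use std::fmt;

// Hypothetical typed error for abort calls, replacing the opaque
// Box<dyn Error + Send + Sync>.
#[derive(Debug)]
enum AbortError {
    Grpc(String),           // transport/status failure from the backend
    UnknownRequest(String), // request id not known to the engine
}

impl fmt::Display for AbortError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AbortError::Grpc(msg) => write!(f, "grpc transport error: {msg}"),
            AbortError::UnknownRequest(id) => write!(f, "unknown request id: {id}"),
        }
    }
}

impl std::error::Error for AbortError {}
```

Callers can then match on the variants they care about instead of downcasting a boxed trait object.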
```rust
    buffer.extend_from_slice(b"\n\n");
}

async fn send_sse_or_abort(
```
```rust
async fn send_sse_or_abort(
    tx: &UnboundedSender<Result<Bytes, io::Error>>,
    payload: Bytes,
    client_disconnected: &mut bool,
    abort_sent: &mut bool,
    stream: &mut ProtoStream,
    send_error_message: &'static str,
    client_disconnect_error_message: &mut Option<&'static str>,
    stream_request_stats: Option<&mut Vec<ProtoRequestStats>>,
) -> Result<(), String>
```

8 parameters, 5 of which are mutable references. This function is doing at least three things:

1. Sending an SSE payload
2. Managing client disconnect state
3. Aborting the backend stream and collecting stats

This should be decomposed. Consider a `ClientConnection` struct that encapsulates `tx`, `client_disconnected`, `abort_sent`, and `client_disconnect_error_message`. Then this becomes `connection.send_or_abort(payload, stream, stats)`.

The function is called 13 times in this file alone, each time with the same 8-argument boilerplate. That's 13 call sites that will all break if the signature changes.
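A hypothetical sketch of that refactor: the four pieces of per-connection mutable state move behind one struct, so the eight-argument signature collapses into a method call. `std::sync::mpsc` stands in for tokio's `UnboundedSender`, and all names here are illustrative.

```rust
use std::sync::mpsc::Sender;

struct ClientConnection {
    tx: Sender<Vec<u8>>,
    client_disconnected: bool,
    abort_sent: bool,
    disconnect_error: Option<&'static str>,
}

impl ClientConnection {
    fn new(tx: Sender<Vec<u8>>) -> Self {
        Self {
            tx,
            client_disconnected: false,
            abort_sent: false,
            disconnect_error: None,
        }
    }

    /// Try to send a payload; on failure, record the disconnect internally
    /// instead of mutating four flags owned by the caller.
    fn send_or_mark_disconnected(&mut self, payload: Vec<u8>, err: &'static str) -> bool {
        if self.client_disconnected {
            return false;
        }
        if self.tx.send(payload).is_err() {
            self.client_disconnected = true;
            self.disconnect_error = Some(err);
            return false;
        }
        true
    }
}
```

In the real code the abort-and-collect-stats branch would hang off the same struct, keeping every call site to a single method invocation.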
```diff
 if !tool_calls.is_empty() {
     let analysis_content = if has_analysis {
-        finalized_analysis
+        // Get analysis from finalized parser output by calling finalize again
```
```rust
// Get analysis from finalized parser output by calling finalize again
// This is safe because finalize can be called multiple times
let output = parser.finalize(finish_reason.clone());
output.analysis
```

Calling `finalize()` twice because the original `finalized_analysis` variable was removed is a hack. "This is safe because finalize can be called multiple times" — is this actually guaranteed by the parser contract? If it is, it should be documented on the trait. If it isn't, this is a latent bug.

The original code stored `finalized_analysis` for exactly this reason. Why was it removed?
```diff
 let label = session
     .map(|s| s.resolve_tool_server_label(tool_name))
-    .unwrap_or_else(|| DEFAULT_SERVER_LABEL.to_string());
+    .unwrap_or_else(|| "mcp".to_string());
```
`.unwrap_or_else(|| "mcp".to_string())` — this replaces `DEFAULT_SERVER_LABEL.to_string()` with a hardcoded string literal. This appears in 3 places in this file (lines 777, 939, 1072).

This is an unrelated change that:
- Removes the use of a named constant, making future changes error-prone
- Hardcodes `"mcp"` without explanation of why the constant was wrong
- Shouldn't be in a PR about request statistics

Unrelated changes make PRs harder to review and should be in a separate commit at minimum.
```rust
/// # Returns
/// * `Ok(CollectedStreamResponses)` - Collected complete responses and unified request stats
/// * `Err(Response)` - Error response if the stream fails or returns an error
pub(crate) struct CollectedStreamResponses {
```
CollectedStreamResponses — this struct has the exact same fields as CollectedResponses in response_collection.rs (line 22): a Vec<ProtoGenerateComplete> and an Option<UnifiedRequestStats>.
Two identical structs in different modules for the same purpose is a clear DRY violation. Use one type.
```diff
     merge_logprobs: bool,
+    enable_request_statistics: bool,
 ) -> Result<Vec<ProtoGenerateComplete>, Response> {
     let all_responses = match execution_result {
```
`enable_request_statistics: bool` — this parameter is now threaded through:

- `collect_responses` → `collect_stream_responses` (this file)
- `ResponseProcessor::new`, `HarmonyResponseProcessor::new`, `HarmonyStreamingProcessor::new`
- `StreamingProcessor::new`, `GenerateStreamContext`
- `RequestPipeline::new_regular`, `::new_harmony_single`, `::new_harmony_dual`, `::new_pd`
- `HarmonyResponseProcessingStage::new`
- `GrpcRouter::build`, `GrpcPDRouter::build`

That's 15+ function signatures changed for a single boolean. This is the "boolean parameter threading" anti-pattern. The feature should be implemented at the boundary (e.g., a wrapper around `ProtoStream` that transparently collects stats) rather than plumbed through every layer of the stack.
CatherineSue
left a comment
Round 2 — more line-by-line issues I caught on closer inspection.
```rust
            prefill_cached_tokens_by_index
                .insert(complete_wrapper.index(), complete_wrapper.cached_tokens());
        }
    }
```
Indentation bug. The closing `}` here has been dedented one level:

```rust
        if let ProtoResponseVariant::Complete(complete_wrapper) = response.into_response() {
            prefill_cached_tokens_by_index
                .insert(complete_wrapper.index(), complete_wrapper.cached_tokens());
        } // <-- this should be at 12-space indent, not 8
    }
```

This makes it look like the `if let` body closes at the `while` level. The code still compiles because both braces are present, but the formatting is broken and misleading. Did `cargo fmt` actually run on this?
```diff
 // Metadata from Complete message; seed cached_tokens from prefill phase (dual-stream)
-let mut finish_reason: String;
-let mut finalized_analysis: Option<String> = None;
+let mut finish_reason = String::from("stop");
```
let mut finish_reason = String::from("stop"); — pre-initializing finish_reason to "stop" masks bugs. The original code left it uninitialized (declared later or via let finish_reason: String;), which means the compiler would catch any code path that uses it before the Complete message sets it.
By defaulting to "stop", if the Complete message is never received (e.g., stream ends early), you silently report a successful stop instead of surfacing the error. This defeats Rust's ability to catch missing-initialization bugs at compile time.
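A minimal sketch of the safer pattern: keep the value unset until a `Complete` message supplies it, and surface a missing `Complete` as an error instead of silently reporting success. The function name is an illustrative stand-in, not code from the PR.

```rust
// If the stream ends without a Complete message, fail loudly rather than
// defaulting finish_reason to "stop".
fn resolve_finish_reason(from_complete: Option<String>) -> Result<String, &'static str> {
    from_complete.ok_or("stream ended without a Complete message")
}
```

This keeps the "every path must set it" guarantee at the type level, which the `String::from("stop")` default throws away.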
```diff
-finalized_analysis = final_output.analysis;
-accumulated_tool_calls = final_output.commentary;
+// Store finalized tool calls and reasoning token count
+accumulated_tool_calls.clone_from(&final_output.commentary);
```
accumulated_tool_calls.clone_from(&final_output.commentary); — why clone_from instead of = final_output.commentary? The clone_from method is typically used to reuse allocations when overwriting an existing value, but accumulated_tool_calls was None before this (it's initialized as Option<Vec<...>>). There's no allocation to reuse.
Also, the original code was accumulated_tool_calls = final_output.commentary; (a move). This PR changes it to a clone. Why? final_output isn't used after this point — the move was correct and zero-cost.
```rust
        }
    }
    ProtoResponseVariant::Complete(complete) => {
        completed_responses.push(complete.clone());
```
completed_responses.push(complete.clone()); — you're cloning the entire ProtoGenerateComplete proto message (which contains output text, logprobs, etc.) on the hot path, purely for stats collection.
This clone happens at every Complete event. For requests with n > 1, you clone multiple times. The stats you extract from it are just a few integers (prompt_tokens, completion_tokens, cached_tokens). Extract the stats eagerly into a lightweight struct instead of cloning the full proto.
Same issue in harmony/streaming.rs:303.
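An illustrative sketch of the suggested fix: copy the few integers into a cheap `Copy` struct at the point the `Complete` arrives, instead of cloning the whole proto. The `TokenStats` struct, the `Complete` stand-in, and `extract_stats` are hypothetical; the field names mirror those mentioned in the review.

```rust
// Lightweight, Copy-able stats snapshot — a few bytes instead of the full message.
#[derive(Clone, Copy, Debug, PartialEq)]
struct TokenStats {
    prompt_tokens: u32,
    completion_tokens: u32,
    cached_tokens: u32,
}

// Stand-in for the proto Complete message.
struct Complete {
    text: String, // potentially large; never cloned for stats
    prompt_tokens: u32,
    completion_tokens: u32,
    cached_tokens: u32,
}

fn extract_stats(complete: &Complete) -> TokenStats {
    TokenStats {
        prompt_tokens: complete.prompt_tokens,
        completion_tokens: complete.completion_tokens,
        cached_tokens: complete.cached_tokens,
    }
}
```

The hot path then pushes `extract_stats(&complete)` into the stats buffer and moves `complete` onward without any allocation.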
```rust
    });
}

async fn emit_generate_request_stats(
```
async fn emit_generate_request_stats(...) — this function is marked async but its body is entirely synchronous. It calls collect_request_stats() (sync) and emit() (sync). No .await appears in the function body.
An unnecessary async forces all callers to .await it and prevents calling from synchronous contexts. Remove the async.
```diff
 let mut prefill_cached_tokens_by_index: HashMap<u32, u32> = HashMap::new();
 while let Some(result) = prefill_stream.next().await {
-    let response = result.map_err(|e| format!("Prefill stream error: {}", e.message()))?;
+    let response = result.map_err(|e| format!("Prefill stream error: {e}"))?;
```
format!("Prefill stream error: {e}") — this was previously format!("Prefill stream error: {}", e.message()). This is not just a style change.
e here is a tonic::Status. Display for tonic::Status includes the status code (e.g., "status: Internal, message: ..."), while .message() returns just the message string. This changes the error string format, which could break log parsers or error matching.
This is an unrelated semantic change buried in a stats PR.
```rust
            }
        }
        ProtoResponseVariant::Complete(complete_wrapper) => {
            completed_responses.push(complete_wrapper.clone());
```
completed_responses.push(complete_wrapper.clone()); — same unnecessary full-proto clone as in regular/streaming.rs:501. This is on the streaming hot path inside a while let Some(response) loop.
```rust
let mut error_message: Option<String> = None;

for sample in stats {
    seen += 1;
```
```rust
let mut seen = 0u64;
// ...
for sample in stats {
    seen += 1;
    // ...
}
if seen == 0 { return None; }
```

The `seen` counter is unnecessary. You can just check `stats.is_empty()` before the loop, or use the fact that `stats.first()?` already returns `None` for empty slices (which you do at the top of the calling function anyway). This manual counter adds complexity for no reason.
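The same logic without the manual counter, sketched with a plain slice of samples; `summarize` is an illustrative stand-in for the aggregation function, not code from the PR.

```rust
fn summarize(stats: &[f64]) -> Option<f64> {
    if stats.is_empty() {
        return None; // replaces the `if seen == 0 { return None; }` check
    }
    let sum: f64 = stats.iter().sum();
    Some(sum / stats.len() as f64)
}
```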
```diff
 /// HarmonyParserAdapter to extract the complete response.
-pub(crate) struct HarmonyResponseProcessor;
+pub(crate) struct HarmonyResponseProcessor {
+    enable_request_statistics: bool,
```
HarmonyResponseProcessor was a unit struct (struct HarmonyResponseProcessor;). Now it carries enable_request_statistics: bool.
This means a struct whose job is response processing now carries configuration about whether to collect metrics. This violates single responsibility — the processor should process responses, and something else should decide whether stats get emitted.
This is why the middleware/wrapper approach would be cleaner: the processor stays focused on its job, and the stats layer wraps around it.
```rust
        processor: HarmonyResponseProcessor::new(enable_request_statistics),
        streaming_processor: Arc::new(HarmonyStreamingProcessor::new(enable_request_statistics)),
    }
}
```
```rust
streaming_processor: Arc::new(HarmonyStreamingProcessor::new(enable_request_statistics)),
```

This line is over 80 chars (rustfmt would catch this) and shows the config boolean being passed into an `Arc`-wrapped processor. The `enable_request_statistics` value is now baked into an `Arc` that lives for the lifetime of the stage. If you ever wanted to toggle stats at runtime (e.g., via a config reload), this architecture makes it impossible.
Signed-off-by: Scott Lee <scott@together.ai>
CI fails because the branch is on my fork, so the API keys used in the CI tests are unavailable. Closing in favor of #757, which should fix this issue.
! WIP - Not ready for review !
Description
WIP, add logging request-level statistics from engine. Create abstraction for general engines, and implement concrete SGLang version.
Problem
Solution
Changes
Test Plan
Checklist
- `cargo +nightly fmt` passes
- `cargo clippy --all-targets --all-features -- -D warnings` passes