feat(completions): add native gRPC stages for /v1/completions#772
vschandramourya wants to merge 2 commits into `feat/completions-response-infra`
Conversation
Signed-off-by: VS Chandra Mourya <msrinivasa@together.ai> (cherry picked from commit b3b8e531d5831dd74b8c35764083038f6f831ee9)
Code Review
This pull request successfully adds native gRPC support for /v1/completions by introducing and integrating completion-specific pipeline stages. The implementation follows the existing architecture for other endpoints, ensuring consistency. The changes are well-structured and logical. I have one suggestion to enhance the robustness of the request building stage by making the contract between pipeline stages more explicit, which will help in preventing potential silent failures.
```rust
let mut proto_request = builder_client
    .build_completion_request(
        request_id,
        &completion_request,
        prep.original_text.clone().unwrap_or_default(),
        prep.token_ids.clone(),
    )
    .map_err(|e| {
        error!(
            function = "CompletionRequestBuildingStage::execute",
            error = %e,
            "Failed to build completion request"
        );
        error::bad_request(
            "invalid_request_parameters",
            format!("Invalid request parameters: {e}"),
        )
    })?;
```
The `CompletionPreparationStage` always sets `original_text`, so `prep.original_text` should not be `None` at this point. Using `unwrap_or_default()` can hide a potential logic error if it were ever `None`, which could lead to silent failures where an empty string is used as the prompt. It would be more robust to explicitly handle the `None` case as an internal error. This makes the contract between stages explicit and ensures the system fails fast if an invariant is broken.
```rust
let original_text = prep.original_text.as_ref().ok_or_else(|| {
    error!(
        function = "CompletionRequestBuildingStage::execute",
        "original_text not found in preparation output for completion request"
    );
    error::internal_error(
        "missing_preparation_output",
        "original_text not found in preparation output for completion request",
    )
})?;
let mut proto_request = builder_client
    .build_completion_request(
        request_id,
        &completion_request,
        original_text.clone(),
        prep.token_ids.clone(),
    )
    .map_err(|e| {
        error!(
            function = "CompletionRequestBuildingStage::execute",
            error = %e,
            "Failed to build completion request"
        );
        error::bad_request(
            "invalid_request_parameters",
            format!("Invalid request parameters: {e}"),
        )
    })?;
```
Code Review
This pull request adds support for /v1/completions to the native gRPC router by introducing completion-specific pipeline stages. The changes are well-structured and follow existing patterns for other endpoints like chat and generate. I've identified a couple of areas with code duplication in the main router logic that could be refactored for better maintainability. Specifically, there's a duplicated validation check and repeated retry logic that has been copied for this new endpoint.
```rust
if matches!(body.prompt, StringOrArray::Array(_)) {
    return error::bad_request(
        "batch_prompts_not_supported",
        "Batched prompt arrays are not supported for gRPC /v1/completions yet",
    );
}
```
This validation for batch prompts appears to be duplicated. The same check exists in `CompletionPreparationStage`. To improve maintainability and avoid logic drift, consider removing this check from the router and letting the preparation stage be the single source of truth for this validation. This would centralize request validation logic within the pipeline stages.
References
- Extract duplicated logic into a shared helper function to improve maintainability and reduce redundancy.
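One way to centralize the checks is a single validation helper that both the router entrypoint and the preparation stage call. This is only an illustrative sketch: the type and function names here (`validate_completion_request`, the `Single` variant) are assumptions for the example, not the PR's actual definitions.

```rust
// Hypothetical helper collecting the gRPC /v1/completions constraints in one
// place; both call sites would delegate to it instead of duplicating checks.

enum StringOrArray {
    Single(String),
    Array(Vec<String>),
}

struct CompletionRequest {
    prompt: StringOrArray,
    stream: bool,
    logprobs: Option<u32>,
}

/// Returns a description of the first violated constraint, or `None`
/// if the request is acceptable for the gRPC /v1/completions path.
fn validate_completion_request(req: &CompletionRequest) -> Option<&'static str> {
    if matches!(req.prompt, StringOrArray::Array(_)) {
        return Some("Batched prompt arrays are not supported for gRPC /v1/completions yet");
    }
    if req.stream && req.logprobs.is_some() {
        return Some("Streaming logprobs are not supported for gRPC /v1/completions");
    }
    None
}

fn main() {
    let single = CompletionRequest {
        prompt: StringOrArray::Single("hello".into()),
        stream: false,
        logprobs: None,
    };
    let batched = CompletionRequest {
        prompt: StringOrArray::Array(vec!["a".into(), "b".into()]),
        stream: false,
        logprobs: None,
    };
    assert!(validate_completion_request(&single).is_none());
    assert!(validate_completion_request(&batched).is_some());
    println!("validation helper ok");
}
```

With this shape, the router's early return and the preparation stage's check would reduce to one call each, so the error strings cannot drift apart.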
```rust
async fn route_completion_impl(
    &self,
    headers: Option<&HeaderMap>,
    body: &CompletionRequest,
    model_id: Option<&str>,
) -> Response {
    debug!(
        "Processing completion request for model: {}, stream={}",
        model_id.unwrap_or(UNKNOWN_MODEL_ID),
        body.stream
    );

    if matches!(body.prompt, StringOrArray::Array(_)) {
        return error::bad_request(
            "batch_prompts_not_supported",
            "Batched prompt arrays are not supported for gRPC /v1/completions yet",
        );
    }

    if body.stream && body.logprobs.is_some() {
        return error::bad_request(
            "streaming_logprobs_not_supported",
            "Streaming logprobs are not supported for gRPC /v1/completions",
        );
    }

    if HarmonyDetector::is_harmony_model_in_registry(&self.worker_registry, &body.model) {
        return error::bad_request(
            "harmony_completion_not_supported",
            "Completion requests are not supported with Harmony models".to_string(),
        );
    }

    let request = Arc::new(body.clone());
    let headers_cloned = headers.cloned();
    let model_id_cloned = model_id.map(|s| s.to_string());
    let components = self.shared_components.clone();
    let pipeline = &self.pipeline;

    RetryExecutor::execute_response_with_retry(
        &self.retry_config,
        |_attempt| {
            let request = Arc::clone(&request);
            let headers = headers_cloned.clone();
            let model_id = model_id_cloned.clone();
            let components = Arc::clone(&components);
            async move {
                pipeline
                    .execute_completion(request, headers, model_id, components)
                    .await
            }
        },
        |res, _attempt| is_retryable_status(res.status()),
        |delay, attempt| {
            Metrics::record_worker_retry(
                metrics_labels::WORKER_REGULAR,
                metrics_labels::ENDPOINT_COMPLETIONS,
            );
            Metrics::record_worker_retry_backoff(attempt, delay);
        },
        || {
            Metrics::record_worker_retries_exhausted(
                metrics_labels::WORKER_REGULAR,
                metrics_labels::ENDPOINT_COMPLETIONS,
            );
        },
    )
    .await
}
```
The implementation of `route_completion_impl` is very similar to other `route_*_impl` functions, especially the `RetryExecutor` logic. This PR adds to the existing code duplication. Consider refactoring this common retry logic into a generic helper function or macro to improve maintainability and reduce boilerplate.
References
- Extract duplicated logic into a shared helper function to improve maintainability and reduce redundancy.
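A simplified, synchronous sketch of the shape such a shared helper could take. The real `RetryExecutor` is async and metrics-aware; every name here (`execute_with_retry`, `RetryConfig`, the closure parameters) is an assumption for illustration only.

```rust
// Hypothetical generic retry loop: each route_*_impl would supply its own
// operation, retryability predicate, and per-endpoint metrics hook instead
// of duplicating the loop body.

struct RetryConfig {
    max_attempts: u32,
}

/// Runs `op` up to `config.max_attempts` times, retrying while
/// `is_retryable` says the result is transient. `on_retry` lets callers
/// record per-endpoint metrics without duplicating the loop.
fn execute_with_retry<T>(
    config: &RetryConfig,
    mut op: impl FnMut(u32) -> T,
    is_retryable: impl Fn(&T) -> bool,
    mut on_retry: impl FnMut(u32),
) -> T {
    let mut attempt = 0;
    loop {
        let result = op(attempt);
        attempt += 1;
        if attempt >= config.max_attempts || !is_retryable(&result) {
            return result;
        }
        on_retry(attempt);
    }
}

fn main() {
    let config = RetryConfig { max_attempts: 3 };
    let mut retries = 0;
    // Returns 503 on the first two attempts, then succeeds with 200.
    let status = execute_with_retry(
        &config,
        |attempt| if attempt < 2 { 503 } else { 200 },
        |status| *status == 503,
        |_attempt| retries += 1,
    );
    assert_eq!(status, 200);
    assert_eq!(retries, 2);
    println!("retried {retries} times, final status {status}");
}
```

The async version would take a closure returning a future (as the existing call sites already do), so only the loop and the metrics plumbing move into the helper.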
Description
Problem
This PR completes the native `/v1/completions` rollout by wiring
completion-specific stages into the regular gRPC router.
After adding the request contract, native pipeline typing, backend request
builders, and completion-aware response infrastructure, the regular gRPC
router still needed endpoint-specific completion stages and final router
wiring to make `/v1/completions` operational end to end.

Solution
Add completion-specific stages to the regular gRPC router and wire them into
the existing stage delegators and router entrypoint.
This makes `/v1/completions` a true native gRPC pipeline endpoint with
the same overall stage architecture as chat:
preparation, worker selection, client acquisition, request building,
dispatch metadata, request execution, and response processing.
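The stage sequence above can be modeled roughly as follows. This is a hedged sketch only: the trait shape and the `PipelineStage`/`PipelineContext`/`NamedStage` names are assumptions for illustration, while the router's real stages are async and pass typed outputs between each other.

```rust
// Illustrative model of a staged pipeline: each stage mutates shared
// context or fails the request, and the router runs them in order.

trait PipelineStage {
    fn name(&self) -> &'static str;
    /// Runs the stage against the accumulated pipeline state,
    /// or fails the whole request with an error message.
    fn execute(&self, ctx: &mut PipelineContext) -> Result<(), String>;
}

#[derive(Default)]
struct PipelineContext {
    // Real stages would record typed outputs here (token ids,
    // selected worker, built proto request, ...).
    completed: Vec<&'static str>,
}

struct NamedStage(&'static str);

impl PipelineStage for NamedStage {
    fn name(&self) -> &'static str {
        self.0
    }
    fn execute(&self, ctx: &mut PipelineContext) -> Result<(), String> {
        ctx.completed.push(self.name());
        Ok(())
    }
}

fn main() {
    let stages: Vec<Box<dyn PipelineStage>> = vec![
        Box::new(NamedStage("preparation")),
        Box::new(NamedStage("worker_selection")),
        Box::new(NamedStage("client_acquisition")),
        Box::new(NamedStage("request_building")),
        Box::new(NamedStage("dispatch_metadata")),
        Box::new(NamedStage("request_execution")),
        Box::new(NamedStage("response_processing")),
    ];
    let mut ctx = PipelineContext::default();
    for stage in &stages {
        stage.execute(&mut ctx).expect("stage failed");
    }
    assert_eq!(ctx.completed.len(), 7);
    println!("ran {} stages", ctx.completed.len());
}
```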
Changes
- `CompletionPreparationStage`
- `CompletionRequestBuildingStage`
- `CompletionResponseProcessingStage`
- Router wiring for the `/v1/completions` path.

Test Plan
- `cargo test -p smg completion --quiet`
- `cargo test -p smg --quiet`
- `cargo clippy -p smg --all-targets --all-features -- -D warnings`

Checklist
- `cargo +nightly fmt` passes
- `cargo clippy --all-targets --all-features -- -D warnings` passes