From ae8ad6334781c19c611a08f3fe161f1515915658 Mon Sep 17 00:00:00 2001
From: James Pine
Date: Fri, 27 Feb 2026 03:10:30 -0800
Subject: [PATCH] fix: improve openai-chatgpt provider reliability for
 long-running Codex tasks

Addresses user-reported failures where ChatGPT Pro OAuth (the openai-chatgpt
provider) returns empty responses or silently fails on longer coding tasks.

Five fixes:

1. 401 -> refresh -> replay: attempt_completion now detects auth errors from
   OAuth providers and force-refreshes the token before replaying the request
   once, handling mid-session token expiry and server-side revocation.

2. Per-request timeout for Codex SSE: bumps the timeout from the global 120s
   to 600s for openai-chatgpt requests, since Codex coding tasks can run for
   several minutes and the buffered .text().await was aborting mid-stream.

3. Defensive SSE parsing: the stream parser now handles response.failed,
   response.incomplete, truncated streams, and empty streams with specific
   error messages instead of the opaque 'missing response.completed event'.

4. Retriable stream failures: stream-terminal errors (empty stream, truncated
   stream, incomplete model run) are now classified as retriable in routing.rs
   so the retry loop can attempt recovery.

5. Better error messages: empty output arrays now report whether the response
   was incomplete (with reason), truly empty, or had output items with no
   usable content, replacing the generic 'empty response from OpenAI
   Responses API' message.
---
 src/llm/manager.rs |  37 ++++++
 src/llm/model.rs   | 307 +++++++++++++++++++++++++++++++++++++--------
 src/llm/routing.rs |   6 +
 3 files changed, 298 insertions(+), 52 deletions(-)

diff --git a/src/llm/manager.rs b/src/llm/manager.rs
index 56693c560..54316c8d7 100644
--- a/src/llm/manager.rs
+++ b/src/llm/manager.rs
@@ -233,6 +233,43 @@ impl LlmManager {
         }
     }
 
+    /// Force-refresh the OpenAI OAuth token, bypassing the expiry check.
+    ///
+    /// Used when the API returns 401 even though the token appeared valid
+    /// locally (server-side revocation, clock skew, mid-stream expiry).
+    /// Returns `Ok(Some(token))` on successful refresh, `Ok(None)` if no
+    /// credentials are configured, or `Err` if the refresh request itself fails.
+    pub async fn force_refresh_openai_token(&self) -> Result<Option<String>> {
+        let mut creds_guard = self.openai_oauth_credentials.write().await;
+        let Some(creds) = creds_guard.as_ref() else {
+            return Ok(None);
+        };
+
+        tracing::info!("force-refreshing OpenAI OAuth token after 401");
+        match creds.refresh().await {
+            Ok(new_creds) => {
+                if let Some(ref instance_dir) = self.instance_dir
+                    && let Err(error) =
+                        crate::openai_auth::save_credentials(instance_dir, &new_creds)
+                {
+                    tracing::warn!(%error, "failed to persist refreshed OpenAI OAuth credentials");
+                }
+                let token = new_creds.access_token.clone();
+                *creds_guard = Some(new_creds);
+                tracing::info!("OpenAI OAuth token force-refreshed successfully");
+                Ok(Some(token))
+            }
+            Err(error) => {
+                tracing::error!(%error, "OpenAI OAuth token force-refresh failed");
+                Err(LlmError::ProviderRequest(format!(
+                    "OpenAI OAuth token refresh failed: {error}. \
+                     Re-authenticate via /api/providers/openai-chatgpt/auth"
+                ))
+                .into())
+            }
+        }
+    }
+
     /// Resolve the OpenAI provider config from static API-key configuration.
     ///
     /// OpenAI ChatGPT OAuth is intentionally handled via a separate internal

diff --git a/src/llm/model.rs b/src/llm/model.rs
index 7eaa5780e..f589cf3b0 100644
--- a/src/llm/model.rs
+++ b/src/llm/model.rs
@@ -16,6 +16,13 @@ use rig::streaming::StreamingCompletionResponse;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 
+/// Timeout in seconds for ChatGPT Codex SSE streaming requests.
+///
+/// Codex coding tasks can run for several minutes. The default 120s client
+/// timeout is too short: it causes the buffered `.text().await` to abort
+/// mid-stream. 10 minutes gives long coding tasks room to complete.
+const CHATGPT_CODEX_TIMEOUT_SECS: u64 = 600;
+
 /// Raw provider response. Wraps the JSON so Rig can carry it through.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct RawResponse {
@@ -79,6 +86,10 @@ impl SpacebotModel {
     }
 
     /// Direct call to the provider (no fallback logic).
+    ///
+    /// For OAuth-based providers (`openai-chatgpt`, `anthropic`), if the API
+    /// returns a 401/403 auth error, the token is force-refreshed and the
+    /// request is replayed once before propagating the error.
     async fn attempt_completion(
         &self,
         request: CompletionRequest,
@@ -111,48 +122,110 @@
                 .map_err(|e| CompletionError::ProviderError(e.to_string()))?,
         };
 
-        match provider_config.api_type {
-            ApiType::Anthropic => self.call_anthropic(request, &provider_config).await,
-            ApiType::OpenAiCompletions => self.call_openai(request, &provider_config).await,
-            ApiType::OpenAiChatCompletions => {
-                let endpoint = format!(
-                    "{}/chat/completions",
-                    provider_config.base_url.trim_end_matches('/')
-                );
-                let display_name = provider_config
-                    .name
-                    .as_deref()
-                    .unwrap_or("OpenAI-compatible provider");
-                self.call_openai_compatible_with_optional_auth(
-                    request,
-                    display_name,
-                    &endpoint,
-                    Some(provider_config.api_key.clone()),
-                    &[],
-                )
-                .await
-            }
-            ApiType::KiloGateway => {
-                let endpoint = format!(
-                    "{}/chat/completions",
-                    provider_config.base_url.trim_end_matches('/')
-                );
-                self.call_openai_compatible_with_optional_auth(
-                    request,
-                    "Kilo Gateway",
-                    &endpoint,
-                    Some(provider_config.api_key.clone()),
-                    &[
-                        ("HTTP-Referer", "https://github.com/spacedriveapp/spacebot"),
-                        ("X-Title", "spacebot"),
-                    ],
-                )
-                .await
+        let result = self
+            .dispatch_to_provider(request.clone(), &provider_config)
+            .await;
+
+        // On 401/403 from OAuth-based providers, force-refresh the token and replay once.
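+        // Exactly one replay: if the refreshed token is also rejected, looping on
+        // refresh would make no progress, so the second failure propagates to the
+        // caller and to the retry/fallback logic in routing.rs.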
+        if let Err(ref error) = result
+            && is_auth_error(error)
+            && matches!(provider_id, "openai-chatgpt" | "anthropic")
+        {
+            tracing::warn!(
+                provider = %provider_id,
+                "got 401/auth error, force-refreshing OAuth token and replaying"
+            );
+
+            let refreshed_config = match provider_id {
+                "openai-chatgpt" => match self.llm_manager.force_refresh_openai_token().await {
+                    Ok(Some(token)) => {
+                        let mut config = provider_config.clone();
+                        config.api_key = token;
+                        Some(config)
+                    }
+                    Ok(None) => None,
+                    Err(error) => {
+                        return Err(CompletionError::ProviderError(format!(
+                            "OAuth token refresh failed after 401: {error}"
+                        )));
+                    }
+                },
+                "anthropic" => match self.llm_manager.get_anthropic_provider().await {
+                    Ok(config) => Some(config),
+                    Err(error) => {
+                        return Err(CompletionError::ProviderError(format!(
+                            "OAuth token refresh failed after 401: {error}"
+                        )));
+                    }
+                },
+                _ => None,
+            };
+
+            if let Some(new_config) = refreshed_config {
+                tracing::info!(provider = %provider_id, "replaying request with refreshed token");
+                return self.dispatch_to_provider(request, &new_config).await;
             }
-            ApiType::OpenAiResponses => self.call_openai_responses(request, &provider_config).await,
-            ApiType::Gemini => {
-                self.call_openai_compatible(request, "Google Gemini", &provider_config)
+        }
+
+        result
+    }
+
+    /// Route a request to the appropriate provider-specific call method.
+    fn dispatch_to_provider(
+        &self,
+        request: CompletionRequest,
+        provider_config: &ProviderConfig,
+    ) -> impl std::future::Future<
+        Output = Result<rig::completion::CompletionResponse<RawResponse>, CompletionError>,
+    > + Send
+    + '_ {
+        let provider_config = provider_config.clone();
+        async move {
+            match provider_config.api_type {
+                ApiType::Anthropic => self.call_anthropic(request, &provider_config).await,
+                ApiType::OpenAiCompletions => self.call_openai(request, &provider_config).await,
+                ApiType::OpenAiChatCompletions => {
+                    let endpoint = format!(
+                        "{}/chat/completions",
+                        provider_config.base_url.trim_end_matches('/')
+                    );
+                    let display_name = provider_config
+                        .name
+                        .as_deref()
+                        .unwrap_or("OpenAI-compatible provider");
+                    self.call_openai_compatible_with_optional_auth(
+                        request,
+                        display_name,
+                        &endpoint,
+                        Some(provider_config.api_key.clone()),
+                        &[],
+                    )
+                    .await
+                }
+                ApiType::KiloGateway => {
+                    let endpoint = format!(
+                        "{}/chat/completions",
+                        provider_config.base_url.trim_end_matches('/')
+                    );
+                    self.call_openai_compatible_with_optional_auth(
+                        request,
+                        "Kilo Gateway",
+                        &endpoint,
+                        Some(provider_config.api_key.clone()),
+                        &[
+                            ("HTTP-Referer", "https://github.com/spacedriveapp/spacebot"),
+                            ("X-Title", "spacebot"),
+                        ],
+                    )
+                    .await
+                }
+                ApiType::OpenAiResponses => {
+                    self.call_openai_responses(request, &provider_config).await
+                }
+                ApiType::Gemini => {
+                    self.call_openai_compatible(request, "Google Gemini", &provider_config)
+                        .await
+                }
             }
         }
     }
@@ -704,15 +777,45 @@ impl SpacebotModel {
             );
         }
 
-        let response = request_builder
-            .json(&body)
-            .send()
-            .await
-            .map_err(|e| CompletionError::ProviderError(e.to_string()))?;
+        // ChatGPT Codex uses SSE streaming where the entire response is buffered
+        // via .text(). Coding tasks can run for several minutes, so we need a much
+        // longer timeout than the default 120s client timeout.
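+        // reqwest's per-request timeout set below overrides the client-wide default
+        // for this request only, so all other providers keep the existing deadline.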
+        if is_chatgpt_codex {
+            request_builder =
+                request_builder.timeout(std::time::Duration::from_secs(CHATGPT_CODEX_TIMEOUT_SECS));
+        }
+
+        let response = request_builder.json(&body).send().await.map_err(|e| {
+            if e.is_timeout() {
+                CompletionError::ProviderError(format!(
+                    "OpenAI Responses API request timed out after {}s \
+                     (connection or response exceeded deadline)",
+                    if is_chatgpt_codex {
+                        CHATGPT_CODEX_TIMEOUT_SECS
+                    } else {
+                        120
+                    }
+                ))
+            } else {
+                CompletionError::ProviderError(e.to_string())
+            }
+        })?;
 
         let status = response.status();
         let response_text = response.text().await.map_err(|e| {
-            CompletionError::ProviderError(format!("failed to read response body: {e}"))
+            if e.is_timeout() {
+                CompletionError::ProviderError(format!(
+                    "OpenAI Responses API response body read timed out after {}s \
+                     (model may still be generating)",
+                    if is_chatgpt_codex {
+                        CHATGPT_CODEX_TIMEOUT_SECS
+                    } else {
+                        120
+                    }
+                ))
+            } else {
+                CompletionError::ProviderError(format!("failed to read response body: {e}"))
+            }
         })?;
 
         if !status.is_success() {
@@ -1500,8 +1603,33 @@
         }
     }
 
+    let response_status = body["status"].as_str().unwrap_or("unknown");
     let choice = OneOrMany::many(assistant_content).map_err(|_| {
-        CompletionError::ResponseError("empty response from OpenAI Responses API".into())
+        let output_count = body["output"]
+            .as_array()
+            .map(|array| array.len())
+            .unwrap_or(0);
+        if response_status == "incomplete" {
+            let reason = body["incomplete_details"]["reason"]
+                .as_str()
+                .unwrap_or("unknown reason");
+            CompletionError::ResponseError(format!(
+                "OpenAI Responses API: model run incomplete ({reason}). \
+                 The model stopped before producing output."
+            ))
+        } else if output_count == 0 {
+            CompletionError::ResponseError(
+                "OpenAI Responses API returned an empty output array. \
+                 The model produced no text or tool calls. This may indicate \
+                 an auth issue, a model error, or the request was rejected silently."
+                    .into(),
+            )
+        } else {
+            CompletionError::ResponseError(format!(
+                "OpenAI Responses API: response contained {output_count} output item(s) \
+                 but none had usable content (status: {response_status})"
+            ))
+        }
     })?;
 
     let input_tokens = body["usage"]["input_tokens"].as_u64().unwrap_or(0);
@@ -1522,9 +1650,23 @@
     })
 }
 
+/// Parse a ChatGPT Codex SSE stream into the response JSON body.
+///
+/// The stream contains events like `response.created`, `response.output_item.added`,
+/// `response.output_text.delta`, and finally `response.completed` with the full
+/// response object.
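+///
+/// For example, an abbreviated healthy stream looks like this (sketch only;
+/// payloads elided):
+///
+/// ```text
+/// data: {"type":"response.created","response":{...}}
+/// data: {"type":"response.output_text.delta","delta":"..."}
+/// data: {"type":"response.completed","response":{"status":"completed",...}}
+/// ```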
+/// We prefer the `response.completed` event, but handle failure modes
+/// defensively:
+///
+/// - `response.failed` / `response.incomplete`: surface the server-side error
+/// - Truncated stream (no terminal event): attempt recovery from partial events
+/// - Auth errors embedded in the stream: detect and surface clearly
 fn parse_openai_responses_sse_response(
     response_text: &str,
 ) -> Result<serde_json::Value, CompletionError> {
+    let mut last_response_event: Option<serde_json::Value> = None;
+    let mut error_event: Option<String> = None;
+    let mut event_count = 0u32;
+
     for line in response_text.lines() {
         let Some(data) = line.strip_prefix("data: ") else {
             continue;
@@ -1538,15 +1680,65 @@ fn parse_openai_responses_sse_response(
             continue;
         };
 
-        if event_body["type"].as_str() == Some("response.completed")
-            && let Some(response) = event_body.get("response")
-        {
-            return Ok(response.clone());
+        event_count += 1;
+        let event_type = event_body["type"].as_str().unwrap_or("");
+
+        match event_type {
+            // Ideal path: the full response is included in the completed event.
+            "response.completed" => {
+                if let Some(response) = event_body.get("response") {
+                    return Ok(response.clone());
+                }
+            }
+            // Server-side failure: the model run failed or was incomplete.
+            "response.failed" | "response.incomplete" => {
+                let status = event_body
+                    .pointer("/response/status")
+                    .and_then(|value| value.as_str())
+                    .unwrap_or(event_type);
+                let error_message = event_body
+                    .pointer("/response/error/message")
+                    .or_else(|| event_body.pointer("/response/last_error/message"))
+                    .and_then(|value| value.as_str())
+                    .unwrap_or("unknown server error");
+                error_event = Some(format!(
+                    "OpenAI Responses API: model run {status}: {error_message}"
+                ));
+                // Keep parsing in case a completed event follows (it shouldn't, but be safe).
+            }
+            // Track the last response-level event for partial recovery.
+            _ if event_body.get("response").is_some() => {
+                last_response_event = event_body.get("response").cloned();
+            }
+            _ => {}
         }
     }
 
+    // If we got an explicit error event, surface it.
+    if let Some(error_message) = error_event {
+        return Err(CompletionError::ProviderError(error_message));
+    }
+
+    // No response.completed found. Try partial recovery from the last
+    // response-bearing event (e.g. response.output_item.done).
+    if let Some(partial_response) = last_response_event {
+        tracing::warn!(
+            event_count,
+            "SSE stream missing response.completed, recovering from last response event"
+        );
+        return Ok(partial_response);
+    }
+
+    // Completely empty or unparseable stream.
+    let diagnostic = if event_count == 0 {
+        "SSE stream was empty (no events received). This may indicate an auth failure, \
+         network interruption, or the server closed the connection before sending any data."
+    } else {
+        "SSE stream ended without a response.completed event (stream may have been truncated)."
+    };
+
     Err(CompletionError::ProviderError(format!(
-        "OpenAI Responses SSE stream missing response.completed event.\nBody: {}",
+        "OpenAI Responses API: {diagnostic} ({event_count} events received)\nBody: {}",
         truncate_body(response_text)
     )))
 }
@@ -1560,6 +1752,17 @@ fn parse_openai_error_message(response_text: &str) -> Option<String> {
         .map(ToOwned::to_owned)
 }
 
+/// Whether a completion error indicates an authentication/authorization failure (401/403).
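+///
+/// This is a heuristic match on the rendered message text (provider errors are
+/// surfaced here as strings). It can misfire on a message that merely mentions
+/// "401", but the cost is bounded: one extra token refresh and a single replay.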
+fn is_auth_error(error: &CompletionError) -> bool {
+    let message = error.to_string().to_lowercase();
+    message.contains("401")
+        || message.contains("403")
+        || message.contains("unauthorized")
+        || message.contains("authentication")
+        || message.contains("invalid_api_key")
+        || message.contains("invalid api key")
+}
+
 fn remap_model_name_for_api(provider: &str, model_name: &str) -> String {
     if provider == "zai-coding-plan" {
         // Coding Plan endpoint expects plain model ids (e.g. "glm-5").

diff --git a/src/llm/routing.rs b/src/llm/routing.rs
index 0c3b6744c..fcb0dec33 100644
--- a/src/llm/routing.rs
+++ b/src/llm/routing.rs
@@ -129,8 +129,14 @@ pub fn is_retriable_error(error_message: &str) -> bool {
         || lower.contains("connection")
         // Empty/malformed responses are transient provider issues
         || lower.contains("empty response")
+        || lower.contains("empty output array")
         || lower.contains("failed to read response body")
         || lower.contains("error decoding response body")
+        // SSE stream failures (truncated, incomplete, server closed early)
+        || lower.contains("stream was empty")
+        || lower.contains("stream ended without")
+        || lower.contains("stream may have been truncated")
+        || lower.contains("model run incomplete")
 }
 
 /// Whether a completion error indicates context window overflow.
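-- 
A minimal regression-test sketch for the new retry classification (assumptions:
`is_retriable_error` is importable as shown, the incomplete reason is a
placeholder, and each message mirrors an error string introduced by this
patch):

    #[cfg(test)]
    mod retry_classification_tests {
        use crate::llm::routing::is_retriable_error;

        #[test]
        fn sse_stream_failures_are_retriable() {
            // Each message mirrors an error string produced by the new SSE
            // parser or the empty-output handling above.
            for message in [
                "OpenAI Responses API: SSE stream was empty (no events received)",
                "OpenAI Responses API: SSE stream ended without a response.completed event",
                "OpenAI Responses API: model run incomplete (max_output_tokens)",
                "OpenAI Responses API returned an empty output array",
            ] {
                assert!(is_retriable_error(message), "expected retriable: {message}");
            }
        }
    }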