diff --git a/src/llm/manager.rs b/src/llm/manager.rs
index 56693c560..54316c8d7 100644
--- a/src/llm/manager.rs
+++ b/src/llm/manager.rs
@@ -233,6 +233,43 @@ impl LlmManager {
         }
     }
 
+    /// Force-refresh the OpenAI OAuth token, bypassing the expiry check.
+    ///
+    /// Used when the API returns 401 even though the token appeared valid
+    /// locally (server-side revocation, clock skew, mid-stream expiry).
+    /// Returns `Ok(Some(token))` on successful refresh, `Ok(None)` if no
+    /// credentials are configured, or `Err` if the refresh request itself fails.
+    pub async fn force_refresh_openai_token(&self) -> Result<Option<String>> {
+        let mut creds_guard = self.openai_oauth_credentials.write().await;
+        let Some(creds) = creds_guard.as_ref() else {
+            return Ok(None);
+        };
+
+        tracing::info!("force-refreshing OpenAI OAuth token after 401");
+        match creds.refresh().await {
+            Ok(new_creds) => {
+                if let Some(ref instance_dir) = self.instance_dir
+                    && let Err(error) =
+                        crate::openai_auth::save_credentials(instance_dir, &new_creds)
+                {
+                    tracing::warn!(%error, "failed to persist refreshed OpenAI OAuth credentials");
+                }
+                let token = new_creds.access_token.clone();
+                *creds_guard = Some(new_creds);
+                tracing::info!("OpenAI OAuth token force-refreshed successfully");
+                Ok(Some(token))
+            }
+            Err(error) => {
+                tracing::error!(%error, "OpenAI OAuth token force-refresh failed");
+                Err(LlmError::ProviderRequest(format!(
+                    "OpenAI OAuth token refresh failed: {error}. \
+                     Re-authenticate via /api/providers/openai-chatgpt/auth"
+                ))
+                .into())
+            }
+        }
+    }
+
     /// Resolve the OpenAI provider config from static API-key configuration.
     ///
     /// OpenAI ChatGPT OAuth is intentionally handled via a separate internal
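Reviewer note on the locking above: `force_refresh_openai_token` takes the write lock and refreshes unconditionally, so two requests that 401 at the same moment will run two refreshes back to back, and with providers that rotate the refresh token on every use the second call can invalidate the credentials the first one just saved. A minimal sketch of a coalescing variant, assuming a hypothetical `failed_token` argument supplied by the caller (not part of this diff):

```rust
/// Hypothetical variant (not in this diff): the caller passes the token that
/// just failed, so a refresh completed by another task while we waited on the
/// write lock is detected and reused instead of triggering a second refresh.
pub async fn force_refresh_openai_token_coalesced(
    &self,
    failed_token: &str,
) -> Result<Option<String>> {
    let mut creds_guard = self.openai_oauth_credentials.write().await;
    let Some(creds) = creds_guard.as_ref() else {
        return Ok(None);
    };

    // The stored token already differs from the one that failed: another
    // task refreshed first, so reuse its token instead of refreshing again.
    if creds.access_token != failed_token {
        return Ok(Some(creds.access_token.clone()));
    }

    let new_creds = creds.refresh().await?; // error mapping elided for brevity
    let token = new_creds.access_token.clone();
    *creds_guard = Some(new_creds); // persistence to disk elided for brevity
    Ok(Some(token))
}
```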
diff --git a/src/llm/model.rs b/src/llm/model.rs
index 7eaa5780e..f589cf3b0 100644
--- a/src/llm/model.rs
+++ b/src/llm/model.rs
@@ -16,6 +16,13 @@ use rig::streaming::StreamingCompletionResponse;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 
+/// Timeout in seconds for ChatGPT Codex SSE streaming requests.
+///
+/// Codex coding tasks can run for several minutes. The default 120s client
+/// timeout is too short: it causes the buffered `.text().await` to abort
+/// mid-stream. 10 minutes gives long coding tasks room to complete.
+const CHATGPT_CODEX_TIMEOUT_SECS: u64 = 600;
+
 /// Raw provider response. Wraps the JSON so Rig can carry it through.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct RawResponse {
@@ -79,6 +86,10 @@ impl SpacebotModel {
     }
 
     /// Direct call to the provider (no fallback logic).
+    ///
+    /// For OAuth-based providers (`openai-chatgpt`, `anthropic`), if the API
+    /// returns a 401/403 auth error, the token is force-refreshed and the
+    /// request is replayed once before propagating the error.
     async fn attempt_completion(
         &self,
         request: CompletionRequest,
@@ -111,48 +122,110 @@ impl SpacebotModel {
                 .map_err(|e| CompletionError::ProviderError(e.to_string()))?,
         };
 
-        match provider_config.api_type {
-            ApiType::Anthropic => self.call_anthropic(request, &provider_config).await,
-            ApiType::OpenAiCompletions => self.call_openai(request, &provider_config).await,
-            ApiType::OpenAiChatCompletions => {
-                let endpoint = format!(
-                    "{}/chat/completions",
-                    provider_config.base_url.trim_end_matches('/')
-                );
-                let display_name = provider_config
-                    .name
-                    .as_deref()
-                    .unwrap_or("OpenAI-compatible provider");
-                self.call_openai_compatible_with_optional_auth(
-                    request,
-                    display_name,
-                    &endpoint,
-                    Some(provider_config.api_key.clone()),
-                    &[],
-                )
-                .await
-            }
-            ApiType::KiloGateway => {
-                let endpoint = format!(
-                    "{}/chat/completions",
-                    provider_config.base_url.trim_end_matches('/')
-                );
-                self.call_openai_compatible_with_optional_auth(
-                    request,
-                    "Kilo Gateway",
-                    &endpoint,
-                    Some(provider_config.api_key.clone()),
-                    &[
-                        ("HTTP-Referer", "https://github.com/spacedriveapp/spacebot"),
-                        ("X-Title", "spacebot"),
-                    ],
-                )
-                .await
+        let result = self
+            .dispatch_to_provider(request.clone(), &provider_config)
+            .await;
+
+        // On 401/403 from OAuth-based providers, force-refresh the token and replay once.
+        if let Err(ref error) = result
+            && is_auth_error(error)
+            && matches!(provider_id, "openai-chatgpt" | "anthropic")
+        {
+            tracing::warn!(
+                provider = %provider_id,
+                "got 401/auth error, force-refreshing OAuth token and replaying"
+            );
+
+            let refreshed_config = match provider_id {
+                "openai-chatgpt" => match self.llm_manager.force_refresh_openai_token().await {
+                    Ok(Some(token)) => {
+                        let mut config = provider_config.clone();
+                        config.api_key = token;
+                        Some(config)
+                    }
+                    Ok(None) => None,
+                    Err(error) => {
+                        return Err(CompletionError::ProviderError(format!(
+                            "OAuth token refresh failed after 401: {error}"
+                        )));
+                    }
+                },
+                "anthropic" => match self.llm_manager.get_anthropic_provider().await {
+                    Ok(config) => Some(config),
+                    Err(error) => {
+                        return Err(CompletionError::ProviderError(format!(
+                            "OAuth token refresh failed after 401: {error}"
+                        )));
+                    }
+                },
+                _ => None,
+            };
+
+            if let Some(new_config) = refreshed_config {
+                tracing::info!(provider = %provider_id, "replaying request with refreshed token");
+                return self.dispatch_to_provider(request, &new_config).await;
             }
-            ApiType::OpenAiResponses => self.call_openai_responses(request, &provider_config).await,
-            ApiType::Gemini => {
-                self.call_openai_compatible(request, "Google Gemini", &provider_config)
+        }
+
+        result
+    }
+
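Aside: the replay above is deliberately single-shot, so a token that still earns a 401 after refreshing fails fast instead of looping. If more OAuth providers grow the same needs, the shape generalizes; a sketch with hypothetical names, not part of this diff:

```rust
use std::future::Future;

/// Run `attempt` once; if `is_auth` classifies the error as an auth failure,
/// run `refresh` and replay `attempt` exactly once more.
async fn replay_once_on_auth_error<T, E, Fut>(
    mut attempt: impl FnMut() -> Fut,
    refresh: impl Future<Output = Result<(), E>>,
    is_auth: impl Fn(&E) -> bool,
) -> Result<T, E>
where
    Fut: Future<Output = Result<T, E>>,
{
    match attempt().await {
        Err(error) if is_auth(&error) => {
            // One refresh, one replay; a second failure propagates as-is.
            refresh.await?;
            attempt().await
        }
        other => other,
    }
}
```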
+    /// Route a request to the appropriate provider-specific call method.
+    fn dispatch_to_provider(
+        &self,
+        request: CompletionRequest,
+        provider_config: &ProviderConfig,
+    ) -> impl std::future::Future<
+        Output = Result<CompletionResponse<RawResponse>, CompletionError>,
+    > + Send
+    + '_ {
+        let provider_config = provider_config.clone();
+        async move {
+            match provider_config.api_type {
+                ApiType::Anthropic => self.call_anthropic(request, &provider_config).await,
+                ApiType::OpenAiCompletions => self.call_openai(request, &provider_config).await,
+                ApiType::OpenAiChatCompletions => {
+                    let endpoint = format!(
+                        "{}/chat/completions",
+                        provider_config.base_url.trim_end_matches('/')
+                    );
+                    let display_name = provider_config
+                        .name
+                        .as_deref()
+                        .unwrap_or("OpenAI-compatible provider");
+                    self.call_openai_compatible_with_optional_auth(
+                        request,
+                        display_name,
+                        &endpoint,
+                        Some(provider_config.api_key.clone()),
+                        &[],
+                    )
+                    .await
+                }
+                ApiType::KiloGateway => {
+                    let endpoint = format!(
+                        "{}/chat/completions",
+                        provider_config.base_url.trim_end_matches('/')
+                    );
+                    self.call_openai_compatible_with_optional_auth(
+                        request,
+                        "Kilo Gateway",
+                        &endpoint,
+                        Some(provider_config.api_key.clone()),
+                        &[
+                            ("HTTP-Referer", "https://github.com/spacedriveapp/spacebot"),
+                            ("X-Title", "spacebot"),
+                        ],
+                    )
                     .await
+                }
+                ApiType::OpenAiResponses => {
+                    self.call_openai_responses(request, &provider_config).await
+                }
+                ApiType::Gemini => {
+                    self.call_openai_compatible(request, "Google Gemini", &provider_config)
+                        .await
+                }
+            }
         }
     }
@@ -704,15 +777,45 @@ impl SpacebotModel {
             );
         }
 
-        let response = request_builder
-            .json(&body)
-            .send()
-            .await
-            .map_err(|e| CompletionError::ProviderError(e.to_string()))?;
+        // ChatGPT Codex uses SSE streaming where the entire response is buffered
+        // via .text(). Coding tasks can run for several minutes, so we need a much
+        // longer timeout than the default 120s client timeout.
+        if is_chatgpt_codex {
+            request_builder =
+                request_builder.timeout(std::time::Duration::from_secs(CHATGPT_CODEX_TIMEOUT_SECS));
+        }
+
+        let response = request_builder.json(&body).send().await.map_err(|e| {
+            if e.is_timeout() {
+                CompletionError::ProviderError(format!(
+                    "OpenAI Responses API request timed out after {}s \
+                     (connection or response exceeded deadline)",
+                    if is_chatgpt_codex {
+                        CHATGPT_CODEX_TIMEOUT_SECS
+                    } else {
+                        120
+                    }
+                ))
+            } else {
+                CompletionError::ProviderError(e.to_string())
+            }
+        })?;
 
         let status = response.status();
         let response_text = response.text().await.map_err(|e| {
-            CompletionError::ProviderError(format!("failed to read response body: {e}"))
+            if e.is_timeout() {
+                CompletionError::ProviderError(format!(
+                    "OpenAI Responses API response body read timed out after {}s \
+                     (model may still be generating)",
+                    if is_chatgpt_codex {
+                        CHATGPT_CODEX_TIMEOUT_SECS
+                    } else {
+                        120
+                    }
+                ))
+            } else {
+                CompletionError::ProviderError(format!("failed to read response body: {e}"))
+            }
        })?;
 
         if !status.is_success() {
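For context on the timeout change: in reqwest, `RequestBuilder::timeout` overrides the client-wide `ClientBuilder::timeout` for that single request, and the deadline spans the whole exchange, from connecting until the response body has finished. A self-contained sketch of the pattern, assuming reqwest's `json` feature; the URL and payload are placeholders, not the real Codex request:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Client-wide default, matching the 120s assumed elsewhere in this diff.
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(120))
        .build()?;

    // Per-request override: this call gets 600s regardless of the default.
    let result = client
        .post("https://example.invalid/v1/responses") // placeholder URL
        .timeout(Duration::from_secs(600))
        .json(&serde_json::json!({ "input": "hello" }))
        .send()
        .await;

    match result {
        Ok(response) => println!("status: {}", response.status()),
        // is_timeout() distinguishes deadline expiry from other transport errors.
        Err(error) if error.is_timeout() => eprintln!("request timed out"),
        Err(error) => return Err(error),
    }
    Ok(())
}
```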
@@ -1500,8 +1603,33 @@
         }
     }
 
+    let response_status = body["status"].as_str().unwrap_or("unknown");
     let choice = OneOrMany::many(assistant_content).map_err(|_| {
-        CompletionError::ResponseError("empty response from OpenAI Responses API".into())
+        let output_count = body["output"]
+            .as_array()
+            .map(|array| array.len())
+            .unwrap_or(0);
+        if response_status == "incomplete" {
+            let reason = body["incomplete_details"]["reason"]
+                .as_str()
+                .unwrap_or("unknown reason");
+            CompletionError::ResponseError(format!(
+                "OpenAI Responses API: model run incomplete ({reason}). \
+                 The model stopped before producing output."
+            ))
+        } else if output_count == 0 {
+            CompletionError::ResponseError(
+                "OpenAI Responses API returned an empty output array. \
+                 The model produced no text or tool calls. This may indicate \
+                 an auth issue, a model error, or the request was rejected silently."
+                    .into(),
+            )
+        } else {
+            CompletionError::ResponseError(format!(
+                "OpenAI Responses API: response contained {output_count} output item(s) \
+                 but none had usable content (status: {response_status})"
+            ))
+        }
     })?;
 
     let input_tokens = body["usage"]["input_tokens"].as_u64().unwrap_or(0);
@@ -1522,9 +1650,23 @@
     })
 }
 
+/// Parse a ChatGPT Codex SSE stream into the response JSON body.
+///
+/// The stream contains events like `response.created`, `response.output_item.added`,
+/// `response.output_text.delta`, and finally `response.completed` with the full
+/// response object. We prefer the `response.completed` event, but handle failure
+/// modes defensively:
+///
+/// - `response.failed` / `response.incomplete`: surface the server-side error
+/// - Truncated stream (no terminal event): attempt recovery from partial events
+/// - Auth errors embedded in the stream: detect and surface clearly
 fn parse_openai_responses_sse_response(
     response_text: &str,
 ) -> Result<serde_json::Value, CompletionError> {
+    let mut last_response_event: Option<serde_json::Value> = None;
+    let mut error_event: Option<String> = None;
+    let mut event_count = 0u32;
+
     for line in response_text.lines() {
         let Some(data) = line.strip_prefix("data: ") else {
             continue;
         };
@@ -1538,15 +1680,65 @@ fn parse_openai_responses_sse_response(
             continue;
         };
 
-        if event_body["type"].as_str() == Some("response.completed")
-            && let Some(response) = event_body.get("response")
-        {
-            return Ok(response.clone());
+        event_count += 1;
+        let event_type = event_body["type"].as_str().unwrap_or("");
+
+        match event_type {
+            // Ideal path: the full response is included in the completed event.
+            "response.completed" => {
+                if let Some(response) = event_body.get("response") {
+                    return Ok(response.clone());
+                }
+            }
+            // Server-side failure: the model run failed or was incomplete.
+            "response.failed" | "response.incomplete" => {
+                let status = event_body
+                    .pointer("/response/status")
+                    .and_then(|value| value.as_str())
+                    .unwrap_or(event_type);
+                let error_message = event_body
+                    .pointer("/response/error/message")
+                    .or_else(|| event_body.pointer("/response/last_error/message"))
+                    .and_then(|value| value.as_str())
+                    .unwrap_or("unknown server error");
+                error_event = Some(format!(
+                    "OpenAI Responses API: model run {status}: {error_message}"
+                ));
+                // Keep parsing in case a completed event follows (shouldn't, but be safe).
+            }
+            // Track the last response-level event for partial recovery.
+            _ if event_body.get("response").is_some() => {
+                last_response_event = event_body.get("response").cloned();
+            }
+            _ => {}
         }
     }
 
+    // If we got an explicit error event, surface it.
+    if let Some(error_message) = error_event {
+        return Err(CompletionError::ProviderError(error_message));
+    }
+
+    // No response.completed found. Try partial recovery from the last
+    // response-bearing event (e.g. response.output_item.done).
+    if let Some(partial_response) = last_response_event {
+        tracing::warn!(
+            event_count,
+            "SSE stream missing response.completed, recovering from last response event"
+        );
+        return Ok(partial_response);
+    }
+
+    // Completely empty or unparseable stream.
+    let diagnostic = if event_count == 0 {
+        "SSE stream was empty (no events received). This may indicate an auth failure, \
+         network interruption, or the server closed the connection before sending any data."
+    } else {
+        "SSE stream ended without a response.completed event (stream may have been truncated)."
+    };
+
     Err(CompletionError::ProviderError(format!(
-        "OpenAI Responses SSE stream missing response.completed event.\nBody: {}",
+        "OpenAI Responses API: {diagnostic} ({event_count} events received)\nBody: {}",
         truncate_body(response_text)
     )))
 }
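The three exit paths of the parser are cheap to pin down with fixture strings. A test sketch, assuming it sits in the same module as `parse_openai_responses_sse_response`; the event payloads are simplified approximations of real Codex events, not captured traffic:

```rust
#[cfg(test)]
mod sse_parsing_tests {
    use super::*;

    #[test]
    fn prefers_the_completed_event() {
        let stream = "data: {\"type\":\"response.created\",\"response\":{\"status\":\"in_progress\"}}\n\
                      data: {\"type\":\"response.completed\",\"response\":{\"status\":\"completed\"}}\n";
        let body = parse_openai_responses_sse_response(stream).expect("should parse");
        assert_eq!(body["status"], "completed");
    }

    #[test]
    fn recovers_from_a_truncated_stream() {
        // No terminal event: the parser falls back to the last response-bearing event.
        let stream = "data: {\"type\":\"response.created\",\"response\":{\"status\":\"in_progress\"}}\n";
        let body = parse_openai_responses_sse_response(stream).expect("should recover");
        assert_eq!(body["status"], "in_progress");
    }

    #[test]
    fn reports_an_empty_stream() {
        let error = parse_openai_responses_sse_response("").expect_err("should fail");
        assert!(error.to_string().contains("SSE stream was empty"));
    }
}
```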
@@ -1560,6 +1752,17 @@ fn parse_openai_error_message(response_text: &str) -> Option<String> {
         .map(ToOwned::to_owned)
 }
 
+/// Whether a completion error indicates an authentication/authorization failure (401/403).
+fn is_auth_error(error: &CompletionError) -> bool {
+    let message = error.to_string().to_lowercase();
+    message.contains("401")
+        || message.contains("403")
+        || message.contains("unauthorized")
+        || message.contains("authentication")
+        || message.contains("invalid_api_key")
+        || message.contains("invalid api key")
+}
+
 fn remap_model_name_for_api(provider: &str, model_name: &str) -> String {
     if provider == "zai-coding-plan" {
         // Coding Plan endpoint expects plain model ids (e.g. "glm-5").
diff --git a/src/llm/routing.rs b/src/llm/routing.rs
index 0c3b6744c..fcb0dec33 100644
--- a/src/llm/routing.rs
+++ b/src/llm/routing.rs
@@ -129,8 +129,14 @@ pub fn is_retriable_error(error_message: &str) -> bool {
         || lower.contains("connection")
         // Empty/malformed responses are transient provider issues
         || lower.contains("empty response")
+        || lower.contains("empty output array")
         || lower.contains("failed to read response body")
         || lower.contains("error decoding response body")
+        // SSE stream failures (truncated, incomplete, server closed early)
+        || lower.contains("stream was empty")
+        || lower.contains("stream ended without")
+        || lower.contains("stream may have been truncated")
+        || lower.contains("model run incomplete")
 }
 
 /// Whether a completion error indicates context window overflow.
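Since `is_retriable_error` is a substring match over error text, the new clauses are easy to lock in with tests that mirror the phrases emitted by the diagnostics above. A sketch, assuming it lives alongside the function in routing.rs; the messages are representative strings, not captured output:

```rust
#[cfg(test)]
mod retriable_error_tests {
    use super::*;

    #[test]
    fn sse_failure_diagnostics_are_retriable() {
        // Each message mirrors a phrase produced by the new SSE/response
        // diagnostics in model.rs, so transient stream failures get retried.
        assert!(is_retriable_error("SSE stream was empty (no events received)"));
        assert!(is_retriable_error(
            "SSE stream ended without a response.completed event \
             (stream may have been truncated)"
        ));
        assert!(is_retriable_error(
            "OpenAI Responses API: model run incomplete (max_output_tokens)"
        ));
        assert!(is_retriable_error(
            "OpenAI Responses API returned an empty output array"
        ));
    }
}
```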