From c109c735db11e5b04e1bad52704be8274e648495 Mon Sep 17 00:00:00 2001 From: Clifford Ressel Date: Fri, 16 Jan 2026 17:03:17 -0500 Subject: [PATCH] Add ACP resume session support --- codex-rs/Cargo.lock | 8 ++-- codex-rs/acp/Cargo.toml | 2 +- codex-rs/acp/docs.md | 7 ++-- codex-rs/acp/src/backend.rs | 52 +++++++++++++++++++++++- codex-rs/acp/src/connection.rs | 59 ++++++++++++++++++++++++---- codex-rs/mock-acp-agent/Cargo.toml | 2 +- codex-rs/tui/src/app.rs | 5 +++ codex-rs/tui/src/app_backtrack.rs | 1 + codex-rs/tui/src/chatwidget.rs | 11 +++++- codex-rs/tui/src/chatwidget/agent.rs | 10 ++++- codex-rs/tui/src/chatwidget/tests.rs | 1 + codex-rs/tui/src/lib.rs | 13 +++++- codex-rs/tui/src/test_backend.rs | 2 + 13 files changed, 149 insertions(+), 24 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 947d83d4b..ca08d3678 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -40,9 +40,9 @@ dependencies = [ [[package]] name = "agent-client-protocol" -version = "0.9.0" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2ffe7d502c1e451aafc5aff655000f84d09c9af681354ac0012527009b1af13" +checksum = "1ea4b85f3bcd56ebe65f830321d34bc939af1b5a33b9dcb683195a3b72de0cdb" dependencies = [ "agent-client-protocol-schema", "anyhow", @@ -57,9 +57,9 @@ dependencies = [ [[package]] name = "agent-client-protocol-schema" -version = "0.10.1" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1de0bd518a2b1a4ce0cbde47d3fdf7c7babced977d697120132bd44924e30ad" +checksum = "70829a300bd178abe42836ac779cd3eb3b0dd3881250c752b2621b5324735df1" dependencies = [ "anyhow", "derive_more 2.0.1", diff --git a/codex-rs/acp/Cargo.toml b/codex-rs/acp/Cargo.toml index 1e815ab8e..0f30e23a2 100644 --- a/codex-rs/acp/Cargo.toml +++ b/codex-rs/acp/Cargo.toml @@ -12,7 +12,7 @@ unstable = [] workspace = true [dependencies] -agent-client-protocol = { version = "0.9.0", features = ["unstable"] } +agent-client-protocol = { version = "0.9.3", features = ["unstable"] } anyhow = { workspace = true } codex-core = { workspace = true } codex-protocol = { path = "../protocol" } diff --git a/codex-rs/acp/docs.md b/codex-rs/acp/docs.md index ca988aaa3..2ed85f49b 100644 --- a/codex-rs/acp/docs.md +++ b/codex-rs/acp/docs.md @@ -443,7 +443,7 @@ The `AcpBackend` provides a TUI-compatible interface that wraps `AcpConnection`: └─────────────────────────┘ └─────────────────────────┘ ``` -- `AcpBackendConfig`: Configuration for spawning (model, cwd, approval_policy, sandbox_policy, notify, nori_home, history_persistence) +- `AcpBackendConfig`: Configuration for spawning (model, cwd, approval_policy, sandbox_policy, notify, nori_home, history_persistence, resume_session_id) - `AcpBackend::spawn()`: Creates AcpConnection, session, and starts approval handler task. Uses enhanced error handling to provide actionable error messages on spawn or session creation failure. - `AcpBackend::submit(Op)`: Translates Codex Ops to ACP actions: - `Op::UserInput` → ACP `prompt()` @@ -684,10 +684,9 @@ Client advertises these capabilities to agents: The following features are marked with TODO comments in the codebase: -**Resume/Fork Integration (connection.rs:343-350):** -- Accept optional session_id parameter to resume existing sessions +**History-aware Resume/Fork Integration (connection.rs):** - Load persisted history from Codex rollout format -- Send history to agent via session initialization +- Send history to agents via `session/load` when full transcript replay is needed **Codex-format History Persistence (connection.rs:385-394):** - Collect all SessionUpdates during prompts diff --git a/codex-rs/acp/src/backend.rs b/codex-rs/acp/src/backend.rs index d371bfd3c..38b6b39e3 100644 --- a/codex-rs/acp/src/backend.rs +++ b/codex-rs/acp/src/backend.rs @@ -149,6 +149,8 @@ pub struct AcpBackendConfig { pub nori_home: PathBuf, /// History persistence policy pub history_persistence: crate::config::HistoryPersistence, + /// Optional ACP session ID to resume via `session/resume`. + pub resume_session_id: Option, } /// Backend adapter that provides a TUI-compatible interface for ACP agents. @@ -223,8 +225,37 @@ impl AcpBackend { } }; + let resume_session_id = config + .resume_session_id + .as_ref() + .map(|session_id| acp::SessionId::from(session_id.clone())); + // Create a session with enhanced error handling - let session_result = connection.create_session(&cwd).await; + let session_result = if let Some(resume_session_id) = resume_session_id.clone() { + #[cfg(feature = "unstable")] + { + let supports_resume = connection + .capabilities() + .session_capabilities + .resume + .is_some(); + if !supports_resume { + return Err(anyhow::anyhow!( + "Agent does not support session/resume. Start a new session or choose a different agent." + )); + } + } + #[cfg(not(feature = "unstable"))] + { + return Err(anyhow::anyhow!( + "Session resume is unavailable because unstable ACP features are disabled." + )); + } + + connection.resume_session(&resume_session_id, &cwd).await + } else { + connection.create_session(&cwd).await + }; let session_id = match session_result { Ok(id) => id, Err(e) => { @@ -247,7 +278,11 @@ impl AcpBackend { } }; - debug!("ACP session created: {:?}", session_id); + if resume_session_id.is_some() { + debug!("ACP session resumed: {:?}", session_id); + } else { + debug!("ACP session created: {:?}", session_id); + } // Take the approval receiver for handling permission requests let approval_rx = connection.take_approval_receiver(); @@ -301,6 +336,18 @@ impl AcpBackend { .await .ok(); + if let Some(resume_session_id) = resume_session_id { + let message = format!( + "Resumed ACP session {resume_session_id}. Previous messages are not shown." + ); + let _ = event_tx + .send(Event { + id: String::new(), + msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }), + }) + .await; + } + // Spawn approval handler task tokio::spawn(Self::run_approval_handler( approval_rx, @@ -2269,6 +2316,7 @@ mod tests { notify: None, nori_home: temp_dir.path().to_path_buf(), history_persistence: crate::config::HistoryPersistence::SaveAll, + resume_session_id: None, }; let result = AcpBackend::spawn(&config, event_tx).await; diff --git a/codex-rs/acp/src/connection.rs b/codex-rs/acp/src/connection.rs index b3e7bc897..0b45b18c8 100644 --- a/codex-rs/acp/src/connection.rs +++ b/codex-rs/acp/src/connection.rs @@ -113,6 +113,11 @@ enum AcpCommand { cwd: PathBuf, response_tx: oneshot::Sender>, }, + ResumeSession { + session_id: acp::SessionId, + cwd: PathBuf, + response_tx: oneshot::Sender>, + }, Prompt { session_id: acp::SessionId, prompt: Vec, @@ -258,6 +263,24 @@ impl AcpConnection { response_rx.await.context("ACP worker thread died")? } + /// Resume an existing session with the agent. + pub async fn resume_session( + &self, + session_id: &acp::SessionId, + cwd: &Path, + ) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.command_tx + .send(AcpCommand::ResumeSession { + session_id: session_id.clone(), + cwd: cwd.to_path_buf(), + response_tx, + }) + .await + .context("ACP worker thread died")?; + response_rx.await.context("ACP worker thread died")? + } + /// Send a prompt to an existing session and receive streaming updates. /// /// Returns the stop reason when the prompt completes. @@ -608,14 +631,6 @@ async fn run_command_loop( while let Some(cmd) = command_rx.recv().await { match cmd { AcpCommand::CreateSession { cwd, response_tx } => { - // TODO: [Future] Resume/Fork Integration - // When creating a session, check if there's an existing session to resume. - // This would require: - // 1. Accepting an optional session_id parameter to resume - // 2. Loading persisted history from Codex rollout format - // 3. Sending history to the agent via the session initialization - // See: codex-core/src/rollout.rs for the persistence format - let result = inner .connection .new_session(acp::NewSessionRequest::new(cwd)) @@ -640,6 +655,34 @@ async fn run_command_loop( .context("Failed to create ACP session"); let _ = response_tx.send(result); } + AcpCommand::ResumeSession { + session_id, + cwd, + response_tx, + } => { + let result = inner + .connection + .resume_session(acp::ResumeSessionRequest::new(session_id.clone(), cwd)) + .await; + + #[cfg(feature = "unstable")] + if let Ok(ref response) = result + && let Some(ref models) = response.models + && let Ok(mut state) = model_state.write() + { + *state = AcpModelState::from_session_model_state(models); + debug!( + "Model state updated after resume: current={:?}, available={}", + state.current_model_id, + state.available_models.len() + ); + } + + let result = result + .map(|_| session_id) + .context("Failed to resume ACP session"); + let _ = response_tx.send(result); + } AcpCommand::Prompt { session_id, prompt, diff --git a/codex-rs/mock-acp-agent/Cargo.toml b/codex-rs/mock-acp-agent/Cargo.toml index 018a1a907..9cb842f30 100644 --- a/codex-rs/mock-acp-agent/Cargo.toml +++ b/codex-rs/mock-acp-agent/Cargo.toml @@ -14,7 +14,7 @@ default = ["unstable"] unstable = ["agent-client-protocol/unstable"] [dependencies] -agent-client-protocol = "0.9.0" +agent-client-protocol = "0.9.3" tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7", features = ["compat"] } async-trait = "0.1" diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 10255a368..e66a37b1e 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -254,6 +254,7 @@ impl App { initial_prompt: Option, initial_images: Vec, resume_selection: ResumeSelection, + acp_resume_session_id: Option, #[cfg(feature = "feedback")] feedback: crate::feedback_compat::CodexFeedback, ) -> Result { use tokio_stream::StreamExt; @@ -302,6 +303,7 @@ impl App { #[cfg(feature = "feedback")] feedback: feedback.clone(), expected_model: None, // No filtering for fresh sessions + acp_resume_session_id: acp_resume_session_id.clone(), }; ChatWidget::new(init, conversation_manager.clone()) } @@ -327,6 +329,7 @@ impl App { #[cfg(feature = "feedback")] feedback: feedback.clone(), expected_model: None, // No filtering for resumed sessions + acp_resume_session_id: None, }; ChatWidget::new_from_existing( init, @@ -498,6 +501,7 @@ impl App { #[cfg(feature = "feedback")] feedback: self.feedback.clone(), expected_model: None, // No filtering for /new command + acp_resume_session_id: None, }; self.chat_widget = ChatWidget::new(init, self.server.clone()); if let Some(summary) = summary { @@ -1034,6 +1038,7 @@ impl App { #[cfg(feature = "feedback")] feedback: self.feedback.clone(), expected_model: Some(model_name.clone()), + acp_resume_session_id: None, }; self.chat_widget = ChatWidget::new(init, self.server.clone()); diff --git a/codex-rs/tui/src/app_backtrack.rs b/codex-rs/tui/src/app_backtrack.rs index e7fe7c473..62db43b7c 100644 --- a/codex-rs/tui/src/app_backtrack.rs +++ b/codex-rs/tui/src/app_backtrack.rs @@ -349,6 +349,7 @@ impl App { #[cfg(feature = "feedback")] feedback: self.feedback.clone(), expected_model: None, // No filtering for backtracked conversations + acp_resume_session_id: None, }; self.chat_widget = crate::chatwidget::ChatWidget::new_from_existing(init, conv, session_configured); diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index e03b594f9..006aca097 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -326,6 +326,8 @@ pub(crate) struct ChatWidgetInit { /// (e.g., from a previous agent) are ignored until SessionConfigured arrives /// with a matching model. This prevents race conditions when switching agents. pub(crate) expected_model: Option, + /// ACP-only: optional session ID to resume via `session/resume`. + pub(crate) acp_resume_session_id: Option, } #[derive(Default)] @@ -1481,10 +1483,16 @@ impl ChatWidget { #[cfg(feature = "feedback")] feedback, expected_model, + acp_resume_session_id, } = common; let mut rng = rand::rng(); let placeholder = EXAMPLE_PROMPTS[rng.random_range(0..EXAMPLE_PROMPTS.len())].to_string(); - let spawn_result = spawn_agent(config.clone(), app_event_tx.clone(), conversation_manager); + let spawn_result = spawn_agent( + config.clone(), + app_event_tx.clone(), + conversation_manager, + acp_resume_session_id, + ); let mut widget = Self { app_event_tx: app_event_tx.clone(), @@ -1571,6 +1579,7 @@ impl ChatWidget { #[cfg(feature = "feedback")] feedback, expected_model, + acp_resume_session_id: _, } = common; let mut rng = rand::rng(); let placeholder = EXAMPLE_PROMPTS[rng.random_range(0..EXAMPLE_PROMPTS.len())].to_string(); diff --git a/codex-rs/tui/src/chatwidget/agent.rs b/codex-rs/tui/src/chatwidget/agent.rs index f34ea2761..a63ae4b80 100644 --- a/codex-rs/tui/src/chatwidget/agent.rs +++ b/codex-rs/tui/src/chatwidget/agent.rs @@ -95,12 +95,13 @@ pub(crate) fn spawn_agent( config: Config, app_event_tx: AppEventSender, server: Arc, + acp_resume_session_id: Option, ) -> SpawnAgentResult { let acp_agent_result = get_agent_config(&config.model); match (acp_agent_result.is_ok(), config.acp_allow_http_fallback) { // Model is registered in ACP registry -> use ACP - (true, _) => spawn_acp_agent(config, app_event_tx), + (true, _) => spawn_acp_agent(config, app_event_tx, acp_resume_session_id), // Model NOT registered, but HTTP fallback is allowed -> use HTTP (false, true) => { @@ -156,7 +157,11 @@ fn spawn_error_agent( /// /// This uses the `codex_acp` crate to spawn an agent subprocess and handle /// communication via the Agent Client Protocol. -fn spawn_acp_agent(config: Config, app_event_tx: AppEventSender) -> SpawnAgentResult { +fn spawn_acp_agent( + config: Config, + app_event_tx: AppEventSender, + acp_resume_session_id: Option, +) -> SpawnAgentResult { let (codex_op_tx, mut codex_op_rx) = unbounded_channel::(); // Create the model command channel for model switching operations @@ -184,6 +189,7 @@ fn spawn_acp_agent(config: Config, app_event_tx: AppEventSender) -> SpawnAgentRe notify: config.notify.clone(), nori_home, history_persistence: HistoryPersistence::SaveAll, + resume_session_id: acp_resume_session_id, }; let backend = match AcpBackend::spawn(&acp_config, event_tx).await { diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 888dfeda8..bf5ee2532 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -317,6 +317,7 @@ async fn helpers_are_available_and_do_not_panic() { #[cfg(feature = "feedback")] feedback: crate::feedback_compat::CodexFeedback::new(), expected_model: None, + acp_resume_session_id: None, }; let mut w = ChatWidget::new(init, conversation_manager); // Basic construction sanity. diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index f3c02ce3f..c8f81c4c6 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -578,8 +578,17 @@ async fn run_ratatui_app( initial_config }; + let is_acp_model = codex_acp::get_agent_config(&config.model).is_ok(); + let acp_resume_session_id = if is_acp_model { + cli.resume_session_id.clone() + } else { + None + }; + // Determine resume behavior: explicit id, then resume last, then picker. - let resume_selection = if let Some(id_str) = cli.resume_session_id.as_deref() { + let resume_selection = if acp_resume_session_id.is_some() { + resume_picker::ResumeSelection::StartFresh + } else if let Some(id_str) = cli.resume_session_id.as_deref() { match find_conversation_path_by_id_str(&config.codex_home, id_str).await? { Some(path) => resume_picker::ResumeSelection::Resume(path), None => { @@ -654,6 +663,7 @@ async fn run_ratatui_app( prompt, images, resume_selection, + acp_resume_session_id, feedback, ) .await; @@ -666,6 +676,7 @@ async fn run_ratatui_app( prompt, images, resume_selection, + acp_resume_session_id, ) .await; diff --git a/codex-rs/tui/src/test_backend.rs b/codex-rs/tui/src/test_backend.rs index a5460af2e..0f9e32800 100644 --- a/codex-rs/tui/src/test_backend.rs +++ b/codex-rs/tui/src/test_backend.rs @@ -2,6 +2,7 @@ use std::fmt::{self}; use std::io::Write; use std::io::{self}; +use crossterm::style::force_color_output; use ratatui::prelude::CrosstermBackend; use ratatui::backend::Backend; @@ -25,6 +26,7 @@ pub struct VT100Backend { impl VT100Backend { /// Creates a new `TestBackend` with the specified width and height. pub fn new(width: u16, height: u16) -> Self { + force_color_output(true); Self { crossterm_backend: CrosstermBackend::new(vt100::Parser::new(height, width, 0)), }