Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 197 additions & 5 deletions src/agent/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,41 @@ use crate::config::BrowserConfig;
use crate::error::Result;
use crate::hooks::{SpacebotHook, ToolNudgePolicy};
use crate::llm::SpacebotModel;
use crate::llm::routing::is_context_overflow_error;
use crate::llm::routing::{is_context_overflow_error, is_retriable_error};
use crate::{AgentDeps, ChannelId, ProcessId, ProcessType, WorkerId};
use rig::agent::AgentBuilder;
use rig::completion::CompletionModel;
use std::collections::HashMap;
use std::fmt::Write as _;
use std::path::PathBuf;
use tokio::sync::{mpsc, watch};
use uuid::Uuid;

/// How many turns per segment before we check context and potentially compact.
const TURNS_PER_SEGMENT: usize = 25;
///
/// Kept relatively low so compaction checks run frequently. Fast models can
/// burn through many tool-call turns quickly, and each turn may add large
/// tool results (browser snapshots, shell output). Checking every 15 turns
/// instead of 25 reduces the chance of blowing past the context window
/// within a single segment.
const TURNS_PER_SEGMENT: usize = 15;
Comment on lines 18 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Decouple segment cadence from the worker turn cap.

This constant still feeds default_max_turns() on Line 290, so dropping it to 15 does more than increase compaction frequency: it also reduces each worker prompt to a 15-turn Rig budget and makes MaxTurnsError much more likely. If the intent is “compact every 15 turns,” that needs a separate checkpoint from the worker’s actual max_turns. As per coding guidelines, "Set explicit max_turns on all Rig agents: max_turns(50) for workers, max_turns(10) for branches, max_turns(5) for channels".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/worker.rs` around lines 18 - 25, TURNS_PER_SEGMENT is currently
reused by default_max_turns(), unintentionally shrinking worker Rig budgets and
causing MaxTurnsError; separate the compaction cadence from worker turn caps by
creating a new constant (e.g., COMPACTION_TURNS or SEGMENT_CHECK_TURNS) and use
that where compaction checks occur, then restore default_max_turns() to return
the intended worker cap (or stop using TURNS_PER_SEGMENT inside
default_max_turns()); also ensure all Rig agents explicitly set max_turns (call
max_turns(50) for workers, max_turns(10) for branches, max_turns(5) for
channels) so the worker prompt budgets are fixed regardless of compaction
cadence.


/// Max consecutive context overflow recoveries before giving up.
/// Prevents infinite compact-retry loops if something is fundamentally wrong.
const MAX_OVERFLOW_RETRIES: usize = 3;
/// Each retry dedup-strips stale tool results and force-compacts 75% of
/// remaining messages. Two retries is enough to handle the edge case where
/// a single message is enormous — beyond that, something is fundamentally
/// broken (system prompt alone exceeds the context window, or the compaction
/// floor of 4 messages is still too large).
const MAX_OVERFLOW_RETRIES: usize = 2;

/// Max consecutive transient provider error retries before giving up.
/// Transient errors (upstream 500s, timeouts, rate limits that survived
/// model-level retries) get a backoff-and-retry at the worker level so
/// the worker survives temporary provider outages.
const MAX_TRANSIENT_RETRIES: usize = 5;

/// Base delay for worker-level transient error backoff (doubles each retry).
const TRANSIENT_RETRY_BASE_DELAY: std::time::Duration = std::time::Duration::from_secs(5);

/// Max segments before the worker gives up and returns a partial result.
/// Prevents unbounded worker loops when the LLM keeps hitting max_turns
Expand Down Expand Up @@ -302,6 +322,7 @@ impl Worker {
let mut prompt = self.task.clone();
let mut segments_run = 0;
let mut overflow_retries = 0;
let mut transient_retries = 0;

let mut result = if resuming {
// For resumed workers, synthesize a "result" from the task
Expand All @@ -311,6 +332,17 @@ impl Worker {
loop {
segments_run += 1;

// Pre-prompt maintenance: dedup stale tool results and check
// context usage *before* each LLM call, not just at segment
// boundaries. Fast models can accumulate large tool results
// within a single segment and exceed the context window before
// we ever reach a checkpoint.
if segments_run > 1 {
dedup_tool_results(&mut history);
self.maybe_compact_history(&mut compacted_history, &mut history)
.await;
}

match self
.hook
.prompt_with_tool_nudge_retry(&agent, &mut history, &prompt)
Expand All @@ -321,6 +353,7 @@ impl Worker {
}
Err(rig::completion::PromptError::MaxTurnsError { .. }) => {
overflow_retries = 0;
transient_retries = 0;

if segments_run >= MAX_SEGMENTS {
tracing::warn!(
Expand All @@ -337,6 +370,7 @@ impl Worker {
}

self.persist_transcript(&compacted_history, &history).await;
dedup_tool_results(&mut history);
self.maybe_compact_history(&mut compacted_history, &mut history)
.await;
prompt =
Expand Down Expand Up @@ -377,13 +411,51 @@ impl Worker {
"context overflow, compacting and retrying"
);
self.hook.send_status("compacting (overflow recovery)");
dedup_tool_results(&mut history);
self.force_compact_history(&mut compacted_history, &mut history)
.await;
prompt = "Continue where you left off. Do not repeat completed work. \
Your previous attempt exceeded the context limit, so older history \
has been compacted."
.into();
}
Err(error) if is_retriable_error(&error.to_string()) => {
transient_retries += 1;
if transient_retries > MAX_TRANSIENT_RETRIES {
self.state = WorkerState::Failed;
self.hook.send_status("failed");
self.write_failure_log(&history, &format!(
"transient provider error after {MAX_TRANSIENT_RETRIES} retries: {error}"
));
self.persist_transcript(&compacted_history, &history).await;
tracing::error!(
worker_id = %self.id,
retries = MAX_TRANSIENT_RETRIES,
%error,
"worker transient error retries exhausted"
);
return Err(crate::error::AgentError::Other(error.into()).into());
}

let delay =
TRANSIENT_RETRY_BASE_DELAY * 2u32.pow((transient_retries - 1) as u32);
tracing::warn!(
worker_id = %self.id,
attempt = transient_retries,
delay_secs = delay.as_secs(),
%error,
"transient provider error, backing off and retrying"
);
self.hook.send_status(format!(
"provider error, retrying in {}s ({transient_retries}/{MAX_TRANSIENT_RETRIES})",
delay.as_secs()
));
tokio::time::sleep(delay).await;

// Don't change the prompt — just retry with the same
// state. The LLM never saw this request so there's
// nothing to "continue" from.
}
Err(error) => {
self.state = WorkerState::Failed;
self.hook.send_status("failed");
Expand Down Expand Up @@ -440,12 +512,14 @@ impl Worker {
self.state = WorkerState::Running;
self.hook.send_status("processing follow-up");

// Compact before follow-up if needed
// Dedup stale tool results and compact before follow-up if needed
dedup_tool_results(&mut history);
self.maybe_compact_history(&mut compacted_history, &mut history)
.await;

let mut follow_up_prompt = follow_up.clone();
let mut follow_up_overflow_retries = 0;
let mut follow_up_transient_retries = 0u32;
let follow_up_hook = self
.hook
.clone()
Expand Down Expand Up @@ -474,12 +548,38 @@ impl Worker {
"follow-up context overflow, compacting and retrying"
);
self.hook.send_status("compacting (overflow recovery)");
dedup_tool_results(&mut history);
self.force_compact_history(&mut compacted_history, &mut history)
.await;
let prompt_engine = self.deps.runtime_config.prompts.load();
let overflow_msg = prompt_engine.render_system_worker_overflow()?;
follow_up_prompt = format!("{follow_up}\n\n{overflow_msg}");
}
Err(error) if is_retriable_error(&error.to_string()) => {
follow_up_transient_retries += 1;
if follow_up_transient_retries > MAX_TRANSIENT_RETRIES as u32 {
let failure_reason = format!(
"follow-up transient error after {MAX_TRANSIENT_RETRIES} retries: {error}"
);
self.write_failure_log(&history, &failure_reason);
tracing::error!(worker_id = %self.id, %error, "follow-up transient retries exhausted");
break Err(failure_reason);
}
let delay = TRANSIENT_RETRY_BASE_DELAY
* 2u32.pow(follow_up_transient_retries - 1);
tracing::warn!(
worker_id = %self.id,
attempt = follow_up_transient_retries,
delay_secs = delay.as_secs(),
%error,
"follow-up transient error, backing off and retrying"
);
self.hook.send_status(format!(
"provider error, retrying in {}s ({follow_up_transient_retries}/{MAX_TRANSIENT_RETRIES})",
delay.as_secs()
));
tokio::time::sleep(delay).await;
}
Err(error) => {
let failure_reason = format!("follow-up failed: {error}");
self.write_failure_log(&history, &failure_reason);
Expand Down Expand Up @@ -865,6 +965,98 @@ impl Worker {
}
}

/// Tool names whose results are bulky and superseded by the latest call.
/// Only the most recent result for each tool is kept in full; older results
/// are replaced with a short marker to save context space.
///
/// This runs in-place on the history before every LLM call, so the model
/// always has the latest snapshot but doesn't waste context on stale ones.
/// Add a tool name here only if an older result is always made redundant
/// by a newer call to the same tool (full-state snapshots, not deltas).
const DEDUP_TOOL_RESULTS: &[&str] = &["browser_snapshot", "browser_tab_list"];

/// Replace all but the most recent result for each tool in `DEDUP_TOOL_RESULTS`
/// with a short placeholder. This dramatically reduces context usage for
/// browser-heavy workflows where `browser_snapshot` returns large ARIA trees
/// on every call.
///
/// Note: this mutates `history` in-place, so superseded results are also
/// replaced in the persisted transcript.
/// Replace all but the most recent result for each tool in `DEDUP_TOOL_RESULTS`
/// with a short placeholder. This dramatically reduces context usage for
/// browser-heavy workflows where `browser_snapshot` returns large ARIA trees
/// on every call.
///
/// Note: this mutates `history` in-place, so superseded results are also
/// replaced in the persisted transcript.
fn dedup_tool_results(history: &mut [rig::message::Message]) {
    // Step 1: Build a map from tool-call ID → tool name for dedup-eligible tools.
    // We need this because ToolResult only has call_id, not the tool name.
    let mut call_id_to_tool: HashMap<String, String> = HashMap::new();
    for message in history.iter() {
        if let rig::message::Message::Assistant { content, .. } = message {
            for item in content.iter() {
                if let rig::message::AssistantContent::ToolCall(tc) = item
                    && DEDUP_TOOL_RESULTS.contains(&tc.function.name.as_str())
                {
                    // Rig uses call_id when present, falls back to id.
                    let effective_id = tc.call_id.as_ref().unwrap_or(&tc.id);
                    call_id_to_tool.insert(effective_id.clone(), tc.function.name.clone());
                }
            }
        }
    }

    // Nothing dedup-eligible was ever called — skip the two history scans.
    if call_id_to_tool.is_empty() {
        return;
    }

    // Step 2: Find the last (most recent) result position for each tool name.
    // Tracked as (message_index, item_index) since Rig can pack multiple
    // ToolResult entries into a single User message. Keys borrow from
    // `call_id_to_tool`, so no per-match scan of DEDUP_TOOL_RESULTS (and no
    // `expect`) is needed to obtain a map key.
    let mut last_result_position: HashMap<&str, (usize, usize)> = HashMap::new();
    for (message_index, message) in history.iter().enumerate() {
        if let rig::message::Message::User { content } = message {
            for (item_index, item) in content.iter().enumerate() {
                if let rig::message::UserContent::ToolResult(tr) = item
                    && let Some(call_id) = &tr.call_id
                    && let Some(tool_name) = call_id_to_tool.get(call_id)
                {
                    // Later iterations overwrite earlier ones, so after the
                    // loop this holds the final (most recent) position.
                    last_result_position.insert(tool_name.as_str(), (message_index, item_index));
                }
            }
        }
    }

    // Step 3: Replace older results with a compact marker.
    let mut replaced = 0usize;
    for (message_index, message) in history.iter_mut().enumerate() {
        if let rig::message::Message::User { content } = message {
            for (item_index, item) in content.iter_mut().enumerate() {
                if let rig::message::UserContent::ToolResult(tr) = item
                    && let Some(call_id) = &tr.call_id
                    && let Some(tool_name) = call_id_to_tool.get(call_id)
                {
                    let is_last = last_result_position
                        .get(tool_name.as_str())
                        .is_some_and(|&last| last == (message_index, item_index));

                    if !is_last {
                        tr.content =
                            rig::OneOrMany::one(rig::message::ToolResultContent::text(format!(
                                "[{tool_name} output superseded by a more recent call — \
                                 see latest {tool_name} result below]"
                            )));
                        replaced += 1;
                    }
                }
            }
        }
    }

    if replaced > 0 {
        tracing::debug!(replaced, "deduped stale tool results in history");
    }
}

/// Build a recap of removed worker history for the compaction marker.
///
/// Extracts tool calls, assistant text, and tool results so the worker
Expand Down
Loading