From 8f20589aae0952c58e6fa2fe56baa004b17ed998 Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Wed, 11 Mar 2026 16:59:09 -0700 Subject: [PATCH 1/4] fix(slack): normalize conversation ID to prevent thread splits Slack thread replies included thread_ts in the conversation ID, producing a different key than the original top-level message. This caused the main event loop to create a second channel for the same Slack conversation, orphaning the original worker results and outbound response routing. Always use slack:{team}:{channel} as the conversation ID regardless of thread context. Thread targeting for outbound replies still works via slack_thread_ts in message metadata. --- src/messaging/slack.rs | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/src/messaging/slack.rs b/src/messaging/slack.rs index 1ba15ba43..8474810ee 100644 --- a/src/messaging/slack.rs +++ b/src/messaging/slack.rs @@ -238,11 +238,7 @@ async fn handle_message_event( } } - let base_conversation_id = if let Some(ref thread_ts) = msg_event.origin.thread_ts { - format!("slack:{}:{}:{}", team_id_str, channel_id, thread_ts.0) - } else { - format!("slack:{}:{}", team_id_str, channel_id) - }; + let base_conversation_id = format!("slack:{}:{}", team_id_str, channel_id); let conversation_id = apply_runtime_adapter_to_conversation_id(&adapter_state.runtime_key, base_conversation_id); @@ -376,11 +372,7 @@ async fn handle_app_mention_event( return Ok(()); } - let base_conversation_id = if let Some(ref thread_ts) = mention.origin.thread_ts { - format!("slack:{}:{}:{}", team_id_str, channel_id, thread_ts.0) - } else { - format!("slack:{}:{}", team_id_str, channel_id) - }; + let base_conversation_id = format!("slack:{}:{}", team_id_str, channel_id); let conversation_id = apply_runtime_adapter_to_conversation_id(&adapter_state.runtime_key, base_conversation_id); @@ -658,11 +650,7 @@ async fn handle_interaction_event( // Use trigger_id as the unique message id for this interaction turn. let msg_id = block_actions.trigger_id.0.clone(); - let base_conversation_id = if let Some(ref ts) = message_ts { - format!("slack:{}:{}:{}", team_id, channel_id, ts) - } else { - format!("slack:{}:{}", team_id, channel_id) - }; + let base_conversation_id = format!("slack:{}:{}", team_id, channel_id); let conversation_id = apply_runtime_adapter_to_conversation_id(&adapter_state.runtime_key, base_conversation_id); From bda4c9df99806acff95e0cd0f86346ed2c1babfb Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Wed, 11 Mar 2026 17:26:04 -0700 Subject: [PATCH 2/4] fix(slack): pin outbound target at enqueue time to prevent cross-thread misrouting The previous latest_message approach used a shared mutable reference that could be overwritten by a newer message from a different thread before the worker's response was delivered. With conversation IDs now per-channel (not per-thread), this race becomes possible when multiple Slack threads interleave. Introduces RoutedResponse which pairs each OutboundResponse with the InboundMessage that triggered it. The channel captures current_inbound at the start of each turn and all send sites (channel, reply, react, skip, send_file) use it to pin the outbound target. The outbound routing task in main.rs reads the paired target instead of a shared latest_message. --- src/agent/channel.rs | 70 +++++++++--- src/cron/scheduler.rs | 14 ++- src/lib.rs | 57 ++++++++++ src/main.rs | 248 ++++++++++++++--------------------------- src/tools.rs | 6 +- src/tools/react.rs | 7 +- src/tools/reply.rs | 7 +- src/tools/send_file.rs | 21 ++-- src/tools/skip.rs | 7 +- 9 files changed, 223 insertions(+), 214 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 89680b7a1..8b5a3d47d 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -21,7 +21,7 @@ use crate::hooks::SpacebotHook; use crate::llm::SpacebotModel; use crate::{ AgentDeps, BranchId, ChannelId, InboundMessage, OutboundResponse, ProcessEvent, ProcessId, - ProcessType, WorkerId, + ProcessType, RoutedResponse, RoutedSender, WorkerId, }; use rig::agent::AgentBuilder; use rig::completion::CompletionModel; @@ -329,9 +329,12 @@ pub struct Channel { /// Event receiver for process events. pub event_rx: broadcast::Receiver, /// Outbound response sender for the messaging layer. - pub response_tx: mpsc::Sender, + pub response_tx: mpsc::Sender, /// Self-sender for re-triggering the channel after background process completion. pub self_tx: mpsc::Sender, + /// The inbound message currently being processed. Used to pair outbound + /// responses with the correct platform routing metadata (e.g. Slack thread_ts). + current_inbound: Option, /// Conversation ID from the first message (for synthetic re-trigger messages). pub conversation_id: Option, /// Adapter source captured from the first non-system message. @@ -405,7 +408,7 @@ impl Channel { pub fn new( id: ChannelId, deps: AgentDeps, - response_tx: mpsc::Sender, + response_tx: mpsc::Sender, event_rx: broadcast::Receiver, screenshot_dir: std::path::PathBuf, logs_dir: std::path::PathBuf, @@ -486,6 +489,7 @@ impl Channel { event_rx, response_tx, self_tx, + current_inbound: None, conversation_id: None, source_adapter: None, conversation_context: None, @@ -778,12 +782,35 @@ impl Channel { (invoked_by_command, invoked_by_mention, invoked_by_reply) } + /// Send a routed response paired with the current inbound message. + /// + /// Falls back to a bare response with a placeholder target if no inbound + /// message is set (should not happen during normal turn processing). + async fn send_routed( + &self, + response: OutboundResponse, + ) -> std::result::Result<(), mpsc::error::SendError> { + let routed = match &self.current_inbound { + Some(target) => RoutedResponse { + response, + target: target.clone(), + }, + None => { + tracing::warn!( + channel_id = %self.id, + "sending response without a current inbound message" + ); + RoutedResponse { + response, + target: InboundMessage::empty(), + } + } + }; + self.response_tx.send(routed).await + } + async fn send_builtin_text(&mut self, text: String, log_label: &str) { - match self - .response_tx - .send(OutboundResponse::Text(text.clone())) - .await - { + match self.send_routed(OutboundResponse::Text(text.clone())).await { Ok(()) => { #[cfg(feature = "metrics")] { @@ -1528,6 +1555,13 @@ impl Channel { // Apply runtime-config updates immediately without requiring a restart. self.sync_listen_only_mode_from_runtime(); + // Track the inbound message that triggered this turn so outbound + // responses carry the correct routing metadata (e.g. Slack thread_ts). + // System retrigger messages keep the previous inbound target. + if message.source != "system" { + self.current_inbound = Some(message.clone()); + } + tracing::info!( channel_id = %self.id, message_id = %message.id, @@ -2209,10 +2243,17 @@ impl Channel { .clone() .map(|tool| tool.with_originating_channel(conversation_id.to_string())); + let routed_sender = RoutedSender::new( + self.response_tx.clone(), + self.current_inbound + .clone() + .unwrap_or_else(InboundMessage::empty), + ); + if let Err(error) = crate::tools::add_channel_tools( &self.tool_server, self.state.clone(), - self.response_tx.clone(), + routed_sender, conversation_id, skip_flag.clone(), replied_flag.clone(), @@ -2246,8 +2287,7 @@ impl Channel { .build(); let _ = self - .response_tx - .send(OutboundResponse::Status(crate::StatusUpdate::Thinking)) + .send_routed(OutboundResponse::Status(crate::StatusUpdate::Thinking)) .await; // Inject attachments as a user message before the text prompt @@ -2347,7 +2387,7 @@ impl Channel { /// Send outbound text and record send metrics. async fn send_outbound_text(&self, text: String, error_context: &str) { - match self.response_tx.send(OutboundResponse::Text(text)).await { + match self.send_routed(OutboundResponse::Text(text)).await { Ok(()) => { #[cfg(feature = "metrics")] { @@ -2606,8 +2646,7 @@ impl Channel { .inc(); // Send error to user so they know something went wrong let error_msg = format!("I encountered an error: {}", error); - self.response_tx - .send(OutboundResponse::Text(error_msg)) + self.send_routed(OutboundResponse::Text(error_msg)) .await .ok(); tracing::error!(channel_id = %self.id, %error, "channel LLM call failed"); @@ -2616,8 +2655,7 @@ impl Channel { // Ensure typing indicator is always cleaned up, even on error paths let _ = self - .response_tx - .send(OutboundResponse::Status(crate::StatusUpdate::StopTyping)) + .send_routed(OutboundResponse::Status(crate::StatusUpdate::StopTyping)) .await; } diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index 1b78f6d22..1d0fe5d60 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -10,7 +10,7 @@ use crate::cron::store::CronStore; use crate::error::Result; use crate::messaging::MessagingManager; use crate::messaging::target::{BroadcastTarget, parse_delivery_target}; -use crate::{AgentDeps, InboundMessage, MessageContent, OutboundResponse}; +use crate::{AgentDeps, InboundMessage, MessageContent, OutboundResponse, RoutedResponse}; use chrono::Timelike; use chrono_tz::Tz; use cron::Schedule; @@ -849,7 +849,7 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> { let channel_id: crate::ChannelId = Arc::from(format!("cron:{}", job.id).as_str()); // Create the outbound response channel to collect whatever the channel produces - let (response_tx, mut response_rx) = tokio::sync::mpsc::channel::(32); + let (response_tx, mut response_rx) = tokio::sync::mpsc::channel::(32); // Subscribe to the agent's event bus (the channel needs this for branch/worker events) let event_rx = context.deps.event_tx.subscribe(); @@ -909,10 +909,16 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> { break; } match tokio::time::timeout(remaining, response_rx.recv()).await { - Ok(Some(OutboundResponse::Text(text))) => { + Ok(Some(RoutedResponse { + response: OutboundResponse::Text(text), + .. + })) => { collected_text.push(text); } - Ok(Some(OutboundResponse::RichMessage { text, .. })) => { + Ok(Some(RoutedResponse { + response: OutboundResponse::RichMessage { text, .. }, + .. + })) => { collected_text.push(text); } Ok(Some(_)) => {} diff --git a/src/lib.rs b/src/lib.rs index fba359745..fb36ba0c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ pub use error::{Error, Result}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::mpsc; /// Signal from the API to the main event loop to trigger provider setup. #[derive(Debug)] @@ -473,6 +474,23 @@ pub struct InboundMessage { } impl InboundMessage { + /// Construct an empty placeholder message. Used as a fallback when no + /// inbound context is available for outbound routing. + pub fn empty() -> Self { + Self { + id: String::new(), + source: String::new(), + adapter: None, + conversation_id: String::new(), + sender_id: String::new(), + agent_id: None, + content: MessageContent::Text(String::new()), + timestamp: chrono::Utc::now(), + metadata: HashMap::new(), + formatted_author: None, + } + } + /// Runtime adapter key for routing outbound operations. /// /// Falls back to the platform source for backward compatibility. @@ -571,6 +589,45 @@ pub struct Attachment { pub auth_header: Option, } +/// An outbound response paired with the inbound message that triggered it. +/// +/// This ensures outbound routing targets the correct thread/conversation even +/// when multiple threads share the same channel (e.g. Slack threads within a +/// single channel). The paired `InboundMessage` carries the platform metadata +/// (thread_ts, message_ts, etc.) needed to route the response correctly. +#[derive(Debug, Clone)] +pub struct RoutedResponse { + pub response: OutboundResponse, + pub target: InboundMessage, +} + +/// A sender that automatically pairs outbound responses with a captured +/// inbound message target. Used by channel tools (reply, react, etc.) so +/// they don't need direct access to the triggering `InboundMessage`. +#[derive(Debug, Clone)] +pub struct RoutedSender { + inner: mpsc::Sender, + target: InboundMessage, +} + +impl RoutedSender { + pub fn new(inner: mpsc::Sender, target: InboundMessage) -> Self { + Self { inner, target } + } + + pub async fn send( + &self, + response: OutboundResponse, + ) -> std::result::Result<(), mpsc::error::SendError> { + self.inner + .send(RoutedResponse { + response, + target: self.target.clone(), + }) + .await + } +} + /// Outbound response to messaging platforms. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] diff --git a/src/main.rs b/src/main.rs index 776673a0f..93dffbe6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -179,10 +179,6 @@ enum SecretsCommand { /// Tracks an active conversation channel and its message sender. struct ActiveChannel { message_tx: mpsc::Sender, - /// Latest inbound message for this conversation, shared with the outbound - /// routing task so status updates (e.g. typing indicators) target the - /// most recent message rather than the first one the channel ever received. - latest_message: Arc>, /// Retained so the outbound routing task stays alive. _outbound_handle: tokio::task::JoinHandle<()>, } @@ -270,6 +266,68 @@ fn render_conversation_history_backfill( serialize_backfill_transcript(entries) } +/// Forward outbound response events to SSE clients for the dashboard. +fn forward_sse_event( + api_event_tx: &tokio::sync::broadcast::Sender, + agent_id: &str, + channel_id: &str, + response: &spacebot::OutboundResponse, +) { + match response { + spacebot::OutboundResponse::Text(text) + | spacebot::OutboundResponse::RichMessage { text, .. } + | spacebot::OutboundResponse::ThreadReply { text, .. } => { + api_event_tx + .send(spacebot::api::ApiEvent::OutboundMessage { + agent_id: agent_id.to_string(), + channel_id: channel_id.to_string(), + text: text.clone(), + }) + .ok(); + } + spacebot::OutboundResponse::Status(spacebot::StatusUpdate::Thinking) => { + api_event_tx + .send(spacebot::api::ApiEvent::TypingState { + agent_id: agent_id.to_string(), + channel_id: channel_id.to_string(), + is_typing: true, + }) + .ok(); + } + spacebot::OutboundResponse::Status(spacebot::StatusUpdate::StopTyping) => { + api_event_tx + .send(spacebot::api::ApiEvent::TypingState { + agent_id: agent_id.to_string(), + channel_id: channel_id.to_string(), + is_typing: false, + }) + .ok(); + } + _ => {} + } +} + +/// Route an outbound response to the messaging adapter using the pinned target +/// message for platform routing metadata (thread_ts, channel_id, etc.). +async fn route_outbound( + messaging: &std::sync::Arc, + target: &spacebot::InboundMessage, + response: spacebot::OutboundResponse, +) { + match response { + spacebot::OutboundResponse::Status(status) => { + if let Err(error) = messaging.send_status(target, status).await { + tracing::warn!(%error, "failed to send status update"); + } + } + response => { + if let Err(error) = messaging.respond(target, response).await { + tracing::error!(%error, "failed to send outbound response"); + } + } + } +} + fn main() -> anyhow::Result<()> { rustls::crypto::ring::default_provider() .install_default() @@ -1746,7 +1804,7 @@ async fn run( } let (response_tx, mut response_rx) = - mpsc::channel::(32); + mpsc::channel::(32); let event_rx = agent.deps.event_tx.subscribe(); let channel_id: spacebot::ChannelId = Arc::from(conversation_id.as_str()); @@ -1873,99 +1931,20 @@ async fn run( .await; }); - // Outbound response routing for this pre-created channel. - // Since there's no inbound message yet, we create a placeholder. - let latest_message = - Arc::new(tokio::sync::RwLock::new(spacebot::InboundMessage { - id: uuid::Uuid::new_v4().to_string(), - source: "internal".to_string(), - adapter: None, - conversation_id: conversation_id.clone(), - content: spacebot::MessageContent::Text(String::new()), - sender_id: String::new(), - formatted_author: None, - metadata: std::collections::HashMap::new(), - agent_id: Some(agent_id.clone()), - timestamp: chrono::Utc::now(), - })); - let outbound_message = latest_message.clone(); let messaging_for_outbound = messaging_manager.clone(); let api_event_tx = api_state.event_tx.clone(); let sse_agent_id = agent_id.to_string(); let sse_channel_id = conversation_id.clone(); let outbound_handle = tokio::spawn(async move { - while let Some(response) = response_rx.recv().await { - match &response { - spacebot::OutboundResponse::Text(text) => { - api_event_tx - .send(spacebot::api::ApiEvent::OutboundMessage { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - text: text.clone(), - }) - .ok(); - } - spacebot::OutboundResponse::RichMessage { text, .. } => { - api_event_tx - .send(spacebot::api::ApiEvent::OutboundMessage { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - text: text.clone(), - }) - .ok(); - } - spacebot::OutboundResponse::ThreadReply { text, .. } => { - api_event_tx - .send(spacebot::api::ApiEvent::OutboundMessage { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - text: text.clone(), - }) - .ok(); - } - spacebot::OutboundResponse::Status( - spacebot::StatusUpdate::Thinking, - ) => { - api_event_tx - .send(spacebot::api::ApiEvent::TypingState { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - is_typing: true, - }) - .ok(); - } - spacebot::OutboundResponse::Status( - spacebot::StatusUpdate::StopTyping, - ) => { - api_event_tx - .send(spacebot::api::ApiEvent::TypingState { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - is_typing: false, - }) - .ok(); - } - _ => {} - } - let current_message = outbound_message.read().await.clone(); - match response { - spacebot::OutboundResponse::Status(status) => { - if let Err(error) = messaging_for_outbound - .send_status(¤t_message, status) - .await - { - tracing::warn!(%error, "failed to send status update"); - } - } - response => { - if let Err(error) = messaging_for_outbound - .respond(¤t_message, response) - .await - { - tracing::error!(%error, "failed to send outbound response"); - } - } - } + while let Some(routed) = response_rx.recv().await { + let spacebot::RoutedResponse { response, target } = routed; + forward_sse_event( + &api_event_tx, + &sse_agent_id, + &sse_channel_id, + &response, + ); + route_outbound(&messaging_for_outbound, &target, response).await; } }); @@ -1973,7 +1952,6 @@ async fn run( conversation_id.clone(), ActiveChannel { message_tx: channel_tx, - latest_message, _outbound_handle: outbound_handle, }, ); @@ -2030,7 +2008,7 @@ async fn run( }; // Create outbound response channel - let (response_tx, mut response_rx) = mpsc::channel::(32); + let (response_tx, mut response_rx) = mpsc::channel::(32); // Subscribe to the agent's event bus let event_rx = agent.deps.event_tx.subscribe(); @@ -2122,84 +2100,24 @@ async fn run( // Spawn outbound response routing: reads from response_rx, // sends to the messaging adapter and forwards to SSE let messaging_for_outbound = messaging_manager.clone(); - let latest_message = Arc::new(tokio::sync::RwLock::new(message.clone())); - let outbound_message = latest_message.clone(); let outbound_conversation_id = conversation_id.clone(); let api_event_tx = api_state.event_tx.clone(); let sse_agent_id = agent_id.to_string(); let sse_channel_id = conversation_id.clone(); let outbound_handle = tokio::spawn(async move { - while let Some(response) = response_rx.recv().await { - // Forward relevant events to SSE clients - match &response { - spacebot::OutboundResponse::Text(text) => { - api_event_tx.send(spacebot::api::ApiEvent::OutboundMessage { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - text: text.clone(), - }).ok(); - } - spacebot::OutboundResponse::RichMessage { text, .. } => { - api_event_tx.send(spacebot::api::ApiEvent::OutboundMessage { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - text: text.clone(), - }).ok(); - } - spacebot::OutboundResponse::ThreadReply { text, .. } => { - api_event_tx.send(spacebot::api::ApiEvent::OutboundMessage { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - text: text.clone(), - }).ok(); - } - spacebot::OutboundResponse::Status(spacebot::StatusUpdate::Thinking) => { - api_event_tx.send(spacebot::api::ApiEvent::TypingState { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - is_typing: true, - }).ok(); - } - spacebot::OutboundResponse::Status(spacebot::StatusUpdate::StopTyping) => { - api_event_tx.send(spacebot::api::ApiEvent::TypingState { - agent_id: sse_agent_id.clone(), - channel_id: sse_channel_id.clone(), - is_typing: false, - }).ok(); - } - _ => {} - } - - let current_message = outbound_message.read().await.clone(); - - match response { - spacebot::OutboundResponse::Status(status) => { - if let Err(error) = messaging_for_outbound - .send_status(¤t_message, status) - .await - { - tracing::warn!(%error, "failed to send status update"); - } - } - response => { - tracing::info!( - conversation_id = %outbound_conversation_id, - "routing outbound response to messaging adapter" - ); - if let Err(error) = messaging_for_outbound - .respond(¤t_message, response) - .await - { - tracing::error!(%error, "failed to send outbound response"); - } - } - } + while let Some(routed) = response_rx.recv().await { + let spacebot::RoutedResponse { response, target } = routed; + forward_sse_event(&api_event_tx, &sse_agent_id, &sse_channel_id, &response); + route_outbound(&messaging_for_outbound, &target, response).await; } + tracing::debug!( + conversation_id = %outbound_conversation_id, + "outbound response channel closed" + ); }); active_channels.insert(conversation_id.clone(), ActiveChannel { message_tx: channel_tx, - latest_message, _outbound_handle: outbound_handle, }); @@ -2212,10 +2130,6 @@ async fn run( // Forward the message to the channel if let Some(active) = active_channels.get(&conversation_id) { - // Update the shared message reference so outbound routing - // (typing indicators, reactions) targets this message - *active.latest_message.write().await = message.clone(); - // Emit inbound message to SSE clients let sender_name = message.formatted_author.clone().or_else(|| { message diff --git a/src/tools.rs b/src/tools.rs index eeaecd3ac..11ee60585 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -175,12 +175,12 @@ use crate::config::{BrowserConfig, RuntimeConfig}; use crate::memory::MemorySearch; use crate::sandbox::Sandbox; use crate::tasks::TaskStore; -use crate::{AgentId, ChannelId, OutboundResponse, ProcessEvent, WorkerId}; +use crate::{AgentId, ChannelId, ProcessEvent, RoutedSender, WorkerId}; use rig::tool::Tool as _; use rig::tool::server::{ToolServer, ToolServerHandle}; use std::path::PathBuf; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::broadcast; #[derive(Debug, Clone)] pub enum BranchToolProfile { @@ -348,7 +348,7 @@ pub fn should_block_user_visible_text(value: &str) -> bool { pub async fn add_channel_tools( handle: &ToolServerHandle, state: ChannelState, - response_tx: mpsc::Sender, + response_tx: RoutedSender, conversation_id: impl Into, skip_flag: SkipFlag, replied_flag: RepliedFlag, diff --git a/src/tools/react.rs b/src/tools/react.rs index 6679d5edf..f62d8be99 100644 --- a/src/tools/react.rs +++ b/src/tools/react.rs @@ -1,20 +1,19 @@ //! React tool for adding emoji reactions to messages (channel only). -use crate::OutboundResponse; +use crate::{OutboundResponse, RoutedSender}; use rig::completion::ToolDefinition; use rig::tool::Tool; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; /// Tool for reacting to messages with emoji. #[derive(Debug, Clone)] pub struct ReactTool { - response_tx: mpsc::Sender, + response_tx: RoutedSender, } impl ReactTool { - pub fn new(response_tx: mpsc::Sender) -> Self { + pub fn new(response_tx: RoutedSender) -> Self { Self { response_tx } } } diff --git a/src/tools/reply.rs b/src/tools/reply.rs index 91c68b2e9..57e645bf7 100644 --- a/src/tools/reply.rs +++ b/src/tools/reply.rs @@ -2,7 +2,7 @@ use crate::conversation::ConversationLogger; -use crate::{ChannelId, OutboundResponse}; +use crate::{ChannelId, OutboundResponse, RoutedSender}; use regex::Regex; use rig::completion::ToolDefinition; use rig::tool::Tool; @@ -12,7 +12,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::LazyLock; use std::sync::atomic::{AtomicBool, Ordering}; -use tokio::sync::mpsc; static BROKEN_DISCORD_MENTION_REGEX: LazyLock = LazyLock::new(|| { Regex::new(r"<{2,}@(!?)>\s*(\d{15,22})>").expect("hardcoded broken mention regex") @@ -40,7 +39,7 @@ pub fn new_replied_flag() -> RepliedFlag { /// tools once and shares them across calls. #[derive(Debug, Clone)] pub struct ReplyTool { - response_tx: mpsc::Sender, + response_tx: RoutedSender, conversation_id: String, conversation_logger: ConversationLogger, channel_id: ChannelId, @@ -51,7 +50,7 @@ pub struct ReplyTool { impl ReplyTool { /// Create a new reply tool bound to a conversation's response channel. pub fn new( - response_tx: mpsc::Sender, + response_tx: RoutedSender, conversation_id: impl Into, conversation_logger: ConversationLogger, channel_id: ChannelId, diff --git a/src/tools/send_file.rs b/src/tools/send_file.rs index 60e4ce8f5..ea1d4e000 100644 --- a/src/tools/send_file.rs +++ b/src/tools/send_file.rs @@ -1,14 +1,13 @@ //! Send file tool for delivering file attachments to users (channel only). -use crate::OutboundResponse; use crate::sandbox::Sandbox; +use crate::{OutboundResponse, RoutedSender}; use rig::completion::ToolDefinition; use rig::tool::Tool; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use std::sync::Arc; -use tokio::sync::mpsc; /// Tool for sending files to users. /// @@ -19,17 +18,13 @@ use tokio::sync::mpsc; /// workspace boundary. When sandbox is disabled, any readable path is allowed. #[derive(Debug, Clone)] pub struct SendFileTool { - response_tx: mpsc::Sender, + response_tx: RoutedSender, workspace: PathBuf, sandbox: Arc, } impl SendFileTool { - pub fn new( - response_tx: mpsc::Sender, - workspace: PathBuf, - sandbox: Arc, - ) -> Self { + pub fn new(response_tx: RoutedSender, workspace: PathBuf, sandbox: Arc) -> Self { Self { response_tx, workspace, @@ -239,7 +234,8 @@ mod tests { fn create_tool(workspace: PathBuf) -> SendFileTool { let sandbox = create_sandbox(SandboxMode::Enabled, &workspace); - let (response_tx, _response_rx) = mpsc::channel(1); + let (tx, _rx) = tokio::sync::mpsc::channel(1); + let response_tx = RoutedSender::new(tx, crate::InboundMessage::empty()); SendFileTool::new(response_tx, workspace, sandbox) } @@ -320,7 +316,8 @@ mod tests { fs::write(&file, "public data").expect("failed to write file"); let sandbox = create_sandbox(SandboxMode::Disabled, &workspace); - let (response_tx, mut response_rx) = mpsc::channel(1); + let (tx, mut response_rx) = tokio::sync::mpsc::channel(1); + let response_tx = RoutedSender::new(tx, crate::InboundMessage::empty()); let tool = SendFileTool::new(response_tx, workspace, sandbox); let result = tool @@ -336,10 +333,10 @@ mod tests { assert_eq!(result.size_bytes, 11); // Verify the file data was actually sent through the channel. - let response = response_rx + let routed = response_rx .try_recv() .expect("should have received response"); - match response { + match routed.response { crate::OutboundResponse::File { filename, data, .. } => { assert_eq!(filename, "report.txt"); assert_eq!(data, b"public data"); diff --git a/src/tools/skip.rs b/src/tools/skip.rs index 481b5fb8b..ebf7b050a 100644 --- a/src/tools/skip.rs +++ b/src/tools/skip.rs @@ -5,14 +5,13 @@ //! instead of `reply`. The channel checks the skip flag after the LLM turn and //! suppresses any fallback text output. -use crate::OutboundResponse; +use crate::{OutboundResponse, RoutedSender}; use rig::completion::ToolDefinition; use rig::tool::Tool; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use tokio::sync::mpsc; /// Shared flag between the SkipTool and the channel event loop. /// @@ -29,11 +28,11 @@ pub fn new_skip_flag() -> SkipFlag { #[derive(Debug, Clone)] pub struct SkipTool { flag: SkipFlag, - response_tx: mpsc::Sender, + response_tx: RoutedSender, } impl SkipTool { - pub fn new(flag: SkipFlag, response_tx: mpsc::Sender) -> Self { + pub fn new(flag: SkipFlag, response_tx: RoutedSender) -> Self { Self { flag, response_tx } } } From 6594c021064205243444549bea046cbd1665ec1e Mon Sep 17 00:00:00 2001 From: Jamie Pine Date: Wed, 11 Mar 2026 23:13:33 -0700 Subject: [PATCH 3/4] fix: address pre-existing clippy warnings and test compilation Update context_dump tests to use RoutedSender. Fix collapsible_if warnings in send_message_to_another_channel.rs. --- src/tools/send_message_to_another_channel.rs | 54 +++++++++----------- tests/context_dump.rs | 6 ++- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/src/tools/send_message_to_another_channel.rs b/src/tools/send_message_to_another_channel.rs index abf72e239..bb0681fab 100644 --- a/src/tools/send_message_to_another_channel.rs +++ b/src/tools/send_message_to_another_channel.rs @@ -150,14 +150,13 @@ impl Tool for SendMessageTool { // If explicit prefix returned default "signal" adapter but we're in a named // Signal adapter conversation (e.g., signal:gvoice1), use the current adapter // to ensure the message goes through the correct account. - if target.adapter == "signal" { - if let Some(current_adapter) = self + if target.adapter == "signal" + && let Some(current_adapter) = self .current_adapter .as_ref() .filter(|adapter| adapter.starts_with("signal:")) - { - target.adapter = current_adapter.clone(); - } + { + target.adapter = current_adapter.clone(); } self.messaging_manager @@ -189,31 +188,28 @@ impl Tool for SendMessageTool { .current_adapter .as_ref() .filter(|adapter| adapter.starts_with("signal")) + && let Some(target) = parse_implicit_signal_shorthand(&args.target, current_adapter) { - if let Some(target) = parse_implicit_signal_shorthand(&args.target, current_adapter) { - self.messaging_manager - .broadcast( - &target.adapter, - &target.target, - crate::OutboundResponse::Text(args.message), - ) - .await - .map_err(|error| { - SendMessageError(format!("failed to send message: {error}")) - })?; - - tracing::info!( - adapter = %target.adapter, - broadcast_target = %"[REDACTED]", - "message sent via implicit Signal shorthand" - ); - - return Ok(SendMessageOutput { - success: true, - target: target.target, - platform: target.adapter, - }); - } + self.messaging_manager + .broadcast( + &target.adapter, + &target.target, + crate::OutboundResponse::Text(args.message), + ) + .await + .map_err(|error| SendMessageError(format!("failed to send message: {error}")))?; + + tracing::info!( + adapter = %target.adapter, + broadcast_target = %"[REDACTED]", + "message sent via implicit Signal shorthand" + ); + + return Ok(SendMessageOutput { + success: true, + target: target.target, + platform: target.adapter, + }); } // Check for explicit email target diff --git a/tests/context_dump.rs b/tests/context_dump.rs index a7595719d..d53b1f826 100644 --- a/tests/context_dump.rs +++ b/tests/context_dump.rs @@ -225,7 +225,8 @@ async fn dump_channel_context() { let status_block = Arc::new(tokio::sync::RwLock::new( spacebot::agent::status::StatusBlock::new(), )); - let (response_tx, _response_rx) = tokio::sync::mpsc::channel(16); + let (raw_tx, _response_rx) = tokio::sync::mpsc::channel(16); + let response_tx = spacebot::RoutedSender::new(raw_tx, spacebot::InboundMessage::empty()); let state = spacebot::agent::channel::ChannelState { channel_id, @@ -459,7 +460,8 @@ async fn dump_all_contexts() { let channel_prompt = build_channel_system_prompt(rc); let channel_id: spacebot::ChannelId = Arc::from("test-channel"); - let (response_tx, _response_rx) = tokio::sync::mpsc::channel(16); + let (raw_tx, _response_rx) = tokio::sync::mpsc::channel(16); + let response_tx = spacebot::RoutedSender::new(raw_tx, spacebot::InboundMessage::empty()); let state = spacebot::agent::channel::ChannelState { channel_id, history: Arc::new(tokio::sync::RwLock::new(Vec::new())), From d9e14ae267adfc45152890528f6750b78d9bc033 Mon Sep 17 00:00:00 2001 From: James Pine Date: Thu, 12 Mar 2026 02:29:31 -0700 Subject: [PATCH 4/4] fix: address review feedback for PR #406 - Set current_inbound in handle_message_batch() from the last non-system message so the RoutedSender carries correct routing metadata (e.g. Slack thread_ts) instead of stale/empty data. - Replace `let _ =` with `.ok()` on best-effort send_routed calls for Thinking and StopTyping status updates per repo conventions. - Thread slack_thread_ts through add_channel_tools into the cron default delivery target so cron jobs created inside a Slack thread post results back into that thread. Encodes thread_ts as a #thread: suffix in the broadcast target string, parsed by Slack's broadcast() method. --- src/agent/channel.rs | 39 +++++++++++++++++++++++++++------------ src/messaging/slack.rs | 16 ++++++++++++---- src/tools.rs | 17 +++++++++++++++-- tests/context_dump.rs | 2 ++ 4 files changed, 56 insertions(+), 18 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 8b5a3d47d..eadc0c069 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -1449,6 +1449,13 @@ impl Channel { *reply_target = messages.iter().rev().find_map(extract_message_id); } + // Pin the inbound routing target from the last non-system message in the + // batch so the RoutedSender (and send_routed) carry the correct platform + // metadata (e.g. Slack thread_ts) for outbound responses. + if let Some(last_real) = messages.iter().rev().find(|m| m.source != "system") { + self.current_inbound = Some(last_real.clone()); + } + // Run agent turn with any image/audio attachments preserved let (result, skip_flag, replied_flag, _) = self .run_agent_turn( @@ -2243,12 +2250,19 @@ impl Channel { .clone() .map(|tool| tool.with_originating_channel(conversation_id.to_string())); - let routed_sender = RoutedSender::new( - self.response_tx.clone(), - self.current_inbound - .clone() - .unwrap_or_else(InboundMessage::empty), - ); + let current_inbound = self + .current_inbound + .clone() + .unwrap_or_else(InboundMessage::empty); + let routed_sender = RoutedSender::new(self.response_tx.clone(), current_inbound.clone()); + + // Extract Slack thread_ts from the current inbound message so cron + // delivery targets include the originating thread. + let slack_thread_ts = current_inbound + .metadata + .get("slack_thread_ts") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); if let Err(error) = crate::tools::add_channel_tools( &self.tool_server, @@ -2261,6 +2275,7 @@ impl Channel { send_agent_message_tool, allow_direct_reply, adapter.map(|s| s.to_string()), + slack_thread_ts.as_deref(), ) .await { @@ -2286,9 +2301,9 @@ impl Channel { .tool_server_handle(self.tool_server.clone()) .build(); - let _ = self - .send_routed(OutboundResponse::Status(crate::StatusUpdate::Thinking)) - .await; + self.send_routed(OutboundResponse::Status(crate::StatusUpdate::Thinking)) + .await + .ok(); // Inject attachments as a user message before the text prompt if !attachment_content.is_empty() { @@ -2654,9 +2669,9 @@ impl Channel { } // Ensure typing indicator is always cleaned up, even on error paths - let _ = self - .send_routed(OutboundResponse::Status(crate::StatusUpdate::StopTyping)) - .await; + self.send_routed(OutboundResponse::Status(crate::StatusUpdate::StopTyping)) + .await + .ok(); } /// Handle a process event (branch results, worker completions, status updates). diff --git a/src/messaging/slack.rs b/src/messaging/slack.rs index 8474810ee..d3ace875d 100644 --- a/src/messaging/slack.rs +++ b/src/messaging/slack.rs @@ -1124,7 +1124,13 @@ impl Messaging for SlackAdapter { async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { let session = self.session(); - let channel_id = if let Some(user_id_str) = target.strip_prefix("dm:") { + // Parse an optional thread target encoded as `#thread:` suffix. + let (bare_target, thread_ts) = match target.split_once("#thread:") { + Some((prefix, ts)) if !ts.is_empty() => (prefix, Some(SlackTs(ts.to_string()))), + _ => (target, None), + }; + + let channel_id = if let Some(user_id_str) = bare_target.strip_prefix("dm:") { let open_req = SlackApiConversationsOpenRequest::new() .with_users(vec![SlackUserId(user_id_str.to_string())]); let open_resp = session @@ -1133,16 +1139,17 @@ impl Messaging for SlackAdapter { .context("failed to open Slack DM conversation")?; open_resp.channel.id } else { - SlackChannelId(target.to_string()) + SlackChannelId(bare_target.to_string()) }; match response { OutboundResponse::Text(text) => { for chunk in split_message(&text, 12_000) { - let req = SlackApiChatPostMessageRequest::new( + let mut req = SlackApiChatPostMessageRequest::new( channel_id.clone(), markdown_content(chunk), ); + req = req.opt_thread_ts(thread_ts.clone()); session .chat_post_message(&req) .await @@ -1158,7 +1165,8 @@ impl Messaging for SlackAdapter { .with_text(text) .with_blocks(slack_blocks) }; - let req = SlackApiChatPostMessageRequest::new(channel_id.clone(), content); + let mut req = SlackApiChatPostMessageRequest::new(channel_id.clone(), content); + req = req.opt_thread_ts(thread_ts.clone()); session .chat_post_message(&req) .await diff --git a/src/tools.rs b/src/tools.rs index 11ee60585..f1e53257b 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -356,6 +356,7 @@ pub async fn add_channel_tools( send_agent_message_tool: Option, allow_direct_reply: bool, current_adapter: Option, + slack_thread_ts: Option<&str>, ) -> Result<(), rig::tool::server::ToolServerError> { let conversation_id = conversation_id.into(); @@ -432,7 +433,7 @@ pub async fn add_channel_tools( handle.add_tool(ReactTool::new(response_tx.clone())).await?; if let Some(cron_tool) = cron_tool { let cron_tool = cron_tool.with_default_delivery_target( - default_delivery_target_for_conversation(&conversation_id), + default_delivery_target_for_conversation(&conversation_id, slack_thread_ts), ); handle.add_tool(cron_tool).await?; } @@ -443,7 +444,10 @@ pub async fn add_channel_tools( Ok(()) } -fn default_delivery_target_for_conversation(conversation_id: &str) -> Option { +fn default_delivery_target_for_conversation( + conversation_id: &str, + slack_thread_ts: Option<&str>, +) -> Option { let parsed = crate::messaging::target::parse_delivery_target(conversation_id)?; match parsed.adapter.as_str() { // Cron channels can't receive broadcast delivery. @@ -452,6 +456,15 @@ fn default_delivery_target_for_conversation(conversation_id: &str) -> Option Some(format!("webchat:{conversation_id}")), + // For Slack, append the originating thread_ts so cron broadcasts land in + // the correct thread rather than posting top-level. + "slack" => { + let base = parsed.to_string(); + match slack_thread_ts { + Some(ts) => Some(format!("{base}#thread:{ts}")), + None => Some(base), + } + } _ => Some(parsed.to_string()), } } diff --git a/tests/context_dump.rs b/tests/context_dump.rs index d53b1f826..abf09b07d 100644 --- a/tests/context_dump.rs +++ b/tests/context_dump.rs @@ -262,6 +262,7 @@ async fn dump_channel_context() { None, true, None, + None, ) .await .expect("failed to add channel tools"); @@ -497,6 +498,7 @@ async fn dump_all_contexts() { None, true, None, + None, ) .await .expect("failed to add channel tools");