Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 73 additions & 20 deletions src/agent/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::hooks::SpacebotHook;
use crate::llm::SpacebotModel;
use crate::{
AgentDeps, BranchId, ChannelId, InboundMessage, OutboundResponse, ProcessEvent, ProcessId,
ProcessType, WorkerId,
ProcessType, RoutedResponse, RoutedSender, WorkerId,
};
use rig::agent::AgentBuilder;
use rig::completion::CompletionModel;
Expand Down Expand Up @@ -329,9 +329,12 @@ pub struct Channel {
/// Event receiver for process events.
pub event_rx: broadcast::Receiver<ProcessEvent>,
/// Outbound response sender for the messaging layer.
pub response_tx: mpsc::Sender<OutboundResponse>,
pub response_tx: mpsc::Sender<RoutedResponse>,
/// Self-sender for re-triggering the channel after background process completion.
pub self_tx: mpsc::Sender<InboundMessage>,
/// The inbound message currently being processed. Used to pair outbound
/// responses with the correct platform routing metadata (e.g. Slack thread_ts).
current_inbound: Option<InboundMessage>,
/// Conversation ID from the first message (for synthetic re-trigger messages).
pub conversation_id: Option<String>,
/// Adapter source captured from the first non-system message.
Expand Down Expand Up @@ -405,7 +408,7 @@ impl Channel {
pub fn new(
id: ChannelId,
deps: AgentDeps,
response_tx: mpsc::Sender<OutboundResponse>,
response_tx: mpsc::Sender<RoutedResponse>,
event_rx: broadcast::Receiver<ProcessEvent>,
screenshot_dir: std::path::PathBuf,
logs_dir: std::path::PathBuf,
Expand Down Expand Up @@ -486,6 +489,7 @@ impl Channel {
event_rx,
response_tx,
self_tx,
current_inbound: None,
conversation_id: None,
source_adapter: None,
conversation_context: None,
Expand Down Expand Up @@ -778,12 +782,35 @@ impl Channel {
(invoked_by_command, invoked_by_mention, invoked_by_reply)
}

/// Send a routed response paired with the current inbound message.
///
/// Falls back to a bare response with a placeholder target if no inbound
/// message is set (should not happen during normal turn processing).
async fn send_routed(
&self,
response: OutboundResponse,
) -> std::result::Result<(), mpsc::error::SendError<RoutedResponse>> {
let routed = match &self.current_inbound {
Some(target) => RoutedResponse {
response,
target: target.clone(),
},
None => {
tracing::warn!(
channel_id = %self.id,
"sending response without a current inbound message"
);
RoutedResponse {
response,
target: InboundMessage::empty(),
}
}
};
self.response_tx.send(routed).await
}

async fn send_builtin_text(&mut self, text: String, log_label: &str) {
match self
.response_tx
.send(OutboundResponse::Text(text.clone()))
.await
{
match self.send_routed(OutboundResponse::Text(text.clone())).await {
Ok(()) => {
#[cfg(feature = "metrics")]
{
Expand Down Expand Up @@ -1422,6 +1449,13 @@ impl Channel {
*reply_target = messages.iter().rev().find_map(extract_message_id);
}

// Pin the inbound routing target from the last non-system message in the
// batch so the RoutedSender (and send_routed) carry the correct platform
// metadata (e.g. Slack thread_ts) for outbound responses.
if let Some(last_real) = messages.iter().rev().find(|m| m.source != "system") {
self.current_inbound = Some(last_real.clone());
}

// Run agent turn with any image/audio attachments preserved
let (result, skip_flag, replied_flag, _) = self
.run_agent_turn(
Expand Down Expand Up @@ -1528,6 +1562,13 @@ impl Channel {
// Apply runtime-config updates immediately without requiring a restart.
self.sync_listen_only_mode_from_runtime();

// Track the inbound message that triggered this turn so outbound
// responses carry the correct routing metadata (e.g. Slack thread_ts).
// System retrigger messages keep the previous inbound target.
if message.source != "system" {
self.current_inbound = Some(message.clone());
}

tracing::info!(
channel_id = %self.id,
message_id = %message.id,
Expand Down Expand Up @@ -2209,17 +2250,32 @@ impl Channel {
.clone()
.map(|tool| tool.with_originating_channel(conversation_id.to_string()));

let current_inbound = self
.current_inbound
.clone()
.unwrap_or_else(InboundMessage::empty);
let routed_sender = RoutedSender::new(self.response_tx.clone(), current_inbound.clone());

// Extract Slack thread_ts from the current inbound message so cron
// delivery targets include the originating thread.
let slack_thread_ts = current_inbound
.metadata
.get("slack_thread_ts")
.and_then(|v| v.as_str())
.map(|s| s.to_string());

if let Err(error) = crate::tools::add_channel_tools(
&self.tool_server,
self.state.clone(),
self.response_tx.clone(),
routed_sender,
conversation_id,
skip_flag.clone(),
replied_flag.clone(),
self.deps.cron_tool.clone(),
send_agent_message_tool,
allow_direct_reply,
adapter.map(|s| s.to_string()),
slack_thread_ts.as_deref(),
)
.await
{
Expand All @@ -2245,10 +2301,9 @@ impl Channel {
.tool_server_handle(self.tool_server.clone())
.build();

let _ = self
.response_tx
.send(OutboundResponse::Status(crate::StatusUpdate::Thinking))
.await;
self.send_routed(OutboundResponse::Status(crate::StatusUpdate::Thinking))
.await
.ok();

// Inject attachments as a user message before the text prompt
if !attachment_content.is_empty() {
Expand Down Expand Up @@ -2347,7 +2402,7 @@ impl Channel {

/// Send outbound text and record send metrics.
async fn send_outbound_text(&self, text: String, error_context: &str) {
match self.response_tx.send(OutboundResponse::Text(text)).await {
match self.send_routed(OutboundResponse::Text(text)).await {
Ok(()) => {
#[cfg(feature = "metrics")]
{
Expand Down Expand Up @@ -2606,19 +2661,17 @@ impl Channel {
.inc();
// Send error to user so they know something went wrong
let error_msg = format!("I encountered an error: {}", error);
self.response_tx
.send(OutboundResponse::Text(error_msg))
self.send_routed(OutboundResponse::Text(error_msg))
.await
.ok();
tracing::error!(channel_id = %self.id, %error, "channel LLM call failed");
}
}

// Ensure typing indicator is always cleaned up, even on error paths
let _ = self
.response_tx
.send(OutboundResponse::Status(crate::StatusUpdate::StopTyping))
.await;
self.send_routed(OutboundResponse::Status(crate::StatusUpdate::StopTyping))
.await
.ok();
}

/// Handle a process event (branch results, worker completions, status updates).
Expand Down
14 changes: 10 additions & 4 deletions src/cron/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::cron::store::CronStore;
use crate::error::Result;
use crate::messaging::MessagingManager;
use crate::messaging::target::{BroadcastTarget, parse_delivery_target};
use crate::{AgentDeps, InboundMessage, MessageContent, OutboundResponse};
use crate::{AgentDeps, InboundMessage, MessageContent, OutboundResponse, RoutedResponse};
use chrono::Timelike;
use chrono_tz::Tz;
use cron::Schedule;
Expand Down Expand Up @@ -849,7 +849,7 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> {
let channel_id: crate::ChannelId = Arc::from(format!("cron:{}", job.id).as_str());

// Create the outbound response channel to collect whatever the channel produces
let (response_tx, mut response_rx) = tokio::sync::mpsc::channel::<OutboundResponse>(32);
let (response_tx, mut response_rx) = tokio::sync::mpsc::channel::<RoutedResponse>(32);

// Subscribe to the agent's event bus (the channel needs this for branch/worker events)
let event_rx = context.deps.event_tx.subscribe();
Expand Down Expand Up @@ -909,10 +909,16 @@ async fn run_cron_job(job: &CronJob, context: &CronContext) -> Result<()> {
break;
}
match tokio::time::timeout(remaining, response_rx.recv()).await {
Ok(Some(OutboundResponse::Text(text))) => {
Ok(Some(RoutedResponse {
response: OutboundResponse::Text(text),
..
})) => {
collected_text.push(text);
}
Ok(Some(OutboundResponse::RichMessage { text, .. })) => {
Ok(Some(RoutedResponse {
response: OutboundResponse::RichMessage { text, .. },
..
})) => {
collected_text.push(text);
}
Ok(Some(_)) => {}
Expand Down
57 changes: 57 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use error::{Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;

/// Signal from the API to the main event loop to trigger provider setup.
#[derive(Debug)]
Expand Down Expand Up @@ -473,6 +474,23 @@ pub struct InboundMessage {
}

impl InboundMessage {
/// Construct an empty placeholder message. Used as a fallback when no
/// inbound context is available for outbound routing.
pub fn empty() -> Self {
Self {
id: String::new(),
source: String::new(),
adapter: None,
conversation_id: String::new(),
sender_id: String::new(),
agent_id: None,
content: MessageContent::Text(String::new()),
timestamp: chrono::Utc::now(),
metadata: HashMap::new(),
formatted_author: None,
}
}

/// Runtime adapter key for routing outbound operations.
///
/// Falls back to the platform source for backward compatibility.
Expand Down Expand Up @@ -571,6 +589,45 @@ pub struct Attachment {
pub auth_header: Option<String>,
}

/// An outbound response paired with the inbound message that triggered it.
///
/// This ensures outbound routing targets the correct thread/conversation even
/// when multiple threads share the same channel (e.g. Slack threads within a
/// single channel). The paired `InboundMessage` carries the platform metadata
/// (thread_ts, message_ts, etc.) needed to route the response correctly.
#[derive(Debug, Clone)]
pub struct RoutedResponse {
pub response: OutboundResponse,
pub target: InboundMessage,
}

/// A sender that automatically pairs outbound responses with a captured
/// inbound message target. Used by channel tools (reply, react, etc.) so
/// they don't need direct access to the triggering `InboundMessage`.
#[derive(Debug, Clone)]
pub struct RoutedSender {
inner: mpsc::Sender<RoutedResponse>,
target: InboundMessage,
}

impl RoutedSender {
pub fn new(inner: mpsc::Sender<RoutedResponse>, target: InboundMessage) -> Self {
Self { inner, target }
}

pub async fn send(
&self,
response: OutboundResponse,
) -> std::result::Result<(), mpsc::error::SendError<RoutedResponse>> {
self.inner
.send(RoutedResponse {
response,
target: self.target.clone(),
})
.await
}
}

/// Outbound response to messaging platforms.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
Expand Down
Loading
Loading