diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index 482d056d1..5fae281e6 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -109,6 +109,14 @@ export interface ToolCompletedEvent { result: string; } +export interface ReactionEvent { + type: "reaction"; + agent_id: string; + channel_id: string; + emoji: string; + message_id: string | null; +} + export type ApiEvent = | InboundMessageEvent | OutboundMessageEvent @@ -119,7 +127,8 @@ export type ApiEvent = | BranchStartedEvent | BranchCompletedEvent | ToolStartedEvent - | ToolCompletedEvent; + | ToolCompletedEvent + | ReactionEvent; async function fetchJson(path: string): Promise { const response = await fetch(`${API_BASE}${path}`); @@ -137,6 +146,7 @@ export interface TimelineMessage { sender_id: string | null; content: string; created_at: string; + reaction?: string | null; } export interface TimelineBranchRun { diff --git a/interface/src/components/WebChatPanel.tsx b/interface/src/components/WebChatPanel.tsx index c6899fc1f..8d1f25ca0 100644 --- a/interface/src/components/WebChatPanel.tsx +++ b/interface/src/components/WebChatPanel.tsx @@ -184,24 +184,33 @@ export function WebChatPanel({ agentId }: WebChatPanelProps) { )} - {timeline.map((item) => { - if (item.type !== "message") return null; - return ( -
- {item.role === "user" ? ( -
-
+ {timeline.map((item) => { + if (item.type !== "message") return null; + return ( +
+ {item.role === "user" ? ( +
+
+

{item.content}

+ {item.reaction && ( +
+ + {item.reaction} + +
+ )}
- ) : ( -
- {item.content} -
- )} -
- ); - })} +
+ ) : ( +
+ {item.content} +
+ )} +
+ ); + })} {/* Typing indicator */} {isTyping && } diff --git a/interface/src/hooks/useChannelLiveState.ts b/interface/src/hooks/useChannelLiveState.ts index 363f5044f..e86394a5c 100644 --- a/interface/src/hooks/useChannelLiveState.ts +++ b/interface/src/hooks/useChannelLiveState.ts @@ -5,6 +5,7 @@ import { type BranchStartedEvent, type InboundMessageEvent, type OutboundMessageEvent, + type ReactionEvent, type TimelineItem, type ToolCompletedEvent, type ToolStartedEvent, @@ -568,6 +569,28 @@ export function useChannelLiveState(channels: ChannelInfo[]) { } }, []); + const handleReaction = useCallback((data: unknown) => { + const event = data as Partial; + if (typeof event.channel_id !== "string" || typeof event.emoji !== "string") return; + const channelId = event.channel_id; + const emoji = event.emoji; + const messageId = event.message_id ?? null; + setLiveStates((prev) => { + const state = prev[channelId]; + if (!state) return prev; + const timeline = [...state.timeline]; + // Target by message_id when available, fall back to latest user message + for (let i = timeline.length - 1; i >= 0; i--) { + const item = timeline[i]; + if (item.type !== "message" || item.role !== "user") continue; + if (messageId && item.id !== messageId) continue; + timeline[i] = { ...item, reaction: emoji }; + break; + } + return { ...prev, [channelId]: { ...state, timeline } }; + }); + }, []); + const loadOlderMessages = useCallback((channelId: string) => { setLiveStates((prev) => { const state = prev[channelId]; @@ -621,6 +644,7 @@ export function useChannelLiveState(channels: ChannelInfo[]) { branch_completed: handleBranchCompleted, tool_started: handleToolStarted, tool_completed: handleToolCompleted, + reaction: handleReaction, }; return { liveStates, handlers, syncStatusSnapshot, loadOlderMessages }; diff --git a/migrations/20260228000001_message_reactions.sql b/migrations/20260228000001_message_reactions.sql new file mode 100644 index 000000000..9a81d3384 --- /dev/null +++ b/migrations/20260228000001_message_reactions.sql @@ -0,0 +1 @@ +ALTER TABLE conversation_messages ADD COLUMN reaction TEXT; diff --git a/src/api/state.rs b/src/api/state.rs index 20a6c2499..c1adddd4d 100644 --- a/src/api/state.rs +++ b/src/api/state.rs @@ -215,6 +215,14 @@ pub enum ApiEvent { /// "created", "updated", or "deleted". action: String, }, + /// An emoji reaction added to a message. + Reaction { + agent_id: String, + channel_id: String, + emoji: String, + /// The conversation_messages ID the reaction was attached to. + message_id: Option, + }, } impl ApiState { diff --git a/src/api/system.rs b/src/api/system.rs index cf79b90a2..14b29e932 100644 --- a/src/api/system.rs +++ b/src/api/system.rs @@ -96,6 +96,7 @@ pub(super) async fn events_sse( ApiEvent::AgentMessageSent { .. } => "agent_message_sent", ApiEvent::AgentMessageReceived { .. } => "agent_message_received", ApiEvent::TaskUpdated { .. } => "task_updated", + ApiEvent::Reaction { .. } => "reaction", }; yield Ok(axum::response::sse::Event::default() .event(event_type) diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 13d80dcf9..ac9cd3a5d 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -104,6 +104,53 @@ impl ConversationLogger { }); } + /// Set a reaction emoji on the most recent user message in a channel. + /// + /// Returns the message ID the reaction was attached to, or `None` if no + /// matching user message was found. Retries a few times to handle the race + /// where the user message INSERT (fire-and-forget) hasn't landed yet. + pub async fn log_reaction(&self, channel_id: &ChannelId, emoji: &str) -> Option { + let channel_id_str = channel_id.to_string(); + + for attempt in 0..5u32 { + // Find the target message first so we can return its ID. + let target_id: Option = sqlx::query_scalar( + "SELECT id FROM conversation_messages \ + WHERE channel_id = ? AND role = 'user' \ + ORDER BY created_at DESC LIMIT 1", + ) + .bind(&channel_id_str) + .fetch_optional(&self.pool) + .await + .ok() + .flatten(); + + let Some(message_id) = target_id else { + if attempt < 4 { + tokio::time::sleep(std::time::Duration::from_millis(25)).await; + continue; + } + tracing::warn!("no user message found for reaction"); + return None; + }; + + match sqlx::query("UPDATE conversation_messages SET reaction = ? WHERE id = ?") + .bind(emoji) + .bind(&message_id) + .execute(&self.pool) + .await + { + Ok(_) => return Some(message_id), + Err(error) => { + tracing::warn!(%error, "failed to persist reaction"); + return None; + } + } + } + + None + } + /// Load recent messages for a channel (oldest first). pub async fn load_recent( &self, @@ -196,6 +243,8 @@ pub enum TimelineItem { sender_id: Option, content: String, created_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + reaction: Option, }, BranchRun { id: String, @@ -360,17 +409,17 @@ impl ProcessRunLogger { let query_str = format!( "SELECT * FROM ( \ - SELECT 'message' AS item_type, id, role, sender_name, sender_id, content, \ + SELECT 'message' AS item_type, id, role, sender_name, sender_id, content, reaction, \ NULL AS description, NULL AS conclusion, NULL AS task, NULL AS result, NULL AS status, \ created_at AS timestamp, NULL AS completed_at \ FROM conversation_messages WHERE channel_id = ?1 \ UNION ALL \ - SELECT 'branch_run' AS item_type, id, NULL, NULL, NULL, NULL, \ + SELECT 'branch_run' AS item_type, id, NULL, NULL, NULL, NULL, NULL, \ description, conclusion, NULL, NULL, NULL, \ started_at AS timestamp, completed_at \ FROM branch_runs WHERE channel_id = ?1 \ UNION ALL \ - SELECT 'worker_run' AS item_type, id, NULL, NULL, NULL, NULL, \ + SELECT 'worker_run' AS item_type, id, NULL, NULL, NULL, NULL, NULL, \ NULL, NULL, task, result, status, \ started_at AS timestamp, completed_at \ FROM worker_runs WHERE channel_id = ?1 \ @@ -403,6 +452,7 @@ impl ProcessRunLogger { .try_get::, _>("timestamp") .map(|t| t.to_rfc3339()) .unwrap_or_default(), + reaction: row.try_get("reaction").ok().flatten(), }), "branch_run" => Some(TimelineItem::BranchRun { id: row.try_get("id").unwrap_or_default(), diff --git a/src/lib.rs b/src/lib.rs index 1bb8ed36a..24ae75d45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -399,7 +399,8 @@ pub enum OutboundResponse { caption: Option, }, /// Add a reaction emoji to the triggering message. - Reaction(String), + /// The optional message_id identifies the specific message being reacted to. + Reaction(String, Option), /// Remove a reaction emoji from the triggering message. /// No-op on platforms that don't support reaction removal. RemoveReaction(String), diff --git a/src/main.rs b/src/main.rs index 8b44d8369..a50a3db14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1691,6 +1691,14 @@ async fn run( is_typing: false, }).ok(); } + spacebot::OutboundResponse::Reaction(emoji, message_id) => { + api_event_tx.send(spacebot::api::ApiEvent::Reaction { + agent_id: sse_agent_id.clone(), + channel_id: sse_channel_id.clone(), + emoji: emoji.clone(), + message_id: message_id.clone(), + }).ok(); + } _ => {} } diff --git a/src/messaging/discord.rs b/src/messaging/discord.rs index 7f2363450..69cb3b266 100644 --- a/src/messaging/discord.rs +++ b/src/messaging/discord.rs @@ -290,7 +290,7 @@ impl Messaging for DiscordAdapter { .await .context("failed to send file attachment")?; } - OutboundResponse::Reaction(emoji) => { + OutboundResponse::Reaction(emoji, _) => { let message_id = message .metadata .get("discord_message_id") diff --git a/src/messaging/email.rs b/src/messaging/email.rs index 1dc0d9406..66c299170 100644 --- a/src/messaging/email.rs +++ b/src/messaging/email.rs @@ -423,7 +423,7 @@ impl Messaging for EmailAdapter { ) .await?; } - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) => {} OutboundResponse::Ephemeral { text, .. } => { @@ -506,7 +506,7 @@ impl Messaging for EmailAdapter { self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None) .await?; } - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) | OutboundResponse::StreamStart diff --git a/src/messaging/slack.rs b/src/messaging/slack.rs index 49dc17287..652bdaeee 100644 --- a/src/messaging/slack.rs +++ b/src/messaging/slack.rs @@ -933,7 +933,7 @@ impl Messaging for SlackAdapter { .context("failed to complete slack file upload")?; } - OutboundResponse::Reaction(emoji) => { + OutboundResponse::Reaction(emoji, _) => { let ts = extract_message_ts(message).context("missing slack_message_ts for reaction")?; let req = SlackApiReactionsAddRequest::new( @@ -1544,7 +1544,7 @@ fn variant_name(response: &OutboundResponse) -> &'static str { OutboundResponse::Text(_) => "Text", OutboundResponse::ThreadReply { .. } => "ThreadReply", OutboundResponse::File { .. } => "File", - OutboundResponse::Reaction(_) => "Reaction", + OutboundResponse::Reaction(..) => "Reaction", OutboundResponse::RemoveReaction(_) => "RemoveReaction", OutboundResponse::Ephemeral { .. } => "Ephemeral", OutboundResponse::RichMessage { .. } => "RichMessage", diff --git a/src/messaging/telegram.rs b/src/messaging/telegram.rs index c6060b03e..472a202c9 100644 --- a/src/messaging/telegram.rs +++ b/src/messaging/telegram.rs @@ -403,7 +403,7 @@ impl Messaging for TelegramAdapter { } } } - OutboundResponse::Reaction(emoji) => { + OutboundResponse::Reaction(emoji, _) => { let message_id = self.extract_message_id(message)?; let reaction = ReactionType::Emoji { diff --git a/src/messaging/twitch.rs b/src/messaging/twitch.rs index 2f824eac2..3b8d311b0 100644 --- a/src/messaging/twitch.rs +++ b/src/messaging/twitch.rs @@ -405,7 +405,7 @@ impl Messaging for TwitchAdapter { } OutboundResponse::StreamEnd => {} // Reactions, status updates, and Slack-specific variants aren't meaningful in Twitch chat - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) => {} OutboundResponse::Ephemeral { text, .. } => { diff --git a/src/messaging/webhook.rs b/src/messaging/webhook.rs index 1654bc1d7..b742ba202 100644 --- a/src/messaging/webhook.rs +++ b/src/messaging/webhook.rs @@ -195,7 +195,7 @@ impl Messaging for WebhookAdapter { caption: None, }, // Reactions, status updates, and remove-reaction aren't meaningful over webhook - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) => return Ok(()), // Slack-specific rich variants — fall back to plain text diff --git a/src/tools.rs b/src/tools.rs index efc37294a..63f0c0cbd 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -289,11 +289,17 @@ pub async fn add_channel_tools( state.deps.runtime_config.workspace_dir.clone(), )) .await?; - handle.add_tool(CancelTool::new(state)).await?; handle .add_tool(SkipTool::new(skip_flag.clone(), response_tx.clone())) .await?; - handle.add_tool(ReactTool::new(response_tx.clone())).await?; + handle + .add_tool(ReactTool::new( + response_tx.clone(), + state.conversation_logger.clone(), + state.channel_id.clone(), + )) + .await?; + handle.add_tool(CancelTool::new(state)).await?; if let Some(cron_tool) = cron_tool { let cron_tool = cron_tool.with_default_delivery_target( default_delivery_target_for_conversation(&conversation_id), diff --git a/src/tools/react.rs b/src/tools/react.rs index 6679d5edf..6101f05e2 100644 --- a/src/tools/react.rs +++ b/src/tools/react.rs @@ -1,6 +1,7 @@ //! React tool for adding emoji reactions to messages (channel only). -use crate::OutboundResponse; +use crate::conversation::ConversationLogger; +use crate::{ChannelId, OutboundResponse}; use rig::completion::ToolDefinition; use rig::tool::Tool; use schemars::JsonSchema; @@ -11,11 +12,21 @@ use tokio::sync::mpsc; #[derive(Debug, Clone)] pub struct ReactTool { response_tx: mpsc::Sender, + conversation_logger: ConversationLogger, + channel_id: ChannelId, } impl ReactTool { - pub fn new(response_tx: mpsc::Sender) -> Self { - Self { response_tx } + pub fn new( + response_tx: mpsc::Sender, + conversation_logger: ConversationLogger, + channel_id: ChannelId, + ) -> Self { + Self { + response_tx, + conversation_logger, + channel_id, + } } } @@ -65,8 +76,13 @@ impl Tool for ReactTool { async fn call(&self, args: Self::Args) -> Result { tracing::info!(emoji = %args.emoji, "react tool called"); + let message_id = self + .conversation_logger + .log_reaction(&self.channel_id, &args.emoji) + .await; + self.response_tx - .send(OutboundResponse::Reaction(args.emoji.clone())) + .send(OutboundResponse::Reaction(args.emoji.clone(), message_id)) .await .map_err(|error| ReactError(format!("failed to send reaction: {error}")))?;