From 4feb06331c65783a8976fdf1eb3f3b368ed0a564 Mon Sep 17 00:00:00 2001 From: James Pine Date: Sat, 28 Feb 2026 20:23:02 -0800 Subject: [PATCH 1/2] feat: add reaction support for webchat Reactions from the react tool were silently dropped in webchat because the adapter is a no-op and the SSE bus had no reaction event type. Backend: add ApiEvent::Reaction variant, emit it in the outbound routing loop, persist reactions via a new 'reaction' column on conversation_messages, and include them in the timeline query so they survive page refresh. Frontend: add ReactionEvent SSE handler that attaches the emoji to the most recent user message, and render it as a chip overlapping the message bubble. --- interface/src/api/client.ts | 11 +++++- interface/src/components/WebChatPanel.tsx | 39 ++++++++++++------- interface/src/hooks/useChannelLiveState.ts | 20 ++++++++++ .../20260228000001_message_reactions.sql | 1 + src/api/state.rs | 6 +++ src/api/system.rs | 1 + src/conversation/history.rs | 34 ++++++++++++++-- src/main.rs | 7 ++++ src/tools.rs | 10 ++++- src/tools/react.rs | 20 ++++++++-- 10 files changed, 125 insertions(+), 24 deletions(-) create mode 100644 migrations/20260228000001_message_reactions.sql diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index 620efe03d..963c5d6b2 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -109,6 +109,13 @@ export interface ToolCompletedEvent { result: string; } +export interface ReactionEvent { + type: "reaction"; + agent_id: string; + channel_id: string; + emoji: string; +} + export type ApiEvent = | InboundMessageEvent | OutboundMessageEvent @@ -119,7 +126,8 @@ export type ApiEvent = | BranchStartedEvent | BranchCompletedEvent | ToolStartedEvent - | ToolCompletedEvent; + | ToolCompletedEvent + | ReactionEvent; async function fetchJson(path: string): Promise { const response = await fetch(`${API_BASE}${path}`); @@ -137,6 +145,7 @@ export interface TimelineMessage { sender_id: string | null; content: string; created_at: string; + reaction?: string | null; } export interface TimelineBranchRun { diff --git a/interface/src/components/WebChatPanel.tsx b/interface/src/components/WebChatPanel.tsx index c6899fc1f..8d1f25ca0 100644 --- a/interface/src/components/WebChatPanel.tsx +++ b/interface/src/components/WebChatPanel.tsx @@ -184,24 +184,33 @@ export function WebChatPanel({ agentId }: WebChatPanelProps) { )} - {timeline.map((item) => { - if (item.type !== "message") return null; - return ( -
- {item.role === "user" ? ( -
-
+ {timeline.map((item) => { + if (item.type !== "message") return null; + return ( +
+ {item.role === "user" ? ( +
+
+

{item.content}

+ {item.reaction && ( +
+ + {item.reaction} + +
+ )}
- ) : ( -
- {item.content} -
- )} -
- ); - })} +
+ ) : ( +
+ {item.content} +
+ )} +
+ ); + })} {/* Typing indicator */} {isTyping && } diff --git a/interface/src/hooks/useChannelLiveState.ts b/interface/src/hooks/useChannelLiveState.ts index 363f5044f..25e4a0a12 100644 --- a/interface/src/hooks/useChannelLiveState.ts +++ b/interface/src/hooks/useChannelLiveState.ts @@ -5,6 +5,7 @@ import { type BranchStartedEvent, type InboundMessageEvent, type OutboundMessageEvent, + type ReactionEvent, type TimelineItem, type ToolCompletedEvent, type ToolStartedEvent, @@ -568,6 +569,24 @@ export function useChannelLiveState(channels: ChannelInfo[]) { } }, []); + const handleReaction = useCallback((data: unknown) => { + const event = data as ReactionEvent; + setLiveStates((prev) => { + const state = prev[event.channel_id]; + if (!state) return prev; + // Find the most recent user message to attach the reaction to + const timeline = [...state.timeline]; + for (let i = timeline.length - 1; i >= 0; i--) { + const item = timeline[i]; + if (item.type === "message" && item.role === "user") { + timeline[i] = { ...item, reaction: event.emoji }; + break; + } + } + return { ...prev, [event.channel_id]: { ...state, timeline } }; + }); + }, []); + const loadOlderMessages = useCallback((channelId: string) => { setLiveStates((prev) => { const state = prev[channelId]; @@ -621,6 +640,7 @@ export function useChannelLiveState(channels: ChannelInfo[]) { branch_completed: handleBranchCompleted, tool_started: handleToolStarted, tool_completed: handleToolCompleted, + reaction: handleReaction, }; return { liveStates, handlers, syncStatusSnapshot, loadOlderMessages }; diff --git a/migrations/20260228000001_message_reactions.sql b/migrations/20260228000001_message_reactions.sql new file mode 100644 index 000000000..9a81d3384 --- /dev/null +++ b/migrations/20260228000001_message_reactions.sql @@ -0,0 +1 @@ +ALTER TABLE conversation_messages ADD COLUMN reaction TEXT; diff --git a/src/api/state.rs b/src/api/state.rs index 75f00d821..9a9a03c4f 100644 --- a/src/api/state.rs +++ b/src/api/state.rs @@ -213,6 +213,12 @@ pub enum ApiEvent { /// "created", "updated", or "deleted". action: String, }, + /// An emoji reaction added to the latest message. + Reaction { + agent_id: String, + channel_id: String, + emoji: String, + }, } impl ApiState { diff --git a/src/api/system.rs b/src/api/system.rs index cf79b90a2..14b29e932 100644 --- a/src/api/system.rs +++ b/src/api/system.rs @@ -96,6 +96,7 @@ pub(super) async fn events_sse( ApiEvent::AgentMessageSent { .. } => "agent_message_sent", ApiEvent::AgentMessageReceived { .. } => "agent_message_received", ApiEvent::TaskUpdated { .. } => "task_updated", + ApiEvent::Reaction { .. } => "reaction", }; yield Ok(axum::response::sse::Event::default() .event(event_type) diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 13d80dcf9..2bdc5ff05 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -104,6 +104,31 @@ impl ConversationLogger { }); } + /// Set a reaction emoji on the most recent user message in a channel. Fire-and-forget. + pub fn log_reaction(&self, channel_id: &ChannelId, emoji: &str) { + let pool = self.pool.clone(); + let channel_id = channel_id.to_string(); + let emoji = emoji.to_string(); + + tokio::spawn(async move { + if let Err(error) = sqlx::query( + "UPDATE conversation_messages SET reaction = ? \ + WHERE id = ( \ + SELECT id FROM conversation_messages \ + WHERE channel_id = ? AND role = 'user' \ + ORDER BY created_at DESC LIMIT 1 \ + )", + ) + .bind(&emoji) + .bind(&channel_id) + .execute(&pool) + .await + { + tracing::warn!(%error, "failed to persist reaction"); + } + }); + } + /// Load recent messages for a channel (oldest first). pub async fn load_recent( &self, @@ -196,6 +221,8 @@ pub enum TimelineItem { sender_id: Option, content: String, created_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + reaction: Option, }, BranchRun { id: String, @@ -360,17 +387,17 @@ impl ProcessRunLogger { let query_str = format!( "SELECT * FROM ( \ - SELECT 'message' AS item_type, id, role, sender_name, sender_id, content, \ + SELECT 'message' AS item_type, id, role, sender_name, sender_id, content, reaction, \ NULL AS description, NULL AS conclusion, NULL AS task, NULL AS result, NULL AS status, \ created_at AS timestamp, NULL AS completed_at \ FROM conversation_messages WHERE channel_id = ?1 \ UNION ALL \ - SELECT 'branch_run' AS item_type, id, NULL, NULL, NULL, NULL, \ + SELECT 'branch_run' AS item_type, id, NULL, NULL, NULL, NULL, NULL, \ description, conclusion, NULL, NULL, NULL, \ started_at AS timestamp, completed_at \ FROM branch_runs WHERE channel_id = ?1 \ UNION ALL \ - SELECT 'worker_run' AS item_type, id, NULL, NULL, NULL, NULL, \ + SELECT 'worker_run' AS item_type, id, NULL, NULL, NULL, NULL, NULL, \ NULL, NULL, task, result, status, \ started_at AS timestamp, completed_at \ FROM worker_runs WHERE channel_id = ?1 \ @@ -403,6 +430,7 @@ impl ProcessRunLogger { .try_get::, _>("timestamp") .map(|t| t.to_rfc3339()) .unwrap_or_default(), + reaction: row.try_get("reaction").ok().flatten(), }), "branch_run" => Some(TimelineItem::BranchRun { id: row.try_get("id").unwrap_or_default(), diff --git a/src/main.rs b/src/main.rs index aa8861c4d..04e4be060 100644 --- a/src/main.rs +++ b/src/main.rs @@ -972,6 +972,13 @@ async fn run( is_typing: false, }).ok(); } + spacebot::OutboundResponse::Reaction(emoji) => { + api_event_tx.send(spacebot::api::ApiEvent::Reaction { + agent_id: sse_agent_id.clone(), + channel_id: sse_channel_id.clone(), + emoji: emoji.clone(), + }).ok(); + } _ => {} } diff --git a/src/tools.rs b/src/tools.rs index ef6061529..030e65afe 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -287,11 +287,17 @@ pub async fn add_channel_tools( state.deps.runtime_config.workspace_dir.clone(), )) .await?; - handle.add_tool(CancelTool::new(state)).await?; handle .add_tool(SkipTool::new(skip_flag.clone(), response_tx.clone())) .await?; - handle.add_tool(ReactTool::new(response_tx.clone())).await?; + handle + .add_tool(ReactTool::new( + response_tx.clone(), + state.conversation_logger.clone(), + state.channel_id.clone(), + )) + .await?; + handle.add_tool(CancelTool::new(state)).await?; if let Some(cron_tool) = cron_tool { let cron_tool = cron_tool.with_default_delivery_target( default_delivery_target_for_conversation(&conversation_id), diff --git a/src/tools/react.rs b/src/tools/react.rs index 6679d5edf..929e41d04 100644 --- a/src/tools/react.rs +++ b/src/tools/react.rs @@ -1,6 +1,7 @@ //! React tool for adding emoji reactions to messages (channel only). -use crate::OutboundResponse; +use crate::conversation::ConversationLogger; +use crate::{ChannelId, OutboundResponse}; use rig::completion::ToolDefinition; use rig::tool::Tool; use schemars::JsonSchema; @@ -11,11 +12,21 @@ use tokio::sync::mpsc; #[derive(Debug, Clone)] pub struct ReactTool { response_tx: mpsc::Sender, + conversation_logger: ConversationLogger, + channel_id: ChannelId, } impl ReactTool { - pub fn new(response_tx: mpsc::Sender) -> Self { - Self { response_tx } + pub fn new( + response_tx: mpsc::Sender, + conversation_logger: ConversationLogger, + channel_id: ChannelId, + ) -> Self { + Self { + response_tx, + conversation_logger, + channel_id, + } } } @@ -65,6 +76,9 @@ impl Tool for ReactTool { async fn call(&self, args: Self::Args) -> Result { tracing::info!(emoji = %args.emoji, "react tool called"); + self.conversation_logger + .log_reaction(&self.channel_id, &args.emoji); + self.response_tx .send(OutboundResponse::Reaction(args.emoji.clone())) .await From 5602959bbfe7b3955d3e17479d5460032986ee80 Mon Sep 17 00:00:00 2001 From: James Pine Date: Sat, 28 Feb 2026 20:54:20 -0800 Subject: [PATCH 2/2] fix: add message_id to reactions, retry persistence race, validate SSE fields Address PR review feedback: - Thread message_id through OutboundResponse::Reaction and ApiEvent::Reaction so the frontend can target the exact message instead of scanning backwards (coderabbitai) - Make log_reaction async with a retry loop (5 attempts, 25ms backoff) to handle the race where the fire-and-forget user message INSERT hasn't landed yet (tembo) - Guard the frontend SSE reaction handler against missing channel_id/emoji fields before mutating state (tembo) --- interface/src/api/client.ts | 1 + interface/src/hooks/useChannelLiveState.ts | 20 +++++--- src/api/state.rs | 4 +- src/conversation/history.rs | 60 +++++++++++++++------- src/lib.rs | 3 +- src/main.rs | 3 +- src/messaging/discord.rs | 2 +- src/messaging/email.rs | 4 +- src/messaging/slack.rs | 4 +- src/messaging/telegram.rs | 2 +- src/messaging/twitch.rs | 2 +- src/messaging/webhook.rs | 2 +- src/tools/react.rs | 8 +-- 13 files changed, 74 insertions(+), 41 deletions(-) diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index 963c5d6b2..28e222c48 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -114,6 +114,7 @@ export interface ReactionEvent { agent_id: string; channel_id: string; emoji: string; + message_id: string | null; } export type ApiEvent = diff --git a/interface/src/hooks/useChannelLiveState.ts b/interface/src/hooks/useChannelLiveState.ts index 25e4a0a12..e86394a5c 100644 --- a/interface/src/hooks/useChannelLiveState.ts +++ b/interface/src/hooks/useChannelLiveState.ts @@ -570,20 +570,24 @@ export function useChannelLiveState(channels: ChannelInfo[]) { }, []); const handleReaction = useCallback((data: unknown) => { - const event = data as ReactionEvent; + const event = data as Partial; + if (typeof event.channel_id !== "string" || typeof event.emoji !== "string") return; + const channelId = event.channel_id; + const emoji = event.emoji; + const messageId = event.message_id ?? null; setLiveStates((prev) => { - const state = prev[event.channel_id]; + const state = prev[channelId]; if (!state) return prev; - // Find the most recent user message to attach the reaction to const timeline = [...state.timeline]; + // Target by message_id when available, fall back to latest user message for (let i = timeline.length - 1; i >= 0; i--) { const item = timeline[i]; - if (item.type === "message" && item.role === "user") { - timeline[i] = { ...item, reaction: event.emoji }; - break; - } + if (item.type !== "message" || item.role !== "user") continue; + if (messageId && item.id !== messageId) continue; + timeline[i] = { ...item, reaction: emoji }; + break; } - return { ...prev, [event.channel_id]: { ...state, timeline } }; + return { ...prev, [channelId]: { ...state, timeline } }; }); }, []); diff --git a/src/api/state.rs b/src/api/state.rs index 9a9a03c4f..c00c42e31 100644 --- a/src/api/state.rs +++ b/src/api/state.rs @@ -213,11 +213,13 @@ pub enum ApiEvent { /// "created", "updated", or "deleted". action: String, }, - /// An emoji reaction added to the latest message. + /// An emoji reaction added to a message. Reaction { agent_id: String, channel_id: String, emoji: String, + /// The conversation_messages ID the reaction was attached to. + message_id: Option, }, } diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 2bdc5ff05..ac9cd3a5d 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -104,29 +104,51 @@ impl ConversationLogger { }); } - /// Set a reaction emoji on the most recent user message in a channel. Fire-and-forget. - pub fn log_reaction(&self, channel_id: &ChannelId, emoji: &str) { - let pool = self.pool.clone(); - let channel_id = channel_id.to_string(); - let emoji = emoji.to_string(); - - tokio::spawn(async move { - if let Err(error) = sqlx::query( - "UPDATE conversation_messages SET reaction = ? \ - WHERE id = ( \ - SELECT id FROM conversation_messages \ - WHERE channel_id = ? AND role = 'user' \ - ORDER BY created_at DESC LIMIT 1 \ - )", + /// Set a reaction emoji on the most recent user message in a channel. + /// + /// Returns the message ID the reaction was attached to, or `None` if no + /// matching user message was found. Retries a few times to handle the race + /// where the user message INSERT (fire-and-forget) hasn't landed yet. + pub async fn log_reaction(&self, channel_id: &ChannelId, emoji: &str) -> Option { + let channel_id_str = channel_id.to_string(); + + for attempt in 0..5u32 { + // Find the target message first so we can return its ID. + let target_id: Option = sqlx::query_scalar( + "SELECT id FROM conversation_messages \ + WHERE channel_id = ? AND role = 'user' \ + ORDER BY created_at DESC LIMIT 1", ) - .bind(&emoji) - .bind(&channel_id) - .execute(&pool) + .bind(&channel_id_str) + .fetch_optional(&self.pool) .await + .ok() + .flatten(); + + let Some(message_id) = target_id else { + if attempt < 4 { + tokio::time::sleep(std::time::Duration::from_millis(25)).await; + continue; + } + tracing::warn!("no user message found for reaction"); + return None; + }; + + match sqlx::query("UPDATE conversation_messages SET reaction = ? WHERE id = ?") + .bind(emoji) + .bind(&message_id) + .execute(&self.pool) + .await { - tracing::warn!(%error, "failed to persist reaction"); + Ok(_) => return Some(message_id), + Err(error) => { + tracing::warn!(%error, "failed to persist reaction"); + return None; + } } - }); + } + + None } /// Load recent messages for a channel (oldest first). diff --git a/src/lib.rs b/src/lib.rs index 47c01a54d..f29d50f57 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -381,7 +381,8 @@ pub enum OutboundResponse { caption: Option, }, /// Add a reaction emoji to the triggering message. - Reaction(String), + /// The optional message_id identifies the specific message being reacted to. + Reaction(String, Option), /// Remove a reaction emoji from the triggering message. /// No-op on platforms that don't support reaction removal. RemoveReaction(String), diff --git a/src/main.rs b/src/main.rs index 04e4be060..94c542c0f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -972,11 +972,12 @@ async fn run( is_typing: false, }).ok(); } - spacebot::OutboundResponse::Reaction(emoji) => { + spacebot::OutboundResponse::Reaction(emoji, message_id) => { api_event_tx.send(spacebot::api::ApiEvent::Reaction { agent_id: sse_agent_id.clone(), channel_id: sse_channel_id.clone(), emoji: emoji.clone(), + message_id: message_id.clone(), }).ok(); } _ => {} diff --git a/src/messaging/discord.rs b/src/messaging/discord.rs index 38e8fa0ed..eff59eee1 100644 --- a/src/messaging/discord.rs +++ b/src/messaging/discord.rs @@ -286,7 +286,7 @@ impl Messaging for DiscordAdapter { .await .context("failed to send file attachment")?; } - OutboundResponse::Reaction(emoji) => { + OutboundResponse::Reaction(emoji, _) => { let message_id = message .metadata .get("discord_message_id") diff --git a/src/messaging/email.rs b/src/messaging/email.rs index aceff2c18..130eb861f 100644 --- a/src/messaging/email.rs +++ b/src/messaging/email.rs @@ -423,7 +423,7 @@ impl Messaging for EmailAdapter { ) .await?; } - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) => {} OutboundResponse::Ephemeral { text, .. } => { @@ -506,7 +506,7 @@ impl Messaging for EmailAdapter { self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None) .await?; } - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) | OutboundResponse::StreamStart diff --git a/src/messaging/slack.rs b/src/messaging/slack.rs index 1dd63f6f7..b7c5523ab 100644 --- a/src/messaging/slack.rs +++ b/src/messaging/slack.rs @@ -933,7 +933,7 @@ impl Messaging for SlackAdapter { .context("failed to complete slack file upload")?; } - OutboundResponse::Reaction(emoji) => { + OutboundResponse::Reaction(emoji, _) => { let ts = extract_message_ts(message).context("missing slack_message_ts for reaction")?; let req = SlackApiReactionsAddRequest::new( @@ -1517,7 +1517,7 @@ fn variant_name(response: &OutboundResponse) -> &'static str { OutboundResponse::Text(_) => "Text", OutboundResponse::ThreadReply { .. } => "ThreadReply", OutboundResponse::File { .. } => "File", - OutboundResponse::Reaction(_) => "Reaction", + OutboundResponse::Reaction(..) => "Reaction", OutboundResponse::RemoveReaction(_) => "RemoveReaction", OutboundResponse::Ephemeral { .. } => "Ephemeral", OutboundResponse::RichMessage { .. } => "RichMessage", diff --git a/src/messaging/telegram.rs b/src/messaging/telegram.rs index ed0ff5267..cd8df06af 100644 --- a/src/messaging/telegram.rs +++ b/src/messaging/telegram.rs @@ -403,7 +403,7 @@ impl Messaging for TelegramAdapter { } } } - OutboundResponse::Reaction(emoji) => { + OutboundResponse::Reaction(emoji, _) => { let message_id = self.extract_message_id(message)?; let reaction = ReactionType::Emoji { diff --git a/src/messaging/twitch.rs b/src/messaging/twitch.rs index e80d79010..58bb9fb14 100644 --- a/src/messaging/twitch.rs +++ b/src/messaging/twitch.rs @@ -397,7 +397,7 @@ impl Messaging for TwitchAdapter { } OutboundResponse::StreamEnd => {} // Reactions, status updates, and Slack-specific variants aren't meaningful in Twitch chat - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) => {} OutboundResponse::Ephemeral { text, .. } => { diff --git a/src/messaging/webhook.rs b/src/messaging/webhook.rs index 65565fad6..95b0bee9f 100644 --- a/src/messaging/webhook.rs +++ b/src/messaging/webhook.rs @@ -195,7 +195,7 @@ impl Messaging for WebhookAdapter { caption: None, }, // Reactions, status updates, and remove-reaction aren't meaningful over webhook - OutboundResponse::Reaction(_) + OutboundResponse::Reaction(..) | OutboundResponse::RemoveReaction(_) | OutboundResponse::Status(_) => return Ok(()), // Slack-specific rich variants — fall back to plain text diff --git a/src/tools/react.rs b/src/tools/react.rs index 929e41d04..6101f05e2 100644 --- a/src/tools/react.rs +++ b/src/tools/react.rs @@ -76,11 +76,13 @@ impl Tool for ReactTool { async fn call(&self, args: Self::Args) -> Result { tracing::info!(emoji = %args.emoji, "react tool called"); - self.conversation_logger - .log_reaction(&self.channel_id, &args.emoji); + let message_id = self + .conversation_logger + .log_reaction(&self.channel_id, &args.emoji) + .await; self.response_tx - .send(OutboundResponse::Reaction(args.emoji.clone())) + .send(OutboundResponse::Reaction(args.emoji.clone(), message_id)) .await .map_err(|error| ReactError(format!("failed to send reaction: {error}")))?;