Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion interface/src/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ export interface ToolCompletedEvent {
result: string;
}

export interface ReactionEvent {
type: "reaction";
agent_id: string;
channel_id: string;
emoji: string;
message_id: string | null;
}

export type ApiEvent =
| InboundMessageEvent
| OutboundMessageEvent
Expand All @@ -119,7 +127,8 @@ export type ApiEvent =
| BranchStartedEvent
| BranchCompletedEvent
| ToolStartedEvent
| ToolCompletedEvent;
| ToolCompletedEvent
| ReactionEvent;

async function fetchJson<T>(path: string): Promise<T> {
const response = await fetch(`${API_BASE}${path}`);
Expand All @@ -137,6 +146,7 @@ export interface TimelineMessage {
sender_id: string | null;
content: string;
created_at: string;
reaction?: string | null;
}

export interface TimelineBranchRun {
Expand Down
39 changes: 24 additions & 15 deletions interface/src/components/WebChatPanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -184,24 +184,33 @@ export function WebChatPanel({ agentId }: WebChatPanelProps) {
</div>
)}

{timeline.map((item) => {
if (item.type !== "message") return null;
return (
<div key={item.id}>
{item.role === "user" ? (
<div className="flex justify-end">
<div className="max-w-[85%] rounded-2xl rounded-br-md bg-accent/10 px-4 py-2.5">
{timeline.map((item) => {
if (item.type !== "message") return null;
return (
<div key={item.id}>
{item.role === "user" ? (
<div className="flex justify-end">
<div className="relative max-w-[85%]">
<div className="rounded-2xl rounded-br-md bg-app-box/60 px-4 py-2.5">
<p className="text-sm text-ink">{item.content}</p>
</div>
{item.reaction && (
<div className="absolute -bottom-2.5 left-1">
<span className="flex h-6 items-center rounded-full bg-app-box/60 px-1.5 text-sm leading-none">
{item.reaction}
</span>
</div>
)}
</div>
) : (
<div className="text-sm text-ink-dull">
<Markdown>{item.content}</Markdown>
</div>
)}
</div>
);
})}
</div>
) : (
<div className="text-sm text-ink-dull">
<Markdown>{item.content}</Markdown>
</div>
)}
</div>
);
})}

{/* Typing indicator */}
{isTyping && <ThinkingIndicator />}
Expand Down
24 changes: 24 additions & 0 deletions interface/src/hooks/useChannelLiveState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type BranchStartedEvent,
type InboundMessageEvent,
type OutboundMessageEvent,
type ReactionEvent,
type TimelineItem,
type ToolCompletedEvent,
type ToolStartedEvent,
Expand Down Expand Up @@ -568,6 +569,28 @@ export function useChannelLiveState(channels: ChannelInfo[]) {
}
}, []);

const handleReaction = useCallback((data: unknown) => {
const event = data as Partial<ReactionEvent>;
if (typeof event.channel_id !== "string" || typeof event.emoji !== "string") return;
const channelId = event.channel_id;
const emoji = event.emoji;
const messageId = event.message_id ?? null;
setLiveStates((prev) => {
const state = prev[channelId];
if (!state) return prev;
const timeline = [...state.timeline];
// Target by message_id when available, fall back to latest user message
for (let i = timeline.length - 1; i >= 0; i--) {
const item = timeline[i];
if (item.type !== "message" || item.role !== "user") continue;
if (messageId && item.id !== messageId) continue;
timeline[i] = { ...item, reaction: emoji };
break;
}
return { ...prev, [channelId]: { ...state, timeline } };
});
}, []);

const loadOlderMessages = useCallback((channelId: string) => {
setLiveStates((prev) => {
const state = prev[channelId];
Expand Down Expand Up @@ -621,6 +644,7 @@ export function useChannelLiveState(channels: ChannelInfo[]) {
branch_completed: handleBranchCompleted,
tool_started: handleToolStarted,
tool_completed: handleToolCompleted,
reaction: handleReaction,
};

return { liveStates, handlers, syncStatusSnapshot, loadOlderMessages };
Expand Down
1 change: 1 addition & 0 deletions migrations/20260228000001_message_reactions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE conversation_messages ADD COLUMN reaction TEXT;
8 changes: 8 additions & 0 deletions src/api/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ pub enum ApiEvent {
/// "created", "updated", or "deleted".
action: String,
},
/// An emoji reaction added to a message.
Reaction {
agent_id: String,
channel_id: String,
emoji: String,
/// The conversation_messages ID the reaction was attached to.
message_id: Option<String>,
},
}

impl ApiState {
Expand Down
1 change: 1 addition & 0 deletions src/api/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub(super) async fn events_sse(
ApiEvent::AgentMessageSent { .. } => "agent_message_sent",
ApiEvent::AgentMessageReceived { .. } => "agent_message_received",
ApiEvent::TaskUpdated { .. } => "task_updated",
ApiEvent::Reaction { .. } => "reaction",
};
yield Ok(axum::response::sse::Event::default()
.event(event_type)
Expand Down
56 changes: 53 additions & 3 deletions src/conversation/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,53 @@ impl ConversationLogger {
});
}

/// Set a reaction emoji on the most recent user message in a channel.
///
/// Returns the message ID the reaction was attached to, or `None` if no
/// matching user message was found. Retries a few times to handle the race
/// where the user message INSERT (fire-and-forget) hasn't landed yet.
pub async fn log_reaction(&self, channel_id: &ChannelId, emoji: &str) -> Option<String> {
let channel_id_str = channel_id.to_string();
Comment on lines +112 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

log_reaction diverges from the repository’s fire-and-forget write policy.

Line 112 introduces an awaited conversation-history write path, while this module’s write operations are expected to remain fire-and-forget via spawned tasks.

As per coding guidelines "Use fire-and-forget DB writes with tokio::spawn for conversation history saves, memory writes, and worker log persistence".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/conversation/history.rs` around lines 112 - 113, The log_reaction
function currently awaits a DB write and must be converted to a fire-and-forget
write: do not await the write path inside log_reaction; instead spawn a
background task with tokio::spawn that performs the conversation-history write.
Capture/clone any required data (e.g., channel_id_str, emoji.to_string(), and a
clone or Arc of self or the db handle used by log_reaction) and move them into
the spawned task so log_reaction can return immediately; ensure any return value
(Option<String>) is produced synchronously before spawning or by generating the
ID locally and then performing the persistence inside the spawned task.


for attempt in 0..5u32 {
// Find the target message first so we can return its ID.
let target_id: Option<String> = sqlx::query_scalar(
"SELECT id FROM conversation_messages \
WHERE channel_id = ? AND role = 'user' \
ORDER BY created_at DESC LIMIT 1",
)
.bind(&channel_id_str)
.fetch_optional(&self.pool)
.await
.ok()
.flatten();
Comment on lines +117 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t swallow DB lookup failures in reaction targeting.

Line 125 converts query errors into None via .ok().flatten(), which masks real DB failures and can emit a misleading “no user message found” path.

🔧 Proposed fix
-            let target_id: Option<String> = sqlx::query_scalar(
+            let target_id: Option<String> = match sqlx::query_scalar(
                 "SELECT id FROM conversation_messages \
                  WHERE channel_id = ? AND role = 'user' \
                  ORDER BY created_at DESC LIMIT 1",
             )
             .bind(&channel_id_str)
             .fetch_optional(&self.pool)
             .await
-            .ok()
-            .flatten();
+            {
+                Ok(value) => value,
+                Err(error) => {
+                    tracing::warn!(
+                        %error,
+                        channel_id = %channel_id_str,
+                        "failed to resolve reaction target message"
+                    );
+                    return None;
+                }
+            };

As per coding guidelines "Don't silently discard errors. No let _ = on Results. Handle them, log them, or propagate them."

Also applies to: 133-134

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/conversation/history.rs` around lines 117 - 126, The DB query for
target_id currently swallows errors via
`.fetch_optional(&self.pool).await.ok().flatten()` which masks real failures;
replace that pattern by handling the Result from `.await` explicitly — either
propagate the error (return a Result from the surrounding function) or log the
error and only then decide to treat it as None; update the
`sqlx::query_scalar(...).bind(&channel_id_str).fetch_optional(&self.pool).await`
handling accordingly (do not use `.ok().flatten()`), and apply the same explicit
error handling to the similar query at the other occurrence (lines around the
second `.ok().flatten()`).


let Some(message_id) = target_id else {
if attempt < 4 {
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
continue;
}
tracing::warn!("no user message found for reaction");
return None;
};

match sqlx::query("UPDATE conversation_messages SET reaction = ? WHERE id = ?")
.bind(emoji)
.bind(&message_id)
.execute(&self.pool)
.await
{
Ok(_) => return Some(message_id),
Err(error) => {
tracing::warn!(%error, "failed to persist reaction");
return None;
}
}
}

None
}

/// Load recent messages for a channel (oldest first).
pub async fn load_recent(
&self,
Expand Down Expand Up @@ -196,6 +243,8 @@ pub enum TimelineItem {
sender_id: Option<String>,
content: String,
created_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
reaction: Option<String>,
},
BranchRun {
id: String,
Expand Down Expand Up @@ -360,17 +409,17 @@ impl ProcessRunLogger {

let query_str = format!(
"SELECT * FROM ( \
SELECT 'message' AS item_type, id, role, sender_name, sender_id, content, \
SELECT 'message' AS item_type, id, role, sender_name, sender_id, content, reaction, \
NULL AS description, NULL AS conclusion, NULL AS task, NULL AS result, NULL AS status, \
created_at AS timestamp, NULL AS completed_at \
FROM conversation_messages WHERE channel_id = ?1 \
UNION ALL \
SELECT 'branch_run' AS item_type, id, NULL, NULL, NULL, NULL, \
SELECT 'branch_run' AS item_type, id, NULL, NULL, NULL, NULL, NULL, \
description, conclusion, NULL, NULL, NULL, \
started_at AS timestamp, completed_at \
FROM branch_runs WHERE channel_id = ?1 \
UNION ALL \
SELECT 'worker_run' AS item_type, id, NULL, NULL, NULL, NULL, \
SELECT 'worker_run' AS item_type, id, NULL, NULL, NULL, NULL, NULL, \
NULL, NULL, task, result, status, \
started_at AS timestamp, completed_at \
FROM worker_runs WHERE channel_id = ?1 \
Expand Down Expand Up @@ -403,6 +452,7 @@ impl ProcessRunLogger {
.try_get::<chrono::DateTime<chrono::Utc>, _>("timestamp")
.map(|t| t.to_rfc3339())
.unwrap_or_default(),
reaction: row.try_get("reaction").ok().flatten(),
}),
"branch_run" => Some(TimelineItem::BranchRun {
id: row.try_get("id").unwrap_or_default(),
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ pub enum OutboundResponse {
caption: Option<String>,
},
/// Add a reaction emoji to the triggering message.
Reaction(String),
/// The optional message_id identifies the specific message being reacted to.
Reaction(String, Option<String>),
/// Remove a reaction emoji from the triggering message.
/// No-op on platforms that don't support reaction removal.
RemoveReaction(String),
Expand Down
8 changes: 8 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,14 @@ async fn run(
is_typing: false,
}).ok();
}
spacebot::OutboundResponse::Reaction(emoji, message_id) => {
api_event_tx.send(spacebot::api::ApiEvent::Reaction {
agent_id: sse_agent_id.clone(),
channel_id: sse_channel_id.clone(),
emoji: emoji.clone(),
message_id: message_id.clone(),
}).ok();
}
_ => {}
}

Expand Down
2 changes: 1 addition & 1 deletion src/messaging/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl Messaging for DiscordAdapter {
.await
.context("failed to send file attachment")?;
}
OutboundResponse::Reaction(emoji) => {
OutboundResponse::Reaction(emoji, _) => {
let message_id = message
.metadata
.get("discord_message_id")
Expand Down
4 changes: 2 additions & 2 deletions src/messaging/email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ impl Messaging for EmailAdapter {
)
.await?;
}
OutboundResponse::Reaction(_)
OutboundResponse::Reaction(..)
| OutboundResponse::RemoveReaction(_)
| OutboundResponse::Status(_) => {}
OutboundResponse::Ephemeral { text, .. } => {
Expand Down Expand Up @@ -506,7 +506,7 @@ impl Messaging for EmailAdapter {
self.send_email(&recipient, "Spacebot message", text, None, Vec::new(), None)
.await?;
}
OutboundResponse::Reaction(_)
OutboundResponse::Reaction(..)
| OutboundResponse::RemoveReaction(_)
| OutboundResponse::Status(_)
| OutboundResponse::StreamStart
Expand Down
4 changes: 2 additions & 2 deletions src/messaging/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ impl Messaging for SlackAdapter {
.context("failed to complete slack file upload")?;
}

OutboundResponse::Reaction(emoji) => {
OutboundResponse::Reaction(emoji, _) => {
let ts =
extract_message_ts(message).context("missing slack_message_ts for reaction")?;
let req = SlackApiReactionsAddRequest::new(
Expand Down Expand Up @@ -1544,7 +1544,7 @@ fn variant_name(response: &OutboundResponse) -> &'static str {
OutboundResponse::Text(_) => "Text",
OutboundResponse::ThreadReply { .. } => "ThreadReply",
OutboundResponse::File { .. } => "File",
OutboundResponse::Reaction(_) => "Reaction",
OutboundResponse::Reaction(..) => "Reaction",
OutboundResponse::RemoveReaction(_) => "RemoveReaction",
OutboundResponse::Ephemeral { .. } => "Ephemeral",
OutboundResponse::RichMessage { .. } => "RichMessage",
Expand Down
2 changes: 1 addition & 1 deletion src/messaging/telegram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl Messaging for TelegramAdapter {
}
}
}
OutboundResponse::Reaction(emoji) => {
OutboundResponse::Reaction(emoji, _) => {
let message_id = self.extract_message_id(message)?;

let reaction = ReactionType::Emoji {
Expand Down
2 changes: 1 addition & 1 deletion src/messaging/twitch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ impl Messaging for TwitchAdapter {
}
OutboundResponse::StreamEnd => {}
// Reactions, status updates, and Slack-specific variants aren't meaningful in Twitch chat
OutboundResponse::Reaction(_)
OutboundResponse::Reaction(..)
| OutboundResponse::RemoveReaction(_)
| OutboundResponse::Status(_) => {}
OutboundResponse::Ephemeral { text, .. } => {
Expand Down
2 changes: 1 addition & 1 deletion src/messaging/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl Messaging for WebhookAdapter {
caption: None,
},
// Reactions, status updates, and remove-reaction aren't meaningful over webhook
OutboundResponse::Reaction(_)
OutboundResponse::Reaction(..)
| OutboundResponse::RemoveReaction(_)
| OutboundResponse::Status(_) => return Ok(()),
// Slack-specific rich variants — fall back to plain text
Expand Down
10 changes: 8 additions & 2 deletions src/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,17 @@ pub async fn add_channel_tools(
state.deps.runtime_config.workspace_dir.clone(),
))
.await?;
handle.add_tool(CancelTool::new(state)).await?;
handle
.add_tool(SkipTool::new(skip_flag.clone(), response_tx.clone()))
.await?;
handle.add_tool(ReactTool::new(response_tx.clone())).await?;
handle
.add_tool(ReactTool::new(
response_tx.clone(),
state.conversation_logger.clone(),
state.channel_id.clone(),
))
.await?;
handle.add_tool(CancelTool::new(state)).await?;
if let Some(cron_tool) = cron_tool {
let cron_tool = cron_tool.with_default_delivery_target(
default_delivery_target_for_conversation(&conversation_id),
Expand Down
Loading
Loading