From 17146d5c9d10498ee60b1d8c7173baaa82af585d Mon Sep 17 00:00:00 2001 From: bhagwan Date: Fri, 6 Mar 2026 17:06:34 -0500 Subject: [PATCH 1/2] fix(channel): surface LLM errors to users instead of silently logging When an LLM call fails (e.g., provider error like StepFun's 'Unrecognized chat message'), the channel was only logging the error and sending nothing to the user. This led to confusing silent failures where the user received no response and had to check logs to understand what happened. Changes: - src/agent/channel.rs: Modified error handler to send error to user - Formats error message and sends via response_tx - User now sees: 'I encountered an error: ...' - Still logs full error for debugging This ensures users always receive feedback when something goes wrong, even if it's an internal/provider error rather than a channel bug. --- src/agent/channel.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 4daddff19..3c55ebb35 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -2582,6 +2582,12 @@ impl Channel { .channel_errors_total .with_label_values(&[metrics_agent_id, metrics_channel_type, "llm_error"]) .inc(); + // Send error to user so they know something went wrong + let error_msg = format!("I encountered an error: {}", error); + let _ = self + .response_tx + .send(OutboundResponse::Text(error_msg)) + .await; tracing::error!(channel_id = %self.id, %error, "channel LLM call failed"); } } From 11064dd12c8eac89ac75630cc9c00026fe523281 Mon Sep 17 00:00:00 2001 From: bhagwan Date: Wed, 4 Mar 2026 10:33:21 -0500 Subject: [PATCH 2/2] feat(messaging): add Signal adapter via signal-cli JSON-RPC daemon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Signal messaging support using the signal-cli daemon HTTP API, following the existing adapter architecture (Telegram, Discord, Slack, Twitch). - Inbound: SSE stream with automatic reconnection and exponential backoff (2s → 60s), UTF-8 chunk boundary handling, buffer overflow protection. - Outbound: JSON-RPC send calls. DM recipients must be a JSON array. - Typing indicators: JSON-RPC sendTyping with ~5s expiry. - Attachments: Temp files in {instance_dir}/tmp/, auto-cleaned after send. - Streaming: Not supported (Signal can't edit messages). - Permissions: DM allowlist + group filter (None = block all groups). Config types in types.rs, toml_schema.rs, load.rs. SignalPermissions in permissions.rs with from_config/from_instance_config. Hot-reload support in watcher.rs. 23 unit tests. Add to config.toml: ```toml [messaging.signal] enabled = true http_url = "http://127.0.0.1:8686" account = "+1234567890" dm_allowed_users = ["+0987654321"] group_ids = ["", ""] group_allowed_users = ["+5566778899", "+1122334455"] ignore_stories = true [[messaging.signal.instances]] name = "work" enabled = true http_url = "http://127.0.0.1:8687" account = "+1122334455" dm_allowed_users = ["+5566778899"] group_ids = ["", ""] group_allowed_users = ["+1122334455"] ``` Requires signal-cli daemon running: `signal-cli daemon --http` Closes #310 --- prompts/en/adapters/signal.md.j2 | 10 + .../en/tools/send_message_description.md.j2 | 5 + src/agent/channel.rs | 36 +- src/agent/channel_history.rs | 12 +- src/config.rs | 8 +- src/config/load.rs | 59 +- src/config/permissions.rs | 179 +- src/config/toml_schema.rs | 40 + src/config/types.rs | 124 +- src/config/watcher.rs | 70 +- src/main.rs | 60 + src/messaging.rs | 3 +- src/messaging/signal.rs | 2057 +++++++++++++++++ src/messaging/target.rs | 339 ++- src/prompts/engine.rs | 5 + src/prompts/text.rs | 1 + src/secrets/store.rs | 5 +- src/tools.rs | 2 + src/tools/send_message_to_another_channel.rs | 295 ++- tests/context_dump.rs | 2 + 20 files changed, 3281 insertions(+), 31 deletions(-) create mode 100644 prompts/en/adapters/signal.md.j2 create mode 100644 src/messaging/signal.rs diff --git a/prompts/en/adapters/signal.md.j2 b/prompts/en/adapters/signal.md.j2 new file mode 100644 index 000000000..4fe6ae238 --- /dev/null +++ b/prompts/en/adapters/signal.md.j2 @@ -0,0 +1,10 @@ +## Signal Adapter Guidance + +You are in a Signal conversation. Signal supports cross-channel messaging — you can send messages to other Signal users and groups using the `send_message_to_another_channel` tool. + +**Supported Signal targets:** +- `signal:uuid:{uuid}` — Send to a Signal user by their UUID +- `signal:group:{group_id}` — Send to a Signal group +- `signal:+{phone}` or `signal:e164:+{phone}` — Send to a Signal user by phone number + +Use this when the user asks you to message someone else on Signal or post to a Signal group. \ No newline at end of file diff --git a/prompts/en/tools/send_message_description.md.j2 b/prompts/en/tools/send_message_description.md.j2 index 69d427adb..8ace81438 100644 --- a/prompts/en/tools/send_message_description.md.j2 +++ b/prompts/en/tools/send_message_description.md.j2 @@ -1 +1,6 @@ Send a message to a DIFFERENT channel than the one you are currently in. Use this for cross-channel delivery — reminders, notifications, or when the user asks you to post something in another channel or DM them. Do NOT use this to reply in the current conversation — use the `reply` tool for that. Target channels by name or ID from the available channels in your context. + +For Signal messaging, you can use these explicit targets: +- `signal:uuid:{uuid}` - Send to a Signal user by their UUID +- `signal:group:{group_id}` - Send to a Signal group +- `signal:+{phone}` or `signal:e164:+{phone}` — Send to a Signal user by phone number diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 3c55ebb35..89680b7a1 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -823,7 +823,7 @@ impl Channel { } let supported_source = matches!( message.source.as_str(), - "telegram" | "discord" | "slack" | "twitch" + "telegram" | "discord" | "slack" | "twitch" | "signal" ); if !supported_source { return Ok(false); @@ -1202,11 +1202,13 @@ impl Channel { self.conversation_id = Some(first.conversation_id.clone()); } + // Track source adapter from the first non-system message + // Prefer message.adapter (full adapter string like "signal:work") over message.source if self.source_adapter.is_none() && let Some(first) = messages.first() && first.source != "system" { - self.source_adapter = Some(first.source.clone()); + self.source_adapter = first.adapter.clone().or_else(|| Some(first.source.clone())); } // Capture conversation context from the first message @@ -1408,6 +1410,13 @@ impl Channel { .build_system_prompt_with_coalesce(message_count, elapsed_secs, unique_sender_count) .await?; + // Extract adapter from messages (prefer explicit message.adapter, fall back to stored source_adapter) + // This preserves per-message adapter for Signal named instances (e.g., "signal:work") + let batch_adapter = messages + .iter() + .find_map(|m| m.adapter.as_deref()) + .or(self.source_adapter.as_deref()); + { let mut reply_target = self.state.reply_target_message_id.write().await; *reply_target = messages.iter().rev().find_map(extract_message_id); @@ -1421,6 +1430,7 @@ impl Channel { &conversation_id, attachment_parts, false, // not a retrigger + batch_adapter, ) .await?; @@ -1552,8 +1562,13 @@ impl Channel { self.conversation_id = Some(message.conversation_id.clone()); } + // Track source adapter from non-system messages + // Prefer message.adapter (full adapter string like "signal:work") over message.source if self.source_adapter.is_none() && message.source != "system" { - self.source_adapter = Some(message.source.clone()); + self.source_adapter = message + .adapter + .clone() + .or_else(|| Some(message.source.clone())); } let (raw_text, attachments) = match &message.content { @@ -1743,6 +1758,10 @@ impl Channel { Vec::new() }; + let adapter = message + .adapter + .as_deref() + .or_else(|| self.current_adapter()); let (result, skip_flag, replied_flag, retrigger_reply_preserved) = self .run_agent_turn( &user_text, @@ -1750,6 +1769,7 @@ impl Channel { &message.conversation_id, attachment_content, is_retrigger, + adapter, ) .await?; @@ -1765,7 +1785,7 @@ impl Channel { && !replied_flag.load(std::sync::atomic::Ordering::Relaxed) && matches!( message.source.as_str(), - "discord" | "telegram" | "slack" | "twitch" + "discord" | "telegram" | "slack" | "twitch" | "signal" ) { self.send_builtin_text( @@ -2171,6 +2191,7 @@ impl Channel { conversation_id: &str, attachment_content: Vec, is_retrigger: bool, + adapter: Option<&str>, ) -> Result<( std::result::Result, crate::tools::SkipFlag, @@ -2198,6 +2219,7 @@ impl Channel { self.deps.cron_tool.clone(), send_agent_message_tool, allow_direct_reply, + adapter.map(|s| s.to_string()), ) .await { @@ -2584,10 +2606,10 @@ impl Channel { .inc(); // Send error to user so they know something went wrong let error_msg = format!("I encountered an error: {}", error); - let _ = self - .response_tx + self.response_tx .send(OutboundResponse::Text(error_msg)) - .await; + .await + .ok(); tracing::error!(channel_id = %self.id, %error, "channel LLM call failed"); } } diff --git a/src/agent/channel_history.rs b/src/agent/channel_history.rs index 56764a256..24d330867 100644 --- a/src/agent/channel_history.rs +++ b/src/agent/channel_history.rs @@ -317,7 +317,17 @@ pub(crate) fn format_user_message( raw_text }; - format!("{display_name}{bot_tag}{reply_context} [{timestamp_text}]: {text_content}") + let sender_context = message + .metadata + .get("sender_context") + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + .map(|s| format!(" {s}")) + .unwrap_or_default(); + + format!( + "{display_name}{bot_tag}{reply_context}{sender_context} [{timestamp_text}]: {text_content}" + ) } pub(crate) fn format_batched_user_message( diff --git a/src/config.rs b/src/config.rs index 11318cab8..44d066f79 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,7 +15,7 @@ pub(crate) use load::resolve_env_value; pub use load::set_resolve_secrets_store; pub use onboarding::run_onboarding; pub use permissions::{ - DiscordPermissions, SlackPermissions, TelegramPermissions, TwitchPermissions, + DiscordPermissions, SignalPermissions, SlackPermissions, TelegramPermissions, TwitchPermissions, }; pub(crate) use providers::default_provider_config; pub use runtime::RuntimeConfig; @@ -1539,6 +1539,7 @@ maintenance_merge_similarity_threshold = 1.1 email: None, webhook: None, twitch: None, + signal: None, }; let bindings = vec![ Binding { @@ -1548,6 +1549,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1559,6 +1561,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1581,6 +1584,7 @@ maintenance_merge_similarity_threshold = 1.1 email: None, webhook: None, twitch: None, + signal: None, }; let bindings = vec![Binding { agent_id: "main".into(), @@ -1643,6 +1647,7 @@ maintenance_merge_similarity_threshold = 1.1 }), webhook: None, twitch: None, + signal: None, }; let bindings = vec![Binding { agent_id: "main".into(), @@ -1677,6 +1682,7 @@ maintenance_merge_similarity_threshold = 1.1 email: None, webhook: None, twitch: None, + signal: None, }; // Binding targets default adapter, but no default credentials exist let bindings = vec![Binding { diff --git a/src/config/load.rs b/src/config/load.rs index 3da09e61f..eec7d3d0b 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -14,10 +14,10 @@ use super::{ CoalesceConfig, CompactionConfig, Config, CortexConfig, CronDef, DefaultsConfig, DiscordConfig, DiscordInstanceConfig, EmailConfig, EmailInstanceConfig, GroupDef, HumanDef, IngestionConfig, LinkDef, LlmConfig, McpServerConfig, McpTransport, MemoryPersistenceConfig, MessagingConfig, - MetricsConfig, OpenCodeConfig, ProjectsConfig, ProviderConfig, SlackCommandConfig, SlackConfig, - SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, TelemetryConfig, TwitchConfig, - TwitchInstanceConfig, WarmupConfig, WebhookConfig, normalize_adapter, - validate_named_messaging_adapters, + MetricsConfig, OpenCodeConfig, ProjectsConfig, ProviderConfig, SignalConfig, + SignalInstanceConfig, SlackCommandConfig, SlackConfig, SlackInstanceConfig, TelegramConfig, + TelegramInstanceConfig, TelemetryConfig, TwitchConfig, TwitchInstanceConfig, WarmupConfig, + WebhookConfig, normalize_adapter, validate_named_messaging_adapters, }; use crate::error::{ConfigError, Result}; @@ -2148,6 +2148,57 @@ impl Config { trigger_prefix: t.trigger_prefix, }) }), + signal: toml.messaging.signal.and_then(|s| { + let instances = s + .instances + .into_iter() + .map(|instance| { + let http_url = instance.http_url.as_deref().and_then(resolve_env_value); + let account = instance.account.as_deref().and_then(resolve_env_value); + if instance.enabled && (http_url.is_none() || account.is_none()) { + tracing::warn!( + adapter = %instance.name, + "signal instance is enabled but http_url or account is missing/unresolvable — disabling" + ); + } + let has_credentials = http_url.is_some() && account.is_some(); + SignalInstanceConfig { + name: instance.name, + enabled: instance.enabled && has_credentials, + http_url: http_url.unwrap_or_default(), + account: account.unwrap_or_default(), + dm_allowed_users: instance.dm_allowed_users, + group_ids: instance.group_ids, + group_allowed_users: instance.group_allowed_users, + ignore_stories: instance.ignore_stories, + } + }) + .collect::>(); + + let http_url = std::env::var("SIGNAL_HTTP_URL") + .ok() + .or_else(|| s.http_url.as_deref().and_then(resolve_env_value)); + let account = std::env::var("SIGNAL_ACCOUNT") + .ok() + .or_else(|| s.account.as_deref().and_then(resolve_env_value)); + + if (http_url.is_none() || account.is_none()) + && !instances.iter().any(|inst| inst.enabled) + { + return None; + } + + Some(SignalConfig { + enabled: s.enabled, + http_url: http_url.unwrap_or_default(), + account: account.unwrap_or_default(), + instances, + dm_allowed_users: s.dm_allowed_users, + group_ids: s.group_ids, + group_allowed_users: s.group_allowed_users, + ignore_stories: s.ignore_stories, + }) + }), }; let bindings: Vec = toml diff --git a/src/config/permissions.rs b/src/config/permissions.rs index 8844cf614..3c0713228 100644 --- a/src/config/permissions.rs +++ b/src/config/permissions.rs @@ -1,6 +1,7 @@ use super::{ - Binding, DiscordConfig, DiscordInstanceConfig, SlackConfig, SlackInstanceConfig, - TelegramConfig, TelegramInstanceConfig, TwitchConfig, TwitchInstanceConfig, + Binding, DiscordConfig, DiscordInstanceConfig, SignalConfig, SignalInstanceConfig, SlackConfig, + SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, TwitchConfig, + TwitchInstanceConfig, }; use std::collections::HashMap; @@ -324,6 +325,135 @@ impl TwitchPermissions { } } +/// Hot-reloadable Signal permission filters. +/// +/// Shared with the Signal adapter via `Arc>` for hot-reloading. +/// Uses string-based identifiers since Signal users are identified by phone +/// numbers (E.164) or UUIDs. +/// +/// Wildcards: +/// - `"*"` in `dm_allowed_users` means allow all DM users +/// - `"*"` in `group_allowed_users` means allow all group users +/// - `"*"` in `group_filter` means allow all groups +/// - Empty array means block all (the `"*"` must be explicitly set to allow all) +#[derive(Debug, Clone, Default)] +pub struct SignalPermissions { + /// Allowed group IDs. None = block all, Some(["*"]) = allow all, Some([...]) = specific list. + pub group_filter: Option>, + /// Phone numbers or UUIDs allowed to DM the bot. ["*"] = allow all, [] = block all. + /// Only applies to direct messages. + pub dm_allowed_users: Vec, + /// Phone numbers or UUIDs allowed in group messages. ["*"] = allow all, [] = block all. + /// For groups, both dm_allowed_users AND group_allowed_users are checked (merged). + pub group_allowed_users: Vec, +} + +impl SignalPermissions { + /// Build from the current config's signal settings. + pub fn from_config(signal: &SignalConfig) -> Self { + Self::build_from_seed( + signal.dm_allowed_users.clone(), + signal.group_ids.clone(), + signal.group_allowed_users.clone(), + ) + } + + /// Build permissions for a named Signal adapter instance. + pub fn from_instance_config(instance: &SignalInstanceConfig) -> Self { + Self::build_from_seed( + instance.dm_allowed_users.clone(), + instance.group_ids.clone(), + instance.group_allowed_users.clone(), + ) + } + + fn build_from_seed( + seed_dm_allowed_users: Vec, + seed_group_ids: Vec, + seed_group_allowed_users: Vec, + ) -> Self { + // Group filter: collect group_ids from signal config/instance. + // - "*" means allow all groups + // - Empty list means block all groups + // - Specific IDs means only those groups are allowed + let mut group_filter_wildcard = false; + let group_filter = { + let mut all_group_ids: Vec = Vec::new(); + + // Process seed_group_ids with validation + for id in &seed_group_ids { + let id = id.trim().to_string(); + if id.is_empty() { + continue; + } + if id == "*" { + group_filter_wildcard = true; + break; + } + // Signal group IDs are base64-encoded; validate format. + if !is_valid_base64(&id) { + tracing::warn!( + group_id = %id, + "signal: seed group_id is not valid base64, dropping" + ); + continue; + } + if !all_group_ids.contains(&id) { + all_group_ids.push(id); + } + } + + if group_filter_wildcard { + Some(vec!["*".to_string()]) + } else if all_group_ids.is_empty() { + None + } else { + Some(all_group_ids) + } + }; + + // Build dm_allowed_users separately (for DMs only) + // - "*" means allow all DM users + // - Empty list means block all DMs + // - Specific list means only those users are allowed for DMs + let dm_users: Vec = seed_dm_allowed_users + .iter() + .filter_map(|id| { + let trimmed = id.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_string()) + }) + .collect(); + let dm_wildcard = dm_users.iter().any(|id| id == "*"); + + // Build group_allowed_users separately (for groups only) + // - "*" means allow all group users + // - Empty list means block all group users + // - Specific list means only those users are allowed in groups + let group_users: Vec = seed_group_allowed_users + .iter() + .filter_map(|id| { + let trimmed = id.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_string()) + }) + .collect(); + let group_wildcard = group_users.iter().any(|id| id == "*"); + + Self { + group_filter, + dm_allowed_users: if dm_wildcard { + vec!["*".to_string()] + } else { + dm_users + }, + group_allowed_users: if group_wildcard { + vec!["*".to_string()] + } else { + group_users + }, + } + } +} + fn binding_adapter_selector_matches(binding: &Binding, adapter_selector: Option<&str>) -> bool { match (binding.adapter.as_deref(), adapter_selector) { (None, None) => true, @@ -333,3 +463,48 @@ fn binding_adapter_selector_matches(binding: &Binding, adapter_selector: Option< _ => false, } } + +/// Check if a string is valid base64 (URL-safe or standard). +/// Signal group IDs are base64-encoded. +fn is_valid_base64(s: &str) -> bool { + use base64::{ + Engine, engine::general_purpose::STANDARD, engine::general_purpose::URL_SAFE, + engine::general_purpose::URL_SAFE_NO_PAD, + }; + + let trimmed = s.trim(); + if trimmed.is_empty() { + return false; + } + + URL_SAFE_NO_PAD.decode(trimmed).is_ok() + || URL_SAFE.decode(trimmed).is_ok() + || STANDARD.decode(trimmed).is_ok() +} + +#[cfg(test)] +mod base64_tests { + use super::is_valid_base64; + + #[test] + fn test_valid_url_safe_base64() { + // URL-safe base64 without padding (common for Signal group IDs) + assert!(is_valid_base64("abc123def456")); + assert!(is_valid_base64("abc123_def_456__")); // URL-safe with underscores + } + + #[test] + fn test_valid_standard_base64() { + // Standard base64 + assert!(is_valid_base64("abc123DEF456")); + assert!(is_valid_base64("SGVsbG8gV29ybGQ=")); // "Hello World" + } + + #[test] + fn test_invalid_base64() { + // Invalid characters not in base64 alphabet + assert!(!is_valid_base64("not@valid!base64")); + assert!(!is_valid_base64("")); + assert!(!is_valid_base64(" ")); + } +} diff --git a/src/config/toml_schema.rs b/src/config/toml_schema.rs index 969333053..d4453f2e0 100644 --- a/src/config/toml_schema.rs +++ b/src/config/toml_schema.rs @@ -496,6 +496,7 @@ pub(super) struct TomlMessagingConfig { pub(super) email: Option, pub(super) webhook: Option, pub(super) twitch: Option, + pub(super) signal: Option, } #[derive(Deserialize)] @@ -687,6 +688,45 @@ pub(super) struct TomlTwitchInstanceConfig { pub(super) trigger_prefix: Option, } +#[derive(Deserialize)] +pub(super) struct TomlSignalConfig { + #[serde(default)] + pub(super) enabled: bool, + pub(super) http_url: Option, + pub(super) account: Option, + #[serde(default)] + pub(super) instances: Vec, + #[serde(default)] + pub(super) dm_allowed_users: Vec, + #[serde(default)] + pub(super) group_ids: Vec, + #[serde(default)] + pub(super) group_allowed_users: Vec, + #[serde(default = "default_signal_ignore_stories")] + pub(super) ignore_stories: bool, +} + +#[derive(Deserialize)] +pub(super) struct TomlSignalInstanceConfig { + pub(super) name: String, + #[serde(default)] + pub(super) enabled: bool, + pub(super) http_url: Option, + pub(super) account: Option, + #[serde(default)] + pub(super) dm_allowed_users: Vec, + #[serde(default)] + pub(super) group_ids: Vec, + #[serde(default)] + pub(super) group_allowed_users: Vec, + #[serde(default = "default_signal_ignore_stories")] + pub(super) ignore_stories: bool, +} + +pub(super) fn default_signal_ignore_stories() -> bool { + true +} + pub(super) fn default_webhook_port() -> u16 { 18789 } diff --git a/src/config/types.rs b/src/config/types.rs index 6c6111f66..93b66982e 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1461,7 +1461,7 @@ pub struct Binding { pub adapter: Option, pub guild_id: Option, pub workspace_id: Option, // Slack workspace (team) ID - pub chat_id: Option, + pub chat_id: Option, // Telegram group ID /// Channel IDs this binding applies to. If empty, all channels in the guild/workspace are allowed. pub channel_ids: Vec, /// Require explicit @mention (or reply-to-bot) for inbound messages. @@ -1659,7 +1659,7 @@ pub(super) struct AdapterValidationState { pub(super) fn is_named_adapter_platform(platform: &str) -> bool { matches!( platform, - "discord" | "slack" | "telegram" | "twitch" | "email" + "discord" | "slack" | "telegram" | "twitch" | "email" | "signal" ) } @@ -1822,6 +1822,26 @@ pub(super) fn build_adapter_validation_states( ); } + if let Some(signal) = &messaging.signal { + let named_instances = validate_instance_names( + "signal", + signal + .instances + .iter() + .map(|instance| instance.name.as_str()), + )?; + let default_present = + !signal.http_url.trim().is_empty() && !signal.account.trim().is_empty(); + validate_runtime_keys("signal", default_present, &named_instances)?; + states.insert( + "signal", + AdapterValidationState { + default_present, + named_instances, + }, + ); + } + Ok(states) } @@ -1942,6 +1962,7 @@ pub struct MessagingConfig { pub email: Option, pub webhook: Option, pub twitch: Option, + pub signal: Option, } #[derive(Clone)] @@ -2435,3 +2456,102 @@ pub struct WebhookConfig { pub bind: String, pub auth_token: Option, } + +/// Signal messaging via signal-cli JSON-RPC daemon. +/// +/// Connects to a running `signal-cli daemon --http` instance for sending and +/// receiving Signal messages. Supports both direct messages and group chats. +#[derive(Clone)] +pub struct SignalConfig { + pub enabled: bool, + /// Base URL of the signal-cli JSON-RPC HTTP daemon (e.g. `http://127.0.0.1:8686`). + /// May contain embedded credentials which are redacted in debug output. + pub http_url: String, + /// E.164 phone number of the bot's Signal account (e.g. `+1234567890`). + pub account: String, + /// Additional named Signal adapter instances. + pub instances: Vec, + /// Phone numbers or UUIDs allowed to DM the bot. If empty, DMs are ignored. + pub dm_allowed_users: Vec, + /// Group IDs allowed for this adapter. If empty, all groups are blocked + /// (same as `None` in the permission filter — groups are opt-in only). + pub group_ids: Vec, + /// User IDs allowed to message in Signal groups. + pub group_allowed_users: Vec, + /// Whether to silently drop story messages (default: true). + pub ignore_stories: bool, +} + +/// Per-instance config for a named Signal adapter. +#[derive(Clone)] +pub struct SignalInstanceConfig { + pub name: String, + pub enabled: bool, + /// Base URL of this instance's signal-cli daemon. + pub http_url: String, + /// E.164 phone number for this instance's Signal account. + pub account: String, + /// Phone numbers or UUIDs allowed to DM this instance. + pub dm_allowed_users: Vec, + /// Group IDs allowed for this instance. + pub group_ids: Vec, + /// User IDs allowed to message in Signal groups for this instance. + pub group_allowed_users: Vec, + /// Whether this instance drops story messages. + pub ignore_stories: bool, +} + +impl std::fmt::Debug for SignalInstanceConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SignalInstanceConfig") + .field("name", &self.name) + .field("enabled", &self.enabled) + .field("http_url", &"[REDACTED]") + .field("account", &"[REDACTED]") + .field("dm_allowed_users", &"[REDACTED]") + .field("group_ids", &self.group_ids) + .field("group_allowed_users", &"[REDACTED]") + .field("ignore_stories", &self.ignore_stories) + .finish() + } +} + +impl std::fmt::Debug for SignalConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SignalConfig") + .field("enabled", &self.enabled) + .field("http_url", &"[REDACTED]") + .field("account", &"[REDACTED]") + .field("instances", &self.instances) + .field("dm_allowed_users", &"[REDACTED]") + .field("group_ids", &self.group_ids) + .field("group_allowed_users", &"[REDACTED]") + .field("ignore_stories", &self.ignore_stories) + .finish() + } +} + +impl SystemSecrets for SignalConfig { + fn section() -> &'static str { + "signal" + } + + fn is_messaging_adapter() -> bool { + true + } + + fn secret_fields() -> &'static [SecretField] { + &[ + SecretField { + toml_key: "http_url", + secret_name: "SIGNAL_HTTP_URL", + instance_pattern: None, + }, + SecretField { + toml_key: "account", + secret_name: "SIGNAL_ACCOUNT", + instance_pattern: None, + }, + ] + } +} diff --git a/src/config/watcher.rs b/src/config/watcher.rs index e39ec1533..8daa4dbdd 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -2,8 +2,8 @@ use std::path::PathBuf; use std::sync::Arc; use super::{ - Binding, Config, DiscordPermissions, RuntimeConfig, SlackPermissions, TelegramPermissions, - TwitchPermissions, binding_runtime_adapter_key, + Binding, Config, DiscordPermissions, RuntimeConfig, SignalPermissions, SlackPermissions, + TelegramPermissions, TwitchPermissions, binding_runtime_adapter_key, }; /// Per-agent context needed by the file watcher: (id, prompt_dir, identity_dir, @@ -31,6 +31,7 @@ pub fn spawn_file_watcher( slack_permissions: Option>>, telegram_permissions: Option>>, twitch_permissions: Option>>, + signal_permissions: Option>>, bindings: Arc>>, messaging_manager: Option>, llm_manager: Arc, @@ -240,6 +241,14 @@ pub fn spawn_file_watcher( tracing::info!("twitch permissions reloaded"); } + if let Some(ref perms) = signal_permissions + && let Some(signal_config) = &config.messaging.signal + { + let new_perms = SignalPermissions::from_config(signal_config); + perms.store(Arc::new(new_perms)); + tracing::info!("signal permissions reloaded"); + } + // Hot-start adapters that are newly enabled in the config if let Some(ref manager) = messaging_manager { let rt = tokio::runtime::Handle::current(); @@ -249,6 +258,7 @@ pub fn spawn_file_watcher( let slack_permissions = slack_permissions.clone(); let telegram_permissions = telegram_permissions.clone(); let twitch_permissions = twitch_permissions.clone(); + let signal_permissions = signal_permissions.clone(); let instance_dir = instance_dir.clone(); rt.spawn(async move { @@ -531,6 +541,62 @@ pub fn spawn_file_watcher( } } } + + // Signal: start default + named instances that are enabled and not already running. + if let Some(signal_config) = &config.messaging.signal + && signal_config.enabled { + if !signal_config.http_url.is_empty() + && !signal_config.account.is_empty() + && !manager.has_adapter("signal").await + { + let permissions = match signal_permissions { + Some(ref existing) => existing.clone(), + None => { + let permissions = SignalPermissions::from_config(signal_config); + Arc::new(arc_swap::ArcSwap::from_pointee(permissions)) + } + }; + let tmp_dir = instance_dir.join("tmp"); + let adapter = crate::messaging::signal::SignalAdapter::new( + "signal", + &signal_config.http_url, + &signal_config.account, + signal_config.ignore_stories, + permissions, + tmp_dir, + ); + if let Err(error) = manager.register_and_start(adapter).await { + tracing::error!(%error, "failed to hot-start signal adapter from config change"); + } + } + + for instance in signal_config.instances.iter().filter(|instance| instance.enabled) { + let runtime_key = binding_runtime_adapter_key( + "signal", + Some(instance.name.as_str()), + ); + if manager.has_adapter(runtime_key.as_str()).await { + // TODO: named instance permissions not hot-updated (see discord block comment) + continue; + } + + let permissions = Arc::new(arc_swap::ArcSwap::from_pointee( + SignalPermissions::from_instance_config(instance), + )); + let tmp_dir = instance_dir.join("tmp"); + let adapter = crate::messaging::signal::SignalAdapter::new( + runtime_key, + &instance.http_url, + &instance.account, + instance.ignore_stories, + permissions, + tmp_dir, + ); + if let Err(error) = manager.register_and_start(adapter).await { + tracing::error!(%error, adapter = %instance.name, "failed to hot-start named signal adapter from config change"); + } + } + } }); } } diff --git a/src/main.rs b/src/main.rs index 1c09da7d7..776673a0f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1587,6 +1587,7 @@ async fn run( let mut slack_permissions = None; let mut telegram_permissions = None; let mut twitch_permissions = None; + let mut signal_permissions = None; initialize_agents( &config, &llm_manager, @@ -1604,6 +1605,7 @@ async fn run( &mut slack_permissions, &mut telegram_permissions, &mut twitch_permissions, + &mut signal_permissions, agent_links.clone(), agent_humans.clone(), injection_tx.clone(), @@ -1622,6 +1624,7 @@ async fn run( slack_permissions, telegram_permissions, twitch_permissions, + signal_permissions, bindings.clone(), Some(messaging_manager.clone()), llm_manager.clone(), @@ -1638,6 +1641,7 @@ async fn run( None, None, None, + None, bindings.clone(), None, llm_manager.clone(), @@ -2316,6 +2320,7 @@ async fn run( let mut new_slack_permissions = None; let mut new_telegram_permissions = None; let mut new_twitch_permissions = None; + let mut new_signal_permissions = None; match initialize_agents( &new_config, &new_llm_manager, @@ -2333,6 +2338,7 @@ async fn run( &mut new_slack_permissions, &mut new_telegram_permissions, &mut new_twitch_permissions, + &mut new_signal_permissions, agent_links.clone(), agent_humans.clone(), injection_tx.clone(), @@ -2350,6 +2356,7 @@ async fn run( new_slack_permissions, new_telegram_permissions, new_twitch_permissions, + new_signal_permissions, bindings.clone(), Some(messaging_manager.clone()), new_llm_manager.clone(), @@ -2472,6 +2479,7 @@ async fn initialize_agents( slack_permissions: &mut Option>>, telegram_permissions: &mut Option>>, twitch_permissions: &mut Option>>, + signal_permissions: &mut Option>>, agent_links: Arc>>, agent_humans: Arc>>, injection_tx: tokio::sync::mpsc::Sender, @@ -3172,6 +3180,58 @@ async fn initialize_agents( } } + // Shared Signal permissions (hot-reloadable via file watcher) + *signal_permissions = config.messaging.signal.as_ref().map(|signal_config| { + let perms = spacebot::config::SignalPermissions::from_config(signal_config); + Arc::new(ArcSwap::from_pointee(perms)) + }); + + if let Some(signal_config) = &config.messaging.signal + && signal_config.enabled + { + let tmp_dir = config.instance_dir.join("tmp"); + if !signal_config.http_url.is_empty() && !signal_config.account.is_empty() { + let adapter = spacebot::messaging::signal::SignalAdapter::new( + "signal", + &signal_config.http_url, + &signal_config.account, + signal_config.ignore_stories, + signal_permissions.clone().ok_or_else(|| { + anyhow::anyhow!("signal permissions not initialized when signal is enabled") + })?, + tmp_dir.clone(), + ); + new_messaging_manager.register(adapter).await; + } + + for instance in signal_config + .instances + .iter() + .filter(|instance| instance.enabled) + { + if instance.http_url.is_empty() || instance.account.is_empty() { + tracing::warn!(adapter = %instance.name, "skipping enabled signal instance with missing credentials"); + continue; + } + let runtime_key = spacebot::config::binding_runtime_adapter_key( + "signal", + Some(instance.name.as_str()), + ); + let perms = Arc::new(ArcSwap::from_pointee( + spacebot::config::SignalPermissions::from_instance_config(instance), + )); + let adapter = spacebot::messaging::signal::SignalAdapter::new( + runtime_key, + &instance.http_url, + &instance.account, + instance.ignore_stories, + perms, + tmp_dir.clone(), + ); + new_messaging_manager.register(adapter).await; + } + } + let webchat_agent_pools = agents .iter() .map(|(agent_id, agent)| (agent_id.to_string(), agent.db.sqlite.clone())) diff --git a/src/messaging.rs b/src/messaging.rs index f57d5d68a..e85ccf063 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,8 +1,9 @@ -//! Messaging adapters (Discord, Slack, Telegram, Twitch, Email, Webhook, WebChat). +//! Messaging adapters (Discord, Slack, Telegram, Twitch, Signal, Email, Webhook, WebChat). pub mod discord; pub mod email; pub mod manager; +pub mod signal; pub mod slack; pub mod target; pub mod telegram; diff --git a/src/messaging/signal.rs b/src/messaging/signal.rs new file mode 100644 index 000000000..b9f16a0a6 --- /dev/null +++ b/src/messaging/signal.rs @@ -0,0 +1,2057 @@ +//! Signal messaging adapter using signal-cli JSON-RPC daemon. +//! +//! Connects to a running `signal-cli daemon --http` instance for sending and +//! receiving Signal messages via its JSON-RPC and Server-Sent Events (SSE) APIs. +//! +//! ## Architecture +//! +//! - **Inbound:** SSE stream from `{http_url}/api/v1/events?account={account}` with +//! automatic reconnection and exponential backoff (2s → 60s). +//! - **Outbound:** JSON-RPC `send` calls to `{http_url}/api/v1/rpc`. DM recipients +//! must be passed as a JSON **array** (signal-cli requirement). Group messages use +//! `groupId` instead of `recipient`. +//! - **Typing:** JSON-RPC `sendTyping` method with repeating task (Signal indicators +//! expire after ~5s). +//! - **Attachments outbound:** Written to `{tmp_dir}/` as temp files, file paths passed +//! in the `attachments` JSON array, cleaned up after send. +//! - **Attachments inbound:** signal-cli provides file paths on disk; currently treated +//! as opaque (message text falls back to `[Attachment]` for attachment-only messages). +//! - **Streaming:** Not supported (Signal can't edit sent messages). `StreamStart`, +//! `StreamChunk`, and `StreamEnd` are no-ops. +//! +//! ## Limitations +//! +//! - **Threading:** Signal has no native thread concept. `ThreadReply` responses are +//! sent as regular messages without thread context. +//! - **Rich formatting:** Signal has no rich formatting (bold, italic, etc.). +//! `RichMessage` is sent as plain text. +//! - **Reactions:** Signal reactions are supported via JSON-RPC but require complex +//! target identification (author UUID + timestamp). Not currently implemented. +//! - **Ephemeral messages:** Not supported; sent as regular messages. +//! - **Scheduled messages:** Not supported; sent immediately. + +use crate::config::SignalPermissions; +use crate::messaging::traits::{ + InboundStream, Messaging, apply_runtime_adapter_to_conversation_id, +}; +use crate::{InboundMessage, MessageContent, OutboundResponse, StatusUpdate, metadata_keys}; + +use anyhow::Context as _; +use arc_swap::ArcSwap; +use futures::StreamExt; +use serde::Deserialize; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{RwLock, mpsc}; +use tokio::task::JoinHandle; +use uuid::Uuid; + +// ── constants ─────────────────────────────────────────────────── + +/// Maximum SSE line buffer size before reset (prevents OOM from runaway streams). +const MAX_SSE_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB + +/// Maximum single SSE event size (prevents huge JSON payloads). +const MAX_SSE_EVENT_SIZE: usize = 256 * 1024; // 256 KiB + +/// Maximum JSON-RPC response body size. +const MAX_RPC_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MiB + +/// Prefix used to encode group targets in the reply target string. +const GROUP_TARGET_PREFIX: &str = "group:"; + +/// Signal-cli health check endpoint. +const HEALTH_ENDPOINT: &str = "/api/v1/check"; + +/// JSON-RPC endpoint path. +const RPC_ENDPOINT: &str = "/api/v1/rpc"; + +/// SSE events endpoint path. +const SSE_ENDPOINT: &str = "/api/v1/events"; + +/// Typing indicator repeat interval. Signal indicators expire after ~5 seconds, +/// so we resend every 4 seconds. +const TYPING_REPEAT_INTERVAL: Duration = Duration::from_secs(4); + +/// HTTP connect timeout for the reqwest client. +const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Per-request timeout for JSON-RPC calls. +const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + +/// Initial SSE reconnection delay. +const SSE_INITIAL_BACKOFF: Duration = Duration::from_secs(2); + +/// Maximum SSE reconnection delay. +const SSE_MAX_BACKOFF: Duration = Duration::from_secs(60); + +// ── PII redaction helpers ─────────────────────────────────────── + +/// Redact a Signal identifier (phone number or UUID) for logging. +/// Keeps first 4 and last 4 characters, masks the middle with "****". +fn redact_identifier(id: &str) -> String { + if id.len() <= 8 { + return "[redacted]".to_string(); + } + let prefix = &id[..4.min(id.len())]; + let suffix = &id[id.len().saturating_sub(4)..]; + format!("{}****{}", prefix, suffix) +} + +/// Redact a URL for logging, keeping only the scheme and host (no userinfo). +/// Parses the URL to properly handle user:pass@host patterns. +fn redact_url(url: &str) -> String { + // Find the scheme separator + let Some(scheme_end) = url.find("://") else { + // Not a proper URL, return as-is truncated + return url.chars().take(50).collect(); + }; + + let scheme = &url[..scheme_end]; + let after_scheme = &url[scheme_end + 3..]; + + // Skip userinfo if present (user:pass@host) + let host_start = after_scheme.find('@').map(|i| i + 1).unwrap_or(0); + let host_port = &after_scheme[host_start..]; + + // Extract just the host (and port if present), stopping at first path segment + let host_end = host_port.find('/').unwrap_or(host_port.len()); + let host = &host_port[..host_end]; + + if host.is_empty() { + url.chars().take(50).collect() + } else { + format!("{}://{}", scheme, host) + } +} + +// ── signal-cli SSE event JSON shapes ──────────────────────────── + +#[derive(Debug, Deserialize)] +struct SseEnvelope { + #[serde(default)] + envelope: Option, +} + +#[derive(Debug, Deserialize)] +struct Envelope { + /// Source identifier (UUID for privacy-enabled users). + #[serde(default)] + source: Option, + /// E.164 phone number of the sender (preferred identifier). + #[serde(rename = "sourceNumber", default)] + source_number: Option, + /// Display name of the sender. + #[serde(rename = "sourceName", default)] + source_name: Option, + /// UUID of the sender. + #[serde(rename = "sourceUuid", default)] + source_uuid: Option, + #[serde(rename = "dataMessage", default)] + data_message: Option, + /// Present when the envelope is a story message (dropped when `ignore_stories` is true). + #[serde(rename = "storyMessage", default)] + story_message: Option, + #[serde(default)] + timestamp: Option, +} + +#[derive(Debug, Deserialize)] +struct DataMessage { + #[serde(default)] + message: Option, + #[serde(default)] + timestamp: Option, + #[serde(rename = "groupInfo", default)] + group_info: Option, + /// Inbound attachments as opaque JSON. signal-cli provides file paths on disk + /// in each attachment object (e.g. `contentType`, `filename`, `file`). + #[serde(default)] + attachments: Option>, +} + +#[derive(Debug, Deserialize)] +struct GroupInfo { + #[serde(rename = "groupId", default)] + group_id: Option, +} + +// ── recipient routing ─────────────────────────────────────────── + +/// Classification for outbound message routing. +#[derive(Debug, Clone)] +enum RecipientTarget { + /// Direct message to a phone number or UUID. + Direct(String), + /// Group message by group ID. + Group(String), +} + +// ── adapter ───────────────────────────────────────────────────── + +/// Signal messaging adapter. +/// +/// Connects to a signal-cli JSON-RPC daemon for sending/receiving messages. +/// Implements the [`Messaging`] trait for integration with [`MessagingManager`]. +pub struct SignalAdapter { + /// Runtime key used for registration and routing (e.g. `"signal"` or `"signal:personal"`). + runtime_key: String, + /// Base URL of the signal-cli HTTP daemon (e.g. `http://127.0.0.1:8686`). + http_url: String, + /// E.164 phone number of the bot's Signal account. + account: String, + /// Whether to silently drop story messages. + ignore_stories: bool, + /// Hot-reloadable permissions (DM allowlist + group filter). + permissions: Arc>, + /// HTTP client for JSON-RPC and SSE connections. + client: reqwest::Client, + /// Directory for temporary outbound attachment files. + tmp_dir: PathBuf, + /// Repeating typing indicator tasks per conversation_id. + typing_tasks: Arc>>>, + /// Shutdown signal for the SSE listener loop. + shutdown_tx: Arc>>>, +} + +impl SignalAdapter { + /// Create a new Signal adapter. + /// + /// # Arguments + /// - `runtime_key` — Adapter name for registration (e.g. `"signal"` or `"signal:work"`). + /// - `http_url` — Base URL of the signal-cli daemon (e.g. `http://127.0.0.1:8686`). + /// - `account` — E.164 phone number of the bot's Signal account. + /// - `ignore_stories` — Whether to silently drop story messages. + /// - `permissions` — Hot-reloadable permission filters. + /// - `tmp_dir` — Directory for temporary outbound attachment files. + pub fn new( + runtime_key: impl Into, + http_url: impl Into, + account: impl Into, + ignore_stories: bool, + permissions: Arc>, + tmp_dir: PathBuf, + ) -> Self { + let client = reqwest::Client::builder() + .connect_timeout(HTTP_CONNECT_TIMEOUT) + .build() + .unwrap_or_else(|error| { + tracing::warn!( + %error, + "failed to build reqwest client for signal adapter; falling back to default client" + ); + reqwest::Client::new() + }); + + Self { + runtime_key: runtime_key.into(), + http_url: http_url.into(), + account: account.into(), + ignore_stories, + permissions, + client, + tmp_dir, + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + } + } + + // ── JSON-RPC ──────────────────────────────────────────────── + + /// Send a JSON-RPC request to the signal-cli daemon. + /// + /// Returns the `result` field from the response, or `None` if the response + /// has no body (e.g. 201 for typing indicators). + async fn rpc_request( + &self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result> { + let url = format!("{}{RPC_ENDPOINT}", self.http_url); + let request_id = Uuid::new_v4().to_string(); + + let body = serde_json::json!({ + "jsonrpc": "2.0", + "method": method, + "params": params, + "id": request_id, + }); + + let response = self + .client + .post(&url) + .timeout(RPC_REQUEST_TIMEOUT) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .with_context(|| format!("signal RPC request to {method} failed"))?; + + // 201 = success with no body (e.g. typing indicators). + if response.status().as_u16() == 201 { + return Ok(None); + } + + // Reject oversized responses before buffering. + if let Some(length) = response.content_length() + && length as usize > MAX_RPC_RESPONSE_SIZE + { + anyhow::bail!( + "signal RPC response too large: {length} bytes (max {MAX_RPC_RESPONSE_SIZE})" + ); + } + + let status = response.status(); + + // Stream the response body with size guard. + let mut stream = response.bytes_stream(); + let mut response_body = Vec::new(); + let mut total_bytes = 0usize; + + while let Some(chunk) = stream.next().await { + let chunk = chunk.context("failed to read signal RPC response chunk")?; + total_bytes += chunk.len(); + if total_bytes > MAX_RPC_RESPONSE_SIZE { + anyhow::bail!( + "signal RPC response exceeded {MAX_RPC_RESPONSE_SIZE} bytes while streaming" + ); + } + response_body.extend_from_slice(&chunk); + } + + if !status.is_success() { + let truncated_length = std::cmp::min(response_body.len(), 512); + let truncated_body = String::from_utf8_lossy(&response_body[..truncated_length]); + anyhow::bail!("signal RPC HTTP {}: {truncated_body}", status.as_u16()); + } + + if response_body.is_empty() { + return Ok(None); + } + + let parsed: serde_json::Value = + serde_json::from_slice(&response_body).context("invalid signal RPC response JSON")?; + + if let Some(error) = parsed.get("error") { + let code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(-1); + let message = error + .get("message") + .and_then(|m| m.as_str()) + .unwrap_or("unknown"); + anyhow::bail!("signal RPC error {code}: {message}"); + } + + Ok(parsed.get("result").cloned()) + } + + /// Build JSON-RPC params for a send or typing call. + /// + /// **Critical:** DM `recipient` must be a JSON **array** (signal-cli requirement). + /// Group messages use `groupId` instead (mutually exclusive with `recipient`). + fn build_rpc_params( + &self, + target: &RecipientTarget, + message: Option<&str>, + attachments: Option<&[String]>, + ) -> serde_json::Value { + let mut params = match target { + RecipientTarget::Direct(identifier) => { + serde_json::json!({ + "recipient": [identifier], + "account": &self.account, + }) + } + RecipientTarget::Group(group_id) => { + serde_json::json!({ + "groupId": group_id, + "account": &self.account, + }) + } + }; + + if let Some(text) = message { + params["message"] = serde_json::Value::String(text.to_string()); + } + + if let Some(paths) = attachments + && !paths.is_empty() + { + params["attachments"] = serde_json::Value::Array( + paths + .iter() + .map(|path| serde_json::Value::String(path.clone())) + .collect(), + ); + } + + params + } + + // ── outbound helpers ──────────────────────────────────────── + + /// Resolve the outbound target for a conversation from message metadata. + fn resolve_target(&self, message: &InboundMessage) -> Option { + // Target is set during inbound processing via signal_target metadata. + if let Some(target_string) = message + .metadata + .get("signal_target") + .and_then(|value| value.as_str()) + { + return Some(parse_recipient_target(target_string)); + } + None + } + + /// Send a text message to the resolved target. + async fn send_text(&self, target: &RecipientTarget, text: &str) -> anyhow::Result<()> { + let params = self.build_rpc_params(target, Some(text), None); + self.rpc_request("send", params).await?; + Ok(()) + } + + /// Send a file attachment by writing the data to a temp file, sending it via + /// the JSON-RPC `attachments` field, then cleaning up the temp file. + async fn send_file( + &self, + target: &RecipientTarget, + filename: &str, + data: &[u8], + caption: Option<&str>, + ) -> anyhow::Result<()> { + // Ensure tmp dir exists. + tokio::fs::create_dir_all(&self.tmp_dir) + .await + .with_context(|| { + format!( + "failed to create signal tmp dir: {}", + self.tmp_dir.display() + ) + })?; + + // Write temp file with a unique name to avoid collisions. + // Sanitize filename to prevent path traversal. + let safe_filename = std::path::Path::new(filename) + .file_name() + .and_then(|name| name.to_str()) + .filter(|name| !name.is_empty()) + .unwrap_or("attachment.bin"); + let unique_name = format!("{}_{}", Uuid::new_v4(), safe_filename); + let tmp_path = self.tmp_dir.join(&unique_name); + tokio::fs::write(&tmp_path, data).await.with_context(|| { + format!( + "failed to write signal attachment to {}", + tmp_path.display() + ) + })?; + + let attachment_path = tmp_path + .to_str() + .context("signal tmp path is not valid UTF-8")? + .to_string(); + + let result = { + let params = self.build_rpc_params(target, caption, Some(&[attachment_path])); + self.rpc_request("send", params).await + }; + + // Clean up temp file regardless of send result. + if let Err(error) = tokio::fs::remove_file(&tmp_path).await { + tracing::debug!( + path = %tmp_path.display(), + %error, + "failed to clean up signal attachment temp file" + ); + } + + result?; + Ok(()) + } + + /// Cancel the repeating typing indicator for a conversation. + async fn stop_typing(&self, conversation_id: &str) { + if let Some(handle) = self.typing_tasks.write().await.remove(conversation_id) { + handle.abort(); + } + } + + // ── SSE envelope processing ───────────────────────────────── + + /// Process a single SSE envelope into an `InboundMessage` if it passes + /// permission filters and contains valid content. + /// + /// Returns `(InboundMessage, reply_target_string)` on success. + fn process_envelope(&self, envelope: &Envelope) -> Option<(InboundMessage, String)> { + // Drop story messages when configured. + if self.ignore_stories && envelope.story_message.is_some() { + tracing::debug!("signal: dropping story message"); + return None; + } + + let data_message = envelope.data_message.as_ref()?; + + let has_attachments = data_message + .attachments + .as_ref() + .is_some_and(|attachments| !attachments.is_empty()); + let has_text = data_message + .message + .as_ref() + .is_some_and(|text| !text.is_empty()); + + // Need either text or attachments to produce a message. + if !has_text && !has_attachments { + return None; + } + + // Use message text, or fall back to "[Attachment]" for attachment-only messages. + let text = data_message + .message + .as_deref() + .filter(|text| !text.is_empty()) + .map(String::from) + .or_else(|| { + if has_attachments { + Some("[Attachment]".to_string()) + } else { + None + } + })?; + + // Resolve sender: prefer sourceNumber (E.164), fall back to source (UUID), then source_uuid. + let sender = envelope + .source_number + .as_deref() + .or(envelope.source.as_deref()) + .or(envelope.source_uuid.as_deref()) + .map(String::from)?; + + // Determine if this is a group message. + let group_id = data_message + .group_info + .as_ref() + .and_then(|group| group.group_id.as_deref()); + let is_group = group_id.is_some(); + + // ── permission checks ─────────────────────────────────── + let permissions = self.permissions.load(); + + if is_group { + // Group filter: None/empty = block all groups, ["*"] = allow all, specific list = check + match &permissions.group_filter { + None => { + let redacted_group = group_id.map(redact_identifier); + tracing::info!( + group_id = ?redacted_group, + "signal: group message blocked (no groups configured)" + ); + return None; + } + Some(allowed_groups) => { + if allowed_groups.is_empty() { + let redacted_group = group_id.map(redact_identifier); + tracing::info!( + group_id = ?redacted_group, + "signal: group message blocked (empty group filter)" + ); + return None; + } + if allowed_groups.iter().any(|g| g == "*") { + // Wildcard: allow all groups + } else if let Some(gid) = group_id + && !allowed_groups.iter().any(|allowed| allowed == gid) + { + tracing::info!( + group_id = %redact_identifier(gid), + "signal: group message rejected (not in group filter)" + ); + return None; + } + } + }; + // Check sender is allowed for groups: + // Both dm_allowed_users AND group_allowed_users apply to groups (merged) + // ["*"] in either = allow all, [] = block all, specific list = check + let all_group_users: Vec<&String> = permissions + .dm_allowed_users + .iter() + .chain(permissions.group_allowed_users.iter()) + .collect(); + + let sender_allowed = if all_group_users.is_empty() { + false // Empty = block all + } else if all_group_users.iter().any(|u| u.as_str() == "*") { + true // Wildcard = allow all + } else { + all_group_users.iter().any(|allowed| { + allowed.as_str() == sender + || envelope + .source_uuid + .as_deref() + .is_some_and(|uuid| allowed.as_str() == uuid) + }) + }; + if !sender_allowed { + tracing::info!( + sender = %redact_identifier(&sender), + "signal: group message rejected (sender not in allowed users)" + ); + return None; + } + } else { + // DM filter: ["*"] = allow all, [] = block all, specific list = check + let sender_allowed = if permissions.dm_allowed_users.is_empty() { + false // Empty = block all + } else if permissions + .dm_allowed_users + .iter() + .any(|u| u.as_str() == "*") + { + true // Wildcard = allow all + } else { + permissions.dm_allowed_users.iter().any(|allowed| { + // Match against phone number, UUID, or source field. + allowed.as_str() == sender + || envelope + .source_uuid + .as_deref() + .is_some_and(|uuid| allowed.as_str() == uuid) + }) + }; + if !sender_allowed { + tracing::info!( + sender = %redact_identifier(&sender), + "signal: DM rejected (sender not in dm_allowed_users)" + ); + return None; + } + } + + // Log authorized message + let sender_display = envelope + .source_uuid + .as_deref() + .or(envelope.source_number.as_deref()) + .unwrap_or(&sender); + tracing::info!( + sender = %redact_identifier(sender_display), + text_len = %text.len(), + is_group = is_group, + "signal: message received" + ); + + // ── build reply target ────────────────────────────────── + + let reply_target = if let Some(gid) = group_id { + format!("{GROUP_TARGET_PREFIX}{gid}") + } else { + sender.clone() + }; + + // ── build conversation ID ─────────────────────────────── + + let base_conversation_id = if let Some(gid) = group_id { + format!("signal:group:{gid}") + } else { + // Use UUID if available (stable), fall back to phone number. + // Canonicalize UUID-backed targets to "signal:uuid:{uuid}" format + if let Some(uuid) = envelope.source_uuid.as_deref() { + format!("signal:uuid:{uuid}") + } else { + format!("signal:{sender}") + } + }; + let conversation_id = + apply_runtime_adapter_to_conversation_id(&self.runtime_key, base_conversation_id); + + // ── build timestamp ───────────────────────────────────── + + let timestamp_millis = data_message + .timestamp + .or(envelope.timestamp) + .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().max(0) as u64); + + let timestamp = chrono::DateTime::from_timestamp_millis(timestamp_millis as i64) + .unwrap_or_else(chrono::Utc::now); + + // ── build metadata ────────────────────────────────────── + + let (metadata, formatted_author) = build_metadata(envelope, &reply_target, group_id); + + // ── build sender_id ───────────────────────────────────── + + let sender_id = envelope + .source_uuid + .as_deref() + .unwrap_or(&sender) + .to_string(); + + // ── build message ID ──────────────────────────────────── + + let message_id = format!("{timestamp_millis}_{sender_id}"); + + // ── assemble InboundMessage ───────────────────────────── + + let inbound = InboundMessage { + id: message_id, + source: "signal".into(), + adapter: Some(self.runtime_key.clone()), + conversation_id, + sender_id, + agent_id: None, + content: MessageContent::Text(text), + timestamp, + metadata, + formatted_author, + }; + + Some((inbound, reply_target)) + } +} + +// ── Messaging trait implementation ────────────────────────────── + +impl Messaging for SignalAdapter { + fn name(&self) -> &str { + &self.runtime_key + } + + async fn start(&self) -> crate::Result { + let (inbound_tx, inbound_rx) = mpsc::channel(256); + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + *self.shutdown_tx.write().await = Some(shutdown_tx); + + // Verify connectivity before spawning the listener. + self.health_check().await?; + + tracing::info!( + adapter = %self.runtime_key, + account = %redact_identifier(&self.account), + "signal adapter connected" + ); + + // Clone fields for the spawned SSE listener task. + let client = self.client.clone(); + let http_url = self.http_url.clone(); + let account = self.account.clone(); + let runtime_key = self.runtime_key.clone(); + let ignore_stories = self.ignore_stories; + let permissions = self.permissions.clone(); + let tmp_dir = self.tmp_dir.clone(); + + tokio::spawn(async move { + // Build a temporary adapter for envelope processing inside the task. + // We need access to process_envelope which reads permissions. + let adapter = SignalAdapter { + runtime_key, + http_url: http_url.clone(), + account: account.clone(), + ignore_stories, + permissions, + client: client.clone(), + tmp_dir, + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + }; + + sse_listener(adapter, client, http_url, account, inbound_tx, shutdown_rx).await; + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(inbound_rx); + Ok(Box::pin(stream)) + } + + async fn respond( + &self, + message: &InboundMessage, + response: OutboundResponse, + ) -> crate::Result<()> { + let target = self.resolve_target(message).with_context(|| { + format!( + "cannot resolve signal reply target for conversation {}", + message.conversation_id + ) + })?; + + match response { + OutboundResponse::Text(text) => { + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::RichMessage { text, .. } => { + // Signal has no rich formatting — send plain text. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::ThreadReply { text, .. } => { + // Signal has no named threads — send as regular message. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::File { + filename, + data, + caption, + .. + } => { + self.stop_typing(&message.conversation_id).await; + self.send_file(&target, &filename, &data, caption.as_deref()) + .await?; + } + OutboundResponse::Reaction(_) => { + // Signal supports reactions via JSON-RPC but the API shape is complex + // (requires target author UUID + timestamp). Skip for now. + tracing::debug!( + conversation_id = %message.conversation_id, + "signal: reactions not supported, dropping" + ); + } + OutboundResponse::RemoveReaction(_) => { + tracing::debug!( + conversation_id = %message.conversation_id, + "signal: remove reactions not supported, dropping" + ); + } + OutboundResponse::Ephemeral { text, .. } => { + // Signal has no ephemeral messages — send as regular text. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::ScheduledMessage { text, .. } => { + // Signal has no scheduled messages — send immediately. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::StreamStart + | OutboundResponse::StreamChunk(_) + | OutboundResponse::StreamEnd => { + // Signal can't edit sent messages — streaming is not supported. + // StreamStart/Chunk/End are no-ops. + } + OutboundResponse::Status(status) => { + self.send_status(message, status).await?; + } + } + + Ok(()) + } + + async fn send_status( + &self, + message: &InboundMessage, + status: StatusUpdate, + ) -> crate::Result<()> { + match status { + StatusUpdate::Thinking => { + let Some(target) = self.resolve_target(message) else { + return Ok(()); + }; + + // Abort any existing typing task before starting a new one. + self.stop_typing(&message.conversation_id).await; + + let client = self.client.clone(); + let http_url = self.http_url.clone(); + let account = self.account.clone(); + let conversation_id = message.conversation_id.clone(); + let rpc_url = format!("{http_url}{RPC_ENDPOINT}"); + + // Send typing indicator immediately, then repeat every 4 seconds. + let handle = tokio::spawn(async move { + loop { + let params = match &target { + RecipientTarget::Direct(identifier) => { + serde_json::json!({ + "recipient": [identifier], + "account": &account, + }) + } + RecipientTarget::Group(group_id) => { + serde_json::json!({ + "groupId": group_id, + "account": &account, + }) + } + }; + + let body = serde_json::json!({ + "jsonrpc": "2.0", + "method": "sendTyping", + "params": params, + "id": Uuid::new_v4().to_string(), + }); + + match client + .post(&rpc_url) + .timeout(RPC_REQUEST_TIMEOUT) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + { + Ok(response) => { + if !response.status().is_success() { + let status = response.status(); + // Log response body at debug level only (may contain sensitive data) + if let Ok(body_text) = response.text().await { + let truncated = if body_text.len() > 200 { + format!("{}...", &body_text[..200]) + } else { + body_text + }; + tracing::debug!(%status, body = %truncated, "signal typing indicator failed"); + } + tracing::warn!(%status, "signal typing indicator request failed"); + break; + } + } + Err(error) => { + tracing::debug!(%error, "failed to send signal typing indicator"); + break; + } + } + + tokio::time::sleep(TYPING_REPEAT_INTERVAL).await; + } + }); + + self.typing_tasks + .write() + .await + .insert(conversation_id, handle); + } + _ => { + self.stop_typing(&message.conversation_id).await; + } + } + + Ok(()) + } + + async fn broadcast( + &self, + target: &str, + response: crate::OutboundResponse, + ) -> crate::Result<()> { + // Parse target into RecipientTarget + // Format: uuid:xxx, group:xxx, or +xxx + let recipient = if let Some(uuid) = target.strip_prefix("uuid:") { + RecipientTarget::Direct(uuid.to_string()) + } else if let Some(group_id) = target.strip_prefix("group:") { + RecipientTarget::Group(group_id.to_string()) + } else if target.starts_with('+') { + RecipientTarget::Direct(target.to_string()) + } else { + return Err(crate::Error::Other(anyhow::anyhow!( + "invalid signal broadcast target format: {target}" + ))); + }; + + match response { + crate::OutboundResponse::Text(text) => { + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::RichMessage { text, .. } => { + // Signal has no rich formatting — send plain text + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::File { + filename, + data, + caption, + .. + } => { + self.send_file(&recipient, &filename, &data, caption.as_deref()) + .await?; + } + crate::OutboundResponse::ThreadReply { text, .. } => { + // Signal has no threads — send as regular message + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::Ephemeral { text, .. } => { + // Signal has no ephemeral messages — send as regular text + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::ScheduledMessage { text, .. } => { + // Signal has no scheduled messages — send immediately + self.send_text(&recipient, &text).await?; + } + _ => { + // Other response types are not supported for broadcast + tracing::debug!( + target = %redact_identifier(target), + "signal: unsupported broadcast response type, dropping" + ); + } + } + + Ok(()) + } + + async fn health_check(&self) -> crate::Result<()> { + let url = format!("{}{HEALTH_ENDPOINT}", self.http_url); + tracing::debug!(url = %redact_url(&url), "Signal health check: GET"); + let response = self + .client + .get(&url) + .timeout(Duration::from_secs(10)) + .send() + .await + .context("signal health check failed: connection error")?; + + tracing::debug!(status = %response.status(), "Signal health check response"); + + if response.status().is_success() { + Ok(()) + } else { + Err(anyhow::anyhow!( + "signal health check returned HTTP {}", + response.status() + ))? + } + } + + async fn shutdown(&self) -> crate::Result<()> { + // Cancel all typing indicator tasks. + let mut tasks = self.typing_tasks.write().await; + for (_, handle) in tasks.drain() { + handle.abort(); + } + + // Signal the SSE listener to stop. + if let Some(shutdown_tx) = self.shutdown_tx.write().await.take() { + shutdown_tx.try_send(()).ok(); + } + + tracing::info!(adapter = %self.runtime_key, "signal adapter shut down"); + Ok(()) + } +} + +// ── SSE listener ──────────────────────────────────────────────── + +/// Long-running SSE listener that reconnects with exponential backoff. +/// +/// Connects to `{http_url}/api/v1/events?account={account}`, parses SSE events, +/// processes envelopes through the adapter's permission filters, and sends +/// valid inbound messages through the channel. +async fn sse_listener( + adapter: SignalAdapter, + client: reqwest::Client, + http_url: String, + account: String, + inbound_tx: mpsc::Sender, + mut shutdown_rx: mpsc::Receiver<()>, +) { + let sse_url = match reqwest::Url::parse(&format!("{http_url}{SSE_ENDPOINT}")) { + Ok(mut url) => { + url.query_pairs_mut().append_pair("account", &account); + url + } + Err(error) => { + tracing::error!(%error, "signal: invalid SSE URL, listener exiting"); + return; + } + }; + + let mut retry_delay = SSE_INITIAL_BACKOFF; + + loop { + // Check for shutdown before each connection attempt. + if shutdown_rx.try_recv().is_ok() { + tracing::info!("signal SSE listener shutting down"); + return; + } + + let response = client + .get(sse_url.clone()) + .header("Accept", "text/event-stream") + .send() + .await; + + let response = match response { + Ok(response) if response.status().is_success() => response, + Ok(response) => { + tracing::warn!( + status = %response.status(), + "signal SSE returned error, retrying in {:?}", + retry_delay + ); + tokio::select! { + _ = tokio::time::sleep(retry_delay) => {} + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down during retry"); + return; + } + } + retry_delay = (retry_delay * 2).min(SSE_MAX_BACKOFF); + continue; + } + Err(error) => { + tracing::warn!( + %error, + "signal SSE connect error, retrying in {:?}", + retry_delay + ); + tokio::select! { + _ = tokio::time::sleep(retry_delay) => {} + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down during retry"); + return; + } + } + retry_delay = (retry_delay * 2).min(SSE_MAX_BACKOFF); + continue; + } + }; + + // Connection succeeded — reset backoff. + retry_delay = SSE_INITIAL_BACKOFF; + tracing::info!("signal SSE connected"); + + // ── stream processing loop ────────────────────────────── + + let mut bytes_stream = response.bytes_stream(); + let mut buffer = String::with_capacity(8192); + let mut current_data = String::with_capacity(4096); + // Holds trailing bytes from the previous chunk that form an incomplete + // multi-byte UTF-8 sequence. At most 3 bytes (the longest incomplete + // leading sequence for a 4-byte character). + let mut utf8_carry: Vec = Vec::with_capacity(4); + + 'stream: loop { + tokio::select! { + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down"); + return; + } + chunk = bytes_stream.next() => { + let Some(chunk) = chunk else { + // Stream ended — break to reconnect. + break 'stream; + }; + let chunk = match chunk { + Ok(chunk) => chunk, + Err(error) => { + tracing::debug!(%error, "signal SSE chunk error, reconnecting"); + break 'stream; + } + }; + + // Prepend any leftover bytes from the previous chunk. + let decode_buf = if utf8_carry.is_empty() { + chunk.to_vec() + } else { + let mut combined = std::mem::take(&mut utf8_carry); + combined.extend_from_slice(&chunk); + combined + }; + + // Decode as much valid UTF-8 as possible, carrying over any + // incomplete trailing sequence to the next iteration. + let (valid_len, carry_start) = match std::str::from_utf8(&decode_buf) { + Ok(_) => (decode_buf.len(), decode_buf.len()), + Err(error) => { + let valid_up_to = error.valid_up_to(); + match error.error_len() { + Some(bad_len) => { + // Genuinely invalid byte sequence — skip the bad byte(s). + tracing::debug!( + offset = valid_up_to, + "signal SSE: invalid UTF-8 byte, skipping" + ); + (valid_up_to, valid_up_to + bad_len) + } + None => { + // Incomplete multi-byte sequence at the end — carry it over. + (valid_up_to, valid_up_to) + } + } + } + }; + + let text = match std::str::from_utf8(&decode_buf[..valid_len]) { + Ok(s) => s, + Err(_) => { + tracing::warn!( + "signal SSE: unexpected invalid UTF-8 at boundary, skipping chunk" + ); + continue; + } + }; + + // Buffer overflow protection. + if buffer.len() + text.len() > MAX_SSE_BUFFER_SIZE { + tracing::warn!( + buffer_len = buffer.len(), + text_len = text.len(), + "signal SSE buffer overflow, resetting" + ); + buffer.clear(); + utf8_carry.clear(); + current_data.clear(); + continue; + } + buffer.push_str(text); + + // Preserve any trailing incomplete bytes for the next chunk. + if carry_start < decode_buf.len() { + utf8_carry.extend_from_slice(&decode_buf[carry_start..]); + } + + // Parse complete lines from the buffer. + while let Some(newline_pos) = buffer.find('\n') { + let line = buffer[..newline_pos].trim_end_matches('\r').to_string(); + buffer.drain(..=newline_pos); + + // Skip SSE comments (keepalive pings). + if line.starts_with(':') { + continue; + } + + if line.is_empty() { + // Empty line = event boundary — dispatch accumulated data. + if !current_data.is_empty() { + process_sse_event(&adapter, ¤t_data, &inbound_tx).await; + current_data.clear(); + } + } else if let Some(data) = line.strip_prefix("data:") { + // Guard against oversized single events. + if current_data.len() + data.len() > MAX_SSE_EVENT_SIZE { + tracing::warn!("signal SSE event too large, dropping"); + current_data.clear(); + continue; + } + if !current_data.is_empty() { + current_data.push('\n'); + } + current_data.push_str(data.trim_start()); + } + // Ignore "event:", "id:", "retry:" lines. + } + } + } + } + + // Process any trailing data before reconnect. + if !current_data.is_empty() { + process_sse_event(&adapter, ¤t_data, &inbound_tx).await; + } + + tracing::debug!("signal SSE stream ended, reconnecting with backoff..."); + tokio::select! { + _ = tokio::time::sleep(retry_delay) => {} + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down during reconnect"); + return; + } + } + retry_delay = (retry_delay * 2).min(SSE_MAX_BACKOFF); + } +} + +/// Parse and dispatch a single SSE event. +async fn process_sse_event( + adapter: &SignalAdapter, + data: &str, + inbound_tx: &mpsc::Sender, +) { + let sse: SseEnvelope = match serde_json::from_str(data) { + Ok(sse) => sse, + Err(error) => { + tracing::debug!(%error, "signal SSE: failed to parse event, skipping"); + return; + } + }; + + let Some(ref envelope) = sse.envelope else { + return; + }; + + let Some((inbound, _reply_target)) = adapter.process_envelope(envelope) else { + return; + }; + + if let Err(error) = inbound_tx.send(inbound).await { + tracing::warn!(%error, "signal: inbound channel closed (receiver dropped)"); + } +} + +// ── metadata builder ──────────────────────────────────────────── + +/// Build platform-specific metadata for a Signal message. +/// +/// Returns `(metadata_map, formatted_author)`. +fn build_metadata( + envelope: &Envelope, + reply_target: &str, + group_id: Option<&str>, +) -> (HashMap, Option) { + let mut metadata = HashMap::new(); + + // Sender identifiers. + // Try phone number first, then UUID, then source (legacy), then unknown + // This ensures UUID-only privacy-mode envelopes preserve sender identity + let sender = envelope + .source_number + .as_deref() + .or(envelope.source_uuid.as_deref()) + .or(envelope.source.as_deref()) + .unwrap_or("unknown"); + + metadata.insert( + "signal_source".into(), + serde_json::Value::String(sender.to_string()), + ); + + // Always include both UUID and phone number (null if not available) + metadata.insert( + "signal_source_number".into(), + envelope + .source_number + .clone() + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null), + ); + + metadata.insert( + "signal_source_uuid".into(), + envelope + .source_uuid + .clone() + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null), + ); + + // Sender context for agent visibility - always shows both e164 and uuid + let e164 = envelope + .source_number + .as_deref() + .map(|n| format!("e164:{}", n)) + .unwrap_or_else(|| "e164:none".to_string()); + + let uuid = envelope + .source_uuid + .as_deref() + .map(|u| format!("uuid:{}", u)) + .unwrap_or_else(|| "uuid:none".to_string()); + + let sender_context = format!("[Signal: {} {}]", e164, uuid); + metadata.insert( + "sender_context".into(), + serde_json::Value::String(sender_context), + ); + + if let Some(ref source_name) = envelope.source_name { + metadata.insert( + "signal_source_name".into(), + serde_json::Value::String(source_name.clone()), + ); + } + + // Reply target for outbound routing. + metadata.insert( + "signal_target".into(), + serde_json::Value::String(reply_target.to_string()), + ); + + // Timestamp. + let timestamp = envelope + .data_message + .as_ref() + .and_then(|dm| dm.timestamp) + .or(envelope.timestamp); + if let Some(ts) = timestamp { + metadata.insert( + "signal_timestamp".into(), + serde_json::Value::Number(ts.into()), + ); + } + + // Chat type. + let chat_type = if group_id.is_some() { "group" } else { "dm" }; + metadata.insert( + "signal_chat_type".into(), + serde_json::Value::String(chat_type.into()), + ); + + // Group ID. + if let Some(gid) = group_id { + metadata.insert( + "signal_group_id".into(), + serde_json::Value::String(gid.to_string()), + ); + } + + // Standard metadata keys. + metadata.insert( + metadata_keys::MESSAGE_ID.into(), + serde_json::Value::String(format!("{}", timestamp.unwrap_or(0))), + ); + + // Channel name: "Group Chat {id}" for groups, "Direct Message with {name}" for DMs. + let channel_name = if let Some(gid) = group_id { + format!("Group Chat {}", gid) + } else { + envelope + .source_name + .as_deref() + .filter(|name| !name.is_empty()) + .map(|name| format!("Direct Message with {}", name)) + .unwrap_or_else(|| "Direct Message".to_string()) + }; + metadata.insert( + metadata_keys::CHANNEL_NAME.into(), + serde_json::Value::String(channel_name), + ); + + // Sender display name. + let display_name = envelope + .source_name + .as_deref() + .filter(|name| !name.is_empty()) + .unwrap_or(sender); + + metadata.insert( + "sender_display_name".into(), + serde_json::Value::String(display_name.to_string()), + ); + + // Formatted author: "Display Name (+phone)" or just the display name. + let formatted_author = if let Some(ref source_number) = envelope.source_number + && display_name != source_number + { + Some(format!("{display_name} ({source_number})")) + } else { + Some(display_name.to_string()) + }; + + (metadata, formatted_author) +} + +// ── helper functions ──────────────────────────────────────────── + +/// Parse a reply target string into a `RecipientTarget`. +/// +/// Format: `"group:{group_id}"` for groups, otherwise treated as a direct +/// message identifier (phone number or UUID). +fn parse_recipient_target(target: &str) -> RecipientTarget { + if let Some(group_id) = target.strip_prefix(GROUP_TARGET_PREFIX) { + RecipientTarget::Group(group_id.to_string()) + } else { + RecipientTarget::Direct(target.to_string()) + } +} + +// ── tests ─────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_recipient_target_dm() { + let target = parse_recipient_target("+1234567890"); + assert!(matches!(target, RecipientTarget::Direct(ref id) if id == "+1234567890")); + } + + #[test] + fn parse_recipient_target_uuid() { + let target = parse_recipient_target("a1b2c3d4-e5f6-7890-abcd-ef1234567890"); + assert!(matches!( + target, + RecipientTarget::Direct(ref id) if id == "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + )); + } + + #[test] + fn parse_recipient_target_group() { + let target = parse_recipient_target("group:abc123"); + assert!(matches!(target, RecipientTarget::Group(ref id) if id == "abc123")); + } + + #[test] + fn parse_sse_envelope_dm() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "sourceName": "Alice", + "sourceUuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "dataMessage": { + "message": "Hello", + "timestamp": 1700000000000 + } + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + let envelope = sse.envelope.unwrap(); + assert_eq!(envelope.source_number.as_deref(), Some("+1111111111")); + assert_eq!(envelope.source_name.as_deref(), Some("Alice")); + let dm = envelope.data_message.unwrap(); + assert_eq!(dm.message.as_deref(), Some("Hello")); + assert_eq!(dm.timestamp, Some(1700000000000)); + assert!(dm.group_info.is_none()); + } + + #[test] + fn parse_sse_envelope_group() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "dataMessage": { + "message": "Group hello", + "groupInfo": { + "groupId": "testgroup123" + } + } + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + let dm = sse.envelope.unwrap().data_message.unwrap(); + assert_eq!( + dm.group_info.unwrap().group_id.as_deref(), + Some("testgroup123") + ); + } + + #[test] + fn parse_sse_envelope_story_message() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "storyMessage": {"text": "story content"} + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + assert!(sse.envelope.unwrap().story_message.is_some()); + } + + #[test] + fn parse_sse_envelope_with_attachments() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "dataMessage": { + "message": "See attached", + "attachments": [ + {"contentType": "image/jpeg", "filename": "photo.jpg"}, + {"contentType": "application/pdf"} + ] + } + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + let dm = sse.envelope.unwrap().data_message.unwrap(); + assert_eq!(dm.attachments.unwrap().len(), 2); + } + + #[test] + fn parse_sse_envelope_empty() { + let json = r#"{}"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + assert!(sse.envelope.is_none()); + } + + #[test] + fn parse_sse_envelope_no_data_message() { + let json = r#"{"envelope": {"sourceNumber": "+1111111111"}}"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + assert!(sse.envelope.unwrap().data_message.is_none()); + } + + #[test] + fn build_rpc_params_dm() { + let adapter = test_adapter(); + let target = RecipientTarget::Direct("+5555555555".to_string()); + let params = adapter.build_rpc_params(&target, Some("hello"), None); + + assert_eq!(params["recipient"], serde_json::json!(["+5555555555"])); + assert_eq!(params["account"], "+0000000000"); + assert_eq!(params["message"], "hello"); + assert!(params.get("groupId").is_none()); + } + + #[test] + fn build_rpc_params_group() { + let adapter = test_adapter(); + let target = RecipientTarget::Group("abc123".to_string()); + let params = adapter.build_rpc_params(&target, Some("hello"), None); + + assert_eq!(params["groupId"], "abc123"); + assert_eq!(params["account"], "+0000000000"); + assert!(params.get("recipient").is_none()); + } + + #[test] + fn build_rpc_params_with_attachments() { + let adapter = test_adapter(); + let target = RecipientTarget::Direct("+5555555555".to_string()); + let paths = vec!["/tmp/file.png".to_string()]; + let params = adapter.build_rpc_params(&target, Some("caption"), Some(&paths)); + + assert_eq!(params["attachments"], serde_json::json!(["/tmp/file.png"])); + assert_eq!(params["message"], "caption"); + } + + #[test] + fn build_rpc_params_no_message() { + let adapter = test_adapter(); + let target = RecipientTarget::Direct("+5555555555".to_string()); + let params = adapter.build_rpc_params(&target, None, None); + + assert!(params.get("message").is_none()); + assert!(params.get("attachments").is_none()); + } + + #[test] + fn build_metadata_dm() { + let envelope = Envelope { + source: None, + source_number: Some("+1234567890".into()), + source_name: Some("Alice".into()), + source_uuid: Some("aaaa-bbbb".into()), + data_message: Some(DataMessage { + message: Some("hi".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + + let (metadata, author) = build_metadata(&envelope, "+1234567890", None); + + assert_eq!( + metadata.get("signal_source").unwrap().as_str().unwrap(), + "+1234567890" + ); + assert_eq!( + metadata + .get("signal_source_uuid") + .unwrap() + .as_str() + .unwrap(), + "aaaa-bbbb" + ); + assert_eq!( + metadata.get("signal_chat_type").unwrap().as_str().unwrap(), + "dm" + ); + assert!(!metadata.contains_key("signal_group_id")); + assert_eq!(author.unwrap(), "Alice (+1234567890)"); + assert_eq!( + metadata.get("sender_context").unwrap().as_str().unwrap(), + "[Signal: e164:+1234567890 uuid:aaaa-bbbb]" + ); + } + + #[test] + fn build_metadata_group() { + let envelope = Envelope { + source: None, + source_number: Some("+1234567890".into()), + source_name: Some("Bob".into()), + source_uuid: None, + data_message: Some(DataMessage { + message: Some("group msg".into()), + timestamp: Some(1700000000000), + group_info: Some(GroupInfo { + group_id: Some("grp123".into()), + }), + attachments: None, + }), + story_message: None, + timestamp: None, + }; + + let (metadata, _) = build_metadata(&envelope, "group:grp123", Some("grp123")); + + assert_eq!( + metadata.get("signal_chat_type").unwrap().as_str().unwrap(), + "group" + ); + assert_eq!( + metadata.get("signal_group_id").unwrap().as_str().unwrap(), + "grp123" + ); + assert_eq!( + metadata.get("sender_context").unwrap().as_str().unwrap(), + "[Signal: e164:+1234567890 uuid:none]" + ); + } + + #[test] + fn build_metadata_privacy_mode() { + let envelope = Envelope { + source: None, + source_number: None, + source_name: Some("Alice".into()), + source_uuid: Some("uuid-only-123".into()), + data_message: Some(DataMessage { + message: Some("hi from privacy mode".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + + let (metadata, _) = build_metadata(&envelope, "+1234567890", None); + + assert_eq!( + metadata.get("sender_context").unwrap().as_str().unwrap(), + "[Signal: e164:none uuid:uuid-only-123]" + ); + } + + #[test] + fn process_envelope_drops_story_when_configured() { + let adapter = test_adapter(); + let envelope = Envelope { + source: None, + source_number: Some("+1111111111".into()), + source_name: None, + source_uuid: None, + data_message: None, + story_message: Some(serde_json::json!({"text": "story"})), + timestamp: None, + }; + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_drops_empty_message() { + let adapter = test_adapter(); + let envelope = Envelope { + source: None, + source_number: Some("+1111111111".into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: None, + timestamp: None, + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_attachment_only_produces_placeholder() { + let adapter = test_adapter_open_dm(); + let envelope = Envelope { + source: None, + source_number: Some("+2222222222".into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: None, + timestamp: Some(1700000000000), + group_info: None, + attachments: Some(vec![serde_json::json!({"contentType": "image/png"})]), + }), + story_message: None, + timestamp: None, + }; + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + let (msg, _) = result.unwrap(); + if let MessageContent::Text(text) = &msg.content { + assert_eq!(text, "[Attachment]"); + } else { + panic!("expected Text content"); + } + } + + #[test] + fn process_envelope_dm_allowed() { + let adapter = test_adapter_with_dm_allowed(vec!["+2222222222".to_string()]); + let envelope = make_dm_envelope("+2222222222", "hello"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + } + + #[test] + fn process_envelope_dm_rejected() { + let adapter = test_adapter_with_dm_allowed(vec!["+3333333333".to_string()]); + let envelope = make_dm_envelope("+2222222222", "hello"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_dm_blocked_when_empty() { + // Empty dm_allowed_users = block all DMs + let adapter = test_adapter(); + let envelope = make_dm_envelope("+2222222222", "hello"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_dm_allowed_wildcard() { + // ["*"] in dm_allowed_users = allow all DMs + let adapter = test_adapter_open_dm(); + let envelope = make_dm_envelope("+9999999999", "hello from anyone"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + } + + #[test] + fn process_envelope_group_blocked_by_default() { + // Default group_filter is None = block all groups. + let adapter = test_adapter(); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_group_blocked_when_empty() { + // Empty group_filter = block all groups + let perms = SignalPermissions { + group_filter: Some(vec![]), + dm_allowed_users: vec!["*".to_string()], + group_allowed_users: vec!["*".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_group_allowed_wildcard() { + // ["*"] in group_filter = allow all groups + let perms = SignalPermissions { + group_filter: Some(vec!["*".to_string()]), + dm_allowed_users: vec!["*".to_string()], + group_allowed_users: vec!["*".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "any-group"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + } + + #[test] + fn process_envelope_group_allowed_when_configured() { + let perms = SignalPermissions { + group_filter: Some(vec!["grp123".to_string()]), + dm_allowed_users: vec!["+1111111111".to_string()], + group_allowed_users: vec!["+1111111111".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + let (msg, target) = result.unwrap(); + assert_eq!(target, "group:grp123"); + assert!(msg.conversation_id.starts_with("signal:group:grp123")); + } + + #[test] + fn process_envelope_conversation_id_dm() { + let adapter = test_adapter_open_dm(); + let envelope = Envelope { + source: None, + source_number: Some("+1234567890".into()), + source_name: None, + source_uuid: Some("uuid-1234".into()), + data_message: Some(DataMessage { + message: Some("test".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + let (msg, _) = adapter.process_envelope(&envelope).unwrap(); + // Should use UUID when available, in canonical "signal:uuid:{uuid}" format. + assert_eq!(msg.conversation_id, "signal:uuid:uuid-1234"); + } + + // ── test helpers ──────────────────────────────────────────── + + fn test_adapter() -> SignalAdapter { + // Default: blocks all DMs and groups (empty lists) + test_adapter_with_permissions(SignalPermissions::default()) + } + + fn test_adapter_open_dm() -> SignalAdapter { + // Wildcard ["*"] means allow all DMs (but groups still blocked by default) + test_adapter_with_permissions(SignalPermissions { + group_filter: None, + dm_allowed_users: vec!["*".to_string()], + group_allowed_users: vec![], + }) + } + + fn test_adapter_with_dm_allowed(users: Vec) -> SignalAdapter { + test_adapter_with_permissions(SignalPermissions { + group_filter: None, + dm_allowed_users: users, + group_allowed_users: vec![], + }) + } + + fn test_adapter_with_permissions(perms: SignalPermissions) -> SignalAdapter { + SignalAdapter { + runtime_key: "signal".into(), + http_url: "http://127.0.0.1:8686".into(), + account: "+0000000000".into(), + ignore_stories: true, + permissions: Arc::new(ArcSwap::from_pointee(perms)), + client: reqwest::Client::new(), + tmp_dir: PathBuf::from("/tmp/spacebot-test"), + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + } + } + + fn make_dm_envelope(sender: &str, text: &str) -> Envelope { + Envelope { + source: None, + source_number: Some(sender.into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: Some(text.into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + } + } + + fn make_group_envelope(sender: &str, text: &str, group_id: &str) -> Envelope { + Envelope { + source: None, + source_number: Some(sender.into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: Some(text.into()), + timestamp: Some(1700000000000), + group_info: Some(GroupInfo { + group_id: Some(group_id.into()), + }), + attachments: None, + }), + story_message: None, + timestamp: None, + } + } + + #[test] + fn process_envelope_dm_allowed_by_uuid() { + // Test that UUID matching works when sender has no phone number + let perms = SignalPermissions { + group_filter: None, + dm_allowed_users: vec!["uuid-1234".to_string()], + group_allowed_users: vec![], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = Envelope { + source: None, + source_number: None, + source_name: None, + source_uuid: Some("uuid-1234".into()), + data_message: Some(DataMessage { + message: Some("hello".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + let result = adapter.process_envelope(&envelope); + assert!( + result.is_some(), + "DM should be allowed when sender UUID matches" + ); + } + + #[test] + fn process_envelope_group_message_rejected_when_sender_not_allowed() { + // Group message from sender not in group_allowed_users should be rejected + let perms = SignalPermissions { + group_filter: Some(vec!["grp123".to_string()]), + dm_allowed_users: vec!["+9999999999".to_string()], // Different from sender + group_allowed_users: vec!["+9999999999".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + let result = adapter.process_envelope(&envelope); + assert!( + result.is_none(), + "Group message should be rejected when sender not in allowed users" + ); + } +} + +#[cfg(test)] +mod rpc_error_tests { + use super::*; + + #[test] + fn rpc_params_build_direct_message() { + let adapter = test_adapter_with_permissions(SignalPermissions::default()); + let params = adapter.build_rpc_params( + &RecipientTarget::Direct("+1234567890".to_string()), + Some("test message"), + None, + ); + // Verify recipient is an array (signal-cli requirement) + assert!(params.get("recipient").is_some()); + assert!(params["recipient"].is_array()); + } + + #[test] + fn rpc_params_build_group_message() { + let adapter = test_adapter_with_permissions(SignalPermissions::default()); + let params = adapter.build_rpc_params( + &RecipientTarget::Group("base64groupid".to_string()), + Some("group message"), + None, + ); + // Verify groupId is used instead of recipient + assert!(params.get("groupId").is_some()); + assert!(params.get("recipient").is_none()); + } + + #[test] + fn rpc_params_with_attachments() { + let adapter = test_adapter_with_permissions(SignalPermissions::default()); + let attachments = vec!["/tmp/file1.jpg".to_string(), "/tmp/file2.png".to_string()]; + let params = adapter.build_rpc_params( + &RecipientTarget::Direct("+1234567890".to_string()), + Some("check this out"), + Some(&attachments), + ); + assert!(params.get("attachments").is_some()); + let attachments_arr = params["attachments"].as_array().unwrap(); + assert_eq!(attachments_arr.len(), 2); + } + + fn test_adapter_with_permissions(perms: SignalPermissions) -> SignalAdapter { + SignalAdapter { + runtime_key: "signal".into(), + http_url: "http://127.0.0.1:8686".into(), + account: "+0000000000".into(), + ignore_stories: true, + permissions: Arc::new(ArcSwap::from_pointee(perms)), + client: reqwest::Client::new(), + tmp_dir: PathBuf::from("/tmp/spacebot-test"), + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + } + } +} diff --git a/src/messaging/target.rs b/src/messaging/target.rs index 51e7f721a..17676afab 100644 --- a/src/messaging/target.rs +++ b/src/messaging/target.rs @@ -101,6 +101,28 @@ pub fn resolve_broadcast_target(channel: &ChannelInfo) -> Option { + // Signal channels store target in signal_target metadata + if let Some(signal_target) = channel + .platform_meta + .as_ref() + .and_then(|meta| meta.get("signal_target")) + .and_then(json_value_to_string) + { + // signal_target is already normalized (e.g., "uuid:xxxx", "group:xxxx", "+123...") + // Determine adapter from channel.id: named if format is "signal:{name}:..." + let adapter = extract_signal_adapter_from_channel_id(&channel.id); + let target = normalize_signal_target(&signal_target)?; + return Some(BroadcastTarget { adapter, target }); + } + + // Fallback: parse from conversation ID + // Format: signal:{target} or signal:{instance}:{target} + // where {target} is uuid:xxx, group:xxx, or +xxx + let parts: Vec<&str> = channel.id.split(':').collect(); + // Skip "signal" prefix and use shared parser for the rest + return parse_signal_target_parts(parts.get(1..).unwrap_or(&[])); + } "email" => { let reply_to = channel .platform_meta @@ -129,7 +151,7 @@ pub fn resolve_broadcast_target(channel: &ChannelInfo) -> Option Option { +pub fn normalize_target(adapter: &str, raw_target: &str) -> Option { let trimmed = raw_target.trim(); if trimmed.is_empty() { return None; @@ -143,6 +165,7 @@ fn normalize_target(adapter: &str, raw_target: &str) -> Option { "email" => normalize_email_target(trimmed), // Webchat targets are full conversation IDs (e.g. "portal:chat:main") "webchat" => Some(trimmed.to_string()), + "signal" => normalize_signal_target(trimmed), _ => Some(trimmed.to_string()), } } @@ -235,6 +258,55 @@ fn normalize_email_target(raw_target: &str) -> Option { None } +fn normalize_signal_target(raw_target: &str) -> Option { + let target = strip_repeated_prefix(raw_target, "signal"); + + // Handle uuid:xxxx-xxxx format + if let Some(uuid) = target.strip_prefix("uuid:") { + if !uuid.is_empty() { + return Some(format!("uuid:{uuid}")); + } + return None; + } + + // Handle group:grp123 format + if let Some(group_id) = target.strip_prefix("group:") { + if !group_id.is_empty() { + return Some(format!("group:{group_id}")); + } + return None; + } + + // Handle e164:+123 or bare +123 format + if let Some(phone) = target.strip_prefix("e164:") { + let phone = phone.trim_start_matches('+'); + if !phone.is_empty() && phone.len() >= 7 && phone.chars().all(|c| c.is_ascii_digit()) { + return Some(format!("+{phone}")); + } + return None; + } + + // Bare +123 format + if let Some(phone) = target.strip_prefix('+') { + if !phone.is_empty() && phone.len() >= 7 && phone.chars().all(|c| c.is_ascii_digit()) { + return Some(target.to_string()); + } + return None; + } + + // Check if it's a valid UUID (contains dashes and alphanumeric) + if target.contains('-') && target.len() > 8 && target.chars().any(|c| c.is_ascii_digit()) { + return Some(format!("uuid:{target}")); + } + + // Check if it's a bare phone number (7+ digits required for E.164) + if target.chars().all(|c| c.is_ascii_digit()) && target.len() >= 7 { + return Some(format!("+{target}")); + } + + None +} + fn strip_repeated_prefix<'a>(raw_target: &'a str, adapter: &str) -> &'a str { let mut target = raw_target; let prefix = format!("{adapter}:"); @@ -257,6 +329,100 @@ fn json_value_to_string(value: &serde_json::Value) -> Option { None } +/// Extract the Signal adapter name from a channel ID. +/// +/// Channel ID formats: +/// - "signal:{target}" -> default adapter "signal" +/// - "signal:{instance}:{target}" -> named adapter "signal:{instance}" +/// +/// Where {target} is uuid:xxx, group:xxx, or +xxx (starts with valid target prefix) +fn extract_signal_adapter_from_channel_id(channel_id: &str) -> String { + let parts: Vec<&str> = channel_id.split(':').collect(); + match parts.as_slice() { + // Named adapter: signal:{instance}:uuid:{uuid}, signal:{instance}:group:{id} + // or signal:{instance}:e164:+{phone} + ["signal", instance, "uuid", ..] + | ["signal", instance, "group", ..] + | ["signal", instance, "e164", ..] => { + format!("signal:{instance}") + } + // Named adapter: signal:{instance}:+{phone} + ["signal", instance, phone, ..] if phone.starts_with('+') => { + format!("signal:{instance}") + } + // Default adapter: signal:{target} + _ => "signal".to_string(), + } +} + +/// Parse Signal target components into BroadcastTarget. +/// +/// Handles formats: +/// - Default adapter: ["uuid", xxx], ["group", xxx], ["e164", +xxx], ["+xxx"] +/// - Named adapter: [instance, "uuid", xxx], [instance, "group", xxx], [instance, "e164", +xxx], [instance, "+xxx"] +/// +/// Returns None for invalid formats. +pub fn parse_signal_target_parts(parts: &[&str]) -> Option { + match parts { + // Default adapter: signal:uuid:xxx, signal:group:xxx, signal:e164:+xxx, signal:+xxx + ["uuid", uuid] => Some(BroadcastTarget { + adapter: "signal".to_string(), + target: format!("uuid:{uuid}"), + }), + ["group", group_id] => Some(BroadcastTarget { + adapter: "signal".to_string(), + target: format!("group:{group_id}"), + }), + // Use normalize_signal_target for phone/e164 to ensure consistent parsing + ["e164", phone] => { + normalize_signal_target(&format!("e164:{phone}")).map(|target| BroadcastTarget { + adapter: "signal".to_string(), + target, + }) + } + [phone] if phone.starts_with('+') => { + normalize_signal_target(phone).map(|target| BroadcastTarget { + adapter: "signal".to_string(), + target, + }) + } + // Single-part targets: delegate to normalize_signal_target for bare UUIDs/phones + [single] => normalize_signal_target(single).map(|target| BroadcastTarget { + adapter: "signal".to_string(), + target, + }), + // Named adapter: signal:instance:uuid:xxx, signal:instance:group:xxx + [instance, "uuid", uuid] => Some(BroadcastTarget { + adapter: format!("signal:{instance}"), + target: format!("uuid:{uuid}"), + }), + [instance, "group", group_id] => Some(BroadcastTarget { + adapter: format!("signal:{instance}"), + target: format!("group:{group_id}"), + }), + // Named adapter: signal:instance:e164:+xxx - use normalize_signal_target + [instance, "e164", phone] => { + normalize_signal_target(&format!("e164:{phone}")).map(|target| BroadcastTarget { + adapter: format!("signal:{instance}"), + target, + }) + } + // Named adapter: signal:instance:+xxx - use normalize_signal_target + [instance, phone] if phone.starts_with('+') => { + normalize_signal_target(phone).map(|target| BroadcastTarget { + adapter: format!("signal:{instance}"), + target, + }) + } + // Named adapter with single-part target: delegate to normalize_signal_target + [instance, single] => normalize_signal_target(single).map(|target| BroadcastTarget { + adapter: format!("signal:{instance}"), + target, + }), + _ => None, + } +} + #[cfg(test)] mod tests { use super::{parse_delivery_target, resolve_broadcast_target}; @@ -366,4 +532,175 @@ mod tests { }) ); } + + // Signal tests + #[test] + fn parse_signal_uuid_with_prefix() { + let parsed = parse_delivery_target("signal:uuid:abc-123-def"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "uuid:abc-123-def".to_string(), + }) + ); + } + + #[test] + fn parse_signal_group_with_prefix() { + let parsed = parse_delivery_target("signal:group:grp123"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "group:grp123".to_string(), + }) + ); + } + + #[test] + fn parse_signal_phone_with_prefix() { + let parsed = parse_delivery_target("signal:+1234567890"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_phone_e164_format() { + let parsed = parse_delivery_target("signal:e164:+1234567890"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_phone_e164_no_plus() { + let parsed = parse_delivery_target("signal:e164:1234567890"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + // Tests for parse_signal_target_parts + #[test] + fn parse_signal_target_parts_uuid_default() { + let parsed = + super::parse_signal_target_parts(&["uuid", "550e8400-e29b-41d4-a716-446655440000"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "uuid:550e8400-e29b-41d4-a716-446655440000".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_group_default() { + let parsed = super::parse_signal_target_parts(&["group", "grp123"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "group:grp123".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_phone_default() { + let parsed = super::parse_signal_target_parts(&["+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_e164_default() { + let parsed = super::parse_signal_target_parts(&["e164", "+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_uuid_named() { + let parsed = super::parse_signal_target_parts(&[ + "gvoice1", + "uuid", + "550e8400-e29b-41d4-a716-446655440000", + ]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "uuid:550e8400-e29b-41d4-a716-446655440000".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_group_named() { + let parsed = super::parse_signal_target_parts(&["gvoice1", "group", "grp123"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "group:grp123".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_phone_named() { + let parsed = super::parse_signal_target_parts(&["gvoice1", "+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_e164_named() { + let parsed = super::parse_signal_target_parts(&["gvoice1", "e164", "+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_invalid() { + assert!(super::parse_signal_target_parts(&[]).is_none()); + assert!(super::parse_signal_target_parts(&["unknown"]).is_none()); + assert!(super::parse_signal_target_parts(&["uuid"]).is_none()); // missing UUID value + assert!(super::parse_signal_target_parts(&["gvoice1", "unknown"]).is_none()); + } } diff --git a/src/prompts/engine.rs b/src/prompts/engine.rs index 510bcb6c2..16969accb 100644 --- a/src/prompts/engine.rs +++ b/src/prompts/engine.rs @@ -76,6 +76,10 @@ impl PromptEngine { crate::prompts::text::get("adapters/email"), )?; env.add_template("adapters/cron", crate::prompts::text::get("adapters/cron"))?; + env.add_template( + "adapters/signal", + crate::prompts::text::get("adapters/signal"), + )?; // Fragment templates env.add_template( @@ -471,6 +475,7 @@ impl PromptEngine { let template_name = match adapter { "email" => "adapters/email", "cron" => "adapters/cron", + "signal" => "adapters/signal", _ => return None, }; diff --git a/src/prompts/text.rs b/src/prompts/text.rs index 88fe86df8..717e4c4a4 100644 --- a/src/prompts/text.rs +++ b/src/prompts/text.rs @@ -69,6 +69,7 @@ fn lookup(lang: &str, key: &str) -> &'static str { // Adapter-specific prompt fragments ("en", "adapters/email") => include_str!("../../prompts/en/adapters/email.md.j2"), ("en", "adapters/cron") => include_str!("../../prompts/en/adapters/cron.md.j2"), + ("en", "adapters/signal") => include_str!("../../prompts/en/adapters/signal.md.j2"), // Fragment Templates ("en", "fragments/worker_capabilities") => { diff --git a/src/secrets/store.rs b/src/secrets/store.rs index 0a89e0961..3c4bcc19a 100644 --- a/src/secrets/store.rs +++ b/src/secrets/store.rs @@ -1101,8 +1101,8 @@ pub trait SystemSecrets { /// here. pub fn system_secret_registry() -> Vec<&'static SecretField> { use crate::config::{ - DefaultsConfig, DiscordConfig, EmailConfig, LlmConfig, SlackConfig, TelegramConfig, - TwitchConfig, + DefaultsConfig, DiscordConfig, EmailConfig, LlmConfig, SignalConfig, SlackConfig, + TelegramConfig, TwitchConfig, }; let mut fields = Vec::new(); @@ -1116,6 +1116,7 @@ pub fn system_secret_registry() -> Vec<&'static SecretField> { fields.extend(TelegramConfig::secret_fields()); fields.extend(TwitchConfig::secret_fields()); fields.extend(EmailConfig::secret_fields()); + fields.extend(SignalConfig::secret_fields()); fields } diff --git a/src/tools.rs b/src/tools.rs index 4ed3bd7f5..a688d46f8 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -341,6 +341,7 @@ pub async fn add_channel_tools( cron_tool: Option, send_agent_message_tool: Option, allow_direct_reply: bool, + current_adapter: Option, ) -> Result<(), rig::tool::server::ToolServerError> { let conversation_id = conversation_id.into(); @@ -378,6 +379,7 @@ pub async fn add_channel_tools( state.channel_store.clone(), state.conversation_logger.clone(), send_message_display_name, + current_adapter.clone(), )) .await?; } diff --git a/src/tools/send_message_to_another_channel.rs b/src/tools/send_message_to_another_channel.rs index 2523ca5e0..abf72e239 100644 --- a/src/tools/send_message_to_another_channel.rs +++ b/src/tools/send_message_to_another_channel.rs @@ -11,6 +11,12 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; +/// Check if a string is a valid UUID format. +/// Accepts standard UUID format: 8-4-4-4-12 hexadecimal digits. +fn is_valid_uuid(s: &str) -> bool { + uuid::Uuid::parse_str(s).is_ok() +} + /// Tool for sending messages to other channels or DMs. /// /// Resolves targets by name or ID via the channel store, extracts the @@ -23,6 +29,7 @@ pub struct SendMessageTool { channel_store: ChannelStore, conversation_logger: ConversationLogger, agent_display_name: String, + current_adapter: Option, } impl std::fmt::Debug for SendMessageTool { @@ -37,12 +44,14 @@ impl SendMessageTool { channel_store: ChannelStore, conversation_logger: ConversationLogger, agent_display_name: String, + current_adapter: Option, ) -> Self { Self { messaging_manager, channel_store, conversation_logger, agent_display_name, + current_adapter, } } } @@ -79,6 +88,12 @@ impl Tool for SendMessageTool { async fn definition(&self, _prompt: String) -> ToolDefinition { let email_adapter_available = self.messaging_manager.has_adapter("email").await; + // Check if current adapter is Signal (e.g., "signal:gvoice1" starts with "signal") + let signal_adapter_available = self + .current_adapter + .as_ref() + .map(|adapter| adapter.starts_with("signal")) + .unwrap_or(false); let mut description = crate::prompts::text::get("tools/send_message_to_another_channel").to_string(); @@ -93,6 +108,15 @@ impl Tool for SendMessageTool { ); } + if signal_adapter_available { + description.push_str( + " Signal messaging is enabled: you can target `signal:uuid:{uuid}`, `signal:group:{group_id}`, or `signal:+{phone}`.", + ); + target_description.push_str( + " With Signal enabled, explicit targets are also allowed: `signal:uuid:{uuid}`, `signal:group:{group_id}`, `signal:+{phone}`", + ); + } + ToolDefinition { name: Self::NAME.to_string(), description, @@ -120,6 +144,79 @@ impl Tool for SendMessageTool { "send_message_to_another_channel tool called" ); + // Check for explicit signal: prefix first - always honored regardless of current adapter. + // This allows users to explicitly target Signal even when in Discord/Telegram/etc. + if let Some(mut target) = parse_explicit_signal_prefix(&args.target) { + // If explicit prefix returned default "signal" adapter but we're in a named + // Signal adapter conversation (e.g., signal:gvoice1), use the current adapter + // to ensure the message goes through the correct account. + if target.adapter == "signal" { + if let Some(current_adapter) = self + .current_adapter + .as_ref() + .filter(|adapter| adapter.starts_with("signal:")) + { + target.adapter = current_adapter.clone(); + } + } + + self.messaging_manager + .broadcast( + &target.adapter, + &target.target, + crate::OutboundResponse::Text(args.message), + ) + .await + .map_err(|error| SendMessageError(format!("failed to send message: {error}")))?; + + tracing::info!( + adapter = %target.adapter, + broadcast_target = %"[REDACTED]", + "message sent via explicit signal: prefix" + ); + + return Ok(SendMessageOutput { + success: true, + target: target.target, + platform: target.adapter, + }); + } + + // Check for implicit Signal shorthands, but only when in a Signal conversation. + // This prevents bare UUIDs, group:..., or +phone from being hijacked as Signal targets + // when the user is actually in a Discord/Telegram/etc conversation. + if let Some(current_adapter) = self + .current_adapter + .as_ref() + .filter(|adapter| adapter.starts_with("signal")) + { + if let Some(target) = parse_implicit_signal_shorthand(&args.target, current_adapter) { + self.messaging_manager + .broadcast( + &target.adapter, + &target.target, + crate::OutboundResponse::Text(args.message), + ) + .await + .map_err(|error| { + SendMessageError(format!("failed to send message: {error}")) + })?; + + tracing::info!( + adapter = %target.adapter, + broadcast_target = %"[REDACTED]", + "message sent via implicit Signal shorthand" + ); + + return Ok(SendMessageOutput { + success: true, + target: target.target, + platform: target.adapter, + }); + } + } + + // Check for explicit email target if let Some(explicit_target) = parse_explicit_email_target(&args.target) { self.messaging_manager .broadcast( @@ -144,17 +241,25 @@ impl Tool for SendMessageTool { }); } - let channel = self + // Try to find channel by name first + let channel_result = self .channel_store .find_by_name(&args.target) .await - .map_err(|error| SendMessageError(format!("failed to search channels: {error}")))? - .ok_or_else(|| { - SendMessageError(format!( - "no channel found matching '{}'. Use a channel name/ID from the available channels list or an explicit email target like email:alice@example.com.", + .map_err(|error| SendMessageError(format!("failed to search channels: {error}")))?; + + // If channel not found, return error. + // Signal targets are handled earlier (explicit signal: prefix always, + // implicit shorthands only in Signal conversations). + let channel = match channel_result { + Some(ch) => ch, + None => { + return Err(SendMessageError(format!( + "no channel found matching '{}'. Use a channel name/ID from the available channels list, an explicit email target like email:alice@example.com, or signal: prefix for Signal targets.", args.target - )) - })?; + ))); + } + }; let broadcast_target = crate::messaging::target::resolve_broadcast_target(&channel) .ok_or_else(|| { @@ -199,6 +304,77 @@ impl Tool for SendMessageTool { } } +/// Parse explicit signal: prefix - always honored regardless of current adapter. +/// Returns BroadcastTarget with adapter="signal" for targets like: +/// - signal:uuid:xxx +/// - signal:group:xxx +/// - signal:+1234567890 +/// - signal:e164:... +fn parse_explicit_signal_prefix(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + // Only handle explicit signal: prefix + if let Some(rest) = trimmed.strip_prefix("signal:") { + let parts: Vec<&str> = rest.split(':').collect(); + // Use shared parser for Signal target components + if let Some(target) = crate::messaging::target::parse_signal_target_parts(&parts) { + return Some(target); + } + // Fallback: try parsing the full signal:... string + return crate::messaging::target::parse_delivery_target(trimmed); + } + + None +} + +/// Parse implicit Signal shorthands - only in Signal conversations. +/// Handles bare UUIDs, group:xxx, and +phone without explicit signal: prefix. +/// Returns BroadcastTarget directly instead of building strings to avoid +/// parse_delivery_target() issues with colons in named adapters. +fn parse_implicit_signal_shorthand( + raw: &str, + current_adapter: &str, +) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + use crate::messaging::target::BroadcastTarget; + + // Check for bare UUID format (strict validation) + if is_valid_uuid(trimmed) { + return Some(BroadcastTarget { + adapter: current_adapter.to_string(), + target: format!("uuid:{trimmed}"), + }); + } + + // Phone number format: starts with + followed by 7+ digits + if trimmed.starts_with('+') + && trimmed[1..].len() >= 7 + && trimmed[1..].chars().all(|c| c.is_ascii_digit()) + { + return Some(BroadcastTarget { + adapter: current_adapter.to_string(), + target: trimmed.to_string(), + }); + } + + // Group ID format: group:xxx + if trimmed.starts_with("group:") { + return Some(BroadcastTarget { + adapter: current_adapter.to_string(), + target: trimmed.to_string(), + }); + } + + None +} + fn parse_explicit_email_target(raw: &str) -> Option { let trimmed = raw.trim(); if trimmed.is_empty() { @@ -218,7 +394,9 @@ fn parse_explicit_email_target(raw: &str) -> Option