diff --git a/prompts/en/adapters/signal.md.j2 b/prompts/en/adapters/signal.md.j2 new file mode 100644 index 000000000..4fe6ae238 --- /dev/null +++ b/prompts/en/adapters/signal.md.j2 @@ -0,0 +1,10 @@ +## Signal Adapter Guidance + +You are in a Signal conversation. Signal supports cross-channel messaging — you can send messages to other Signal users and groups using the `send_message_to_another_channel` tool. + +**Supported Signal targets:** +- `signal:uuid:{uuid}` — Send to a Signal user by their UUID +- `signal:group:{group_id}` — Send to a Signal group +- `signal:+{phone}` or `signal:e164:+{phone}` — Send to a Signal user by phone number + +Use this when the user asks you to message someone else on Signal or post to a Signal group. \ No newline at end of file diff --git a/prompts/en/tools/send_message_description.md.j2 b/prompts/en/tools/send_message_description.md.j2 index 69d427adb..8ace81438 100644 --- a/prompts/en/tools/send_message_description.md.j2 +++ b/prompts/en/tools/send_message_description.md.j2 @@ -1 +1,6 @@ Send a message to a DIFFERENT channel than the one you are currently in. Use this for cross-channel delivery — reminders, notifications, or when the user asks you to post something in another channel or DM them. Do NOT use this to reply in the current conversation — use the `reply` tool for that. Target channels by name or ID from the available channels in your context. + +For Signal messaging, you can use these explicit targets: +- `signal:uuid:{uuid}` - Send to a Signal user by their UUID +- `signal:group:{group_id}` - Send to a Signal group +- `signal:+{phone}` or `signal:e164:+{phone}` — Send to a Signal user by phone number diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 4daddff19..89680b7a1 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -823,7 +823,7 @@ impl Channel { } let supported_source = matches!( message.source.as_str(), - "telegram" | "discord" | "slack" | "twitch" + "telegram" | "discord" | "slack" | "twitch" | "signal" ); if !supported_source { return Ok(false); @@ -1202,11 +1202,13 @@ impl Channel { self.conversation_id = Some(first.conversation_id.clone()); } + // Track source adapter from the first non-system message + // Prefer message.adapter (full adapter string like "signal:work") over message.source if self.source_adapter.is_none() && let Some(first) = messages.first() && first.source != "system" { - self.source_adapter = Some(first.source.clone()); + self.source_adapter = first.adapter.clone().or_else(|| Some(first.source.clone())); } // Capture conversation context from the first message @@ -1408,6 +1410,13 @@ impl Channel { .build_system_prompt_with_coalesce(message_count, elapsed_secs, unique_sender_count) .await?; + // Extract adapter from messages (prefer explicit message.adapter, fall back to stored source_adapter) + // This preserves per-message adapter for Signal named instances (e.g., "signal:work") + let batch_adapter = messages + .iter() + .find_map(|m| m.adapter.as_deref()) + .or(self.source_adapter.as_deref()); + { let mut reply_target = self.state.reply_target_message_id.write().await; *reply_target = messages.iter().rev().find_map(extract_message_id); @@ -1421,6 +1430,7 @@ impl Channel { &conversation_id, attachment_parts, false, // not a retrigger + batch_adapter, ) .await?; @@ -1552,8 +1562,13 @@ impl Channel { self.conversation_id = Some(message.conversation_id.clone()); } + // Track source adapter from non-system messages + // Prefer message.adapter (full adapter string like "signal:work") over message.source if self.source_adapter.is_none() && message.source != "system" { - self.source_adapter = Some(message.source.clone()); + self.source_adapter = message + .adapter + .clone() + .or_else(|| Some(message.source.clone())); } let (raw_text, attachments) = match &message.content { @@ -1743,6 +1758,10 @@ impl Channel { Vec::new() }; + let adapter = message + .adapter + .as_deref() + .or_else(|| self.current_adapter()); let (result, skip_flag, replied_flag, retrigger_reply_preserved) = self .run_agent_turn( &user_text, @@ -1750,6 +1769,7 @@ impl Channel { &message.conversation_id, attachment_content, is_retrigger, + adapter, ) .await?; @@ -1765,7 +1785,7 @@ impl Channel { && !replied_flag.load(std::sync::atomic::Ordering::Relaxed) && matches!( message.source.as_str(), - "discord" | "telegram" | "slack" | "twitch" + "discord" | "telegram" | "slack" | "twitch" | "signal" ) { self.send_builtin_text( @@ -2171,6 +2191,7 @@ impl Channel { conversation_id: &str, attachment_content: Vec, is_retrigger: bool, + adapter: Option<&str>, ) -> Result<( std::result::Result, crate::tools::SkipFlag, @@ -2198,6 +2219,7 @@ impl Channel { self.deps.cron_tool.clone(), send_agent_message_tool, allow_direct_reply, + adapter.map(|s| s.to_string()), ) .await { @@ -2582,6 +2604,12 @@ impl Channel { .channel_errors_total .with_label_values(&[metrics_agent_id, metrics_channel_type, "llm_error"]) .inc(); + // Send error to user so they know something went wrong + let error_msg = format!("I encountered an error: {}", error); + self.response_tx + .send(OutboundResponse::Text(error_msg)) + .await + .ok(); tracing::error!(channel_id = %self.id, %error, "channel LLM call failed"); } } diff --git a/src/agent/channel_history.rs b/src/agent/channel_history.rs index 56764a256..24d330867 100644 --- a/src/agent/channel_history.rs +++ b/src/agent/channel_history.rs @@ -317,7 +317,17 @@ pub(crate) fn format_user_message( raw_text }; - format!("{display_name}{bot_tag}{reply_context} [{timestamp_text}]: {text_content}") + let sender_context = message + .metadata + .get("sender_context") + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + .map(|s| format!(" {s}")) + .unwrap_or_default(); + + format!( + "{display_name}{bot_tag}{reply_context}{sender_context} [{timestamp_text}]: {text_content}" + ) } pub(crate) fn format_batched_user_message( diff --git a/src/config.rs b/src/config.rs index 11318cab8..44d066f79 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,7 +15,7 @@ pub(crate) use load::resolve_env_value; pub use load::set_resolve_secrets_store; pub use onboarding::run_onboarding; pub use permissions::{ - DiscordPermissions, SlackPermissions, TelegramPermissions, TwitchPermissions, + DiscordPermissions, SignalPermissions, SlackPermissions, TelegramPermissions, TwitchPermissions, }; pub(crate) use providers::default_provider_config; pub use runtime::RuntimeConfig; @@ -1539,6 +1539,7 @@ maintenance_merge_similarity_threshold = 1.1 email: None, webhook: None, twitch: None, + signal: None, }; let bindings = vec![ Binding { @@ -1548,6 +1549,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1559,6 +1561,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1581,6 +1584,7 @@ maintenance_merge_similarity_threshold = 1.1 email: None, webhook: None, twitch: None, + signal: None, }; let bindings = vec![Binding { agent_id: "main".into(), @@ -1643,6 +1647,7 @@ maintenance_merge_similarity_threshold = 1.1 }), webhook: None, twitch: None, + signal: None, }; let bindings = vec![Binding { agent_id: "main".into(), @@ -1677,6 +1682,7 @@ maintenance_merge_similarity_threshold = 1.1 email: None, webhook: None, twitch: None, + signal: None, }; // Binding targets default adapter, but no default credentials exist let bindings = vec![Binding { diff --git a/src/config/load.rs b/src/config/load.rs index 3da09e61f..eec7d3d0b 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -14,10 +14,10 @@ use super::{ CoalesceConfig, CompactionConfig, Config, CortexConfig, CronDef, DefaultsConfig, DiscordConfig, DiscordInstanceConfig, EmailConfig, EmailInstanceConfig, GroupDef, HumanDef, IngestionConfig, LinkDef, LlmConfig, McpServerConfig, McpTransport, MemoryPersistenceConfig, MessagingConfig, - MetricsConfig, OpenCodeConfig, ProjectsConfig, ProviderConfig, SlackCommandConfig, SlackConfig, - SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, TelemetryConfig, TwitchConfig, - TwitchInstanceConfig, WarmupConfig, WebhookConfig, normalize_adapter, - validate_named_messaging_adapters, + MetricsConfig, OpenCodeConfig, ProjectsConfig, ProviderConfig, SignalConfig, + SignalInstanceConfig, SlackCommandConfig, SlackConfig, SlackInstanceConfig, TelegramConfig, + TelegramInstanceConfig, TelemetryConfig, TwitchConfig, TwitchInstanceConfig, WarmupConfig, + WebhookConfig, normalize_adapter, validate_named_messaging_adapters, }; use crate::error::{ConfigError, Result}; @@ -2148,6 +2148,57 @@ impl Config { trigger_prefix: t.trigger_prefix, }) }), + signal: toml.messaging.signal.and_then(|s| { + let instances = s + .instances + .into_iter() + .map(|instance| { + let http_url = instance.http_url.as_deref().and_then(resolve_env_value); + let account = instance.account.as_deref().and_then(resolve_env_value); + if instance.enabled && (http_url.is_none() || account.is_none()) { + tracing::warn!( + adapter = %instance.name, + "signal instance is enabled but http_url or account is missing/unresolvable — disabling" + ); + } + let has_credentials = http_url.is_some() && account.is_some(); + SignalInstanceConfig { + name: instance.name, + enabled: instance.enabled && has_credentials, + http_url: http_url.unwrap_or_default(), + account: account.unwrap_or_default(), + dm_allowed_users: instance.dm_allowed_users, + group_ids: instance.group_ids, + group_allowed_users: instance.group_allowed_users, + ignore_stories: instance.ignore_stories, + } + }) + .collect::>(); + + let http_url = std::env::var("SIGNAL_HTTP_URL") + .ok() + .or_else(|| s.http_url.as_deref().and_then(resolve_env_value)); + let account = std::env::var("SIGNAL_ACCOUNT") + .ok() + .or_else(|| s.account.as_deref().and_then(resolve_env_value)); + + if (http_url.is_none() || account.is_none()) + && !instances.iter().any(|inst| inst.enabled) + { + return None; + } + + Some(SignalConfig { + enabled: s.enabled, + http_url: http_url.unwrap_or_default(), + account: account.unwrap_or_default(), + instances, + dm_allowed_users: s.dm_allowed_users, + group_ids: s.group_ids, + group_allowed_users: s.group_allowed_users, + ignore_stories: s.ignore_stories, + }) + }), }; let bindings: Vec = toml diff --git a/src/config/permissions.rs b/src/config/permissions.rs index 8844cf614..3c0713228 100644 --- a/src/config/permissions.rs +++ b/src/config/permissions.rs @@ -1,6 +1,7 @@ use super::{ - Binding, DiscordConfig, DiscordInstanceConfig, SlackConfig, SlackInstanceConfig, - TelegramConfig, TelegramInstanceConfig, TwitchConfig, TwitchInstanceConfig, + Binding, DiscordConfig, DiscordInstanceConfig, SignalConfig, SignalInstanceConfig, SlackConfig, + SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, TwitchConfig, + TwitchInstanceConfig, }; use std::collections::HashMap; @@ -324,6 +325,135 @@ impl TwitchPermissions { } } +/// Hot-reloadable Signal permission filters. +/// +/// Shared with the Signal adapter via `Arc>` for hot-reloading. +/// Uses string-based identifiers since Signal users are identified by phone +/// numbers (E.164) or UUIDs. +/// +/// Wildcards: +/// - `"*"` in `dm_allowed_users` means allow all DM users +/// - `"*"` in `group_allowed_users` means allow all group users +/// - `"*"` in `group_filter` means allow all groups +/// - Empty array means block all (the `"*"` must be explicitly set to allow all) +#[derive(Debug, Clone, Default)] +pub struct SignalPermissions { + /// Allowed group IDs. None = block all, Some(["*"]) = allow all, Some([...]) = specific list. + pub group_filter: Option>, + /// Phone numbers or UUIDs allowed to DM the bot. ["*"] = allow all, [] = block all. + /// Only applies to direct messages. + pub dm_allowed_users: Vec, + /// Phone numbers or UUIDs allowed in group messages. ["*"] = allow all, [] = block all. + /// For groups, both dm_allowed_users AND group_allowed_users are checked (merged). + pub group_allowed_users: Vec, +} + +impl SignalPermissions { + /// Build from the current config's signal settings. + pub fn from_config(signal: &SignalConfig) -> Self { + Self::build_from_seed( + signal.dm_allowed_users.clone(), + signal.group_ids.clone(), + signal.group_allowed_users.clone(), + ) + } + + /// Build permissions for a named Signal adapter instance. + pub fn from_instance_config(instance: &SignalInstanceConfig) -> Self { + Self::build_from_seed( + instance.dm_allowed_users.clone(), + instance.group_ids.clone(), + instance.group_allowed_users.clone(), + ) + } + + fn build_from_seed( + seed_dm_allowed_users: Vec, + seed_group_ids: Vec, + seed_group_allowed_users: Vec, + ) -> Self { + // Group filter: collect group_ids from signal config/instance. + // - "*" means allow all groups + // - Empty list means block all groups + // - Specific IDs means only those groups are allowed + let mut group_filter_wildcard = false; + let group_filter = { + let mut all_group_ids: Vec = Vec::new(); + + // Process seed_group_ids with validation + for id in &seed_group_ids { + let id = id.trim().to_string(); + if id.is_empty() { + continue; + } + if id == "*" { + group_filter_wildcard = true; + break; + } + // Signal group IDs are base64-encoded; validate format. + if !is_valid_base64(&id) { + tracing::warn!( + group_id = %id, + "signal: seed group_id is not valid base64, dropping" + ); + continue; + } + if !all_group_ids.contains(&id) { + all_group_ids.push(id); + } + } + + if group_filter_wildcard { + Some(vec!["*".to_string()]) + } else if all_group_ids.is_empty() { + None + } else { + Some(all_group_ids) + } + }; + + // Build dm_allowed_users separately (for DMs only) + // - "*" means allow all DM users + // - Empty list means block all DMs + // - Specific list means only those users are allowed for DMs + let dm_users: Vec = seed_dm_allowed_users + .iter() + .filter_map(|id| { + let trimmed = id.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_string()) + }) + .collect(); + let dm_wildcard = dm_users.iter().any(|id| id == "*"); + + // Build group_allowed_users separately (for groups only) + // - "*" means allow all group users + // - Empty list means block all group users + // - Specific list means only those users are allowed in groups + let group_users: Vec = seed_group_allowed_users + .iter() + .filter_map(|id| { + let trimmed = id.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_string()) + }) + .collect(); + let group_wildcard = group_users.iter().any(|id| id == "*"); + + Self { + group_filter, + dm_allowed_users: if dm_wildcard { + vec!["*".to_string()] + } else { + dm_users + }, + group_allowed_users: if group_wildcard { + vec!["*".to_string()] + } else { + group_users + }, + } + } +} + fn binding_adapter_selector_matches(binding: &Binding, adapter_selector: Option<&str>) -> bool { match (binding.adapter.as_deref(), adapter_selector) { (None, None) => true, @@ -333,3 +463,48 @@ fn binding_adapter_selector_matches(binding: &Binding, adapter_selector: Option< _ => false, } } + +/// Check if a string is valid base64 (URL-safe or standard). +/// Signal group IDs are base64-encoded. +fn is_valid_base64(s: &str) -> bool { + use base64::{ + Engine, engine::general_purpose::STANDARD, engine::general_purpose::URL_SAFE, + engine::general_purpose::URL_SAFE_NO_PAD, + }; + + let trimmed = s.trim(); + if trimmed.is_empty() { + return false; + } + + URL_SAFE_NO_PAD.decode(trimmed).is_ok() + || URL_SAFE.decode(trimmed).is_ok() + || STANDARD.decode(trimmed).is_ok() +} + +#[cfg(test)] +mod base64_tests { + use super::is_valid_base64; + + #[test] + fn test_valid_url_safe_base64() { + // URL-safe base64 without padding (common for Signal group IDs) + assert!(is_valid_base64("abc123def456")); + assert!(is_valid_base64("abc123_def_456__")); // URL-safe with underscores + } + + #[test] + fn test_valid_standard_base64() { + // Standard base64 + assert!(is_valid_base64("abc123DEF456")); + assert!(is_valid_base64("SGVsbG8gV29ybGQ=")); // "Hello World" + } + + #[test] + fn test_invalid_base64() { + // Invalid characters not in base64 alphabet + assert!(!is_valid_base64("not@valid!base64")); + assert!(!is_valid_base64("")); + assert!(!is_valid_base64(" ")); + } +} diff --git a/src/config/toml_schema.rs b/src/config/toml_schema.rs index 969333053..d4453f2e0 100644 --- a/src/config/toml_schema.rs +++ b/src/config/toml_schema.rs @@ -496,6 +496,7 @@ pub(super) struct TomlMessagingConfig { pub(super) email: Option, pub(super) webhook: Option, pub(super) twitch: Option, + pub(super) signal: Option, } #[derive(Deserialize)] @@ -687,6 +688,45 @@ pub(super) struct TomlTwitchInstanceConfig { pub(super) trigger_prefix: Option, } +#[derive(Deserialize)] +pub(super) struct TomlSignalConfig { + #[serde(default)] + pub(super) enabled: bool, + pub(super) http_url: Option, + pub(super) account: Option, + #[serde(default)] + pub(super) instances: Vec, + #[serde(default)] + pub(super) dm_allowed_users: Vec, + #[serde(default)] + pub(super) group_ids: Vec, + #[serde(default)] + pub(super) group_allowed_users: Vec, + #[serde(default = "default_signal_ignore_stories")] + pub(super) ignore_stories: bool, +} + +#[derive(Deserialize)] +pub(super) struct TomlSignalInstanceConfig { + pub(super) name: String, + #[serde(default)] + pub(super) enabled: bool, + pub(super) http_url: Option, + pub(super) account: Option, + #[serde(default)] + pub(super) dm_allowed_users: Vec, + #[serde(default)] + pub(super) group_ids: Vec, + #[serde(default)] + pub(super) group_allowed_users: Vec, + #[serde(default = "default_signal_ignore_stories")] + pub(super) ignore_stories: bool, +} + +pub(super) fn default_signal_ignore_stories() -> bool { + true +} + pub(super) fn default_webhook_port() -> u16 { 18789 } diff --git a/src/config/types.rs b/src/config/types.rs index 6c6111f66..93b66982e 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1461,7 +1461,7 @@ pub struct Binding { pub adapter: Option, pub guild_id: Option, pub workspace_id: Option, // Slack workspace (team) ID - pub chat_id: Option, + pub chat_id: Option, // Telegram group ID /// Channel IDs this binding applies to. If empty, all channels in the guild/workspace are allowed. pub channel_ids: Vec, /// Require explicit @mention (or reply-to-bot) for inbound messages. @@ -1659,7 +1659,7 @@ pub(super) struct AdapterValidationState { pub(super) fn is_named_adapter_platform(platform: &str) -> bool { matches!( platform, - "discord" | "slack" | "telegram" | "twitch" | "email" + "discord" | "slack" | "telegram" | "twitch" | "email" | "signal" ) } @@ -1822,6 +1822,26 @@ pub(super) fn build_adapter_validation_states( ); } + if let Some(signal) = &messaging.signal { + let named_instances = validate_instance_names( + "signal", + signal + .instances + .iter() + .map(|instance| instance.name.as_str()), + )?; + let default_present = + !signal.http_url.trim().is_empty() && !signal.account.trim().is_empty(); + validate_runtime_keys("signal", default_present, &named_instances)?; + states.insert( + "signal", + AdapterValidationState { + default_present, + named_instances, + }, + ); + } + Ok(states) } @@ -1942,6 +1962,7 @@ pub struct MessagingConfig { pub email: Option, pub webhook: Option, pub twitch: Option, + pub signal: Option, } #[derive(Clone)] @@ -2435,3 +2456,102 @@ pub struct WebhookConfig { pub bind: String, pub auth_token: Option, } + +/// Signal messaging via signal-cli JSON-RPC daemon. +/// +/// Connects to a running `signal-cli daemon --http` instance for sending and +/// receiving Signal messages. Supports both direct messages and group chats. +#[derive(Clone)] +pub struct SignalConfig { + pub enabled: bool, + /// Base URL of the signal-cli JSON-RPC HTTP daemon (e.g. `http://127.0.0.1:8686`). + /// May contain embedded credentials which are redacted in debug output. + pub http_url: String, + /// E.164 phone number of the bot's Signal account (e.g. `+1234567890`). + pub account: String, + /// Additional named Signal adapter instances. + pub instances: Vec, + /// Phone numbers or UUIDs allowed to DM the bot. If empty, DMs are ignored. + pub dm_allowed_users: Vec, + /// Group IDs allowed for this adapter. If empty, all groups are blocked + /// (same as `None` in the permission filter — groups are opt-in only). + pub group_ids: Vec, + /// User IDs allowed to message in Signal groups. + pub group_allowed_users: Vec, + /// Whether to silently drop story messages (default: true). + pub ignore_stories: bool, +} + +/// Per-instance config for a named Signal adapter. +#[derive(Clone)] +pub struct SignalInstanceConfig { + pub name: String, + pub enabled: bool, + /// Base URL of this instance's signal-cli daemon. + pub http_url: String, + /// E.164 phone number for this instance's Signal account. + pub account: String, + /// Phone numbers or UUIDs allowed to DM this instance. + pub dm_allowed_users: Vec, + /// Group IDs allowed for this instance. + pub group_ids: Vec, + /// User IDs allowed to message in Signal groups for this instance. + pub group_allowed_users: Vec, + /// Whether this instance drops story messages. + pub ignore_stories: bool, +} + +impl std::fmt::Debug for SignalInstanceConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SignalInstanceConfig") + .field("name", &self.name) + .field("enabled", &self.enabled) + .field("http_url", &"[REDACTED]") + .field("account", &"[REDACTED]") + .field("dm_allowed_users", &"[REDACTED]") + .field("group_ids", &self.group_ids) + .field("group_allowed_users", &"[REDACTED]") + .field("ignore_stories", &self.ignore_stories) + .finish() + } +} + +impl std::fmt::Debug for SignalConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SignalConfig") + .field("enabled", &self.enabled) + .field("http_url", &"[REDACTED]") + .field("account", &"[REDACTED]") + .field("instances", &self.instances) + .field("dm_allowed_users", &"[REDACTED]") + .field("group_ids", &self.group_ids) + .field("group_allowed_users", &"[REDACTED]") + .field("ignore_stories", &self.ignore_stories) + .finish() + } +} + +impl SystemSecrets for SignalConfig { + fn section() -> &'static str { + "signal" + } + + fn is_messaging_adapter() -> bool { + true + } + + fn secret_fields() -> &'static [SecretField] { + &[ + SecretField { + toml_key: "http_url", + secret_name: "SIGNAL_HTTP_URL", + instance_pattern: None, + }, + SecretField { + toml_key: "account", + secret_name: "SIGNAL_ACCOUNT", + instance_pattern: None, + }, + ] + } +} diff --git a/src/config/watcher.rs b/src/config/watcher.rs index e39ec1533..8daa4dbdd 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -2,8 +2,8 @@ use std::path::PathBuf; use std::sync::Arc; use super::{ - Binding, Config, DiscordPermissions, RuntimeConfig, SlackPermissions, TelegramPermissions, - TwitchPermissions, binding_runtime_adapter_key, + Binding, Config, DiscordPermissions, RuntimeConfig, SignalPermissions, SlackPermissions, + TelegramPermissions, TwitchPermissions, binding_runtime_adapter_key, }; /// Per-agent context needed by the file watcher: (id, prompt_dir, identity_dir, @@ -31,6 +31,7 @@ pub fn spawn_file_watcher( slack_permissions: Option>>, telegram_permissions: Option>>, twitch_permissions: Option>>, + signal_permissions: Option>>, bindings: Arc>>, messaging_manager: Option>, llm_manager: Arc, @@ -240,6 +241,14 @@ pub fn spawn_file_watcher( tracing::info!("twitch permissions reloaded"); } + if let Some(ref perms) = signal_permissions + && let Some(signal_config) = &config.messaging.signal + { + let new_perms = SignalPermissions::from_config(signal_config); + perms.store(Arc::new(new_perms)); + tracing::info!("signal permissions reloaded"); + } + // Hot-start adapters that are newly enabled in the config if let Some(ref manager) = messaging_manager { let rt = tokio::runtime::Handle::current(); @@ -249,6 +258,7 @@ pub fn spawn_file_watcher( let slack_permissions = slack_permissions.clone(); let telegram_permissions = telegram_permissions.clone(); let twitch_permissions = twitch_permissions.clone(); + let signal_permissions = signal_permissions.clone(); let instance_dir = instance_dir.clone(); rt.spawn(async move { @@ -531,6 +541,62 @@ pub fn spawn_file_watcher( } } } + + // Signal: start default + named instances that are enabled and not already running. + if let Some(signal_config) = &config.messaging.signal + && signal_config.enabled { + if !signal_config.http_url.is_empty() + && !signal_config.account.is_empty() + && !manager.has_adapter("signal").await + { + let permissions = match signal_permissions { + Some(ref existing) => existing.clone(), + None => { + let permissions = SignalPermissions::from_config(signal_config); + Arc::new(arc_swap::ArcSwap::from_pointee(permissions)) + } + }; + let tmp_dir = instance_dir.join("tmp"); + let adapter = crate::messaging::signal::SignalAdapter::new( + "signal", + &signal_config.http_url, + &signal_config.account, + signal_config.ignore_stories, + permissions, + tmp_dir, + ); + if let Err(error) = manager.register_and_start(adapter).await { + tracing::error!(%error, "failed to hot-start signal adapter from config change"); + } + } + + for instance in signal_config.instances.iter().filter(|instance| instance.enabled) { + let runtime_key = binding_runtime_adapter_key( + "signal", + Some(instance.name.as_str()), + ); + if manager.has_adapter(runtime_key.as_str()).await { + // TODO: named instance permissions not hot-updated (see discord block comment) + continue; + } + + let permissions = Arc::new(arc_swap::ArcSwap::from_pointee( + SignalPermissions::from_instance_config(instance), + )); + let tmp_dir = instance_dir.join("tmp"); + let adapter = crate::messaging::signal::SignalAdapter::new( + runtime_key, + &instance.http_url, + &instance.account, + instance.ignore_stories, + permissions, + tmp_dir, + ); + if let Err(error) = manager.register_and_start(adapter).await { + tracing::error!(%error, adapter = %instance.name, "failed to hot-start named signal adapter from config change"); + } + } + } }); } } diff --git a/src/main.rs b/src/main.rs index 1c09da7d7..776673a0f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1587,6 +1587,7 @@ async fn run( let mut slack_permissions = None; let mut telegram_permissions = None; let mut twitch_permissions = None; + let mut signal_permissions = None; initialize_agents( &config, &llm_manager, @@ -1604,6 +1605,7 @@ async fn run( &mut slack_permissions, &mut telegram_permissions, &mut twitch_permissions, + &mut signal_permissions, agent_links.clone(), agent_humans.clone(), injection_tx.clone(), @@ -1622,6 +1624,7 @@ async fn run( slack_permissions, telegram_permissions, twitch_permissions, + signal_permissions, bindings.clone(), Some(messaging_manager.clone()), llm_manager.clone(), @@ -1638,6 +1641,7 @@ async fn run( None, None, None, + None, bindings.clone(), None, llm_manager.clone(), @@ -2316,6 +2320,7 @@ async fn run( let mut new_slack_permissions = None; let mut new_telegram_permissions = None; let mut new_twitch_permissions = None; + let mut new_signal_permissions = None; match initialize_agents( &new_config, &new_llm_manager, @@ -2333,6 +2338,7 @@ async fn run( &mut new_slack_permissions, &mut new_telegram_permissions, &mut new_twitch_permissions, + &mut new_signal_permissions, agent_links.clone(), agent_humans.clone(), injection_tx.clone(), @@ -2350,6 +2356,7 @@ async fn run( new_slack_permissions, new_telegram_permissions, new_twitch_permissions, + new_signal_permissions, bindings.clone(), Some(messaging_manager.clone()), new_llm_manager.clone(), @@ -2472,6 +2479,7 @@ async fn initialize_agents( slack_permissions: &mut Option>>, telegram_permissions: &mut Option>>, twitch_permissions: &mut Option>>, + signal_permissions: &mut Option>>, agent_links: Arc>>, agent_humans: Arc>>, injection_tx: tokio::sync::mpsc::Sender, @@ -3172,6 +3180,58 @@ async fn initialize_agents( } } + // Shared Signal permissions (hot-reloadable via file watcher) + *signal_permissions = config.messaging.signal.as_ref().map(|signal_config| { + let perms = spacebot::config::SignalPermissions::from_config(signal_config); + Arc::new(ArcSwap::from_pointee(perms)) + }); + + if let Some(signal_config) = &config.messaging.signal + && signal_config.enabled + { + let tmp_dir = config.instance_dir.join("tmp"); + if !signal_config.http_url.is_empty() && !signal_config.account.is_empty() { + let adapter = spacebot::messaging::signal::SignalAdapter::new( + "signal", + &signal_config.http_url, + &signal_config.account, + signal_config.ignore_stories, + signal_permissions.clone().ok_or_else(|| { + anyhow::anyhow!("signal permissions not initialized when signal is enabled") + })?, + tmp_dir.clone(), + ); + new_messaging_manager.register(adapter).await; + } + + for instance in signal_config + .instances + .iter() + .filter(|instance| instance.enabled) + { + if instance.http_url.is_empty() || instance.account.is_empty() { + tracing::warn!(adapter = %instance.name, "skipping enabled signal instance with missing credentials"); + continue; + } + let runtime_key = spacebot::config::binding_runtime_adapter_key( + "signal", + Some(instance.name.as_str()), + ); + let perms = Arc::new(ArcSwap::from_pointee( + spacebot::config::SignalPermissions::from_instance_config(instance), + )); + let adapter = spacebot::messaging::signal::SignalAdapter::new( + runtime_key, + &instance.http_url, + &instance.account, + instance.ignore_stories, + perms, + tmp_dir.clone(), + ); + new_messaging_manager.register(adapter).await; + } + } + let webchat_agent_pools = agents .iter() .map(|(agent_id, agent)| (agent_id.to_string(), agent.db.sqlite.clone())) diff --git a/src/messaging.rs b/src/messaging.rs index f57d5d68a..e85ccf063 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,8 +1,9 @@ -//! Messaging adapters (Discord, Slack, Telegram, Twitch, Email, Webhook, WebChat). +//! Messaging adapters (Discord, Slack, Telegram, Twitch, Signal, Email, Webhook, WebChat). pub mod discord; pub mod email; pub mod manager; +pub mod signal; pub mod slack; pub mod target; pub mod telegram; diff --git a/src/messaging/signal.rs b/src/messaging/signal.rs new file mode 100644 index 000000000..b9f16a0a6 --- /dev/null +++ b/src/messaging/signal.rs @@ -0,0 +1,2057 @@ +//! Signal messaging adapter using signal-cli JSON-RPC daemon. +//! +//! Connects to a running `signal-cli daemon --http` instance for sending and +//! receiving Signal messages via its JSON-RPC and Server-Sent Events (SSE) APIs. +//! +//! ## Architecture +//! +//! - **Inbound:** SSE stream from `{http_url}/api/v1/events?account={account}` with +//! automatic reconnection and exponential backoff (2s → 60s). +//! - **Outbound:** JSON-RPC `send` calls to `{http_url}/api/v1/rpc`. DM recipients +//! must be passed as a JSON **array** (signal-cli requirement). Group messages use +//! `groupId` instead of `recipient`. +//! - **Typing:** JSON-RPC `sendTyping` method with repeating task (Signal indicators +//! expire after ~5s). +//! - **Attachments outbound:** Written to `{tmp_dir}/` as temp files, file paths passed +//! in the `attachments` JSON array, cleaned up after send. +//! - **Attachments inbound:** signal-cli provides file paths on disk; currently treated +//! as opaque (message text falls back to `[Attachment]` for attachment-only messages). +//! - **Streaming:** Not supported (Signal can't edit sent messages). `StreamStart`, +//! `StreamChunk`, and `StreamEnd` are no-ops. +//! +//! ## Limitations +//! +//! - **Threading:** Signal has no native thread concept. `ThreadReply` responses are +//! sent as regular messages without thread context. +//! - **Rich formatting:** Signal has no rich formatting (bold, italic, etc.). +//! `RichMessage` is sent as plain text. +//! - **Reactions:** Signal reactions are supported via JSON-RPC but require complex +//! target identification (author UUID + timestamp). Not currently implemented. +//! - **Ephemeral messages:** Not supported; sent as regular messages. +//! - **Scheduled messages:** Not supported; sent immediately. + +use crate::config::SignalPermissions; +use crate::messaging::traits::{ + InboundStream, Messaging, apply_runtime_adapter_to_conversation_id, +}; +use crate::{InboundMessage, MessageContent, OutboundResponse, StatusUpdate, metadata_keys}; + +use anyhow::Context as _; +use arc_swap::ArcSwap; +use futures::StreamExt; +use serde::Deserialize; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{RwLock, mpsc}; +use tokio::task::JoinHandle; +use uuid::Uuid; + +// ── constants ─────────────────────────────────────────────────── + +/// Maximum SSE line buffer size before reset (prevents OOM from runaway streams). +const MAX_SSE_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB + +/// Maximum single SSE event size (prevents huge JSON payloads). +const MAX_SSE_EVENT_SIZE: usize = 256 * 1024; // 256 KiB + +/// Maximum JSON-RPC response body size. +const MAX_RPC_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MiB + +/// Prefix used to encode group targets in the reply target string. +const GROUP_TARGET_PREFIX: &str = "group:"; + +/// Signal-cli health check endpoint. +const HEALTH_ENDPOINT: &str = "/api/v1/check"; + +/// JSON-RPC endpoint path. +const RPC_ENDPOINT: &str = "/api/v1/rpc"; + +/// SSE events endpoint path. +const SSE_ENDPOINT: &str = "/api/v1/events"; + +/// Typing indicator repeat interval. Signal indicators expire after ~5 seconds, +/// so we resend every 4 seconds. +const TYPING_REPEAT_INTERVAL: Duration = Duration::from_secs(4); + +/// HTTP connect timeout for the reqwest client. +const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Per-request timeout for JSON-RPC calls. +const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + +/// Initial SSE reconnection delay. +const SSE_INITIAL_BACKOFF: Duration = Duration::from_secs(2); + +/// Maximum SSE reconnection delay. +const SSE_MAX_BACKOFF: Duration = Duration::from_secs(60); + +// ── PII redaction helpers ─────────────────────────────────────── + +/// Redact a Signal identifier (phone number or UUID) for logging. +/// Keeps first 4 and last 4 characters, masks the middle with "****". +fn redact_identifier(id: &str) -> String { + if id.len() <= 8 { + return "[redacted]".to_string(); + } + let prefix = &id[..4.min(id.len())]; + let suffix = &id[id.len().saturating_sub(4)..]; + format!("{}****{}", prefix, suffix) +} + +/// Redact a URL for logging, keeping only the scheme and host (no userinfo). +/// Parses the URL to properly handle user:pass@host patterns. +fn redact_url(url: &str) -> String { + // Find the scheme separator + let Some(scheme_end) = url.find("://") else { + // Not a proper URL, return as-is truncated + return url.chars().take(50).collect(); + }; + + let scheme = &url[..scheme_end]; + let after_scheme = &url[scheme_end + 3..]; + + // Skip userinfo if present (user:pass@host) + let host_start = after_scheme.find('@').map(|i| i + 1).unwrap_or(0); + let host_port = &after_scheme[host_start..]; + + // Extract just the host (and port if present), stopping at first path segment + let host_end = host_port.find('/').unwrap_or(host_port.len()); + let host = &host_port[..host_end]; + + if host.is_empty() { + url.chars().take(50).collect() + } else { + format!("{}://{}", scheme, host) + } +} + +// ── signal-cli SSE event JSON shapes ──────────────────────────── + +#[derive(Debug, Deserialize)] +struct SseEnvelope { + #[serde(default)] + envelope: Option, +} + +#[derive(Debug, Deserialize)] +struct Envelope { + /// Source identifier (UUID for privacy-enabled users). + #[serde(default)] + source: Option, + /// E.164 phone number of the sender (preferred identifier). + #[serde(rename = "sourceNumber", default)] + source_number: Option, + /// Display name of the sender. + #[serde(rename = "sourceName", default)] + source_name: Option, + /// UUID of the sender. + #[serde(rename = "sourceUuid", default)] + source_uuid: Option, + #[serde(rename = "dataMessage", default)] + data_message: Option, + /// Present when the envelope is a story message (dropped when `ignore_stories` is true). + #[serde(rename = "storyMessage", default)] + story_message: Option, + #[serde(default)] + timestamp: Option, +} + +#[derive(Debug, Deserialize)] +struct DataMessage { + #[serde(default)] + message: Option, + #[serde(default)] + timestamp: Option, + #[serde(rename = "groupInfo", default)] + group_info: Option, + /// Inbound attachments as opaque JSON. signal-cli provides file paths on disk + /// in each attachment object (e.g. `contentType`, `filename`, `file`). + #[serde(default)] + attachments: Option>, +} + +#[derive(Debug, Deserialize)] +struct GroupInfo { + #[serde(rename = "groupId", default)] + group_id: Option, +} + +// ── recipient routing ─────────────────────────────────────────── + +/// Classification for outbound message routing. +#[derive(Debug, Clone)] +enum RecipientTarget { + /// Direct message to a phone number or UUID. + Direct(String), + /// Group message by group ID. + Group(String), +} + +// ── adapter ───────────────────────────────────────────────────── + +/// Signal messaging adapter. +/// +/// Connects to a signal-cli JSON-RPC daemon for sending/receiving messages. +/// Implements the [`Messaging`] trait for integration with [`MessagingManager`]. +pub struct SignalAdapter { + /// Runtime key used for registration and routing (e.g. `"signal"` or `"signal:personal"`). + runtime_key: String, + /// Base URL of the signal-cli HTTP daemon (e.g. `http://127.0.0.1:8686`). + http_url: String, + /// E.164 phone number of the bot's Signal account. + account: String, + /// Whether to silently drop story messages. + ignore_stories: bool, + /// Hot-reloadable permissions (DM allowlist + group filter). + permissions: Arc>, + /// HTTP client for JSON-RPC and SSE connections. + client: reqwest::Client, + /// Directory for temporary outbound attachment files. + tmp_dir: PathBuf, + /// Repeating typing indicator tasks per conversation_id. + typing_tasks: Arc>>>, + /// Shutdown signal for the SSE listener loop. + shutdown_tx: Arc>>>, +} + +impl SignalAdapter { + /// Create a new Signal adapter. + /// + /// # Arguments + /// - `runtime_key` — Adapter name for registration (e.g. `"signal"` or `"signal:work"`). + /// - `http_url` — Base URL of the signal-cli daemon (e.g. `http://127.0.0.1:8686`). + /// - `account` — E.164 phone number of the bot's Signal account. + /// - `ignore_stories` — Whether to silently drop story messages. + /// - `permissions` — Hot-reloadable permission filters. + /// - `tmp_dir` — Directory for temporary outbound attachment files. + pub fn new( + runtime_key: impl Into, + http_url: impl Into, + account: impl Into, + ignore_stories: bool, + permissions: Arc>, + tmp_dir: PathBuf, + ) -> Self { + let client = reqwest::Client::builder() + .connect_timeout(HTTP_CONNECT_TIMEOUT) + .build() + .unwrap_or_else(|error| { + tracing::warn!( + %error, + "failed to build reqwest client for signal adapter; falling back to default client" + ); + reqwest::Client::new() + }); + + Self { + runtime_key: runtime_key.into(), + http_url: http_url.into(), + account: account.into(), + ignore_stories, + permissions, + client, + tmp_dir, + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + } + } + + // ── JSON-RPC ──────────────────────────────────────────────── + + /// Send a JSON-RPC request to the signal-cli daemon. + /// + /// Returns the `result` field from the response, or `None` if the response + /// has no body (e.g. 201 for typing indicators). + async fn rpc_request( + &self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result> { + let url = format!("{}{RPC_ENDPOINT}", self.http_url); + let request_id = Uuid::new_v4().to_string(); + + let body = serde_json::json!({ + "jsonrpc": "2.0", + "method": method, + "params": params, + "id": request_id, + }); + + let response = self + .client + .post(&url) + .timeout(RPC_REQUEST_TIMEOUT) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + .with_context(|| format!("signal RPC request to {method} failed"))?; + + // 201 = success with no body (e.g. typing indicators). + if response.status().as_u16() == 201 { + return Ok(None); + } + + // Reject oversized responses before buffering. + if let Some(length) = response.content_length() + && length as usize > MAX_RPC_RESPONSE_SIZE + { + anyhow::bail!( + "signal RPC response too large: {length} bytes (max {MAX_RPC_RESPONSE_SIZE})" + ); + } + + let status = response.status(); + + // Stream the response body with size guard. + let mut stream = response.bytes_stream(); + let mut response_body = Vec::new(); + let mut total_bytes = 0usize; + + while let Some(chunk) = stream.next().await { + let chunk = chunk.context("failed to read signal RPC response chunk")?; + total_bytes += chunk.len(); + if total_bytes > MAX_RPC_RESPONSE_SIZE { + anyhow::bail!( + "signal RPC response exceeded {MAX_RPC_RESPONSE_SIZE} bytes while streaming" + ); + } + response_body.extend_from_slice(&chunk); + } + + if !status.is_success() { + let truncated_length = std::cmp::min(response_body.len(), 512); + let truncated_body = String::from_utf8_lossy(&response_body[..truncated_length]); + anyhow::bail!("signal RPC HTTP {}: {truncated_body}", status.as_u16()); + } + + if response_body.is_empty() { + return Ok(None); + } + + let parsed: serde_json::Value = + serde_json::from_slice(&response_body).context("invalid signal RPC response JSON")?; + + if let Some(error) = parsed.get("error") { + let code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(-1); + let message = error + .get("message") + .and_then(|m| m.as_str()) + .unwrap_or("unknown"); + anyhow::bail!("signal RPC error {code}: {message}"); + } + + Ok(parsed.get("result").cloned()) + } + + /// Build JSON-RPC params for a send or typing call. + /// + /// **Critical:** DM `recipient` must be a JSON **array** (signal-cli requirement). + /// Group messages use `groupId` instead (mutually exclusive with `recipient`). + fn build_rpc_params( + &self, + target: &RecipientTarget, + message: Option<&str>, + attachments: Option<&[String]>, + ) -> serde_json::Value { + let mut params = match target { + RecipientTarget::Direct(identifier) => { + serde_json::json!({ + "recipient": [identifier], + "account": &self.account, + }) + } + RecipientTarget::Group(group_id) => { + serde_json::json!({ + "groupId": group_id, + "account": &self.account, + }) + } + }; + + if let Some(text) = message { + params["message"] = serde_json::Value::String(text.to_string()); + } + + if let Some(paths) = attachments + && !paths.is_empty() + { + params["attachments"] = serde_json::Value::Array( + paths + .iter() + .map(|path| serde_json::Value::String(path.clone())) + .collect(), + ); + } + + params + } + + // ── outbound helpers ──────────────────────────────────────── + + /// Resolve the outbound target for a conversation from message metadata. + fn resolve_target(&self, message: &InboundMessage) -> Option { + // Target is set during inbound processing via signal_target metadata. + if let Some(target_string) = message + .metadata + .get("signal_target") + .and_then(|value| value.as_str()) + { + return Some(parse_recipient_target(target_string)); + } + None + } + + /// Send a text message to the resolved target. + async fn send_text(&self, target: &RecipientTarget, text: &str) -> anyhow::Result<()> { + let params = self.build_rpc_params(target, Some(text), None); + self.rpc_request("send", params).await?; + Ok(()) + } + + /// Send a file attachment by writing the data to a temp file, sending it via + /// the JSON-RPC `attachments` field, then cleaning up the temp file. + async fn send_file( + &self, + target: &RecipientTarget, + filename: &str, + data: &[u8], + caption: Option<&str>, + ) -> anyhow::Result<()> { + // Ensure tmp dir exists. + tokio::fs::create_dir_all(&self.tmp_dir) + .await + .with_context(|| { + format!( + "failed to create signal tmp dir: {}", + self.tmp_dir.display() + ) + })?; + + // Write temp file with a unique name to avoid collisions. + // Sanitize filename to prevent path traversal. + let safe_filename = std::path::Path::new(filename) + .file_name() + .and_then(|name| name.to_str()) + .filter(|name| !name.is_empty()) + .unwrap_or("attachment.bin"); + let unique_name = format!("{}_{}", Uuid::new_v4(), safe_filename); + let tmp_path = self.tmp_dir.join(&unique_name); + tokio::fs::write(&tmp_path, data).await.with_context(|| { + format!( + "failed to write signal attachment to {}", + tmp_path.display() + ) + })?; + + let attachment_path = tmp_path + .to_str() + .context("signal tmp path is not valid UTF-8")? + .to_string(); + + let result = { + let params = self.build_rpc_params(target, caption, Some(&[attachment_path])); + self.rpc_request("send", params).await + }; + + // Clean up temp file regardless of send result. + if let Err(error) = tokio::fs::remove_file(&tmp_path).await { + tracing::debug!( + path = %tmp_path.display(), + %error, + "failed to clean up signal attachment temp file" + ); + } + + result?; + Ok(()) + } + + /// Cancel the repeating typing indicator for a conversation. + async fn stop_typing(&self, conversation_id: &str) { + if let Some(handle) = self.typing_tasks.write().await.remove(conversation_id) { + handle.abort(); + } + } + + // ── SSE envelope processing ───────────────────────────────── + + /// Process a single SSE envelope into an `InboundMessage` if it passes + /// permission filters and contains valid content. + /// + /// Returns `(InboundMessage, reply_target_string)` on success. + fn process_envelope(&self, envelope: &Envelope) -> Option<(InboundMessage, String)> { + // Drop story messages when configured. + if self.ignore_stories && envelope.story_message.is_some() { + tracing::debug!("signal: dropping story message"); + return None; + } + + let data_message = envelope.data_message.as_ref()?; + + let has_attachments = data_message + .attachments + .as_ref() + .is_some_and(|attachments| !attachments.is_empty()); + let has_text = data_message + .message + .as_ref() + .is_some_and(|text| !text.is_empty()); + + // Need either text or attachments to produce a message. + if !has_text && !has_attachments { + return None; + } + + // Use message text, or fall back to "[Attachment]" for attachment-only messages. + let text = data_message + .message + .as_deref() + .filter(|text| !text.is_empty()) + .map(String::from) + .or_else(|| { + if has_attachments { + Some("[Attachment]".to_string()) + } else { + None + } + })?; + + // Resolve sender: prefer sourceNumber (E.164), fall back to source (UUID), then source_uuid. + let sender = envelope + .source_number + .as_deref() + .or(envelope.source.as_deref()) + .or(envelope.source_uuid.as_deref()) + .map(String::from)?; + + // Determine if this is a group message. + let group_id = data_message + .group_info + .as_ref() + .and_then(|group| group.group_id.as_deref()); + let is_group = group_id.is_some(); + + // ── permission checks ─────────────────────────────────── + let permissions = self.permissions.load(); + + if is_group { + // Group filter: None/empty = block all groups, ["*"] = allow all, specific list = check + match &permissions.group_filter { + None => { + let redacted_group = group_id.map(redact_identifier); + tracing::info!( + group_id = ?redacted_group, + "signal: group message blocked (no groups configured)" + ); + return None; + } + Some(allowed_groups) => { + if allowed_groups.is_empty() { + let redacted_group = group_id.map(redact_identifier); + tracing::info!( + group_id = ?redacted_group, + "signal: group message blocked (empty group filter)" + ); + return None; + } + if allowed_groups.iter().any(|g| g == "*") { + // Wildcard: allow all groups + } else if let Some(gid) = group_id + && !allowed_groups.iter().any(|allowed| allowed == gid) + { + tracing::info!( + group_id = %redact_identifier(gid), + "signal: group message rejected (not in group filter)" + ); + return None; + } + } + }; + // Check sender is allowed for groups: + // Both dm_allowed_users AND group_allowed_users apply to groups (merged) + // ["*"] in either = allow all, [] = block all, specific list = check + let all_group_users: Vec<&String> = permissions + .dm_allowed_users + .iter() + .chain(permissions.group_allowed_users.iter()) + .collect(); + + let sender_allowed = if all_group_users.is_empty() { + false // Empty = block all + } else if all_group_users.iter().any(|u| u.as_str() == "*") { + true // Wildcard = allow all + } else { + all_group_users.iter().any(|allowed| { + allowed.as_str() == sender + || envelope + .source_uuid + .as_deref() + .is_some_and(|uuid| allowed.as_str() == uuid) + }) + }; + if !sender_allowed { + tracing::info!( + sender = %redact_identifier(&sender), + "signal: group message rejected (sender not in allowed users)" + ); + return None; + } + } else { + // DM filter: ["*"] = allow all, [] = block all, specific list = check + let sender_allowed = if permissions.dm_allowed_users.is_empty() { + false // Empty = block all + } else if permissions + .dm_allowed_users + .iter() + .any(|u| u.as_str() == "*") + { + true // Wildcard = allow all + } else { + permissions.dm_allowed_users.iter().any(|allowed| { + // Match against phone number, UUID, or source field. + allowed.as_str() == sender + || envelope + .source_uuid + .as_deref() + .is_some_and(|uuid| allowed.as_str() == uuid) + }) + }; + if !sender_allowed { + tracing::info!( + sender = %redact_identifier(&sender), + "signal: DM rejected (sender not in dm_allowed_users)" + ); + return None; + } + } + + // Log authorized message + let sender_display = envelope + .source_uuid + .as_deref() + .or(envelope.source_number.as_deref()) + .unwrap_or(&sender); + tracing::info!( + sender = %redact_identifier(sender_display), + text_len = %text.len(), + is_group = is_group, + "signal: message received" + ); + + // ── build reply target ────────────────────────────────── + + let reply_target = if let Some(gid) = group_id { + format!("{GROUP_TARGET_PREFIX}{gid}") + } else { + sender.clone() + }; + + // ── build conversation ID ─────────────────────────────── + + let base_conversation_id = if let Some(gid) = group_id { + format!("signal:group:{gid}") + } else { + // Use UUID if available (stable), fall back to phone number. + // Canonicalize UUID-backed targets to "signal:uuid:{uuid}" format + if let Some(uuid) = envelope.source_uuid.as_deref() { + format!("signal:uuid:{uuid}") + } else { + format!("signal:{sender}") + } + }; + let conversation_id = + apply_runtime_adapter_to_conversation_id(&self.runtime_key, base_conversation_id); + + // ── build timestamp ───────────────────────────────────── + + let timestamp_millis = data_message + .timestamp + .or(envelope.timestamp) + .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().max(0) as u64); + + let timestamp = chrono::DateTime::from_timestamp_millis(timestamp_millis as i64) + .unwrap_or_else(chrono::Utc::now); + + // ── build metadata ────────────────────────────────────── + + let (metadata, formatted_author) = build_metadata(envelope, &reply_target, group_id); + + // ── build sender_id ───────────────────────────────────── + + let sender_id = envelope + .source_uuid + .as_deref() + .unwrap_or(&sender) + .to_string(); + + // ── build message ID ──────────────────────────────────── + + let message_id = format!("{timestamp_millis}_{sender_id}"); + + // ── assemble InboundMessage ───────────────────────────── + + let inbound = InboundMessage { + id: message_id, + source: "signal".into(), + adapter: Some(self.runtime_key.clone()), + conversation_id, + sender_id, + agent_id: None, + content: MessageContent::Text(text), + timestamp, + metadata, + formatted_author, + }; + + Some((inbound, reply_target)) + } +} + +// ── Messaging trait implementation ────────────────────────────── + +impl Messaging for SignalAdapter { + fn name(&self) -> &str { + &self.runtime_key + } + + async fn start(&self) -> crate::Result { + let (inbound_tx, inbound_rx) = mpsc::channel(256); + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + *self.shutdown_tx.write().await = Some(shutdown_tx); + + // Verify connectivity before spawning the listener. + self.health_check().await?; + + tracing::info!( + adapter = %self.runtime_key, + account = %redact_identifier(&self.account), + "signal adapter connected" + ); + + // Clone fields for the spawned SSE listener task. + let client = self.client.clone(); + let http_url = self.http_url.clone(); + let account = self.account.clone(); + let runtime_key = self.runtime_key.clone(); + let ignore_stories = self.ignore_stories; + let permissions = self.permissions.clone(); + let tmp_dir = self.tmp_dir.clone(); + + tokio::spawn(async move { + // Build a temporary adapter for envelope processing inside the task. + // We need access to process_envelope which reads permissions. + let adapter = SignalAdapter { + runtime_key, + http_url: http_url.clone(), + account: account.clone(), + ignore_stories, + permissions, + client: client.clone(), + tmp_dir, + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + }; + + sse_listener(adapter, client, http_url, account, inbound_tx, shutdown_rx).await; + }); + + let stream = tokio_stream::wrappers::ReceiverStream::new(inbound_rx); + Ok(Box::pin(stream)) + } + + async fn respond( + &self, + message: &InboundMessage, + response: OutboundResponse, + ) -> crate::Result<()> { + let target = self.resolve_target(message).with_context(|| { + format!( + "cannot resolve signal reply target for conversation {}", + message.conversation_id + ) + })?; + + match response { + OutboundResponse::Text(text) => { + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::RichMessage { text, .. } => { + // Signal has no rich formatting — send plain text. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::ThreadReply { text, .. } => { + // Signal has no named threads — send as regular message. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::File { + filename, + data, + caption, + .. + } => { + self.stop_typing(&message.conversation_id).await; + self.send_file(&target, &filename, &data, caption.as_deref()) + .await?; + } + OutboundResponse::Reaction(_) => { + // Signal supports reactions via JSON-RPC but the API shape is complex + // (requires target author UUID + timestamp). Skip for now. + tracing::debug!( + conversation_id = %message.conversation_id, + "signal: reactions not supported, dropping" + ); + } + OutboundResponse::RemoveReaction(_) => { + tracing::debug!( + conversation_id = %message.conversation_id, + "signal: remove reactions not supported, dropping" + ); + } + OutboundResponse::Ephemeral { text, .. } => { + // Signal has no ephemeral messages — send as regular text. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::ScheduledMessage { text, .. } => { + // Signal has no scheduled messages — send immediately. + self.stop_typing(&message.conversation_id).await; + self.send_text(&target, &text).await?; + } + OutboundResponse::StreamStart + | OutboundResponse::StreamChunk(_) + | OutboundResponse::StreamEnd => { + // Signal can't edit sent messages — streaming is not supported. + // StreamStart/Chunk/End are no-ops. + } + OutboundResponse::Status(status) => { + self.send_status(message, status).await?; + } + } + + Ok(()) + } + + async fn send_status( + &self, + message: &InboundMessage, + status: StatusUpdate, + ) -> crate::Result<()> { + match status { + StatusUpdate::Thinking => { + let Some(target) = self.resolve_target(message) else { + return Ok(()); + }; + + // Abort any existing typing task before starting a new one. + self.stop_typing(&message.conversation_id).await; + + let client = self.client.clone(); + let http_url = self.http_url.clone(); + let account = self.account.clone(); + let conversation_id = message.conversation_id.clone(); + let rpc_url = format!("{http_url}{RPC_ENDPOINT}"); + + // Send typing indicator immediately, then repeat every 4 seconds. + let handle = tokio::spawn(async move { + loop { + let params = match &target { + RecipientTarget::Direct(identifier) => { + serde_json::json!({ + "recipient": [identifier], + "account": &account, + }) + } + RecipientTarget::Group(group_id) => { + serde_json::json!({ + "groupId": group_id, + "account": &account, + }) + } + }; + + let body = serde_json::json!({ + "jsonrpc": "2.0", + "method": "sendTyping", + "params": params, + "id": Uuid::new_v4().to_string(), + }); + + match client + .post(&rpc_url) + .timeout(RPC_REQUEST_TIMEOUT) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await + { + Ok(response) => { + if !response.status().is_success() { + let status = response.status(); + // Log response body at debug level only (may contain sensitive data) + if let Ok(body_text) = response.text().await { + let truncated = if body_text.len() > 200 { + format!("{}...", &body_text[..200]) + } else { + body_text + }; + tracing::debug!(%status, body = %truncated, "signal typing indicator failed"); + } + tracing::warn!(%status, "signal typing indicator request failed"); + break; + } + } + Err(error) => { + tracing::debug!(%error, "failed to send signal typing indicator"); + break; + } + } + + tokio::time::sleep(TYPING_REPEAT_INTERVAL).await; + } + }); + + self.typing_tasks + .write() + .await + .insert(conversation_id, handle); + } + _ => { + self.stop_typing(&message.conversation_id).await; + } + } + + Ok(()) + } + + async fn broadcast( + &self, + target: &str, + response: crate::OutboundResponse, + ) -> crate::Result<()> { + // Parse target into RecipientTarget + // Format: uuid:xxx, group:xxx, or +xxx + let recipient = if let Some(uuid) = target.strip_prefix("uuid:") { + RecipientTarget::Direct(uuid.to_string()) + } else if let Some(group_id) = target.strip_prefix("group:") { + RecipientTarget::Group(group_id.to_string()) + } else if target.starts_with('+') { + RecipientTarget::Direct(target.to_string()) + } else { + return Err(crate::Error::Other(anyhow::anyhow!( + "invalid signal broadcast target format: {target}" + ))); + }; + + match response { + crate::OutboundResponse::Text(text) => { + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::RichMessage { text, .. } => { + // Signal has no rich formatting — send plain text + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::File { + filename, + data, + caption, + .. + } => { + self.send_file(&recipient, &filename, &data, caption.as_deref()) + .await?; + } + crate::OutboundResponse::ThreadReply { text, .. } => { + // Signal has no threads — send as regular message + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::Ephemeral { text, .. } => { + // Signal has no ephemeral messages — send as regular text + self.send_text(&recipient, &text).await?; + } + crate::OutboundResponse::ScheduledMessage { text, .. } => { + // Signal has no scheduled messages — send immediately + self.send_text(&recipient, &text).await?; + } + _ => { + // Other response types are not supported for broadcast + tracing::debug!( + target = %redact_identifier(target), + "signal: unsupported broadcast response type, dropping" + ); + } + } + + Ok(()) + } + + async fn health_check(&self) -> crate::Result<()> { + let url = format!("{}{HEALTH_ENDPOINT}", self.http_url); + tracing::debug!(url = %redact_url(&url), "Signal health check: GET"); + let response = self + .client + .get(&url) + .timeout(Duration::from_secs(10)) + .send() + .await + .context("signal health check failed: connection error")?; + + tracing::debug!(status = %response.status(), "Signal health check response"); + + if response.status().is_success() { + Ok(()) + } else { + Err(anyhow::anyhow!( + "signal health check returned HTTP {}", + response.status() + ))? + } + } + + async fn shutdown(&self) -> crate::Result<()> { + // Cancel all typing indicator tasks. + let mut tasks = self.typing_tasks.write().await; + for (_, handle) in tasks.drain() { + handle.abort(); + } + + // Signal the SSE listener to stop. + if let Some(shutdown_tx) = self.shutdown_tx.write().await.take() { + shutdown_tx.try_send(()).ok(); + } + + tracing::info!(adapter = %self.runtime_key, "signal adapter shut down"); + Ok(()) + } +} + +// ── SSE listener ──────────────────────────────────────────────── + +/// Long-running SSE listener that reconnects with exponential backoff. +/// +/// Connects to `{http_url}/api/v1/events?account={account}`, parses SSE events, +/// processes envelopes through the adapter's permission filters, and sends +/// valid inbound messages through the channel. +async fn sse_listener( + adapter: SignalAdapter, + client: reqwest::Client, + http_url: String, + account: String, + inbound_tx: mpsc::Sender, + mut shutdown_rx: mpsc::Receiver<()>, +) { + let sse_url = match reqwest::Url::parse(&format!("{http_url}{SSE_ENDPOINT}")) { + Ok(mut url) => { + url.query_pairs_mut().append_pair("account", &account); + url + } + Err(error) => { + tracing::error!(%error, "signal: invalid SSE URL, listener exiting"); + return; + } + }; + + let mut retry_delay = SSE_INITIAL_BACKOFF; + + loop { + // Check for shutdown before each connection attempt. + if shutdown_rx.try_recv().is_ok() { + tracing::info!("signal SSE listener shutting down"); + return; + } + + let response = client + .get(sse_url.clone()) + .header("Accept", "text/event-stream") + .send() + .await; + + let response = match response { + Ok(response) if response.status().is_success() => response, + Ok(response) => { + tracing::warn!( + status = %response.status(), + "signal SSE returned error, retrying in {:?}", + retry_delay + ); + tokio::select! { + _ = tokio::time::sleep(retry_delay) => {} + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down during retry"); + return; + } + } + retry_delay = (retry_delay * 2).min(SSE_MAX_BACKOFF); + continue; + } + Err(error) => { + tracing::warn!( + %error, + "signal SSE connect error, retrying in {:?}", + retry_delay + ); + tokio::select! { + _ = tokio::time::sleep(retry_delay) => {} + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down during retry"); + return; + } + } + retry_delay = (retry_delay * 2).min(SSE_MAX_BACKOFF); + continue; + } + }; + + // Connection succeeded — reset backoff. + retry_delay = SSE_INITIAL_BACKOFF; + tracing::info!("signal SSE connected"); + + // ── stream processing loop ────────────────────────────── + + let mut bytes_stream = response.bytes_stream(); + let mut buffer = String::with_capacity(8192); + let mut current_data = String::with_capacity(4096); + // Holds trailing bytes from the previous chunk that form an incomplete + // multi-byte UTF-8 sequence. At most 3 bytes (the longest incomplete + // leading sequence for a 4-byte character). + let mut utf8_carry: Vec = Vec::with_capacity(4); + + 'stream: loop { + tokio::select! { + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down"); + return; + } + chunk = bytes_stream.next() => { + let Some(chunk) = chunk else { + // Stream ended — break to reconnect. + break 'stream; + }; + let chunk = match chunk { + Ok(chunk) => chunk, + Err(error) => { + tracing::debug!(%error, "signal SSE chunk error, reconnecting"); + break 'stream; + } + }; + + // Prepend any leftover bytes from the previous chunk. + let decode_buf = if utf8_carry.is_empty() { + chunk.to_vec() + } else { + let mut combined = std::mem::take(&mut utf8_carry); + combined.extend_from_slice(&chunk); + combined + }; + + // Decode as much valid UTF-8 as possible, carrying over any + // incomplete trailing sequence to the next iteration. + let (valid_len, carry_start) = match std::str::from_utf8(&decode_buf) { + Ok(_) => (decode_buf.len(), decode_buf.len()), + Err(error) => { + let valid_up_to = error.valid_up_to(); + match error.error_len() { + Some(bad_len) => { + // Genuinely invalid byte sequence — skip the bad byte(s). + tracing::debug!( + offset = valid_up_to, + "signal SSE: invalid UTF-8 byte, skipping" + ); + (valid_up_to, valid_up_to + bad_len) + } + None => { + // Incomplete multi-byte sequence at the end — carry it over. + (valid_up_to, valid_up_to) + } + } + } + }; + + let text = match std::str::from_utf8(&decode_buf[..valid_len]) { + Ok(s) => s, + Err(_) => { + tracing::warn!( + "signal SSE: unexpected invalid UTF-8 at boundary, skipping chunk" + ); + continue; + } + }; + + // Buffer overflow protection. + if buffer.len() + text.len() > MAX_SSE_BUFFER_SIZE { + tracing::warn!( + buffer_len = buffer.len(), + text_len = text.len(), + "signal SSE buffer overflow, resetting" + ); + buffer.clear(); + utf8_carry.clear(); + current_data.clear(); + continue; + } + buffer.push_str(text); + + // Preserve any trailing incomplete bytes for the next chunk. + if carry_start < decode_buf.len() { + utf8_carry.extend_from_slice(&decode_buf[carry_start..]); + } + + // Parse complete lines from the buffer. + while let Some(newline_pos) = buffer.find('\n') { + let line = buffer[..newline_pos].trim_end_matches('\r').to_string(); + buffer.drain(..=newline_pos); + + // Skip SSE comments (keepalive pings). + if line.starts_with(':') { + continue; + } + + if line.is_empty() { + // Empty line = event boundary — dispatch accumulated data. + if !current_data.is_empty() { + process_sse_event(&adapter, ¤t_data, &inbound_tx).await; + current_data.clear(); + } + } else if let Some(data) = line.strip_prefix("data:") { + // Guard against oversized single events. + if current_data.len() + data.len() > MAX_SSE_EVENT_SIZE { + tracing::warn!("signal SSE event too large, dropping"); + current_data.clear(); + continue; + } + if !current_data.is_empty() { + current_data.push('\n'); + } + current_data.push_str(data.trim_start()); + } + // Ignore "event:", "id:", "retry:" lines. + } + } + } + } + + // Process any trailing data before reconnect. + if !current_data.is_empty() { + process_sse_event(&adapter, ¤t_data, &inbound_tx).await; + } + + tracing::debug!("signal SSE stream ended, reconnecting with backoff..."); + tokio::select! { + _ = tokio::time::sleep(retry_delay) => {} + _ = shutdown_rx.recv() => { + tracing::info!("signal SSE listener shutting down during reconnect"); + return; + } + } + retry_delay = (retry_delay * 2).min(SSE_MAX_BACKOFF); + } +} + +/// Parse and dispatch a single SSE event. +async fn process_sse_event( + adapter: &SignalAdapter, + data: &str, + inbound_tx: &mpsc::Sender, +) { + let sse: SseEnvelope = match serde_json::from_str(data) { + Ok(sse) => sse, + Err(error) => { + tracing::debug!(%error, "signal SSE: failed to parse event, skipping"); + return; + } + }; + + let Some(ref envelope) = sse.envelope else { + return; + }; + + let Some((inbound, _reply_target)) = adapter.process_envelope(envelope) else { + return; + }; + + if let Err(error) = inbound_tx.send(inbound).await { + tracing::warn!(%error, "signal: inbound channel closed (receiver dropped)"); + } +} + +// ── metadata builder ──────────────────────────────────────────── + +/// Build platform-specific metadata for a Signal message. +/// +/// Returns `(metadata_map, formatted_author)`. +fn build_metadata( + envelope: &Envelope, + reply_target: &str, + group_id: Option<&str>, +) -> (HashMap, Option) { + let mut metadata = HashMap::new(); + + // Sender identifiers. + // Try phone number first, then UUID, then source (legacy), then unknown + // This ensures UUID-only privacy-mode envelopes preserve sender identity + let sender = envelope + .source_number + .as_deref() + .or(envelope.source_uuid.as_deref()) + .or(envelope.source.as_deref()) + .unwrap_or("unknown"); + + metadata.insert( + "signal_source".into(), + serde_json::Value::String(sender.to_string()), + ); + + // Always include both UUID and phone number (null if not available) + metadata.insert( + "signal_source_number".into(), + envelope + .source_number + .clone() + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null), + ); + + metadata.insert( + "signal_source_uuid".into(), + envelope + .source_uuid + .clone() + .map(serde_json::Value::String) + .unwrap_or(serde_json::Value::Null), + ); + + // Sender context for agent visibility - always shows both e164 and uuid + let e164 = envelope + .source_number + .as_deref() + .map(|n| format!("e164:{}", n)) + .unwrap_or_else(|| "e164:none".to_string()); + + let uuid = envelope + .source_uuid + .as_deref() + .map(|u| format!("uuid:{}", u)) + .unwrap_or_else(|| "uuid:none".to_string()); + + let sender_context = format!("[Signal: {} {}]", e164, uuid); + metadata.insert( + "sender_context".into(), + serde_json::Value::String(sender_context), + ); + + if let Some(ref source_name) = envelope.source_name { + metadata.insert( + "signal_source_name".into(), + serde_json::Value::String(source_name.clone()), + ); + } + + // Reply target for outbound routing. + metadata.insert( + "signal_target".into(), + serde_json::Value::String(reply_target.to_string()), + ); + + // Timestamp. + let timestamp = envelope + .data_message + .as_ref() + .and_then(|dm| dm.timestamp) + .or(envelope.timestamp); + if let Some(ts) = timestamp { + metadata.insert( + "signal_timestamp".into(), + serde_json::Value::Number(ts.into()), + ); + } + + // Chat type. + let chat_type = if group_id.is_some() { "group" } else { "dm" }; + metadata.insert( + "signal_chat_type".into(), + serde_json::Value::String(chat_type.into()), + ); + + // Group ID. + if let Some(gid) = group_id { + metadata.insert( + "signal_group_id".into(), + serde_json::Value::String(gid.to_string()), + ); + } + + // Standard metadata keys. + metadata.insert( + metadata_keys::MESSAGE_ID.into(), + serde_json::Value::String(format!("{}", timestamp.unwrap_or(0))), + ); + + // Channel name: "Group Chat {id}" for groups, "Direct Message with {name}" for DMs. + let channel_name = if let Some(gid) = group_id { + format!("Group Chat {}", gid) + } else { + envelope + .source_name + .as_deref() + .filter(|name| !name.is_empty()) + .map(|name| format!("Direct Message with {}", name)) + .unwrap_or_else(|| "Direct Message".to_string()) + }; + metadata.insert( + metadata_keys::CHANNEL_NAME.into(), + serde_json::Value::String(channel_name), + ); + + // Sender display name. + let display_name = envelope + .source_name + .as_deref() + .filter(|name| !name.is_empty()) + .unwrap_or(sender); + + metadata.insert( + "sender_display_name".into(), + serde_json::Value::String(display_name.to_string()), + ); + + // Formatted author: "Display Name (+phone)" or just the display name. + let formatted_author = if let Some(ref source_number) = envelope.source_number + && display_name != source_number + { + Some(format!("{display_name} ({source_number})")) + } else { + Some(display_name.to_string()) + }; + + (metadata, formatted_author) +} + +// ── helper functions ──────────────────────────────────────────── + +/// Parse a reply target string into a `RecipientTarget`. +/// +/// Format: `"group:{group_id}"` for groups, otherwise treated as a direct +/// message identifier (phone number or UUID). +fn parse_recipient_target(target: &str) -> RecipientTarget { + if let Some(group_id) = target.strip_prefix(GROUP_TARGET_PREFIX) { + RecipientTarget::Group(group_id.to_string()) + } else { + RecipientTarget::Direct(target.to_string()) + } +} + +// ── tests ─────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_recipient_target_dm() { + let target = parse_recipient_target("+1234567890"); + assert!(matches!(target, RecipientTarget::Direct(ref id) if id == "+1234567890")); + } + + #[test] + fn parse_recipient_target_uuid() { + let target = parse_recipient_target("a1b2c3d4-e5f6-7890-abcd-ef1234567890"); + assert!(matches!( + target, + RecipientTarget::Direct(ref id) if id == "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + )); + } + + #[test] + fn parse_recipient_target_group() { + let target = parse_recipient_target("group:abc123"); + assert!(matches!(target, RecipientTarget::Group(ref id) if id == "abc123")); + } + + #[test] + fn parse_sse_envelope_dm() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "sourceName": "Alice", + "sourceUuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "dataMessage": { + "message": "Hello", + "timestamp": 1700000000000 + } + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + let envelope = sse.envelope.unwrap(); + assert_eq!(envelope.source_number.as_deref(), Some("+1111111111")); + assert_eq!(envelope.source_name.as_deref(), Some("Alice")); + let dm = envelope.data_message.unwrap(); + assert_eq!(dm.message.as_deref(), Some("Hello")); + assert_eq!(dm.timestamp, Some(1700000000000)); + assert!(dm.group_info.is_none()); + } + + #[test] + fn parse_sse_envelope_group() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "dataMessage": { + "message": "Group hello", + "groupInfo": { + "groupId": "testgroup123" + } + } + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + let dm = sse.envelope.unwrap().data_message.unwrap(); + assert_eq!( + dm.group_info.unwrap().group_id.as_deref(), + Some("testgroup123") + ); + } + + #[test] + fn parse_sse_envelope_story_message() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "storyMessage": {"text": "story content"} + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + assert!(sse.envelope.unwrap().story_message.is_some()); + } + + #[test] + fn parse_sse_envelope_with_attachments() { + let json = r#"{ + "envelope": { + "sourceNumber": "+1111111111", + "dataMessage": { + "message": "See attached", + "attachments": [ + {"contentType": "image/jpeg", "filename": "photo.jpg"}, + {"contentType": "application/pdf"} + ] + } + } + }"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + let dm = sse.envelope.unwrap().data_message.unwrap(); + assert_eq!(dm.attachments.unwrap().len(), 2); + } + + #[test] + fn parse_sse_envelope_empty() { + let json = r#"{}"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + assert!(sse.envelope.is_none()); + } + + #[test] + fn parse_sse_envelope_no_data_message() { + let json = r#"{"envelope": {"sourceNumber": "+1111111111"}}"#; + let sse: SseEnvelope = serde_json::from_str(json).unwrap(); + assert!(sse.envelope.unwrap().data_message.is_none()); + } + + #[test] + fn build_rpc_params_dm() { + let adapter = test_adapter(); + let target = RecipientTarget::Direct("+5555555555".to_string()); + let params = adapter.build_rpc_params(&target, Some("hello"), None); + + assert_eq!(params["recipient"], serde_json::json!(["+5555555555"])); + assert_eq!(params["account"], "+0000000000"); + assert_eq!(params["message"], "hello"); + assert!(params.get("groupId").is_none()); + } + + #[test] + fn build_rpc_params_group() { + let adapter = test_adapter(); + let target = RecipientTarget::Group("abc123".to_string()); + let params = adapter.build_rpc_params(&target, Some("hello"), None); + + assert_eq!(params["groupId"], "abc123"); + assert_eq!(params["account"], "+0000000000"); + assert!(params.get("recipient").is_none()); + } + + #[test] + fn build_rpc_params_with_attachments() { + let adapter = test_adapter(); + let target = RecipientTarget::Direct("+5555555555".to_string()); + let paths = vec!["/tmp/file.png".to_string()]; + let params = adapter.build_rpc_params(&target, Some("caption"), Some(&paths)); + + assert_eq!(params["attachments"], serde_json::json!(["/tmp/file.png"])); + assert_eq!(params["message"], "caption"); + } + + #[test] + fn build_rpc_params_no_message() { + let adapter = test_adapter(); + let target = RecipientTarget::Direct("+5555555555".to_string()); + let params = adapter.build_rpc_params(&target, None, None); + + assert!(params.get("message").is_none()); + assert!(params.get("attachments").is_none()); + } + + #[test] + fn build_metadata_dm() { + let envelope = Envelope { + source: None, + source_number: Some("+1234567890".into()), + source_name: Some("Alice".into()), + source_uuid: Some("aaaa-bbbb".into()), + data_message: Some(DataMessage { + message: Some("hi".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + + let (metadata, author) = build_metadata(&envelope, "+1234567890", None); + + assert_eq!( + metadata.get("signal_source").unwrap().as_str().unwrap(), + "+1234567890" + ); + assert_eq!( + metadata + .get("signal_source_uuid") + .unwrap() + .as_str() + .unwrap(), + "aaaa-bbbb" + ); + assert_eq!( + metadata.get("signal_chat_type").unwrap().as_str().unwrap(), + "dm" + ); + assert!(!metadata.contains_key("signal_group_id")); + assert_eq!(author.unwrap(), "Alice (+1234567890)"); + assert_eq!( + metadata.get("sender_context").unwrap().as_str().unwrap(), + "[Signal: e164:+1234567890 uuid:aaaa-bbbb]" + ); + } + + #[test] + fn build_metadata_group() { + let envelope = Envelope { + source: None, + source_number: Some("+1234567890".into()), + source_name: Some("Bob".into()), + source_uuid: None, + data_message: Some(DataMessage { + message: Some("group msg".into()), + timestamp: Some(1700000000000), + group_info: Some(GroupInfo { + group_id: Some("grp123".into()), + }), + attachments: None, + }), + story_message: None, + timestamp: None, + }; + + let (metadata, _) = build_metadata(&envelope, "group:grp123", Some("grp123")); + + assert_eq!( + metadata.get("signal_chat_type").unwrap().as_str().unwrap(), + "group" + ); + assert_eq!( + metadata.get("signal_group_id").unwrap().as_str().unwrap(), + "grp123" + ); + assert_eq!( + metadata.get("sender_context").unwrap().as_str().unwrap(), + "[Signal: e164:+1234567890 uuid:none]" + ); + } + + #[test] + fn build_metadata_privacy_mode() { + let envelope = Envelope { + source: None, + source_number: None, + source_name: Some("Alice".into()), + source_uuid: Some("uuid-only-123".into()), + data_message: Some(DataMessage { + message: Some("hi from privacy mode".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + + let (metadata, _) = build_metadata(&envelope, "+1234567890", None); + + assert_eq!( + metadata.get("sender_context").unwrap().as_str().unwrap(), + "[Signal: e164:none uuid:uuid-only-123]" + ); + } + + #[test] + fn process_envelope_drops_story_when_configured() { + let adapter = test_adapter(); + let envelope = Envelope { + source: None, + source_number: Some("+1111111111".into()), + source_name: None, + source_uuid: None, + data_message: None, + story_message: Some(serde_json::json!({"text": "story"})), + timestamp: None, + }; + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_drops_empty_message() { + let adapter = test_adapter(); + let envelope = Envelope { + source: None, + source_number: Some("+1111111111".into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: None, + timestamp: None, + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_attachment_only_produces_placeholder() { + let adapter = test_adapter_open_dm(); + let envelope = Envelope { + source: None, + source_number: Some("+2222222222".into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: None, + timestamp: Some(1700000000000), + group_info: None, + attachments: Some(vec![serde_json::json!({"contentType": "image/png"})]), + }), + story_message: None, + timestamp: None, + }; + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + let (msg, _) = result.unwrap(); + if let MessageContent::Text(text) = &msg.content { + assert_eq!(text, "[Attachment]"); + } else { + panic!("expected Text content"); + } + } + + #[test] + fn process_envelope_dm_allowed() { + let adapter = test_adapter_with_dm_allowed(vec!["+2222222222".to_string()]); + let envelope = make_dm_envelope("+2222222222", "hello"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + } + + #[test] + fn process_envelope_dm_rejected() { + let adapter = test_adapter_with_dm_allowed(vec!["+3333333333".to_string()]); + let envelope = make_dm_envelope("+2222222222", "hello"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_dm_blocked_when_empty() { + // Empty dm_allowed_users = block all DMs + let adapter = test_adapter(); + let envelope = make_dm_envelope("+2222222222", "hello"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_dm_allowed_wildcard() { + // ["*"] in dm_allowed_users = allow all DMs + let adapter = test_adapter_open_dm(); + let envelope = make_dm_envelope("+9999999999", "hello from anyone"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + } + + #[test] + fn process_envelope_group_blocked_by_default() { + // Default group_filter is None = block all groups. + let adapter = test_adapter(); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_group_blocked_when_empty() { + // Empty group_filter = block all groups + let perms = SignalPermissions { + group_filter: Some(vec![]), + dm_allowed_users: vec!["*".to_string()], + group_allowed_users: vec!["*".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + assert!(adapter.process_envelope(&envelope).is_none()); + } + + #[test] + fn process_envelope_group_allowed_wildcard() { + // ["*"] in group_filter = allow all groups + let perms = SignalPermissions { + group_filter: Some(vec!["*".to_string()]), + dm_allowed_users: vec!["*".to_string()], + group_allowed_users: vec!["*".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "any-group"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + } + + #[test] + fn process_envelope_group_allowed_when_configured() { + let perms = SignalPermissions { + group_filter: Some(vec!["grp123".to_string()]), + dm_allowed_users: vec!["+1111111111".to_string()], + group_allowed_users: vec!["+1111111111".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + let result = adapter.process_envelope(&envelope); + assert!(result.is_some()); + let (msg, target) = result.unwrap(); + assert_eq!(target, "group:grp123"); + assert!(msg.conversation_id.starts_with("signal:group:grp123")); + } + + #[test] + fn process_envelope_conversation_id_dm() { + let adapter = test_adapter_open_dm(); + let envelope = Envelope { + source: None, + source_number: Some("+1234567890".into()), + source_name: None, + source_uuid: Some("uuid-1234".into()), + data_message: Some(DataMessage { + message: Some("test".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + let (msg, _) = adapter.process_envelope(&envelope).unwrap(); + // Should use UUID when available, in canonical "signal:uuid:{uuid}" format. + assert_eq!(msg.conversation_id, "signal:uuid:uuid-1234"); + } + + // ── test helpers ──────────────────────────────────────────── + + fn test_adapter() -> SignalAdapter { + // Default: blocks all DMs and groups (empty lists) + test_adapter_with_permissions(SignalPermissions::default()) + } + + fn test_adapter_open_dm() -> SignalAdapter { + // Wildcard ["*"] means allow all DMs (but groups still blocked by default) + test_adapter_with_permissions(SignalPermissions { + group_filter: None, + dm_allowed_users: vec!["*".to_string()], + group_allowed_users: vec![], + }) + } + + fn test_adapter_with_dm_allowed(users: Vec) -> SignalAdapter { + test_adapter_with_permissions(SignalPermissions { + group_filter: None, + dm_allowed_users: users, + group_allowed_users: vec![], + }) + } + + fn test_adapter_with_permissions(perms: SignalPermissions) -> SignalAdapter { + SignalAdapter { + runtime_key: "signal".into(), + http_url: "http://127.0.0.1:8686".into(), + account: "+0000000000".into(), + ignore_stories: true, + permissions: Arc::new(ArcSwap::from_pointee(perms)), + client: reqwest::Client::new(), + tmp_dir: PathBuf::from("/tmp/spacebot-test"), + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + } + } + + fn make_dm_envelope(sender: &str, text: &str) -> Envelope { + Envelope { + source: None, + source_number: Some(sender.into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: Some(text.into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + } + } + + fn make_group_envelope(sender: &str, text: &str, group_id: &str) -> Envelope { + Envelope { + source: None, + source_number: Some(sender.into()), + source_name: None, + source_uuid: None, + data_message: Some(DataMessage { + message: Some(text.into()), + timestamp: Some(1700000000000), + group_info: Some(GroupInfo { + group_id: Some(group_id.into()), + }), + attachments: None, + }), + story_message: None, + timestamp: None, + } + } + + #[test] + fn process_envelope_dm_allowed_by_uuid() { + // Test that UUID matching works when sender has no phone number + let perms = SignalPermissions { + group_filter: None, + dm_allowed_users: vec!["uuid-1234".to_string()], + group_allowed_users: vec![], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = Envelope { + source: None, + source_number: None, + source_name: None, + source_uuid: Some("uuid-1234".into()), + data_message: Some(DataMessage { + message: Some("hello".into()), + timestamp: Some(1700000000000), + group_info: None, + attachments: None, + }), + story_message: None, + timestamp: None, + }; + let result = adapter.process_envelope(&envelope); + assert!( + result.is_some(), + "DM should be allowed when sender UUID matches" + ); + } + + #[test] + fn process_envelope_group_message_rejected_when_sender_not_allowed() { + // Group message from sender not in group_allowed_users should be rejected + let perms = SignalPermissions { + group_filter: Some(vec!["grp123".to_string()]), + dm_allowed_users: vec!["+9999999999".to_string()], // Different from sender + group_allowed_users: vec!["+9999999999".to_string()], + }; + let adapter = test_adapter_with_permissions(perms); + let envelope = make_group_envelope("+1111111111", "hi group", "grp123"); + let result = adapter.process_envelope(&envelope); + assert!( + result.is_none(), + "Group message should be rejected when sender not in allowed users" + ); + } +} + +#[cfg(test)] +mod rpc_error_tests { + use super::*; + + #[test] + fn rpc_params_build_direct_message() { + let adapter = test_adapter_with_permissions(SignalPermissions::default()); + let params = adapter.build_rpc_params( + &RecipientTarget::Direct("+1234567890".to_string()), + Some("test message"), + None, + ); + // Verify recipient is an array (signal-cli requirement) + assert!(params.get("recipient").is_some()); + assert!(params["recipient"].is_array()); + } + + #[test] + fn rpc_params_build_group_message() { + let adapter = test_adapter_with_permissions(SignalPermissions::default()); + let params = adapter.build_rpc_params( + &RecipientTarget::Group("base64groupid".to_string()), + Some("group message"), + None, + ); + // Verify groupId is used instead of recipient + assert!(params.get("groupId").is_some()); + assert!(params.get("recipient").is_none()); + } + + #[test] + fn rpc_params_with_attachments() { + let adapter = test_adapter_with_permissions(SignalPermissions::default()); + let attachments = vec!["/tmp/file1.jpg".to_string(), "/tmp/file2.png".to_string()]; + let params = adapter.build_rpc_params( + &RecipientTarget::Direct("+1234567890".to_string()), + Some("check this out"), + Some(&attachments), + ); + assert!(params.get("attachments").is_some()); + let attachments_arr = params["attachments"].as_array().unwrap(); + assert_eq!(attachments_arr.len(), 2); + } + + fn test_adapter_with_permissions(perms: SignalPermissions) -> SignalAdapter { + SignalAdapter { + runtime_key: "signal".into(), + http_url: "http://127.0.0.1:8686".into(), + account: "+0000000000".into(), + ignore_stories: true, + permissions: Arc::new(ArcSwap::from_pointee(perms)), + client: reqwest::Client::new(), + tmp_dir: PathBuf::from("/tmp/spacebot-test"), + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + } + } +} diff --git a/src/messaging/target.rs b/src/messaging/target.rs index 51e7f721a..17676afab 100644 --- a/src/messaging/target.rs +++ b/src/messaging/target.rs @@ -101,6 +101,28 @@ pub fn resolve_broadcast_target(channel: &ChannelInfo) -> Option { + // Signal channels store target in signal_target metadata + if let Some(signal_target) = channel + .platform_meta + .as_ref() + .and_then(|meta| meta.get("signal_target")) + .and_then(json_value_to_string) + { + // signal_target is already normalized (e.g., "uuid:xxxx", "group:xxxx", "+123...") + // Determine adapter from channel.id: named if format is "signal:{name}:..." + let adapter = extract_signal_adapter_from_channel_id(&channel.id); + let target = normalize_signal_target(&signal_target)?; + return Some(BroadcastTarget { adapter, target }); + } + + // Fallback: parse from conversation ID + // Format: signal:{target} or signal:{instance}:{target} + // where {target} is uuid:xxx, group:xxx, or +xxx + let parts: Vec<&str> = channel.id.split(':').collect(); + // Skip "signal" prefix and use shared parser for the rest + return parse_signal_target_parts(parts.get(1..).unwrap_or(&[])); + } "email" => { let reply_to = channel .platform_meta @@ -129,7 +151,7 @@ pub fn resolve_broadcast_target(channel: &ChannelInfo) -> Option Option { +pub fn normalize_target(adapter: &str, raw_target: &str) -> Option { let trimmed = raw_target.trim(); if trimmed.is_empty() { return None; @@ -143,6 +165,7 @@ fn normalize_target(adapter: &str, raw_target: &str) -> Option { "email" => normalize_email_target(trimmed), // Webchat targets are full conversation IDs (e.g. "portal:chat:main") "webchat" => Some(trimmed.to_string()), + "signal" => normalize_signal_target(trimmed), _ => Some(trimmed.to_string()), } } @@ -235,6 +258,55 @@ fn normalize_email_target(raw_target: &str) -> Option { None } +fn normalize_signal_target(raw_target: &str) -> Option { + let target = strip_repeated_prefix(raw_target, "signal"); + + // Handle uuid:xxxx-xxxx format + if let Some(uuid) = target.strip_prefix("uuid:") { + if !uuid.is_empty() { + return Some(format!("uuid:{uuid}")); + } + return None; + } + + // Handle group:grp123 format + if let Some(group_id) = target.strip_prefix("group:") { + if !group_id.is_empty() { + return Some(format!("group:{group_id}")); + } + return None; + } + + // Handle e164:+123 or bare +123 format + if let Some(phone) = target.strip_prefix("e164:") { + let phone = phone.trim_start_matches('+'); + if !phone.is_empty() && phone.len() >= 7 && phone.chars().all(|c| c.is_ascii_digit()) { + return Some(format!("+{phone}")); + } + return None; + } + + // Bare +123 format + if let Some(phone) = target.strip_prefix('+') { + if !phone.is_empty() && phone.len() >= 7 && phone.chars().all(|c| c.is_ascii_digit()) { + return Some(target.to_string()); + } + return None; + } + + // Check if it's a valid UUID (contains dashes and alphanumeric) + if target.contains('-') && target.len() > 8 && target.chars().any(|c| c.is_ascii_digit()) { + return Some(format!("uuid:{target}")); + } + + // Check if it's a bare phone number (7+ digits required for E.164) + if target.chars().all(|c| c.is_ascii_digit()) && target.len() >= 7 { + return Some(format!("+{target}")); + } + + None +} + fn strip_repeated_prefix<'a>(raw_target: &'a str, adapter: &str) -> &'a str { let mut target = raw_target; let prefix = format!("{adapter}:"); @@ -257,6 +329,100 @@ fn json_value_to_string(value: &serde_json::Value) -> Option { None } +/// Extract the Signal adapter name from a channel ID. +/// +/// Channel ID formats: +/// - "signal:{target}" -> default adapter "signal" +/// - "signal:{instance}:{target}" -> named adapter "signal:{instance}" +/// +/// Where {target} is uuid:xxx, group:xxx, or +xxx (starts with valid target prefix) +fn extract_signal_adapter_from_channel_id(channel_id: &str) -> String { + let parts: Vec<&str> = channel_id.split(':').collect(); + match parts.as_slice() { + // Named adapter: signal:{instance}:uuid:{uuid}, signal:{instance}:group:{id} + // or signal:{instance}:e164:+{phone} + ["signal", instance, "uuid", ..] + | ["signal", instance, "group", ..] + | ["signal", instance, "e164", ..] => { + format!("signal:{instance}") + } + // Named adapter: signal:{instance}:+{phone} + ["signal", instance, phone, ..] if phone.starts_with('+') => { + format!("signal:{instance}") + } + // Default adapter: signal:{target} + _ => "signal".to_string(), + } +} + +/// Parse Signal target components into BroadcastTarget. +/// +/// Handles formats: +/// - Default adapter: ["uuid", xxx], ["group", xxx], ["e164", +xxx], ["+xxx"] +/// - Named adapter: [instance, "uuid", xxx], [instance, "group", xxx], [instance, "e164", +xxx], [instance, "+xxx"] +/// +/// Returns None for invalid formats. +pub fn parse_signal_target_parts(parts: &[&str]) -> Option { + match parts { + // Default adapter: signal:uuid:xxx, signal:group:xxx, signal:e164:+xxx, signal:+xxx + ["uuid", uuid] => Some(BroadcastTarget { + adapter: "signal".to_string(), + target: format!("uuid:{uuid}"), + }), + ["group", group_id] => Some(BroadcastTarget { + adapter: "signal".to_string(), + target: format!("group:{group_id}"), + }), + // Use normalize_signal_target for phone/e164 to ensure consistent parsing + ["e164", phone] => { + normalize_signal_target(&format!("e164:{phone}")).map(|target| BroadcastTarget { + adapter: "signal".to_string(), + target, + }) + } + [phone] if phone.starts_with('+') => { + normalize_signal_target(phone).map(|target| BroadcastTarget { + adapter: "signal".to_string(), + target, + }) + } + // Single-part targets: delegate to normalize_signal_target for bare UUIDs/phones + [single] => normalize_signal_target(single).map(|target| BroadcastTarget { + adapter: "signal".to_string(), + target, + }), + // Named adapter: signal:instance:uuid:xxx, signal:instance:group:xxx + [instance, "uuid", uuid] => Some(BroadcastTarget { + adapter: format!("signal:{instance}"), + target: format!("uuid:{uuid}"), + }), + [instance, "group", group_id] => Some(BroadcastTarget { + adapter: format!("signal:{instance}"), + target: format!("group:{group_id}"), + }), + // Named adapter: signal:instance:e164:+xxx - use normalize_signal_target + [instance, "e164", phone] => { + normalize_signal_target(&format!("e164:{phone}")).map(|target| BroadcastTarget { + adapter: format!("signal:{instance}"), + target, + }) + } + // Named adapter: signal:instance:+xxx - use normalize_signal_target + [instance, phone] if phone.starts_with('+') => { + normalize_signal_target(phone).map(|target| BroadcastTarget { + adapter: format!("signal:{instance}"), + target, + }) + } + // Named adapter with single-part target: delegate to normalize_signal_target + [instance, single] => normalize_signal_target(single).map(|target| BroadcastTarget { + adapter: format!("signal:{instance}"), + target, + }), + _ => None, + } +} + #[cfg(test)] mod tests { use super::{parse_delivery_target, resolve_broadcast_target}; @@ -366,4 +532,175 @@ mod tests { }) ); } + + // Signal tests + #[test] + fn parse_signal_uuid_with_prefix() { + let parsed = parse_delivery_target("signal:uuid:abc-123-def"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "uuid:abc-123-def".to_string(), + }) + ); + } + + #[test] + fn parse_signal_group_with_prefix() { + let parsed = parse_delivery_target("signal:group:grp123"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "group:grp123".to_string(), + }) + ); + } + + #[test] + fn parse_signal_phone_with_prefix() { + let parsed = parse_delivery_target("signal:+1234567890"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_phone_e164_format() { + let parsed = parse_delivery_target("signal:e164:+1234567890"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_phone_e164_no_plus() { + let parsed = parse_delivery_target("signal:e164:1234567890"); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + // Tests for parse_signal_target_parts + #[test] + fn parse_signal_target_parts_uuid_default() { + let parsed = + super::parse_signal_target_parts(&["uuid", "550e8400-e29b-41d4-a716-446655440000"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "uuid:550e8400-e29b-41d4-a716-446655440000".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_group_default() { + let parsed = super::parse_signal_target_parts(&["group", "grp123"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "group:grp123".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_phone_default() { + let parsed = super::parse_signal_target_parts(&["+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_e164_default() { + let parsed = super::parse_signal_target_parts(&["e164", "+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_uuid_named() { + let parsed = super::parse_signal_target_parts(&[ + "gvoice1", + "uuid", + "550e8400-e29b-41d4-a716-446655440000", + ]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "uuid:550e8400-e29b-41d4-a716-446655440000".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_group_named() { + let parsed = super::parse_signal_target_parts(&["gvoice1", "group", "grp123"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "group:grp123".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_phone_named() { + let parsed = super::parse_signal_target_parts(&["gvoice1", "+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_e164_named() { + let parsed = super::parse_signal_target_parts(&["gvoice1", "e164", "+1234567890"]); + assert_eq!( + parsed, + Some(super::BroadcastTarget { + adapter: "signal:gvoice1".to_string(), + target: "+1234567890".to_string(), + }) + ); + } + + #[test] + fn parse_signal_target_parts_invalid() { + assert!(super::parse_signal_target_parts(&[]).is_none()); + assert!(super::parse_signal_target_parts(&["unknown"]).is_none()); + assert!(super::parse_signal_target_parts(&["uuid"]).is_none()); // missing UUID value + assert!(super::parse_signal_target_parts(&["gvoice1", "unknown"]).is_none()); + } } diff --git a/src/prompts/engine.rs b/src/prompts/engine.rs index 510bcb6c2..16969accb 100644 --- a/src/prompts/engine.rs +++ b/src/prompts/engine.rs @@ -76,6 +76,10 @@ impl PromptEngine { crate::prompts::text::get("adapters/email"), )?; env.add_template("adapters/cron", crate::prompts::text::get("adapters/cron"))?; + env.add_template( + "adapters/signal", + crate::prompts::text::get("adapters/signal"), + )?; // Fragment templates env.add_template( @@ -471,6 +475,7 @@ impl PromptEngine { let template_name = match adapter { "email" => "adapters/email", "cron" => "adapters/cron", + "signal" => "adapters/signal", _ => return None, }; diff --git a/src/prompts/text.rs b/src/prompts/text.rs index 88fe86df8..717e4c4a4 100644 --- a/src/prompts/text.rs +++ b/src/prompts/text.rs @@ -69,6 +69,7 @@ fn lookup(lang: &str, key: &str) -> &'static str { // Adapter-specific prompt fragments ("en", "adapters/email") => include_str!("../../prompts/en/adapters/email.md.j2"), ("en", "adapters/cron") => include_str!("../../prompts/en/adapters/cron.md.j2"), + ("en", "adapters/signal") => include_str!("../../prompts/en/adapters/signal.md.j2"), // Fragment Templates ("en", "fragments/worker_capabilities") => { diff --git a/src/secrets/store.rs b/src/secrets/store.rs index 0a89e0961..3c4bcc19a 100644 --- a/src/secrets/store.rs +++ b/src/secrets/store.rs @@ -1101,8 +1101,8 @@ pub trait SystemSecrets { /// here. pub fn system_secret_registry() -> Vec<&'static SecretField> { use crate::config::{ - DefaultsConfig, DiscordConfig, EmailConfig, LlmConfig, SlackConfig, TelegramConfig, - TwitchConfig, + DefaultsConfig, DiscordConfig, EmailConfig, LlmConfig, SignalConfig, SlackConfig, + TelegramConfig, TwitchConfig, }; let mut fields = Vec::new(); @@ -1116,6 +1116,7 @@ pub fn system_secret_registry() -> Vec<&'static SecretField> { fields.extend(TelegramConfig::secret_fields()); fields.extend(TwitchConfig::secret_fields()); fields.extend(EmailConfig::secret_fields()); + fields.extend(SignalConfig::secret_fields()); fields } diff --git a/src/tools.rs b/src/tools.rs index 4ed3bd7f5..a688d46f8 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -341,6 +341,7 @@ pub async fn add_channel_tools( cron_tool: Option, send_agent_message_tool: Option, allow_direct_reply: bool, + current_adapter: Option, ) -> Result<(), rig::tool::server::ToolServerError> { let conversation_id = conversation_id.into(); @@ -378,6 +379,7 @@ pub async fn add_channel_tools( state.channel_store.clone(), state.conversation_logger.clone(), send_message_display_name, + current_adapter.clone(), )) .await?; } diff --git a/src/tools/send_message_to_another_channel.rs b/src/tools/send_message_to_another_channel.rs index 2523ca5e0..abf72e239 100644 --- a/src/tools/send_message_to_another_channel.rs +++ b/src/tools/send_message_to_another_channel.rs @@ -11,6 +11,12 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; +/// Check if a string is a valid UUID format. +/// Accepts standard UUID format: 8-4-4-4-12 hexadecimal digits. +fn is_valid_uuid(s: &str) -> bool { + uuid::Uuid::parse_str(s).is_ok() +} + /// Tool for sending messages to other channels or DMs. /// /// Resolves targets by name or ID via the channel store, extracts the @@ -23,6 +29,7 @@ pub struct SendMessageTool { channel_store: ChannelStore, conversation_logger: ConversationLogger, agent_display_name: String, + current_adapter: Option, } impl std::fmt::Debug for SendMessageTool { @@ -37,12 +44,14 @@ impl SendMessageTool { channel_store: ChannelStore, conversation_logger: ConversationLogger, agent_display_name: String, + current_adapter: Option, ) -> Self { Self { messaging_manager, channel_store, conversation_logger, agent_display_name, + current_adapter, } } } @@ -79,6 +88,12 @@ impl Tool for SendMessageTool { async fn definition(&self, _prompt: String) -> ToolDefinition { let email_adapter_available = self.messaging_manager.has_adapter("email").await; + // Check if current adapter is Signal (e.g., "signal:gvoice1" starts with "signal") + let signal_adapter_available = self + .current_adapter + .as_ref() + .map(|adapter| adapter.starts_with("signal")) + .unwrap_or(false); let mut description = crate::prompts::text::get("tools/send_message_to_another_channel").to_string(); @@ -93,6 +108,15 @@ impl Tool for SendMessageTool { ); } + if signal_adapter_available { + description.push_str( + " Signal messaging is enabled: you can target `signal:uuid:{uuid}`, `signal:group:{group_id}`, or `signal:+{phone}`.", + ); + target_description.push_str( + " With Signal enabled, explicit targets are also allowed: `signal:uuid:{uuid}`, `signal:group:{group_id}`, `signal:+{phone}`", + ); + } + ToolDefinition { name: Self::NAME.to_string(), description, @@ -120,6 +144,79 @@ impl Tool for SendMessageTool { "send_message_to_another_channel tool called" ); + // Check for explicit signal: prefix first - always honored regardless of current adapter. + // This allows users to explicitly target Signal even when in Discord/Telegram/etc. + if let Some(mut target) = parse_explicit_signal_prefix(&args.target) { + // If explicit prefix returned default "signal" adapter but we're in a named + // Signal adapter conversation (e.g., signal:gvoice1), use the current adapter + // to ensure the message goes through the correct account. + if target.adapter == "signal" { + if let Some(current_adapter) = self + .current_adapter + .as_ref() + .filter(|adapter| adapter.starts_with("signal:")) + { + target.adapter = current_adapter.clone(); + } + } + + self.messaging_manager + .broadcast( + &target.adapter, + &target.target, + crate::OutboundResponse::Text(args.message), + ) + .await + .map_err(|error| SendMessageError(format!("failed to send message: {error}")))?; + + tracing::info!( + adapter = %target.adapter, + broadcast_target = %"[REDACTED]", + "message sent via explicit signal: prefix" + ); + + return Ok(SendMessageOutput { + success: true, + target: target.target, + platform: target.adapter, + }); + } + + // Check for implicit Signal shorthands, but only when in a Signal conversation. + // This prevents bare UUIDs, group:..., or +phone from being hijacked as Signal targets + // when the user is actually in a Discord/Telegram/etc conversation. + if let Some(current_adapter) = self + .current_adapter + .as_ref() + .filter(|adapter| adapter.starts_with("signal")) + { + if let Some(target) = parse_implicit_signal_shorthand(&args.target, current_adapter) { + self.messaging_manager + .broadcast( + &target.adapter, + &target.target, + crate::OutboundResponse::Text(args.message), + ) + .await + .map_err(|error| { + SendMessageError(format!("failed to send message: {error}")) + })?; + + tracing::info!( + adapter = %target.adapter, + broadcast_target = %"[REDACTED]", + "message sent via implicit Signal shorthand" + ); + + return Ok(SendMessageOutput { + success: true, + target: target.target, + platform: target.adapter, + }); + } + } + + // Check for explicit email target if let Some(explicit_target) = parse_explicit_email_target(&args.target) { self.messaging_manager .broadcast( @@ -144,17 +241,25 @@ impl Tool for SendMessageTool { }); } - let channel = self + // Try to find channel by name first + let channel_result = self .channel_store .find_by_name(&args.target) .await - .map_err(|error| SendMessageError(format!("failed to search channels: {error}")))? - .ok_or_else(|| { - SendMessageError(format!( - "no channel found matching '{}'. Use a channel name/ID from the available channels list or an explicit email target like email:alice@example.com.", + .map_err(|error| SendMessageError(format!("failed to search channels: {error}")))?; + + // If channel not found, return error. + // Signal targets are handled earlier (explicit signal: prefix always, + // implicit shorthands only in Signal conversations). + let channel = match channel_result { + Some(ch) => ch, + None => { + return Err(SendMessageError(format!( + "no channel found matching '{}'. Use a channel name/ID from the available channels list, an explicit email target like email:alice@example.com, or signal: prefix for Signal targets.", args.target - )) - })?; + ))); + } + }; let broadcast_target = crate::messaging::target::resolve_broadcast_target(&channel) .ok_or_else(|| { @@ -199,6 +304,77 @@ impl Tool for SendMessageTool { } } +/// Parse explicit signal: prefix - always honored regardless of current adapter. +/// Returns BroadcastTarget with adapter="signal" for targets like: +/// - signal:uuid:xxx +/// - signal:group:xxx +/// - signal:+1234567890 +/// - signal:e164:... +fn parse_explicit_signal_prefix(raw: &str) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + // Only handle explicit signal: prefix + if let Some(rest) = trimmed.strip_prefix("signal:") { + let parts: Vec<&str> = rest.split(':').collect(); + // Use shared parser for Signal target components + if let Some(target) = crate::messaging::target::parse_signal_target_parts(&parts) { + return Some(target); + } + // Fallback: try parsing the full signal:... string + return crate::messaging::target::parse_delivery_target(trimmed); + } + + None +} + +/// Parse implicit Signal shorthands - only in Signal conversations. +/// Handles bare UUIDs, group:xxx, and +phone without explicit signal: prefix. +/// Returns BroadcastTarget directly instead of building strings to avoid +/// parse_delivery_target() issues with colons in named adapters. +fn parse_implicit_signal_shorthand( + raw: &str, + current_adapter: &str, +) -> Option { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + use crate::messaging::target::BroadcastTarget; + + // Check for bare UUID format (strict validation) + if is_valid_uuid(trimmed) { + return Some(BroadcastTarget { + adapter: current_adapter.to_string(), + target: format!("uuid:{trimmed}"), + }); + } + + // Phone number format: starts with + followed by 7+ digits + if trimmed.starts_with('+') + && trimmed[1..].len() >= 7 + && trimmed[1..].chars().all(|c| c.is_ascii_digit()) + { + return Some(BroadcastTarget { + adapter: current_adapter.to_string(), + target: trimmed.to_string(), + }); + } + + // Group ID format: group:xxx + if trimmed.starts_with("group:") { + return Some(BroadcastTarget { + adapter: current_adapter.to_string(), + target: trimmed.to_string(), + }); + } + + None +} + fn parse_explicit_email_target(raw: &str) -> Option { let trimmed = raw.trim(); if trimmed.is_empty() { @@ -218,7 +394,9 @@ fn parse_explicit_email_target(raw: &str) -> Option