diff --git a/Cargo.lock b/Cargo.lock index c517ea67d..d4129a33e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8483,6 +8483,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "twitch-irc", + "url", "urlencoding", "uuid", "zip 2.4.2", diff --git a/Cargo.toml b/Cargo.toml index 17ce7cfd0..eb157b1c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,6 +153,7 @@ prometheus = { version = "0.13", optional = true } pdf-extract = "0.10.0" open = "5.3.3" urlencoding = "2.1.3" +url = "2" moka = "0.12.13" [features] diff --git a/docs/mattermost.md b/docs/mattermost.md new file mode 100644 index 000000000..88bf0e1f0 --- /dev/null +++ b/docs/mattermost.md @@ -0,0 +1,118 @@ +# Mattermost + +Spacebot connects to Mattermost via a bot account using the Mattermost REST API and WebSocket event stream. The integration can be configured in the web UI or via `config.toml`. + +## Features + +### Supported +- Receive messages from channels and direct messages +- Send text replies (long messages are automatically split into multiple posts) +- Streaming replies with live edit-in-place updates and typing indicator +- Thread-aware replies (replies stay in the originating thread) +- File/image attachments (up to a configurable size limit) +- Emoji reactions +- Fetch channel history (used for context window) +- Multiple named instances (connect to more than one Mattermost server) +- Per-team and per-channel allowlists +- DM allowlist (fail-closed: DMs are blocked unless the sender is explicitly listed) +- `require_mention` routing (only respond when the bot is @-mentioned) + +### Not supported (compared to Slack/Discord) +- Slash commands — Mattermost slash commands are not handled; use @-mentions instead +- Ephemeral (private) messages +- Message threading via `parent_id` lookup — threads are followed when the inbound message carries a root ID, but the bot cannot independently look up thread context +- User/channel autocomplete +- Presence or status events +- App marketplace / interactive components (buttons, modals) + +## Setup + +### 1. Create a bot account + +In Mattermost: **System Console → Integrations → Bot Accounts → Add Bot Account** + +- Give it a username (e.g. `spacebot`) +- Copy the generated access token + +The bot must be added to any team and channel it should respond in. Bot accounts in Mattermost are not automatically visible in channels. + +### 2. Configure in config.toml + +```toml +[messaging.mattermost] +enabled = true +base_url = "https://mattermost.example.com" # origin only, no path +token = "your-bot-access-token" +team_id = "team_id_here" # optional: default team for events without one +max_attachment_bytes = 52428800 # optional: default 50 MB +dm_allowed_users = [] # optional: user IDs allowed to DM the bot +``` + +The token can also be supplied via the `MATTERMOST_TOKEN` environment variable and `base_url` via `MATTERMOST_BASE_URL`. + +> **Security**: `base_url` must use `https` for non-localhost hosts. Plain `http` is only accepted for `localhost` / `127.0.0.1` / `::1`. + +### 3. Wire up a binding + +Bindings connect Mattermost channels to agents: + +```toml +[[bindings]] +agent_id = "my-agent" +channel = "mattermost" +channel_ids = ["channel_id_here"] # leave empty to match all channels +require_mention = false +``` + +To scope a binding to a specific Mattermost team, add `team_id`: + +```toml +[[bindings]] +agent_id = "my-agent" +channel = "mattermost" +team_id = "team_id_here" +channel_ids = ["channel_id_here"] +``` + +## Multiple servers (named instances) + +```toml +[[messaging.mattermost.instances]] +name = "corp" +enabled = true +base_url = "https://mattermost.corp.example.com" +token = "corp-bot-token" +team_id = "corp_team_id" + +[[messaging.mattermost.instances]] +name = "community" +enabled = true +base_url = "https://community.example.com" +token = "community-bot-token" +``` + +Named instance tokens can be supplied via `MATTERMOST_CORP_TOKEN` / `MATTERMOST_COMMUNITY_TOKEN` etc. + +In bindings, reference a named instance with the `adapter` field: + +```toml +[[bindings]] +agent_id = "my-agent" +channel = "mattermost" +adapter = "corp" +channel_ids = ["channel_id_here"] +``` + +## Web UI + +All of the above can be configured in the Spacebot web interface under **Settings → Messaging → Mattermost**. The UI supports adding credentials, enabling/disabling the adapter, and managing bindings with team and channel scoping. + +## Finding IDs + +Mattermost does not display IDs in the UI by default. The easiest ways to retrieve them: + +- **Team ID**: `GET /api/v4/teams/name/{team_name}` → `.id` +- **Channel ID**: `GET /api/v4/teams/{team_id}/channels/name/{channel_name}` → `.id` +- **User ID**: `GET /api/v4/users/username/{username}` → `.id` + +Alternatively, enable **Account Settings → Advanced → Enable post formatting** and inspect the network tab when loading a channel — team and channel IDs appear in the request URLs. diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index e89ca0abb..29e25cfa3 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -1207,6 +1207,8 @@ export interface CreateMessagingInstanceRequest { webhook_port?: number; webhook_bind?: string; webhook_auth_token?: string; + mattermost_base_url?: string; + mattermost_token?: string; }; } diff --git a/interface/src/components/ChannelEditModal.tsx b/interface/src/components/ChannelEditModal.tsx index e29a72251..66e3c070b 100644 --- a/interface/src/components/ChannelEditModal.tsx +++ b/interface/src/components/ChannelEditModal.tsx @@ -18,7 +18,7 @@ import { import {PlatformIcon} from "@/lib/platformIcons"; import {TagInput} from "@/components/TagInput"; -type Platform = "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook"; +type Platform = "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook" | "mattermost"; interface ChannelEditModalProps { platform: Platform; diff --git a/interface/src/components/ChannelSettingCard.tsx b/interface/src/components/ChannelSettingCard.tsx index 198babecb..f61c4b6bb 100644 --- a/interface/src/components/ChannelSettingCard.tsx +++ b/interface/src/components/ChannelSettingCard.tsx @@ -27,7 +27,7 @@ import {TagInput} from "@/components/TagInput"; import {FontAwesomeIcon} from "@fortawesome/react-fontawesome"; import {faChevronDown, faPlus} from "@fortawesome/free-solid-svg-icons"; -type Platform = "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook"; +type Platform = "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook" | "mattermost"; const PLATFORM_LABELS: Record = { discord: "Discord", @@ -36,6 +36,7 @@ const PLATFORM_LABELS: Record = { twitch: "Twitch", email: "Email", webhook: "Webhook", + mattermost: "Mattermost", }; const DOC_LINKS: Partial> = { @@ -43,6 +44,7 @@ const DOC_LINKS: Partial> = { slack: "https://docs.spacebot.sh/slack-setup", telegram: "https://docs.spacebot.sh/telegram-setup", twitch: "https://docs.spacebot.sh/twitch-setup", + mattermost: "https://docs.spacebot.sh/mattermost-setup", }; // --- Platform Catalog (Left Column) --- @@ -59,6 +61,7 @@ export function PlatformCatalog({onAddInstance}: PlatformCatalogProps) { "twitch", "email", "webhook", + "mattermost", ]; const COMING_SOON = [ @@ -636,6 +639,17 @@ export function AddInstanceCard({platform, isDefault, onCancel, onCreated}: AddI credentials.webhook_bind = credentialInputs.webhook_bind.trim(); if (credentialInputs.webhook_auth_token?.trim()) credentials.webhook_auth_token = credentialInputs.webhook_auth_token.trim(); + } else if (platform === "mattermost") { + if (!credentialInputs.mattermost_base_url?.trim()) { + setMessage({text: "Server URL is required", type: "error"}); + return; + } + if (!credentialInputs.mattermost_token?.trim()) { + setMessage({text: "Access token is required", type: "error"}); + return; + } + credentials.mattermost_base_url = credentialInputs.mattermost_base_url.trim(); + credentials.mattermost_token = credentialInputs.mattermost_token.trim(); } if (!isDefault && !instanceName.trim()) { @@ -925,6 +939,31 @@ export function AddInstanceCard({platform, isDefault, onCancel, onCreated}: AddI )} + {platform === "mattermost" && ( + <> +
+ + setCredentialInputs({...credentialInputs, mattermost_base_url: e.target.value})} + placeholder="https://mattermost.example.com" + /> +
+
+ + setCredentialInputs({...credentialInputs, mattermost_token: e.target.value})} + placeholder="Personal access token from Mattermost account settings" + onKeyDown={(e) => { if (e.key === "Enter") handleSave(); }} + /> +
+ + )} + {docLink && (

Need help?{" "} diff --git a/interface/src/lib/platformIcons.tsx b/interface/src/lib/platformIcons.tsx index a6e982375..1193986b4 100644 --- a/interface/src/lib/platformIcons.tsx +++ b/interface/src/lib/platformIcons.tsx @@ -1,6 +1,6 @@ import { FontAwesomeIcon } from "@fortawesome/react-fontawesome"; import { faDiscord, faSlack, faTelegram, faTwitch, faWhatsapp } from "@fortawesome/free-brands-svg-icons"; -import { faLink, faEnvelope, faComments, faComment } from "@fortawesome/free-solid-svg-icons"; +import { faLink, faEnvelope, faComments, faComment, faServer } from "@fortawesome/free-solid-svg-icons"; interface PlatformIconProps { platform: string; @@ -16,6 +16,7 @@ export function PlatformIcon({ platform, className = "text-ink-faint", size = "1 twitch: faTwitch, webhook: faLink, email: faEnvelope, + mattermost: faServer, whatsapp: faWhatsapp, matrix: faComments, imessage: faComment, diff --git a/interface/src/routes/Settings.tsx b/interface/src/routes/Settings.tsx index 8086d9194..fcda17c7e 100644 --- a/interface/src/routes/Settings.tsx +++ b/interface/src/routes/Settings.tsx @@ -884,7 +884,7 @@ function ThemePreview({ themeId }: { themeId: ThemeId }) { ); } -type Platform = "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook"; +type Platform = "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook" | "mattermost"; function ChannelsSection() { const [expandedKey, setExpandedKey] = useState(null); diff --git a/src/api/bindings.rs b/src/api/bindings.rs index 42446bdaf..4693b66d0 100644 --- a/src/api/bindings.rs +++ b/src/api/bindings.rs @@ -14,6 +14,7 @@ pub(super) struct BindingResponse { guild_id: Option, workspace_id: Option, chat_id: Option, + team_id: Option, channel_ids: Vec, require_mention: bool, dm_allowed_users: Vec, @@ -43,6 +44,8 @@ pub(super) struct CreateBindingRequest { #[serde(default)] chat_id: Option, #[serde(default)] + team_id: Option, + #[serde(default)] channel_ids: Vec, #[serde(default)] require_mention: bool, @@ -115,6 +118,8 @@ pub(super) struct DeleteBindingRequest { workspace_id: Option, #[serde(default)] chat_id: Option, + #[serde(default)] + team_id: Option, } #[derive(Serialize)] @@ -135,6 +140,8 @@ pub(super) struct UpdateBindingRequest { original_workspace_id: Option, #[serde(default)] original_chat_id: Option, + #[serde(default)] + original_team_id: Option, agent_id: String, channel: String, @@ -147,6 +154,8 @@ pub(super) struct UpdateBindingRequest { #[serde(default)] chat_id: Option, #[serde(default)] + team_id: Option, + #[serde(default)] channel_ids: Vec, #[serde(default)] require_mention: bool, @@ -185,6 +194,7 @@ pub(super) async fn list_bindings( guild_id: b.guild_id, workspace_id: b.workspace_id, chat_id: b.chat_id, + team_id: b.team_id, channel_ids: b.channel_ids, require_mention: b.require_mention, dm_allowed_users: b.dm_allowed_users, @@ -433,6 +443,9 @@ pub(super) async fn create_binding( if let Some(chat_id) = &request.chat_id { binding_table["chat_id"] = toml_edit::value(chat_id.as_str()); } + if let Some(team_id) = request.team_id.as_deref().map(str::trim).filter(|s| !s.is_empty()) { + binding_table["team_id"] = toml_edit::value(team_id); + } if !request.channel_ids.is_empty() { let mut arr = toml_edit::Array::new(); for id in &request.channel_ids { @@ -700,32 +713,36 @@ pub(super) async fn update_binding( None => table.get("adapter").is_none(), }; let matches_guild = match &request.original_guild_id { - Some(gid) => table + Some(guild_id) => table .get("guild_id") .and_then(|v| v.as_str()) - .is_some_and(|v| v == gid), + .is_some_and(|v| v == guild_id), None => table.get("guild_id").is_none(), }; let matches_workspace = match &request.original_workspace_id { - Some(wid) => table + Some(workspace_id) => table .get("workspace_id") .and_then(|v| v.as_str()) - .is_some_and(|v| v == wid), + .is_some_and(|v| v == workspace_id), None => table.get("workspace_id").is_none(), }; let matches_chat = match &request.original_chat_id { - Some(cid) => table + Some(chat_id) => table .get("chat_id") .and_then(|v| v.as_str()) - .is_some_and(|v| v == cid), + .is_some_and(|v| v == chat_id), None => table.get("chat_id").is_none(), }; + let req_team_id = request.original_team_id.as_deref().map(str::trim).filter(|s| !s.is_empty()); + let toml_team_id = table.get("team_id").and_then(|v| v.as_str()).map(str::trim).filter(|s| !s.is_empty()); + let matches_team = req_team_id == toml_team_id; if matches_agent && matches_channel && matches_adapter && matches_guild && matches_workspace && matches_chat + && matches_team { match_idx = Some(i); break; @@ -750,6 +767,7 @@ pub(super) async fn update_binding( binding.remove("guild_id"); binding.remove("workspace_id"); binding.remove("chat_id"); + binding.remove("team_id"); if let Some(adapter) = request .adapter @@ -775,6 +793,9 @@ pub(super) async fn update_binding( { binding["chat_id"] = toml_edit::value(chat_id); } + if let Some(team_id) = request.team_id.as_deref().map(str::trim).filter(|s| !s.is_empty()) { + binding["team_id"] = toml_edit::value(team_id); + } if !request.channel_ids.is_empty() { let mut arr = toml_edit::Array::new(); @@ -894,32 +915,36 @@ pub(super) async fn delete_binding( None => table.get("adapter").is_none(), }; let matches_guild = match &request.guild_id { - Some(gid) => table + Some(guild_id) => table .get("guild_id") .and_then(|v: &toml_edit::Item| v.as_str()) - .is_some_and(|v| v == gid), + .is_some_and(|v| v == guild_id), None => table.get("guild_id").is_none(), }; let matches_workspace = match &request.workspace_id { - Some(wid) => table + Some(workspace_id) => table .get("workspace_id") .and_then(|v: &toml_edit::Item| v.as_str()) - .is_some_and(|v| v == wid), + .is_some_and(|v| v == workspace_id), None => table.get("workspace_id").is_none(), }; let matches_chat = match &request.chat_id { - Some(cid) => table + Some(chat_id) => table .get("chat_id") .and_then(|v: &toml_edit::Item| v.as_str()) - .is_some_and(|v| v == cid), + .is_some_and(|v| v == chat_id), None => table.get("chat_id").is_none(), }; + let req_team_id = request.team_id.as_deref().map(str::trim).filter(|s| !s.is_empty()); + let toml_team_id = table.get("team_id").and_then(|v: &toml_edit::Item| v.as_str()).map(str::trim).filter(|s| !s.is_empty()); + let matches_team = req_team_id == toml_team_id; if matches_agent && matches_channel && matches_adapter && matches_guild && matches_workspace && matches_chat + && matches_team { match_idx = Some(i); break; diff --git a/src/api/messaging.rs b/src/api/messaging.rs index 692dea009..45313adda 100644 --- a/src/api/messaging.rs +++ b/src/api/messaging.rs @@ -95,6 +95,11 @@ pub(super) struct InstanceCredentials { webhook_bind: Option, #[serde(default)] webhook_auth_token: Option, + // Mattermost credentials + #[serde(default)] + mattermost_base_url: Option, + #[serde(default)] + mattermost_token: Option, } #[derive(Deserialize)] @@ -510,6 +515,60 @@ pub(super) async fn messaging_status( enabled: false, }); + // Populate instances for Mattermost (not in the legacy per-platform status fields) + if let Some(mm) = doc.get("messaging").and_then(|m| m.get("mattermost")) { + let has_url = mm + .get("base_url") + .and_then(|v| v.as_str()) + .is_some_and(|s| !s.is_empty()); + let has_token = mm + .get("token") + .and_then(|v| v.as_str()) + .is_some_and(|s| !s.is_empty()); + let enabled = mm.get("enabled").and_then(|v| v.as_bool()).unwrap_or(false); + + if has_url && has_token { + push_instance_status(&mut instances, bindings, "mattermost", None, true, enabled); + } + + if let Some(named_instances) = mm + .get("instances") + .and_then(|value| value.as_array_of_tables()) + { + for instance in named_instances { + let instance_name = normalize_adapter_selector( + instance.get("name").and_then(|value| value.as_str()), + ); + let instance_enabled = instance + .get("enabled") + .and_then(|value| value.as_bool()) + .unwrap_or(true) + && enabled; + let instance_configured = instance + .get("base_url") + .and_then(|value| value.as_str()) + .is_some_and(|value| !value.is_empty()) + && instance + .get("token") + .and_then(|value| value.as_str()) + .is_some_and(|value| !value.is_empty()); + + if let Some(instance_name) = instance_name + && instance_configured + { + push_instance_status( + &mut instances, + bindings, + "mattermost", + Some(instance_name), + true, + instance_enabled, + ); + } + } + } + } + ( discord_status, slack_status, @@ -1128,7 +1187,7 @@ pub(super) async fn create_messaging_instance( if !matches!( platform.as_str(), - "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook" + "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook" | "mattermost" ) { return Ok(Json(MessagingInstanceActionResponse { success: false, @@ -1269,6 +1328,23 @@ pub(super) async fn create_messaging_instance( platform_table["auth_token"] = toml_edit::value(token.as_str()); } } + "mattermost" => { + if let Some(url) = &credentials.mattermost_base_url { + if url::Url::parse(url) + .map(|u| u.path() != "/" || u.query().is_some() || u.fragment().is_some()) + .unwrap_or(true) + { + return Ok(Json(MessagingInstanceActionResponse { + success: false, + message: format!("invalid mattermost base_url: must be an origin URL (e.g. https://mm.example.com)"), + })); + } + platform_table["base_url"] = toml_edit::value(url.as_str()); + } + if let Some(token) = &credentials.mattermost_token { + platform_table["token"] = toml_edit::value(token.as_str()); + } + } _ => {} } platform_table["enabled"] = toml_edit::value(enabled); @@ -1378,6 +1454,23 @@ pub(super) async fn create_messaging_instance( instance_table["auth_token"] = toml_edit::value(token.as_str()); } } + "mattermost" => { + if let Some(url) = &credentials.mattermost_base_url { + if url::Url::parse(url) + .map(|u| u.path() != "/" || u.query().is_some() || u.fragment().is_some()) + .unwrap_or(true) + { + return Ok(Json(MessagingInstanceActionResponse { + success: false, + message: format!("invalid mattermost base_url: must be an origin URL (e.g. https://mm.example.com)"), + })); + } + instance_table["base_url"] = toml_edit::value(url.as_str()); + } + if let Some(token) = &credentials.mattermost_token { + instance_table["token"] = toml_edit::value(token.as_str()); + } + } _ => {} } @@ -1443,7 +1536,7 @@ pub(super) async fn delete_messaging_instance( if !matches!( platform.as_str(), - "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook" + "discord" | "slack" | "telegram" | "twitch" | "email" | "webhook" | "mattermost" ) { return Ok(Json(MessagingInstanceActionResponse { success: false, @@ -1540,6 +1633,13 @@ pub(super) async fn delete_messaging_instance( table.remove("bind"); table.remove("auth_token"); } + "mattermost" => { + table.remove("base_url"); + table.remove("token"); + table.remove("team_id"); + table.remove("dm_allowed_users"); + table.remove("max_attachment_bytes"); + } _ => {} } } diff --git a/src/config.rs b/src/config.rs index 60b9eb74b..4e6201f91 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,7 +15,8 @@ pub(crate) use load::resolve_env_value; pub use load::set_resolve_secrets_store; pub use onboarding::run_onboarding; pub use permissions::{ - DiscordPermissions, SignalPermissions, SlackPermissions, TelegramPermissions, TwitchPermissions, + DiscordPermissions, MattermostPermissions, SignalPermissions, SlackPermissions, + TelegramPermissions, TwitchPermissions, }; pub(crate) use providers::default_provider_config; pub use runtime::RuntimeConfig; @@ -576,6 +577,7 @@ bind = "127.0.0.1" guild_id: None, workspace_id: workspace_id.map(String::from), chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users, @@ -1432,6 +1434,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1448,6 +1451,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1479,6 +1483,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1496,6 +1501,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1513,6 +1519,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1530,6 +1537,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1547,6 +1555,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1575,6 +1584,7 @@ maintenance_merge_similarity_threshold = 1.1 webhook: None, twitch: None, signal: None, + mattermost: None, }; let bindings = vec![ Binding { @@ -1584,7 +1594,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, - + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1596,7 +1606,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, - + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1620,6 +1630,7 @@ maintenance_merge_similarity_threshold = 1.1 webhook: None, twitch: None, signal: None, + mattermost: None, }; let bindings = vec![Binding { agent_id: "main".into(), @@ -1628,6 +1639,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1683,6 +1695,7 @@ maintenance_merge_similarity_threshold = 1.1 webhook: None, twitch: None, signal: None, + mattermost: None, }; let bindings = vec![Binding { agent_id: "main".into(), @@ -1691,6 +1704,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], @@ -1718,6 +1732,7 @@ maintenance_merge_similarity_threshold = 1.1 webhook: None, twitch: None, signal: None, + mattermost: None, }; // Binding targets default adapter, but no default credentials exist let bindings = vec![Binding { @@ -1727,6 +1742,7 @@ maintenance_merge_similarity_threshold = 1.1 guild_id: None, workspace_id: None, chat_id: None, + team_id: None, channel_ids: vec![], require_mention: false, dm_allowed_users: vec![], diff --git a/src/config/load.rs b/src/config/load.rs index a4b44882c..a02378ae4 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -14,11 +14,12 @@ use super::{ AgentConfig, ApiConfig, ApiType, Binding, BrowserConfig, ChannelConfig, ClosePolicy, CoalesceConfig, CompactionConfig, Config, CortexConfig, CronDef, DefaultsConfig, DiscordConfig, DiscordInstanceConfig, EmailConfig, EmailInstanceConfig, GroupDef, HumanDef, IngestionConfig, - LinkDef, LlmConfig, McpServerConfig, McpTransport, MemoryPersistenceConfig, MessagingConfig, - MetricsConfig, OpenCodeConfig, ProjectsConfig, ProviderConfig, SignalConfig, - SignalInstanceConfig, SlackCommandConfig, SlackConfig, SlackInstanceConfig, TelegramConfig, - TelegramInstanceConfig, TelemetryConfig, TwitchConfig, TwitchInstanceConfig, WarmupConfig, - WebhookConfig, normalize_adapter, validate_named_messaging_adapters, + LinkDef, LlmConfig, MattermostConfig, MattermostInstanceConfig, McpServerConfig, McpTransport, + MemoryPersistenceConfig, MessagingConfig, MetricsConfig, OpenCodeConfig, ProjectsConfig, + ProviderConfig, SignalConfig, SignalInstanceConfig, SlackCommandConfig, SlackConfig, + SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, TelemetryConfig, TwitchConfig, + TwitchInstanceConfig, WarmupConfig, WebhookConfig, normalize_adapter, + validate_named_messaging_adapters, }; use crate::error::{ConfigError, Result}; @@ -2227,6 +2228,54 @@ impl Config { ignore_stories: s.ignore_stories, }) }), + mattermost: toml.messaging.mattermost.and_then(|mm| { + let instances = mm + .instances + .into_iter() + .map(|instance| { + let token = instance.token.as_deref().and_then(resolve_env_value); + let base_url = instance.base_url.as_deref().and_then(resolve_env_value); + let has_credentials = token.is_some() && base_url.is_some(); + if instance.enabled && !has_credentials { + tracing::warn!( + adapter = %instance.name, + "mattermost instance is enabled but credentials are missing/unresolvable — disabling" + ); + } + MattermostInstanceConfig { + name: instance.name, + enabled: instance.enabled && has_credentials, + base_url: base_url.unwrap_or_default(), + token: token.unwrap_or_default(), + team_id: instance.team_id, + dm_allowed_users: instance.dm_allowed_users, + max_attachment_bytes: instance.max_attachment_bytes, + } + }) + .collect::>(); + + let token = std::env::var("MATTERMOST_TOKEN") + .ok() + .or_else(|| mm.token.as_deref().and_then(resolve_env_value)); + let base_url = std::env::var("MATTERMOST_BASE_URL") + .ok() + .or_else(|| mm.base_url.as_deref().and_then(resolve_env_value)); + + if (token.is_none() || base_url.is_none()) && instances.is_empty() { + tracing::warn!("mattermost config present but no credentials found"); + return None; + } + + Some(MattermostConfig { + enabled: mm.enabled, + base_url: base_url.unwrap_or_default(), + token: token.unwrap_or_default(), + team_id: mm.team_id, + instances, + dm_allowed_users: mm.dm_allowed_users, + max_attachment_bytes: mm.max_attachment_bytes, + }) + }), }; let bindings: Vec = toml @@ -2239,6 +2288,7 @@ impl Config { guild_id: b.guild_id, workspace_id: b.workspace_id, chat_id: b.chat_id, + team_id: b.team_id, channel_ids: b.channel_ids, require_mention: b.require_mention, dm_allowed_users: b.dm_allowed_users, diff --git a/src/config/permissions.rs b/src/config/permissions.rs index 3c0713228..360f4a81c 100644 --- a/src/config/permissions.rs +++ b/src/config/permissions.rs @@ -1,7 +1,7 @@ use super::{ - Binding, DiscordConfig, DiscordInstanceConfig, SignalConfig, SignalInstanceConfig, SlackConfig, - SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, TwitchConfig, - TwitchInstanceConfig, + Binding, DiscordConfig, DiscordInstanceConfig, MattermostConfig, MattermostInstanceConfig, + SignalConfig, SignalInstanceConfig, SlackConfig, SlackInstanceConfig, TelegramConfig, + TelegramInstanceConfig, TwitchConfig, TwitchInstanceConfig, }; use std::collections::HashMap; @@ -454,6 +454,84 @@ impl SignalPermissions { } } +/// Per-adapter permissions for the Mattermost platform. +#[derive(Debug, Clone, Default)] +pub struct MattermostPermissions { + pub team_filter: Option>, + pub channel_filter: HashMap>, + pub dm_allowed_users: Vec, +} + +impl MattermostPermissions { + pub fn from_config(config: &MattermostConfig, bindings: &[Binding]) -> Self { + Self::from_bindings_for_adapter(config.dm_allowed_users.clone(), bindings, None) + } + + pub fn from_instance_config(instance: &MattermostInstanceConfig, bindings: &[Binding]) -> Self { + Self::from_bindings_for_adapter( + instance.dm_allowed_users.clone(), + bindings, + Some(instance.name.as_str()), + ) + } + + fn from_bindings_for_adapter( + seed_dm_allowed_users: Vec, + bindings: &[Binding], + adapter_selector: Option<&str>, + ) -> Self { + let mm_bindings: Vec<&Binding> = bindings + .iter() + .filter(|b| { + b.channel == "mattermost" + && binding_adapter_selector_matches(b, adapter_selector) + }) + .collect(); + + let team_filter = { + let team_ids: Vec = mm_bindings + .iter() + .filter_map(|b| b.team_id.clone()) + .collect(); + if team_ids.is_empty() { + None + } else { + Some(team_ids) + } + }; + + let channel_filter = { + let mut filter: HashMap> = HashMap::new(); + for binding in &mm_bindings { + if let Some(team_id) = &binding.team_id + && !binding.channel_ids.is_empty() + { + filter + .entry(team_id.clone()) + .or_default() + .extend(binding.channel_ids.clone()); + } + } + filter + }; + + let mut dm_allowed_users = seed_dm_allowed_users; + for binding in &mm_bindings { + for id in &binding.dm_allowed_users { + if !dm_allowed_users.contains(id) { + dm_allowed_users.push(id.clone()); + } + } + } + + Self { + team_filter, + channel_filter, + dm_allowed_users, + } + } +} + fn binding_adapter_selector_matches(binding: &Binding, adapter_selector: Option<&str>) -> bool { match (binding.adapter.as_deref(), adapter_selector) { (None, None) => true, diff --git a/src/config/toml_schema.rs b/src/config/toml_schema.rs index a3a833484..516c4b065 100644 --- a/src/config/toml_schema.rs +++ b/src/config/toml_schema.rs @@ -501,6 +501,8 @@ pub(super) struct TomlMessagingConfig { pub(super) webhook: Option, pub(super) twitch: Option, pub(super) signal: Option, + #[serde(default)] + pub(super) mattermost: Option, } #[derive(Deserialize)] @@ -780,9 +782,44 @@ pub(super) struct TomlBinding { pub(super) workspace_id: Option, pub(super) chat_id: Option, #[serde(default)] + pub(super) team_id: Option, + #[serde(default)] pub(super) channel_ids: Vec, #[serde(default)] pub(super) require_mention: bool, #[serde(default)] pub(super) dm_allowed_users: Vec, } + +#[derive(Deserialize)] +pub(super) struct TomlMattermostConfig { + #[serde(default)] + pub(super) enabled: bool, + pub(super) base_url: Option, + pub(super) token: Option, + pub(super) team_id: Option, + #[serde(default)] + pub(super) instances: Vec, + #[serde(default)] + pub(super) dm_allowed_users: Vec, + #[serde(default = "default_mattermost_max_attachment_bytes")] + pub(super) max_attachment_bytes: usize, +} + +#[derive(Deserialize)] +pub(super) struct TomlMattermostInstanceConfig { + pub(super) name: String, + #[serde(default)] + pub(super) enabled: bool, + pub(super) base_url: Option, + pub(super) token: Option, + pub(super) team_id: Option, + #[serde(default)] + pub(super) dm_allowed_users: Vec, + #[serde(default = "default_mattermost_max_attachment_bytes")] + pub(super) max_attachment_bytes: usize, +} + +pub(super) fn default_mattermost_max_attachment_bytes() -> usize { + 10 * 1024 * 1024 +} diff --git a/src/config/types.rs b/src/config/types.rs index fe9d31463..77e3de637 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1473,6 +1473,7 @@ pub struct Binding { pub guild_id: Option, pub workspace_id: Option, // Slack workspace (team) ID pub chat_id: Option, // Telegram group ID + pub team_id: Option, // Mattermost team ID /// Channel IDs this binding applies to. If empty, all channels in the guild/workspace are allowed. pub channel_ids: Vec, /// Require explicit @mention (or reply-to-bot) for inbound messages. @@ -1561,11 +1562,18 @@ impl Binding { .get("twitch_channel") .and_then(|v| v.as_str()); + // Also check Mattermost channel ID + let mattermost_channel = message + .metadata + .get("mattermost_channel_id") + .and_then(|v| v.as_str()); + let direct_match = message_channel .as_ref() .is_some_and(|id| self.channel_ids.contains(id)) || slack_channel.is_some_and(|id| self.channel_ids.contains(&id.to_string())) - || twitch_channel.is_some_and(|id| self.channel_ids.contains(&id.to_string())); + || twitch_channel.is_some_and(|id| self.channel_ids.contains(&id.to_string())) + || mattermost_channel.is_some_and(|id| self.channel_ids.contains(&id.to_string())); let parent_match = parent_channel .as_ref() .is_some_and(|id| self.channel_ids.contains(id)); @@ -1587,6 +1595,18 @@ impl Binding { } } + // Mattermost team filter + if let Some(team_id) = &self.team_id { + if self.channel == "mattermost" { + let message_team = message + .metadata + .get("mattermost_team_id") + .and_then(|v| v.as_str()); + if message_team != Some(team_id.as_str()) { + return false; + } + } + } true } @@ -1629,6 +1649,7 @@ impl Binding { "slack" => "slack_mentions_or_replies_to_bot", "twitch" => "twitch_mentions_or_replies_to_bot", "telegram" => "telegram_mentions_or_replies_to_bot", + "mattermost" => "mattermost_mentions_or_replies_to_bot", // Unknown platforms: if require_mention is set, default to // requiring a mention (safe default). _ => return false, @@ -1670,7 +1691,7 @@ pub(super) struct AdapterValidationState { pub(super) fn is_named_adapter_platform(platform: &str) -> bool { matches!( platform, - "discord" | "slack" | "telegram" | "twitch" | "email" | "signal" + "discord" | "slack" | "telegram" | "twitch" | "email" | "signal" | "mattermost" ) } @@ -1853,9 +1874,94 @@ pub(super) fn build_adapter_validation_states( ); } + if let Some(mattermost) = &messaging.mattermost { + let named_instances = validate_instance_names( + "mattermost", + mattermost.instances.iter().map(|instance| instance.name.as_str()), + )?; + let default_present = + !mattermost.base_url.trim().is_empty() && !mattermost.token.trim().is_empty(); + validate_runtime_keys("mattermost", default_present, &named_instances)?; + if default_present { + validate_mattermost_url(&mattermost.base_url)?; + } + for instance in &mattermost.instances { + if instance.enabled && !instance.base_url.is_empty() { + validate_mattermost_url(&instance.base_url)?; + } + } + states.insert( + "mattermost", + AdapterValidationState { + default_present, + named_instances, + }, + ); + } + Ok(states) } +fn validate_mattermost_url(url: &str) -> Result<()> { + let parsed = url::Url::parse(url) + .map_err(|e| ConfigError::Invalid(format!("invalid mattermost base_url '{url}': {e}")))?; + + if !parsed.username().is_empty() || parsed.password().is_some() { + return Err(ConfigError::Invalid( + "mattermost base_url must not contain credentials".to_string(), + ) + .into()); + } + let path = parsed.path(); + if !path.is_empty() && path != "/" { + return Err(ConfigError::Invalid(format!( + "mattermost base_url must be an origin URL (no path), got path: {path}" + )) + .into()); + } + if parsed.query().is_some() { + return Err(ConfigError::Invalid( + "mattermost base_url must not contain a query string".to_string(), + ) + .into()); + } + if parsed.fragment().is_some() { + return Err(ConfigError::Invalid( + "mattermost base_url must not contain a fragment".to_string(), + ) + .into()); + } + + match parsed.scheme() { + "https" => {} + "http" => { + let is_local = match parsed.host() { + Some(url::Host::Domain(h)) => h.eq_ignore_ascii_case("localhost"), + Some(url::Host::Ipv4(addr)) => addr == std::net::Ipv4Addr::LOCALHOST, + Some(url::Host::Ipv6(addr)) => addr == std::net::Ipv6Addr::LOCALHOST, + None => false, + }; + if !is_local { + return Err(ConfigError::Invalid( + "mattermost base_url must use https for non-localhost hosts".to_string(), + ) + .into()); + } + tracing::warn!( + host = parsed.host_str().unwrap_or(""), + "mattermost base_url uses http for localhost" + ); + } + scheme => { + return Err(ConfigError::Invalid(format!( + "mattermost base_url must use http or https, got: {scheme}" + )) + .into()); + } + } + Ok(()) +} + pub(super) fn validate_instance_names<'a>( platform: &str, names: impl Iterator, @@ -1974,6 +2080,7 @@ pub struct MessagingConfig { pub webhook: Option, pub twitch: Option, pub signal: Option, + pub mattermost: Option, } #[derive(Clone)] @@ -2566,3 +2673,151 @@ impl SystemSecrets for SignalConfig { ] } } + +#[derive(Clone)] +pub struct MattermostConfig { + pub enabled: bool, + pub base_url: String, + pub token: String, + pub team_id: Option, + pub instances: Vec, + pub dm_allowed_users: Vec, + pub max_attachment_bytes: usize, +} + +impl std::fmt::Debug for MattermostConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MattermostConfig") + .field("enabled", &self.enabled) + .field("base_url", &self.base_url) + .field("token", &"[REDACTED]") + .field("team_id", &self.team_id) + .field("instances", &self.instances) + .field("dm_allowed_users", &self.dm_allowed_users) + .field("max_attachment_bytes", &self.max_attachment_bytes) + .finish() + } +} + +#[derive(Clone)] +pub struct MattermostInstanceConfig { + pub name: String, + pub enabled: bool, + pub base_url: String, + pub token: String, + pub team_id: Option, + pub dm_allowed_users: Vec, + pub max_attachment_bytes: usize, +} + +impl SystemSecrets for MattermostConfig { + fn section() -> &'static str { + "mattermost" + } + + fn is_messaging_adapter() -> bool { + true + } + + fn secret_fields() -> &'static [SecretField] { + &[ + SecretField { + toml_key: "token", + secret_name: "MATTERMOST_TOKEN", + instance_pattern: Some(InstancePattern { + platform_prefix: "MATTERMOST", + field_suffix: "TOKEN", + }), + }, + SecretField { + toml_key: "base_url", + secret_name: "MATTERMOST_BASE_URL", + instance_pattern: Some(InstancePattern { + platform_prefix: "MATTERMOST", + field_suffix: "BASE_URL", + }), + }, + ] + } +} + +impl std::fmt::Debug for MattermostInstanceConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MattermostInstanceConfig") + .field("name", &self.name) + .field("enabled", &self.enabled) + .field("base_url", &self.base_url) + .field("token", &"[REDACTED]") + .field("team_id", &self.team_id) + .field("dm_allowed_users", &self.dm_allowed_users) + .field("max_attachment_bytes", &self.max_attachment_bytes) + .finish() + } +} + + +#[cfg(test)] +mod mattermost_url_tests { + use super::validate_mattermost_url; + + #[test] + fn accepts_https_url() { + assert!(validate_mattermost_url("https://mattermost.example.com").is_ok()); + } + + #[test] + fn accepts_http_localhost_with_warning() { + // http is allowed only for localhost (with a warning) + assert!(validate_mattermost_url("http://localhost:8065").is_ok()); + assert!(validate_mattermost_url("http://127.0.0.1:8065").is_ok()); + assert!(validate_mattermost_url("http://[::1]:8065").is_ok()); + } + + #[test] + fn rejects_http_non_localhost() { + // http is rejected for non-local hosts + assert!(validate_mattermost_url("http://mattermost.example.com").is_err()); + assert!(validate_mattermost_url("http://10.0.0.1").is_err()); + } + + #[test] + fn rejects_invalid_scheme() { + assert!(validate_mattermost_url("ftp://mattermost.example.com").is_err()); + assert!(validate_mattermost_url("ws://mattermost.example.com").is_err()); + } + + #[test] + fn rejects_unparseable_url() { + assert!(validate_mattermost_url("not a url at all").is_err()); + assert!(validate_mattermost_url("").is_err()); + } + + #[test] + fn rejects_credentials_in_url() { + assert!(validate_mattermost_url("https://user:pass@mattermost.example.com").is_err()); + assert!(validate_mattermost_url("https://user@mattermost.example.com").is_err()); + } + + #[test] + fn rejects_non_root_path() { + assert!(validate_mattermost_url("https://mattermost.example.com/some/path").is_err()); + assert!(validate_mattermost_url("https://mattermost.example.com/mattermost").is_err()); + } + + #[test] + fn accepts_root_path() { + assert!(validate_mattermost_url("https://mattermost.example.com/").is_ok()); + assert!(validate_mattermost_url("https://mattermost.example.com").is_ok()); + } + + #[test] + fn rejects_query_string() { + assert!(validate_mattermost_url("https://mattermost.example.com/?token=abc").is_err()); + } + + #[test] + fn rejects_fragment() { + assert!(validate_mattermost_url("https://mattermost.example.com/#section").is_err()); + } +} + diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 8daa4dbdd..ab64d235c 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -2,8 +2,8 @@ use std::path::PathBuf; use std::sync::Arc; use super::{ - Binding, Config, DiscordPermissions, RuntimeConfig, SignalPermissions, SlackPermissions, - TelegramPermissions, TwitchPermissions, binding_runtime_adapter_key, + Binding, Config, DiscordPermissions, MattermostPermissions, RuntimeConfig, SignalPermissions, + SlackPermissions, TelegramPermissions, TwitchPermissions, binding_runtime_adapter_key, }; /// Per-agent context needed by the file watcher: (id, prompt_dir, identity_dir, @@ -31,6 +31,7 @@ pub fn spawn_file_watcher( slack_permissions: Option>>, telegram_permissions: Option>>, twitch_permissions: Option>>, + mattermost_permissions: Option>>, signal_permissions: Option>>, bindings: Arc>>, messaging_manager: Option>, @@ -241,6 +242,15 @@ pub fn spawn_file_watcher( tracing::info!("twitch permissions reloaded"); } + if let Some(ref perms) = mattermost_permissions + && let Some(mattermost_config) = &config.messaging.mattermost + { + let new_perms = + MattermostPermissions::from_config(mattermost_config, &config.bindings); + perms.store(Arc::new(new_perms)); + tracing::info!("mattermost permissions reloaded"); + } + if let Some(ref perms) = signal_permissions && let Some(signal_config) = &config.messaging.signal { @@ -258,6 +268,7 @@ pub fn spawn_file_watcher( let slack_permissions = slack_permissions.clone(); let telegram_permissions = telegram_permissions.clone(); let twitch_permissions = twitch_permissions.clone(); + let mattermost_permissions = mattermost_permissions.clone(); let signal_permissions = signal_permissions.clone(); let instance_dir = instance_dir.clone(); @@ -597,6 +608,71 @@ pub fn spawn_file_watcher( } } } + + // Mattermost: start default + named instances that are enabled and not already running. + if let Some(mattermost_config) = &config.messaging.mattermost + && mattermost_config.enabled { + if !mattermost_config.base_url.is_empty() + && !mattermost_config.token.is_empty() + && !manager.has_adapter("mattermost").await + { + let permissions = match mattermost_permissions { + Some(ref existing) => existing.clone(), + None => { + let permissions = MattermostPermissions::from_config(mattermost_config, &config.bindings); + Arc::new(arc_swap::ArcSwap::from_pointee(permissions)) + } + }; + match crate::messaging::mattermost::MattermostAdapter::new( + "mattermost", + &mattermost_config.base_url, + mattermost_config.token.as_str(), + mattermost_config.team_id.as_deref().map(Arc::from), + mattermost_config.max_attachment_bytes, + permissions, + ) { + Ok(adapter) => { + if let Err(error) = manager.register_and_start(adapter).await { + tracing::error!(%error, "failed to hot-start mattermost adapter from config change"); + } + } + Err(error) => { + tracing::error!(%error, "failed to build mattermost adapter from config change"); + } + } + } + + for instance in mattermost_config.instances.iter().filter(|instance| instance.enabled) { + let runtime_key = binding_runtime_adapter_key( + "mattermost", + Some(instance.name.as_str()), + ); + if manager.has_adapter(runtime_key.as_str()).await { + continue; + } + + let permissions = Arc::new(arc_swap::ArcSwap::from_pointee( + MattermostPermissions::from_instance_config(instance, &config.bindings), + )); + match crate::messaging::mattermost::MattermostAdapter::new( + runtime_key, + &instance.base_url, + instance.token.as_str(), + instance.team_id.as_deref().map(Arc::from), + instance.max_attachment_bytes, + permissions, + ) { + Ok(adapter) => { + if let Err(error) = manager.register_and_start(adapter).await { + tracing::error!(%error, adapter = %instance.name, "failed to hot-start named mattermost adapter from config change"); + } + } + Err(error) => { + tracing::error!(%error, adapter = %instance.name, "failed to build named mattermost adapter from config change"); + } + } + } + } }); } } diff --git a/src/main.rs b/src/main.rs index 0679c27f3..2a90d658c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1651,6 +1651,7 @@ async fn run( let mut slack_permissions = None; let mut telegram_permissions = None; let mut twitch_permissions = None; + let mut mattermost_permissions = None; let mut signal_permissions = None; initialize_agents( &config, @@ -1669,6 +1670,7 @@ async fn run( &mut slack_permissions, &mut telegram_permissions, &mut twitch_permissions, + &mut mattermost_permissions, &mut signal_permissions, agent_links.clone(), agent_humans.clone(), @@ -1688,6 +1690,7 @@ async fn run( slack_permissions, telegram_permissions, twitch_permissions, + mattermost_permissions, signal_permissions, bindings.clone(), Some(messaging_manager.clone()), @@ -1701,11 +1704,12 @@ async fn run( config_path.clone(), config.instance_dir.clone(), Vec::new(), - None, - None, - None, - None, - None, + None, // discord_permissions + None, // slack_permissions + None, // telegram_permissions + None, // twitch_permissions + None, // mattermost_permissions + None, // signal_permissions bindings.clone(), None, llm_manager.clone(), @@ -2242,6 +2246,7 @@ async fn run( let mut new_slack_permissions = None; let mut new_telegram_permissions = None; let mut new_twitch_permissions = None; + let mut new_mattermost_permissions = None; let mut new_signal_permissions = None; match initialize_agents( &new_config, @@ -2260,6 +2265,7 @@ async fn run( &mut new_slack_permissions, &mut new_telegram_permissions, &mut new_twitch_permissions, + &mut new_mattermost_permissions, &mut new_signal_permissions, agent_links.clone(), agent_humans.clone(), @@ -2278,6 +2284,7 @@ async fn run( new_slack_permissions, new_telegram_permissions, new_twitch_permissions, + new_mattermost_permissions, new_signal_permissions, bindings.clone(), Some(messaging_manager.clone()), @@ -2401,6 +2408,7 @@ async fn initialize_agents( slack_permissions: &mut Option>>, telegram_permissions: &mut Option>>, twitch_permissions: &mut Option>>, + mattermost_permissions: &mut Option>>, signal_permissions: &mut Option>>, agent_links: Arc>>, agent_humans: Arc>>, @@ -3102,6 +3110,68 @@ async fn initialize_agents( } } + // Shared Mattermost permissions (hot-reloadable via file watcher) + *mattermost_permissions = config.messaging.mattermost.as_ref().map(|mattermost_config| { + let perms = spacebot::config::MattermostPermissions::from_config(mattermost_config, &config.bindings); + Arc::new(ArcSwap::from_pointee(perms)) + }); + + if let Some(mattermost_config) = &config.messaging.mattermost + && mattermost_config.enabled + { + if !mattermost_config.base_url.is_empty() && !mattermost_config.token.is_empty() { + match spacebot::messaging::mattermost::MattermostAdapter::new( + "mattermost", + &mattermost_config.base_url, + mattermost_config.token.as_str(), + mattermost_config.team_id.as_deref().map(Arc::from), + mattermost_config.max_attachment_bytes, + mattermost_permissions.clone().ok_or_else(|| { + anyhow::anyhow!("mattermost permissions not initialized when mattermost is enabled") + })?, + ) { + Ok(adapter) => { + new_messaging_manager.register(adapter).await; + } + Err(error) => { + tracing::error!(%error, "failed to create mattermost adapter"); + } + } + } + + for instance in mattermost_config.instances.iter().filter(|instance| instance.enabled) { + if instance.base_url.is_empty() || instance.token.is_empty() { + tracing::warn!(adapter = %instance.name, "skipping enabled mattermost instance with missing credentials"); + continue; + } + let runtime_key = spacebot::config::binding_runtime_adapter_key( + "mattermost", + Some(instance.name.as_str()), + ); + let perms = Arc::new(ArcSwap::from_pointee( + spacebot::config::MattermostPermissions::from_instance_config( + instance, + &config.bindings, + ), + )); + match spacebot::messaging::mattermost::MattermostAdapter::new( + runtime_key, + &instance.base_url, + instance.token.as_str(), + instance.team_id.as_deref().map(Arc::from), + instance.max_attachment_bytes, + perms, + ) { + Ok(adapter) => { + new_messaging_manager.register(adapter).await; + } + Err(error) => { + tracing::error!(%error, adapter = %instance.name, "failed to create named mattermost adapter"); + } + } + } + } + // Shared Signal permissions (hot-reloadable via file watcher) *signal_permissions = config.messaging.signal.as_ref().map(|signal_config| { let perms = spacebot::config::SignalPermissions::from_config(signal_config); diff --git a/src/messaging.rs b/src/messaging.rs index e85ccf063..caad68233 100644 --- a/src/messaging.rs +++ b/src/messaging.rs @@ -1,8 +1,9 @@ -//! Messaging adapters (Discord, Slack, Telegram, Twitch, Signal, Email, Webhook, WebChat). +//! Messaging adapters (Discord, Slack, Telegram, Twitch, Signal, Email, Webhook, WebChat, Mattermost). pub mod discord; pub mod email; pub mod manager; +pub mod mattermost; pub mod signal; pub mod slack; pub mod target; diff --git a/src/messaging/mattermost.rs b/src/messaging/mattermost.rs new file mode 100644 index 000000000..a21ce9dc3 --- /dev/null +++ b/src/messaging/mattermost.rs @@ -0,0 +1,1796 @@ +//! Mattermost messaging adapter using a custom HTTP + WebSocket client. + +use crate::config::MattermostPermissions; +use crate::messaging::apply_runtime_adapter_to_conversation_id; +use crate::messaging::traits::{HistoryMessage, InboundStream, Messaging}; +use crate::{InboundMessage, MessageContent, OutboundResponse, StatusUpdate}; + +use anyhow::Context as _; +use arc_swap::ArcSwap; +use futures::{SinkExt, StreamExt}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::{OnceCell, RwLock, mpsc}; +use tokio_tungstenite::{ + connect_async, + tungstenite::Message as WsMessage, +}; +use url::Url; + +const MAX_MESSAGE_LENGTH: usize = 16_383; +const STREAM_EDIT_THROTTLE: Duration = Duration::from_millis(500); +const TYPING_INDICATOR_INTERVAL: Duration = Duration::from_secs(5); +const WS_RECONNECT_BASE_DELAY: Duration = Duration::from_secs(1); +const WS_RECONNECT_MAX_DELAY: Duration = Duration::from_secs(60); +const HTTP_TIMEOUT: Duration = Duration::from_secs(30); + +pub struct MattermostAdapter { + runtime_key: Arc, + base_url: Url, + token: Arc, + default_team_id: Option>, + max_attachment_bytes: usize, + client: Client, + permissions: Arc>, + bot_user_id: OnceCell>, + bot_username: OnceCell>, + user_identity_cache: Arc>>, + channel_name_cache: Arc>>, + dm_channel_cache: Arc>>, + active_messages: Arc>>, + typing_tasks: Arc>>>, + shutdown_tx: Arc>>>, + ws_task: Arc>>>, +} + +struct ActiveStream { + post_id: Arc, + #[allow(dead_code)] + channel_id: Arc, + last_edit: Instant, + accumulated_text: String, +} + +impl std::fmt::Debug for MattermostAdapter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MattermostAdapter") + .field("runtime_key", &self.runtime_key) + .field("base_url", &self.base_url) + .field("token", &"[REDACTED]") + .field("default_team_id", &self.default_team_id) + .field("max_attachment_bytes", &self.max_attachment_bytes) + .finish() + } +} + +impl MattermostAdapter { + /// Create a new [`MattermostAdapter`]. + /// + /// `base_url` must be an origin URL with no path, query, or fragment + /// (e.g. `https://mm.example.com`). Returns an error if the URL is + /// malformed or includes a path component. + /// + /// `runtime_key` is the adapter's unique identifier within the messaging + /// manager (e.g. `"mattermost"` or `"mattermost:myinstance"`). + /// + /// `default_team_id` is used as a fallback when a WS event does not carry + /// a team ID in its broadcast envelope. + pub fn new( + runtime_key: impl Into>, + base_url: &str, + token: impl Into>, + default_team_id: Option>, + max_attachment_bytes: usize, + permissions: Arc>, + ) -> anyhow::Result { + let base_url = Url::parse(base_url).context("invalid mattermost base_url")?; + if base_url.path() != "/" || base_url.query().is_some() || base_url.fragment().is_some() { + return Err(anyhow::anyhow!( + "mattermost base_url must be an origin URL without path/query/fragment (got: {})", + base_url + ).into()); + } + + let client = Client::builder() + .timeout(HTTP_TIMEOUT) + .pool_idle_timeout(Duration::from_secs(30)) + .build() + .context("failed to build HTTP client")?; + + Ok(Self { + runtime_key: runtime_key.into(), + base_url, + token: token.into(), + default_team_id, + max_attachment_bytes, + client, + permissions, + bot_user_id: OnceCell::new(), + bot_username: OnceCell::new(), + user_identity_cache: Arc::new(RwLock::new(HashMap::new())), + channel_name_cache: Arc::new(RwLock::new(HashMap::new())), + dm_channel_cache: Arc::new(RwLock::new(HashMap::new())), + active_messages: Arc::new(RwLock::new(HashMap::new())), + typing_tasks: Arc::new(RwLock::new(HashMap::new())), + shutdown_tx: Arc::new(RwLock::new(None)), + ws_task: Arc::new(RwLock::new(None)), + }) + } + + fn api_url(&self, path: &str) -> Url { + let mut url = self.base_url.clone(); + url.path_segments_mut() + .expect("base_url is a valid base URL") + .extend(["api", "v4"]) + .extend(path.trim_start_matches('/').split('/')); + url + } + + fn ws_url(&self) -> Url { + let mut url = self.base_url.clone(); + url.set_scheme(match self.base_url.scheme() { + "https" => "wss", + "http" => "ws", + other => unreachable!("unsupported URL scheme: {other}"), + }) + .expect("scheme substitution is valid"); + url.path_segments_mut() + .expect("base_url is a valid base URL") + .extend(["api", "v4", "websocket"]); + url + } + + fn extract_channel_id<'a>(&self, message: &'a InboundMessage) -> crate::Result<&'a str> { + message + .metadata + .get("mattermost_channel_id") + .and_then(|v| v.as_str()) + .ok_or_else(|| { + anyhow::anyhow!("missing mattermost_channel_id metadata").into() + }) + } + + /// Validate a Mattermost resource ID (post, channel, user, etc.). + /// + /// A valid ID is 1–64 ASCII alphanumeric characters, hyphens, or + /// underscores. Returns an error for empty strings, IDs that are too long, + /// or IDs that contain characters outside that set (e.g. colons in + /// `dm:{user_id}` composite targets). + fn validate_id(id: &str) -> crate::Result<()> { + if id.is_empty() || id.len() > 64 { + return Err(anyhow::anyhow!("invalid mattermost ID: empty or too long").into()); + } + if !id.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_') { + return Err(anyhow::anyhow!("invalid mattermost ID format: {id}").into()); + } + Ok(()) + } + + /// Cancel any active typing indicator for `channel_id`. + /// + /// Aborts the background loop that posts `/users/{id}/typing` and removes + /// it from the task map. No-op if no indicator is running. + async fn stop_typing(&self, channel_id: &str) { + if let Some(handle) = self.typing_tasks.write().await.remove(channel_id) { + handle.abort(); + } + } + + /// Start a repeating typing indicator for `channel_id`. + /// + /// Spawns a background task that posts `/users/{bot_id}/typing` every + /// [`TYPING_INDICATOR_INTERVAL`]. Any previously running indicator for the + /// same channel is aborted first to prevent task leaks. The task runs until + /// [`stop_typing`](Self::stop_typing) is called or the adapter shuts down. + /// + /// Does nothing if `bot_user_id` has not been set (i.e. [`start`](Self::start) + /// has not been called yet). + async fn start_typing(&self, channel_id: &str) { + self.stop_typing(channel_id).await; + let Some(user_id) = self.bot_user_id.get().cloned() else { + return; + }; + let channel_id_owned = channel_id.to_string(); + let client = self.client.clone(); + let token = self.token.clone(); + let url = self.api_url(&format!("/users/{user_id}/typing")); + + let handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(TYPING_INDICATOR_INTERVAL); + loop { + interval.tick().await; + let result = client + .post(url.clone()) + .bearer_auth(token.as_ref()) + .json(&serde_json::json!({ + "channel_id": channel_id_owned, + "parent_id": "", + })) + .send() + .await; + if let Err(error) = result { + tracing::warn!(%error, "typing indicator request failed"); + } + } + }); + + self.typing_tasks + .write() + .await + .insert(channel_id.to_string(), handle); + } + + /// Create a new post in `channel_id` and return the created post. + /// + /// Pass `root_id` to place the post inside an existing thread; the root ID + /// must be the ID of the first post in that thread (not a reply). Pass + /// `None` to create a top-level post. + /// + /// Both `channel_id` and `root_id` (if provided) are validated with + /// [`validate_id`](Self::validate_id) before the request is sent. + async fn create_post( + &self, + channel_id: &str, + message: &str, + root_id: Option<&str>, + ) -> crate::Result { + Self::validate_id(channel_id)?; + if let Some(rid) = root_id { + Self::validate_id(rid)?; + } + + let response = self + .client + .post(self.api_url("/posts")) + .bearer_auth(self.token.as_ref()) + .json(&serde_json::json!({ + "channel_id": channel_id, + "message": message, + "root_id": root_id.unwrap_or(""), + })) + .send() + .await + .context("failed to create post")?; + + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost POST /posts failed with status {}: {body}", + status.as_u16() + ) + .into()); + } + + response + .json() + .await + .context("failed to parse post response") + .map_err(Into::into) + } + + /// Replace the text of an existing post in-place. + /// + /// Used by the streaming path to update the placeholder post created by + /// [`StreamStart`](crate::OutboundResponse::StreamStart) as chunks arrive, + /// and to finalize it on [`StreamEnd`](crate::OutboundResponse::StreamEnd). + async fn edit_post(&self, post_id: &str, message: &str) -> crate::Result<()> { + Self::validate_id(post_id)?; + + let response = self + .client + .put(self.api_url(&format!("/posts/{post_id}"))) + .bearer_auth(self.token.as_ref()) + .json(&serde_json::json!({ "message": message })) + .send() + .await + .context("failed to edit post")?; + + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost PUT /posts/{post_id} failed with status {}: {body}", + status.as_u16() + ) + .into()); + } + + Ok(()) + } + + /// Fetch up to `limit` posts from `channel_id`, sorted by creation time. + /// + /// Pass `before_post_id` to retrieve posts that appeared before a specific + /// post (exclusive), which is used by [`fetch_history`](Self::fetch_history) + /// to anchor the history window to the triggering message. The Mattermost + /// API always returns the most recent matching posts first; callers are + /// responsible for reversing the order if needed. + async fn get_channel_posts( + &self, + channel_id: &str, + before_post_id: Option<&str>, + limit: u32, + ) -> crate::Result { + Self::validate_id(channel_id)?; + + let mut url = self.api_url(&format!("/channels/{channel_id}/posts")); + { + let mut query = url.query_pairs_mut(); + query.append_pair("page", "0"); + query.append_pair("per_page", &limit.to_string()); + if let Some(before) = before_post_id { + query.append_pair("before", before); + } + } + + let response = self + .client + .get(url) + .bearer_auth(self.token.as_ref()) + .send() + .await + .context("failed to fetch channel posts")?; + + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost GET /channels/{channel_id}/posts failed with status {}: {body}", + status.as_u16() + ) + .into()); + } + + response + .json() + .await + .context("failed to parse posts response") + .map_err(Into::into) + } + + /// Return the Mattermost channel ID for a direct message conversation with + /// `user_id`, creating it via the API if it does not exist yet. + /// + /// Results are cached in `dm_channel_cache` so that subsequent calls for + /// the same user avoid a round-trip. Requires [`start`](Self::start) to + /// have been called so that `bot_user_id` is available. + async fn get_or_create_dm_channel(&self, user_id: &str) -> crate::Result { + if let Some(channel_id) = self.dm_channel_cache.read().await.get(user_id).cloned() { + return Ok(channel_id); + } + let bot_user_id = self + .bot_user_id + .get() + .ok_or_else(|| anyhow::anyhow!("bot_user_id not initialized"))? + .as_ref() + .to_string(); + let response = self + .client + .post(self.api_url("/channels/direct")) + .bearer_auth(self.token.as_ref()) + .json(&serde_json::json!([bot_user_id, user_id])) + .send() + .await + .context("failed to create DM channel")?; + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost POST /channels/direct failed with status {}: {body}", + status.as_u16() + ).into()); + } + let channel: MattermostChannel = response + .json() + .await + .context("failed to parse DM channel response")?; + self.dm_channel_cache + .write() + .await + .insert(user_id.to_string(), channel.id.clone()); + Ok(channel.id) + } +} + +impl Messaging for MattermostAdapter { + fn name(&self) -> &str { + &self.runtime_key + } + + async fn start(&self) -> crate::Result { + let (inbound_tx, inbound_rx) = mpsc::channel(256); + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + *self.shutdown_tx.write().await = Some(shutdown_tx); + + let me_response = self + .client + .get(self.api_url("/users/me")) + .bearer_auth(self.token.as_ref()) + .send() + .await + .context("failed to get bot user")?; + let me_status = me_response.status(); + if !me_status.is_success() { + let body = me_response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost /users/me failed with status {}: {body}", + me_status.as_u16() + ).into()); + } + let me: MattermostUser = me_response + .json() + .await + .context("failed to parse user response")?; + + let user_id: Arc = me.id.clone().into(); + let username: Arc = me.username.clone().into(); + + if self.bot_user_id.set(user_id.clone()).is_err() { + tracing::warn!(adapter = %self.runtime_key, "bot_user_id already initialized — start() called more than once"); + } + if self.bot_username.set(username.clone()).is_err() { + tracing::warn!(adapter = %self.runtime_key, "bot_username already initialized — start() called more than once"); + } + + tracing::info!( + adapter = %self.runtime_key, + bot_id = %user_id, + bot_username = %username, + "mattermost adapter connected" + ); + + let ws_url = self.ws_url(); + let runtime_key = self.runtime_key.clone(); + let token = self.token.clone(); + let permissions = self.permissions.clone(); + let bot_user_id = user_id; + let bot_username_ws = username; + let user_identity_cache = self.user_identity_cache.clone(); + let channel_name_cache = self.channel_name_cache.clone(); + let ws_client = self.client.clone(); + let ws_base_url = self.base_url.clone(); + let inbound_tx_clone = inbound_tx.clone(); + let default_team_id = self.default_team_id.clone(); + + let handle = tokio::spawn(async move { + let mut retry_delay = WS_RECONNECT_BASE_DELAY; + + loop { + let connect_result = connect_async(ws_url.as_str()).await; + + match connect_result { + Ok((ws_stream, _)) => { + retry_delay = WS_RECONNECT_BASE_DELAY; + + let (mut write, mut read) = ws_stream.split(); + + let auth_msg = serde_json::json!({ + "seq": 1, + "action": "authentication_challenge", + "data": {"token": token.as_ref()} + }); + + if let Ok(msg) = serde_json::to_string(&auth_msg) { + if write.send(WsMessage::Text(msg.into())).await.is_err() { + tracing::error!(adapter = %runtime_key, "failed to send websocket auth"); + continue; + } + } + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + tracing::info!(adapter = %runtime_key, "mattermost websocket shutting down"); + if let Err(error) = write.send(WsMessage::Close(None)).await { + tracing::debug!(%error, "failed to send websocket close frame"); + } + return; + } + + msg = read.next() => { + match msg { + Some(Ok(WsMessage::Text(text))) => { + match serde_json::from_str::(&text) { + Err(error) => { + tracing::debug!(%error, text = text.as_str(), "failed to parse Mattermost WS event envelope"); + } + Ok(event) => { + if event.event == "posted" { + // The post is double-encoded as a JSON string in the data field. + let post_result = event + .data + .get("post") + .and_then(|v| v.as_str()) + .map(|s| serde_json::from_str::(s)); + + let post_result = match post_result { + Some(Ok(p)) => Some(p), + Some(Err(error)) => { + tracing::debug!(%error, "failed to parse Mattermost WS post payload"); + None + } + None => None, + }; + + if let Some(mut post) = post_result { + if post.user_id != bot_user_id.as_ref() { + // channel_type comes from event.data, not the post struct. + let channel_type = event + .data + .get("channel_type") + .and_then(|v| v.as_str()) + .map(String::from); + post.channel_type = channel_type; + + let team_id = event.broadcast.team_id.clone() + .or_else(|| default_team_id.as_ref().map(|s| s.to_string())); + let perms = permissions.load(); + + let display_name = resolve_user_display_name( + &user_identity_cache, + &ws_client, + token.as_ref(), + &ws_base_url, + &post.user_id, + ).await; + let channel_name = resolve_channel_name( + &channel_name_cache, + &ws_client, + token.as_ref(), + &ws_base_url, + &post.channel_id, + ).await; + + if let Some(mut msg) = build_message_from_post( + &post, + &runtime_key, + &bot_user_id, + &bot_username_ws, + &team_id, + &perms, + display_name.as_deref(), + channel_name.as_deref(), + ) { + // Detect thread replies to the bot: + // if root_id is set, fetch the root post + // and check if the bot authored it. + if !post.root_id.is_empty() { + if let Some(root_author) = resolve_root_post_author( + &ws_client, + token.as_ref(), + &ws_base_url, + &post.root_id, + ).await { + if root_author == bot_user_id.as_ref() { + msg.metadata.insert( + "mattermost_mentions_or_replies_to_bot".into(), + serde_json::json!(true), + ); + } + } + } + if inbound_tx_clone.send(msg).await.is_err() { + tracing::debug!("inbound channel closed"); + return; + } + } + } + } + } + } // close Ok(event) arm + } // close match MattermostWsEvent + } + Some(Ok(WsMessage::Ping(data))) => { + if write.send(WsMessage::Pong(data)).await.is_err() { + tracing::warn!(adapter = %runtime_key, "failed to send pong"); + break; + } + } + Some(Ok(WsMessage::Pong(_))) => {} + Some(Ok(WsMessage::Close(_))) => { + tracing::info!(adapter = %runtime_key, "websocket closed by server"); + break; + } + Some(Err(e)) => { + tracing::error!(adapter = %runtime_key, error = %e, "websocket error"); + break; + } + None => break, + _ => {} + } + } + } + } + + tracing::info!(adapter = %runtime_key, "websocket disconnected, reconnecting..."); + } + Err(e) => { + tracing::error!( + adapter = %runtime_key, + error = %e, + delay_ms = retry_delay.as_millis(), + "websocket connection failed, retrying" + ); + } + } + + tokio::select! { + _ = tokio::time::sleep(retry_delay) => { + retry_delay = (retry_delay * 2).min(WS_RECONNECT_MAX_DELAY); + } + _ = shutdown_rx.recv() => { + tracing::info!(adapter = %runtime_key, "mattermost adapter shutting down during reconnect delay"); + return; + } + } + } + }); + + *self.ws_task.write().await = Some(handle); + + let stream = tokio_stream::wrappers::ReceiverStream::new(inbound_rx); + Ok(Box::pin(stream)) + } + + async fn respond( + &self, + message: &InboundMessage, + response: OutboundResponse, + ) -> crate::Result<()> { + let channel_id = self.extract_channel_id(message)?; + + match response { + OutboundResponse::Text(text) => { + self.stop_typing(channel_id).await; + // Use root_id for threading: prefer mattermost_root_id (when triggered from a + // threaded message) or REPLY_TO_MESSAGE_ID (set by channel.rs for branch/worker + // replies). + let root_id = message + .metadata + .get("mattermost_root_id") + .and_then(|v| v.as_str()) + .or_else(|| { + message + .metadata + .get(crate::metadata_keys::REPLY_TO_MESSAGE_ID) + .and_then(|v| v.as_str()) + }); + + for chunk in split_message(&text, MAX_MESSAGE_LENGTH) { + self.create_post(channel_id, &chunk, root_id).await?; + } + } + + OutboundResponse::StreamStart => { + let root_id = message + .metadata + .get("mattermost_root_id") + .and_then(|v| v.as_str()) + .or_else(|| { + message + .metadata + .get(crate::metadata_keys::REPLY_TO_MESSAGE_ID) + .and_then(|v| v.as_str()) + }); + self.start_typing(channel_id).await; + // Create a placeholder post with a zero-width space. + let post = match self.create_post(channel_id, "\u{200B}", root_id).await { + Ok(p) => p, + Err(error) => { + self.stop_typing(channel_id).await; + return Err(error); + } + }; + self.active_messages.write().await.insert( + message.id.clone(), + ActiveStream { + post_id: post.id.into(), + channel_id: channel_id.to_string().into(), + last_edit: Instant::now(), + accumulated_text: String::new(), + }, + ); + } + + OutboundResponse::StreamChunk(chunk) => { + let pending_edit = { + let mut active_messages = self.active_messages.write().await; + if let Some(active) = active_messages.get_mut(&message.id) { + active.accumulated_text.push_str(&chunk); + + if active.last_edit.elapsed() > STREAM_EDIT_THROTTLE { + let display_text = if active.accumulated_text.len() > MAX_MESSAGE_LENGTH + { + let end = active + .accumulated_text + .floor_char_boundary(MAX_MESSAGE_LENGTH - 3); + format!("{}...", &active.accumulated_text[..end]) + } else { + active.accumulated_text.clone() + }; + active.last_edit = Instant::now(); + Some((active.post_id.clone(), display_text)) + } else { + None + } + } else { + None + } + }; + if let Some((post_id, display_text)) = pending_edit { + if let Err(error) = self.edit_post(&post_id, &display_text).await { + tracing::warn!(%error, "failed to edit streaming message"); + } + } + } + + OutboundResponse::StreamEnd => { + self.stop_typing(channel_id).await; + let root_id = message + .metadata + .get("mattermost_root_id") + .and_then(|v| v.as_str()) + .or_else(|| { + message + .metadata + .get(crate::metadata_keys::REPLY_TO_MESSAGE_ID) + .and_then(|v| v.as_str()) + }); + if let Some(active) = self.active_messages.write().await.remove(&message.id) { + let chunks = split_message(&active.accumulated_text, MAX_MESSAGE_LENGTH); + let mut first = true; + for chunk in chunks { + if first { + first = false; + if let Err(error) = self.edit_post(&active.post_id, &chunk).await { + tracing::warn!(%error, "failed to finalize streaming message"); + } + } else { + if let Err(error) = self.create_post(channel_id, &chunk, root_id).await { + tracing::warn!(%error, "failed to create overflow chunk for streaming message"); + } + } + } + } + } + + OutboundResponse::Status(status) => self.send_status(message, status).await?, + + OutboundResponse::Reaction(emoji) => { + let post_id = message + .metadata + .get("mattermost_post_id") + .and_then(|v| v.as_str()) + .ok_or_else(|| { + anyhow::anyhow!("missing mattermost_post_id metadata") + })?; + let emoji_name = sanitize_reaction_name(&emoji); + + let bot_user_id = self + .bot_user_id + .get() + .ok_or_else(|| anyhow::anyhow!("bot_user_id not initialized; call start() first"))? + .as_ref() + .to_string(); + + let response = self + .client + .post(self.api_url("/reactions")) + .bearer_auth(self.token.as_ref()) + .json(&serde_json::json!({ + "user_id": bot_user_id, + "post_id": post_id, + "emoji_name": emoji_name, + })) + .send() + .await + .context("failed to add reaction")?; + + if !response.status().is_success() { + tracing::warn!( + status = %response.status(), + emoji = %emoji_name, + "failed to add reaction" + ); + } + } + + OutboundResponse::File { + filename, + data, + mime_type, + caption, + } => { + if data.len() > self.max_attachment_bytes { + return Err(anyhow::anyhow!( + "file too large: {} bytes (max: {})", + data.len(), + self.max_attachment_bytes + ) + .into()); + } + + let part = reqwest::multipart::Part::bytes(data) + .file_name(filename.clone()) + .mime_str(&mime_type) + .context("invalid mime type")?; + + let form = reqwest::multipart::Form::new() + .part("files", part) + .text("channel_id", channel_id.to_string()); + + let response = self + .client + .post(self.api_url("/files")) + .bearer_auth(self.token.as_ref()) + .multipart(form) + .send() + .await + .context("failed to upload file")?; + + if !response.status().is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost file upload failed: {body}" + ) + .into()); + } + + let upload: MattermostFileUpload = response + .json() + .await + .context("failed to parse file upload response")?; + + let file_ids: Vec<_> = + upload.file_infos.iter().map(|f| f.id.as_str()).collect(); + let root_id = message + .metadata + .get("mattermost_root_id") + .and_then(|v| v.as_str()) + .or_else(|| { + message + .metadata + .get(crate::metadata_keys::REPLY_TO_MESSAGE_ID) + .and_then(|v| v.as_str()) + }); + let post_response = self.client + .post(self.api_url("/posts")) + .bearer_auth(self.token.as_ref()) + .json(&serde_json::json!({ + "channel_id": channel_id, + "message": caption.unwrap_or_default(), + "file_ids": file_ids, + "root_id": root_id.unwrap_or(""), + })) + .send() + .await + .context("failed to create post with file")?; + let post_status = post_response.status(); + if !post_status.is_success() { + let body = post_response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost POST /posts (file) failed with status {}: {body}", + post_status.as_u16() + ).into()); + } + } + + _ => { + tracing::debug!(?response, "mattermost adapter does not support this response type"); + } + } + + Ok(()) + } + + async fn send_status( + &self, + message: &InboundMessage, + status: StatusUpdate, + ) -> crate::Result<()> { + let channel_id = self.extract_channel_id(message)?; + + match status { + StatusUpdate::Thinking => { + self.start_typing(channel_id).await; + } + StatusUpdate::StopTyping => { + self.stop_typing(channel_id).await; + } + _ => {} + } + + Ok(()) + } + + async fn fetch_history( + &self, + message: &InboundMessage, + limit: usize, + ) -> crate::Result> { + let channel_id = self.extract_channel_id(message)?; + let before_post_id = message + .metadata + .get("mattermost_post_id") + .and_then(|v| v.as_str()); + + let capped_limit = limit.min(200) as u32; + let posts = self + .get_channel_posts(channel_id, before_post_id, capped_limit) + .await?; + + let bot_id = self + .bot_user_id + .get() + .map(|s| s.as_ref().to_string()); + + let mut posts_vec: Vec<_> = posts + .posts + .into_values() + .filter(|p| bot_id.as_deref() != Some(p.user_id.as_str())) + .collect(); + posts_vec.sort_by_key(|p| p.create_at); + + let history: Vec = posts_vec + .into_iter() + .map(|p| HistoryMessage { + author: p.user_id, + content: p.message, + is_bot: false, + timestamp: chrono::DateTime::from_timestamp_millis(p.create_at), + }) + .collect(); + + Ok(history) + } + + async fn health_check(&self) -> crate::Result<()> { + let response = self + .client + .get(self.api_url("/system/ping")) + .bearer_auth(self.token.as_ref()) + .send() + .await + .context("health check request failed")?; + + if !response.status().is_success() { + return Err(anyhow::anyhow!( + "mattermost health check failed: status {}", + response.status() + ) + .into()); + } + + Ok(()) + } + + async fn shutdown(&self) -> crate::Result<()> { + if let Some(tx) = self.shutdown_tx.write().await.take() { + tx.send(()).await.ok(); + } + + if let Some(handle) = self.ws_task.write().await.take() { + handle.abort(); + } + + for (_, handle) in self.typing_tasks.write().await.drain() { + handle.abort(); + } + self.active_messages.write().await.clear(); + + tracing::info!(adapter = %self.runtime_key, "mattermost adapter shut down"); + Ok(()) + } + + async fn broadcast(&self, target: &str, response: OutboundResponse) -> crate::Result<()> { + // Resolve DM targets (dm:{user_id}) to a real Mattermost channel ID. + let resolved_target; + let target = if let Some(user_id) = target.strip_prefix("dm:") { + resolved_target = self.get_or_create_dm_channel(user_id).await?; + resolved_target.as_str() + } else { + target + }; + + match response { + OutboundResponse::Text(text) => { + for chunk in split_message(&text, MAX_MESSAGE_LENGTH) { + self.create_post(target, &chunk, None).await?; + } + } + OutboundResponse::File { + filename, + data, + mime_type, + caption, + } => { + if data.len() > self.max_attachment_bytes { + return Err(anyhow::anyhow!( + "file too large: {} bytes (max: {})", + data.len(), + self.max_attachment_bytes + ) + .into()); + } + let part = reqwest::multipart::Part::bytes(data) + .file_name(filename) + .mime_str(&mime_type) + .context("invalid mime type")?; + let form = reqwest::multipart::Form::new() + .part("files", part) + .text("channel_id", target.to_string()); + let upload_response = self + .client + .post(self.api_url("/files")) + .bearer_auth(self.token.as_ref()) + .multipart(form) + .send() + .await + .context("failed to upload file")?; + let upload_status = upload_response.status(); + if !upload_status.is_success() { + let body = upload_response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost file upload failed with status {}: {body}", + upload_status.as_u16() + ).into()); + } + let upload: MattermostFileUpload = upload_response + .json() + .await + .context("failed to parse file upload response")?; + let file_ids: Vec<_> = + upload.file_infos.iter().map(|f| f.id.as_str()).collect(); + let post_response = self.client + .post(self.api_url("/posts")) + .bearer_auth(self.token.as_ref()) + .json(&serde_json::json!({ + "channel_id": target, + "message": caption.unwrap_or_default(), + "file_ids": file_ids, + })) + .send() + .await + .context("failed to create post with file")?; + let post_status = post_response.status(); + if !post_status.is_success() { + let body = post_response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!( + "mattermost create post with file failed with status {}: {body}", + post_status.as_u16() + ).into()); + } + } + other => { + tracing::debug!(?other, "mattermost broadcast does not support this response type"); + } + } + Ok(()) + } +} + +/// Convert a [`MattermostPost`] from a WebSocket event into an [`InboundMessage`], +/// applying all permission filters. +/// +/// Returns `None` (message dropped) when any of the following hold: +/// - The post was authored by the bot itself. +/// - A `team_filter` is configured and the event's team ID does not match, or +/// the team ID is absent (fail-closed). +/// - A `channel_filter` is configured and the channel is not in the allow-list +/// for the event's team, or the team ID is absent (fail-closed). +/// - The channel is a direct message (`channel_type = "D"`) and either +/// `dm_allowed_users` is empty or the sender is not listed. +/// +/// When a message passes all filters the following metadata keys are set: +/// `message_id`, `mattermost_post_id`, `mattermost_channel_id`, +/// `mattermost_team_id` (if known), `mattermost_root_id` (if in a thread), +/// `sender_display_name` (if `display_name` is provided), +/// `mattermost_channel_name` and `channel_name` (if `channel_name` is provided), +/// and `mattermost_mentions_or_replies_to_bot` (true when the message text +/// contains `@{bot_username}`). +fn build_message_from_post( + post: &MattermostPost, + runtime_key: &str, + bot_user_id: &str, + bot_username: &str, + team_id: &Option, + permissions: &MattermostPermissions, + display_name: Option<&str>, + channel_name: Option<&str>, +) -> Option { + if post.user_id == bot_user_id { + return None; + } + + if let Some(team_filter) = &permissions.team_filter { + // Fail-closed: no team_id in the event → can't verify team → reject. + let Some(tid) = team_id else { return None }; + if !team_filter.contains(tid) { + return None; + } + } + + if !permissions.channel_filter.is_empty() { + // Fail-closed: no team_id or no allowlist entry for this team → reject. + let Some(tid) = team_id else { return None }; + let Some(allowed_channels) = permissions.channel_filter.get(tid) else { + return None; + }; + if !allowed_channels.contains(&post.channel_id) { + return None; + } + } + + // DM filter: if channel_type is "D", enforce dm_allowed_users (fail-closed) + if post.channel_type.as_deref() == Some("D") { + if permissions.dm_allowed_users.is_empty() { + return None; + } + if !permissions.dm_allowed_users.contains(&post.user_id) { + return None; + } + } + + // "D" = direct message, "G" = group DM + let conversation_id = if post.channel_type.as_deref() == Some("D") { + apply_runtime_adapter_to_conversation_id( + runtime_key, + format!( + "mattermost:{}:dm:{}", + team_id.as_deref().unwrap_or(""), + post.user_id + ), + ) + } else { + apply_runtime_adapter_to_conversation_id( + runtime_key, + format!( + "mattermost:{}:{}", + team_id.as_deref().unwrap_or(""), + post.channel_id + ), + ) + }; + + let mut metadata = HashMap::new(); + + metadata.insert( + crate::metadata_keys::MESSAGE_ID.into(), + serde_json::json!(&post.id), + ); + + metadata.insert("mattermost_post_id".into(), serde_json::json!(&post.id)); + metadata.insert( + "mattermost_channel_id".into(), + serde_json::json!(&post.channel_id), + ); + if let Some(tid) = team_id { + metadata.insert("mattermost_team_id".into(), serde_json::json!(tid)); + } + if !post.root_id.is_empty() { + metadata.insert( + "mattermost_root_id".into(), + serde_json::json!(&post.root_id), + ); + } + + // FN1: sender display name + if let Some(dn) = display_name { + metadata.insert("sender_display_name".into(), serde_json::json!(dn)); + } + + // FN2: channel name + if let Some(cn) = channel_name { + metadata.insert("mattermost_channel_name".into(), serde_json::json!(cn)); + metadata.insert( + crate::metadata_keys::CHANNEL_NAME.into(), + serde_json::json!(cn), + ); + } + + // FN4: bot mention detection — @mention, DM, or thread reply to bot post. + // Thread-reply-to-bot detection is handled asynchronously in the WS event + // handler and may upgrade this to true after this function returns. + let is_dm = post.channel_type.as_deref() == Some("D"); + let mentions_bot = is_dm + || (!bot_username.is_empty() && post.message.contains(&format!("@{bot_username}"))); + metadata.insert( + "mattermost_mentions_or_replies_to_bot".into(), + serde_json::json!(mentions_bot), + ); + + // FN1: formatted_author — "Display Name" when display name is available + let formatted_author = display_name.map(|dn| dn.to_string()); + + Some(InboundMessage { + id: post.id.clone(), + source: "mattermost".into(), + adapter: Some(runtime_key.to_string()), + conversation_id, + sender_id: post.user_id.clone(), + agent_id: None, + content: MessageContent::Text(post.message.clone()), + timestamp: chrono::DateTime::from_timestamp_millis(post.create_at) + .unwrap_or_else(chrono::Utc::now), + metadata, + formatted_author, + }) +} + +// --- API Types --- + +#[derive(Debug, Clone, Deserialize)] +struct MattermostUser { + id: String, + username: String, + #[serde(default)] + first_name: String, + #[serde(default)] + last_name: String, + #[serde(default)] + nickname: String, +} + +impl MattermostUser { + /// Resolve the best available display name for this user. + /// + /// Priority order: nickname → "first last" (trimmed) → username. + fn display_name(&self) -> String { + if !self.nickname.is_empty() { + return self.nickname.clone(); + } + let full = format!("{} {}", self.first_name, self.last_name); + let full = full.trim(); + if !full.is_empty() { + return full.to_string(); + } + self.username.clone() + } +} + +#[derive(Debug, Deserialize)] +struct MattermostChannel { + id: String, + display_name: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +struct MattermostPost { + id: String, + create_at: i64, + #[allow(dead_code)] + update_at: i64, + user_id: String, + channel_id: String, + root_id: String, + message: String, + /// "D" = direct message, "G" = group DM, "O" = public, "P" = private. + /// Not present in REST list responses; injected from WS event data. + #[serde(default)] + channel_type: Option, + #[serde(default)] + #[allow(dead_code)] + file_ids: Vec, +} + +#[derive(Debug, Deserialize)] +struct MattermostPostList { + #[serde(default)] + #[allow(dead_code)] + order: Vec, + #[serde(default)] + posts: HashMap, +} + +#[derive(Debug, Deserialize)] +struct MattermostFileUpload { + #[serde(default)] + file_infos: Vec, +} + +#[derive(Debug, Deserialize)] +struct MattermostFileInfo { + id: String, + #[allow(dead_code)] + name: String, +} + +#[derive(Debug, Deserialize)] +struct MattermostWsEvent { + event: String, + #[serde(default)] + data: serde_json::Value, + #[serde(default)] + broadcast: MattermostWsBroadcast, +} + +#[derive(Debug, Deserialize, Default)] +struct MattermostWsBroadcast { + #[serde(default)] + #[allow(dead_code)] + channel_id: Option, + #[serde(default)] + team_id: Option, + #[serde(default)] + #[allow(dead_code)] + user_id: Option, +} + +/// Look up the display name for a Mattermost user by ID. +/// +/// Returns the cached value if already resolved, otherwise calls +/// `GET /api/v4/users/{user_id}` and caches the result. The resolved name +/// follows the same priority order as [`MattermostUser::display_name`]: +/// nickname → "first last" → username. Returns `None` on any network or +/// API error (logged at `DEBUG` level). +async fn resolve_user_display_name( + cache: &RwLock>, + client: &Client, + token: &str, + base_url: &Url, + user_id: &str, +) -> Option { + if let Some(name) = cache.read().await.get(user_id).cloned() { + return Some(name); + } + let mut url = base_url.clone(); + url.path_segments_mut() + .ok()? + .extend(["api", "v4", "users", user_id]); + let resp = match client.get(url).bearer_auth(token).send().await { + Ok(r) => r, + Err(error) => { + tracing::debug!(%error, user_id, "failed to fetch mattermost user"); + return None; + } + }; + if !resp.status().is_success() { + tracing::debug!(status = %resp.status(), user_id, "mattermost user fetch returned non-success"); + return None; + } + let user: MattermostUser = match resp.json().await { + Ok(u) => u, + Err(error) => { + tracing::debug!(%error, user_id, "failed to parse mattermost user response"); + return None; + } + }; + let name = user.display_name(); + cache.write().await.insert(user_id.to_string(), name.clone()); + Some(name) +} + +/// Look up the display name for a Mattermost channel by ID. +/// +/// Returns the cached value if already resolved, otherwise calls +/// `GET /api/v4/channels/{channel_id}` and caches the result using the +/// channel's `display_name` field. Returns `None` on any network or API +/// error (logged at `DEBUG` level). +async fn resolve_channel_name( + cache: &RwLock>, + client: &Client, + token: &str, + base_url: &Url, + channel_id: &str, +) -> Option { + if let Some(name) = cache.read().await.get(channel_id).cloned() { + return Some(name); + } + let mut url = base_url.clone(); + url.path_segments_mut() + .ok()? + .extend(["api", "v4", "channels", channel_id]); + let resp = match client.get(url).bearer_auth(token).send().await { + Ok(r) => r, + Err(error) => { + tracing::debug!(%error, channel_id, "failed to fetch mattermost channel"); + return None; + } + }; + if !resp.status().is_success() { + tracing::debug!(status = %resp.status(), channel_id, "mattermost channel fetch returned non-success"); + return None; + } + let channel: MattermostChannel = match resp.json().await { + Ok(c) => c, + Err(error) => { + tracing::debug!(%error, channel_id, "failed to parse mattermost channel response"); + return None; + } + }; + let name = channel.display_name; + cache.write().await.insert(channel_id.to_string(), name.clone()); + Some(name) +} + +/// Fetch the `user_id` of the author of a Mattermost post by its ID. +/// +/// Used to detect thread replies to the bot: if a post's `root_id` resolves to +/// a post authored by the bot, the reply should be treated as directed at the +/// bot for `require_mention` purposes. Returns `None` on any network or API +/// error (logged at `DEBUG` level). +async fn resolve_root_post_author( + client: &Client, + token: &str, + base_url: &Url, + post_id: &str, +) -> Option { + let mut url = base_url.clone(); + url.path_segments_mut() + .ok()? + .extend(["api", "v4", "posts", post_id]); + let resp = match client.get(url).bearer_auth(token).send().await { + Ok(r) => r, + Err(error) => { + tracing::debug!(%error, post_id, "failed to fetch mattermost root post"); + return None; + } + }; + if !resp.status().is_success() { + tracing::debug!(status = %resp.status(), post_id, "mattermost root post fetch returned non-success"); + return None; + } + let post: MattermostPost = match resp.json().await { + Ok(p) => p, + Err(error) => { + tracing::debug!(%error, post_id, "failed to parse mattermost root post response"); + return None; + } + }; + Some(post.user_id) +} + +/// Convert an emoji input to a Mattermost reaction short-code name. +/// +/// Handles three input forms: +/// 1. Unicode emoji (e.g. "👍") → looked up via the `emojis` crate → "thumbsup" +/// 2. Colon-wrapped short-code (e.g. ":thumbsup:") → stripped to "thumbsup" +/// 3. Plain short-code (e.g. "thumbsup") → lowercased and passed through +fn sanitize_reaction_name(emoji: &str) -> String { + let trimmed = emoji.trim(); + if let Some(e) = emojis::get(trimmed) { + if let Some(shortcode) = e.shortcode() { + return shortcode.to_string(); + } + return e.name().replace(' ', "_").to_lowercase(); + } + trimmed + .trim_start_matches(':') + .trim_end_matches(':') + .to_lowercase() +} + +/// Split `text` into chunks no longer than `max_len` bytes. +/// +/// Splits preferentially on newlines, then on spaces, and falls back to a +/// hard character-boundary break when no whitespace is found. All splits +/// respect UTF-8 character boundaries. Leading whitespace is stripped from +/// each chunk after a split. +fn split_message(text: &str, max_len: usize) -> Vec { + if text.len() <= max_len { + return vec![text.to_string()]; + } + + let mut chunks = Vec::new(); + let mut remaining = text; + + while !remaining.is_empty() { + if remaining.len() <= max_len { + chunks.push(remaining.to_string()); + break; + } + + let search_end = remaining.floor_char_boundary(max_len); + let search_region = &remaining[..search_end]; + // Ignore a break point at position 0 to avoid pushing an empty chunk + // (e.g. when the region starts with a newline or space). + let break_point = search_region + .rfind('\n') + .or_else(|| search_region.rfind(' ')) + .filter(|&pos| pos > 0) + .unwrap_or(search_end); + + let end = remaining.floor_char_boundary(break_point); + chunks.push(remaining[..end].to_string()); + remaining = remaining[end..].trim_start_matches('\n').trim_start(); + } + + chunks +} + +#[cfg(test)] +mod tests { + use super::*; + + // --- helpers --- + + fn post(user_id: &str, channel_id: &str, channel_type: Option<&str>) -> MattermostPost { + MattermostPost { + id: "post1".into(), + create_at: 0, + update_at: 0, + user_id: user_id.into(), + channel_id: channel_id.into(), + root_id: String::new(), + message: "hello".into(), + channel_type: channel_type.map(String::from), + file_ids: vec![], + } + } + + fn no_filters() -> MattermostPermissions { + MattermostPermissions { + team_filter: None, + channel_filter: HashMap::new(), + dm_allowed_users: vec![], + } + } + + fn build_message_from_mattermost_post(post: &MattermostPost, bot_id: &str, team_id: Option<&str>, perms: &MattermostPermissions) -> Option { + build_message_from_post(post, "mattermost", bot_id, "botuser", &team_id.map(String::from), perms, None, None) + } + + fn build_message_from_mattermost_post_named(post: &MattermostPost, bot_id: &str, bot_username: &str, team_id: Option<&str>, perms: &MattermostPermissions, display_name: Option<&str>, channel_name: Option<&str>) -> Option { + build_message_from_post(post, "mattermost", bot_id, bot_username, &team_id.map(String::from), perms, display_name, channel_name) + } + + // --- build_message_from_post --- + + #[test] + fn bot_messages_are_filtered() { + let p = post("bot123", "chan1", None); + assert!(build_message_from_mattermost_post(&p, "bot123", None, &no_filters()).is_none()); + } + + #[test] + fn non_bot_message_passes_without_filters() { + let p = post("user1", "chan1", None); + assert!(build_message_from_mattermost_post(&p, "bot123", Some("team1"), &no_filters()).is_some()); + } + + #[test] + fn team_filter_allows_matching_team() { + let p = post("user1", "chan1", None); + let perms = MattermostPermissions { + team_filter: Some(vec!["team1".into()]), + channel_filter: HashMap::new(), + dm_allowed_users: vec![], + }; + assert!(build_message_from_mattermost_post(&p, "bot", Some("team1"), &perms).is_some()); + } + + #[test] + fn team_filter_rejects_wrong_team() { + let p = post("user1", "chan1", None); + let perms = MattermostPermissions { + team_filter: Some(vec!["team1".into()]), + channel_filter: HashMap::new(), + dm_allowed_users: vec![], + }; + assert!(build_message_from_mattermost_post(&p, "bot", Some("team2"), &perms).is_none()); + } + + #[test] + fn team_filter_fail_closed_when_team_id_absent() { + let p = post("user1", "chan1", None); + let perms = MattermostPermissions { + team_filter: Some(vec!["team1".into()]), + channel_filter: HashMap::new(), + dm_allowed_users: vec![], + }; + // No team_id in the event — must reject (fail-closed) + assert!(build_message_from_mattermost_post(&p, "bot", None, &perms).is_none()); + } + + #[test] + fn channel_filter_allows_matching_channel() { + let p = post("user1", "chan1", None); + let mut cf = HashMap::new(); + cf.insert("team1".into(), vec!["chan1".into()]); + let perms = MattermostPermissions { team_filter: None, channel_filter: cf, dm_allowed_users: vec![] }; + assert!(build_message_from_mattermost_post(&p, "bot", Some("team1"), &perms).is_some()); + } + + #[test] + fn channel_filter_rejects_unlisted_channel() { + let p = post("user1", "chan2", None); + let mut cf = HashMap::new(); + cf.insert("team1".into(), vec!["chan1".into()]); + let perms = MattermostPermissions { team_filter: None, channel_filter: cf, dm_allowed_users: vec![] }; + assert!(build_message_from_mattermost_post(&p, "bot", Some("team1"), &perms).is_none()); + } + + #[test] + fn channel_filter_fail_closed_when_team_id_absent() { + let p = post("user1", "chan1", None); + let mut cf = HashMap::new(); + cf.insert("team1".into(), vec!["chan1".into()]); + let perms = MattermostPermissions { team_filter: None, channel_filter: cf, dm_allowed_users: vec![] }; + // No team_id → can't look up allowed channels → reject + assert!(build_message_from_mattermost_post(&p, "bot", None, &perms).is_none()); + } + + #[test] + fn channel_filter_fail_closed_when_team_not_in_filter() { + // channel_filter has an entry for team1 but the message comes from team2. + // Even though chan1 is the allowed channel, the missing team2 entry must + // reject (fail-closed), not silently pass through. + let p = post("user1", "chan1", None); + let mut cf = HashMap::new(); + cf.insert("team1".into(), vec!["chan1".into()]); + let perms = MattermostPermissions { team_filter: None, channel_filter: cf, dm_allowed_users: vec![] }; + assert!(build_message_from_mattermost_post(&p, "bot", Some("team2"), &perms).is_none()); + } + + fn dm_perms(allowed: &[&str]) -> MattermostPermissions { + MattermostPermissions { + team_filter: None, + channel_filter: HashMap::new(), + dm_allowed_users: allowed.iter().map(|s| s.to_string()).collect(), + } + } + + #[test] + fn dm_blocked_when_dm_allowed_users_empty() { + let p = post("user1", "chan1", Some("D")); + assert!(build_message_from_mattermost_post(&p, "bot", Some("team1"), &no_filters()).is_none()); + } + + #[test] + fn dm_allowed_for_listed_user() { + let p = post("user1", "chan1", Some("D")); + assert!(build_message_from_mattermost_post(&p, "bot", Some("team1"), &dm_perms(&["user1"])).is_some()); + } + + #[test] + fn dm_blocked_for_unlisted_user() { + let p = post("user2", "chan1", Some("D")); + assert!(build_message_from_mattermost_post(&p, "bot", Some("team1"), &dm_perms(&["user1"])).is_none()); + } + + #[test] + fn dm_filter_does_not_affect_channel_messages() { + // channel messages (type "O") pass even with empty dm_allowed_users + let p = post("user1", "chan1", Some("O")); + assert!(build_message_from_mattermost_post(&p, "bot", Some("team1"), &no_filters()).is_some()); + } + + #[test] + fn dm_conversation_id_uses_user_id() { + let p = post("user1", "chan1", Some("D")); + let msg = build_message_from_mattermost_post(&p, "bot", Some("team1"), &dm_perms(&["user1"])).unwrap(); + assert!(msg.conversation_id.contains(":dm:user1"), "expected DM conversation_id, got {}", msg.conversation_id); + } + + #[test] + fn channel_conversation_id_uses_channel_id() { + let p = post("user1", "chan1", Some("O")); + let msg = build_message_from_mattermost_post(&p, "bot", Some("team1"), &no_filters()).unwrap(); + assert!(msg.conversation_id.contains(":chan1"), "expected channel conversation_id, got {}", msg.conversation_id); + assert!(!msg.conversation_id.contains(":dm:"), "should not be DM, got {}", msg.conversation_id); + } + + #[test] + fn message_id_metadata_is_set() { + let p = post("user1", "chan1", None); + let msg = build_message_from_mattermost_post(&p, "bot", Some("team1"), &no_filters()).unwrap(); + assert!(msg.metadata.contains_key(crate::metadata_keys::MESSAGE_ID)); + } + + // --- FN4: bot mention detection --- + + #[test] + fn mention_sets_flag_when_at_bot_username_in_message() { + let mut p = post("user1", "chan1", Some("O")); + p.message = "hey @botuser can you help?".into(); + let msg = build_message_from_mattermost_post(&p, "bot", Some("team1"), &no_filters()).unwrap(); + assert_eq!( + msg.metadata.get("mattermost_mentions_or_replies_to_bot").and_then(|v| v.as_bool()), + Some(true), + ); + } + + #[test] + fn no_mention_flag_when_bot_not_mentioned() { + let p = post("user1", "chan1", Some("O")); + let msg = build_message_from_mattermost_post(&p, "bot", Some("team1"), &no_filters()).unwrap(); + assert_eq!( + msg.metadata.get("mattermost_mentions_or_replies_to_bot").and_then(|v| v.as_bool()), + Some(false), + ); + } + + // --- FN1: display name / sender_display_name --- + + #[test] + fn sender_display_name_set_when_provided() { + let p = post("user1", "chan1", Some("O")); + let msg = build_message_from_mattermost_post_named(&p, "bot", "botuser", Some("team1"), &no_filters(), Some("Alice"), None).unwrap(); + assert_eq!( + msg.metadata.get("sender_display_name").and_then(|v| v.as_str()), + Some("Alice"), + ); + assert_eq!(msg.formatted_author.as_deref(), Some("Alice")); + } + + #[test] + fn sender_display_name_absent_when_not_provided() { + let p = post("user1", "chan1", Some("O")); + let msg = build_message_from_mattermost_post(&p, "bot", Some("team1"), &no_filters()).unwrap(); + assert!(msg.metadata.get("sender_display_name").is_none()); + assert!(msg.formatted_author.is_none()); + } + + // --- FN2: channel name --- + + #[test] + fn channel_name_metadata_set_when_provided() { + let p = post("user1", "chan1", Some("O")); + let msg = build_message_from_mattermost_post_named(&p, "bot", "botuser", Some("team1"), &no_filters(), None, Some("general")).unwrap(); + assert_eq!( + msg.metadata.get("mattermost_channel_name").and_then(|v| v.as_str()), + Some("general"), + ); + assert_eq!( + msg.metadata.get(crate::metadata_keys::CHANNEL_NAME).and_then(|v| v.as_str()), + Some("general"), + ); + } + + // --- SEC2: sanitize_reaction_name --- + + #[test] + fn sanitize_reaction_unicode_to_shortcode() { + assert_eq!(sanitize_reaction_name("\u{1F44D}"), "+1"); + } + + #[test] + fn sanitize_reaction_colon_wrapped() { + assert_eq!(sanitize_reaction_name(":thumbsup:"), "thumbsup"); + } + + #[test] + fn sanitize_reaction_plain_lowercased() { + assert_eq!(sanitize_reaction_name("ThumbsUp"), "thumbsup"); + } + + // --- split_message --- + + #[test] + fn test_split_message_short() { + let result = split_message("hello", 100); + assert_eq!(result, vec!["hello"]); + } + + #[test] + fn test_split_message_exact_boundary() { + let text = "a".repeat(100); + let result = split_message(&text, 100); + assert_eq!(result.len(), 1); + } + + #[test] + fn test_split_message_on_newline() { + let text = "line1\nline2\nline3"; + let result = split_message(text, 8); + assert_eq!(result, vec!["line1", "line2", "line3"]); + } + + #[test] + fn test_split_message_on_space() { + let text = "word1 word2 word3"; + let result = split_message(text, 12); + assert_eq!(result, vec!["word1 word2", "word3"]); + } + + #[test] + fn test_split_message_forced_break() { + let text = "abcdefghijklmnopqrstuvwxyz"; + let result = split_message(text, 10); + assert_eq!(result, vec!["abcdefghij", "klmnopqrst", "uvwxyz"]); + } +} diff --git a/src/messaging/target.rs b/src/messaging/target.rs index 17676afab..6360cb070 100644 --- a/src/messaging/target.rs +++ b/src/messaging/target.rs @@ -140,6 +140,32 @@ pub fn resolve_broadcast_target(channel: &ChannelInfo) -> Option { + let adapter = extract_mattermost_adapter_from_channel_id(&channel.id); + let raw_target = if let Some(channel_id) = channel + .platform_meta + .as_ref() + .and_then(|meta| meta.get("mattermost_channel_id")) + .and_then(json_value_to_string) + { + channel_id + } else { + // conversation id: mattermost:{team_id}:{channel_id} + // or mattermost:{team_id}:dm:{user_id} + // Named instance: mattermost:{instance}:{team_id}:{channel_id} + // Named DM: mattermost:{instance}:{team_id}:dm:{user_id} + let parts: Vec<&str> = channel.id.split(':').collect(); + match parts.as_slice() { + [_, _team_id, "dm", user_id] => format!("dm:{user_id}"), + [_, _team_id, channel_id] => (*channel_id).to_string(), + [_, _instance, _team_id, "dm", user_id] => format!("dm:{user_id}"), + [_, _instance, _team_id, channel_id] => (*channel_id).to_string(), + _ => return None, + } + }; + let target = normalize_mattermost_target(&raw_target)?; + return Some(BroadcastTarget { adapter, target }); + } _ => return None, }; @@ -163,6 +189,7 @@ pub fn normalize_target(adapter: &str, raw_target: &str) -> Option { "telegram" => normalize_telegram_target(trimmed), "twitch" => normalize_twitch_target(trimmed), "email" => normalize_email_target(trimmed), + "mattermost" => normalize_mattermost_target(trimmed), // Webchat targets are full conversation IDs (e.g. "portal:chat:main") "webchat" => Some(trimmed.to_string()), "signal" => normalize_signal_target(trimmed), @@ -238,6 +265,64 @@ fn normalize_twitch_target(raw_target: &str) -> Option { } } +/// Extract the runtime adapter key from a Mattermost conversation ID. +/// +/// Mattermost conversation IDs encode whether a named instance was used: +/// - Default channel: `mattermost:{team_id}:{channel_id}` (3 parts) → `"mattermost"` +/// - Default DM: `mattermost:{team_id}:dm:{user_id}` (4 parts, 3rd = `"dm"`) → `"mattermost"` +/// - Named channel: `mattermost:{instance}:{team_id}:{channel_id}` (4 parts, last ≠ `"dm"`) → `"mattermost:{instance}"` +/// - Named DM: `mattermost:{instance}:{team_id}:dm:{user_id}` (5 parts) → `"mattermost:{instance}"` +fn extract_mattermost_adapter_from_channel_id(channel_id: &str) -> String { + // Named instance conv IDs: "mattermost:{instance}:{team_id}:{channel_id}" (4 parts) + // or: "mattermost:{instance}:{team_id}:dm:{user_id}" (5 parts) + // Default conv IDs: "mattermost:{team_id}:{channel_id}" (3 parts) + // or: "mattermost:{team_id}:dm:{user_id}" (4 parts, 3rd part = "dm") + let parts: Vec<&str> = channel_id.split(':').collect(); + match parts.as_slice() { + // Default DM: mattermost:{team_id}:dm:{user_id} — must come before the named-channel arm + ["mattermost", _, "dm", _] => "mattermost".to_string(), + // Named DM: mattermost:{instance}:{team_id}:dm:{user_id} + ["mattermost", instance, _, "dm", _] => format!("mattermost:{instance}"), + // Named channel: mattermost:{instance}:{team_id}:{channel_id} + ["mattermost", instance, _, _] => format!("mattermost:{instance}"), + _ => "mattermost".to_string(), + } +} + +/// Normalize a raw Mattermost target string to a bare channel ID or `dm:{user_id}`. +/// +/// Accepts any of the following forms (with or without a leading `mattermost:` prefix): +/// - `channel_id` → `channel_id` +/// - `dm:{user_id}` → `dm:{user_id}` +/// - `{team_id}:{channel_id}` → `channel_id` +/// - `{team_id}:dm:{user_id}` → `dm:{user_id}` +/// - `{instance}:{team_id}:{channel_id}` → `channel_id` +/// - `{instance}:{team_id}:dm:{user_id}` → `dm:{user_id}` +/// +/// Returns `None` if the input is empty or does not match any recognised shape. +fn normalize_mattermost_target(raw_target: &str) -> Option { + let target = strip_repeated_prefix(raw_target, "mattermost"); + // Parse out just the channel_id or dm:{user_id}, discarding any team/instance prefix. + match target.split(':').collect::>().as_slice() { + // Already bare: "channel_id" (but not the bare word "dm" without a user_id) + [channel_id] if !channel_id.is_empty() && *channel_id != "dm" => Some((*channel_id).to_string()), + ["dm", user_id] if !user_id.is_empty() => Some(format!("dm:{user_id}")), + // With team prefix: "team_id:channel_id" or "team_id:dm:user_id" + [_team_id, channel_id] if !channel_id.is_empty() && *channel_id != "dm" => { + Some((*channel_id).to_string()) + } + [_team_id, "dm", user_id] if !user_id.is_empty() => Some(format!("dm:{user_id}")), + // With instance+team prefix: "instance:team_id:channel_id" or "instance:team_id:dm:user_id" + [_instance, _team_id, channel_id] if !channel_id.is_empty() && *channel_id != "dm" => { + Some((*channel_id).to_string()) + } + [_instance, _team_id, "dm", user_id] if !user_id.is_empty() => { + Some(format!("dm:{user_id}")) + } + _ => None, + } +} + fn normalize_email_target(raw_target: &str) -> Option { let target = strip_repeated_prefix(raw_target, "email").trim(); if target.is_empty() { diff --git a/src/secrets/store.rs b/src/secrets/store.rs index 3c4bcc19a..0e9ca9218 100644 --- a/src/secrets/store.rs +++ b/src/secrets/store.rs @@ -1101,8 +1101,8 @@ pub trait SystemSecrets { /// here. pub fn system_secret_registry() -> Vec<&'static SecretField> { use crate::config::{ - DefaultsConfig, DiscordConfig, EmailConfig, LlmConfig, SignalConfig, SlackConfig, - TelegramConfig, TwitchConfig, + DefaultsConfig, DiscordConfig, EmailConfig, LlmConfig, MattermostConfig, SignalConfig, + SlackConfig, TelegramConfig, TwitchConfig, }; let mut fields = Vec::new(); @@ -1117,6 +1117,7 @@ pub fn system_secret_registry() -> Vec<&'static SecretField> { fields.extend(TwitchConfig::secret_fields()); fields.extend(EmailConfig::secret_fields()); fields.extend(SignalConfig::secret_fields()); + fields.extend(MattermostConfig::secret_fields()); fields }