diff --git a/crates/openfang-channels/src/discord.rs b/crates/openfang-channels/src/discord.rs index 0c52b6ce2..32ec3c9b4 100644 --- a/crates/openfang-channels/src/discord.rs +++ b/crates/openfang-channels/src/discord.rs @@ -112,6 +112,48 @@ impl DiscordAdapter { Ok(()) } + /// Send a file to a Discord channel via multipart upload. + async fn api_send_file( + &self, + channel_id: &str, + file_url: &str, + filename: &str, + caption: Option<&str>, + ) -> Result<(), Box> { + let url = format!("{DISCORD_API_BASE}/channels/{channel_id}/messages"); + + let file_bytes = crate::media_utils::download_url(&self.client, file_url) + .await + .ok_or("Failed to download file for sending")?; + + let part = reqwest::multipart::Part::bytes(file_bytes.to_vec()) + .file_name(filename.to_string()) + .mime_str("application/octet-stream")?; + + let mut form = reqwest::multipart::Form::new().part("files[0]", part); + + if let Some(cap) = caption { + form = form.text( + "payload_json", + serde_json::json!({"content": cap}).to_string(), + ); + } + + let resp = self + .client + .post(&url) + .header("Authorization", format!("Bot {}", self.token.as_str())) + .multipart(form) + .send() + .await?; + + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + warn!("Discord sendFile failed: {body_text}"); + } + Ok(()) + } + /// Send typing indicator to a Discord channel. async fn api_send_typing(&self, channel_id: &str) -> Result<(), Box> { let url = format!("{DISCORD_API_BASE}/channels/{channel_id}/typing"); @@ -145,6 +187,7 @@ impl ChannelAdapter for DiscordAdapter { let (tx, rx) = mpsc::channel::(256); let token = self.token.clone(); + let client = self.client.clone(); let intents = self.intents; let allowed_guilds = self.allowed_guilds.clone(); let bot_user_id = self.bot_user_id.clone(); @@ -307,7 +350,7 @@ impl ChannelAdapter for DiscordAdapter { "MESSAGE_CREATE" | "MESSAGE_UPDATE" => { if let Some(msg) = - parse_discord_message(d, &bot_user_id, &allowed_guilds) + parse_discord_message(d, &bot_user_id, &allowed_guilds, &client) .await { debug!( @@ -400,6 +443,14 @@ impl ChannelAdapter for DiscordAdapter { ChannelContent::Text(text) => { self.api_send_message(channel_id, &text).await?; } + ChannelContent::File { url, filename } => { + self.api_send_file(channel_id, &url, &filename, None) + .await?; + } + ChannelContent::Image { url, caption } => { + self.api_send_file(channel_id, &url, "image.jpg", caption.as_deref()) + .await?; + } _ => { self.api_send_message(channel_id, "(Unsupported content type)") .await?; @@ -419,10 +470,14 @@ impl ChannelAdapter for DiscordAdapter { } /// Parse a Discord MESSAGE_CREATE or MESSAGE_UPDATE payload into a `ChannelMessage`. +/// +/// Handles text messages, slash commands, and attachments (images, files). +/// Attachments are downloaded and processed: images via Gemini Vision, text files extracted. async fn parse_discord_message( d: &serde_json::Value, bot_user_id: &Arc>>, allowed_guilds: &[u64], + client: &reqwest::Client, ) -> Option { let author = d.get("author")?; let author_id = author["id"].as_str()?; @@ -450,7 +505,11 @@ async fn parse_discord_message( } let content_text = d["content"].as_str().unwrap_or(""); - if content_text.is_empty() { + let attachments = d["attachments"].as_array(); + let has_attachments = attachments.map_or(false, |a| !a.is_empty()); + + // Skip if no content AND no attachments + if content_text.is_empty() && !has_attachments { return None; } @@ -470,12 +529,51 @@ async fn parse_discord_message( .map(|dt| dt.with_timezone(&chrono::Utc)) .unwrap_or_else(chrono::Utc::now); - // Parse commands (messages starting with /) - let content = if content_text.starts_with('/') { - let parts: Vec<&str> = content_text.splitn(2, ' ').collect(); - let cmd_name = &parts[0][1..]; - let args = if parts.len() > 1 { - parts[1].split_whitespace().map(String::from).collect() + // Process attachments into text descriptions + let mut parts: Vec = Vec::new(); + + if has_attachments { + for att in attachments.unwrap() { + let att_url = att["url"].as_str().unwrap_or(""); + let att_filename = att["filename"].as_str().unwrap_or("file"); + let att_content_type = att["content_type"].as_str().unwrap_or("application/octet-stream"); + let att_size = att["size"].as_u64().unwrap_or(0); + + if att_url.is_empty() { + continue; + } + + info!( + "Discord: attachment '{}' ({} bytes) from {}", + att_filename, att_size, display_name + ); + + let description = crate::media_utils::process_attachment_to_text( + client, + att_url, + att_filename, + att_content_type, + att_size, + content_text, + ) + .await; + parts.push(description); + } + } + + // Build final content + let content = if !parts.is_empty() { + // Has attachments — combine text + attachment descriptions + if !content_text.is_empty() && !has_attachments_only_images(attachments) { + parts.insert(0, content_text.to_string()); + } + ChannelContent::Text(parts.join("\n")) + } else if content_text.starts_with('/') { + // Parse commands (messages starting with /) + let cmd_parts: Vec<&str> = content_text.splitn(2, ' ').collect(); + let cmd_name = &cmd_parts[0][1..]; + let args = if cmd_parts.len() > 1 { + cmd_parts[1].split_whitespace().map(String::from).collect() } else { vec![] }; @@ -504,6 +602,17 @@ async fn parse_discord_message( }) } +/// Check if all attachments are images (to avoid duplicating text content as caption). +fn has_attachments_only_images(attachments: Option<&Vec>) -> bool { + attachments.map_or(true, |atts| { + atts.iter().all(|a| { + let ct = a["content_type"].as_str().unwrap_or(""); + let fname = a["filename"].as_str().unwrap_or(""); + crate::media_utils::is_image(fname, ct) + }) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -524,7 +633,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); assert_eq!(msg.channel, ChannelType::Discord); assert_eq!(msg.sender.display_name, "alice"); assert_eq!(msg.sender.platform_id, "ch1"); @@ -546,7 +655,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await; + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await; assert!(msg.is_none()); } @@ -566,7 +675,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await; + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await; assert!(msg.is_none()); } @@ -587,11 +696,11 @@ mod tests { }); // Not in allowed guilds - let msg = parse_discord_message(&d, &bot_id, &[111, 222]).await; + let msg = parse_discord_message(&d, &bot_id, &[111, 222], &reqwest::Client::new()).await; assert!(msg.is_none()); // In allowed guilds - let msg = parse_discord_message(&d, &bot_id, &[999]).await; + let msg = parse_discord_message(&d, &bot_id, &[999], &reqwest::Client::new()).await; assert!(msg.is_some()); } @@ -610,7 +719,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); match &msg.content { ChannelContent::Command { name, args } => { assert_eq!(name, "agent"); @@ -635,7 +744,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await; + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await; assert!(msg.is_none()); } @@ -654,7 +763,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); assert_eq!(msg.sender.display_name, "alice#1234"); } @@ -676,7 +785,7 @@ mod tests { }); // MESSAGE_UPDATE uses the same parse function as MESSAGE_CREATE - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); assert_eq!(msg.channel, ChannelType::Discord); assert!( matches!(msg.content, ChannelContent::Text(ref t) if t == "Edited message content") diff --git a/crates/openfang-channels/src/lib.rs b/crates/openfang-channels/src/lib.rs index 978c202e7..e8563cc3b 100644 --- a/crates/openfang-channels/src/lib.rs +++ b/crates/openfang-channels/src/lib.rs @@ -7,6 +7,7 @@ pub mod bridge; pub mod discord; pub mod email; pub mod formatter; +pub mod media_utils; pub mod google_chat; pub mod irc; pub mod matrix; diff --git a/crates/openfang-channels/src/media_utils.rs b/crates/openfang-channels/src/media_utils.rs new file mode 100644 index 000000000..481030586 --- /dev/null +++ b/crates/openfang-channels/src/media_utils.rs @@ -0,0 +1,236 @@ +//! Shared media processing utilities for channel adapters. +//! +//! Provides Gemini Vision image recognition, text file detection, +//! and HTTP download helpers used by Telegram, Discord, and other adapters. + +use std::sync::LazyLock; +use std::time::Duration; +use tracing::warn; + +/// Vision model for image recognition (override via `VISION_MODEL` env var). +static VISION_MODEL: LazyLock = LazyLock::new(|| { + std::env::var("VISION_MODEL").unwrap_or_else(|_| "gemini-2.5-flash".to_string()) +}); + +/// Vision API base URL (override via `VISION_API_BASE` env var). +static VISION_API_BASE: LazyLock = LazyLock::new(|| { + std::env::var("VISION_API_BASE").unwrap_or_else(|_| { + "https://generativelanguage.googleapis.com/v1beta".to_string() + }) +}); + +/// Recognize image content using Gemini Vision API. +/// +/// Returns a human-readable description of the image. On failure, returns +/// an error string wrapped in brackets (e.g., `[Image recognition failed: ...]`). +pub async fn recognize_image_gemini( + client: &reqwest::Client, + image_data: &[u8], + user_text: &str, +) -> String { + let gemini_key = match std::env::var("GEMINI_API_KEY") { + Ok(key) => key, + Err(_) => { + warn!("GEMINI_API_KEY not set, cannot perform image recognition"); + return "[Image recognition unavailable: GEMINI_API_KEY not configured]".to_string(); + } + }; + + // Detect MIME type from magic bytes + let mime = detect_image_mime(image_data); + + use base64::Engine; + let b64_data = base64::engine::general_purpose::STANDARD.encode(image_data); + + let prompt = if user_text.is_empty() { + "Describe this image in detail." + } else { + user_text + }; + + let payload = serde_json::json!({ + "contents": [{ + "parts": [ + {"text": prompt}, + { + "inline_data": { + "mime_type": mime, + "data": b64_data, + } + } + ] + }], + "generationConfig": { + "temperature": 0.4, + "maxOutputTokens": 2048, + } + }); + + let url = format!( + "{}/models/{}:generateContent?key={gemini_key}", + *VISION_API_BASE, *VISION_MODEL + ); + + match client + .post(&url) + .json(&payload) + .timeout(Duration::from_secs(60)) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + match resp.json::().await { + Ok(result) => { + if let Some(candidates) = result["candidates"].as_array() { + if let Some(first) = candidates.first() { + if let Some(parts) = first["content"]["parts"].as_array() { + let text: String = parts + .iter() + .filter_map(|p| p["text"].as_str()) + .collect::>() + .join(" "); + if !text.is_empty() { + return text; + } + } + } + } + "[Image recognition returned no result]".to_string() + } + Err(e) => { + warn!("Gemini Vision parse error: {e}"); + format!("[Image recognition parse error: {e}]") + } + } + } else { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + warn!( + "Gemini Vision error [{status}]: {}", + openfang_types::truncate_str(&body, 200) + ); + format!("[Image recognition failed: HTTP {status}]") + } + } + Err(e) => { + warn!("Gemini Vision request error: {e}"); + format!("[Image recognition request error: {e}]") + } + } +} + +/// Detect image MIME type from magic bytes. +pub fn detect_image_mime(data: &[u8]) -> &'static str { + if data.starts_with(b"\x89PNG\r\n\x1a\n") { + "image/png" + } else if data.starts_with(b"GIF8") { + "image/gif" + } else if data.len() >= 12 && &data[..4] == b"RIFF" && &data[8..12] == b"WEBP" { + "image/webp" + } else { + "image/jpeg" + } +} + +/// Check if a file is text-based by extension or MIME type. +pub fn is_text_file(filename: &str, mime_type: &str) -> bool { + const TEXT_EXTENSIONS: &[&str] = &[ + ".txt", ".md", ".py", ".js", ".ts", ".json", ".csv", ".xml", ".html", ".log", ".yaml", + ".yml", ".toml", ".ini", ".cfg", ".conf", ".sh", ".bat", ".rs", ".go", ".java", ".c", + ".cpp", ".h", ".hpp", ".css", ".sql", ".rb", ".php", ".lua", + ]; + let lower = filename.to_lowercase(); + TEXT_EXTENSIONS.iter().any(|ext| lower.ends_with(ext)) + || mime_type.starts_with("text/") + || mime_type == "application/json" + || mime_type == "application/xml" +} + +/// Check if a MIME type or filename represents an image. +pub fn is_image(filename: &str, content_type: &str) -> bool { + content_type.starts_with("image/") + || filename + .to_lowercase() + .ends_with(".jpg") + || filename.to_lowercase().ends_with(".jpeg") + || filename.to_lowercase().ends_with(".png") + || filename.to_lowercase().ends_with(".gif") + || filename.to_lowercase().ends_with(".webp") + || filename.to_lowercase().ends_with(".bmp") +} + +/// Download a file from a URL, returning the bytes on success. +pub async fn download_url(client: &reqwest::Client, url: &str) -> Option> { + match client + .get(url) + .timeout(Duration::from_secs(30)) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + resp.bytes().await.ok().map(|b| b.to_vec()) + } else { + warn!("Download failed [{}]: {}", resp.status(), url); + None + } + } + Err(e) => { + warn!("Download error for {}: {e}", url); + None + } + } +} + +/// Process an attachment (image or file) into a text description. +/// +/// - Images: recognized via Gemini Vision +/// - Text files (<50KB): content extracted +/// - Other files: metadata reported +pub async fn process_attachment_to_text( + client: &reqwest::Client, + url: &str, + filename: &str, + content_type: &str, + file_size: u64, + caption: &str, +) -> String { + let bytes = match download_url(client, url).await { + Some(b) => b, + None => { + return format!( + "[File {filename} download failed (size: {file_size} bytes)]" + ); + } + }; + + if is_image(filename, content_type) { + let recognition = recognize_image_gemini(client, &bytes, caption).await; + if caption.is_empty() { + format!("[User sent an image: {filename}]\nImage content: {recognition}") + } else { + format!("[User sent an image: {filename}, saying: {caption}]\nImage content: {recognition}") + } + } else if is_text_file(filename, content_type) && bytes.len() < 50_000 { + match std::str::from_utf8(&bytes) { + Ok(content) => { + let truncated = openfang_types::truncate_str(content, 3000); + let mut text = format!( + "[User sent file: {filename} (size: {file_size} bytes)]\nFile content:\n```\n{truncated}\n```" + ); + if content.len() > 3000 { + text.push_str(&format!("\n... ({} chars total, truncated)", content.len())); + } + text + } + Err(_) => { + format!("[User sent file: {filename} (size: {file_size} bytes, binary file)]") + } + } + } else { + format!( + "[User sent file: {filename} (size: {file_size} bytes, type: {content_type})]" + ) + } +} diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index ddde03921..fb3334f94 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -16,6 +16,30 @@ use tokio::sync::{mpsc, watch}; use tracing::{debug, error, info, warn}; use zeroize::Zeroizing; +use std::sync::LazyLock; + +/// Groq STT model (override via `GROQ_STT_MODEL` env var). +static GROQ_STT_MODEL: LazyLock = LazyLock::new(|| { + std::env::var("GROQ_STT_MODEL").unwrap_or_else(|_| "whisper-large-v3-turbo".to_string()) +}); + +/// Groq STT API URL (override via `GROQ_STT_URL` env var). +static GROQ_STT_URL: LazyLock = LazyLock::new(|| { + std::env::var("GROQ_STT_URL") + .unwrap_or_else(|_| "https://api.groq.com/openai/v1/audio/transcriptions".to_string()) +}); + +/// OpenAI STT model (override via `OPENAI_STT_MODEL` env var). +static OPENAI_STT_MODEL: LazyLock = LazyLock::new(|| { + std::env::var("OPENAI_STT_MODEL").unwrap_or_else(|_| "whisper-1".to_string()) +}); + +/// OpenAI STT API URL (override via `OPENAI_STT_URL` env var). +static OPENAI_STT_URL: LazyLock = LazyLock::new(|| { + std::env::var("OPENAI_STT_URL") + .unwrap_or_else(|_| "https://api.openai.com/v1/audio/transcriptions".to_string()) +}); + /// Maximum backoff duration on API failures. const MAX_BACKOFF: Duration = Duration::from_secs(60); /// Initial backoff duration on API failures. @@ -97,6 +121,44 @@ impl TelegramAdapter { Ok(()) } + /// Send a file via `sendDocument` on the Telegram API. + async fn api_send_document( + &self, + chat_id: i64, + file_url: &str, + filename: &str, + caption: Option<&str>, + ) -> Result<(), Box> { + let url = format!( + "https://api.telegram.org/bot{}/sendDocument", + self.token.as_str() + ); + + // Download the file first, then upload via multipart + let file_bytes = crate::media_utils::download_url(&self.client, file_url) + .await + .ok_or("Failed to download file for sending")?; + + let part = reqwest::multipart::Part::bytes(file_bytes.to_vec()) + .file_name(filename.to_string()) + .mime_str("application/octet-stream")?; + + let mut form = reqwest::multipart::Form::new() + .text("chat_id", chat_id.to_string()) + .part("document", part); + + if let Some(cap) = caption { + form = form.text("caption", cap.to_string()); + } + + let resp = self.client.post(&url).multipart(form).send().await?; + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + warn!("Telegram sendDocument failed: {body_text}"); + } + Ok(()) + } + /// Call `sendChatAction` to show "typing..." indicator. async fn api_send_typing(&self, chat_id: i64) -> Result<(), Box> { let url = format!( @@ -243,10 +305,21 @@ impl ChannelAdapter for TelegramAdapter { offset = Some(update_id + 1); } - // Parse the message + // Parse text messages first, then try multimedia handlers let msg = match parse_telegram_update(update, &allowed_users) { Some(m) => m, - None => continue, // filtered out or unparseable + None => { + // Not a text message — try voice, photo, document handlers + if let Some(m) = try_handle_voice(update, &allowed_users, &client, &token).await { + m + } else if let Some(m) = try_handle_photo(update, &allowed_users, &client, &token).await { + m + } else if let Some(m) = try_handle_document(update, &allowed_users, &client, &token).await { + m + } else { + continue; + } + } }; debug!( @@ -285,6 +358,14 @@ impl ChannelAdapter for TelegramAdapter { ChannelContent::Text(text) => { self.api_send_message(chat_id, &text).await?; } + ChannelContent::File { url, filename } => { + self.api_send_document(chat_id, &url, &filename, None) + .await?; + } + ChannelContent::Image { url, caption } => { + self.api_send_document(chat_id, &url, "image.jpg", caption.as_deref()) + .await?; + } _ => { self.api_send_message(chat_id, "(Unsupported content type)") .await?; @@ -388,6 +469,378 @@ fn parse_telegram_update( }) } +/// Handle voice messages from Telegram by downloading and transcribing with Groq Whisper. +/// +/// Returns a `ChannelMessage` with the transcription as text, or `None` if not a voice message +/// or transcription fails. +async fn try_handle_voice( + update: &serde_json::Value, + allowed_users: &[i64], + client: &reqwest::Client, + token: &str, +) -> Option { + let message = update.get("message")?; + let voice = message.get("voice")?; + let from = message.get("from")?; + let user_id = from["id"].as_i64()?; + + if !allowed_users.is_empty() && !allowed_users.contains(&user_id) { + return None; + } + + let chat_id = message["chat"]["id"].as_i64()?; + let first_name = from["first_name"].as_str().unwrap_or("Unknown"); + let last_name = from["last_name"].as_str().unwrap_or(""); + let display_name = if last_name.is_empty() { + first_name.to_string() + } else { + format!("{first_name} {last_name}") + }; + let chat_type = message["chat"]["type"].as_str().unwrap_or("private"); + let is_group = chat_type == "group" || chat_type == "supergroup"; + let message_id = message["message_id"].as_i64().unwrap_or(0); + let timestamp = message["date"] + .as_i64() + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(chrono::Utc::now); + + let file_id = voice["file_id"].as_str()?; + let _duration = voice["duration"].as_u64().unwrap_or(0); + info!("Telegram: voice message from {display_name}, downloading..."); + + // Step 1: Get file path via Telegram getFile API + let get_file_url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); + let file_resp: serde_json::Value = client.get(&get_file_url).send().await.ok()?.json().await.ok()?; + let file_path = file_resp["result"]["file_path"].as_str()?; + + // Step 2: Download the audio file + let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); + let audio_bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; + + // Step 3: Transcribe with Groq Whisper (or OpenAI Whisper as fallback) + let transcription = transcribe_audio(client, &audio_bytes, file_path).await; + let text = match transcription { + Some(t) if !t.is_empty() => t, + _ => { + warn!("Telegram: voice transcription failed for {display_name}"); + "[Voice message received, but transcription failed]".to_string() + } + }; + + info!("Telegram: voice transcribed for {display_name}: {}", openfang_types::truncate_str(&text, 100)); + + Some(ChannelMessage { + channel: ChannelType::Telegram, + platform_message_id: message_id.to_string(), + sender: ChannelUser { + platform_id: chat_id.to_string(), + display_name, + openfang_user: None, + }, + content: ChannelContent::Text(text), + target_agent: None, + timestamp, + is_group, + thread_id: None, + metadata: HashMap::new(), + }) +} + +/// Transcribe audio bytes using Groq Whisper or OpenAI Whisper. +async fn transcribe_audio( + client: &reqwest::Client, + audio_bytes: &[u8], + filename: &str, +) -> Option { + // Determine MIME type and normalize filename for API compatibility. + // Telegram uses .oga (Ogg Opus) which Groq doesn't recognize — rename to .ogg. + let (mime, upload_filename) = if filename.ends_with(".oga") || filename.ends_with(".ogg") { + ("audio/ogg", filename.replace(".oga", ".ogg")) + } else if filename.ends_with(".mp3") { + ("audio/mpeg", filename.to_string()) + } else if filename.ends_with(".wav") { + ("audio/wav", filename.to_string()) + } else if filename.ends_with(".m4a") { + ("audio/mp4", filename.to_string()) + } else { + ("audio/ogg", format!("{filename}.ogg")) + }; + + // Try Groq Whisper first (fast, free tier) + if let Ok(groq_key) = std::env::var("GROQ_API_KEY") { + let form = reqwest::multipart::Form::new() + .part( + "file", + reqwest::multipart::Part::bytes(audio_bytes.to_vec()) + .file_name(upload_filename.clone()) + .mime_str(mime) + .ok()?, + ) + .text("model", GROQ_STT_MODEL.as_str()) + .text("response_format", "json"); + + match client + .post(GROQ_STT_URL.as_str()) + .bearer_auth(&groq_key) + .multipart(form) + .send() + .await + { + Ok(resp) => { + let status = resp.status(); + match resp.json::().await { + Ok(result) => { + if let Some(text) = result["text"].as_str() { + return Some(text.to_string()); + } + warn!("Groq Whisper: no 'text' in response (status={status}): {result}"); + } + Err(e) => { + warn!("Groq Whisper: failed to parse response (status={status}): {e}"); + } + } + } + Err(e) => { + warn!("Groq Whisper: request failed: {e}"); + } + } + } else { + warn!("GROQ_API_KEY not set, skipping Groq Whisper"); + } + + // Fallback: OpenAI Whisper + if let Ok(openai_key) = std::env::var("OPENAI_API_KEY") { + let form = reqwest::multipart::Form::new() + .part( + "file", + reqwest::multipart::Part::bytes(audio_bytes.to_vec()) + .file_name(upload_filename.clone()) + .mime_str(mime) + .ok()?, + ) + .text("model", OPENAI_STT_MODEL.as_str()) + .text("response_format", "json"); + + match client + .post(OPENAI_STT_URL.as_str()) + .bearer_auth(&openai_key) + .multipart(form) + .send() + .await + { + Ok(resp) => { + let status = resp.status(); + match resp.json::().await { + Ok(result) => { + if let Some(text) = result["text"].as_str() { + return Some(text.to_string()); + } + warn!("OpenAI Whisper: no 'text' in response (status={status}): {result}"); + } + Err(e) => { + warn!("OpenAI Whisper: failed to parse response (status={status}): {e}"); + } + } + } + Err(e) => { + warn!("OpenAI Whisper: request failed: {e}"); + } + } + } + + warn!("Voice transcription: all providers failed for file '{filename}' ({} bytes, mime={mime})", audio_bytes.len()); + None +} + +/// Handle photo messages from Telegram by downloading and recognizing with Gemini Vision. +/// +/// Picks the largest available photo size. Captions are used as the recognition prompt. +/// Returns a `ChannelMessage` with the recognition result as text. +async fn try_handle_photo( + update: &serde_json::Value, + allowed_users: &[i64], + client: &reqwest::Client, + token: &str, +) -> Option { + let message = update.get("message")?; + let photos = message.get("photo")?.as_array()?; + if photos.is_empty() { + return None; + } + let from = message.get("from")?; + let user_id = from["id"].as_i64()?; + + if !allowed_users.is_empty() && !allowed_users.contains(&user_id) { + return None; + } + + let chat_id = message["chat"]["id"].as_i64()?; + let first_name = from["first_name"].as_str().unwrap_or("Unknown"); + let last_name = from["last_name"].as_str().unwrap_or(""); + let display_name = if last_name.is_empty() { + first_name.to_string() + } else { + format!("{first_name} {last_name}") + }; + let chat_type = message["chat"]["type"].as_str().unwrap_or("private"); + let is_group = chat_type == "group" || chat_type == "supergroup"; + let message_id = message["message_id"].as_i64().unwrap_or(0); + let timestamp = message["date"] + .as_i64() + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(chrono::Utc::now); + + // Pick the largest photo (last in the array — Telegram sorts by size ascending) + let photo = photos.last()?; + let file_id = photo["file_id"].as_str()?; + let caption = message["caption"].as_str().unwrap_or(""); + + info!("Telegram: photo from {display_name}, downloading..."); + + // Download via Telegram getFile API + let get_file_url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); + let file_resp: serde_json::Value = + client.get(&get_file_url).send().await.ok()?.json().await.ok()?; + let file_path = file_resp["result"]["file_path"].as_str()?; + let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); + let image_bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; + + // Recognize with Gemini Vision + let recognition = crate::media_utils::recognize_image_gemini(client, &image_bytes, caption).await; + + let text = if caption.is_empty() { + format!("[User sent a photo]\nImage content: {recognition}") + } else { + format!("[User sent a photo with caption: {caption}]\nImage content: {recognition}") + }; + + info!( + "Telegram: photo recognized for {display_name}: {}", + openfang_types::truncate_str(&text, 100) + ); + + Some(ChannelMessage { + channel: ChannelType::Telegram, + platform_message_id: message_id.to_string(), + sender: ChannelUser { + platform_id: chat_id.to_string(), + display_name, + openfang_user: None, + }, + content: ChannelContent::Text(text), + target_agent: None, + timestamp, + is_group, + thread_id: None, + metadata: HashMap::new(), + }) +} + +/// Handle document/file messages from Telegram. +/// +/// Downloads the file and: +/// - If image: recognizes with Gemini Vision +/// - If text file (<50KB): extracts content +/// - Otherwise: reports filename, size, and MIME type +async fn try_handle_document( + update: &serde_json::Value, + allowed_users: &[i64], + client: &reqwest::Client, + token: &str, +) -> Option { + let message = update.get("message")?; + let document = message.get("document")?; + let from = message.get("from")?; + let user_id = from["id"].as_i64()?; + + if !allowed_users.is_empty() && !allowed_users.contains(&user_id) { + return None; + } + + let chat_id = message["chat"]["id"].as_i64()?; + let first_name = from["first_name"].as_str().unwrap_or("Unknown"); + let last_name = from["last_name"].as_str().unwrap_or(""); + let display_name = if last_name.is_empty() { + first_name.to_string() + } else { + format!("{first_name} {last_name}") + }; + let chat_type = message["chat"]["type"].as_str().unwrap_or("private"); + let is_group = chat_type == "group" || chat_type == "supergroup"; + let message_id = message["message_id"].as_i64().unwrap_or(0); + let timestamp = message["date"] + .as_i64() + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(chrono::Utc::now); + + let file_name = document["file_name"].as_str().unwrap_or("unknown_file"); + let file_size = document["file_size"].as_u64().unwrap_or(0); + let mime_type = document["mime_type"] + .as_str() + .unwrap_or("application/octet-stream"); + let file_id = document["file_id"].as_str()?; + let caption = message["caption"].as_str().unwrap_or(""); + + info!("Telegram: document '{file_name}' ({file_size} bytes) from {display_name}"); + + // Download via Telegram getFile API (bots can download files up to 20MB) + let get_file_url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); + let file_resp: serde_json::Value = + client.get(&get_file_url).send().await.ok()?.json().await.ok()?; + let file_path = file_resp["result"]["file_path"].as_str()?; + let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); + let file_bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; + + let mut text = + format!("[User sent file: {file_name} (size: {file_size} bytes, type: {mime_type})]"); + + // If it's an image sent as document (uncompressed), use Gemini Vision + if mime_type.starts_with("image/") { + let recognition = crate::media_utils::recognize_image_gemini(client, &file_bytes, caption).await; + text.push_str(&format!("\nImage content: {recognition}")); + } + // If it's a text-like file and small enough, extract content + else if crate::media_utils::is_text_file(file_name, mime_type) && file_bytes.len() < 50_000 { + match std::str::from_utf8(&file_bytes) { + Ok(content) => { + let truncated = openfang_types::truncate_str(content, 3000); + text.push_str(&format!("\nFile content:\n```\n{truncated}\n```")); + if content.len() > 3000 { + text.push_str(&format!("\n... ({} chars total, truncated)", content.len())); + } + } + Err(_) => { + text.push_str("\n[Binary file, cannot display text content]"); + } + } + } + + if !caption.is_empty() { + text.push_str(&format!("\nUser caption: {caption}")); + } + + info!( + "Telegram: document processed for {display_name}: {}", + openfang_types::truncate_str(&text, 100) + ); + + Some(ChannelMessage { + channel: ChannelType::Telegram, + platform_message_id: message_id.to_string(), + sender: ChannelUser { + platform_id: chat_id.to_string(), + display_name, + openfang_user: None, + }, + content: ChannelContent::Text(text), + target_agent: None, + timestamp, + is_group, + thread_id: None, + metadata: HashMap::new(), + }) +} + /// Calculate exponential backoff capped at MAX_BACKOFF. pub fn calculate_backoff(current: Duration) -> Duration { (current * 2).min(MAX_BACKOFF)