From b7dd79f695a4bb4722a6742aa03bc24a76a5e732 Mon Sep 17 00:00:00 2001 From: slysian Date: Mon, 2 Mar 2026 14:26:38 +0800 Subject: [PATCH 1/2] feat: add multimedia support for Telegram and Discord adapters Add comprehensive media handling for channel adapters: Telegram (receive): - Voice messages: download + transcribe via Groq Whisper (fallback: OpenAI Whisper) - Photos: download + recognize via Gemini Vision API - Documents: download + extract text content or recognize images Telegram & Discord (send): - File sending via multipart upload (sendDocument / Discord files API) - Image sending with optional captions Discord (receive): - Attachment processing: images via Gemini Vision, text files extracted - Mixed content (text + attachments) handled correctly Shared utilities (new media_utils module): - Gemini Vision image recognition - MIME type detection from magic bytes - Text file detection by extension/MIME - HTTP download helper - Attachment-to-text processing pipeline Closes #158 Co-Authored-By: Claude Opus 4.6 --- crates/openfang-channels/src/discord.rs | 143 ++++++- crates/openfang-channels/src/lib.rs | 1 + crates/openfang-channels/src/media_utils.rs | 222 ++++++++++ crates/openfang-channels/src/telegram.rs | 433 +++++++++++++++++++- 4 files changed, 780 insertions(+), 19 deletions(-) create mode 100644 crates/openfang-channels/src/media_utils.rs diff --git a/crates/openfang-channels/src/discord.rs b/crates/openfang-channels/src/discord.rs index 0c52b6ce2..32ec3c9b4 100644 --- a/crates/openfang-channels/src/discord.rs +++ b/crates/openfang-channels/src/discord.rs @@ -112,6 +112,48 @@ impl DiscordAdapter { Ok(()) } + /// Send a file to a Discord channel via multipart upload. + async fn api_send_file( + &self, + channel_id: &str, + file_url: &str, + filename: &str, + caption: Option<&str>, + ) -> Result<(), Box> { + let url = format!("{DISCORD_API_BASE}/channels/{channel_id}/messages"); + + let file_bytes = crate::media_utils::download_url(&self.client, file_url) + .await + .ok_or("Failed to download file for sending")?; + + let part = reqwest::multipart::Part::bytes(file_bytes.to_vec()) + .file_name(filename.to_string()) + .mime_str("application/octet-stream")?; + + let mut form = reqwest::multipart::Form::new().part("files[0]", part); + + if let Some(cap) = caption { + form = form.text( + "payload_json", + serde_json::json!({"content": cap}).to_string(), + ); + } + + let resp = self + .client + .post(&url) + .header("Authorization", format!("Bot {}", self.token.as_str())) + .multipart(form) + .send() + .await?; + + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + warn!("Discord sendFile failed: {body_text}"); + } + Ok(()) + } + /// Send typing indicator to a Discord channel. async fn api_send_typing(&self, channel_id: &str) -> Result<(), Box> { let url = format!("{DISCORD_API_BASE}/channels/{channel_id}/typing"); @@ -145,6 +187,7 @@ impl ChannelAdapter for DiscordAdapter { let (tx, rx) = mpsc::channel::(256); let token = self.token.clone(); + let client = self.client.clone(); let intents = self.intents; let allowed_guilds = self.allowed_guilds.clone(); let bot_user_id = self.bot_user_id.clone(); @@ -307,7 +350,7 @@ impl ChannelAdapter for DiscordAdapter { "MESSAGE_CREATE" | "MESSAGE_UPDATE" => { if let Some(msg) = - parse_discord_message(d, &bot_user_id, &allowed_guilds) + parse_discord_message(d, &bot_user_id, &allowed_guilds, &client) .await { debug!( @@ -400,6 +443,14 @@ impl ChannelAdapter for DiscordAdapter { ChannelContent::Text(text) => { self.api_send_message(channel_id, &text).await?; } + ChannelContent::File { url, filename } => { + self.api_send_file(channel_id, &url, &filename, None) + .await?; + } + ChannelContent::Image { url, caption } => { + self.api_send_file(channel_id, &url, "image.jpg", caption.as_deref()) + .await?; + } _ => { self.api_send_message(channel_id, "(Unsupported content type)") .await?; @@ -419,10 +470,14 @@ impl ChannelAdapter for DiscordAdapter { } /// Parse a Discord MESSAGE_CREATE or MESSAGE_UPDATE payload into a `ChannelMessage`. +/// +/// Handles text messages, slash commands, and attachments (images, files). +/// Attachments are downloaded and processed: images via Gemini Vision, text files extracted. async fn parse_discord_message( d: &serde_json::Value, bot_user_id: &Arc>>, allowed_guilds: &[u64], + client: &reqwest::Client, ) -> Option { let author = d.get("author")?; let author_id = author["id"].as_str()?; @@ -450,7 +505,11 @@ async fn parse_discord_message( } let content_text = d["content"].as_str().unwrap_or(""); - if content_text.is_empty() { + let attachments = d["attachments"].as_array(); + let has_attachments = attachments.map_or(false, |a| !a.is_empty()); + + // Skip if no content AND no attachments + if content_text.is_empty() && !has_attachments { return None; } @@ -470,12 +529,51 @@ async fn parse_discord_message( .map(|dt| dt.with_timezone(&chrono::Utc)) .unwrap_or_else(chrono::Utc::now); - // Parse commands (messages starting with /) - let content = if content_text.starts_with('/') { - let parts: Vec<&str> = content_text.splitn(2, ' ').collect(); - let cmd_name = &parts[0][1..]; - let args = if parts.len() > 1 { - parts[1].split_whitespace().map(String::from).collect() + // Process attachments into text descriptions + let mut parts: Vec = Vec::new(); + + if has_attachments { + for att in attachments.unwrap() { + let att_url = att["url"].as_str().unwrap_or(""); + let att_filename = att["filename"].as_str().unwrap_or("file"); + let att_content_type = att["content_type"].as_str().unwrap_or("application/octet-stream"); + let att_size = att["size"].as_u64().unwrap_or(0); + + if att_url.is_empty() { + continue; + } + + info!( + "Discord: attachment '{}' ({} bytes) from {}", + att_filename, att_size, display_name + ); + + let description = crate::media_utils::process_attachment_to_text( + client, + att_url, + att_filename, + att_content_type, + att_size, + content_text, + ) + .await; + parts.push(description); + } + } + + // Build final content + let content = if !parts.is_empty() { + // Has attachments — combine text + attachment descriptions + if !content_text.is_empty() && !has_attachments_only_images(attachments) { + parts.insert(0, content_text.to_string()); + } + ChannelContent::Text(parts.join("\n")) + } else if content_text.starts_with('/') { + // Parse commands (messages starting with /) + let cmd_parts: Vec<&str> = content_text.splitn(2, ' ').collect(); + let cmd_name = &cmd_parts[0][1..]; + let args = if cmd_parts.len() > 1 { + cmd_parts[1].split_whitespace().map(String::from).collect() } else { vec![] }; @@ -504,6 +602,17 @@ async fn parse_discord_message( }) } +/// Check if all attachments are images (to avoid duplicating text content as caption). +fn has_attachments_only_images(attachments: Option<&Vec>) -> bool { + attachments.map_or(true, |atts| { + atts.iter().all(|a| { + let ct = a["content_type"].as_str().unwrap_or(""); + let fname = a["filename"].as_str().unwrap_or(""); + crate::media_utils::is_image(fname, ct) + }) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -524,7 +633,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); assert_eq!(msg.channel, ChannelType::Discord); assert_eq!(msg.sender.display_name, "alice"); assert_eq!(msg.sender.platform_id, "ch1"); @@ -546,7 +655,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await; + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await; assert!(msg.is_none()); } @@ -566,7 +675,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await; + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await; assert!(msg.is_none()); } @@ -587,11 +696,11 @@ mod tests { }); // Not in allowed guilds - let msg = parse_discord_message(&d, &bot_id, &[111, 222]).await; + let msg = parse_discord_message(&d, &bot_id, &[111, 222], &reqwest::Client::new()).await; assert!(msg.is_none()); // In allowed guilds - let msg = parse_discord_message(&d, &bot_id, &[999]).await; + let msg = parse_discord_message(&d, &bot_id, &[999], &reqwest::Client::new()).await; assert!(msg.is_some()); } @@ -610,7 +719,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); match &msg.content { ChannelContent::Command { name, args } => { assert_eq!(name, "agent"); @@ -635,7 +744,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await; + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await; assert!(msg.is_none()); } @@ -654,7 +763,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); assert_eq!(msg.sender.display_name, "alice#1234"); } @@ -676,7 +785,7 @@ mod tests { }); // MESSAGE_UPDATE uses the same parse function as MESSAGE_CREATE - let msg = parse_discord_message(&d, &bot_id, &[]).await.unwrap(); + let msg = parse_discord_message(&d, &bot_id, &[], &reqwest::Client::new()).await.unwrap(); assert_eq!(msg.channel, ChannelType::Discord); assert!( matches!(msg.content, ChannelContent::Text(ref t) if t == "Edited message content") diff --git a/crates/openfang-channels/src/lib.rs b/crates/openfang-channels/src/lib.rs index 978c202e7..e8563cc3b 100644 --- a/crates/openfang-channels/src/lib.rs +++ b/crates/openfang-channels/src/lib.rs @@ -7,6 +7,7 @@ pub mod bridge; pub mod discord; pub mod email; pub mod formatter; +pub mod media_utils; pub mod google_chat; pub mod irc; pub mod matrix; diff --git a/crates/openfang-channels/src/media_utils.rs b/crates/openfang-channels/src/media_utils.rs new file mode 100644 index 000000000..6b5dfee7e --- /dev/null +++ b/crates/openfang-channels/src/media_utils.rs @@ -0,0 +1,222 @@ +//! Shared media processing utilities for channel adapters. +//! +//! Provides Gemini Vision image recognition, text file detection, +//! and HTTP download helpers used by Telegram, Discord, and other adapters. + +use std::time::Duration; +use tracing::warn; + +/// Recognize image content using Gemini Vision API. +/// +/// Returns a human-readable description of the image. On failure, returns +/// an error string wrapped in brackets (e.g., `[Image recognition failed: ...]`). +pub async fn recognize_image_gemini( + client: &reqwest::Client, + image_data: &[u8], + user_text: &str, +) -> String { + let gemini_key = match std::env::var("GEMINI_API_KEY") { + Ok(key) => key, + Err(_) => { + warn!("GEMINI_API_KEY not set, cannot perform image recognition"); + return "[Image recognition unavailable: GEMINI_API_KEY not configured]".to_string(); + } + }; + + // Detect MIME type from magic bytes + let mime = detect_image_mime(image_data); + + use base64::Engine; + let b64_data = base64::engine::general_purpose::STANDARD.encode(image_data); + + let prompt = if user_text.is_empty() { + "Describe this image in detail." + } else { + user_text + }; + + let payload = serde_json::json!({ + "contents": [{ + "parts": [ + {"text": prompt}, + { + "inline_data": { + "mime_type": mime, + "data": b64_data, + } + } + ] + }], + "generationConfig": { + "temperature": 0.4, + "maxOutputTokens": 2048, + } + }); + + let url = format!( + "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={gemini_key}" + ); + + match client + .post(&url) + .json(&payload) + .timeout(Duration::from_secs(60)) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + match resp.json::().await { + Ok(result) => { + if let Some(candidates) = result["candidates"].as_array() { + if let Some(first) = candidates.first() { + if let Some(parts) = first["content"]["parts"].as_array() { + let text: String = parts + .iter() + .filter_map(|p| p["text"].as_str()) + .collect::>() + .join(" "); + if !text.is_empty() { + return text; + } + } + } + } + "[Image recognition returned no result]".to_string() + } + Err(e) => { + warn!("Gemini Vision parse error: {e}"); + format!("[Image recognition parse error: {e}]") + } + } + } else { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + warn!( + "Gemini Vision error [{status}]: {}", + openfang_types::truncate_str(&body, 200) + ); + format!("[Image recognition failed: HTTP {status}]") + } + } + Err(e) => { + warn!("Gemini Vision request error: {e}"); + format!("[Image recognition request error: {e}]") + } + } +} + +/// Detect image MIME type from magic bytes. +pub fn detect_image_mime(data: &[u8]) -> &'static str { + if data.starts_with(b"\x89PNG\r\n\x1a\n") { + "image/png" + } else if data.starts_with(b"GIF8") { + "image/gif" + } else if data.len() >= 12 && &data[..4] == b"RIFF" && &data[8..12] == b"WEBP" { + "image/webp" + } else { + "image/jpeg" + } +} + +/// Check if a file is text-based by extension or MIME type. +pub fn is_text_file(filename: &str, mime_type: &str) -> bool { + const TEXT_EXTENSIONS: &[&str] = &[ + ".txt", ".md", ".py", ".js", ".ts", ".json", ".csv", ".xml", ".html", ".log", ".yaml", + ".yml", ".toml", ".ini", ".cfg", ".conf", ".sh", ".bat", ".rs", ".go", ".java", ".c", + ".cpp", ".h", ".hpp", ".css", ".sql", ".rb", ".php", ".lua", + ]; + let lower = filename.to_lowercase(); + TEXT_EXTENSIONS.iter().any(|ext| lower.ends_with(ext)) + || mime_type.starts_with("text/") + || mime_type == "application/json" + || mime_type == "application/xml" +} + +/// Check if a MIME type or filename represents an image. +pub fn is_image(filename: &str, content_type: &str) -> bool { + content_type.starts_with("image/") + || filename + .to_lowercase() + .ends_with(".jpg") + || filename.to_lowercase().ends_with(".jpeg") + || filename.to_lowercase().ends_with(".png") + || filename.to_lowercase().ends_with(".gif") + || filename.to_lowercase().ends_with(".webp") + || filename.to_lowercase().ends_with(".bmp") +} + +/// Download a file from a URL, returning the bytes on success. +pub async fn download_url(client: &reqwest::Client, url: &str) -> Option> { + match client + .get(url) + .timeout(Duration::from_secs(30)) + .send() + .await + { + Ok(resp) => { + if resp.status().is_success() { + resp.bytes().await.ok().map(|b| b.to_vec()) + } else { + warn!("Download failed [{}]: {}", resp.status(), url); + None + } + } + Err(e) => { + warn!("Download error for {}: {e}", url); + None + } + } +} + +/// Process an attachment (image or file) into a text description. +/// +/// - Images: recognized via Gemini Vision +/// - Text files (<50KB): content extracted +/// - Other files: metadata reported +pub async fn process_attachment_to_text( + client: &reqwest::Client, + url: &str, + filename: &str, + content_type: &str, + file_size: u64, + caption: &str, +) -> String { + let bytes = match download_url(client, url).await { + Some(b) => b, + None => { + return format!( + "[File {filename} download failed (size: {file_size} bytes)]" + ); + } + }; + + if is_image(filename, content_type) { + let recognition = recognize_image_gemini(client, &bytes, caption).await; + if caption.is_empty() { + format!("[User sent an image: {filename}]\nImage content: {recognition}") + } else { + format!("[User sent an image: {filename}, saying: {caption}]\nImage content: {recognition}") + } + } else if is_text_file(filename, content_type) && bytes.len() < 50_000 { + match std::str::from_utf8(&bytes) { + Ok(content) => { + let truncated = openfang_types::truncate_str(content, 3000); + let mut text = format!( + "[User sent file: {filename} (size: {file_size} bytes)]\nFile content:\n```\n{truncated}\n```" + ); + if content.len() > 3000 { + text.push_str(&format!("\n... ({} chars total, truncated)", content.len())); + } + text + } + Err(_) => { + format!("[User sent file: {filename} (size: {file_size} bytes, binary file)]") + } + } + } else { + format!( + "[User sent file: {filename} (size: {file_size} bytes, type: {content_type})]" + ) + } +} diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index ddde03921..8671cdaaf 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -97,6 +97,44 @@ impl TelegramAdapter { Ok(()) } + /// Send a file via `sendDocument` on the Telegram API. + async fn api_send_document( + &self, + chat_id: i64, + file_url: &str, + filename: &str, + caption: Option<&str>, + ) -> Result<(), Box> { + let url = format!( + "https://api.telegram.org/bot{}/sendDocument", + self.token.as_str() + ); + + // Download the file first, then upload via multipart + let file_bytes = crate::media_utils::download_url(&self.client, file_url) + .await + .ok_or("Failed to download file for sending")?; + + let part = reqwest::multipart::Part::bytes(file_bytes.to_vec()) + .file_name(filename.to_string()) + .mime_str("application/octet-stream")?; + + let mut form = reqwest::multipart::Form::new() + .text("chat_id", chat_id.to_string()) + .part("document", part); + + if let Some(cap) = caption { + form = form.text("caption", cap.to_string()); + } + + let resp = self.client.post(&url).multipart(form).send().await?; + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + warn!("Telegram sendDocument failed: {body_text}"); + } + Ok(()) + } + /// Call `sendChatAction` to show "typing..." indicator. async fn api_send_typing(&self, chat_id: i64) -> Result<(), Box> { let url = format!( @@ -243,10 +281,21 @@ impl ChannelAdapter for TelegramAdapter { offset = Some(update_id + 1); } - // Parse the message + // Parse text messages first, then try multimedia handlers let msg = match parse_telegram_update(update, &allowed_users) { Some(m) => m, - None => continue, // filtered out or unparseable + None => { + // Not a text message — try voice, photo, document handlers + if let Some(m) = try_handle_voice(update, &allowed_users, &client, &token).await { + m + } else if let Some(m) = try_handle_photo(update, &allowed_users, &client, &token).await { + m + } else if let Some(m) = try_handle_document(update, &allowed_users, &client, &token).await { + m + } else { + continue; + } + } }; debug!( @@ -285,6 +334,14 @@ impl ChannelAdapter for TelegramAdapter { ChannelContent::Text(text) => { self.api_send_message(chat_id, &text).await?; } + ChannelContent::File { url, filename } => { + self.api_send_document(chat_id, &url, &filename, None) + .await?; + } + ChannelContent::Image { url, caption } => { + self.api_send_document(chat_id, &url, "image.jpg", caption.as_deref()) + .await?; + } _ => { self.api_send_message(chat_id, "(Unsupported content type)") .await?; @@ -388,6 +445,378 @@ fn parse_telegram_update( }) } +/// Handle voice messages from Telegram by downloading and transcribing with Groq Whisper. +/// +/// Returns a `ChannelMessage` with the transcription as text, or `None` if not a voice message +/// or transcription fails. +async fn try_handle_voice( + update: &serde_json::Value, + allowed_users: &[i64], + client: &reqwest::Client, + token: &str, +) -> Option { + let message = update.get("message")?; + let voice = message.get("voice")?; + let from = message.get("from")?; + let user_id = from["id"].as_i64()?; + + if !allowed_users.is_empty() && !allowed_users.contains(&user_id) { + return None; + } + + let chat_id = message["chat"]["id"].as_i64()?; + let first_name = from["first_name"].as_str().unwrap_or("Unknown"); + let last_name = from["last_name"].as_str().unwrap_or(""); + let display_name = if last_name.is_empty() { + first_name.to_string() + } else { + format!("{first_name} {last_name}") + }; + let chat_type = message["chat"]["type"].as_str().unwrap_or("private"); + let is_group = chat_type == "group" || chat_type == "supergroup"; + let message_id = message["message_id"].as_i64().unwrap_or(0); + let timestamp = message["date"] + .as_i64() + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(chrono::Utc::now); + + let file_id = voice["file_id"].as_str()?; + let _duration = voice["duration"].as_u64().unwrap_or(0); + info!("Telegram: voice message from {display_name}, downloading..."); + + // Step 1: Get file path via Telegram getFile API + let get_file_url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); + let file_resp: serde_json::Value = client.get(&get_file_url).send().await.ok()?.json().await.ok()?; + let file_path = file_resp["result"]["file_path"].as_str()?; + + // Step 2: Download the audio file + let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); + let audio_bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; + + // Step 3: Transcribe with Groq Whisper (or OpenAI Whisper as fallback) + let transcription = transcribe_audio(client, &audio_bytes, file_path).await; + let text = match transcription { + Some(t) if !t.is_empty() => t, + _ => { + warn!("Telegram: voice transcription failed for {display_name}"); + "[Voice message received, but transcription failed]".to_string() + } + }; + + info!("Telegram: voice transcribed for {display_name}: {}", openfang_types::truncate_str(&text, 100)); + + Some(ChannelMessage { + channel: ChannelType::Telegram, + platform_message_id: message_id.to_string(), + sender: ChannelUser { + platform_id: chat_id.to_string(), + display_name, + openfang_user: None, + }, + content: ChannelContent::Text(text), + target_agent: None, + timestamp, + is_group, + thread_id: None, + metadata: HashMap::new(), + }) +} + +/// Transcribe audio bytes using Groq Whisper or OpenAI Whisper. +async fn transcribe_audio( + client: &reqwest::Client, + audio_bytes: &[u8], + filename: &str, +) -> Option { + // Determine MIME type and normalize filename for API compatibility. + // Telegram uses .oga (Ogg Opus) which Groq doesn't recognize — rename to .ogg. + let (mime, upload_filename) = if filename.ends_with(".oga") || filename.ends_with(".ogg") { + ("audio/ogg", filename.replace(".oga", ".ogg")) + } else if filename.ends_with(".mp3") { + ("audio/mpeg", filename.to_string()) + } else if filename.ends_with(".wav") { + ("audio/wav", filename.to_string()) + } else if filename.ends_with(".m4a") { + ("audio/mp4", filename.to_string()) + } else { + ("audio/ogg", format!("{filename}.ogg")) + }; + + // Try Groq Whisper first (fast, free tier) + if let Ok(groq_key) = std::env::var("GROQ_API_KEY") { + let form = reqwest::multipart::Form::new() + .part( + "file", + reqwest::multipart::Part::bytes(audio_bytes.to_vec()) + .file_name(upload_filename.clone()) + .mime_str(mime) + .ok()?, + ) + .text("model", "whisper-large-v3-turbo") + .text("response_format", "json"); + + match client + .post("https://api.groq.com/openai/v1/audio/transcriptions") + .bearer_auth(&groq_key) + .multipart(form) + .send() + .await + { + Ok(resp) => { + let status = resp.status(); + match resp.json::().await { + Ok(result) => { + if let Some(text) = result["text"].as_str() { + return Some(text.to_string()); + } + warn!("Groq Whisper: no 'text' in response (status={status}): {result}"); + } + Err(e) => { + warn!("Groq Whisper: failed to parse response (status={status}): {e}"); + } + } + } + Err(e) => { + warn!("Groq Whisper: request failed: {e}"); + } + } + } else { + warn!("GROQ_API_KEY not set, skipping Groq Whisper"); + } + + // Fallback: OpenAI Whisper + if let Ok(openai_key) = std::env::var("OPENAI_API_KEY") { + let form = reqwest::multipart::Form::new() + .part( + "file", + reqwest::multipart::Part::bytes(audio_bytes.to_vec()) + .file_name(upload_filename.clone()) + .mime_str(mime) + .ok()?, + ) + .text("model", "whisper-1") + .text("response_format", "json"); + + match client + .post("https://api.openai.com/v1/audio/transcriptions") + .bearer_auth(&openai_key) + .multipart(form) + .send() + .await + { + Ok(resp) => { + let status = resp.status(); + match resp.json::().await { + Ok(result) => { + if let Some(text) = result["text"].as_str() { + return Some(text.to_string()); + } + warn!("OpenAI Whisper: no 'text' in response (status={status}): {result}"); + } + Err(e) => { + warn!("OpenAI Whisper: failed to parse response (status={status}): {e}"); + } + } + } + Err(e) => { + warn!("OpenAI Whisper: request failed: {e}"); + } + } + } + + warn!("Voice transcription: all providers failed for file '{filename}' ({} bytes, mime={mime})", audio_bytes.len()); + None +} + +/// Handle photo messages from Telegram by downloading and recognizing with Gemini Vision. +/// +/// Picks the largest available photo size. Captions are used as the recognition prompt. +/// Returns a `ChannelMessage` with the recognition result as text. +async fn try_handle_photo( + update: &serde_json::Value, + allowed_users: &[i64], + client: &reqwest::Client, + token: &str, +) -> Option { + let message = update.get("message")?; + let photos = message.get("photo")?.as_array()?; + if photos.is_empty() { + return None; + } + let from = message.get("from")?; + let user_id = from["id"].as_i64()?; + + if !allowed_users.is_empty() && !allowed_users.contains(&user_id) { + return None; + } + + let chat_id = message["chat"]["id"].as_i64()?; + let first_name = from["first_name"].as_str().unwrap_or("Unknown"); + let last_name = from["last_name"].as_str().unwrap_or(""); + let display_name = if last_name.is_empty() { + first_name.to_string() + } else { + format!("{first_name} {last_name}") + }; + let chat_type = message["chat"]["type"].as_str().unwrap_or("private"); + let is_group = chat_type == "group" || chat_type == "supergroup"; + let message_id = message["message_id"].as_i64().unwrap_or(0); + let timestamp = message["date"] + .as_i64() + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(chrono::Utc::now); + + // Pick the largest photo (last in the array — Telegram sorts by size ascending) + let photo = photos.last()?; + let file_id = photo["file_id"].as_str()?; + let caption = message["caption"].as_str().unwrap_or(""); + + info!("Telegram: photo from {display_name}, downloading..."); + + // Download via Telegram getFile API + let get_file_url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); + let file_resp: serde_json::Value = + client.get(&get_file_url).send().await.ok()?.json().await.ok()?; + let file_path = file_resp["result"]["file_path"].as_str()?; + let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); + let image_bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; + + // Recognize with Gemini Vision + let recognition = crate::media_utils::recognize_image_gemini(client, &image_bytes, caption).await; + + let text = if caption.is_empty() { + format!("[User sent a photo]\nImage content: {recognition}") + } else { + format!("[User sent a photo with caption: {caption}]\nImage content: {recognition}") + }; + + info!( + "Telegram: photo recognized for {display_name}: {}", + openfang_types::truncate_str(&text, 100) + ); + + Some(ChannelMessage { + channel: ChannelType::Telegram, + platform_message_id: message_id.to_string(), + sender: ChannelUser { + platform_id: chat_id.to_string(), + display_name, + openfang_user: None, + }, + content: ChannelContent::Text(text), + target_agent: None, + timestamp, + is_group, + thread_id: None, + metadata: HashMap::new(), + }) +} + +/// Handle document/file messages from Telegram. +/// +/// Downloads the file and: +/// - If image: recognizes with Gemini Vision +/// - If text file (<50KB): extracts content +/// - Otherwise: reports filename, size, and MIME type +async fn try_handle_document( + update: &serde_json::Value, + allowed_users: &[i64], + client: &reqwest::Client, + token: &str, +) -> Option { + let message = update.get("message")?; + let document = message.get("document")?; + let from = message.get("from")?; + let user_id = from["id"].as_i64()?; + + if !allowed_users.is_empty() && !allowed_users.contains(&user_id) { + return None; + } + + let chat_id = message["chat"]["id"].as_i64()?; + let first_name = from["first_name"].as_str().unwrap_or("Unknown"); + let last_name = from["last_name"].as_str().unwrap_or(""); + let display_name = if last_name.is_empty() { + first_name.to_string() + } else { + format!("{first_name} {last_name}") + }; + let chat_type = message["chat"]["type"].as_str().unwrap_or("private"); + let is_group = chat_type == "group" || chat_type == "supergroup"; + let message_id = message["message_id"].as_i64().unwrap_or(0); + let timestamp = message["date"] + .as_i64() + .and_then(|ts| chrono::DateTime::from_timestamp(ts, 0)) + .unwrap_or_else(chrono::Utc::now); + + let file_name = document["file_name"].as_str().unwrap_or("unknown_file"); + let file_size = document["file_size"].as_u64().unwrap_or(0); + let mime_type = document["mime_type"] + .as_str() + .unwrap_or("application/octet-stream"); + let file_id = document["file_id"].as_str()?; + let caption = message["caption"].as_str().unwrap_or(""); + + info!("Telegram: document '{file_name}' ({file_size} bytes) from {display_name}"); + + // Download via Telegram getFile API (bots can download files up to 20MB) + let get_file_url = format!("https://api.telegram.org/bot{token}/getFile?file_id={file_id}"); + let file_resp: serde_json::Value = + client.get(&get_file_url).send().await.ok()?.json().await.ok()?; + let file_path = file_resp["result"]["file_path"].as_str()?; + let download_url = format!("https://api.telegram.org/file/bot{token}/{file_path}"); + let file_bytes = client.get(&download_url).send().await.ok()?.bytes().await.ok()?; + + let mut text = + format!("[User sent file: {file_name} (size: {file_size} bytes, type: {mime_type})]"); + + // If it's an image sent as document (uncompressed), use Gemini Vision + if mime_type.starts_with("image/") { + let recognition = crate::media_utils::recognize_image_gemini(client, &file_bytes, caption).await; + text.push_str(&format!("\nImage content: {recognition}")); + } + // If it's a text-like file and small enough, extract content + else if crate::media_utils::is_text_file(file_name, mime_type) && file_bytes.len() < 50_000 { + match std::str::from_utf8(&file_bytes) { + Ok(content) => { + let truncated = openfang_types::truncate_str(content, 3000); + text.push_str(&format!("\nFile content:\n```\n{truncated}\n```")); + if content.len() > 3000 { + text.push_str(&format!("\n... ({} chars total, truncated)", content.len())); + } + } + Err(_) => { + text.push_str("\n[Binary file, cannot display text content]"); + } + } + } + + if !caption.is_empty() { + text.push_str(&format!("\nUser caption: {caption}")); + } + + info!( + "Telegram: document processed for {display_name}: {}", + openfang_types::truncate_str(&text, 100) + ); + + Some(ChannelMessage { + channel: ChannelType::Telegram, + platform_message_id: message_id.to_string(), + sender: ChannelUser { + platform_id: chat_id.to_string(), + display_name, + openfang_user: None, + }, + content: ChannelContent::Text(text), + target_agent: None, + timestamp, + is_group, + thread_id: None, + metadata: HashMap::new(), + }) +} + /// Calculate exponential backoff capped at MAX_BACKOFF. pub fn calculate_backoff(current: Duration) -> Duration { (current * 2).min(MAX_BACKOFF) From 209258af733cc0076520916d75882455895a0717 Mon Sep 17 00:00:00 2001 From: slysian Date: Tue, 3 Mar 2026 09:44:53 +0800 Subject: [PATCH 2/2] refactor: extract hardcoded model IDs and API URLs to env vars Replace hardcoded model names and API endpoints with LazyLock statics that read from environment variables at first use, with sensible defaults: - VISION_MODEL (default: gemini-2.5-flash) - VISION_API_BASE (default: generativelanguage.googleapis.com/v1beta) - GROQ_STT_MODEL (default: whisper-large-v3-turbo) - GROQ_STT_URL (default: api.groq.com/openai/v1/audio/transcriptions) - OPENAI_STT_MODEL (default: whisper-1) - OPENAI_STT_URL (default: api.openai.com/v1/audio/transcriptions) This allows users to swap models or providers without recompiling. Co-Authored-By: Claude Opus 4.6 --- crates/openfang-channels/src/media_utils.rs | 16 ++++++++++- crates/openfang-channels/src/telegram.rs | 32 ++++++++++++++++++--- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/crates/openfang-channels/src/media_utils.rs b/crates/openfang-channels/src/media_utils.rs index 6b5dfee7e..481030586 100644 --- a/crates/openfang-channels/src/media_utils.rs +++ b/crates/openfang-channels/src/media_utils.rs @@ -3,9 +3,22 @@ //! Provides Gemini Vision image recognition, text file detection, //! and HTTP download helpers used by Telegram, Discord, and other adapters. +use std::sync::LazyLock; use std::time::Duration; use tracing::warn; +/// Vision model for image recognition (override via `VISION_MODEL` env var). +static VISION_MODEL: LazyLock = LazyLock::new(|| { + std::env::var("VISION_MODEL").unwrap_or_else(|_| "gemini-2.5-flash".to_string()) +}); + +/// Vision API base URL (override via `VISION_API_BASE` env var). +static VISION_API_BASE: LazyLock = LazyLock::new(|| { + std::env::var("VISION_API_BASE").unwrap_or_else(|_| { + "https://generativelanguage.googleapis.com/v1beta".to_string() + }) +}); + /// Recognize image content using Gemini Vision API. /// /// Returns a human-readable description of the image. On failure, returns @@ -54,7 +67,8 @@ pub async fn recognize_image_gemini( }); let url = format!( - "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={gemini_key}" + "{}/models/{}:generateContent?key={gemini_key}", + *VISION_API_BASE, *VISION_MODEL ); match client diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index 8671cdaaf..fb3334f94 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -16,6 +16,30 @@ use tokio::sync::{mpsc, watch}; use tracing::{debug, error, info, warn}; use zeroize::Zeroizing; +use std::sync::LazyLock; + +/// Groq STT model (override via `GROQ_STT_MODEL` env var). +static GROQ_STT_MODEL: LazyLock = LazyLock::new(|| { + std::env::var("GROQ_STT_MODEL").unwrap_or_else(|_| "whisper-large-v3-turbo".to_string()) +}); + +/// Groq STT API URL (override via `GROQ_STT_URL` env var). +static GROQ_STT_URL: LazyLock = LazyLock::new(|| { + std::env::var("GROQ_STT_URL") + .unwrap_or_else(|_| "https://api.groq.com/openai/v1/audio/transcriptions".to_string()) +}); + +/// OpenAI STT model (override via `OPENAI_STT_MODEL` env var). +static OPENAI_STT_MODEL: LazyLock = LazyLock::new(|| { + std::env::var("OPENAI_STT_MODEL").unwrap_or_else(|_| "whisper-1".to_string()) +}); + +/// OpenAI STT API URL (override via `OPENAI_STT_URL` env var). +static OPENAI_STT_URL: LazyLock = LazyLock::new(|| { + std::env::var("OPENAI_STT_URL") + .unwrap_or_else(|_| "https://api.openai.com/v1/audio/transcriptions".to_string()) +}); + /// Maximum backoff duration on API failures. const MAX_BACKOFF: Duration = Duration::from_secs(60); /// Initial backoff duration on API failures. @@ -552,11 +576,11 @@ async fn transcribe_audio( .mime_str(mime) .ok()?, ) - .text("model", "whisper-large-v3-turbo") + .text("model", GROQ_STT_MODEL.as_str()) .text("response_format", "json"); match client - .post("https://api.groq.com/openai/v1/audio/transcriptions") + .post(GROQ_STT_URL.as_str()) .bearer_auth(&groq_key) .multipart(form) .send() @@ -594,11 +618,11 @@ async fn transcribe_audio( .mime_str(mime) .ok()?, ) - .text("model", "whisper-1") + .text("model", OPENAI_STT_MODEL.as_str()) .text("response_format", "json"); match client - .post("https://api.openai.com/v1/audio/transcriptions") + .post(OPENAI_STT_URL.as_str()) .bearer_auth(&openai_key) .multipart(form) .send()