diff --git a/.jules/sentinel.md b/.jules/sentinel.md
new file mode 100644
index 0000000..28a6924
--- /dev/null
+++ b/.jules/sentinel.md
@@ -0,0 +1,4 @@
+## 2024-05-23 - Markdown Injection in Telegram Messages
+**Vulnerability:** User-controlled input (usernames, search queries, article titles) was directly inserted into Markdown-formatted messages without escaping.
+**Learning:** Even in non-web applications like Telegram bots, input validation and output encoding are critical. Unescaped characters like `*` and `_` can break message formatting, leading to denial of service (message send failure) or spoofing.
+**Prevention:** Always escape user input before interpolating it into message templates. Use a helper function like `escape_markdown_v1` for Legacy Markdown mode.
diff --git a/functions/security_utils.py b/functions/security_utils.py
new file mode 100644
index 0000000..3507929
--- /dev/null
+++ b/functions/security_utils.py
@@ -0,0 +1,48 @@
+"""
+Security Utility Functions
+"""
+
+import re
+
+# Characters that start markup in Telegram's legacy Markdown parse mode.
+_MARKDOWN_V1_SPECIALS = re.compile(r'([_*`\[])')
+# Tag-shaped text: '<', any run of characters other than '>', then '>'.
+_HTML_TAG = re.compile(r'<[^>]*>')
+
+
+def escape_markdown_v1(text: str) -> str:
+    """
+    Escape Telegram legacy-Markdown markup characters in ``text``.
+
+    Each '_', '*', '`' and '[' is prefixed with a backslash, which is how
+    Telegram's legacy Markdown mode expects these characters to be escaped.
+
+    Args:
+        text: Value to escape; non-string values are converted with str().
+
+    Returns:
+        The escaped string, or "" when ``text`` is None.
+    """
+    # Test for None rather than falsiness so values such as 0 are kept.
+    if text is None:
+        return ""
+    return _MARKDOWN_V1_SPECIALS.sub(r'\\\1', str(text))
+
+
+def sanitize_html(text: str) -> str:
+    """
+    Remove tag-shaped substrings (``<...>``) from ``text``.
+
+    Only the tags themselves are removed; the text between tags and HTML
+    entities such as ``&amp;`` are left untouched.
+
+    Args:
+        text: Value to strip; non-string values are converted with str().
+
+    Returns:
+        The text without tags, or "" when ``text`` is None.
+    """
+    # Mirror escape_markdown_v1 instead of letting re raise TypeError on None.
+    if text is None:
+        return ""
+    return _HTML_TAG.sub('', str(text))
diff --git a/functions/telegram_bot.py b/functions/telegram_bot.py
index 898d961..4e6be68 100644
--- a/functions/telegram_bot.py
+++ b/functions/telegram_bot.py
@@ -46,6 +46,7 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
     """Handle /start command - welcome message."""
     from .user_storage import get_user_language
     from .translations import t
+    from .security_utils import escape_markdown_v1
 
     user = update.effective_user
     telegram_id = user.id
@@ -59,7 +60,7 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
     except Exception as e:
         print(f"Database not available (running locally?): {e}")
 
-    welcome_message = t('welcome', user_lang, username=username)
+    welcome_message = t('welcome', user_lang, username=escape_markdown_v1(username))
     await update.message.reply_text(
         welcome_message,
         parse_mode='Markdown'
@@ -541,6 +542,7 @@ async def saved_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
     """Handle /saved command - show saved articles with delete buttons."""
     from .user_storage import get_saved_articles, get_user_language
     from .translations import t
+    from .security_utils import escape_markdown_v1
     import hashlib
 
     telegram_id = update.effective_user.id
@@ -562,6 +564,7 @@ async def saved_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
 
     for i, article in enumerate(articles, 1):
         title = article.get('title', 'Untitled')[:50]
+        safe_title = escape_markdown_v1(title)
         url = article.get('url', '')
         source = article.get('source', '')
         category = article.get('category', 'tech')
@@ -575,9 +578,9 @@ async def saved_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
 
         # Build message line
         if url.startswith('http'):
-            message += f"{i}. {emoji} [{title}]({url})"
+            message += f"{i}. {emoji} [{safe_title}]({url})"
         else:
-            message += f"{i}. {emoji} {title}"
+            message += f"{i}. {emoji} {safe_title}"
 
         if date_str:
             message += f" `{date_str}`"
@@ -643,6 +646,7 @@ async def filter_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
     """Handle /filter command - filter saved articles by category."""
     from .user_storage import get_saved_articles, get_user_language
     from .translations import t
+    from .security_utils import escape_markdown_v1
 
     telegram_id = update.effective_user.id
     user_lang = get_user_language(telegram_id)
@@ -675,11 +679,12 @@ async def filter_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
 
     for i, article in enumerate(articles, 1):
         title = article.get('title', 'Untitled')[:50]
+        safe_title = escape_markdown_v1(title)
         url = article.get('url', '')
         if url.startswith('http'):
-            message += f"{i}. [{title}]({url})\n"
+            message += f"{i}. [{safe_title}]({url})\n"
         else:
-            message += f"{i}. {title}\n"
+            message += f"{i}. {safe_title}\n"
 
     try:
         await update.message.reply_text(message, parse_mode='Markdown', disable_web_page_preview=True)
@@ -691,6 +696,7 @@ async def recap_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
     """Handle /recap command - show weekly summary of saved articles."""
     from .user_storage import get_saved_articles, get_user_language
     from .translations import t
+    from .security_utils import escape_markdown_v1
     from datetime import datetime, timedelta
 
     telegram_id = update.effective_user.id
@@ -728,14 +734,15 @@ async def recap_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
     # Show top 5 recent articles
     for i, article in enumerate(weekly_articles[:5], 1):
         title = article.get('title', 'Untitled')[:50]
+        safe_title = escape_markdown_v1(title)
         url = article.get('url', '')
         category = article.get('category', 'tech')
         emoji = cat_emoji.get(category, '🔧')
 
         if url.startswith('http'):
-            message += f"{i}. {emoji} [{title}]({url})\n"
+            message += f"{i}. {emoji} [{safe_title}]({url})\n"
         else:
-            message += f"{i}. {emoji} {title}\n"
+            message += f"{i}. {emoji} {safe_title}\n"
 
     message += f"\n_Total: {len(weekly_articles)} articles this week_"
 
@@ -797,6 +804,7 @@ async def search_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
     from .user_storage import add_search_history, get_user_language
     from .rate_limiter import check_rate_limit
     from .translations import t
+    from .security_utils import escape_markdown_v1
 
     telegram_id = update.effective_user.id
     user_lang = get_user_language(telegram_id)
@@ -827,7 +835,8 @@ async def search_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
 
     add_search_history(telegram_id, query)
 
-    await update.message.reply_text(t('searching', user_lang, query=query), parse_mode='Markdown')
+    safe_query = escape_markdown_v1(query)
+    await update.message.reply_text(t('searching', user_lang, query=safe_query), parse_mode='Markdown')
 
     try:
         # Fetch news
@@ -837,25 +846,27 @@ async def search_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
 
         # Filter by query
         results = []
+        query_lower = query.lower()
         for article in all_news:
             title = article.get('title', '').lower()
-            if query in title or any(word in title for word in query.split()):
+            if query_lower in title or any(word in title for word in query_lower.split()):
                 results.append(article)
 
         if not results:
             await update.message.reply_text(
-                t('no_results', user_lang, query=query),
+                t('no_results', user_lang, query=safe_query),
                 parse_mode='Markdown'
             )
             return
 
         # Format results
-        message = t('search_results', user_lang, query=query, count=len(results))
+        message = t('search_results', user_lang, query=safe_query, count=len(results))
         for i, article in enumerate(results[:10], 1):
             title = article.get('title', '')[:60]
+            safe_title = escape_markdown_v1(title)
             url = article.get('url', '')
             source = article.get('source', '')
-            message += f"{i}. [{title}]({url}) _{source}_\n"
+            message += f"{i}. [{safe_title}]({url}) _{source}_\n"
 
         await update.message.reply_text(
             message,