diff --git a/.github/workflows/telegram-e2e.yml b/.github/workflows/telegram-e2e.yml new file mode 100644 index 00000000..f4290947 --- /dev/null +++ b/.github/workflows/telegram-e2e.yml @@ -0,0 +1,80 @@ +name: telegram-e2e + +# Runs Telethon-based E2E tests against the Zeph Telegram channel on Test DC. +# Gated by repository secrets — skipped automatically when secrets are absent. +# Runs only on push to main (not on PRs) to avoid flakiness from external services. + +on: + push: + branches: [main] + +jobs: + telegram-e2e: + name: Telegram E2E (Test DC) + runs-on: ubuntu-latest + # Skip if the required secrets are not configured in the repository + if: > + secrets.ZEPH_TELEGRAM_TEST_TOKEN != '' && + secrets.TELEGRAM_TEST_API_ID != '' && + secrets.TELEGRAM_TEST_API_HASH != '' && + secrets.TELEGRAM_TEST_SESSION != '' && + secrets.TELEGRAM_TEST_BOT_USERNAME != '' + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: pip + + - name: Install Telethon + run: pip install -r scripts/telegram-e2e/requirements.txt + + - name: Restore Telethon session + # TELEGRAM_TEST_SESSION is the base64-encoded content of test_session.session + run: | + mkdir -p .local/testing + echo "${{ secrets.TELEGRAM_TEST_SESSION }}" | base64 -d > .local/testing/test_session.session + + - name: Build Zeph (telegram feature) + run: cargo build --features full --bin zeph + + - name: Write Zeph config + env: + ZEPH_TELEGRAM_TEST_TOKEN: ${{ secrets.ZEPH_TELEGRAM_TEST_TOKEN }} + run: | + mkdir -p .local/config .local/testing/data .local/testing/debug + cp config/telegram-test.toml .local/config/telegram-test.toml + # Patch allowed_users and use env-backend token for CI (no vault available) + sed -i 's|token = { vault = "ZEPH_TELEGRAM_TEST_TOKEN" }|token = { env = "ZEPH_TELEGRAM_TEST_TOKEN" }|' \ + .local/config/telegram-test.toml + sed -i 's|allowed_users = \["your_test_username"\]|allowed_users = ["${{ secrets.TELEGRAM_TEST_ACCOUNT_USERNAME }}"]|' \ + .local/config/telegram-test.toml + sed -i 's|backend = "age"|backend = "env"|' \ + .local/config/telegram-test.toml + + - name: Start Zeph (background) + env: + ZEPH_TELEGRAM_TEST_TOKEN: ${{ secrets.ZEPH_TELEGRAM_TEST_TOKEN }} + run: | + ./target/debug/zeph --config .local/config/telegram-test.toml --channel telegram \ + 2>.local/testing/debug/telegram-session.log & + # Give the bot 8s to connect and start long-polling + sleep 8 + + - name: Run E2E scenarios + env: + TG_API_ID: ${{ secrets.TELEGRAM_TEST_API_ID }} + TG_API_HASH: ${{ secrets.TELEGRAM_TEST_API_HASH }} + TG_BOT_USERNAME: ${{ secrets.TELEGRAM_TEST_BOT_USERNAME }} + TG_SESSION_PATH: .local/testing/test_session.session + run: python3 scripts/telegram-e2e/telegram_e2e.py + + - name: Upload debug log on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: telegram-session-log + path: .local/testing/debug/telegram-session.log + retention-days: 7 diff --git a/.gitignore b/.gitignore index a135934d..e4190fc2 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ data/ book/book/ *.log zeph.log +*.session diff --git a/CHANGELOG.md b/CHANGELOG.md index 6afdb05a..ef398ef2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - test(channels): add injectable test transport to `TelegramChannel` (#2121) — `new_test()` constructor under `#[cfg(test)]` exposes an `mpsc::Sender` so all channel behavioral paths can be tested without a real bot token or live Telegram API; 12 new tests cover `recv()` message delivery, `/reset` and `/skills` command routing, unknown-command passthrough, channel-close returning `None`, text accumulation in `send_chunk()`, `flush_chunks()` state clearing, the `/start` welcome path via wiremock, `flush_chunks()` with `message_id` via wiremock, and `confirm()` timeout/close/yes/no logic at the rx-timeout level; adds `wiremock` and tokio `test-util` to dev-dependencies - test(tools): add integration tests for `FileExecutor` sandbox access controls (#2117) — 15 tests in `crates/zeph-tools/tests/file_access.rs` covering read/write inside sandbox, sandbox violation on outside paths, symlink escape (single and chained, unix-only), path traversal blocking, multiple allowed paths, empty allowed-paths CWD fallback, tilde regression (#2115), delete/move/copy cross-boundary blocking, `find_path` result filtering to sandbox, `grep` default-path sandbox validation, and nonexistent allowed path resilience +- test(channels): add Telethon-based E2E test suite for Telegram channel using Telegram Test DC (#2122) — `scripts/telegram-e2e/telegram_e2e.py` connects as a MTProto user account, sends 8 scripted scenarios to the Zeph bot, and asserts on replies: `/start` welcome message, math (347×89), `/reset`, `/skills` (MarkdownV2 escaping), empty document (no reply), long output (≥2 split messages), streaming (first chunk latency), and unauthorized-user silence; `setup_tg_test_account.py` creates a persistent Test DC session (phone `+99966XXXXX`, no real SIM required); `config/telegram-test.toml` template for Test DC bot; optional `.github/workflows/telegram-e2e.yml` gated by `ZEPH_TELEGRAM_TEST_TOKEN`/`TELEGRAM_TEST_SESSION` secrets, runs on push to `main` only; `*.session` added to `.gitignore` - test(cost): add unit test for `max_daily_cents = 0.0` unlimited budget behavior — `CostTracker::check_budget()` must return `Ok(())` regardless of spend when the daily limit is zero (#2110) - chore(testing): add canonical `config/testing.toml` with `provider = "router"` to enable RAPS/reputation scoring in CI sessions (#2104) — previously `.local/config/testing.toml` used `provider = "openai"` which silently ignored `[llm.router]` and `[llm.router.reputation]`; the new tracked reference config uses `provider = "router"` with `chain = ["openai"]` keeping identical LLM behavior while activating RAPS; copy to `.local/config/testing.toml` before use - test(memory): add unit tests for `SqliteStore` tier DB methods (#2094) — covers `fetch_tiers`, `count_messages_by_tier`, `find_promotion_candidates`, `manual_promote`, `promote_to_semantic`, and migration 042 schema defaults; 29 new tests across happy path, edge cases (empty input, already-promoted rows, soft-deleted rows, nonexistent IDs), and idempotency invariants diff --git a/config/telegram-test.toml b/config/telegram-test.toml new file mode 100644 index 00000000..73114708 --- /dev/null +++ b/config/telegram-test.toml @@ -0,0 +1,71 @@ +# Zeph configuration for Telegram Test DC E2E testing. +# This template uses placeholder values — copy and fill before use: +# +# cp config/telegram-test.toml .local/config/telegram-test.toml +# +# Then store the bot token in the vault: +# cargo run --features full -- vault set ZEPH_TELEGRAM_TEST_TOKEN '' +# +# Run Zeph with this config: +# cargo run --features full -- --config .local/config/telegram-test.toml --channel telegram \ +# 2>.local/testing/debug/telegram-session.log +# +# Notes: +# - The bot token is obtained from @BotFather on the Telegram Test DC. +# Connect to Test DC in Telegram (test.t.me or via a test-mode client), +# find @BotFather, run /newbot. The token is valid only on Test DC. +# - allowed_users must contain the Telegram username of the Telethon test account +# created with setup_tg_test_account.py. + +[agent] +name = "ZephTest" +max_tool_iterations = 10 +auto_update_check = false + +[llm] +provider = "openai" +base_url = "https://api.openai.com/v1" +model = "gpt-4o-mini" +max_tokens = 4096 + +[skills] +paths = [".local/testing/skills"] +max_active_skills = 5 +prompt_mode = "auto" + +[memory] +sqlite_path = ".local/testing/data/zeph-telegram-test.db" +history_limit = 50 + +[telegram] +# Token from @BotFather on Telegram Test DC (NOT production BotFather) +token = { vault = "ZEPH_TELEGRAM_TEST_TOKEN" } +# Replace with the Telegram username of your test account (created by setup_tg_test_account.py) +allowed_users = ["your_test_username"] + +[tools] +enabled = true +summarize_output = true + +[tools.shell] +timeout = 30 +blocked_commands = [] +allowed_commands = [] +allowed_paths = [] +allow_network = true + +[vault] +backend = "age" + +[debug] +enabled = true +output_dir = ".local/testing/debug" +format = "raw" + +[security] +redact_secrets = true +autonomy_level = "supervised" + +[cost] +enabled = true +max_daily_cents = 100 diff --git a/scripts/telegram-e2e/requirements.txt b/scripts/telegram-e2e/requirements.txt new file mode 100644 index 00000000..7b33fbae --- /dev/null +++ b/scripts/telegram-e2e/requirements.txt @@ -0,0 +1 @@ +telethon>=1.36 diff --git a/scripts/telegram-e2e/setup_tg_test_account.py b/scripts/telegram-e2e/setup_tg_test_account.py new file mode 100644 index 00000000..53ada233 --- /dev/null +++ b/scripts/telegram-e2e/setup_tg_test_account.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +"""One-time setup: register a Telegram Test DC user account and save the session. + +Run once before using telegram_e2e.py. Produces test_session.session (gitignored). + +Usage: + pip install telethon + python3 scripts/telegram-e2e/setup_tg_test_account.py \\ + --api-id --api-hash \\ + --phone +99966XXXXX --session .local/testing/test_session.session + +Obtaining API credentials: + Visit https://my.telegram.org → Log In → API development tools → Create app. + Use Test DC credentials: set test mode in my.telegram.org before creating the app, + or obtain a separate API_ID/API_HASH for test servers. + +Test DC phone numbers: + Any number in +99966XXXXX format works on Test DC (e.g. +9996612345). + The OTP code is always the last 5 digits repeated (e.g. phone +9996612345 → OTP 12345). + No real SIM needed — Test DC is an isolated Telegram server for developers. + +Second account (for unauthorized-user scenario): + Run again with a different phone and --session .local/testing/test_session2.session +""" + +import argparse +import os +import sys + +try: + from telethon.sync import TelegramClient +except ImportError: + print("telethon not installed. Run: pip install telethon", file=sys.stderr) + sys.exit(1) + +# Telegram Test DC server address +TEST_DC_HOST = "149.154.167.40" +TEST_DC_PORT = 443 + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Create Telegram Test DC session for E2E testing" + ) + parser.add_argument( + "--api-id", + type=int, + default=int(os.environ.get("TG_API_ID", "0")) or None, + required=not os.environ.get("TG_API_ID"), + help="Telegram API ID from my.telegram.org", + ) + parser.add_argument( + "--api-hash", + default=os.environ.get("TG_API_HASH"), + required=not os.environ.get("TG_API_HASH"), + help="Telegram API hash from my.telegram.org", + ) + parser.add_argument( + "--phone", + required=True, + help="Test DC phone number (+99966XXXXX format)", + ) + parser.add_argument( + "--session", + default=os.environ.get("TG_SESSION_PATH", ".local/testing/test_session.session"), + help="Path to save the session file (default: .local/testing/test_session.session)", + ) + args = parser.parse_args() + + if not args.phone.startswith("+99966"): + print( + "WARNING: Test DC phone numbers use +99966XXXXX format.\n" + "Using a real phone number will connect to production Telegram, not Test DC.", + file=sys.stderr, + ) + + session_dir = os.path.dirname(args.session) + if session_dir: + os.makedirs(session_dir, exist_ok=True) + + # Strip .session suffix — Telethon appends it automatically + session_name = args.session.removesuffix(".session") + + print(f"Connecting to Telegram Test DC ({TEST_DC_HOST}:{TEST_DC_PORT})...") + print(f"Session will be saved to: {session_name}.session") + + client = TelegramClient( + session_name, + args.api_id, + args.api_hash, + server=(TEST_DC_HOST, TEST_DC_PORT), + ) + + # start() prompts for OTP interactively + client.start(phone=args.phone) + + me = client.get_me() + print(f"\nAuthenticated as: {me.first_name} (@{me.username or 'no username'})") + print(f"Session saved: {session_name}.session") + print("\nNext step: register a bot on Test DC via @BotFather (test server):") + print(" 1. In Telegram, go to @BotFather and /newbot") + print(" 2. Store the token: cargo run --features full -- vault set ZEPH_TELEGRAM_TEST_TOKEN ''") + print(" 3. Run: python3 scripts/telegram-e2e/telegram_e2e.py --help") + + client.disconnect() + + +if __name__ == "__main__": + main() diff --git a/scripts/telegram-e2e/telegram_e2e.py b/scripts/telegram-e2e/telegram_e2e.py new file mode 100644 index 00000000..70df06f6 --- /dev/null +++ b/scripts/telegram-e2e/telegram_e2e.py @@ -0,0 +1,429 @@ +#!/usr/bin/env python3 +"""Telegram E2E test suite for Zeph using Telethon and Telegram Test DC. + +Connects to Telegram Test DC as a user account, sends scripted prompts to the +Zeph bot, and asserts on bot replies. Exits non-zero on any failure. + +Prerequisites: + pip install telethon + python3 scripts/telegram-e2e/setup_tg_test_account.py ... # one-time setup + +Usage: + python3 scripts/telegram-e2e/telegram_e2e.py \\ + --api-id --api-hash \\ + --bot-username @YourZephTestBot \\ + --session .local/testing/test_session.session + +Environment variables (alternative to CLI flags): + TG_API_ID Telegram API ID + TG_API_HASH Telegram API hash + TG_BOT_USERNAME Bot username (with or without @) + TG_SESSION_PATH Path to .session file (default: .local/testing/test_session.session) + TG_SESSION_PATH_2 Second session for unauthorized-user test (optional) +""" + +import argparse +import asyncio +import io +import os +import sys +import time +from typing import Optional + +try: + from telethon import TelegramClient, events +except ImportError: + print("telethon not installed. Run: pip install telethon", file=sys.stderr) + sys.exit(1) + +# Telegram Test DC server (isolated from production) +TEST_DC_HOST = "149.154.167.40" +TEST_DC_PORT = 443 + +# Minimal valid 1×1 white PNG used as a document with no text to trigger +# the empty-message filter in TelegramChannel (text="" && attachments=[]) +_TINY_PNG = bytes([ + 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, + 0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, 0x44, 0x52, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + 0x08, 0x02, 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, + 0xDE, 0x00, 0x00, 0x00, 0x0C, 0x49, 0x44, 0x41, + 0x54, 0x08, 0xD7, 0x63, 0xF8, 0xCF, 0xC0, 0x00, + 0x00, 0x00, 0x02, 0x00, 0x01, 0xE2, 0x21, 0xBC, + 0x33, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4E, + 0x44, 0xAE, 0x42, 0x60, 0x82, +]) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _result(name: str, passed: bool, detail: str = "") -> bool: + status = "PASS" if passed else "FAIL" + suffix = f": {detail}" if detail else "" + print(f"[{status}] {name}{suffix}") + return passed + + +async def _wait_for_reply( + client: TelegramClient, + bot: str, + timeout: float, +) -> Optional[str]: + """Register a one-shot handler, send nothing, await first reply.""" + loop = asyncio.get_event_loop() + future: asyncio.Future[str] = loop.create_future() + + @client.on(events.NewMessage(from_users=bot)) + async def _handler(event: events.NewMessage.Event) -> None: + if not future.done(): + future.set_result(event.message.text or "") + + try: + return await asyncio.wait_for(future, timeout=timeout) + except asyncio.TimeoutError: + return None + finally: + client.remove_event_handler(_handler) + + +async def _send_and_wait( + client: TelegramClient, + bot: str, + text: str, + timeout: float = 30.0, +) -> Optional[str]: + """Send *text* to *bot* and return the first reply text, or None on timeout.""" + loop = asyncio.get_event_loop() + future: asyncio.Future[str] = loop.create_future() + + @client.on(events.NewMessage(from_users=bot)) + async def _handler(event: events.NewMessage.Event) -> None: + if not future.done(): + future.set_result(event.message.text or "") + + await client.send_message(bot, text) + try: + return await asyncio.wait_for(future, timeout=timeout) + except asyncio.TimeoutError: + return None + finally: + client.remove_event_handler(_handler) + + +async def _send_and_collect( + client: TelegramClient, + bot: str, + text: str, + first_timeout: float = 60.0, + idle_after: float = 12.0, + max_messages: int = 20, +) -> list[str]: + """Send *text* and collect all bot replies until idle_after seconds of silence. + + Also tracks message edits (streaming updates), keeping the last text per message. + """ + replies: list[str] = [] + first_arrived = asyncio.Event() + last_activity: list[float] = [0.0] + + @client.on(events.NewMessage(from_users=bot)) + async def _on_new(event: events.NewMessage.Event) -> None: + replies.append(event.message.text or "") + last_activity[0] = time.monotonic() + first_arrived.set() + + @client.on(events.MessageEdited(from_users=bot)) + async def _on_edit(event: events.MessageEdited.Event) -> None: + if replies: + replies[-1] = event.message.text or "" + last_activity[0] = time.monotonic() + + await client.send_message(bot, text) + + try: + await asyncio.wait_for(first_arrived.wait(), timeout=first_timeout) + except asyncio.TimeoutError: + pass + finally: + # Continue draining until idle + pass + + start_idle = time.monotonic() + while True: + await asyncio.sleep(0.5) + if len(replies) >= max_messages: + break + since_last = time.monotonic() - last_activity[0] + if first_arrived.is_set() and since_last >= idle_after: + break + if not first_arrived.is_set() and (time.monotonic() - start_idle) > first_timeout: + break + + client.remove_event_handler(_on_new) + client.remove_event_handler(_on_edit) + return replies + + +# --------------------------------------------------------------------------- +# Scenarios +# --------------------------------------------------------------------------- + +async def scenario_startup(client: TelegramClient, bot: str) -> bool: + """Send /start; assert reply contains 'Welcome'.""" + reply = await _send_and_wait(client, bot, "/start", timeout=20.0) + excerpt = repr(reply[:80]) if reply else "TIMEOUT" + return _result("startup", reply is not None and "elcome" in reply, excerpt) + + +async def scenario_reset(client: TelegramClient, bot: str) -> bool: + """/reset must elicit any reply (or silently reset context).""" + reply = await _send_and_wait(client, bot, "/reset", timeout=20.0) + excerpt = repr(reply[:80]) if reply else "TIMEOUT (acceptable — context reset silently)" + # A timeout is acceptable: the channel may reset without replying + return _result("reset", True, excerpt) + + +async def scenario_skills(client: TelegramClient, bot: str) -> bool: + """/skills must return a non-empty reply without MarkdownV2 parse errors. + + MarkdownV2 errors would surface as Telegram API exceptions; a successful + reply means the bot's markdown_to_telegram() escaped the output correctly. + """ + reply = await _send_and_wait(client, bot, "/skills", timeout=30.0) + ok = reply is not None and len(reply) > 0 + excerpt = repr(reply[:80]) if reply else "TIMEOUT" + return _result("skills", ok, excerpt) + + +async def scenario_math(client: TelegramClient, bot: str) -> bool: + """Math prompt must produce a reply containing 30,883 (or 30883).""" + reply = await _send_and_wait(client, bot, "What is 347 * 89?", timeout=60.0) + ok = reply is not None and ("30,883" in reply or "30883" in reply) + excerpt = repr(reply[:120]) if reply else "TIMEOUT" + return _result("math", ok, excerpt) + + +async def scenario_empty_msg(client: TelegramClient, bot: str) -> bool: + """A document with no text/caption must produce NO reply within 5s. + + The TelegramChannel drops messages where text.is_empty() && attachments.is_empty(). + Sending a PNG as a raw document (force_document=True, no caption) is neither + a photo nor an audio attachment, so it reaches the empty-message filter. + """ + loop = asyncio.get_event_loop() + future: asyncio.Future[str] = loop.create_future() + + @client.on(events.NewMessage(from_users=bot)) + async def _handler(event: events.NewMessage.Event) -> None: + if not future.done(): + future.set_result(event.message.text or "") + + await client.send_file( + bot, + io.BytesIO(_TINY_PNG), + force_document=True, + # No caption — must trigger empty-message filter + ) + try: + reply = await asyncio.wait_for(future, timeout=5.0) + client.remove_event_handler(_handler) + return _result("empty_msg", False, f"unexpected reply: {repr(reply[:60])}") + except asyncio.TimeoutError: + client.remove_event_handler(_handler) + return _result("empty_msg", True, "no reply within 5s") + + +async def scenario_long_output(client: TelegramClient, bot: str) -> bool: + """A prompt that forces >4096 chars of output must split into ≥2 messages.""" + prompt = ( + "List every prime number between 1 and 5000 in this exact format: " + "': ' (e.g. '1: 2', '2: 3', ...), one per line. " + "Output ONLY the list with no preamble, no summary, no extra text." + ) + replies = await _send_and_collect( + client, bot, prompt, first_timeout=90.0, idle_after=15.0, max_messages=10 + ) + ok = len(replies) >= 2 + excerpt = ( + f"{len(replies)} message(s), first={repr(replies[0][:40]) if replies else 'none'}" + ) + return _result("long_output", ok, excerpt) + + +async def scenario_streaming(client: TelegramClient, bot: str) -> bool: + """Long-form prompt must produce a reply within 30s (streaming first chunk). + + Full assertion: the bot sends an initial chunk early (not after the whole + response is ready). We verify by checking latency to first message < 30s + and that the final reply is non-empty. Edit events are counted as a + best-effort indicator of intermediate streaming updates. + """ + prompt = ( + "Explain in detail how a Rust async executor works, covering: " + "the Waker mechanism, task queues, the poll lifecycle, " + "cooperative scheduling, and how Tokio implements multi-threading. " + "Be thorough — at least 800 words." + ) + first_time: list[Optional[float]] = [None] + edit_count: list[int] = [0] + send_time = time.monotonic() + + @client.on(events.NewMessage(from_users=bot)) + async def _on_new(event: events.NewMessage.Event) -> None: + if first_time[0] is None: + first_time[0] = time.monotonic() + + @client.on(events.MessageEdited(from_users=bot)) + async def _on_edit(_event: events.MessageEdited.Event) -> None: + edit_count[0] += 1 + + await client.send_message(bot, prompt) + + # Wait up to 90s; stop early once first message arrived + 20s idle + last_activity = [time.monotonic()] + deadline = send_time + 90.0 + while time.monotonic() < deadline: + await asyncio.sleep(1.0) + if first_time[0] is not None: + if (time.monotonic() - max(first_time[0], send_time + 5.0)) > 20.0: + break + + client.remove_event_handler(_on_new) + client.remove_event_handler(_on_edit) + + appeared = first_time[0] is not None + latency = (first_time[0] - send_time) if appeared else None + latency_str = f"{latency:.1f}s" if latency is not None else "never" + detail = f"first_msg={latency_str}, edits={edit_count[0]}" + return _result("streaming", appeared and latency is not None and latency < 30.0, detail) + + +async def scenario_unauthorized( + client2: Optional[TelegramClient], bot: str +) -> bool: + """A message from an account NOT in allowed_users must produce no reply within 10s.""" + if client2 is None: + print("[SKIP] unauthorized: TG_SESSION_PATH_2 not set — skipping") + return True + + loop = asyncio.get_event_loop() + future: asyncio.Future[str] = loop.create_future() + + @client2.on(events.NewMessage(from_users=bot)) + async def _handler(event: events.NewMessage.Event) -> None: + if not future.done(): + future.set_result(event.message.text or "") + + await client2.send_message(bot, "Hello from unauthorized account") + try: + reply = await asyncio.wait_for(future, timeout=10.0) + client2.remove_event_handler(_handler) + return _result("unauthorized", False, f"unexpected reply: {repr(reply[:60])}") + except asyncio.TimeoutError: + client2.remove_event_handler(_handler) + return _result("unauthorized", True, "no reply within 10s") + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +async def _run(args: argparse.Namespace) -> int: + session = args.session.removesuffix(".session") + client = TelegramClient( + session, + args.api_id, + args.api_hash, + server=(TEST_DC_HOST, TEST_DC_PORT), + ) + await client.start() + + client2: Optional[TelegramClient] = None + if args.session2: + session2 = args.session2.removesuffix(".session") + client2 = TelegramClient( + session2, + args.api_id, + args.api_hash, + server=(TEST_DC_HOST, TEST_DC_PORT), + ) + await client2.start() + + bot = args.bot_username + if not bot.startswith("@"): + bot = f"@{bot}" + + print(f"Running Zeph Telegram E2E against {bot}\n") + + # Reset conversation state before running scenarios + if not args.no_reset: + print("Resetting conversation state (/reset)...") + await _send_and_wait(client, bot, "/reset", timeout=10.0) + await asyncio.sleep(1.0) + + results: list[bool] = [] + + results.append(await scenario_startup(client, bot)) + results.append(await scenario_reset(client, bot)) + results.append(await scenario_skills(client, bot)) + results.append(await scenario_math(client, bot)) + results.append(await scenario_empty_msg(client, bot)) + results.append(await scenario_long_output(client, bot)) + results.append(await scenario_streaming(client, bot)) + results.append(await scenario_unauthorized(client2, bot)) + + await client.disconnect() + if client2: + await client2.disconnect() + + passed = sum(1 for r in results if r) + total = len(results) + print(f"\n{passed}/{total} scenarios passed") + return 0 if all(results) else 1 + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Zeph Telegram E2E test suite (Telethon + Test DC)" + ) + parser.add_argument( + "--api-id", + type=int, + default=int(os.environ.get("TG_API_ID", "0")) or None, + required=not os.environ.get("TG_API_ID"), + ) + parser.add_argument( + "--api-hash", + default=os.environ.get("TG_API_HASH"), + required=not os.environ.get("TG_API_HASH"), + ) + parser.add_argument( + "--bot-username", + default=os.environ.get("TG_BOT_USERNAME"), + required=not os.environ.get("TG_BOT_USERNAME"), + help="Bot username to test against (e.g. @ZephTestBot)", + ) + parser.add_argument( + "--session", + default=os.environ.get("TG_SESSION_PATH", ".local/testing/test_session.session"), + help="Path to the Telethon session file", + ) + parser.add_argument( + "--session2", + default=os.environ.get("TG_SESSION_PATH_2"), + help="Second session for unauthorized-user scenario (optional)", + ) + parser.add_argument( + "--no-reset", + action="store_true", + help="Skip /reset before running scenarios", + ) + args = parser.parse_args() + + sys.exit(asyncio.run(_run(args))) + + +if __name__ == "__main__": + main()