From aca51e1e19e8a49110c013a1225d3d219a520749 Mon Sep 17 00:00:00 2001 From: Anton Sychev Date: Wed, 25 Mar 2026 19:49:13 +0100 Subject: [PATCH 1/2] =?UTF-8?q?docs(installation):=20mejorar=20instruccion?= =?UTF-8?q?es=20y=20estructura=20de=20instalaci=C3=B3n?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reorganizar la sección de instalación con métodos y comandos más claros - Añadir opciones Docker con detalles para instalación manual y una línea - Incluir comandos útiles de Docker y PM2 para despliegue y gestión - Añadir sección para despliegue local con validaciones y pasos detallados - Incluir instrucciones completas para uso de PM2 con autoarranque y archivo ecosystem - Mejorar formato y lenguaje para mayor claridad en ejemplos y descripciones - Añadir encabezados y separadores para facilitar la lectura y navegación --- README.md | 208 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 154 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index 91163c2..ba1157d 100644 --- a/README.md +++ b/README.md @@ -34,34 +34,45 @@ The app runs as a Next.js service and stores runtime state on disk (`./data`). - Code of conduct: [CODE_OF_CONDUCT.md](./CODE_OF_CONDUCT.md) - Security policy: [SECURITY.md](./SECURITY.md) -## Installation (All Supported Paths) +## Installation -| Path | Best for | Command | +Choose the deployment method that best fits your needs: + +| Method | Best For | Command | | --- | --- | --- | -| One-command installer | Fastest setup, Docker-first | `curl -fsSL https://raw.githubusercontent.com/eggent-ai/eggent/main/scripts/install.sh \| bash` | -| Local production | Run directly on your machine (Node + npm) | `npm run setup:local` | -| Docker isolated | Containerized runtime | `npm run setup:docker` | -| Manual setup | Full control | see [Manual Setup](#manual-setup) | +| **Docker** (One-command) | Fastest setup, VPS, production | `curl -fsSL https://raw.githubusercontent.com/eggent-ai/eggent/main/scripts/install.sh \| bash` | +| **Docker** (Manual) | Containerized runtime, full control | `npm run setup:docker` | +| **Local/Node.js** | Run directly on your machine | `npm run setup:local` | +| **Development** | Active development, hot reload | `npm run dev` | + +--- + +## Docker Deployment + +### Option A: One-command Installer (Recommended) -## 1) One-command Installer +The fastest way to get Eggent running, especially on VPS: ```bash curl -fsSL https://raw.githubusercontent.com/eggent-ai/eggent/main/scripts/install.sh | bash ``` What it does: -- installs Docker (best-effort on macOS/Linux) if missing -- clones/updates Eggent in `~/.eggent` -- runs Docker deployment via `scripts/install-docker.sh` +- Installs Docker (best-effort on macOS/Linux) if missing +- Clones/updates Eggent in `~/.eggent` +- Runs Docker deployment via `scripts/install-docker.sh` -Installer environment variables: -- `EGGENT_INSTALL_DIR`: target directory (default: `~/.eggent`) -- `EGGENT_BRANCH`: git branch (default: `main`) -- `EGGENT_REPO_URL`: git repo URL (default: `https://github.com/eggent-ai/eggent.git`) -- `EGGENT_AUTO_INSTALL_DOCKER`: `1`/`0` (default: `1`) -- `EGGENT_APP_BIND_HOST`: Docker published bind host (`Linux default: 0.0.0.0`, otherwise `127.0.0.1`) +**Environment variables:** -Example: +| Variable | Default | Description | +| --- | --- | --- | +| `EGGENT_INSTALL_DIR` | `~/.eggent` | Target directory | +| `EGGENT_BRANCH` | `main` | Git branch to use | +| `EGGENT_REPO_URL` | `https://github.com/eggent-ai/eggent.git` | Repository URL | +| `EGGENT_AUTO_INSTALL_DOCKER` | `1` | Auto-install Docker if missing | +| `EGGENT_APP_BIND_HOST` | `0.0.0.0` (Linux) / `127.0.0.1` | Docker bind host | + +Example with custom options: ```bash EGGENT_INSTALL_DIR=~/apps/eggent \ @@ -70,24 +81,55 @@ EGGENT_AUTO_INSTALL_DOCKER=1 \ curl -fsSL https://raw.githubusercontent.com/eggent-ai/eggent/main/scripts/install.sh | bash ``` -On Linux (including VPS installs), the one-command installer publishes app port on all interfaces by default, so app is reachable at `http://:3000`. +On Linux (including VPS installs), the one-command installer publishes the app port on all interfaces by default, making it reachable at `http://:3000`. -## 2) Local Production Setup (Node + npm) +### Option B: Manual Docker Setup + +If you already have the repository cloned: + +```bash +npm run setup:docker +``` + +This script: +- Validates Docker + Compose +- Prepares `.env` and `data/` +- Builds image and starts container +- Waits for `GET /api/health` to succeed + +**Useful Docker commands:** + +```bash +docker compose logs -f app # View logs +docker compose restart app # Restart app +docker compose down # Stop and remove +docker compose up -d app # Start in background +``` + +Open: `http://localhost:3000` + +--- + +## Local/Node.js Deployment + +Run Eggent directly on your machine with Node.js: + +### Quick Start ```bash npm run setup:local ``` This script: -- validates Node/npm availability -- validates `python3` availability (required for Code Execution with Python runtime) -- validates `curl` availability (required for terminal commands like `curl ...`) -- warns if recommended utilities are missing: `git`, `jq`, `pip3`, `rg` -- creates `.env` from `.env.example` if needed -- generates secure defaults for token placeholders -- installs dependencies -- builds production output -- runs a health smoke-check +- Validates Node/npm availability +- Validates `python3` availability (required for Code Execution with Python runtime) +- Validates `curl` availability (required for terminal commands) +- Warns if recommended utilities are missing: `git`, `jq`, `pip3`, `rg` +- Creates `.env` from `.env.example` if needed +- Generates secure defaults for token placeholders +- Installs dependencies +- Builds production output +- Runs a health smoke-check Start the app: @@ -95,54 +137,112 @@ Start the app: npm run start ``` -Open: -- `http://localhost:3000` +Open: `http://localhost:3000` -## 3) Docker Isolated Setup +### PM2 Auto-start (Optional - Linux/macOS) + +For production deployments with auto-restart on boot, use PM2: + +**1. Install PM2 globally:** ```bash -npm run setup:docker +npm install -g pm2 ``` -This script: -- validates Docker + Compose -- prepares `.env` and `data/` -- builds image and starts container -- waits for `GET /api/health` to succeed +**2. Start Eggent with PM2:** -Open: -- `http://localhost:3000` +```bash +pm2 start npm --name eggent -- run start +``` -Useful Docker commands: +**3. Save PM2 configuration:** ```bash -docker compose logs -f app -docker compose restart app -docker compose down +pm2 save ``` -## 4) Manual Setup +**4. Setup systemd auto-start:** ```bash -cp .env.example .env -# ensure python3 is installed and available in PATH -npm install -npm run build -npm run start +pm2 startup systemd ``` -Open: -- `http://localhost:3000` +Copy and execute the command output (requires sudo). -## 5) Development Mode +**5. Verify auto-start works:** + +```bash +sudo reboot +# After reboot: +pm2 status +``` + +**Alternative: Using ecosystem file** + +Create `ecosystem.config.js` in your Eggent directory: + +```javascript +module.exports = { + apps: [ + { + name: "eggent", + cwd: "/home/YOUR_USERNAME/.eggent", + script: "npm", + args: "run start", + autorestart: true, + watch: false, + env: { + NODE_ENV: "production" + } + } + ] +} +``` + +Then start with: + +```bash +pm2 start ecosystem.config.js +pm2 save +pm2 startup systemd +``` + +**Common PM2 commands:** + +```bash +pm2 status # Check status +pm2 logs eggent # View logs +pm2 restart eggent # Restart app +pm2 stop eggent # Stop app +pm2 delete eggent # Remove from PM2 +``` + +--- + +## Development Mode + +For active development with hot reload: ```bash npm install npm run dev ``` -Open: -- `http://localhost:3000` +Open: `http://localhost:3000` + +### Manual Setup (Full Control) + +If you prefer complete manual control: + +```bash +cp .env.example .env +# Ensure python3 is installed and available in PATH +npm install +npm run build +npm run start +``` + +Open: `http://localhost:3000` ## Updating Eggent From 292bb62ed5049930be9671ba9e30eea2e0e3f5e9 Mon Sep 17 00:00:00 2001 From: Anton Sychev Date: Wed, 25 Mar 2026 21:05:14 +0100 Subject: [PATCH 2/2] refactor(telegram): Simplify and modularize the handling of Telegram updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Removal of complex internal functions and previously integrated helper modules - Centralized integration of update processing via `processTelegramUpdate` - Reduced code for direct handling of sessions, files, commands, and messages - Improved error handling with rollback for duplicate or failed updates - Updated the React TelegramIntegrationManager component to support connection modes - Added support for “auto,” “webhook,” and “polling” modes with control and status in the interface - Implementation of automatic mode detection based on public URL or local environment - UI improvements for configuration and connection statuses, including polling and webhook - Initialization of the Telegram lifecycle (polling or webhook) upon server startup - Updated documentation and text to reflect new integration and usage options --- .../integrations/telegram/polling/route.ts | 104 +++ src/app/api/integrations/telegram/route.ts | 688 +---------------- src/app/dashboard/api/page.tsx | 39 +- src/app/layout.tsx | 8 + .../telegram-integration-manager.tsx | 640 ++++++++++------ src/lib/storage/telegram-integration-store.ts | 91 +++ src/lib/telegram/polling-lifecycle.ts | 149 ++++ src/lib/telegram/polling-service.ts | 216 ++++++ src/lib/telegram/telegram-message-handler.ts | 711 ++++++++++++++++++ 9 files changed, 1741 insertions(+), 905 deletions(-) create mode 100644 src/app/api/integrations/telegram/polling/route.ts create mode 100644 src/lib/telegram/polling-lifecycle.ts create mode 100644 src/lib/telegram/polling-service.ts create mode 100644 src/lib/telegram/telegram-message-handler.ts diff --git a/src/app/api/integrations/telegram/polling/route.ts b/src/app/api/integrations/telegram/polling/route.ts new file mode 100644 index 0000000..5cb1209 --- /dev/null +++ b/src/app/api/integrations/telegram/polling/route.ts @@ -0,0 +1,104 @@ +import { NextRequest } from "next/server"; +import { + getTelegramIntegrationRuntimeConfig, + detectTelegramMode, +} from "@/lib/storage/telegram-integration-store"; +import { telegramPollingService } from "@/lib/telegram/polling-service"; + +export const maxDuration = 300; + +export async function GET() { + const runtime = await getTelegramIntegrationRuntimeConfig(); + const detectedMode = detectTelegramMode(runtime); + + return Response.json({ + status: "ok", + polling: telegramPollingService.status, + config: { + mode: runtime.mode, + detectedMode, + canStartPolling: !!runtime.botToken && detectedMode === "polling", + }, + }); +} + +export async function POST(req: NextRequest) { + try { + const runtime = await getTelegramIntegrationRuntimeConfig(); + const detectedMode = detectTelegramMode(runtime); + + if (!runtime.botToken.trim()) { + return Response.json( + { error: "Telegram bot token is not configured" }, + { status: 503 } + ); + } + + // Only allow polling if detected mode is polling or user explicitly forces it + const body = (await req.json().catch(() => ({}))) as { force?: boolean }; + const force = body.force === true; + + if (detectedMode === "webhook" && !force) { + return Response.json( + { + error: "Detected mode is webhook. Use force=true to start polling anyway.", + detectedMode, + }, + { status: 400 } + ); + } + + if (telegramPollingService.status.isRunning) { + return Response.json( + { + error: "Polling is already running", + polling: telegramPollingService.status, + }, + { status: 409 } + ); + } + + await telegramPollingService.start(runtime); + + return Response.json({ + ok: true, + message: "Polling started", + polling: telegramPollingService.status, + }); + } catch (error) { + console.error("[Telegram Polling API] Error starting polling:", error); + return Response.json( + { + error: error instanceof Error ? error.message : "Failed to start polling", + }, + { status: 500 } + ); + } +} + +export async function DELETE() { + try { + if (!telegramPollingService.status.isRunning) { + return Response.json( + { error: "Polling is not running" }, + { status: 409 } + ); + } + + telegramPollingService.stop(); + + return Response.json({ + ok: true, + message: "Polling stopped", + polling: telegramPollingService.status, + }); + } catch (error) { + console.error("[Telegram Polling API] Error stopping polling:", error); + return Response.json( + { + error: error instanceof Error ? error.message : "Failed to stop polling", + }, + { status: 500 } + ); + } +} diff --git a/src/app/api/integrations/telegram/route.ts b/src/app/api/integrations/telegram/route.ts index 542835c..7399de9 100644 --- a/src/app/api/integrations/telegram/route.ts +++ b/src/app/api/integrations/telegram/route.ts @@ -1,136 +1,12 @@ import { NextRequest } from "next/server"; import { timingSafeEqual } from "node:crypto"; import { - ExternalMessageError, - handleExternalMessage, -} from "@/lib/external/handle-external-message"; -import { - createDefaultTelegramSessionId, - createFreshTelegramSessionId, - getTelegramChatSessionId, - setTelegramChatSessionId, -} from "@/lib/storage/telegram-session-store"; -import { - claimTelegramUpdate, - releaseTelegramUpdate, -} from "@/lib/storage/telegram-update-store"; -import { - consumeTelegramAccessCode, getTelegramIntegrationRuntimeConfig, - normalizeTelegramUserId, } from "@/lib/storage/telegram-integration-store"; -import { saveChatFile } from "@/lib/storage/chat-files-store"; -import { createChat, getChat } from "@/lib/storage/chat-store"; import { - contextKey, - type ExternalSession, - getOrCreateExternalSession, - saveExternalSession, -} from "@/lib/storage/external-session-store"; -import { getAllProjects } from "@/lib/storage/project-store"; - -const TELEGRAM_TEXT_LIMIT = 4096; -const TELEGRAM_FILE_MAX_BYTES = 30 * 1024 * 1024; - -interface TelegramUpdate { - update_id?: unknown; - message?: TelegramMessage; -} - -interface TelegramMessage { - message_id?: unknown; - text?: unknown; - caption?: unknown; - from?: { - id?: unknown; - }; - chat?: { - id?: unknown; - type?: unknown; - }; - document?: { - file_id?: unknown; - file_name?: unknown; - mime_type?: unknown; - }; - photo?: Array<{ - file_id?: unknown; - width?: unknown; - height?: unknown; - }>; - audio?: { - file_id?: unknown; - file_name?: unknown; - mime_type?: unknown; - }; - video?: { - file_id?: unknown; - file_name?: unknown; - mime_type?: unknown; - }; - voice?: { - file_id?: unknown; - mime_type?: unknown; - }; -} - -interface TelegramApiResponse { - ok?: boolean; - description?: string; - result?: Record; -} - -interface TelegramIncomingFile { - fileId: string; - fileName: string; -} - -interface TelegramExternalChatContext { - chatId: string; - projectId?: string; - currentPath: string; -} - -function normalizeTelegramCurrentPath(rawPath: string | undefined): string { - const value = (rawPath ?? "").trim(); - if (!value || value === "/telegram") { - return ""; - } - return value; -} - -interface TelegramResolvedProjectContext { - session: ExternalSession; - resolvedProjectId?: string; - projectName?: string; -} - -function parseTelegramError(status: number, payload: TelegramApiResponse | null): string { - const description = payload?.description?.trim(); - return description - ? `Telegram API error (${status}): ${description}` - : `Telegram API error (${status})`; -} - -async function callTelegramApi( - botToken: string, - method: string, - body?: Record -): Promise { - const response = await fetch(`https://api.telegram.org/bot${botToken}/${method}`, { - method: body ? "POST" : "GET", - headers: body ? { "Content-Type": "application/json" } : undefined, - body: body ? JSON.stringify(body) : undefined, - }); - - const payload = (await response.json().catch(() => null)) as - | TelegramApiResponse - | null; - if (!response.ok || !payload?.ok) { - throw new Error(parseTelegramError(response.status, payload)); - } - return payload; -} + processTelegramUpdate, + type TelegramUpdate, +} from "@/lib/telegram/telegram-message-handler"; function safeTokenMatch(actual: string, expected: string): boolean { const actualBytes = Buffer.from(actual); @@ -142,334 +18,6 @@ function safeTokenMatch(actual: string, expected: string): boolean { return timingSafeEqual(actualBytes, expectedBytes); } -function getBotId(botToken: string): string { - const [rawBotId] = botToken.trim().split(":", 1); - const botId = rawBotId?.trim() || "default"; - return botId.replace(/[^a-zA-Z0-9._:-]/g, "_").slice(0, 128) || "default"; -} - -function chatBelongsToProject( - chatProjectId: string | undefined, - projectId: string | undefined -): boolean { - const left = chatProjectId ?? null; - const right = projectId ?? null; - return left === right; -} - -async function ensureTelegramExternalChatContext(params: { - sessionId: string; - defaultProjectId?: string; -}): Promise { - const { session, resolvedProjectId } = await resolveTelegramProjectContext({ - sessionId: params.sessionId, - defaultProjectId: params.defaultProjectId, - }); - const projectKey = contextKey(resolvedProjectId); - let resolvedChatId = session.activeChats[projectKey]; - if (resolvedChatId) { - const existing = await getChat(resolvedChatId); - if (!existing || !chatBelongsToProject(existing.projectId, resolvedProjectId)) { - resolvedChatId = ""; - } - } - - if (!resolvedChatId) { - resolvedChatId = crypto.randomUUID(); - await createChat( - resolvedChatId, - `External session ${session.id}`, - resolvedProjectId - ); - } - - session.activeChats[projectKey] = resolvedChatId; - session.currentPaths[projectKey] = normalizeTelegramCurrentPath( - session.currentPaths[projectKey] - ); - session.updatedAt = new Date().toISOString(); - await saveExternalSession(session); - - return { - chatId: resolvedChatId, - projectId: resolvedProjectId, - currentPath: session.currentPaths[projectKey] ?? "", - }; -} - -async function resolveTelegramProjectContext(params: { - sessionId: string; - defaultProjectId?: string; -}): Promise { - const session = await getOrCreateExternalSession(params.sessionId); - const projects = await getAllProjects(); - const projectById = new Map(projects.map((project) => [project.id, project])); - - let resolvedProjectId: string | undefined; - const explicitProjectId = params.defaultProjectId?.trim() || ""; - if (explicitProjectId) { - if (!projectById.has(explicitProjectId)) { - throw new Error(`Project "${explicitProjectId}" not found`); - } - resolvedProjectId = explicitProjectId; - session.activeProjectId = explicitProjectId; - } else if (session.activeProjectId && projectById.has(session.activeProjectId)) { - resolvedProjectId = session.activeProjectId; - } else if (projects.length > 0) { - resolvedProjectId = projects[0].id; - session.activeProjectId = projects[0].id; - } else { - session.activeProjectId = null; - } - - return { - session, - resolvedProjectId, - projectName: resolvedProjectId ? projectById.get(resolvedProjectId)?.name : undefined, - }; -} - -function extensionFromMime(mimeType: string): string { - const lower = mimeType.toLowerCase(); - if (lower.includes("pdf")) return ".pdf"; - if (lower.includes("png")) return ".png"; - if (lower.includes("jpeg") || lower.includes("jpg")) return ".jpg"; - if (lower.includes("webp")) return ".webp"; - if (lower.includes("gif")) return ".gif"; - if (lower.includes("mp4")) return ".mp4"; - if (lower.includes("mpeg") || lower.includes("mp3")) return ".mp3"; - if (lower.includes("ogg")) return ".ogg"; - if (lower.includes("wav")) return ".wav"; - if (lower.includes("plain")) return ".txt"; - return ""; -} - -function buildIncomingFileName(params: { - base: string; - messageId?: number; - mimeType?: string; -}): string { - const suffix = params.messageId ?? Date.now(); - const ext = params.mimeType ? extensionFromMime(params.mimeType) : ""; - return `${params.base}-${suffix}${ext}`; -} - -function sanitizeFileName(value: string): string { - const base = value.trim().replace(/[\\/]+/g, "_"); - return base || `file-${Date.now()}`; -} - -function withMessageIdPrefix(fileName: string, messageId?: number): string { - if (typeof messageId !== "number") return fileName; - return `${messageId}-${fileName}`; -} - -function extractIncomingFile( - message: TelegramMessage, - messageId?: number -): TelegramIncomingFile | null { - const documentFileId = - typeof message.document?.file_id === "string" - ? message.document.file_id.trim() - : ""; - if (documentFileId) { - const docNameRaw = - typeof message.document?.file_name === "string" - ? message.document.file_name - : ""; - const fallback = buildIncomingFileName({ - base: "document", - messageId, - mimeType: - typeof message.document?.mime_type === "string" - ? message.document.mime_type - : undefined, - }); - return { - fileId: documentFileId, - fileName: withMessageIdPrefix(sanitizeFileName(docNameRaw || fallback), messageId), - }; - } - - const photos: Array<{ file_id?: unknown }> = Array.isArray(message.photo) - ? message.photo - : []; - for (let i = photos.length - 1; i >= 0; i -= 1) { - const photo = photos[i]; - const fileId = typeof photo?.file_id === "string" ? photo.file_id.trim() : ""; - if (fileId) { - return { - fileId, - fileName: sanitizeFileName( - buildIncomingFileName({ base: "photo", messageId, mimeType: "image/jpeg" }) - ), - }; - } - } - - const audioFileId = - typeof message.audio?.file_id === "string" ? message.audio.file_id.trim() : ""; - if (audioFileId) { - const audioNameRaw = - typeof message.audio?.file_name === "string" ? message.audio.file_name : ""; - const fallback = buildIncomingFileName({ - base: "audio", - messageId, - mimeType: - typeof message.audio?.mime_type === "string" - ? message.audio.mime_type - : undefined, - }); - return { - fileId: audioFileId, - fileName: withMessageIdPrefix(sanitizeFileName(audioNameRaw || fallback), messageId), - }; - } - - const videoFileId = - typeof message.video?.file_id === "string" ? message.video.file_id.trim() : ""; - if (videoFileId) { - const videoNameRaw = - typeof message.video?.file_name === "string" ? message.video.file_name : ""; - const fallback = buildIncomingFileName({ - base: "video", - messageId, - mimeType: - typeof message.video?.mime_type === "string" - ? message.video.mime_type - : undefined, - }); - return { - fileId: videoFileId, - fileName: withMessageIdPrefix(sanitizeFileName(videoNameRaw || fallback), messageId), - }; - } - - const voiceFileId = - typeof message.voice?.file_id === "string" ? message.voice.file_id.trim() : ""; - if (voiceFileId) { - return { - fileId: voiceFileId, - fileName: sanitizeFileName( - buildIncomingFileName({ - base: "voice", - messageId, - mimeType: - typeof message.voice?.mime_type === "string" - ? message.voice.mime_type - : undefined, - }) - ), - }; - } - - return null; -} - -async function downloadTelegramFile(botToken: string, fileId: string): Promise { - const payload = await callTelegramApi(botToken, "getFile", { - file_id: fileId, - }); - const result = payload.result ?? {}; - const filePath = typeof result.file_path === "string" ? result.file_path : ""; - if (!filePath) { - throw new Error("Telegram getFile returned empty file_path"); - } - - const fileUrl = `https://api.telegram.org/file/bot${botToken}/${filePath}`; - const response = await fetch(fileUrl); - if (!response.ok) { - throw new Error(`Failed to download Telegram file (${response.status})`); - } - - const bytes = await response.arrayBuffer(); - if (bytes.byteLength > TELEGRAM_FILE_MAX_BYTES) { - throw new Error( - `Telegram file is too large (${bytes.byteLength} bytes). Max supported size is ${TELEGRAM_FILE_MAX_BYTES} bytes.` - ); - } - return Buffer.from(bytes); -} - -function extractCommand(text: string): string | null { - const first = text.trim().split(/\s+/, 1)[0]; - if (!first || !first.startsWith("/")) return null; - return first.split("@", 1)[0].toLowerCase(); -} - -function extractAccessCodeCandidate(text: string): string | null { - const value = text.trim(); - if (!value) return null; - - const fromCommand = value.match( - /^\/(?:code|start)(?:@[a-zA-Z0-9_]+)?\s+([A-Za-z0-9_-]{6,64})$/i - ); - if (fromCommand?.[1]) { - return fromCommand[1]; - } - - if (/^[A-Za-z0-9_-]{6,64}$/.test(value)) { - return value; - } - return null; -} - -function normalizeOutgoingText(text: string): string { - const value = text.trim(); - if (!value) return "Пустой ответ от агента."; - if (value.length <= TELEGRAM_TEXT_LIMIT) return value; - return `${value.slice(0, TELEGRAM_TEXT_LIMIT - 1)}…`; -} - -async function sendTelegramMessage( - botToken: string, - chatId: number | string, - text: string, - replyToMessageId?: number -): Promise { - const response = await fetch(`https://api.telegram.org/bot${botToken}/sendMessage`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - chat_id: chatId, - text: normalizeOutgoingText(text), - ...(typeof replyToMessageId === "number" ? { reply_to_message_id: replyToMessageId } : {}), - }), - }); - - const payload = (await response.json().catch(() => null)) as - | { ok?: boolean; description?: string } - | null; - - if (!response.ok || !payload?.ok) { - throw new Error( - `Telegram sendMessage failed (${response.status})${payload?.description ? `: ${payload.description}` : ""}` - ); - } -} - -function helpText(activeProject?: { id?: string; name?: string }): string { - const activeProjectLine = activeProject?.id - ? `Active project: ${activeProject.name ? `${activeProject.name} (${activeProject.id})` : activeProject.id}` - : "Active project: not selected"; - return [ - "Telegram connection is active.", - activeProjectLine, - "", - "Commands:", - "/start - show this help", - "/help - show this help", - "/code - activate access for your Telegram user", - "/new - start a new conversation (reset context)", - "", - "Text messages are sent to the agent.", - "File uploads are saved into chat files.", - "You can also ask the agent to send a local file back to Telegram.", - ].join("\n"); -} - export const maxDuration = 300; export async function GET() { @@ -484,8 +32,6 @@ export async function POST(req: NextRequest) { const runtime = await getTelegramIntegrationRuntimeConfig(); const botToken = runtime.botToken.trim(); const webhookSecret = runtime.webhookSecret.trim(); - const defaultProjectId = runtime.defaultProjectId || undefined; - const allowedUserIds = new Set(runtime.allowedUserIds); if (!botToken || !webhookSecret) { return Response.json( @@ -502,235 +48,11 @@ export async function POST(req: NextRequest) { return Response.json({ error: "Unauthorized" }, { status: 401 }); } - let botIdForRollback: string | null = null; - let updateIdForRollback: number | null = null; - try { const body = (await req.json()) as TelegramUpdate; - const updateId = - typeof body.update_id === "number" && Number.isInteger(body.update_id) - ? body.update_id - : null; - if (updateId === null) { - return Response.json({ error: "Invalid update_id" }, { status: 400 }); - } - - const botId = getBotId(botToken); - botIdForRollback = botId; - updateIdForRollback = updateId; - const isNewUpdate = await claimTelegramUpdate(botId, updateId); - if (!isNewUpdate) { - return Response.json({ ok: true, duplicate: true }); - } - - const message = body.message; - const chatId = - typeof message?.chat?.id === "number" || typeof message?.chat?.id === "string" - ? message.chat.id - : null; - const chatType = typeof message?.chat?.type === "string" ? message.chat.type : ""; - const messageId = - typeof message?.message_id === "number" ? message.message_id : undefined; - - if (chatId === null || !chatType) { - return Response.json({ ok: true, ignored: true, reason: "unsupported_update" }); - } - - if (chatType !== "private") { - return Response.json({ ok: true, ignored: true, reason: "private_only" }); - } - - const text = typeof message?.text === "string" ? message.text.trim() : ""; - const caption = - typeof message?.caption === "string" ? message.caption.trim() : ""; - const incomingText = text || caption; - const fromUserId = normalizeTelegramUserId(message?.from?.id); - - if (!fromUserId) { - return Response.json({ - ok: true, - ignored: true, - reason: "missing_user_id", - }); - } - - if (!allowedUserIds.has(fromUserId)) { - const accessCode = extractAccessCodeCandidate(text); - const granted = - accessCode && - (await consumeTelegramAccessCode({ - code: accessCode, - userId: fromUserId, - })); - - if (granted) { - await sendTelegramMessage( - botToken, - chatId, - "Доступ выдан. Теперь можно отправлять сообщения агенту.", - messageId - ); - return Response.json({ - ok: true, - accessGranted: true, - userId: fromUserId, - }); - } - - await sendTelegramMessage( - botToken, - chatId, - [ - "Доступ запрещён: ваш user_id не в списке разрешённых.", - "Отправьте код активации командой /code <код> или /start <код>.", - `Ваш user_id: ${fromUserId}`, - ].join("\n"), - messageId - ); - return Response.json({ - ok: true, - ignored: true, - reason: "user_not_allowed", - userId: fromUserId, - }); - } - - let sessionId = await getTelegramChatSessionId(botId, chatId); - if (!sessionId) { - sessionId = createDefaultTelegramSessionId(botId, chatId); - await setTelegramChatSessionId(botId, chatId, sessionId); - } - - const command = extractCommand(text); - if (command === "/start" || command === "/help") { - const resolvedProject = await resolveTelegramProjectContext({ - sessionId, - defaultProjectId, - }); - await saveExternalSession({ - ...resolvedProject.session, - updatedAt: new Date().toISOString(), - }); - await sendTelegramMessage( - botToken, - chatId, - helpText({ - id: resolvedProject.resolvedProjectId, - name: resolvedProject.projectName, - }), - messageId - ); - return Response.json({ ok: true, command }); - } - - if (command === "/new") { - const freshSessionId = createFreshTelegramSessionId(botId, chatId); - await setTelegramChatSessionId(botId, chatId, freshSessionId); - await sendTelegramMessage( - botToken, - chatId, - "Начал новый диалог. Контекст очищен для следующего сообщения.", - messageId - ); - return Response.json({ ok: true, command }); - } - - let incomingSavedFile: - | { - name: string; - path: string; - size: number; - } - | null = null; - - const incomingFile = message ? extractIncomingFile(message, messageId) : null; - let externalContext: TelegramExternalChatContext | null = null; - if (incomingFile) { - externalContext = await ensureTelegramExternalChatContext({ - sessionId, - defaultProjectId, - }); - const fileBuffer = await downloadTelegramFile(botToken, incomingFile.fileId); - const saved = await saveChatFile( - externalContext.chatId, - fileBuffer, - incomingFile.fileName - ); - incomingSavedFile = { - name: saved.name, - path: saved.path, - size: saved.size, - }; - } - - if (!incomingText) { - if (incomingSavedFile) { - await sendTelegramMessage( - botToken, - chatId, - `File "${incomingSavedFile.name}" saved to chat files.`, - messageId - ); - return Response.json({ - ok: true, - fileSaved: true, - file: incomingSavedFile, - }); - } - - await sendTelegramMessage( - botToken, - chatId, - "Only text messages and file uploads are supported right now.", - messageId - ); - return Response.json({ ok: true, ignored: true, reason: "non_text" }); - } - - try { - const result = await handleExternalMessage({ - sessionId, - message: incomingSavedFile - ? `${incomingText}\n\nAttached file: ${incomingSavedFile.name}` - : incomingText, - projectId: externalContext?.projectId ?? defaultProjectId, - chatId: externalContext?.chatId, - currentPath: normalizeTelegramCurrentPath(externalContext?.currentPath), - runtimeData: { - telegram: { - botToken, - chatId, - replyToMessageId: messageId ?? null, - }, - }, - }); - - await sendTelegramMessage(botToken, chatId, result.reply, messageId); - return Response.json({ ok: true }); - } catch (error) { - if (error instanceof ExternalMessageError) { - const errorMessage = - typeof error.payload.error === "string" - ? error.payload.error - : "Не удалось обработать сообщение."; - await sendTelegramMessage(botToken, chatId, `Ошибка: ${errorMessage}`, messageId); - return Response.json({ ok: true, handledError: true, status: error.status }); - } - throw error; - } + const result = await processTelegramUpdate(body, runtime); + return Response.json(result); } catch (error) { - if ( - botIdForRollback && - typeof updateIdForRollback === "number" && - Number.isInteger(updateIdForRollback) - ) { - try { - await releaseTelegramUpdate(botIdForRollback, updateIdForRollback); - } catch (releaseError) { - console.error("Telegram rollback error:", releaseError); - } - } - console.error("Telegram webhook error:", error); return Response.json( { diff --git a/src/app/dashboard/api/page.tsx b/src/app/dashboard/api/page.tsx index 8efd3a8..a71e209 100644 --- a/src/app/dashboard/api/page.tsx +++ b/src/app/dashboard/api/page.tsx @@ -54,7 +54,7 @@ export default function ApiPage() {
-

Telegram Webhook

+

Telegram Integration

Telegram endpoint: POST /api/integrations/telegram. It reuses the same external session context engine as{" "} @@ -64,14 +64,45 @@ export default function ApiPage() { Configure credentials in Dashboard -> Messengers (bot token is enough; webhook secret/url are configured automatically).

- +

Connection Modes

+
    +
  • Webhook (default for public HTTPS URLs): Telegram pushes updates to your server. Requires a public HTTPS URL.
  • +
  • Long Polling (default for localhost): Your server periodically fetches updates from Telegram. Works without HTTPS, perfect for local development.
  • +
  • Auto (recommended): Automatically selects the best mode based on your Public Base URL configuration.
  • +
+ + +
+

Webhook Setup (Production)

+ + /> +
+ +
+

Long Polling API (Development)

+

+ When using long polling mode, control the polling service via API: +

+ +
+

Supported commands: /start,{" "} /help,{" "} diff --git a/src/app/layout.tsx b/src/app/layout.tsx index 5542cb1..873a634 100644 --- a/src/app/layout.tsx +++ b/src/app/layout.tsx @@ -2,6 +2,7 @@ import type { Metadata } from "next"; import { Geist, Geist_Mono } from "next/font/google"; import { unstable_noStore as noStore } from "next/cache"; import { getSettings } from "@/lib/storage/settings-store"; +import { initTelegramLifecycle } from "@/lib/telegram/polling-lifecycle"; import "./globals.css"; const geistSans = Geist({ @@ -19,6 +20,13 @@ export const metadata: Metadata = { description: "AI Agent Terminal - Execute code, manage memory, search the web", }; +// Initialize Telegram lifecycle (polling or webhook) on server startup +if (typeof window === "undefined") { + initTelegramLifecycle().catch((error) => { + console.error("Failed to initialize Telegram lifecycle:", error); + }); +} + export default async function RootLayout({ children, }: Readonly<{ diff --git a/src/components/telegram-integration-manager.tsx b/src/components/telegram-integration-manager.tsx index d04aa00..1c71e9d 100644 --- a/src/components/telegram-integration-manager.tsx +++ b/src/components/telegram-integration-manager.tsx @@ -1,7 +1,7 @@ "use client"; import { useCallback, useEffect, useMemo, useState } from "react"; -import { KeyRound, Loader2, Link2, RotateCcw, ShieldCheck, Trash2 } from "lucide-react"; +import { KeyRound, Loader2, Link2, ShieldCheck, Trash2, Play, Square, Radio, Globe } from "lucide-react"; import { Button } from "@/components/ui/button"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; @@ -14,9 +14,13 @@ interface TelegramSettingsResponse { allowedUserIds: string[]; pendingAccessCodes: number; updatedAt: string | null; + mode: "auto" | "webhook" | "polling"; + pollingInterval: number; + detectedMode: "webhook" | "polling"; sources: { botToken: "stored" | "env" | "none"; webhookSecret: "stored" | "env" | "none"; + mode: "stored" | "env" | "none"; }; error?: string; } @@ -29,19 +33,24 @@ interface TelegramAccessCodeResponse { error?: string; } -interface WebhookStatusResponse { - configured: boolean; - message?: string; - webhook: { - url: string; - pendingUpdateCount: number; - lastErrorDate: number | null; - lastErrorMessage: string | null; - } | null; - error?: string; +interface PollingStatusResponse { + status: string; + polling: { + isRunning: boolean; + lastUpdateId: number | null; + lastPollTime: string | null; + errorCount: number; + consecutiveErrors: number; + }; + config: { + mode: "auto" | "webhook" | "polling"; + detectedMode: "webhook" | "polling"; + canStartPolling: boolean; + }; } type ActionState = "idle" | "loading"; +type TelegramMode = "auto" | "webhook" | "polling"; function sourceLabel(source: "stored" | "env" | "none"): string { if (source === "stored") return "stored in app"; @@ -56,6 +65,22 @@ export function TelegramIntegrationManager() { const [tokenSource, setTokenSource] = useState<"stored" | "env" | "none">( "none" ); + const [mode, setMode] = useState("auto"); + const [detectedMode, setDetectedMode] = useState<"webhook" | "polling">("polling"); + + // Helper to detect if URL is localhost/private (needs polling) or public (can use webhook) + const detectUrlMode = useCallback((url: string): "webhook" | "polling" => { + if (!url.trim()) return "polling"; + const lowerUrl = url.toLowerCase().trim(); + // Check for localhost, private IPs, or non-HTTPS + if (lowerUrl.includes("localhost")) return "polling"; + if (lowerUrl.includes("127.0.0.1")) return "polling"; + if (lowerUrl.includes("192.168.")) return "polling"; + if (lowerUrl.includes("10.0.")) return "polling"; + if (lowerUrl.includes("172.16.")) return "polling"; + if (lowerUrl.startsWith("http://")) return "polling"; + return "webhook"; + }, []); const [allowedUserIdsInput, setAllowedUserIdsInput] = useState(""); const [pendingAccessCodes, setPendingAccessCodes] = useState(0); const [generatedAccessCode, setGeneratedAccessCode] = useState(null); @@ -63,16 +88,14 @@ export function TelegramIntegrationManager() { string | null >(null); const [updatedAt, setUpdatedAt] = useState(null); - const [webhookStatus, setWebhookStatus] = useState( - null - ); + const [pollingStatus, setPollingStatus] = useState(null); const [loadingSettings, setLoadingSettings] = useState(true); const [connectState, setConnectState] = useState("idle"); - const [reconnectState, setReconnectState] = useState("idle"); const [disconnectState, setDisconnectState] = useState("idle"); const [saveAllowedUsersState, setSaveAllowedUsersState] = useState("idle"); const [generateCodeState, setGenerateCodeState] = useState("idle"); - const [webhookState, setWebhookState] = useState("idle"); + const [pollingState, setPollingState] = useState("idle"); + const [modeState, setModeState] = useState("idle"); const [error, setError] = useState(null); const [success, setSuccess] = useState(null); @@ -90,6 +113,8 @@ export function TelegramIntegrationManager() { setStoredMaskedToken(data.botToken || ""); setPublicBaseUrl(data.publicBaseUrl || ""); setTokenSource(data.sources.botToken); + setMode(data.mode || "auto"); + setDetectedMode(data.detectedMode || "polling"); setAllowedUserIdsInput((data.allowedUserIds || []).join(", ")); setPendingAccessCodes( typeof data.pendingAccessCodes === "number" ? data.pendingAccessCodes : 0 @@ -102,28 +127,37 @@ export function TelegramIntegrationManager() { } }, []); - const loadWebhookStatus = useCallback(async () => { - setWebhookState("loading"); + const loadPollingStatus = useCallback(async () => { + setPollingState("loading"); try { - const res = await fetch("/api/integrations/telegram/webhook", { + const res = await fetch("/api/integrations/telegram/polling", { cache: "no-store", }); - const data = (await res.json()) as WebhookStatusResponse; + const data = (await res.json()) as PollingStatusResponse; if (!res.ok) { - throw new Error(data.error || "Failed to load webhook status"); + throw new Error("Failed to load polling status"); } - setWebhookStatus(data); + setPollingStatus(data); } catch { - setWebhookStatus(null); + setPollingStatus(null); } finally { - setWebhookState("idle"); + setPollingState("idle"); } }, []); useEffect(() => { loadSettings(); - loadWebhookStatus(); - }, [loadSettings, loadWebhookStatus]); + loadPollingStatus(); + + // Refresh polling status every 5 seconds when in polling mode + const interval = setInterval(() => { + if (detectedMode === "polling" || mode === "polling") { + loadPollingStatus(); + } + }, 5000); + + return () => clearInterval(interval); + }, [loadSettings, loadPollingStatus, detectedMode, mode]); const connectTelegram = useCallback(async () => { setConnectState("loading"); @@ -133,9 +167,6 @@ export function TelegramIntegrationManager() { const trimmedToken = botToken.trim(); const trimmedBaseUrl = publicBaseUrl.trim(); - if (!trimmedBaseUrl) { - throw new Error("Public Base URL is required"); - } if (!trimmedToken && tokenSource === "none") { throw new Error("Telegram bot token is required"); } @@ -169,43 +200,15 @@ export function TelegramIntegrationManager() { throw new Error(setupData.error || "Failed to connect Telegram"); } - setSuccess(setupData.message || "Telegram connected"); + setSuccess(setupData.message || "Webhook configured"); setBotToken(""); - await Promise.all([loadSettings(), loadWebhookStatus()]); + await loadSettings(); } catch (e) { setError(e instanceof Error ? e.message : "Failed to connect Telegram"); } finally { setConnectState("idle"); } - }, [botToken, loadSettings, loadWebhookStatus, publicBaseUrl, tokenSource]); - - const reconnectTelegram = useCallback(async () => { - setReconnectState("loading"); - setError(null); - setSuccess(null); - try { - const res = await fetch("/api/integrations/telegram/setup", { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({}), - }); - const data = (await res.json()) as { - success?: boolean; - message?: string; - error?: string; - }; - if (!res.ok) { - throw new Error(data.error || "Failed to reconnect Telegram"); - } - - setSuccess(data.message || "Telegram reconnected"); - await Promise.all([loadSettings(), loadWebhookStatus()]); - } catch (e) { - setError(e instanceof Error ? e.message : "Failed to reconnect Telegram"); - } finally { - setReconnectState("idle"); - } - }, [loadSettings, loadWebhookStatus]); + }, [botToken, loadSettings, publicBaseUrl, tokenSource]); const disconnectTelegram = useCallback(async () => { setDisconnectState("loading"); @@ -231,13 +234,13 @@ export function TelegramIntegrationManager() { setSuccess(messages.join(" ")); setBotToken(""); - await Promise.all([loadSettings(), loadWebhookStatus()]); + await loadSettings(); } catch (e) { setError(e instanceof Error ? e.message : "Failed to disconnect Telegram"); } finally { setDisconnectState("idle"); } - }, [loadSettings, loadWebhookStatus]); + }, [loadSettings]); const saveAllowedUsers = useCallback(async () => { setSaveAllowedUsersState("loading"); @@ -295,23 +298,84 @@ export function TelegramIntegrationManager() { } }, [loadSettings]); - const hasTokenConfigured = tokenSource !== "none"; - const hasBaseUrlConfigured = publicBaseUrl.trim().length > 0; - const isConnected = hasTokenConfigured && hasBaseUrlConfigured; + const startPolling = useCallback(async () => { + setPollingState("loading"); + setError(null); + setSuccess(null); + try { + const res = await fetch("/api/integrations/telegram/polling", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({}), + }); + const data = (await res.json()) as { ok?: boolean; message?: string; error?: string }; + if (!res.ok) { + throw new Error(data.error || "Failed to start polling"); + } + setSuccess(data.message || "Polling started"); + await loadPollingStatus(); + } catch (e) { + setError(e instanceof Error ? e.message : "Failed to start polling"); + } finally { + setPollingState("idle"); + } + }, [loadPollingStatus]); + + const stopPolling = useCallback(async () => { + setPollingState("loading"); + setError(null); + setSuccess(null); + try { + const res = await fetch("/api/integrations/telegram/polling", { + method: "DELETE", + }); + const data = (await res.json()) as { ok?: boolean; message?: string; error?: string }; + if (!res.ok) { + throw new Error(data.error || "Failed to stop polling"); + } + setSuccess(data.message || "Polling stopped"); + await loadPollingStatus(); + } catch (e) { + setError(e instanceof Error ? e.message : "Failed to stop polling"); + } finally { + setPollingState("idle"); + } + }, [loadPollingStatus]); + + const saveMode = useCallback(async (newMode: TelegramMode) => { + setModeState("loading"); + setError(null); + setSuccess(null); + try { + const res = await fetch("/api/integrations/telegram/config", { + method: "PUT", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ mode: newMode }), + }); + const data = (await res.json()) as TelegramSettingsResponse; + if (!res.ok) { + throw new Error(data.error || "Failed to save mode"); + } + setMode(data.mode || "auto"); + setDetectedMode(data.detectedMode || "polling"); + setSuccess(`Mode updated to ${newMode}`); + } catch (e) { + setError(e instanceof Error ? e.message : "Failed to save mode"); + } finally { + setModeState("idle"); + } + }, []); - const canConnect = useMemo(() => { - if (!publicBaseUrl.trim()) return false; - if (botToken.trim()) return true; - return tokenSource !== "none"; - }, [botToken, publicBaseUrl, tokenSource]); + const hasTokenConfigured = tokenSource !== "none"; const isBusy = loadingSettings || connectState === "loading" || - reconnectState === "loading" || disconnectState === "loading" || saveAllowedUsersState === "loading" || - generateCodeState === "loading"; + generateCodeState === "loading" || + pollingState === "loading" || + modeState === "loading"; const updatedAtLabel = useMemo(() => { if (!updatedAt) return null; @@ -320,127 +384,311 @@ export function TelegramIntegrationManager() { return date.toLocaleString(); }, [updatedAt]); + // Determine effective mode considering auto detection + const effectiveMode = mode === "auto" ? detectedMode : mode; + return (

+ {/* Step 1: Bot Token */}
-

Telegram

- {!isConnected ? ( -

- Enter the bot token and Public Base URL, then click Connect Telegram. -

- ) : ( -

- Telegram is connected. You can reconnect or disconnect it. +

1. Bot Token

+

+ Enter your Telegram bot token from @BotFather. +

+
+ +
+ + setBotToken(e.target.value)} + placeholder="123456789:AA..." + disabled={isBusy || hasTokenConfigured} + /> + {hasTokenConfigured && ( +

+ Token saved ({sourceLabel(tokenSource)}) + {storedMaskedToken ? `: ${storedMaskedToken}` : ""}

)}
- {!isConnected ? ( - <> -
- - setBotToken(e.target.value)} - placeholder="123456789:AA..." - disabled={isBusy} - /> -

- Current source: {sourceLabel(tokenSource)} - {storedMaskedToken ? ` (${storedMaskedToken})` : ""} -

-
+ {!hasTokenConfigured && ( +
+ +
+ )} +
+ + {/* Step 2: Connection Mode */} + {hasTokenConfigured && ( +
+
+

2. Connection Mode

+

+ Choose how Telegram connects to your bot. +

+
-
- - setPublicBaseUrl(e.target.value)} - placeholder="https://your-public-host.example.com" - disabled={isBusy} - /> -

- Webhook endpoint:{" "} - {publicBaseUrl || "https://..."}/api/integrations/telegram -

+
+
+
+ + +
+
+ +
+ {effectiveMode === "webhook" ? ( + <> + + Webhook + + ) : ( + <> + + Long Polling + + )} +
+
-
- +
+
+ )} + + {/* Polling Controls - only show when polling mode is active */} + {effectiveMode === "polling" && ( +
+
+
+

Long Polling

+

+ Bot will receive messages via long polling (no HTTPS required). +

+
+ {!pollingStatus?.polling?.isRunning ? ( + + ) : ( + + )} +
+ + {pollingStatus?.polling && ( +
+
+ Status:{" "} + {pollingStatus.polling.isRunning ? ( + + + + + + Running + + ) : ( + Stopped + )} +
+ {pollingStatus.polling.lastUpdateId !== null && ( +
+ Last update ID: {pollingStatus.polling.lastUpdateId} +
+ )} +
)} - -
- - ) : ( - <> -
-
- Token source: {sourceLabel(tokenSource)} - {storedMaskedToken ? ` (${storedMaskedToken})` : ""}
+ )} +
+
+ )} + + {/* Connected Status */} + {hasTokenConfigured && ( +
+
+

Connection Status

+
+ +
+
+ Token: {sourceLabel(tokenSource)} + {storedMaskedToken ? ` (${storedMaskedToken})` : ""} +
+ {publicBaseUrl && (
Public Base URL:{" "} {publicBaseUrl}
- {updatedAtLabel && ( -
Updated: {updatedAtLabel}
- )} + )} +
+ Mode: {effectiveMode === "webhook" ? "Webhook" : "Long Polling"}
+ {updatedAtLabel && ( +
Updated: {updatedAtLabel}
+ )} +
-
- - -
- - )} -
+
+ +
+
+ )}
@@ -518,50 +766,6 @@ export function TelegramIntegrationManager() {
- {isConnected && ( -
-
-

Webhook Status

-

- Current webhook status from the latest check. -

-
- - {webhookState === "loading" && ( -

Loading webhook status...

- )} - - {webhookStatus?.webhook && ( -
-
- URL:{" "} - - {webhookStatus.webhook.url || "(empty)"} - -
-
Pending updates: {webhookStatus.webhook.pendingUpdateCount}
- {webhookStatus.webhook.lastErrorMessage && ( -
- Last error: {webhookStatus.webhook.lastErrorMessage} -
- )} - {webhookStatus.webhook.lastErrorDate && ( -
- Last error at:{" "} - {new Date(webhookStatus.webhook.lastErrorDate * 1000).toLocaleString()} -
- )} -
- )} - - {webhookState !== "loading" && !webhookStatus?.webhook && ( -

- {webhookStatus?.message || "Webhook status is not loaded yet."} -

- )} -
- )} - {success &&

{success}

} {error &&

{error}

} diff --git a/src/lib/storage/telegram-integration-store.ts b/src/lib/storage/telegram-integration-store.ts index 041e2a0..0f6a9ee 100644 --- a/src/lib/storage/telegram-integration-store.ts +++ b/src/lib/storage/telegram-integration-store.ts @@ -10,6 +10,7 @@ const TELEGRAM_SETTINGS_FILE = path.join( ); export type TelegramConfigSource = "stored" | "env" | "none"; +export type TelegramMode = "auto" | "webhook" | "polling"; interface TelegramAccessCodeRecord { hash: string; @@ -24,6 +25,8 @@ interface TelegramIntegrationFileRecord { defaultProjectId?: string; allowedUserIds?: unknown; accessCodes?: unknown; + mode?: unknown; + pollingInterval?: unknown; createdAt?: string; updatedAt?: string; } @@ -35,6 +38,8 @@ export interface TelegramIntegrationStoredSettings { defaultProjectId: string; allowedUserIds: string[]; accessCodes: TelegramAccessCodeRecord[]; + mode: TelegramMode; + pollingInterval: number; createdAt: string; updatedAt: string; } @@ -45,9 +50,13 @@ export interface TelegramIntegrationRuntimeConfig { publicBaseUrl: string; defaultProjectId: string; allowedUserIds: string[]; + mode: TelegramMode; + pollingInterval: number; + detectedMode: TelegramMode; sources: { botToken: TelegramConfigSource; webhookSecret: TelegramConfigSource; + mode: TelegramConfigSource; }; } @@ -221,6 +230,43 @@ async function readStoredRecord(): Promise { } } +function normalizeMode(raw: unknown): TelegramMode { + if (raw === "webhook" || raw === "polling") return raw; + return "auto"; +} + +function normalizePollingInterval(raw: unknown): number { + const numeric = typeof raw === "number" && Number.isFinite(raw) ? raw : 5000; + return Math.max(1000, Math.min(60000, numeric)); +} + +function isLocalhostUrl(url: string): boolean { + if (!url) return true; + try { + const parsed = new URL(url); + const hostname = parsed.hostname.toLowerCase(); + return ( + hostname === "localhost" || + hostname === "127.0.0.1" || + hostname.startsWith("192.168.") || + hostname.startsWith("10.") || + hostname.startsWith("172.") || + hostname.endsWith(".local") + ); + } catch { + return true; + } +} + +export function detectTelegramMode(config: { + mode: TelegramMode; + publicBaseUrl: string; +}): "webhook" | "polling" { + if (config.mode !== "auto") return config.mode; + if (isLocalhostUrl(config.publicBaseUrl)) return "polling"; + return "webhook"; +} + function normalizeStoredRecord( record: TelegramIntegrationFileRecord ): TelegramIntegrationStoredSettings { @@ -231,6 +277,8 @@ function normalizeStoredRecord( defaultProjectId: trimString(record.defaultProjectId), allowedUserIds: normalizeAllowedUserIds(record.allowedUserIds), accessCodes: normalizeAccessCodeRecords(record.accessCodes), + mode: normalizeMode(record.mode), + pollingInterval: normalizePollingInterval(record.pollingInterval), createdAt: trimString(record.createdAt), updatedAt: trimString(record.updatedAt), }; @@ -259,6 +307,8 @@ export async function saveTelegramIntegrationStoredSettings(input: { defaultProjectId?: string; allowedUserIds?: string[]; accessCodes?: TelegramAccessCodeRecord[]; + mode?: TelegramMode; + pollingInterval?: number; }): Promise { const current = await getTelegramIntegrationStoredSettings(); @@ -284,6 +334,12 @@ export async function saveTelegramIntegrationStoredSettings(input: { input.accessCodes !== undefined ? normalizeAccessCodeRecords(input.accessCodes) : current.accessCodes; + const nextMode = + input.mode !== undefined ? normalizeMode(input.mode) : current.mode; + const nextPollingInterval = + input.pollingInterval !== undefined + ? normalizePollingInterval(input.pollingInterval) + : current.pollingInterval; const now = new Date().toISOString(); const next: TelegramIntegrationStoredSettings = { @@ -293,6 +349,8 @@ export async function saveTelegramIntegrationStoredSettings(input: { defaultProjectId: nextDefaultProjectId, allowedUserIds: nextAllowedUserIds, accessCodes: nextAccessCodes, + mode: nextMode, + pollingInterval: nextPollingInterval, createdAt: current.createdAt || now, updatedAt: now, }; @@ -310,6 +368,7 @@ export async function getTelegramIntegrationRuntimeConfig(): Promise { const stored = await getTelegramIntegrationStoredSettings(); @@ -365,6 +441,9 @@ export async function getTelegramIntegrationPublicSettings(): Promise<{ publicBaseUrl: runtime.publicBaseUrl, defaultProjectId: runtime.defaultProjectId, allowedUserIds: runtime.allowedUserIds, + mode: runtime.mode, + pollingInterval: runtime.pollingInterval, + detectedMode: runtime.detectedMode, pendingAccessCodes: stored.accessCodes.length, updatedAt: stored.updatedAt || null, sources: runtime.sources, @@ -377,6 +456,8 @@ export async function saveTelegramIntegrationFromPublicInput(input: { publicBaseUrl?: unknown; defaultProjectId?: unknown; allowedUserIds?: unknown; + mode?: unknown; + pollingInterval?: unknown; }): Promise { const currentStored = await getTelegramIntegrationStoredSettings(); @@ -407,6 +488,14 @@ export async function saveTelegramIntegrationFromPublicInput(input: { ? input.defaultProjectId : undefined; const allowedUserIds = parseAllowedUserIdsInput(input.allowedUserIds); + const mode = + typeof input.mode === "string" + ? normalizeMode(input.mode) + : undefined; + const pollingInterval = + typeof input.pollingInterval === "number" + ? normalizePollingInterval(input.pollingInterval) + : undefined; await saveTelegramIntegrationStoredSettings({ botToken, @@ -414,6 +503,8 @@ export async function saveTelegramIntegrationFromPublicInput(input: { publicBaseUrl, defaultProjectId, allowedUserIds, + mode, + pollingInterval, }); } diff --git a/src/lib/telegram/polling-lifecycle.ts b/src/lib/telegram/polling-lifecycle.ts new file mode 100644 index 0000000..529a845 --- /dev/null +++ b/src/lib/telegram/polling-lifecycle.ts @@ -0,0 +1,149 @@ +import { + getTelegramIntegrationRuntimeConfig, + detectTelegramMode, + type TelegramIntegrationRuntimeConfig, +} from "@/lib/storage/telegram-integration-store"; +import { telegramPollingService } from "@/lib/telegram/polling-service"; + +let lifecycleInitialized = false; + +interface TelegramLifecycleOptions { + autoStartPolling?: boolean; + autoSetupWebhook?: boolean; +} + +export async function initTelegramLifecycle( + options: TelegramLifecycleOptions = {} +): Promise { + if (lifecycleInitialized) { + return; + } + + const runtime = await getTelegramIntegrationRuntimeConfig(); + const detectedMode = detectTelegramMode(runtime); + + console.log(`[Telegram Lifecycle] Mode: ${runtime.mode}, Detected: ${detectedMode}`); + + if (detectedMode === "polling") { + if (options.autoStartPolling !== false && runtime.botToken.trim()) { + try { + await telegramPollingService.start(runtime); + console.log("[Telegram Lifecycle] Polling started automatically"); + } catch (error) { + console.error("[Telegram Lifecycle] Failed to start polling:", error); + } + } + } else if (detectedMode === "webhook") { + if (options.autoSetupWebhook !== false && runtime.botToken.trim() && runtime.publicBaseUrl.trim()) { + try { + await setupTelegramWebhook(runtime); + console.log("[Telegram Lifecycle] Webhook configured"); + } catch (error) { + console.error("[Telegram Lifecycle] Failed to setup webhook:", error); + } + } + } + + setupGracefulShutdown(); + lifecycleInitialized = true; +} + +async function setupTelegramWebhook( + runtime: TelegramIntegrationRuntimeConfig +): Promise { + const { botToken, publicBaseUrl, webhookSecret } = runtime; + + if (!botToken.trim() || !publicBaseUrl.trim()) { + throw new Error("Bot token and public base URL are required"); + } + + const webhookUrl = `${publicBaseUrl.replace(/\/$/, "")}/api/integrations/telegram`; + + const response = await fetch( + `https://api.telegram.org/bot${botToken}/setWebhook`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + url: webhookUrl, + secret_token: webhookSecret.trim() || undefined, + allowed_updates: ["message"], + }), + } + ); + + const payload = (await response.json().catch(() => null)) as + | { ok?: boolean; description?: string } + | null; + + if (!response.ok || !payload?.ok) { + throw new Error( + `Failed to set webhook: ${payload?.description || response.statusText}` + ); + } +} + +export async function migrateToWebhook( + runtime: TelegramIntegrationRuntimeConfig +): Promise { + // Stop polling if running + if (telegramPollingService.status.isRunning) { + telegramPollingService.stop(); + console.log("[Telegram Migration] Polling stopped"); + } + + // Setup webhook + await setupTelegramWebhook(runtime); + console.log("[Telegram Migration] Migrated to webhook mode"); +} + +export async function migrateToPolling( + runtime: TelegramIntegrationRuntimeConfig +): Promise { + // Delete webhook + await deleteTelegramWebhook(runtime.botToken); + + // Start polling + if (!telegramPollingService.status.isRunning) { + await telegramPollingService.start(runtime); + console.log("[Telegram Migration] Migrated to polling mode"); + } +} + +async function deleteTelegramWebhook(botToken: string): Promise { + const response = await fetch( + `https://api.telegram.org/bot${botToken}/deleteWebhook`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ drop_pending_updates: false }), + } + ); + + const payload = (await response.json().catch(() => null)) as + | { ok?: boolean; description?: string } + | null; + + if (payload?.ok) { + console.log("[Telegram Lifecycle] Webhook deleted"); + } else { + console.warn("[Telegram Lifecycle] Failed to delete webhook:", payload?.description); + } +} + +function setupGracefulShutdown(): void { + const shutdown = () => { + console.log("[Telegram Lifecycle] Shutting down..."); + if (telegramPollingService.status.isRunning) { + telegramPollingService.stop(); + } + process.exit(0); + }; + + process.on("SIGINT", shutdown); + process.on("SIGTERM", shutdown); +} + +export function isLifecycleInitialized(): boolean { + return lifecycleInitialized; +} diff --git a/src/lib/telegram/polling-service.ts b/src/lib/telegram/polling-service.ts new file mode 100644 index 0000000..af09b46 --- /dev/null +++ b/src/lib/telegram/polling-service.ts @@ -0,0 +1,216 @@ +import { + type TelegramIntegrationRuntimeConfig, +} from "@/lib/storage/telegram-integration-store"; +import { + processTelegramUpdate, + type TelegramUpdate, +} from "@/lib/telegram/telegram-message-handler"; + +interface TelegramApiResponse { + ok?: boolean; + description?: string; + result?: Record | Array>; +} + +export interface PollingStatus { + isRunning: boolean; + lastUpdateId: number | null; + lastPollTime: string | null; + errorCount: number; + consecutiveErrors: number; +} + +class TelegramPollingService { + private isRunning = false; + private abortController: AbortController | null = null; + private lastUpdateId: number | null = null; + private errorCount = 0; + private consecutiveErrors = 0; + private lastPollTime: string | null = null; + private runtimeConfig: TelegramIntegrationRuntimeConfig | null = null; + private pollTimeout: NodeJS.Timeout | null = null; + + get status(): PollingStatus { + return { + isRunning: this.isRunning, + lastUpdateId: this.lastUpdateId, + lastPollTime: this.lastPollTime, + errorCount: this.errorCount, + consecutiveErrors: this.consecutiveErrors, + }; + } + + async start(runtimeConfig: TelegramIntegrationRuntimeConfig): Promise { + if (this.isRunning) { + throw new Error("Polling is already running"); + } + + if (!runtimeConfig.botToken.trim()) { + throw new Error("Bot token is required"); + } + + this.runtimeConfig = runtimeConfig; + this.isRunning = true; + this.abortController = new AbortController(); + this.consecutiveErrors = 0; + + console.log("[Telegram Polling] Starting polling service..."); + + // Delete webhook if exists to ensure polling works + await this.deleteWebhook(runtimeConfig.botToken); + + // Start first poll immediately + this.scheduleNextPoll(0); + } + + stop(): void { + if (!this.isRunning) { + return; + } + + console.log("[Telegram Polling] Stopping polling service..."); + + this.isRunning = false; + + if (this.pollTimeout) { + clearTimeout(this.pollTimeout); + this.pollTimeout = null; + } + + if (this.abortController) { + this.abortController.abort(); + this.abortController = null; + } + + this.runtimeConfig = null; + } + + private scheduleNextPoll(delay?: number): void { + if (!this.isRunning) { + return; + } + + const actualDelay = delay ?? this.runtimeConfig?.pollingInterval ?? 5000; + + this.pollTimeout = setTimeout(() => { + this.poll(); + }, actualDelay); + } + + private async poll(): Promise { + if (!this.isRunning || !this.runtimeConfig) { + return; + } + + const { botToken } = this.runtimeConfig; + + try { + const updates = await this.getUpdates(botToken); + + this.consecutiveErrors = 0; + this.lastPollTime = new Date().toISOString(); + + for (const update of updates) { + if (!this.isRunning) break; + await this.processUpdate(update); + } + } catch (error) { + this.errorCount++; + this.consecutiveErrors++; + + const errorMessage = error instanceof Error ? error.message : String(error); + console.error(`[Telegram Polling] Error (consecutive: ${this.consecutiveErrors}):`, errorMessage); + + if (this.consecutiveErrors >= 10) { + console.error("[Telegram Polling] Too many consecutive errors, stopping polling"); + this.stop(); + return; + } + } + + this.scheduleNextPoll(); + } + + private async getUpdates(botToken: string): Promise { + const params: Record = { + limit: 100, + timeout: 30, + }; + + if (this.lastUpdateId !== null) { + params.offset = this.lastUpdateId + 1; + } + + const response = await fetch( + `https://api.telegram.org/bot${botToken}/getUpdates`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(params), + signal: this.abortController?.signal, + } + ); + + const payload = (await response.json().catch(() => null)) as + | TelegramApiResponse + | null; + + if (!response.ok || !payload?.ok) { + const description = payload?.description || "Unknown error"; + throw new Error(`getUpdates failed (${response.status}): ${description}`); + } + + const result = payload.result; + if (!Array.isArray(result)) { + return []; + } + + // Update lastUpdateId to the highest received + for (const update of result) { + const updateId = typeof update.update_id === "number" ? update.update_id : null; + if (updateId !== null && (this.lastUpdateId === null || updateId > this.lastUpdateId)) { + this.lastUpdateId = updateId; + } + } + + return result as TelegramUpdate[]; + } + + private async processUpdate(update: TelegramUpdate): Promise { + if (!this.runtimeConfig) return; + + try { + await processTelegramUpdate(update, this.runtimeConfig); + } catch (error) { + console.error("[Telegram Polling] Error processing update:", error); + // Don't throw - continue processing other updates + } + } + + private async deleteWebhook(botToken: string): Promise { + try { + const response = await fetch( + `https://api.telegram.org/bot${botToken}/deleteWebhook`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ drop_pending_updates: true }), + } + ); + + const payload = (await response.json().catch(() => null)) as + | TelegramApiResponse + | null; + + if (payload?.ok) { + console.log("[Telegram Polling] Webhook deleted successfully"); + } else { + console.warn("[Telegram Polling] Failed to delete webhook:", payload?.description); + } + } catch (error) { + console.warn("[Telegram Polling] Error deleting webhook:", error); + } + } +} + +export const telegramPollingService = new TelegramPollingService(); diff --git a/src/lib/telegram/telegram-message-handler.ts b/src/lib/telegram/telegram-message-handler.ts new file mode 100644 index 0000000..3d58304 --- /dev/null +++ b/src/lib/telegram/telegram-message-handler.ts @@ -0,0 +1,711 @@ +import { + handleExternalMessage, + ExternalMessageError, +} from "@/lib/external/handle-external-message"; +import { + createDefaultTelegramSessionId, + createFreshTelegramSessionId, + getTelegramChatSessionId, + setTelegramChatSessionId, +} from "@/lib/storage/telegram-session-store"; +import { + claimTelegramUpdate, + releaseTelegramUpdate, +} from "@/lib/storage/telegram-update-store"; +import { + consumeTelegramAccessCode, + normalizeTelegramUserId, + type TelegramIntegrationRuntimeConfig, +} from "@/lib/storage/telegram-integration-store"; +import { saveChatFile } from "@/lib/storage/chat-files-store"; +import { createChat, getChat } from "@/lib/storage/chat-store"; +import { + contextKey, + type ExternalSession, + getOrCreateExternalSession, + saveExternalSession, +} from "@/lib/storage/external-session-store"; +import { getAllProjects } from "@/lib/storage/project-store"; +import crypto from "node:crypto"; + +const TELEGRAM_TEXT_LIMIT = 4096; +const TELEGRAM_FILE_MAX_BYTES = 30 * 1024 * 1024; + +export interface TelegramUpdate { + update_id?: unknown; + message?: TelegramMessage; +} + +export interface TelegramMessage { + message_id?: unknown; + text?: unknown; + caption?: unknown; + from?: { + id?: unknown; + }; + chat?: { + id?: unknown; + type?: unknown; + }; + document?: { + file_id?: unknown; + file_name?: unknown; + mime_type?: unknown; + }; + photo?: Array<{ + file_id?: unknown; + width?: unknown; + height?: unknown; + }>; + audio?: { + file_id?: unknown; + file_name?: unknown; + mime_type?: unknown; + }; + video?: { + file_id?: unknown; + file_name?: unknown; + mime_type?: unknown; + }; + voice?: { + file_id?: unknown; + mime_type?: unknown; + }; +} + +interface TelegramApiResponse { + ok?: boolean; + description?: string; + result?: Record; +} + +interface TelegramFileResult { + file_id?: string; + file_unique_id?: string; + file_size?: number; + file_path?: string; +} + +export interface TelegramIncomingFile { + fileId: string; + fileName: string; +} + +export interface TelegramExternalChatContext { + chatId: string; + projectId?: string; + currentPath: string; +} + +interface TelegramResolvedProjectContext { + session: ExternalSession; + resolvedProjectId?: string; + projectName?: string; +} + +export interface ProcessTelegramUpdateResult { + ok: boolean; + duplicate?: boolean; + ignored?: boolean; + reason?: string; + command?: string; + accessGranted?: boolean; + userId?: string; + fileSaved?: boolean; + file?: { + name: string; + path: string; + size: number; + }; + handledError?: boolean; + status?: number; +} + +function normalizeTelegramCurrentPath(rawPath: string | undefined): string { + const value = (rawPath ?? "").trim(); + if (!value || value === "/telegram") { + return ""; + } + return value; +} + +function parseTelegramError(status: number, payload: TelegramApiResponse | null): string { + const description = payload?.description?.trim(); + return description + ? `Telegram API error (${status}): ${description}` + : `Telegram API error (${status})`; +} + +async function callTelegramApi( + botToken: string, + method: string, + body?: Record +): Promise { + const response = await fetch(`https://api.telegram.org/bot${botToken}/${method}`, { + method: body ? "POST" : "GET", + headers: body ? { "Content-Type": "application/json" } : undefined, + body: body ? JSON.stringify(body) : undefined, + }); + + const payload = (await response.json().catch(() => null)) as + | TelegramApiResponse + | null; + if (!response.ok || !payload?.ok) { + throw new Error(parseTelegramError(response.status, payload)); + } + return payload; +} + +function getBotId(botToken: string): string { + const [rawBotId] = botToken.trim().split(":", 1); + const botId = rawBotId?.trim() || "default"; + return botId.replace(/[^a-zA-Z0-9._:-]/g, "_").slice(0, 128) || "default"; +} + +function chatBelongsToProject( + chatProjectId: string | undefined, + projectId: string | undefined +): boolean { + const left = chatProjectId ?? null; + const right = projectId ?? null; + return left === right; +} + +async function ensureTelegramExternalChatContext(params: { + sessionId: string; + defaultProjectId?: string; +}): Promise { + const { session, resolvedProjectId } = await resolveTelegramProjectContext({ + sessionId: params.sessionId, + defaultProjectId: params.defaultProjectId, + }); + const projectKey = contextKey(resolvedProjectId); + let resolvedChatId = session.activeChats[projectKey]; + if (resolvedChatId) { + const existing = await getChat(resolvedChatId); + if (!existing || !chatBelongsToProject(existing.projectId, resolvedProjectId)) { + resolvedChatId = ""; + } + } + + if (!resolvedChatId) { + resolvedChatId = crypto.randomUUID(); + await createChat( + resolvedChatId, + `External session ${session.id}`, + resolvedProjectId + ); + } + + session.activeChats[projectKey] = resolvedChatId; + session.currentPaths[projectKey] = normalizeTelegramCurrentPath( + session.currentPaths[projectKey] + ); + session.updatedAt = new Date().toISOString(); + await saveExternalSession(session); + + return { + chatId: resolvedChatId, + projectId: resolvedProjectId, + currentPath: session.currentPaths[projectKey] ?? "", + }; +} + +async function resolveTelegramProjectContext(params: { + sessionId: string; + defaultProjectId?: string; +}): Promise { + const session = await getOrCreateExternalSession(params.sessionId); + const projects = await getAllProjects(); + const projectById = new Map(projects.map((project) => [project.id, project])); + + let resolvedProjectId: string | undefined; + const explicitProjectId = params.defaultProjectId?.trim() || ""; + if (explicitProjectId) { + if (!projectById.has(explicitProjectId)) { + throw new Error(`Project "${explicitProjectId}" not found`); + } + resolvedProjectId = explicitProjectId; + session.activeProjectId = explicitProjectId; + } else if (session.activeProjectId && projectById.has(session.activeProjectId)) { + resolvedProjectId = session.activeProjectId; + } else if (projects.length > 0) { + resolvedProjectId = projects[0].id; + session.activeProjectId = projects[0].id; + } else { + session.activeProjectId = null; + } + + return { + session, + resolvedProjectId, + projectName: resolvedProjectId ? projectById.get(resolvedProjectId)?.name : undefined, + }; +} + +function extensionFromMime(mimeType: string): string { + const lower = mimeType.toLowerCase(); + if (lower.includes("pdf")) return ".pdf"; + if (lower.includes("png")) return ".png"; + if (lower.includes("jpeg") || lower.includes("jpg")) return ".jpg"; + if (lower.includes("webp")) return ".webp"; + if (lower.includes("gif")) return ".gif"; + if (lower.includes("mp4")) return ".mp4"; + if (lower.includes("mpeg") || lower.includes("mp3")) return ".mp3"; + if (lower.includes("ogg")) return ".ogg"; + if (lower.includes("wav")) return ".wav"; + if (lower.includes("plain")) return ".txt"; + return ""; +} + +function buildIncomingFileName(params: { + base: string; + messageId?: number; + mimeType?: string; +}): string { + const suffix = params.messageId ?? Date.now(); + const ext = params.mimeType ? extensionFromMime(params.mimeType) : ""; + return `${params.base}-${suffix}${ext}`; +} + +function sanitizeFileName(value: string): string { + const base = value.trim().replace(/[\\/]+/g, "_"); + return base || `file-${Date.now()}`; +} + +function withMessageIdPrefix(fileName: string, messageId?: number): string { + if (typeof messageId !== "number") return fileName; + return `${messageId}-${fileName}`; +} + +export function extractIncomingFile( + message: TelegramMessage, + messageId?: number +): TelegramIncomingFile | null { + const documentFileId = + typeof message.document?.file_id === "string" + ? message.document.file_id.trim() + : ""; + if (documentFileId) { + const docNameRaw = + typeof message.document?.file_name === "string" + ? message.document.file_name + : ""; + const fallback = buildIncomingFileName({ + base: "document", + messageId, + mimeType: + typeof message.document?.mime_type === "string" + ? message.document.mime_type + : undefined, + }); + return { + fileId: documentFileId, + fileName: withMessageIdPrefix(sanitizeFileName(docNameRaw || fallback), messageId), + }; + } + + const photos: Array<{ file_id?: unknown }> = Array.isArray(message.photo) + ? message.photo + : []; + for (let i = photos.length - 1; i >= 0; i -= 1) { + const photo = photos[i]; + const fileId = typeof photo?.file_id === "string" ? photo.file_id.trim() : ""; + if (fileId) { + return { + fileId, + fileName: sanitizeFileName( + buildIncomingFileName({ base: "photo", messageId, mimeType: "image/jpeg" }) + ), + }; + } + } + + const audioFileId = + typeof message.audio?.file_id === "string" ? message.audio.file_id.trim() : ""; + if (audioFileId) { + const audioNameRaw = + typeof message.audio?.file_name === "string" ? message.audio.file_name : ""; + const fallback = buildIncomingFileName({ + base: "audio", + messageId, + mimeType: + typeof message.audio?.mime_type === "string" + ? message.audio.mime_type + : undefined, + }); + return { + fileId: audioFileId, + fileName: withMessageIdPrefix(sanitizeFileName(audioNameRaw || fallback), messageId), + }; + } + + const videoFileId = + typeof message.video?.file_id === "string" ? message.video.file_id.trim() : ""; + if (videoFileId) { + const videoNameRaw = + typeof message.video?.file_name === "string" ? message.video.file_name : ""; + const fallback = buildIncomingFileName({ + base: "video", + messageId, + mimeType: + typeof message.video?.mime_type === "string" + ? message.video.mime_type + : undefined, + }); + return { + fileId: videoFileId, + fileName: withMessageIdPrefix(sanitizeFileName(videoNameRaw || fallback), messageId), + }; + } + + const voiceFileId = + typeof message.voice?.file_id === "string" ? message.voice.file_id.trim() : ""; + if (voiceFileId) { + return { + fileId: voiceFileId, + fileName: sanitizeFileName( + buildIncomingFileName({ + base: "voice", + messageId, + mimeType: + typeof message.voice?.mime_type === "string" + ? message.voice.mime_type + : undefined, + }) + ), + }; + } + + return null; +} + +export async function downloadTelegramFile(botToken: string, fileId: string): Promise { + const payload = await callTelegramApi(botToken, "getFile", { + file_id: fileId, + }); + const result = payload.result as TelegramFileResult | undefined; + const filePath = result?.file_path ?? ""; + if (!filePath) { + throw new Error("Telegram getFile returned empty file_path"); + } + + const fileUrl = `https://api.telegram.org/file/bot${botToken}/${filePath}`; + const response = await fetch(fileUrl); + if (!response.ok) { + throw new Error(`Failed to download Telegram file (${response.status})`); + } + + const bytes = await response.arrayBuffer(); + if (bytes.byteLength > TELEGRAM_FILE_MAX_BYTES) { + throw new Error( + `Telegram file is too large (${bytes.byteLength} bytes). Max supported size is ${TELEGRAM_FILE_MAX_BYTES} bytes.` + ); + } + return Buffer.from(bytes); +} + +function extractCommand(text: string): string | null { + const first = text.trim().split(/\s+/, 1)[0]; + if (!first || !first.startsWith("/")) return null; + return first.split("@", 1)[0].toLowerCase(); +} + +function extractAccessCodeCandidate(text: string): string | null { + const value = text.trim(); + if (!value) return null; + + const fromCommand = value.match( + /^\/(?:code|start)(?:@[a-zA-Z0-9_]+)?\s+([A-Za-z0-9_-]{6,64})$/i + ); + if (fromCommand?.[1]) { + return fromCommand[1]; + } + + if (/^[A-Za-z0-9_-]{6,64}$/.test(value)) { + return value; + } + return null; +} + +function normalizeOutgoingText(text: string): string { + const value = text.trim(); + if (!value) return "Пустой ответ от агента."; + if (value.length <= TELEGRAM_TEXT_LIMIT) return value; + return `${value.slice(0, TELEGRAM_TEXT_LIMIT - 1)}…`; +} + +export async function sendTelegramMessage( + botToken: string, + chatId: number | string, + text: string, + replyToMessageId?: number +): Promise { + const response = await fetch(`https://api.telegram.org/bot${botToken}/sendMessage`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + chat_id: chatId, + text: normalizeOutgoingText(text), + ...(typeof replyToMessageId === "number" ? { reply_to_message_id: replyToMessageId } : {}), + }), + }); + + const payload = (await response.json().catch(() => null)) as + | { ok?: boolean; description?: string } + | null; + + if (!response.ok || !payload?.ok) { + throw new Error( + `Telegram sendMessage failed (${response.status})${payload?.description ? `: ${payload.description}` : ""}` + ); + } +} + +function helpText(activeProject?: { id?: string; name?: string }): string { + const activeProjectLine = activeProject?.id + ? `Active project: ${activeProject.name ? `${activeProject.name} (${activeProject.id})` : activeProject.id}` + : "Active project: not selected"; + return [ + "Telegram connection is active.", + activeProjectLine, + "", + "Commands:", + "/start - show this help", + "/help - show this help", + "/code - activate access for your Telegram user", + "/new - start a new conversation (reset context)", + "", + "Text messages are sent to the agent.", + "File uploads are saved into chat files.", + "You can also ask the agent to send a local file back to Telegram.", + ].join("\n"); +} + +export async function processTelegramUpdate( + update: TelegramUpdate, + runtime: TelegramIntegrationRuntimeConfig +): Promise { + const botToken = runtime.botToken.trim(); + const defaultProjectId = runtime.defaultProjectId || undefined; + const allowedUserIds = new Set(runtime.allowedUserIds); + + if (!botToken) { + throw new Error("Telegram bot token is not configured"); + } + + const updateId = + typeof update.update_id === "number" && Number.isInteger(update.update_id) + ? update.update_id + : null; + if (updateId === null) { + throw new Error("Invalid update_id"); + } + + const botId = getBotId(botToken); + const isNewUpdate = await claimTelegramUpdate(botId, updateId); + if (!isNewUpdate) { + return { ok: true, duplicate: true }; + } + + try { + const message = update.message; + const chatId = + typeof message?.chat?.id === "number" || typeof message?.chat?.id === "string" + ? message.chat.id + : null; + const chatType = typeof message?.chat?.type === "string" ? message.chat.type : ""; + const messageId = + typeof message?.message_id === "number" ? message.message_id : undefined; + + if (chatId === null || !chatType) { + return { ok: true, ignored: true, reason: "unsupported_update" }; + } + + if (chatType !== "private") { + return { ok: true, ignored: true, reason: "private_only" }; + } + + const text = typeof message?.text === "string" ? message.text.trim() : ""; + const caption = + typeof message?.caption === "string" ? message.caption.trim() : ""; + const incomingText = text || caption; + const fromUserId = normalizeTelegramUserId(message?.from?.id); + + if (!fromUserId) { + return { + ok: true, + ignored: true, + reason: "missing_user_id", + }; + } + + if (!allowedUserIds.has(fromUserId)) { + const accessCode = extractAccessCodeCandidate(text); + const granted = + accessCode && + (await consumeTelegramAccessCode({ + code: accessCode, + userId: fromUserId, + })); + + if (granted) { + await sendTelegramMessage( + botToken, + chatId, + "Доступ выдан. Теперь можно отправлять сообщения агенту.", + messageId + ); + return { + ok: true, + accessGranted: true, + userId: fromUserId, + }; + } + + await sendTelegramMessage( + botToken, + chatId, + [ + "Доступ запрещён: ваш user_id не в списке разрешённых.", + "Отправьте код активации командой /code <код> или /start <код>.", + `Ваш user_id: ${fromUserId}`, + ].join("\n"), + messageId + ); + return { + ok: true, + ignored: true, + reason: "user_not_allowed", + userId: fromUserId, + }; + } + + let sessionId = await getTelegramChatSessionId(botId, chatId); + if (!sessionId) { + sessionId = createDefaultTelegramSessionId(botId, chatId); + await setTelegramChatSessionId(botId, chatId, sessionId); + } + + const command = extractCommand(text); + if (command === "/start" || command === "/help") { + const resolvedProject = await resolveTelegramProjectContext({ + sessionId, + defaultProjectId, + }); + await saveExternalSession({ + ...resolvedProject.session, + updatedAt: new Date().toISOString(), + }); + await sendTelegramMessage( + botToken, + chatId, + helpText({ + id: resolvedProject.resolvedProjectId, + name: resolvedProject.projectName, + }), + messageId + ); + return { ok: true, command }; + } + + if (command === "/new") { + const freshSessionId = createFreshTelegramSessionId(botId, chatId); + await setTelegramChatSessionId(botId, chatId, freshSessionId); + await sendTelegramMessage( + botToken, + chatId, + "Начал новый диалог. Контекст очищен для следующего сообщения.", + messageId + ); + return { ok: true, command }; + } + + let incomingSavedFile: { + name: string; + path: string; + size: number; + } | null = null; + + const incomingFile = message ? extractIncomingFile(message, messageId) : null; + let externalContext: TelegramExternalChatContext | null = null; + if (incomingFile) { + externalContext = await ensureTelegramExternalChatContext({ + sessionId, + defaultProjectId, + }); + const fileBuffer = await downloadTelegramFile(botToken, incomingFile.fileId); + const saved = await saveChatFile( + externalContext.chatId, + fileBuffer, + incomingFile.fileName + ); + incomingSavedFile = { + name: saved.name, + path: saved.path, + size: saved.size, + }; + } + + if (!incomingText) { + if (incomingSavedFile) { + await sendTelegramMessage( + botToken, + chatId, + `File "${incomingSavedFile.name}" saved to chat files.`, + messageId + ); + return { + ok: true, + fileSaved: true, + file: incomingSavedFile, + }; + } + + await sendTelegramMessage( + botToken, + chatId, + "Only text messages and file uploads are supported right now.", + messageId + ); + return { ok: true, ignored: true, reason: "non_text" }; + } + + try { + const result = await handleExternalMessage({ + sessionId, + message: incomingSavedFile + ? `${incomingText}\n\nAttached file: ${incomingSavedFile.name}` + : incomingText, + projectId: externalContext?.projectId ?? defaultProjectId, + chatId: externalContext?.chatId, + currentPath: normalizeTelegramCurrentPath(externalContext?.currentPath), + runtimeData: { + telegram: { + botToken, + chatId, + replyToMessageId: messageId ?? null, + }, + }, + }); + + await sendTelegramMessage(botToken, chatId, result.reply, messageId); + return { ok: true }; + } catch (error) { + if (error instanceof ExternalMessageError) { + const errorMessage = + typeof error.payload.error === "string" + ? error.payload.error + : "Не удалось обработать сообщение."; + await sendTelegramMessage(botToken, chatId, `Ошибка: ${errorMessage}`, messageId); + return { ok: true, handledError: true, status: error.status }; + } + throw error; + } + } catch (error) { + await releaseTelegramUpdate(botId, updateId); + throw error; + } +}