diff --git a/.changeset/chat-stream-options-adapter-fallback.md b/.changeset/chat-stream-options-adapter-fallback.md new file mode 100644 index 00000000..758c83f4 --- /dev/null +++ b/.changeset/chat-stream-options-adapter-fallback.md @@ -0,0 +1,5 @@ +--- +"chat": patch +--- + +Pass configured fallback streaming options (`updateIntervalMs` and `fallbackPlaceholderText`) through native `adapter.stream()` calls so adapters can align their fallback behavior with `Chat` streaming config. diff --git a/.changeset/telegram-draft-streaming-polish.md b/.changeset/telegram-draft-streaming-polish.md new file mode 100644 index 00000000..e2daf7a9 --- /dev/null +++ b/.changeset/telegram-draft-streaming-polish.md @@ -0,0 +1,5 @@ +--- +"@chat-adapter/telegram": patch +--- + +Add Telegram `sendMessageDraft` streaming in DMs with post+edit fallback, plus HTML markdown rendering for streamed and posted markdown content. diff --git a/README.md b/README.md index 5e9faa3d..33629dd7 100644 --- a/README.md +++ b/README.md @@ -53,14 +53,14 @@ See the [Getting Started guide](https://chat-sdk.dev/docs/getting-started) for a | Microsoft Teams | `@chat-adapter/teams` | Yes | Read-only | Yes | No | Post+Edit | Yes | | Google Chat | `@chat-adapter/gchat` | Yes | Yes | Yes | No | Post+Edit | Yes | | Discord | `@chat-adapter/discord` | Yes | Yes | Yes | No | Post+Edit | Yes | -| Telegram | `@chat-adapter/telegram` | Yes | Yes | Partial | No | Post+Edit | Yes | +| Telegram | `@chat-adapter/telegram` | Yes | Yes | Partial | No | Draft API (DMs) + Post+Edit | Yes | | GitHub | `@chat-adapter/github` | Yes | Yes | No | No | No | No | | Linear | `@chat-adapter/linear` | Yes | Yes | No | No | No | No | ## Features - [**Event handlers**](https://chat-sdk.dev/docs/usage) — mentions, messages, reactions, button clicks, slash commands, modals -- [**AI streaming**](https://chat-sdk.dev/docs/streaming) — stream LLM responses with native Slack streaming and post+edit fallback +- [**AI streaming**](https://chat-sdk.dev/docs/streaming) — stream LLM responses with native Slack/Telegram DM streaming and post+edit fallback - [**Cards**](https://chat-sdk.dev/docs/cards) — JSX-based interactive cards (Block Kit, Adaptive Cards, Google Chat Cards) - [**Actions**](https://chat-sdk.dev/docs/actions) — handle button clicks and dropdown selections - [**Modals**](https://chat-sdk.dev/docs/modals) — form dialogs with text inputs, dropdowns, and validation diff --git a/apps/docs/content/docs/adapters/index.mdx b/apps/docs/content/docs/adapters/index.mdx index c2d91091..9a86963d 100644 --- a/apps/docs/content/docs/adapters/index.mdx +++ b/apps/docs/content/docs/adapters/index.mdx @@ -18,7 +18,7 @@ Adapters handle webhook verification, message parsing, and API calls for each pl | Edit message | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | Delete message | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | | File uploads | ✅ | ✅ | ❌ | ✅ | ⚠️ Single file | ❌ | ❌ | -| Streaming | ✅ Native | ⚠️ Post+Edit | ⚠️ Post+Edit | ⚠️ Post+Edit | ⚠️ Post+Edit | ❌ | ❌ | +| Streaming | ✅ Native | ⚠️ Post+Edit | ⚠️ Post+Edit | ⚠️ Post+Edit | ⚠️ Draft API (DMs) + Post+Edit | ❌ | ❌ | ### Rich content diff --git a/apps/docs/content/docs/adapters/telegram.mdx b/apps/docs/content/docs/adapters/telegram.mdx index 118880b8..2a24d64b 100644 --- a/apps/docs/content/docs/adapters/telegram.mdx +++ b/apps/docs/content/docs/adapters/telegram.mdx @@ -141,7 +141,7 @@ TELEGRAM_API_BASE_URL=https://api.telegram.org | Reactions (add/remove) | Yes | | Cards | Text fallback + inline keyboard buttons/link buttons | | Modals | No | -| Streaming | Post+Edit fallback | +| Streaming | `sendMessageDraft` in DMs + Post+Edit fallback | | DMs | Yes | | Ephemeral messages | No | | File uploads | Single file (`sendDocument`) | @@ -159,3 +159,5 @@ TELEGRAM_API_BASE_URL=https://api.telegram.org - `Button` and `LinkButton` in card `Actions` render as inline keyboard buttons. - Telegram callback data is limited to 64 bytes. Keep button `id`/`value` payloads short. - Other rich card elements (images/select menus/radios) render as fallback text only. +- Native draft streaming (`sendMessageDraft`) is currently available in private chats; groups/channels use post+edit fallback. +- `{ markdown: ... }` and `{ ast: ... }` messages are sent with Telegram HTML parse mode for formatting support; plain strings and `{ raw: ... }` are sent as plain text. diff --git a/apps/docs/content/docs/index.mdx b/apps/docs/content/docs/index.mdx index 2de1d503..d53d0106 100644 --- a/apps/docs/content/docs/index.mdx +++ b/apps/docs/content/docs/index.mdx @@ -55,7 +55,7 @@ Each adapter factory auto-detects credentials from environment variables (`SLACK | Microsoft Teams | `@chat-adapter/teams` | Yes | Read-only | Yes | No | Post+Edit | Yes | | Google Chat | `@chat-adapter/gchat` | Yes | Yes | Yes | No | Post+Edit | Yes | | Discord | `@chat-adapter/discord` | Yes | Yes | Yes | No | Post+Edit | Yes | -| Telegram | `@chat-adapter/telegram` | Yes | Yes | Partial | No | Post+Edit | Yes | +| Telegram | `@chat-adapter/telegram` | Yes | Yes | Partial | No | Draft API (DMs) + Post+Edit | Yes | | GitHub | `@chat-adapter/github` | Yes | Yes | No | No | No | No | | Linear | `@chat-adapter/linear` | Yes | Yes | No | No | No | No | diff --git a/apps/docs/content/docs/streaming.mdx b/apps/docs/content/docs/streaming.mdx index c1363b9e..5479b795 100644 --- a/apps/docs/content/docs/streaming.mdx +++ b/apps/docs/content/docs/streaming.mdx @@ -62,6 +62,7 @@ await thread.post(stream); | Teams | Post + Edit | Posts a message then edits it as chunks arrive | | Google Chat | Post + Edit | Posts a message then edits it as chunks arrive | | Discord | Post + Edit | Posts a message then edits it as chunks arrive | +| Telegram | Draft API (DMs) + Post + Edit | Uses `sendMessageDraft` in private chats, falls back to post+edit in other chats | The post+edit fallback throttles edits to avoid rate limits. Configure the update interval when creating your `Chat` instance: diff --git a/packages/adapter-telegram/README.md b/packages/adapter-telegram/README.md index 55ea318e..2f2e00bf 100644 --- a/packages/adapter-telegram/README.md +++ b/packages/adapter-telegram/README.md @@ -27,7 +27,57 @@ const bot = new Chat({ }); ``` -Features include mentions, reactions, typing indicators, file uploads, and card fallback rendering with inline keyboard buttons for card actions. +Features include mentions, reactions, typing indicators, file uploads, Markdown/AST formatting via Telegram HTML parse mode, Telegram draft streaming in DMs (`sendMessageDraft`) with post+edit fallback elsewhere, and card fallback rendering with inline keyboard buttons for card actions. + +## Polling mode + +When developing locally, you typically can't expose a public URL for Telegram to send webhooks to. Polling mode uses `getUpdates` to fetch messages directly from Telegram instead — no public endpoint needed. + +The `longPolling` option is entirely optional. Sensible defaults are applied when omitted. + +```typescript +import { createMemoryState } from "@chat-adapter/state-memory"; + +const telegram = createTelegramAdapter({ + botToken: process.env.TELEGRAM_BOT_TOKEN!, + mode: "polling", + // Optional — fine-tune polling behavior: + // longPolling: { timeout: 30, dropPendingUpdates: false }, +}); + +const bot = new Chat({ + userName: "mybot", + adapters: { telegram }, + state: createMemoryState(), +}); + +// Optional manual control +await telegram.resetWebhook(); +await telegram.startPolling(); +await telegram.stopPolling(); +``` + +### Auto mode + +With `mode: "auto"` (the default), the adapter picks the right strategy for you. In a serverless environment like Vercel it uses webhooks; everywhere else (e.g. local dev) it falls back to polling. + +```typescript +const telegram = createTelegramAdapter({ + botToken: process.env.TELEGRAM_BOT_TOKEN!, + mode: "auto", // default +}); + +const bot = new Chat({ + userName: "mybot", + adapters: { telegram }, + state: createMemoryState(), +}); + +// Call initialize() so polling can start in long-running local processes: +void bot.initialize(); + +console.log(telegram.runtimeMode); // "webhook" | "polling" +``` ## Polling mode diff --git a/packages/adapter-telegram/src/index.test.ts b/packages/adapter-telegram/src/index.test.ts index 20c2cc1d..03d314e8 100644 --- a/packages/adapter-telegram/src/index.test.ts +++ b/packages/adapter-telegram/src/index.test.ts @@ -867,10 +867,17 @@ describe("TelegramAdapter", () => { const sendMessageBody = JSON.parse( String((mockFetch.mock.calls[1]?.[1] as RequestInit).body) - ) as { chat_id: string; text: string }; + ) as { chat_id: string; text: string; parse_mode?: string }; + const editMessageBody = JSON.parse( + String((mockFetch.mock.calls[2]?.[1] as RequestInit).body) + ) as { chat_id: string; text: string; parse_mode?: string }; expect(sendMessageBody.chat_id).toBe("123"); expect(sendMessageBody.text).toBe("hello"); + expect(sendMessageBody.parse_mode).toBe("HTML"); + expect(editMessageBody.chat_id).toBe("123"); + expect(editMessageBody.text).toBe("updated"); + expect(editMessageBody.parse_mode).toBeUndefined(); }); it("posts cards with inline keyboard buttons", async () => { @@ -920,6 +927,8 @@ describe("TelegramAdapter", () => { const sendMessageBody = JSON.parse( String((mockFetch.mock.calls[1]?.[1] as RequestInit).body) ) as { + text: string; + parse_mode?: string; reply_markup?: { inline_keyboard: Array< Array<{ text: string; callback_data?: string; url?: string }> @@ -929,7 +938,8 @@ describe("TelegramAdapter", () => { const row = sendMessageBody.reply_markup?.inline_keyboard[0]; expect(row).toBeDefined(); - expect(sendMessageBody.parse_mode).toBe("Markdown"); + expect(sendMessageBody.parse_mode).toBe("HTML"); + expect(sendMessageBody.text).toContain("Approval needed"); expect(row?.[0]).toEqual({ text: "Approve", callback_data: encodeTelegramCallbackData("approve", "request-123"), @@ -940,6 +950,451 @@ describe("TelegramAdapter", () => { }); }); + it("streams in private chats using sendMessageDraft", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce(telegramOk(true)) + .mockResolvedValueOnce(telegramOk(true)) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + text: "hello world", + }) + ) + ); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + const stream = (async function* () { + yield "hello"; + yield " world"; + })(); + + const result = await adapter.stream("telegram:123", stream, { + updateIntervalMs: 0, + }); + + expect(result.id).toBe("123:11"); + expect(result.threadId).toBe("telegram:123"); + + const draftUrl1 = String(mockFetch.mock.calls[1]?.[0]); + const draftUrl2 = String(mockFetch.mock.calls[2]?.[0]); + const finalSendUrl = String(mockFetch.mock.calls[3]?.[0]); + + expect(draftUrl1).toContain("/sendMessageDraft"); + expect(draftUrl2).toContain("/sendMessageDraft"); + expect(finalSendUrl).toContain("/sendMessage"); + + const draftBody1 = JSON.parse( + String((mockFetch.mock.calls[1]?.[1] as RequestInit).body) + ) as { + chat_id: string; + draft_id: string; + text: string; + parse_mode?: string; + }; + const draftBody2 = JSON.parse( + String((mockFetch.mock.calls[2]?.[1] as RequestInit).body) + ) as { + chat_id: string; + draft_id: string; + text: string; + parse_mode?: string; + }; + const finalBody = JSON.parse( + String((mockFetch.mock.calls[3]?.[1] as RequestInit).body) + ) as { chat_id: string; text: string; parse_mode?: string }; + + expect(draftBody1.chat_id).toBe("123"); + expect(draftBody2.chat_id).toBe("123"); + expect(draftBody1.text).toBe("hello"); + expect(draftBody2.text).toBe("hello world"); + expect(draftBody1.parse_mode).toBe("HTML"); + expect(draftBody2.parse_mode).toBe("HTML"); + expect(draftBody1.draft_id).toBe(draftBody2.draft_id); + expect(finalBody.chat_id).toBe("123"); + expect(finalBody.text).toBe("hello world"); + expect(finalBody.parse_mode).toBe("HTML"); + }); + + it("falls back to post+edit streaming in non-DM chats", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + chat: { id: -100123, type: "supergroup", title: "General" }, + text: "...", + }) + ) + ) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + chat: { id: -100123, type: "supergroup", title: "General" }, + text: "hello", + edit_date: 1735689700, + }) + ) + ); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + const stream = (async function* () { + yield "hello"; + })(); + + const result = await adapter.stream("telegram:-100123", stream, { + updateIntervalMs: 0, + }); + + expect(result.id).toBe("-100123:11"); + expect(result.threadId).toBe("telegram:-100123"); + + const postUrl = String(mockFetch.mock.calls[1]?.[0]); + const editUrl = String(mockFetch.mock.calls[2]?.[0]); + + expect(postUrl).toContain("/sendMessage"); + expect(editUrl).toContain("/editMessageText"); + expect(postUrl).not.toContain("/sendMessageDraft"); + expect(editUrl).not.toContain("/sendMessageDraft"); + + const postBody = JSON.parse( + String((mockFetch.mock.calls[1]?.[1] as RequestInit).body) + ) as { chat_id: string; text: string }; + const editBody = JSON.parse( + String((mockFetch.mock.calls[2]?.[1] as RequestInit).body) + ) as { chat_id: string; text: string; parse_mode?: string }; + + expect(postBody.chat_id).toBe("-100123"); + expect(postBody.text).toBe("..."); + expect(editBody.chat_id).toBe("-100123"); + expect(editBody.text).toBe("hello"); + expect(editBody.parse_mode).toBe("HTML"); + }); + + it("falls back to post+edit when sendMessageDraft is unavailable", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce(telegramError(404, 404, "Not Found")) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + text: "...", + }) + ) + ) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + text: "hello", + edit_date: 1735689700, + }) + ) + ); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + const stream = (async function* () { + yield "hello"; + })(); + + const result = await adapter.stream("telegram:123", stream, { + updateIntervalMs: 0, + }); + + expect(result.id).toBe("123:11"); + expect(result.threadId).toBe("telegram:123"); + + const draftUrl = String(mockFetch.mock.calls[1]?.[0]); + const postUrl = String(mockFetch.mock.calls[2]?.[0]); + const editUrl = String(mockFetch.mock.calls[3]?.[0]); + + expect(draftUrl).toContain("/sendMessageDraft"); + expect(postUrl).toContain("/sendMessage"); + expect(editUrl).toContain("/editMessageText"); + }); + + it("falls back to post+edit when sendMessageDraft returns method-not-found validation", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce( + telegramError(400, 400, "Bad Request: method not found") + ) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + text: "...", + }) + ) + ) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + text: "hello", + edit_date: 1735689700, + }) + ) + ); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + const stream = (async function* () { + yield "hello"; + })(); + + const result = await adapter.stream("telegram:123", stream, { + updateIntervalMs: 0, + }); + + expect(result.id).toBe("123:11"); + expect(result.threadId).toBe("telegram:123"); + + const draftUrl = String(mockFetch.mock.calls[1]?.[0]); + const postUrl = String(mockFetch.mock.calls[2]?.[0]); + const editUrl = String(mockFetch.mock.calls[3]?.[0]); + + expect(draftUrl).toContain("/sendMessageDraft"); + expect(postUrl).toContain("/sendMessage"); + expect(editUrl).toContain("/editMessageText"); + }); + + it("renders markdown with Telegram HTML parse mode", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce(telegramOk(sampleMessage())); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + await adapter.postMessage("telegram:123", { + markdown: + "**Bold** _italic_ [Docs](https://example.com) `code`\n\n> Quote", + }); + + const sendMessageBody = JSON.parse( + String((mockFetch.mock.calls[1]?.[1] as RequestInit).body) + ) as { text: string; parse_mode?: string }; + + expect(sendMessageBody.parse_mode).toBe("HTML"); + expect(sendMessageBody.text).toContain("Bold"); + expect(sendMessageBody.text).toContain("italic"); + expect(sendMessageBody.text).toContain( + 'Docs' + ); + expect(sendMessageBody.text).toContain("code"); + expect(sendMessageBody.text).toContain("
Quote
"); + }); + + it("supports disabling fallback placeholder in non-DM stream fallback", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + chat: { id: -100123, type: "supergroup", title: "General" }, + text: "hello", + }) + ) + ); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + const stream = (async function* () { + yield "hello"; + })(); + + await adapter.stream("telegram:-100123", stream, { + updateIntervalMs: 0, + fallbackPlaceholderText: null, + }); + + expect(mockFetch).toHaveBeenCalledTimes(2); + const sendMessageUrl = String(mockFetch.mock.calls[1]?.[0]); + expect(sendMessageUrl).toContain("/sendMessage"); + + const sendMessageBody = JSON.parse( + String((mockFetch.mock.calls[1]?.[1] as RequestInit).body) + ) as { chat_id: string; text: string; parse_mode?: string }; + + expect(sendMessageBody.chat_id).toBe("-100123"); + expect(sendMessageBody.text).toBe("hello"); + expect(sendMessageBody.parse_mode).toBe("HTML"); + }); + + it("streams markdown drafts with Telegram HTML parse mode", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce(telegramOk(true)) + .mockResolvedValueOnce(telegramOk(true)) + .mockResolvedValueOnce(telegramOk(sampleMessage({ text: "done" }))); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + const stream = (async function* () { + yield "**bold"; + yield "** text"; + })(); + + await adapter.stream("telegram:123", stream, { + updateIntervalMs: 0, + }); + + const draftBody1 = JSON.parse( + String((mockFetch.mock.calls[1]?.[1] as RequestInit).body) + ) as { text: string; parse_mode?: string }; + const draftBody2 = JSON.parse( + String((mockFetch.mock.calls[2]?.[1] as RequestInit).body) + ) as { text: string; parse_mode?: string }; + + expect(draftBody1.parse_mode).toBe("HTML"); + expect(draftBody2.parse_mode).toBe("HTML"); + expect(draftBody2.text).toContain("bold"); + }); + + it("cleans up placeholder and throws when fallback stream is empty", async () => { + mockFetch + .mockResolvedValueOnce( + telegramOk({ + id: 999, + is_bot: true, + first_name: "Bot", + username: "mybot", + }) + ) + .mockResolvedValueOnce( + telegramOk( + sampleMessage({ + chat: { id: -100123, type: "supergroup", title: "General" }, + text: "...", + }) + ) + ) + .mockResolvedValueOnce(telegramOk(true)); + + const adapter = createTelegramAdapter({ + botToken: "token", + mode: "webhook", + logger: mockLogger, + userName: "mybot", + }); + + await adapter.initialize(createMockChat()); + + const emptyStream = (async function* () { + // empty by design + })(); + + await expect( + adapter.stream("telegram:-100123", emptyStream, { + updateIntervalMs: 0, + }) + ).rejects.toBeInstanceOf(ValidationError); + + const postUrl = String(mockFetch.mock.calls[1]?.[0]); + const deleteUrl = String(mockFetch.mock.calls[2]?.[0]); + + expect(postUrl).toContain("/sendMessage"); + expect(deleteUrl).toContain("/deleteMessage"); + }); + it("adds and removes reactions", async () => { mockFetch .mockResolvedValueOnce( diff --git a/packages/adapter-telegram/src/index.ts b/packages/adapter-telegram/src/index.ts index a88c32cc..5b2e129d 100644 --- a/packages/adapter-telegram/src/index.ts +++ b/packages/adapter-telegram/src/index.ts @@ -13,6 +13,7 @@ import type { Adapter, AdapterPostableMessage, Attachment, + CardElement, ChannelInfo, ChatInstance, EmojiValue, @@ -21,6 +22,7 @@ import type { FormattedContent, Logger, RawMessage, + StreamOptions, ThreadInfo, WebhookOptions, } from "chat"; @@ -63,7 +65,7 @@ const TELEGRAM_MESSAGE_LIMIT = 4096; const TELEGRAM_CAPTION_LIMIT = 1024; const TELEGRAM_SECRET_TOKEN_HEADER = "x-telegram-bot-api-secret-token"; const MESSAGE_ID_PATTERN = /^([^:]+):(\d+)$/; -const TELEGRAM_MARKDOWN_PARSE_MODE = "Markdown"; +const TELEGRAM_HTML_PARSE_MODE = "HTML"; const trimTrailingSlashes = (url: string): string => { let end = url.length; while (end > 0 && url[end - 1] === "/") { @@ -82,6 +84,8 @@ const TELEGRAM_MAX_POLLING_LIMIT = 100; const TELEGRAM_MIN_POLLING_LIMIT = 1; const TELEGRAM_MIN_POLLING_TIMEOUT_SECONDS = 0; const TELEGRAM_MAX_POLLING_TIMEOUT_SECONDS = 300; +const TELEGRAM_STREAM_PLACEHOLDER = "..."; +const TELEGRAM_DEFAULT_STREAM_UPDATE_INTERVAL_MS = 350; interface TelegramMessageAuthor { fullName: string; isBot: boolean | "unknown"; @@ -555,14 +559,11 @@ export class TelegramAdapter const card = extractCard(message); const replyMarkup = card ? cardToTelegramInlineKeyboard(card) : undefined; - const parseMode = card ? TELEGRAM_MARKDOWN_PARSE_MODE : undefined; + const rendered = card + ? this.renderTelegramCardText(card) + : this.renderTelegramText(message); const text = this.truncateMessage( - convertEmojiPlaceholders( - card - ? cardToFallbackText(card) - : this.formatConverter.renderPostable(message), - "gchat" - ) + convertEmojiPlaceholders(rendered.text, "gchat") ); const files = extractFiles(message); @@ -585,7 +586,7 @@ export class TelegramAdapter file, text, replyMarkup, - parseMode + rendered.parseMode ); } else { if (!text.trim()) { @@ -597,7 +598,7 @@ export class TelegramAdapter message_thread_id: parsedThread.messageThreadId, text, reply_markup: replyMarkup, - parse_mode: parseMode, + parse_mode: rendered.parseMode, }); } @@ -642,14 +643,11 @@ export class TelegramAdapter const card = extractCard(message); const replyMarkup = card ? cardToTelegramInlineKeyboard(card) : undefined; - const parseMode = card ? TELEGRAM_MARKDOWN_PARSE_MODE : undefined; + const rendered = card + ? this.renderTelegramCardText(card) + : this.renderTelegramText(message); const text = this.truncateMessage( - convertEmojiPlaceholders( - card - ? cardToFallbackText(card) - : this.formatConverter.renderPostable(message), - "gchat" - ) + convertEmojiPlaceholders(rendered.text, "gchat") ); if (!text.trim()) { @@ -663,7 +661,7 @@ export class TelegramAdapter message_id: telegramMessageId, text, reply_markup: replyMarkup ?? emptyTelegramInlineKeyboard(), - parse_mode: parseMode, + parse_mode: rendered.parseMode, } ); @@ -768,6 +766,105 @@ export class TelegramAdapter }); } + async stream( + threadId: string, + textStream: AsyncIterable, + options?: StreamOptions + ): Promise> { + const parsedThread = this.resolveThreadId(threadId); + + // Telegram drafts are currently private-chat only, so keep post+edit for groups/topics. + if (parsedThread.messageThreadId || !this.isDM(threadId)) { + return this.streamViaPostEdit(threadId, textStream, options); + } + + const updateIntervalMs = this.resolveStreamUpdateInterval(options); + + const iterator = textStream[Symbol.asyncIterator](); + const draftId = this.createDraftId(); + let rawAccumulated = ""; + let renderedAccumulated = ""; + let lastDraftText = ""; + let lastDraftSentAt = 0; + let draftUpdatesSent = 0; + + while (true) { + const next = await iterator.next(); + if (next.done) { + break; + } + + rawAccumulated += next.value; + + const now = Date.now(); + const intervalElapsed = + draftUpdatesSent === 0 || now - lastDraftSentAt >= updateIntervalMs; + + if (!intervalElapsed) { + continue; + } + + renderedAccumulated = this.renderStreamMarkdown(rawAccumulated); + + if ( + !renderedAccumulated.trim() || + renderedAccumulated === lastDraftText + ) { + continue; + } + + try { + await this.sendDraftMessage( + parsedThread.chatId, + draftId, + renderedAccumulated + ); + lastDraftText = renderedAccumulated; + lastDraftSentAt = now; + draftUpdatesSent += 1; + } catch (error) { + const isUnsupported = + draftUpdatesSent === 0 && this.isDraftMethodUnsupported(error); + + this.logger[isUnsupported ? "info" : "warn"]( + `Telegram: sendMessageDraft ${isUnsupported ? "unavailable" : "failed during stream"}, falling back to post+edit streaming`, + isUnsupported ? undefined : { error: String(error) } + ); + + const resumedTextStream = this.resumeStreamFrom( + rawAccumulated, + iterator + ); + + return this.streamViaPostEdit(threadId, resumedTextStream, options); + } + } + + if (!rawAccumulated.trim()) { + throw new ValidationError("telegram", "Message text cannot be empty"); + } + + renderedAccumulated = this.renderStreamMarkdown(rawAccumulated); + if (renderedAccumulated !== lastDraftText) { + try { + await this.sendDraftMessage( + parsedThread.chatId, + draftId, + renderedAccumulated + ); + } catch (error) { + this.logger.warn( + "Telegram: final sendMessageDraft update failed; sending final message anyway", + { + error: String(error), + } + ); + } + } + + return this.postMessage(threadId, { markdown: rawAccumulated }); + } + async fetchMessages( threadId: string, options: FetchOptions = {} @@ -869,6 +966,9 @@ export class TelegramAdapter isDM(threadId: string): boolean { const { chatId } = this.resolveThreadId(threadId); + // Telegram group/supergroup/channel chat IDs are always negative. + // Private (DM) chat IDs are positive. This heuristic is reliable + // because Telegram never assigns negative IDs to private chats. return !chatId.startsWith("-"); } @@ -1383,6 +1483,221 @@ export class TelegramAdapter return input.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); } + private async streamViaPostEdit( + threadId: string, + textStream: AsyncIterable, + options?: StreamOptions + ): Promise> { + const intervalMs = this.resolveStreamUpdateInterval(options); + const placeholderText = this.resolveFallbackPlaceholderText(options); + let posted = + placeholderText === null + ? null + : await this.postMessage(threadId, placeholderText); + let threadIdForEdits = posted?.threadId || threadId; + + let rawAccumulated = ""; + let lastRendered = placeholderText ?? ""; + let lastEditAt = Date.now(); + + for await (const chunk of textStream) { + rawAccumulated += chunk; + const now = Date.now(); + + if (!posted) { + const rendered = this.renderStreamMarkdown(rawAccumulated); + if (!rendered.trim()) { + continue; + } + + posted = await this.postMessage(threadId, { + markdown: rawAccumulated, + }); + threadIdForEdits = posted.threadId || threadId; + lastRendered = rendered; + lastEditAt = now; + continue; + } + + if (now - lastEditAt < intervalMs) { + continue; + } + + const rendered = this.renderStreamMarkdown(rawAccumulated); + if (!rendered.trim() || rendered === lastRendered) { + continue; + } + + try { + await this.editMessage(threadIdForEdits, posted.id, { + markdown: rawAccumulated, + }); + lastRendered = rendered; + lastEditAt = now; + } catch (error) { + this.logger.debug("Telegram: intermediate stream edit failed", { + error: String(error), + threadId: threadIdForEdits, + messageId: posted.id, + }); + } + } + + if (!rawAccumulated.trim()) { + if (posted) { + try { + await this.deleteMessage(threadIdForEdits, posted.id); + } catch (error) { + this.logger.debug( + "Telegram: failed to cleanup empty stream placeholder", + { + error: String(error), + threadId: threadIdForEdits, + messageId: posted.id, + } + ); + } + } + throw new ValidationError("telegram", "Message text cannot be empty"); + } + + if (!posted) { + return this.postMessage(threadId, { + markdown: rawAccumulated, + }); + } + + const finalRendered = this.renderStreamMarkdown(rawAccumulated); + if (finalRendered.trim() && finalRendered !== lastRendered) { + return this.editMessage(threadIdForEdits, posted.id, { + markdown: rawAccumulated, + }); + } + + return posted; + } + + private resolveStreamUpdateInterval(options?: StreamOptions): number { + return Math.max( + 0, + options?.updateIntervalMs ?? TELEGRAM_DEFAULT_STREAM_UPDATE_INTERVAL_MS + ); + } + + private resolveFallbackPlaceholderText( + options?: StreamOptions + ): string | null { + return options?.fallbackPlaceholderText === undefined + ? TELEGRAM_STREAM_PLACEHOLDER + : options.fallbackPlaceholderText; + } + + private resumeStreamFrom( + consumed: string, + remaining: AsyncIterator + ): AsyncIterable { + return (async function* () { + if (consumed) { + yield consumed; + } + + while (true) { + const item = await remaining.next(); + if (item.done) { + return; + } + yield item.value; + } + })(); + } + + private createDraftId(): string { + return ( + globalThis.crypto?.randomUUID?.() ?? + `draft-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}` + ); + } + + private async sendDraftMessage( + chatId: string, + draftId: string, + text: string + ): Promise { + await this.telegramFetch("sendMessageDraft", { + chat_id: chatId, + draft_id: draftId, + text, + parse_mode: TELEGRAM_HTML_PARSE_MODE, + }); + } + + private isDraftMethodUnsupported(error: unknown): boolean { + if ( + error instanceof ResourceNotFoundError && + error.resourceType === "sendMessageDraft" + ) { + return true; + } + + if (error instanceof ValidationError) { + const lower = error.message.toLowerCase(); + return ( + lower.includes("sendmessagedraft") || + lower.includes("method not found") || + lower.includes("unknown method") || + lower.includes("method is not available") + ); + } + + return false; + } + + private renderStreamMarkdown(rawText: string): string { + return this.truncateMessage(this.formatConverter.fromMarkdown(rawText)); + } + + private renderTelegramCardText(card: CardElement): { + text: string; + parseMode: string; + } { + return { + text: this.formatConverter.fromMarkdown( + cardToFallbackText(card, { boldFormat: "**" }) + ), + parseMode: TELEGRAM_HTML_PARSE_MODE, + }; + } + + private renderTelegramText(message: AdapterPostableMessage): { + text: string; + parseMode?: string; + } { + if (typeof message === "string") { + return { text: message }; + } + + if ("raw" in message) { + return { text: message.raw }; + } + + if ("markdown" in message) { + return { + text: this.formatConverter.fromMarkdown(message.markdown), + parseMode: TELEGRAM_HTML_PARSE_MODE, + }; + } + + if ("ast" in message) { + return { + text: this.formatConverter.fromAst(message.ast), + parseMode: TELEGRAM_HTML_PARSE_MODE, + }; + } + + return { + text: this.formatConverter.renderPostable(message), + }; + } private normalizeUserName(value: unknown): string { if (typeof value !== "string") { return "bot"; diff --git a/packages/adapter-telegram/src/markdown.ts b/packages/adapter-telegram/src/markdown.ts index 0720415f..2c45bdbd 100644 --- a/packages/adapter-telegram/src/markdown.ts +++ b/packages/adapter-telegram/src/markdown.ts @@ -1,27 +1,37 @@ /** * Telegram format conversion. * - * Telegram supports Markdown/HTML parse modes, but to avoid - * platform-specific escaping pitfalls this adapter emits normalized - * markdown text as plain message text. + * Telegram supports parse modes for rich formatting. + * We emit Telegram-compatible HTML for formatted messages. */ import { - type AdapterPostableMessage, BaseFormatConverter, type Content, + getNodeChildren, + getNodeValue, + isBlockquoteNode, + isCodeNode, + isDeleteNode, + isEmphasisNode, + isInlineCodeNode, + isLinkNode, + isListItemNode, + isListNode, + isParagraphNode, + isStrongNode, isTableNode, + isTextNode, parseMarkdown, type Root, - stringifyMarkdown, tableToAscii, walkAst, } from "chat"; export class TelegramFormatConverter extends BaseFormatConverter { fromAst(ast: Root): string { - // Check for table nodes and replace them with code blocks, - // since Telegram renders raw pipe syntax as garbled text. + // Replace table nodes with code blocks since Telegram HTML + // does not support tables natively. const transformed = walkAst(structuredClone(ast), (node: Content) => { if (isTableNode(node)) { return { @@ -32,26 +42,123 @@ export class TelegramFormatConverter extends BaseFormatConverter { } return node; }); - return stringifyMarkdown(transformed).trim(); + return this.fromAstWithNodeConverter(transformed, (node) => + this.nodeToTelegramHtml(node) + ).trim(); } toAst(text: string): Root { return parseMarkdown(text); } - override renderPostable(message: AdapterPostableMessage): string { - if (typeof message === "string") { - return message; + private nodeToTelegramHtml(node: Content): string { + if (isParagraphNode(node)) { + return getNodeChildren(node) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); } - if ("raw" in message) { - return message.raw; + + if (isTextNode(node)) { + return this.escapeHtmlText(node.value); + } + + if (isStrongNode(node)) { + const content = getNodeChildren(node) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); + return `${content}`; + } + + if (isEmphasisNode(node)) { + const content = getNodeChildren(node) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); + return `${content}`; + } + + if (isDeleteNode(node)) { + const content = getNodeChildren(node) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); + return `${content}`; + } + + if (isInlineCodeNode(node)) { + return `${this.escapeHtmlText(node.value)}`; + } + + if (isCodeNode(node)) { + const language = node.lang?.trim(); + const escapedCode = this.escapeHtmlText(node.value); + if (language) { + return `
${escapedCode}
`; + } + return `
${escapedCode}
`; } - if ("markdown" in message) { - return this.fromMarkdown(message.markdown); + + if (isLinkNode(node)) { + const text = getNodeChildren(node) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); + const label = text || this.escapeHtmlText(node.url); + return `${label}`; + } + + if (isBlockquoteNode(node)) { + const content = getNodeChildren(node) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); + return `
${content}
`; } - if ("ast" in message) { - return this.fromAst(message.ast); + + if (isListNode(node)) { + return getNodeChildren(node) + .map((item, index) => { + const prefix = node.ordered ? `${index + 1}.` : "•"; + const content = getNodeChildren(item) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); + return `${prefix} ${content}`; + }) + .join("\n"); } - return super.renderPostable(message); + + if (isListItemNode(node)) { + return getNodeChildren(node) + .map((child) => this.nodeToTelegramHtml(child)) + .join(""); + } + + if (node.type === "break") { + return "\n"; + } + + if (node.type === "thematicBreak") { + return "──────────"; + } + + if (node.type === "html") { + return this.escapeHtmlText(node.value); + } + + const children = getNodeChildren(node); + if (children.length > 0) { + return children.map((child) => this.nodeToTelegramHtml(child)).join(""); + } + + return this.escapeHtmlText(getNodeValue(node)); + } + + private escapeHtmlText(value: string): string { + return value + .replace(/&/g, "&") + .replace(//g, ">"); + } + + private escapeHtmlAttribute(value: string): string { + return this.escapeHtmlText(value) + .replace(/"/g, """) + .replace(/'/g, "'"); } } diff --git a/packages/chat/src/thread.test.ts b/packages/chat/src/thread.test.ts index 771c6f51..84c70bff 100644 --- a/packages/chat/src/thread.test.ts +++ b/packages/chat/src/thread.test.ts @@ -569,6 +569,38 @@ describe("ThreadImpl", () => { expect.objectContaining({ recipientUserId: "U456", recipientTeamId: "T123", + updateIntervalMs: 500, + fallbackPlaceholderText: "...", + }) + ); + }); + + it("should pass custom fallback stream config to native stream adapters", async () => { + const mockStream = vi.fn().mockResolvedValue({ + id: "msg-stream", + threadId: "t1", + raw: "Hello", + }); + mockAdapter.stream = mockStream; + + const configuredThread = new ThreadImpl({ + id: "slack:C123:1234.5678", + adapter: mockAdapter, + channelId: "C123", + stateAdapter: mockState, + streamingUpdateIntervalMs: 123, + fallbackStreamingPlaceholderText: null, + }); + + const textStream = createTextStream(["Hello"]); + await configuredThread.post(textStream); + + expect(mockStream).toHaveBeenCalledWith( + "slack:C123:1234.5678", + expect.any(Object), + expect.objectContaining({ + updateIntervalMs: 123, + fallbackPlaceholderText: null, }) ); }); diff --git a/packages/chat/src/thread.ts b/packages/chat/src/thread.ts index f862734b..470caa67 100644 --- a/packages/chat/src/thread.ts +++ b/packages/chat/src/thread.ts @@ -422,7 +422,10 @@ export class ThreadImpl> // Normalize: handles plain strings, AI SDK fullStream events, and StreamChunk objects const textStream = fromFullStream(rawStream); // Build streaming options from current message context - const options: StreamOptions = {}; + const options: StreamOptions = { + updateIntervalMs: this._streamingUpdateIntervalMs, + fallbackPlaceholderText: this._fallbackStreamingPlaceholderText, + }; if (this._currentMessage) { options.recipientUserId = this._currentMessage.author.userId; // Extract teamId from raw Slack payload diff --git a/packages/chat/src/types.ts b/packages/chat/src/types.ts index 2def01a9..38092353 100644 --- a/packages/chat/src/types.ts +++ b/packages/chat/src/types.ts @@ -361,6 +361,8 @@ export interface PlanUpdateChunk { * Platform-specific options are passed through to the adapter. */ export interface StreamOptions { + /** Placeholder text for adapter-managed post+edit fallback streams. Set to null to disable. */ + fallbackPlaceholderText?: string | null; /** Slack: The team/workspace ID */ recipientTeamId?: string; /** Slack: The user ID to stream to (for AI assistant context) */