From 59e64753484e4cd4929c88b2fa588c572f30f53f Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Wed, 27 May 2026 23:38:09 +0530 Subject: [PATCH] feat(gateway): add Telegram adapter and formatting --- .../app/gateway/telegram/__init__.py | 2 + .../app/gateway/telegram/adapter.py | 114 ++++++++++++++++++ .../app/gateway/telegram/client.py | 109 +++++++++++++++++ .../app/gateway/telegram/formatting.py | 55 +++++++++ .../tests/unit/gateway/test_formatting.py | 18 +++ 5 files changed, 298 insertions(+) create mode 100644 surfsense_backend/app/gateway/telegram/__init__.py create mode 100644 surfsense_backend/app/gateway/telegram/adapter.py create mode 100644 surfsense_backend/app/gateway/telegram/client.py create mode 100644 surfsense_backend/app/gateway/telegram/formatting.py create mode 100644 surfsense_backend/tests/unit/gateway/test_formatting.py diff --git a/surfsense_backend/app/gateway/telegram/__init__.py b/surfsense_backend/app/gateway/telegram/__init__.py new file mode 100644 index 000000000..45dc05414 --- /dev/null +++ b/surfsense_backend/app/gateway/telegram/__init__.py @@ -0,0 +1,2 @@ +"""Telegram gateway adapter.""" + diff --git a/surfsense_backend/app/gateway/telegram/adapter.py b/surfsense_backend/app/gateway/telegram/adapter.py new file mode 100644 index 000000000..4f0001128 --- /dev/null +++ b/surfsense_backend/app/gateway/telegram/adapter.py @@ -0,0 +1,114 @@ +"""Telegram platform adapter.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any + +from app.gateway.base.adapter import ( + BasePlatformAdapter, + ParsedInboundEvent, + PlatformSendResult, +) +from app.gateway.telegram.client import TelegramClient + + +class TelegramAdapter(BasePlatformAdapter): + platform = "telegram" + + def __init__(self, token: str) -> None: + self.client = TelegramClient(token) + + def parse_inbound(self, raw_payload: dict[str, Any]) -> ParsedInboundEvent: + event_kind = "other" + message = raw_payload.get("message") + if message is not None: + event_kind = "message" + else: + message = raw_payload.get("edited_message") + if message is not None: + event_kind = "edited_message" + + if message is None: + return ParsedInboundEvent( + platform=self.platform, + event_kind=event_kind, + external_peer_id=None, + external_peer_kind="unknown", + external_message_id=None, + external_user_id=None, + text=None, + raw_payload=raw_payload, + ) + + chat = message.get("chat") or {} + sender = message.get("from") or {} + chat_type = str(chat.get("type") or "unknown") + peer_kind = { + "private": "direct", + "group": "group", + "supergroup": "group", + "channel": "channel", + }.get(chat_type, "unknown") + display_name = chat.get("title") or " ".join( + part + for part in (sender.get("first_name"), sender.get("last_name")) + if part + ) + + return ParsedInboundEvent( + platform=self.platform, + event_kind=event_kind, + external_peer_id=str(chat["id"]) if chat.get("id") is not None else None, + external_peer_kind=peer_kind, + external_message_id=( + str(message["message_id"]) if message.get("message_id") is not None else None + ), + external_user_id=str(sender["id"]) if sender.get("id") is not None else None, + text=message.get("text") or message.get("caption"), + raw_payload=raw_payload, + display_name=display_name or None, + username=sender.get("username") or chat.get("username"), + metadata={"chat_type": chat_type, "update_id": raw_payload.get("update_id")}, + ) + + async def send_message( + self, + *, + external_peer_id: str, + text: str, + parse_mode: str | None = None, + reply_to_message_id: str | None = None, + ) -> PlatformSendResult: + return await self.client.send_message( + chat_id=external_peer_id, + text=text, + parse_mode=parse_mode, + reply_to_message_id=reply_to_message_id, + ) + + async def edit_message( + self, + *, + external_peer_id: str, + external_message_id: str, + text: str, + parse_mode: str | None = None, + ) -> PlatformSendResult: + return await self.client.edit_message( + chat_id=external_peer_id, + message_id=external_message_id, + text=text, + parse_mode=parse_mode, + ) + + async def validate_credentials(self) -> dict[str, Any]: + return await self.client.validate() + + async def leave_chat(self, *, external_peer_id: str) -> None: + await self.client.leave_chat(chat_id=external_peer_id) + + async def fetch_updates(self, *, offset: int | None) -> AsyncIterator[dict[str, Any]]: + async for update in self.client.get_updates(offset=offset): + yield update + diff --git a/surfsense_backend/app/gateway/telegram/client.py b/surfsense_backend/app/gateway/telegram/client.py new file mode 100644 index 000000000..6f36f0564 --- /dev/null +++ b/surfsense_backend/app/gateway/telegram/client.py @@ -0,0 +1,109 @@ +"""Thin async Telegram Bot API client.""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from datetime import timedelta +from typing import Any + +from telegram import Bot +from telegram.error import BadRequest, RetryAfter + +from app.gateway.base.adapter import PlatformSendResult + + +def retry_after_seconds(value: int | timedelta) -> float: + if isinstance(value, timedelta): + return value.total_seconds() + return float(value) + + +class TelegramClient: + def __init__(self, token: str) -> None: + self.token = token + self.bot = Bot(token=token) + + async def send_message( + self, + *, + chat_id: str, + text: str, + parse_mode: str | None = None, + reply_to_message_id: str | None = None, + ) -> PlatformSendResult: + kwargs: dict[str, Any] = {} + if parse_mode: + kwargs["parse_mode"] = parse_mode + if reply_to_message_id: + kwargs["reply_to_message_id"] = int(reply_to_message_id) + try: + msg = await self.bot.send_message(chat_id=chat_id, text=text, **kwargs) + except RetryAfter as exc: + await asyncio.sleep(retry_after_seconds(exc.retry_after)) + msg = await self.bot.send_message(chat_id=chat_id, text=text, **kwargs) + return PlatformSendResult( + external_message_id=str(msg.message_id), + raw_response=msg.to_dict(), + ) + + async def edit_message( + self, + *, + chat_id: str, + message_id: str, + text: str, + parse_mode: str | None = None, + ) -> PlatformSendResult: + kwargs: dict[str, Any] = {} + if parse_mode: + kwargs["parse_mode"] = parse_mode + try: + msg = await self.bot.edit_message_text( + chat_id=chat_id, + message_id=int(message_id), + text=text, + **kwargs, + ) + except RetryAfter as exc: + await asyncio.sleep(retry_after_seconds(exc.retry_after)) + msg = await self.bot.edit_message_text( + chat_id=chat_id, + message_id=int(message_id), + text=text, + **kwargs, + ) + return PlatformSendResult( + external_message_id=str(msg.message_id), + raw_response=msg.to_dict(), + ) + + async def validate(self) -> dict[str, Any]: + me = await self.bot.get_me() + return me.to_dict() + + async def leave_chat(self, *, chat_id: str) -> None: + await self.bot.leave_chat(chat_id=chat_id) + + async def get_updates(self, *, offset: int | None) -> AsyncIterator[dict[str, Any]]: + next_offset = offset + while True: + updates = await self.bot.get_updates( + offset=next_offset, + timeout=30, + allowed_updates=["message", "edited_message"], + ) + for update in updates: + next_offset = update.update_id + 1 + yield update.to_dict() + + +async def retry_plaintext_on_bad_markdown(call, *args, **kwargs) -> PlatformSendResult: + try: + return await call(*args, **kwargs) + except BadRequest as exc: + if "can't parse entities" not in str(exc).lower(): + raise + kwargs["parse_mode"] = None + return await call(*args, **kwargs) + diff --git a/surfsense_backend/app/gateway/telegram/formatting.py b/surfsense_backend/app/gateway/telegram/formatting.py new file mode 100644 index 000000000..ecc7064bd --- /dev/null +++ b/surfsense_backend/app/gateway/telegram/formatting.py @@ -0,0 +1,55 @@ +"""Telegram formatting helpers.""" + +from __future__ import annotations + +import re + +MARKDOWN_V2_RESERVED = r"_*[]()~`>#+-=|{}.!" +MAX_TELEGRAM_MESSAGE_UNITS = 4096 + +_RESERVED_RE = re.compile(r"([_\*\[\]\(\)~`>#+\-=|{}\.!])") + + +def escape_markdown_v2(text: str) -> str: + """Escape all Telegram MarkdownV2 reserved characters.""" + return _RESERVED_RE.sub(r"\\\1", text) + + +def _utf16_len(text: str) -> int: + return len(text.encode("utf-16-le")) // 2 + + +def _split_at_boundary(text: str, max_units: int) -> tuple[str, str]: + if _utf16_len(text) <= max_units: + return text, "" + + # Build a hard upper bound by code point, then walk back to natural + # boundaries. Telegram's limit is UTF-16 code units, so verify candidates. + end = min(len(text), max_units) + while end > 0 and _utf16_len(text[:end]) > max_units: + end -= 1 + + candidate = text[:end] + boundary = max(candidate.rfind("\n\n"), candidate.rfind(". "), candidate.rfind("\n")) + if boundary > max(200, end // 2): + end = boundary + (2 if candidate[boundary : boundary + 2] in {"\n\n", ". "} else 1) + + return text[:end], text[end:] + + +def chunk_message( + text: str, + *, + max_units: int = MAX_TELEGRAM_MESSAGE_UNITS, +) -> list[str]: + """Split a Telegram message at paragraph/sentence boundaries.""" + if not text: + return [""] + + chunks: list[str] = [] + remaining = text + while remaining: + chunk, remaining = _split_at_boundary(remaining, max_units) + chunks.append(chunk) + return chunks + diff --git a/surfsense_backend/tests/unit/gateway/test_formatting.py b/surfsense_backend/tests/unit/gateway/test_formatting.py new file mode 100644 index 000000000..61c7ea20f --- /dev/null +++ b/surfsense_backend/tests/unit/gateway/test_formatting.py @@ -0,0 +1,18 @@ +from app.gateway.telegram.formatting import chunk_message, escape_markdown_v2 + + +def test_escape_markdown_v2_reserved_chars(): + text = r"_*[]()~`>#+-=|{}.!" + + assert escape_markdown_v2(text) == r"\_\*\[\]\(\)\~\`\>\#\+\-\=\|\{\}\.\!" + + +def test_chunk_message_preserves_content_and_limits_size(): + text = "First paragraph.\n\n" + ("x" * 5000) + + chunks = chunk_message(text, max_units=4096) + + assert "".join(chunks) == text + assert len(chunks) > 1 + assert all(len(chunk.encode("utf-16-le")) // 2 <= 4096 for chunk in chunks) +