feat(gateway): add Telegram adapter and formatting

This commit is contained in:
Anish Sarkar 2026-05-27 23:38:09 +05:30
parent c9b7d7b572
commit 59e6475348
5 changed files with 298 additions and 0 deletions

View file

@ -0,0 +1,2 @@
"""Telegram gateway adapter."""

View file

@ -0,0 +1,114 @@
"""Telegram platform adapter."""
from __future__ import annotations
from collections.abc import AsyncIterator
from typing import Any
from app.gateway.base.adapter import (
BasePlatformAdapter,
ParsedInboundEvent,
PlatformSendResult,
)
from app.gateway.telegram.client import TelegramClient
class TelegramAdapter(BasePlatformAdapter):
platform = "telegram"
def __init__(self, token: str) -> None:
self.client = TelegramClient(token)
def parse_inbound(self, raw_payload: dict[str, Any]) -> ParsedInboundEvent:
event_kind = "other"
message = raw_payload.get("message")
if message is not None:
event_kind = "message"
else:
message = raw_payload.get("edited_message")
if message is not None:
event_kind = "edited_message"
if message is None:
return ParsedInboundEvent(
platform=self.platform,
event_kind=event_kind,
external_peer_id=None,
external_peer_kind="unknown",
external_message_id=None,
external_user_id=None,
text=None,
raw_payload=raw_payload,
)
chat = message.get("chat") or {}
sender = message.get("from") or {}
chat_type = str(chat.get("type") or "unknown")
peer_kind = {
"private": "direct",
"group": "group",
"supergroup": "group",
"channel": "channel",
}.get(chat_type, "unknown")
display_name = chat.get("title") or " ".join(
part
for part in (sender.get("first_name"), sender.get("last_name"))
if part
)
return ParsedInboundEvent(
platform=self.platform,
event_kind=event_kind,
external_peer_id=str(chat["id"]) if chat.get("id") is not None else None,
external_peer_kind=peer_kind,
external_message_id=(
str(message["message_id"]) if message.get("message_id") is not None else None
),
external_user_id=str(sender["id"]) if sender.get("id") is not None else None,
text=message.get("text") or message.get("caption"),
raw_payload=raw_payload,
display_name=display_name or None,
username=sender.get("username") or chat.get("username"),
metadata={"chat_type": chat_type, "update_id": raw_payload.get("update_id")},
)
async def send_message(
self,
*,
external_peer_id: str,
text: str,
parse_mode: str | None = None,
reply_to_message_id: str | None = None,
) -> PlatformSendResult:
return await self.client.send_message(
chat_id=external_peer_id,
text=text,
parse_mode=parse_mode,
reply_to_message_id=reply_to_message_id,
)
async def edit_message(
self,
*,
external_peer_id: str,
external_message_id: str,
text: str,
parse_mode: str | None = None,
) -> PlatformSendResult:
return await self.client.edit_message(
chat_id=external_peer_id,
message_id=external_message_id,
text=text,
parse_mode=parse_mode,
)
async def validate_credentials(self) -> dict[str, Any]:
return await self.client.validate()
async def leave_chat(self, *, external_peer_id: str) -> None:
await self.client.leave_chat(chat_id=external_peer_id)
async def fetch_updates(self, *, offset: int | None) -> AsyncIterator[dict[str, Any]]:
async for update in self.client.get_updates(offset=offset):
yield update

View file

@ -0,0 +1,109 @@
"""Thin async Telegram Bot API client."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from datetime import timedelta
from typing import Any
from telegram import Bot
from telegram.error import BadRequest, RetryAfter
from app.gateway.base.adapter import PlatformSendResult
def retry_after_seconds(value: int | timedelta) -> float:
if isinstance(value, timedelta):
return value.total_seconds()
return float(value)
class TelegramClient:
def __init__(self, token: str) -> None:
self.token = token
self.bot = Bot(token=token)
async def send_message(
self,
*,
chat_id: str,
text: str,
parse_mode: str | None = None,
reply_to_message_id: str | None = None,
) -> PlatformSendResult:
kwargs: dict[str, Any] = {}
if parse_mode:
kwargs["parse_mode"] = parse_mode
if reply_to_message_id:
kwargs["reply_to_message_id"] = int(reply_to_message_id)
try:
msg = await self.bot.send_message(chat_id=chat_id, text=text, **kwargs)
except RetryAfter as exc:
await asyncio.sleep(retry_after_seconds(exc.retry_after))
msg = await self.bot.send_message(chat_id=chat_id, text=text, **kwargs)
return PlatformSendResult(
external_message_id=str(msg.message_id),
raw_response=msg.to_dict(),
)
async def edit_message(
self,
*,
chat_id: str,
message_id: str,
text: str,
parse_mode: str | None = None,
) -> PlatformSendResult:
kwargs: dict[str, Any] = {}
if parse_mode:
kwargs["parse_mode"] = parse_mode
try:
msg = await self.bot.edit_message_text(
chat_id=chat_id,
message_id=int(message_id),
text=text,
**kwargs,
)
except RetryAfter as exc:
await asyncio.sleep(retry_after_seconds(exc.retry_after))
msg = await self.bot.edit_message_text(
chat_id=chat_id,
message_id=int(message_id),
text=text,
**kwargs,
)
return PlatformSendResult(
external_message_id=str(msg.message_id),
raw_response=msg.to_dict(),
)
async def validate(self) -> dict[str, Any]:
me = await self.bot.get_me()
return me.to_dict()
async def leave_chat(self, *, chat_id: str) -> None:
await self.bot.leave_chat(chat_id=chat_id)
async def get_updates(self, *, offset: int | None) -> AsyncIterator[dict[str, Any]]:
next_offset = offset
while True:
updates = await self.bot.get_updates(
offset=next_offset,
timeout=30,
allowed_updates=["message", "edited_message"],
)
for update in updates:
next_offset = update.update_id + 1
yield update.to_dict()
async def retry_plaintext_on_bad_markdown(call, *args, **kwargs) -> PlatformSendResult:
try:
return await call(*args, **kwargs)
except BadRequest as exc:
if "can't parse entities" not in str(exc).lower():
raise
kwargs["parse_mode"] = None
return await call(*args, **kwargs)

View file

@ -0,0 +1,55 @@
"""Telegram formatting helpers."""
from __future__ import annotations
import re
MARKDOWN_V2_RESERVED = r"_*[]()~`>#+-=|{}.!"
MAX_TELEGRAM_MESSAGE_UNITS = 4096
_RESERVED_RE = re.compile(r"([_\*\[\]\(\)~`>#+\-=|{}\.!])")
def escape_markdown_v2(text: str) -> str:
"""Escape all Telegram MarkdownV2 reserved characters."""
return _RESERVED_RE.sub(r"\\\1", text)
def _utf16_len(text: str) -> int:
return len(text.encode("utf-16-le")) // 2
def _split_at_boundary(text: str, max_units: int) -> tuple[str, str]:
if _utf16_len(text) <= max_units:
return text, ""
# Build a hard upper bound by code point, then walk back to natural
# boundaries. Telegram's limit is UTF-16 code units, so verify candidates.
end = min(len(text), max_units)
while end > 0 and _utf16_len(text[:end]) > max_units:
end -= 1
candidate = text[:end]
boundary = max(candidate.rfind("\n\n"), candidate.rfind(". "), candidate.rfind("\n"))
if boundary > max(200, end // 2):
end = boundary + (2 if candidate[boundary : boundary + 2] in {"\n\n", ". "} else 1)
return text[:end], text[end:]
def chunk_message(
text: str,
*,
max_units: int = MAX_TELEGRAM_MESSAGE_UNITS,
) -> list[str]:
"""Split a Telegram message at paragraph/sentence boundaries."""
if not text:
return [""]
chunks: list[str] = []
remaining = text
while remaining:
chunk, remaining = _split_at_boundary(remaining, max_units)
chunks.append(chunk)
return chunks

View file

@ -0,0 +1,18 @@
from app.gateway.telegram.formatting import chunk_message, escape_markdown_v2
def test_escape_markdown_v2_reserved_chars():
text = r"_*[]()~`>#+-=|{}.!"
assert escape_markdown_v2(text) == r"\_\*\[\]\(\)\~\`\>\#\+\-\=\|\{\}\.\!"
def test_chunk_message_preserves_content_and_limits_size():
text = "First paragraph.\n\n" + ("x" * 5000)
chunks = chunk_message(text, max_units=4096)
assert "".join(chunks) == text
assert len(chunks) > 1
assert all(len(chunk.encode("utf-16-le")) // 2 <= 4096 for chunk in chunks)