mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-02 19:55:18 +02:00
feat(gateway): add Telegram command and stream handling
This commit is contained in:
parent
59e6475348
commit
967ec099c8
2 changed files with 241 additions and 0 deletions
91
surfsense_backend/app/gateway/telegram/commands.py
Normal file
91
surfsense_backend/app/gateway/telegram/commands.py
Normal file
|
|
@ -0,0 +1,91 @@
|
||||||
|
"""Telegram command handlers."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.gateway.base.adapter import ParsedInboundEvent
|
||||||
|
from app.gateway.pairing import redeem_pairing_code
|
||||||
|
from app.gateway.ratelimit import acquire_token
|
||||||
|
from app.gateway.telegram.adapter import TelegramAdapter
|
||||||
|
|
||||||
|
HELP_TEXT = (
|
||||||
|
"SurfSense Telegram commands:\n"
|
||||||
|
"/start <code> - pair this chat\n"
|
||||||
|
"/new - start a fresh conversation\n"
|
||||||
|
"/help - show this help"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def command_name(text: str | None) -> str | None:
|
||||||
|
if not text or not text.startswith("/"):
|
||||||
|
return None
|
||||||
|
return text.split(maxsplit=1)[0].split("@", 1)[0].lower()
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_start_command(
|
||||||
|
*,
|
||||||
|
session,
|
||||||
|
adapter: TelegramAdapter,
|
||||||
|
event: ParsedInboundEvent,
|
||||||
|
) -> bool:
|
||||||
|
text = event.text or ""
|
||||||
|
parts = text.split(maxsplit=1)
|
||||||
|
if len(parts) != 2 or not event.external_peer_id:
|
||||||
|
await adapter.send_message(
|
||||||
|
external_peer_id=event.external_peer_id or "",
|
||||||
|
text="Generate a pairing code in SurfSense Settings > Messaging Channels, then send /start CODE here.",
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
binding = await redeem_pairing_code(
|
||||||
|
session,
|
||||||
|
code=parts[1].strip(),
|
||||||
|
external_peer_id=event.external_peer_id,
|
||||||
|
external_peer_kind=event.external_peer_kind,
|
||||||
|
external_display_name=event.display_name,
|
||||||
|
external_username=event.username,
|
||||||
|
external_metadata=event.metadata,
|
||||||
|
)
|
||||||
|
if binding is None:
|
||||||
|
await adapter.send_message(
|
||||||
|
external_peer_id=event.external_peer_id,
|
||||||
|
text="That pairing code is invalid or expired. Generate a new code in SurfSense.",
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
await adapter.send_message(
|
||||||
|
external_peer_id=event.external_peer_id,
|
||||||
|
text="SurfSense is connected. Send a message here to chat with your agent.",
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_help_command(*, adapter: TelegramAdapter, event: ParsedInboundEvent) -> bool:
|
||||||
|
if not event.external_peer_id:
|
||||||
|
return True
|
||||||
|
await adapter.send_message(external_peer_id=event.external_peer_id, text=HELP_TEXT)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
async def send_unbound_onboarding(
|
||||||
|
*,
|
||||||
|
adapter: TelegramAdapter,
|
||||||
|
event: ParsedInboundEvent,
|
||||||
|
dashboard_url: str,
|
||||||
|
) -> None:
|
||||||
|
if not event.external_peer_id:
|
||||||
|
return
|
||||||
|
wait_ms = await acquire_token(
|
||||||
|
f"tg:onboarded:{event.external_peer_id}",
|
||||||
|
capacity=1,
|
||||||
|
refill_per_sec=1 / 3600,
|
||||||
|
)
|
||||||
|
if wait_ms > 0:
|
||||||
|
return
|
||||||
|
await adapter.send_message(
|
||||||
|
external_peer_id=event.external_peer_id,
|
||||||
|
text=(
|
||||||
|
"Hi! To use SurfSense via Telegram, generate a pairing code at "
|
||||||
|
f"{dashboard_url} and send /start CODE here."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
150
surfsense_backend/app/gateway/telegram/translator.py
Normal file
150
surfsense_backend/app/gateway/telegram/translator.py
Normal file
|
|
@ -0,0 +1,150 @@
|
||||||
|
"""Translate agent stream events into Telegram messages."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from collections.abc import AsyncIterator
|
||||||
|
|
||||||
|
from telegram.constants import ParseMode
|
||||||
|
|
||||||
|
from app.gateway.base.adapter import PlatformSendResult
|
||||||
|
from app.gateway.base.translator import BaseStreamTranslator, GatewayStreamEvent
|
||||||
|
from app.gateway.ratelimit import wait_for_token
|
||||||
|
from app.gateway.telegram.adapter import TelegramAdapter
|
||||||
|
from app.gateway.telegram.client import retry_plaintext_on_bad_markdown
|
||||||
|
from app.gateway.telegram.formatting import chunk_message, escape_markdown_v2
|
||||||
|
from app.observability.metrics import (
|
||||||
|
record_gateway_hitl_aborted,
|
||||||
|
record_gateway_outbound,
|
||||||
|
record_gateway_rate_limit_hit,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
HITL_UNSUPPORTED_MESSAGE = (
|
||||||
|
"This action requires approval and is not yet supported from Telegram. "
|
||||||
|
"Try again with a different request."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TelegramStreamTranslator(BaseStreamTranslator):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
adapter: TelegramAdapter,
|
||||||
|
external_peer_id: str,
|
||||||
|
assistant_message_id: int | None = None,
|
||||||
|
debounce_seconds: float = 1.5,
|
||||||
|
) -> None:
|
||||||
|
self.adapter = adapter
|
||||||
|
self.external_peer_id = external_peer_id
|
||||||
|
self.assistant_message_id = assistant_message_id
|
||||||
|
self.debounce_seconds = debounce_seconds
|
||||||
|
self._buffer = ""
|
||||||
|
self._last_flush_at = 0.0
|
||||||
|
self._external_message_ids: list[str] = []
|
||||||
|
self._plaintext_mode = False
|
||||||
|
|
||||||
|
async def translate(self, events: AsyncIterator[GatewayStreamEvent]) -> None:
|
||||||
|
async for event in events:
|
||||||
|
if event.type in {"text-delta", "text_delta", "text"}:
|
||||||
|
self._buffer += str(event.data.get("text") or event.data.get("delta") or "")
|
||||||
|
await self._maybe_flush()
|
||||||
|
elif event.type in {"data-interrupt-request", "interrupt"}:
|
||||||
|
await self._handle_hitl_interrupt()
|
||||||
|
return
|
||||||
|
elif event.type in {"text-end", "finish", "done"}:
|
||||||
|
break
|
||||||
|
|
||||||
|
await self._flush(final=True)
|
||||||
|
|
||||||
|
async def _maybe_flush(self) -> None:
|
||||||
|
now = asyncio.get_running_loop().time()
|
||||||
|
if now - self._last_flush_at < self.debounce_seconds:
|
||||||
|
return
|
||||||
|
await self._flush(final=False)
|
||||||
|
self._last_flush_at = now
|
||||||
|
|
||||||
|
async def _flush(self, *, final: bool) -> None:
|
||||||
|
if not self._buffer:
|
||||||
|
return
|
||||||
|
|
||||||
|
chunks = chunk_message(self._buffer)
|
||||||
|
# During streaming, keep edits on the last chunk only. At final flush,
|
||||||
|
# send any additional chunks and mark the message as finalized by the
|
||||||
|
# persistence layer (wired through agent/task code).
|
||||||
|
if len(chunks) > 1:
|
||||||
|
for chunk in chunks[:-1]:
|
||||||
|
result = await self._send_text(chunk)
|
||||||
|
self._external_message_ids.append(result.external_message_id)
|
||||||
|
self._buffer = chunks[-1]
|
||||||
|
|
||||||
|
text = self._format_text(self._buffer)
|
||||||
|
if self._external_message_ids:
|
||||||
|
await self._edit_text(self._external_message_ids[-1], text)
|
||||||
|
else:
|
||||||
|
result = await self._send_text(self._buffer)
|
||||||
|
self._external_message_ids.append(result.external_message_id)
|
||||||
|
|
||||||
|
if final:
|
||||||
|
logger.debug(
|
||||||
|
"Telegram gateway finalized assistant message id=%s external_ids=%s",
|
||||||
|
self.assistant_message_id,
|
||||||
|
self._external_message_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _format_text(self, text: str) -> str:
|
||||||
|
return text if self._plaintext_mode else escape_markdown_v2(text)
|
||||||
|
|
||||||
|
async def _send_text(self, text: str) -> PlatformSendResult:
|
||||||
|
await self._throttle()
|
||||||
|
parse_mode = None if self._plaintext_mode else ParseMode.MARKDOWN_V2
|
||||||
|
try:
|
||||||
|
result = await retry_plaintext_on_bad_markdown(
|
||||||
|
self.adapter.send_message,
|
||||||
|
external_peer_id=self.external_peer_id,
|
||||||
|
text=self._format_text(text),
|
||||||
|
parse_mode=parse_mode,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
record_gateway_outbound(platform="telegram", kind="send", status="failed")
|
||||||
|
raise
|
||||||
|
record_gateway_outbound(platform="telegram", kind="send", status="sent")
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def _edit_text(self, message_id: str, text: str) -> PlatformSendResult:
|
||||||
|
await self._throttle()
|
||||||
|
parse_mode = None if self._plaintext_mode else ParseMode.MARKDOWN_V2
|
||||||
|
try:
|
||||||
|
result = await retry_plaintext_on_bad_markdown(
|
||||||
|
self.adapter.edit_message,
|
||||||
|
external_peer_id=self.external_peer_id,
|
||||||
|
external_message_id=message_id,
|
||||||
|
text=text,
|
||||||
|
parse_mode=parse_mode,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
record_gateway_outbound(platform="telegram", kind="edit", status="failed")
|
||||||
|
raise
|
||||||
|
record_gateway_outbound(platform="telegram", kind="edit", status="edited")
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def _throttle(self) -> None:
|
||||||
|
chat_wait = await wait_for_token(
|
||||||
|
f"tg:chat:{self.external_peer_id}",
|
||||||
|
capacity=1,
|
||||||
|
refill_per_sec=1.0,
|
||||||
|
)
|
||||||
|
if chat_wait:
|
||||||
|
record_gateway_rate_limit_hit(bucket="tg:chat")
|
||||||
|
global_wait = await wait_for_token("tg:global", capacity=25, refill_per_sec=25.0)
|
||||||
|
if global_wait:
|
||||||
|
record_gateway_rate_limit_hit(bucket="tg:global")
|
||||||
|
|
||||||
|
async def _handle_hitl_interrupt(self) -> None:
|
||||||
|
if self._buffer:
|
||||||
|
await self._flush(final=False)
|
||||||
|
await self._send_text(HITL_UNSUPPORTED_MESSAGE)
|
||||||
|
record_gateway_hitl_aborted(platform="telegram")
|
||||||
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue