refactor(gateway): run inbox and BYO polling from FastAPI lifespan

This commit is contained in:
Anish Sarkar 2026-05-28 04:38:00 +05:30
parent 72024353f9
commit 08bf3cc023
9 changed files with 415 additions and 81 deletions

View file

@ -0,0 +1,94 @@
"""FastAPI lifespan integration for self-hosted BYO Telegram long-polling."""
from __future__ import annotations
import asyncio
import logging
from sqlalchemy import select
from app.config import config
from app.db import ExternalChatPlatform, ExternalChatAccount, async_session_maker
from app.gateway.accounts import account_token
from app.gateway.runner import _run_telegram_account
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Supervisor tasks that are currently alive. Each task is added when it is
# created and removes itself through a done-callback (`_tasks.discard`).
_tasks: set[asyncio.Task[None]] = set()
# Created when supervisors are started and set when they are stopped so that
# back-off sleeps end early; reset to None once shutdown has finished.
_shutdown_event: asyncio.Event | None = None
async def _sleep_or_shutdown(seconds: float) -> None:
    """Sleep for up to ``seconds``, returning early once shutdown is signalled.

    When no shutdown event is installed this is a plain ``asyncio.sleep``.
    Otherwise the call waits on the event with a timeout, so a supervisor
    that is backing off wakes up as soon as the event is set.

    Args:
        seconds: Maximum time to wait, in seconds.
    """
    shutdown = _shutdown_event
    if shutdown is None:
        await asyncio.sleep(seconds)
        return
    try:
        await asyncio.wait_for(shutdown.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        # Timing out is the normal "slept the whole interval" outcome.
        # asyncio.TimeoutError is only an alias of the builtin TimeoutError
        # from Python 3.11 on, so name the asyncio class to stay correct on
        # older interpreters as well.
        return
async def _byo_account_supervisor(account_id: int, token: str) -> None:
    """Keep the Telegram long-poll loop for one account running until shutdown.

    Each pass runs ``_run_telegram_account`` once. Unexpected errors are
    logged and the loop carries on; cancellation is propagated unchanged so
    the task can be stopped. The 30 second pause is applied after every pass,
    whether it failed or returned normally, so a run that returns straight
    away cannot turn into a busy retry loop.

    Args:
        account_id: Identifier of the account being polled (used for logging
            and passed through to the poller).
        token: Bot token handed to the poller.
    """
    while True:
        shutdown = _shutdown_event
        if shutdown is not None and shutdown.is_set():
            return
        try:
            await _run_telegram_account(account_id, token)
        except asyncio.CancelledError:
            # Never swallow cancellation: shutdown relies on it.
            raise
        except Exception:
            logger.exception(
                "BYO Telegram long-poll failed account_id=%s; retrying in 30s",
                account_id,
            )
        # Back off before the next attempt; wakes early when shutdown is set.
        await _sleep_or_shutdown(30)
async def start_byo_long_poll_supervisors() -> None:
    """Start one BYO long-poll supervisor per active non-system Telegram account.

    Does nothing when ``config.GATEWAY_BYO_LONGPOLL_ENABLED`` is falsy or when
    supervisor tasks are already registered. Otherwise a fresh shutdown event
    is installed, the Telegram accounts that are neither system accounts nor
    suspended are loaded, and a named supervisor task is spawned for every
    account that yields a token.
    """
    global _shutdown_event
    if not config.GATEWAY_BYO_LONGPOLL_ENABLED or _tasks:
        return
    _shutdown_event = asyncio.Event()

    eligible_accounts = select(ExternalChatAccount).where(
        ExternalChatAccount.platform == ExternalChatPlatform.TELEGRAM,
        ExternalChatAccount.is_system_account.is_(False),
        ExternalChatAccount.suspended_at.is_(None),
    )
    async with async_session_maker() as session:
        rows = await session.execute(eligible_accounts)
        accounts = list(rows.scalars())

    for account in accounts:
        token = account_token(account)
        if token:
            supervisor = asyncio.create_task(
                _byo_account_supervisor(int(account.id), token),
                name=f"gateway-byo-telegram-{account.id}",
            )
            # Track the task while it lives; it unregisters itself when done.
            _tasks.add(supervisor)
            supervisor.add_done_callback(_tasks.discard)
            logger.info("Started BYO Telegram long-poll supervisor account_id=%s", account.id)
async def stop_byo_long_poll_supervisors() -> None:
    """Cancel and await all BYO long-poll supervisors.

    Sets the shutdown event (if one is installed) so back-off sleeps end,
    cancels every registered supervisor task and waits up to 10 seconds for
    them to finish. A timeout is logged as a warning rather than raised.
    Afterwards the task registry is emptied and the shutdown event is reset
    to ``None`` so the supervisors can be started again.
    """
    global _shutdown_event
    if _shutdown_event is not None:
        _shutdown_event.set()

    # Snapshot first: done-callbacks mutate `_tasks` while tasks finish.
    pending = list(_tasks)
    for task in pending:
        task.cancel()
    if pending:
        try:
            await asyncio.wait_for(
                asyncio.gather(*pending, return_exceptions=True),
                timeout=10,
            )
        except asyncio.TimeoutError:
            # asyncio.TimeoutError is only an alias of the builtin
            # TimeoutError from Python 3.11 on; naming the asyncio class
            # keeps the cleanup below reachable on older interpreters too.
            logger.warning("Timed out waiting for BYO Telegram long-poll supervisors to stop")
    _tasks.clear()
    _shutdown_event = None

View file

@ -0,0 +1,55 @@
"""FastAPI lifespan worker for gateway inbox processing."""
from __future__ import annotations
import asyncio
import logging
from contextlib import suppress
from app.gateway.inbox_processor import claim_next_inbound_event, process_inbound_event
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# The inbox worker task created by `start_gateway_inbox_worker`;
# None until the worker is started and again after it has been stopped.
_task: asyncio.Task[None] | None = None
async def _process_inbox_forever() -> None:
    """Claim and process gateway inbox events in an endless loop.

    When nothing can be claimed the loop idles for half a second. A
    ``RuntimeError`` whose message is exactly ``"gateway_thread_busy"`` is
    logged at info level; every other exception is logged with its traceback.
    Either way the loop pauses one second and continues. Cancellation is
    re-raised so the worker can be stopped.
    """
    logger.info("Gateway inbox processor started in FastAPI process")
    while True:
        try:
            claimed_id = await claim_next_inbound_event()
            if claimed_id is not None:
                logger.info("Gateway processing inbox_id=%s", claimed_id)
                await process_inbound_event(claimed_id)
                logger.info("Gateway processed inbox_id=%s", claimed_id)
            else:
                await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            raise
        except Exception as failure:
            thread_busy = (
                isinstance(failure, RuntimeError)
                and str(failure) == "gateway_thread_busy"
            )
            if thread_busy:
                logger.info("Gateway inbox_id busy; will retry from RECEIVED state")
            else:
                logger.exception("Gateway inbox processor failed one iteration")
            await asyncio.sleep(1)
async def start_gateway_inbox_worker() -> None:
    """Launch the inbox worker task unless one is already running.

    A previous task that has already finished is replaced by a new one.
    """
    global _task
    already_running = _task is not None and not _task.done()
    if not already_running:
        _task = asyncio.create_task(
            _process_inbox_forever(),
            name="gateway-inbox-worker",
        )
async def stop_gateway_inbox_worker() -> None:
    """Cancel the inbox worker task and wait up to 10 seconds for it to end.

    Does nothing when no worker task is registered. Both the expected
    cancellation and a wait timeout are suppressed, after which the module
    level task reference is cleared.
    """
    global _task
    worker = _task
    if worker is None:
        return
    worker.cancel()
    # asyncio.wait_for raises asyncio.TimeoutError, which is only an alias of
    # the builtin TimeoutError from Python 3.11 on; suppress the asyncio class
    # so shutdown stays quiet on older interpreters as well.
    with suppress(asyncio.TimeoutError, asyncio.CancelledError):
        await asyncio.wait_for(worker, timeout=10)
    _task = None

View file

@ -1,18 +1,17 @@
"""Long-lived messaging gateway runner."""
"""Telegram BYO long-poll helper for FastAPI lifespan."""
from __future__ import annotations
import asyncio
import hashlib
import logging
import uuid
from sqlalchemy import select, text
from sqlalchemy import text
from app.db import GatewayPlatform, GatewayPlatformAccount, async_session_maker, engine
from app.gateway.accounts import account_token
from app.db import ExternalChatPlatform, ExternalChatAccount, async_session_maker, engine
from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key
from app.gateway.inbox_processor import claim_next_inbound_event, process_inbound_event
from app.gateway.telegram.adapter import TelegramAdapter
from app.observability.metrics import record_gateway_byo_longpoll_running_delta
logger = logging.getLogger(__name__)
@ -22,76 +21,45 @@ def _lock_key(token: str) -> int:
return int.from_bytes(digest[:8], "big", signed=True)
class GatewayRunner:
async def run(self) -> None:
logger.info("Gateway runner started. Waiting for inbound events.")
tasks = [asyncio.create_task(self._process_inbox_forever())]
async with async_session_maker() as session:
result = await session.execute(
select(GatewayPlatformAccount).where(
GatewayPlatformAccount.platform == GatewayPlatform.TELEGRAM,
GatewayPlatformAccount.is_system_account.is_(False),
GatewayPlatformAccount.suspended_at.is_(None),
)
)
accounts = list(result.scalars())
for account in accounts:
token = account_token(account)
if not token:
continue
logger.info("Starting Telegram long-poll loop for account_id=%s", account.id)
tasks.append(asyncio.create_task(self._run_telegram_account(account.id, token)))
await asyncio.gather(*tasks)
async def _process_inbox_forever(self) -> None:
logger.info("Gateway inbox processor started")
while True:
try:
inbox_id = await claim_next_inbound_event()
if inbox_id is None:
await asyncio.sleep(0.5)
continue
logger.info("Gateway processing inbox_id=%s", inbox_id)
await process_inbound_event(inbox_id)
logger.info("Gateway processed inbox_id=%s", inbox_id)
except Exception:
logger.exception("Gateway inbox processor failed one iteration")
await asyncio.sleep(1)
async def _run_telegram_account(self, account_id: int, token: str) -> None:
async with engine.connect() as conn:
got_lock = await conn.scalar(
text("SELECT pg_try_advisory_lock(:key)"),
{"key": _lock_key(token)},
)
if not got_lock:
logger.warning("Another Telegram gateway runner is active; exiting")
return
async def _run_telegram_account(account_id: int, token: str) -> None:
    """Long-poll Telegram for one account and persist each update to the inbox.

    A dedicated connection takes a PostgreSQL advisory lock keyed by
    ``_lock_key(token)``. If the lock cannot be obtained the function logs a
    warning and returns. While the lock is held, the running-pollers metric
    is incremented, updates are fetched starting after the account's stored
    ``last_update_id`` and each one is persisted and committed in its own
    session. On exit the metric is decremented and the lock is released.

    Args:
        account_id: Identifier of the ``ExternalChatAccount`` being polled.
        token: Bot token used to build the adapter and derive the lock key.
    """
    async with engine.connect() as conn:
        lock_key = _lock_key(token)
        got_lock = await conn.scalar(
            text("SELECT pg_try_advisory_lock(:key)"),
            {"key": lock_key},
        )
        if not got_lock:
            logger.warning("Another Telegram gateway runner is active; exiting")
            return

        record_gateway_byo_longpoll_running_delta(1, account_id=account_id)
        try:
            adapter = TelegramAdapter(token)

            # Resume after the last update recorded in the account's cursor;
            # leave the offset unset when the account row is missing.
            offset = None
            async with async_session_maker() as session:
                account = await session.get(ExternalChatAccount, account_id)
                if account is not None:
                    offset = int((account.cursor_state or {}).get("last_update_id", 0)) + 1

            async for update in adapter.fetch_updates(offset=offset):
                request_id = f"gateway_{uuid.uuid4().hex[:16]}"
                async with async_session_maker() as session:
                    parsed = adapter.parse_inbound(update)
                    inbox_id = await persist_inbound_event(
                        session,
                        account_id=account_id,
                        platform=ExternalChatPlatform.TELEGRAM,
                        event_dedupe_key=telegram_event_dedupe_key(update["update_id"]),
                        external_event_id=str(update["update_id"]),
                        external_message_id=parsed.external_message_id,
                        event_kind=parsed.event_kind,
                        raw_payload=update,
                        request_id=request_id,
                    )
                    await session.commit()
                if inbox_id is not None:
                    logger.debug("Persisted Telegram polling update inbox_id=%s", inbox_id)
        finally:
            # Undo the metric increment and release the lock on the same
            # connection that acquired it.
            record_gateway_byo_longpoll_running_delta(-1, account_id=account_id)
            await conn.execute(text("SELECT pg_advisory_unlock(:key)"), {"key": lock_key})