diff --git a/surfsense_backend/app/routes/gateway_webhook_routes.py b/surfsense_backend/app/routes/gateway_webhook_routes.py index 86b84f067..f9b6acc93 100644 --- a/surfsense_backend/app/routes/gateway_webhook_routes.py +++ b/surfsense_backend/app/routes/gateway_webhook_routes.py @@ -2,6 +2,9 @@ from __future__ import annotations +import hmac +import logging +import uuid from datetime import UTC, datetime from typing import Any @@ -13,10 +16,10 @@ from starlette.responses import Response from app.config import config from app.db import ( - GatewayBindingState, - GatewayConversationBinding, - GatewayPlatform, - GatewayPlatformAccount, + ExternalChatBindingState, + ExternalChatBinding, + ExternalChatPlatform, + ExternalChatAccount, User, get_async_session, ) @@ -24,15 +27,18 @@ from app.gateway.accounts import get_or_create_system_telegram_account from app.gateway.bindings import resume_binding, revoke_binding from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key from app.gateway.pairing import generate_pairing_code, pairing_expires_at -from app.observability.metrics import record_gateway_inbox_write -from app.rate_limiter import limiter +from app.observability.metrics import ( + record_gateway_inbox_write, + record_gateway_webhook_parse_error, +) from app.users import current_active_user router = APIRouter(prefix="/gateway", tags=["gateway"]) +logger = logging.getLogger(__name__) class StartBindingRequest(BaseModel): - platform: GatewayPlatform = GatewayPlatform.TELEGRAM + platform: ExternalChatPlatform = ExternalChatPlatform.TELEGRAM search_space_id: int @@ -60,63 +66,63 @@ def _telegram_message(payload: dict[str, Any]) -> dict[str, Any] | None: async def _resolve_webhook_account( session: AsyncSession, *, - secret: str, + account_id: int, header_secret: str | None, -) -> GatewayPlatformAccount: - if config.TELEGRAM_WEBHOOK_SECRET and secret == config.TELEGRAM_WEBHOOK_SECRET: - if header_secret != config.TELEGRAM_WEBHOOK_SECRET: - raise HTTPException(status_code=403, detail="Invalid Telegram webhook secret") - return await get_or_create_system_telegram_account(session) - - result = await session.execute( - select(GatewayPlatformAccount).where( - GatewayPlatformAccount.platform == GatewayPlatform.TELEGRAM - ) - ) - for account in result.scalars(): - metadata = account.account_metadata or {} - webhook_secret = metadata.get("webhook_secret") - if webhook_secret and webhook_secret == secret: - if header_secret != webhook_secret: - raise HTTPException(status_code=403, detail="Invalid Telegram webhook secret") - return account - - raise HTTPException(status_code=404, detail="Gateway account not found") +) -> ExternalChatAccount: + account = await session.get(ExternalChatAccount, account_id) + if account is None or account.platform != ExternalChatPlatform.TELEGRAM: + raise HTTPException(status_code=404, detail="Gateway account not found") + expected_secret = account.webhook_secret or "" + if not expected_secret or not hmac.compare_digest(header_secret or "", expected_secret): + raise HTTPException(status_code=403, detail="Invalid Telegram webhook secret") + return account -@router.post("/webhooks/telegram/{secret}") -@limiter.limit("60/minute", key_func=lambda request: f"tg-webhook:{request.path_params['secret']}") +@router.post("/webhooks/telegram/{account_id}") async def telegram_webhook( request: Request, - secret: str, + account_id: int, session: AsyncSession = Depends(get_async_session), ) -> Response: - payload = await request.json() - account = await _resolve_webhook_account( - session, - secret=secret, - header_secret=request.headers.get("X-Telegram-Bot-Api-Secret-Token"), - ) - update_id = payload.get("update_id") - if update_id is None: + request_id = f"gateway_{uuid.uuid4().hex[:16]}" + try: + payload = await request.json() + except ValueError: + record_gateway_webhook_parse_error() return Response(status_code=200) - message = _telegram_message(payload) or {} - inbox_id = await persist_inbound_event( + account = await _resolve_webhook_account( session, - account_id=account.id, - platform=GatewayPlatform.TELEGRAM, - event_dedupe_key=telegram_event_dedupe_key(update_id), - external_event_id=str(update_id), - external_message_id=( - str(message["message_id"]) if message.get("message_id") is not None else None - ), - event_kind=_classify_telegram_event(payload), - raw_payload=payload, + account_id=account_id, + header_secret=request.headers.get("X-Telegram-Bot-Api-Secret-Token"), ) - await session.commit() - record_gateway_inbox_write(platform="telegram", dedup_skipped=inbox_id is None) - return Response(status_code=200) + + try: + update_id = payload.get("update_id") + if update_id is None: + return Response(status_code=200) + + message = _telegram_message(payload) or {} + inbox_id = await persist_inbound_event( + session, + account_id=account.id, + platform=ExternalChatPlatform.TELEGRAM, + event_dedupe_key=telegram_event_dedupe_key(update_id), + external_event_id=str(update_id), + external_message_id=( + str(message["message_id"]) if message.get("message_id") is not None else None + ), + event_kind=_classify_telegram_event(payload), + raw_payload=payload, + request_id=request_id, + ) + await session.commit() + record_gateway_inbox_write(platform="telegram", dedup_skipped=inbox_id is None) + return Response(status_code=200) + except Exception: + await session.rollback() + logger.exception("Telegram webhook processing failed account_id=%s", account_id) + return Response(status_code=200) @router.post("/bindings/start", response_model=StartBindingResponse) @@ -125,17 +131,17 @@ async def start_binding( user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), ) -> StartBindingResponse: - if body.platform != GatewayPlatform.TELEGRAM: + if body.platform != ExternalChatPlatform.TELEGRAM: raise HTTPException(status_code=400, detail="Only Telegram is supported in v1") account = await get_or_create_system_telegram_account(session) code = generate_pairing_code() expires_at = pairing_expires_at() - binding = GatewayConversationBinding( + binding = ExternalChatBinding( account_id=account.id, user_id=user.id, search_space_id=body.search_space_id, - state=GatewayBindingState.PENDING, + state=ExternalChatBindingState.PENDING, pairing_code=code, pairing_code_expires_at=expires_at, ) @@ -143,7 +149,7 @@ async def start_binding( await session.commit() await session.refresh(binding) - username = account.account_metadata.get("bot_username") or config.TELEGRAM_SHARED_BOT_USERNAME + username = account.bot_username or config.TELEGRAM_SHARED_BOT_USERNAME if not username: raise HTTPException(status_code=500, detail="Telegram bot username is not configured") return StartBindingResponse( @@ -160,8 +166,8 @@ async def list_bindings( session: AsyncSession = Depends(get_async_session), ) -> list[dict[str, Any]]: result = await session.execute( - select(GatewayConversationBinding).where( - GatewayConversationBinding.user_id == user.id + select(ExternalChatBinding).where( + ExternalChatBinding.user_id == user.id ) ) return [ @@ -184,9 +190,9 @@ async def list_platforms( session: AsyncSession = Depends(get_async_session), ) -> list[dict[str, Any]]: result = await session.execute( - select(GatewayPlatformAccount).where( - (GatewayPlatformAccount.owner_user_id == user.id) - | (GatewayPlatformAccount.is_system_account.is_(True)) + select(ExternalChatAccount).where( + (ExternalChatAccount.owner_user_id == user.id) + | (ExternalChatAccount.is_system_account.is_(True)) ) ) return [ @@ -194,7 +200,7 @@ async def list_platforms( "id": account.id, "platform": account.platform.value, "mode": account.mode.value, - "bot_username": (account.account_metadata or {}).get("bot_username"), + "bot_username": account.bot_username, "health_status": account.health_status.value, "last_health_check_at": account.last_health_check_at, } @@ -208,7 +214,7 @@ async def delete_binding( user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), ) -> dict[str, bool]: - binding = await session.get(GatewayConversationBinding, binding_id) + binding = await session.get(ExternalChatBinding, binding_id) if binding is None or binding.user_id != user.id: raise HTTPException(status_code=404, detail="Binding not found") revoke_binding(binding) @@ -217,12 +223,12 @@ async def delete_binding( @router.post("/bindings/{binding_id}/resume") -async def resume_gateway_binding( +async def resume_external_chat_binding( binding_id: int, user: User = Depends(current_active_user), session: AsyncSession = Depends(get_async_session), ) -> dict[str, bool]: - binding = await session.get(GatewayConversationBinding, binding_id) + binding = await session.get(ExternalChatBinding, binding_id) if binding is None or binding.user_id != user.id: raise HTTPException(status_code=404, detail="Binding not found") resume_binding(binding) diff --git a/surfsense_backend/scripts/register_webhook.py b/surfsense_backend/scripts/register_webhook.py index 2004ad118..44ead9470 100644 --- a/surfsense_backend/scripts/register_webhook.py +++ b/surfsense_backend/scripts/register_webhook.py @@ -10,6 +10,9 @@ import sys from dotenv import load_dotenv from telegram import Bot +from app.db import async_session_maker +from app.gateway.accounts import get_or_create_system_telegram_account + load_dotenv() WEBHOOK_SECRET_RE = re.compile(r"^[A-Za-z0-9_-]{1,256}$") @@ -32,7 +35,13 @@ async def main() -> int: ) return 1 - webhook_url = f"{base_url.rstrip('/')}/api/v1/gateway/webhooks/telegram/{secret}" + async with async_session_maker() as session: + account = await get_or_create_system_telegram_account(session) + account.webhook_secret = secret + await session.commit() + account_id = int(account.id) + + webhook_url = f"{base_url.rstrip('/')}/api/v1/gateway/webhooks/telegram/{account_id}" bot = Bot(token=token) ok = await bot.set_webhook( url=webhook_url, diff --git a/surfsense_backend/tests/unit/gateway/test_webhook_routes.py b/surfsense_backend/tests/unit/gateway/test_webhook_routes.py new file mode 100644 index 000000000..9a62a3cce --- /dev/null +++ b/surfsense_backend/tests/unit/gateway/test_webhook_routes.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +import inspect + +import pytest + +from app.db import ExternalChatPlatform, ExternalChatAccount +from app.routes import gateway_webhook_routes as routes + + +class RequestStub: + def __init__(self, payload=None, *, headers=None, json_exc: Exception | None = None): + self.headers = headers or {} + self._payload = payload + self._json_exc = json_exc + + async def json(self): + if self._json_exc is not None: + raise self._json_exc + return self._payload + + +def _account(secret: str = "secret") -> ExternalChatAccount: + return ExternalChatAccount( + id=123, + platform=ExternalChatPlatform.TELEGRAM, + webhook_secret=secret, + bot_username="surf_bot", + ) + + +async def _call_webhook(*, request: RequestStub, account_id: int, session): + return await routes.telegram_webhook( + request=request, + account_id=account_id, + session=session, + ) + + +@pytest.mark.asyncio +async def test_telegram_webhook_returns_200_on_null_update_id(mocker): + session = mocker.AsyncMock() + session.get.return_value = _account() + request = RequestStub( + {"message": {"message_id": 7}}, + headers={"X-Telegram-Bot-Api-Secret-Token": "secret"}, + ) + + response = await _call_webhook( + request=request, + account_id=123, + session=session, + ) + + assert response.status_code == 200 + session.commit.assert_not_called() + + +@pytest.mark.asyncio +async def test_telegram_webhook_returns_200_on_bad_json(mocker, monkeypatch): + parse_metric = mocker.Mock() + monkeypatch.setattr(routes, "record_gateway_webhook_parse_error", parse_metric) + request = RequestStub(json_exc=ValueError("bad json")) + + response = await _call_webhook( + request=request, + account_id=123, + session=mocker.AsyncMock(), + ) + + assert response.status_code == 200 + parse_metric.assert_called_once_with() + + +@pytest.mark.asyncio +async def test_resolve_webhook_account_rejects_missing_or_wrong_header(mocker): + session = mocker.AsyncMock() + session.get.return_value = _account() + + with pytest.raises(routes.HTTPException) as missing: + await routes._resolve_webhook_account( + session, + account_id=123, + header_secret=None, + ) + assert missing.value.status_code == 403 + + with pytest.raises(routes.HTTPException) as wrong: + await routes._resolve_webhook_account( + session, + account_id=123, + header_secret="wrong", + ) + assert wrong.value.status_code == 403 + + +@pytest.mark.asyncio +async def test_telegram_webhook_persists_for_fastapi_inbox_worker(mocker, monkeypatch): + session = mocker.AsyncMock() + session.get.return_value = _account() + persist = mocker.AsyncMock(return_value=99) + monkeypatch.setattr(routes, "persist_inbound_event", persist) + + request = RequestStub( + { + "update_id": 10, + "message": {"message_id": 7, "chat": {"id": 1}, "from": {"id": 2}}, + }, + headers={"X-Telegram-Bot-Api-Secret-Token": "secret"}, + ) + + response = await _call_webhook( + request=request, + account_id=123, + session=session, + ) + + assert response.status_code == 200 + persist.assert_awaited_once() + session.commit.assert_awaited_once() + assert persist.await_args.kwargs["request_id"].startswith("gateway_") + + +@pytest.mark.asyncio +async def test_telegram_webhook_commits_dedup_without_enqueue(mocker, monkeypatch): + session = mocker.AsyncMock() + session.get.return_value = _account() + monkeypatch.setattr(routes, "persist_inbound_event", mocker.AsyncMock(return_value=None)) + + request = RequestStub( + {"update_id": 10, "message": {"message_id": 7}}, + headers={"X-Telegram-Bot-Api-Secret-Token": "secret"}, + ) + + response = await _call_webhook( + request=request, + account_id=123, + session=session, + ) + + assert response.status_code == 200 + session.commit.assert_awaited_once() + + +def test_telegram_webhook_does_not_use_slowapi_limiter(): + route_source = inspect.getsource(routes) + + assert "@limiter.limit" not in route_source +