diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py index ec4d1650f..f46b6fc65 100644 --- a/surfsense_backend/app/routes/__init__.py +++ b/surfsense_backend/app/routes/__init__.py @@ -18,6 +18,7 @@ from .dropbox_add_connector_route import router as dropbox_add_connector_router from .editor_routes import router as editor_router from .export_routes import router as export_router from .folders_routes import router as folders_router +from .gateway_webhook_routes import router as gateway_router from .google_calendar_add_connector_route import ( router as google_calendar_add_connector_router, ) @@ -68,6 +69,7 @@ router.include_router(editor_router) router.include_router(export_router) router.include_router(documents_router) router.include_router(folders_router) +router.include_router(gateway_router) router.include_router(notes_router) router.include_router(new_chat_router) # Chat with assistant-ui persistence router.include_router(agent_revert_router) # POST /threads/{id}/revert/{action_id} diff --git a/surfsense_backend/app/routes/gateway_webhook_routes.py b/surfsense_backend/app/routes/gateway_webhook_routes.py new file mode 100644 index 000000000..86b84f067 --- /dev/null +++ b/surfsense_backend/app/routes/gateway_webhook_routes.py @@ -0,0 +1,232 @@ +"""Messaging gateway routes.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Request +from pydantic import BaseModel +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from starlette.responses import Response + +from app.config import config +from app.db import ( + GatewayBindingState, + GatewayConversationBinding, + GatewayPlatform, + GatewayPlatformAccount, + User, + get_async_session, +) +from app.gateway.accounts import get_or_create_system_telegram_account +from app.gateway.bindings import resume_binding, revoke_binding +from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key +from app.gateway.pairing import generate_pairing_code, pairing_expires_at +from app.observability.metrics import record_gateway_inbox_write +from app.rate_limiter import limiter +from app.users import current_active_user + +router = APIRouter(prefix="/gateway", tags=["gateway"]) + + +class StartBindingRequest(BaseModel): + platform: GatewayPlatform = GatewayPlatform.TELEGRAM + search_space_id: int + + +class StartBindingResponse(BaseModel): + binding_id: int + code: str + deep_link: str + expires_at: datetime + + +def _classify_telegram_event(payload: dict[str, Any]) -> str: + if "message" in payload: + return "message" + if "edited_message" in payload: + return "edited_message" + if "callback_query" in payload: + return "callback_query" + return "other" + + +def _telegram_message(payload: dict[str, Any]) -> dict[str, Any] | None: + return payload.get("message") or payload.get("edited_message") + + +async def _resolve_webhook_account( + session: AsyncSession, + *, + secret: str, + header_secret: str | None, +) -> GatewayPlatformAccount: + if config.TELEGRAM_WEBHOOK_SECRET and secret == config.TELEGRAM_WEBHOOK_SECRET: + if header_secret != config.TELEGRAM_WEBHOOK_SECRET: + raise HTTPException(status_code=403, detail="Invalid Telegram webhook secret") + return await get_or_create_system_telegram_account(session) + + result = await session.execute( + select(GatewayPlatformAccount).where( + GatewayPlatformAccount.platform == GatewayPlatform.TELEGRAM + ) + ) + for account in result.scalars(): + metadata = account.account_metadata or {} + webhook_secret = metadata.get("webhook_secret") + if webhook_secret and webhook_secret == secret: + if header_secret != webhook_secret: + raise HTTPException(status_code=403, detail="Invalid Telegram webhook secret") + return account + + raise HTTPException(status_code=404, detail="Gateway account not found") + + +@router.post("/webhooks/telegram/{secret}") +@limiter.limit("60/minute", key_func=lambda request: f"tg-webhook:{request.path_params['secret']}") +async def telegram_webhook( + request: Request, + secret: str, + session: AsyncSession = Depends(get_async_session), +) -> Response: + payload = await request.json() + account = await _resolve_webhook_account( + session, + secret=secret, + header_secret=request.headers.get("X-Telegram-Bot-Api-Secret-Token"), + ) + update_id = payload.get("update_id") + if update_id is None: + return Response(status_code=200) + + message = _telegram_message(payload) or {} + inbox_id = await persist_inbound_event( + session, + account_id=account.id, + platform=GatewayPlatform.TELEGRAM, + event_dedupe_key=telegram_event_dedupe_key(update_id), + external_event_id=str(update_id), + external_message_id=( + str(message["message_id"]) if message.get("message_id") is not None else None + ), + event_kind=_classify_telegram_event(payload), + raw_payload=payload, + ) + await session.commit() + record_gateway_inbox_write(platform="telegram", dedup_skipped=inbox_id is None) + return Response(status_code=200) + + +@router.post("/bindings/start", response_model=StartBindingResponse) +async def start_binding( + body: StartBindingRequest, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> StartBindingResponse: + if body.platform != GatewayPlatform.TELEGRAM: + raise HTTPException(status_code=400, detail="Only Telegram is supported in v1") + + account = await get_or_create_system_telegram_account(session) + code = generate_pairing_code() + expires_at = pairing_expires_at() + binding = GatewayConversationBinding( + account_id=account.id, + user_id=user.id, + search_space_id=body.search_space_id, + state=GatewayBindingState.PENDING, + pairing_code=code, + pairing_code_expires_at=expires_at, + ) + session.add(binding) + await session.commit() + await session.refresh(binding) + + username = account.account_metadata.get("bot_username") or config.TELEGRAM_SHARED_BOT_USERNAME + if not username: + raise HTTPException(status_code=500, detail="Telegram bot username is not configured") + return StartBindingResponse( + binding_id=binding.id, + code=code, + deep_link=f"https://t.me/{username}?start={code}", + expires_at=expires_at, + ) + + +@router.get("/bindings") +async def list_bindings( + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> list[dict[str, Any]]: + result = await session.execute( + select(GatewayConversationBinding).where( + GatewayConversationBinding.user_id == user.id + ) + ) + return [ + { + "id": binding.id, + "platform": "telegram", + "state": binding.state.value, + "search_space_id": binding.search_space_id, + "external_display_name": binding.external_display_name, + "external_username": binding.external_username, + "suspended_reason": binding.suspended_reason, + } + for binding in result.scalars() + ] + + +@router.get("/platforms") +async def list_platforms( + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> list[dict[str, Any]]: + result = await session.execute( + select(GatewayPlatformAccount).where( + (GatewayPlatformAccount.owner_user_id == user.id) + | (GatewayPlatformAccount.is_system_account.is_(True)) + ) + ) + return [ + { + "id": account.id, + "platform": account.platform.value, + "mode": account.mode.value, + "bot_username": (account.account_metadata or {}).get("bot_username"), + "health_status": account.health_status.value, + "last_health_check_at": account.last_health_check_at, + } + for account in result.scalars() + ] + + +@router.delete("/bindings/{binding_id}") +async def delete_binding( + binding_id: int, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> dict[str, bool]: + binding = await session.get(GatewayConversationBinding, binding_id) + if binding is None or binding.user_id != user.id: + raise HTTPException(status_code=404, detail="Binding not found") + revoke_binding(binding) + await session.commit() + return {"ok": True} + + +@router.post("/bindings/{binding_id}/resume") +async def resume_gateway_binding( + binding_id: int, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +) -> dict[str, bool]: + binding = await session.get(GatewayConversationBinding, binding_id) + if binding is None or binding.user_id != user.id: + raise HTTPException(status_code=404, detail="Binding not found") + resume_binding(binding) + binding.updated_at = datetime.now(UTC) + await session.commit() + return {"ok": True} + diff --git a/surfsense_backend/scripts/register_webhook.py b/surfsense_backend/scripts/register_webhook.py new file mode 100644 index 000000000..2004ad118 --- /dev/null +++ b/surfsense_backend/scripts/register_webhook.py @@ -0,0 +1,52 @@ +"""Register the SurfSense Telegram webhook.""" + +from __future__ import annotations + +import asyncio +import os +import re +import sys + +from dotenv import load_dotenv +from telegram import Bot + +load_dotenv() + +WEBHOOK_SECRET_RE = re.compile(r"^[A-Za-z0-9_-]{1,256}$") + + +async def main() -> int: + token = os.getenv("TELEGRAM_SHARED_BOT_TOKEN") + secret = os.getenv("TELEGRAM_WEBHOOK_SECRET") + base_url = os.getenv("GATEWAY_BASE_URL") or os.getenv("BACKEND_URL") + if not token or not secret or not base_url: + print( + "Missing TELEGRAM_SHARED_BOT_TOKEN, TELEGRAM_WEBHOOK_SECRET, or GATEWAY_BASE_URL/BACKEND_URL", + file=sys.stderr, + ) + return 1 + if not WEBHOOK_SECRET_RE.fullmatch(secret): + print( + "TELEGRAM_WEBHOOK_SECRET must be 1-256 chars and contain only A-Z, a-z, 0-9, '_' or '-'", + file=sys.stderr, + ) + return 1 + + webhook_url = f"{base_url.rstrip('/')}/api/v1/gateway/webhooks/telegram/{secret}" + bot = Bot(token=token) + ok = await bot.set_webhook( + url=webhook_url, + secret_token=secret, + allowed_updates=["message", "edited_message"], + drop_pending_updates=True, + ) + if not ok: + print("Telegram rejected webhook registration", file=sys.stderr) + return 1 + print(f"Registered Telegram webhook: {webhook_url}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(asyncio.run(main())) +