diff --git a/surfsense_backend/app/routes/gateway_webhook_routes.py b/surfsense_backend/app/routes/gateway_webhook_routes.py index 5508e534c..ffadd19d7 100644 --- a/surfsense_backend/app/routes/gateway_webhook_routes.py +++ b/surfsense_backend/app/routes/gateway_webhook_routes.py @@ -2,24 +2,32 @@ from __future__ import annotations +import hashlib import hmac +import json import logging +import time import uuid from datetime import UTC, datetime from typing import Any -from urllib.parse import quote +from urllib.parse import quote, urlencode +from uuid import UUID +import httpx from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from starlette.responses import Response +from starlette.responses import JSONResponse, RedirectResponse, Response from app.config import config from app.db import ( ExternalChatAccount, + ExternalChatAccountMode, ExternalChatBinding, ExternalChatBindingState, + ExternalChatHealthStatus, + ExternalChatPeerKind, ExternalChatPlatform, User, get_async_session, @@ -27,19 +35,92 @@ from app.db import ( from app.gateway.accounts import ( get_or_create_system_telegram_account, get_or_create_system_whatsapp_account, + get_slack_account_by_team, ) from app.gateway.bindings import resume_binding, revoke_binding -from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key +from app.gateway.inbox import ( + persist_inbound_event, + slack_event_dedupe_key, + telegram_event_dedupe_key, +) from app.gateway.pairing import generate_pairing_code, pairing_expires_at +from app.gateway.slack.adapter import slack_user_peer_id from app.observability.metrics import ( record_gateway_inbox_write, record_gateway_webhook_parse_error, ) from app.users import current_active_user +from app.utils.oauth_security import OAuthStateManager, TokenEncryption router = APIRouter(prefix="/gateway", tags=["gateway"]) logger = logging.getLogger(__name__) +SLACK_AUTHORIZATION_URL = "https://slack.com/oauth/v2/authorize" +SLACK_TOKEN_URL = "https://slack.com/api/oauth.v2.access" +SLACK_BOT_SCOPES = [ + "app_mentions:read", + "chat:write", + "channels:read", + "groups:read", + "im:write", + "users:read", + "team:read", +] +_state_manager: OAuthStateManager | None = None +_token_encryption: TokenEncryption | None = None + + +def _get_state_manager() -> OAuthStateManager: + global _state_manager + if _state_manager is None: + if not config.SECRET_KEY: + raise HTTPException(status_code=500, detail="SECRET_KEY is not configured") + _state_manager = OAuthStateManager(config.SECRET_KEY) + return _state_manager + + +def _get_token_encryption() -> TokenEncryption: + global _token_encryption + if _token_encryption is None: + if not config.SECRET_KEY: + raise HTTPException(status_code=500, detail="SECRET_KEY is not configured") + _token_encryption = TokenEncryption(config.SECRET_KEY) + return _token_encryption + + +def _slack_redirect_uri() -> str: + if config.GATEWAY_SLACK_REDIRECT_URI: + return config.GATEWAY_SLACK_REDIRECT_URI + base = config.BACKEND_URL or "" + return f"{base.rstrip('/')}/api/v1/gateway/slack/callback" + + +def _slack_frontend_redirect(space_id: int, *, success: bool = False, error: str | None = None) -> RedirectResponse: + qs = "slack_gateway=connected" if success else f"error={error or 'slack_gateway_failed'}" + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/user-settings?{qs}" + ) + + +def verify_slack_signature(*, signing_secret: str, timestamp: str | None, signature: str | None, body: bytes) -> bool: + if not signing_secret or not timestamp or not signature: + return False + try: + ts = int(timestamp) + except ValueError: + return False + if abs(time.time() - ts) > 60 * 5: + return False + base = b"v0:" + timestamp.encode() + b":" + body + digest = hmac.new(signing_secret.encode(), base, hashlib.sha256).hexdigest() + expected = f"v0={digest}" + return hmac.compare_digest(expected, signature) + + +def _slack_event_kind(payload: dict[str, Any]) -> str: + event_type = str((payload.get("event") or {}).get("type") or "") + return "message" if event_type in {"app_mention", "message"} else "other" + class StartBindingRequest(BaseModel): platform: ExternalChatPlatform = ExternalChatPlatform.TELEGRAM @@ -67,6 +148,213 @@ def _telegram_message(payload: dict[str, Any]) -> dict[str, Any] | None: return payload.get("message") or payload.get("edited_message") +@router.get("/slack/install") +async def install_slack_gateway( + search_space_id: int, + user: User = Depends(current_active_user), +) -> dict[str, str]: + if not config.GATEWAY_SLACK_CLIENT_ID: + raise HTTPException(status_code=500, detail="Slack gateway OAuth is not configured") + state = _get_state_manager().generate_secure_state(search_space_id, user.id) + auth_params = { + "client_id": config.GATEWAY_SLACK_CLIENT_ID, + "scope": ",".join(SLACK_BOT_SCOPES), + "redirect_uri": _slack_redirect_uri(), + "state": state, + } + return {"auth_url": f"{SLACK_AUTHORIZATION_URL}?{urlencode(auth_params)}"} + + +@router.get("/slack/callback") +async def slack_gateway_callback( + code: str | None = None, + error: str | None = None, + state: str | None = None, + session: AsyncSession = Depends(get_async_session), +) -> RedirectResponse: + space_id = None + if state: + try: + state_data = _get_state_manager().validate_state(state) + space_id = int(state_data["space_id"]) + except Exception: + state_data = None + else: + state_data = None + + if error: + return _slack_frontend_redirect(space_id or 0, error="slack_gateway_oauth_denied") + if not code or state_data is None: + raise HTTPException(status_code=400, detail="Invalid Slack gateway OAuth callback") + if not config.GATEWAY_SLACK_CLIENT_ID or not config.GATEWAY_SLACK_CLIENT_SECRET: + raise HTTPException(status_code=500, detail="Slack gateway OAuth is not configured") + + user_id = UUID(state_data["user_id"]) + token_payload = { + "client_id": config.GATEWAY_SLACK_CLIENT_ID, + "client_secret": config.GATEWAY_SLACK_CLIENT_SECRET, + "code": code, + "redirect_uri": _slack_redirect_uri(), + } + async with httpx.AsyncClient(timeout=30.0) as client: + token_response = await client.post( + SLACK_TOKEN_URL, + data=token_payload, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + token_response.raise_for_status() + token_json = token_response.json() + if not token_json.get("ok", False): + raise HTTPException( + status_code=400, + detail=f"Slack gateway OAuth failed: {token_json.get('error', 'unknown_error')}", + ) + + bot_token = token_json.get("access_token") + team = token_json.get("team") or {} + team_id = team.get("id") + if not bot_token or not team_id: + raise HTTPException(status_code=400, detail="Slack gateway OAuth returned incomplete data") + + bot_user_id = token_json.get("bot_user_id") + app_id = token_json.get("app_id") + authed_user = token_json.get("authed_user") or {} + authed_slack_user_id = authed_user.get("id") + enc = _get_token_encryption() + credentials = { + "bot_token": bot_token, + "token_type": token_json.get("token_type", "bot"), + "scope": token_json.get("scope"), + } + cursor_state = { + "team_id": team_id, + "team_name": team.get("name"), + "enterprise_id": (token_json.get("enterprise") or {}).get("id"), + "app_id": app_id, + "bot_user_id": bot_user_id, + "scope": token_json.get("scope"), + } + + account = await get_slack_account_by_team(session, team_id=team_id) + if account is None: + account = ExternalChatAccount( + platform=ExternalChatPlatform.SLACK, + mode=ExternalChatAccountMode.CLOUD_SHARED, + is_system_account=True, + encrypted_credentials=enc.encrypt_token(json.dumps(credentials)), + bot_username="SurfSense", + cursor_state=cursor_state, + health_status=ExternalChatHealthStatus.UNKNOWN, + ) + session.add(account) + await session.flush() + else: + account.encrypted_credentials = enc.encrypt_token(json.dumps(credentials)) + account.cursor_state = {**(account.cursor_state or {}), **cursor_state} + account.health_status = ExternalChatHealthStatus.UNKNOWN + + if authed_slack_user_id: + peer_id = slack_user_peer_id(team_id, authed_slack_user_id) + existing_binding_result = await session.execute( + select(ExternalChatBinding).where( + ExternalChatBinding.account_id == account.id, + ExternalChatBinding.external_peer_id == peer_id, + ExternalChatBinding.state.in_( + [ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED] + ), + ) + ) + binding = existing_binding_result.scalars().first() + if binding is None: + session.add( + ExternalChatBinding( + account_id=account.id, + user_id=user_id, + search_space_id=space_id, + state=ExternalChatBindingState.BOUND, + external_peer_id=peer_id, + external_peer_kind=ExternalChatPeerKind.DIRECT, + external_username=authed_slack_user_id, + external_metadata={ + "kind": "slack_user", + "team_id": team_id, + "slack_user_id": authed_slack_user_id, + }, + ) + ) + elif binding.user_id == user_id: + binding.search_space_id = space_id + binding.external_metadata = { + **(binding.external_metadata or {}), + "kind": "slack_user", + "team_id": team_id, + "slack_user_id": authed_slack_user_id, + } + + await session.commit() + return _slack_frontend_redirect(space_id, success=True) + + +@router.post("/webhooks/slack") +async def slack_webhook( + request: Request, + session: AsyncSession = Depends(get_async_session), +) -> Response: + body = await request.body() + if not verify_slack_signature( + signing_secret=config.GATEWAY_SLACK_SIGNING_SECRET or "", + timestamp=request.headers.get("X-Slack-Request-Timestamp"), + signature=request.headers.get("X-Slack-Signature"), + body=body, + ): + raise HTTPException(status_code=403, detail="Invalid Slack signature") + + try: + payload = json.loads(body.decode()) + except ValueError: + record_gateway_webhook_parse_error() + return Response(status_code=200) + + if payload.get("type") == "url_verification": + return JSONResponse({"challenge": payload.get("challenge", "")}) + if payload.get("type") != "event_callback": + return Response(status_code=200) + + event = payload.get("event") or {} + event_id = payload.get("event_id") + team_id = payload.get("team_id") or event.get("team") + if not event_id or not team_id: + return Response(status_code=200) + + account = await get_slack_account_by_team(session, team_id=str(team_id)) + if account is None: + logger.warning("Ignoring Slack event for uninstalled team_id=%s", team_id) + return Response(status_code=200) + + bot_user_id = (account.cursor_state or {}).get("bot_user_id") + if event.get("bot_id") or (bot_user_id and event.get("user") == bot_user_id): + return Response(status_code=200) + + try: + inbox_id = await persist_inbound_event( + session, + account_id=account.id, + platform=ExternalChatPlatform.SLACK, + event_dedupe_key=slack_event_dedupe_key(event_id), + external_event_id=str(event_id), + external_message_id=str(event.get("ts")) if event.get("ts") else None, + event_kind=_slack_event_kind(payload), + raw_payload=payload, + request_id=f"gateway_{uuid.uuid4().hex[:16]}", + ) + await session.commit() + record_gateway_inbox_write(platform="slack", dedup_skipped=inbox_id is None) + except Exception: + await session.rollback() + logger.exception("Slack webhook persistence failed team_id=%s", team_id) + return Response(status_code=200) + + async def _resolve_webhook_account( session: AsyncSession, *, @@ -207,6 +495,7 @@ async def list_bindings( "search_space_id": binding.search_space_id, "external_display_name": binding.external_display_name, "external_username": binding.external_username, + "external_metadata": binding.external_metadata, "suspended_reason": binding.suspended_reason, } for binding, account in result.all() diff --git a/surfsense_backend/tests/unit/gateway/test_webhook_routes.py b/surfsense_backend/tests/unit/gateway/test_webhook_routes.py index 9a62a3cce..a624cbde1 100644 --- a/surfsense_backend/tests/unit/gateway/test_webhook_routes.py +++ b/surfsense_backend/tests/unit/gateway/test_webhook_routes.py @@ -1,10 +1,14 @@ from __future__ import annotations +import hashlib +import hmac import inspect +import json +import time import pytest -from app.db import ExternalChatPlatform, ExternalChatAccount +from app.db import ExternalChatAccount, ExternalChatAccountMode, ExternalChatPlatform from app.routes import gateway_webhook_routes as routes @@ -19,6 +23,9 @@ class RequestStub: raise self._json_exc return self._payload + async def body(self): + return json.dumps(self._payload).encode() + def _account(secret: str = "secret") -> ExternalChatAccount: return ExternalChatAccount( @@ -29,6 +36,38 @@ def _account(secret: str = "secret") -> ExternalChatAccount: ) +def _slack_account() -> ExternalChatAccount: + return ExternalChatAccount( + id=456, + platform=ExternalChatPlatform.SLACK, + mode=ExternalChatAccountMode.CLOUD_SHARED, + is_system_account=True, + cursor_state={"team_id": "T123", "bot_user_id": "U_BOT"}, + ) + + +def _signed_slack_request(payload: dict, *, secret: str = "signing-secret") -> RequestStub: + body = json.dumps(payload).encode() + timestamp = str(int(time.time())) + digest = hmac.new( + secret.encode(), + b"v0:" + timestamp.encode() + b":" + body, + hashlib.sha256, + ).hexdigest() + + class SlackRequestStub(RequestStub): + async def body(self): + return body + + return SlackRequestStub( + payload, + headers={ + "X-Slack-Request-Timestamp": timestamp, + "X-Slack-Signature": f"v0={digest}", + }, + ) + + async def _call_webhook(*, request: RequestStub, account_id: int, session): return await routes.telegram_webhook( request=request, @@ -147,3 +186,89 @@ def test_telegram_webhook_does_not_use_slowapi_limiter(): assert "@limiter.limit" not in route_source + +def test_verify_slack_signature_accepts_valid_signature(): + payload = b'{"type":"event_callback"}' + timestamp = str(int(time.time())) + digest = hmac.new( + b"secret", + b"v0:" + timestamp.encode() + b":" + payload, + hashlib.sha256, + ).hexdigest() + + assert routes.verify_slack_signature( + signing_secret="secret", + timestamp=timestamp, + signature=f"v0={digest}", + body=payload, + ) + + +@pytest.mark.asyncio +async def test_slack_webhook_url_verification(monkeypatch, mocker): + monkeypatch.setattr(routes.config, "GATEWAY_SLACK_SIGNING_SECRET", "signing-secret") + request = _signed_slack_request({"type": "url_verification", "challenge": "abc123"}) + + response = await routes.slack_webhook(request=request, session=mocker.AsyncMock()) + + assert response.status_code == 200 + assert json.loads(response.body)["challenge"] == "abc123" + + +@pytest.mark.asyncio +async def test_slack_webhook_persists_event(monkeypatch, mocker): + monkeypatch.setattr(routes.config, "GATEWAY_SLACK_SIGNING_SECRET", "signing-secret") + session = mocker.AsyncMock() + monkeypatch.setattr(routes, "get_slack_account_by_team", mocker.AsyncMock(return_value=_slack_account())) + persist = mocker.AsyncMock(return_value=100) + monkeypatch.setattr(routes, "persist_inbound_event", persist) + payload = { + "type": "event_callback", + "team_id": "T123", + "event_id": "Ev123", + "event": { + "type": "app_mention", + "channel": "C123", + "user": "U123", + "text": "<@U_BOT> hello", + "ts": "1717000000.000100", + }, + } + request = _signed_slack_request(payload) + + response = await routes.slack_webhook(request=request, session=session) + + assert response.status_code == 200 + persist.assert_awaited_once() + assert persist.await_args.kwargs["event_dedupe_key"] == "slack_event:Ev123" + assert persist.await_args.kwargs["platform"] == ExternalChatPlatform.SLACK + session.commit.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_slack_webhook_ignores_self_event(monkeypatch, mocker): + monkeypatch.setattr(routes.config, "GATEWAY_SLACK_SIGNING_SECRET", "signing-secret") + session = mocker.AsyncMock() + monkeypatch.setattr(routes, "get_slack_account_by_team", mocker.AsyncMock(return_value=_slack_account())) + persist = mocker.AsyncMock(return_value=100) + monkeypatch.setattr(routes, "persist_inbound_event", persist) + request = _signed_slack_request( + { + "type": "event_callback", + "team_id": "T123", + "event_id": "Ev123", + "event": { + "type": "app_mention", + "channel": "C123", + "user": "U_BOT", + "text": "loop", + "ts": "1717000000.000100", + }, + } + ) + + response = await routes.slack_webhook(request=request, session=session) + + assert response.status_code == 200 + persist.assert_not_awaited() +