feat(gateway): add Slack gateway webhook flow

This commit is contained in:
Anish Sarkar 2026-06-01 12:37:04 +05:30
parent f305a2e67d
commit 9c7e093db4
2 changed files with 418 additions and 4 deletions

View file

@ -2,24 +2,32 @@
from __future__ import annotations
import hashlib
import hmac
import json
import logging
import time
import uuid
from datetime import UTC, datetime
from typing import Any
from urllib.parse import quote
from urllib.parse import quote, urlencode
from uuid import UUID
import httpx
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import Response
from starlette.responses import JSONResponse, RedirectResponse, Response
from app.config import config
from app.db import (
ExternalChatAccount,
ExternalChatAccountMode,
ExternalChatBinding,
ExternalChatBindingState,
ExternalChatHealthStatus,
ExternalChatPeerKind,
ExternalChatPlatform,
User,
get_async_session,
@ -27,19 +35,92 @@ from app.db import (
from app.gateway.accounts import (
get_or_create_system_telegram_account,
get_or_create_system_whatsapp_account,
get_slack_account_by_team,
)
from app.gateway.bindings import resume_binding, revoke_binding
from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key
from app.gateway.inbox import (
persist_inbound_event,
slack_event_dedupe_key,
telegram_event_dedupe_key,
)
from app.gateway.pairing import generate_pairing_code, pairing_expires_at
from app.gateway.slack.adapter import slack_user_peer_id
from app.observability.metrics import (
record_gateway_inbox_write,
record_gateway_webhook_parse_error,
)
from app.users import current_active_user
from app.utils.oauth_security import OAuthStateManager, TokenEncryption
router = APIRouter(prefix="/gateway", tags=["gateway"])
logger = logging.getLogger(__name__)
SLACK_AUTHORIZATION_URL = "https://slack.com/oauth/v2/authorize"
SLACK_TOKEN_URL = "https://slack.com/api/oauth.v2.access"
SLACK_BOT_SCOPES = [
"app_mentions:read",
"chat:write",
"channels:read",
"groups:read",
"im:write",
"users:read",
"team:read",
]
_state_manager: OAuthStateManager | None = None
_token_encryption: TokenEncryption | None = None
def _get_state_manager() -> OAuthStateManager:
global _state_manager
if _state_manager is None:
if not config.SECRET_KEY:
raise HTTPException(status_code=500, detail="SECRET_KEY is not configured")
_state_manager = OAuthStateManager(config.SECRET_KEY)
return _state_manager
def _get_token_encryption() -> TokenEncryption:
global _token_encryption
if _token_encryption is None:
if not config.SECRET_KEY:
raise HTTPException(status_code=500, detail="SECRET_KEY is not configured")
_token_encryption = TokenEncryption(config.SECRET_KEY)
return _token_encryption
def _slack_redirect_uri() -> str:
if config.GATEWAY_SLACK_REDIRECT_URI:
return config.GATEWAY_SLACK_REDIRECT_URI
base = config.BACKEND_URL or ""
return f"{base.rstrip('/')}/api/v1/gateway/slack/callback"
def _slack_frontend_redirect(space_id: int, *, success: bool = False, error: str | None = None) -> RedirectResponse:
qs = "slack_gateway=connected" if success else f"error={error or 'slack_gateway_failed'}"
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/user-settings?{qs}"
)
def verify_slack_signature(*, signing_secret: str, timestamp: str | None, signature: str | None, body: bytes) -> bool:
if not signing_secret or not timestamp or not signature:
return False
try:
ts = int(timestamp)
except ValueError:
return False
if abs(time.time() - ts) > 60 * 5:
return False
base = b"v0:" + timestamp.encode() + b":" + body
digest = hmac.new(signing_secret.encode(), base, hashlib.sha256).hexdigest()
expected = f"v0={digest}"
return hmac.compare_digest(expected, signature)
def _slack_event_kind(payload: dict[str, Any]) -> str:
event_type = str((payload.get("event") or {}).get("type") or "")
return "message" if event_type in {"app_mention", "message"} else "other"
class StartBindingRequest(BaseModel):
platform: ExternalChatPlatform = ExternalChatPlatform.TELEGRAM
@ -67,6 +148,213 @@ def _telegram_message(payload: dict[str, Any]) -> dict[str, Any] | None:
return payload.get("message") or payload.get("edited_message")
@router.get("/slack/install")
async def install_slack_gateway(
search_space_id: int,
user: User = Depends(current_active_user),
) -> dict[str, str]:
if not config.GATEWAY_SLACK_CLIENT_ID:
raise HTTPException(status_code=500, detail="Slack gateway OAuth is not configured")
state = _get_state_manager().generate_secure_state(search_space_id, user.id)
auth_params = {
"client_id": config.GATEWAY_SLACK_CLIENT_ID,
"scope": ",".join(SLACK_BOT_SCOPES),
"redirect_uri": _slack_redirect_uri(),
"state": state,
}
return {"auth_url": f"{SLACK_AUTHORIZATION_URL}?{urlencode(auth_params)}"}
@router.get("/slack/callback")
async def slack_gateway_callback(
code: str | None = None,
error: str | None = None,
state: str | None = None,
session: AsyncSession = Depends(get_async_session),
) -> RedirectResponse:
space_id = None
if state:
try:
state_data = _get_state_manager().validate_state(state)
space_id = int(state_data["space_id"])
except Exception:
state_data = None
else:
state_data = None
if error:
return _slack_frontend_redirect(space_id or 0, error="slack_gateway_oauth_denied")
if not code or state_data is None:
raise HTTPException(status_code=400, detail="Invalid Slack gateway OAuth callback")
if not config.GATEWAY_SLACK_CLIENT_ID or not config.GATEWAY_SLACK_CLIENT_SECRET:
raise HTTPException(status_code=500, detail="Slack gateway OAuth is not configured")
user_id = UUID(state_data["user_id"])
token_payload = {
"client_id": config.GATEWAY_SLACK_CLIENT_ID,
"client_secret": config.GATEWAY_SLACK_CLIENT_SECRET,
"code": code,
"redirect_uri": _slack_redirect_uri(),
}
async with httpx.AsyncClient(timeout=30.0) as client:
token_response = await client.post(
SLACK_TOKEN_URL,
data=token_payload,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
token_response.raise_for_status()
token_json = token_response.json()
if not token_json.get("ok", False):
raise HTTPException(
status_code=400,
detail=f"Slack gateway OAuth failed: {token_json.get('error', 'unknown_error')}",
)
bot_token = token_json.get("access_token")
team = token_json.get("team") or {}
team_id = team.get("id")
if not bot_token or not team_id:
raise HTTPException(status_code=400, detail="Slack gateway OAuth returned incomplete data")
bot_user_id = token_json.get("bot_user_id")
app_id = token_json.get("app_id")
authed_user = token_json.get("authed_user") or {}
authed_slack_user_id = authed_user.get("id")
enc = _get_token_encryption()
credentials = {
"bot_token": bot_token,
"token_type": token_json.get("token_type", "bot"),
"scope": token_json.get("scope"),
}
cursor_state = {
"team_id": team_id,
"team_name": team.get("name"),
"enterprise_id": (token_json.get("enterprise") or {}).get("id"),
"app_id": app_id,
"bot_user_id": bot_user_id,
"scope": token_json.get("scope"),
}
account = await get_slack_account_by_team(session, team_id=team_id)
if account is None:
account = ExternalChatAccount(
platform=ExternalChatPlatform.SLACK,
mode=ExternalChatAccountMode.CLOUD_SHARED,
is_system_account=True,
encrypted_credentials=enc.encrypt_token(json.dumps(credentials)),
bot_username="SurfSense",
cursor_state=cursor_state,
health_status=ExternalChatHealthStatus.UNKNOWN,
)
session.add(account)
await session.flush()
else:
account.encrypted_credentials = enc.encrypt_token(json.dumps(credentials))
account.cursor_state = {**(account.cursor_state or {}), **cursor_state}
account.health_status = ExternalChatHealthStatus.UNKNOWN
if authed_slack_user_id:
peer_id = slack_user_peer_id(team_id, authed_slack_user_id)
existing_binding_result = await session.execute(
select(ExternalChatBinding).where(
ExternalChatBinding.account_id == account.id,
ExternalChatBinding.external_peer_id == peer_id,
ExternalChatBinding.state.in_(
[ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED]
),
)
)
binding = existing_binding_result.scalars().first()
if binding is None:
session.add(
ExternalChatBinding(
account_id=account.id,
user_id=user_id,
search_space_id=space_id,
state=ExternalChatBindingState.BOUND,
external_peer_id=peer_id,
external_peer_kind=ExternalChatPeerKind.DIRECT,
external_username=authed_slack_user_id,
external_metadata={
"kind": "slack_user",
"team_id": team_id,
"slack_user_id": authed_slack_user_id,
},
)
)
elif binding.user_id == user_id:
binding.search_space_id = space_id
binding.external_metadata = {
**(binding.external_metadata or {}),
"kind": "slack_user",
"team_id": team_id,
"slack_user_id": authed_slack_user_id,
}
await session.commit()
return _slack_frontend_redirect(space_id, success=True)
@router.post("/webhooks/slack")
async def slack_webhook(
request: Request,
session: AsyncSession = Depends(get_async_session),
) -> Response:
body = await request.body()
if not verify_slack_signature(
signing_secret=config.GATEWAY_SLACK_SIGNING_SECRET or "",
timestamp=request.headers.get("X-Slack-Request-Timestamp"),
signature=request.headers.get("X-Slack-Signature"),
body=body,
):
raise HTTPException(status_code=403, detail="Invalid Slack signature")
try:
payload = json.loads(body.decode())
except ValueError:
record_gateway_webhook_parse_error()
return Response(status_code=200)
if payload.get("type") == "url_verification":
return JSONResponse({"challenge": payload.get("challenge", "")})
if payload.get("type") != "event_callback":
return Response(status_code=200)
event = payload.get("event") or {}
event_id = payload.get("event_id")
team_id = payload.get("team_id") or event.get("team")
if not event_id or not team_id:
return Response(status_code=200)
account = await get_slack_account_by_team(session, team_id=str(team_id))
if account is None:
logger.warning("Ignoring Slack event for uninstalled team_id=%s", team_id)
return Response(status_code=200)
bot_user_id = (account.cursor_state or {}).get("bot_user_id")
if event.get("bot_id") or (bot_user_id and event.get("user") == bot_user_id):
return Response(status_code=200)
try:
inbox_id = await persist_inbound_event(
session,
account_id=account.id,
platform=ExternalChatPlatform.SLACK,
event_dedupe_key=slack_event_dedupe_key(event_id),
external_event_id=str(event_id),
external_message_id=str(event.get("ts")) if event.get("ts") else None,
event_kind=_slack_event_kind(payload),
raw_payload=payload,
request_id=f"gateway_{uuid.uuid4().hex[:16]}",
)
await session.commit()
record_gateway_inbox_write(platform="slack", dedup_skipped=inbox_id is None)
except Exception:
await session.rollback()
logger.exception("Slack webhook persistence failed team_id=%s", team_id)
return Response(status_code=200)
async def _resolve_webhook_account(
session: AsyncSession,
*,
@ -207,6 +495,7 @@ async def list_bindings(
"search_space_id": binding.search_space_id,
"external_display_name": binding.external_display_name,
"external_username": binding.external_username,
"external_metadata": binding.external_metadata,
"suspended_reason": binding.suspended_reason,
}
for binding, account in result.all()

View file

@ -1,10 +1,14 @@
from __future__ import annotations
import hashlib
import hmac
import inspect
import json
import time
import pytest
from app.db import ExternalChatPlatform, ExternalChatAccount
from app.db import ExternalChatAccount, ExternalChatAccountMode, ExternalChatPlatform
from app.routes import gateway_webhook_routes as routes
@ -19,6 +23,9 @@ class RequestStub:
raise self._json_exc
return self._payload
async def body(self):
return json.dumps(self._payload).encode()
def _account(secret: str = "secret") -> ExternalChatAccount:
return ExternalChatAccount(
@ -29,6 +36,38 @@ def _account(secret: str = "secret") -> ExternalChatAccount:
)
def _slack_account() -> ExternalChatAccount:
return ExternalChatAccount(
id=456,
platform=ExternalChatPlatform.SLACK,
mode=ExternalChatAccountMode.CLOUD_SHARED,
is_system_account=True,
cursor_state={"team_id": "T123", "bot_user_id": "U_BOT"},
)
def _signed_slack_request(payload: dict, *, secret: str = "signing-secret") -> RequestStub:
body = json.dumps(payload).encode()
timestamp = str(int(time.time()))
digest = hmac.new(
secret.encode(),
b"v0:" + timestamp.encode() + b":" + body,
hashlib.sha256,
).hexdigest()
class SlackRequestStub(RequestStub):
async def body(self):
return body
return SlackRequestStub(
payload,
headers={
"X-Slack-Request-Timestamp": timestamp,
"X-Slack-Signature": f"v0={digest}",
},
)
async def _call_webhook(*, request: RequestStub, account_id: int, session):
return await routes.telegram_webhook(
request=request,
@ -147,3 +186,89 @@ def test_telegram_webhook_does_not_use_slowapi_limiter():
assert "@limiter.limit" not in route_source
def test_verify_slack_signature_accepts_valid_signature():
payload = b'{"type":"event_callback"}'
timestamp = str(int(time.time()))
digest = hmac.new(
b"secret",
b"v0:" + timestamp.encode() + b":" + payload,
hashlib.sha256,
).hexdigest()
assert routes.verify_slack_signature(
signing_secret="secret",
timestamp=timestamp,
signature=f"v0={digest}",
body=payload,
)
@pytest.mark.asyncio
async def test_slack_webhook_url_verification(monkeypatch, mocker):
monkeypatch.setattr(routes.config, "GATEWAY_SLACK_SIGNING_SECRET", "signing-secret")
request = _signed_slack_request({"type": "url_verification", "challenge": "abc123"})
response = await routes.slack_webhook(request=request, session=mocker.AsyncMock())
assert response.status_code == 200
assert json.loads(response.body)["challenge"] == "abc123"
@pytest.mark.asyncio
async def test_slack_webhook_persists_event(monkeypatch, mocker):
monkeypatch.setattr(routes.config, "GATEWAY_SLACK_SIGNING_SECRET", "signing-secret")
session = mocker.AsyncMock()
monkeypatch.setattr(routes, "get_slack_account_by_team", mocker.AsyncMock(return_value=_slack_account()))
persist = mocker.AsyncMock(return_value=100)
monkeypatch.setattr(routes, "persist_inbound_event", persist)
payload = {
"type": "event_callback",
"team_id": "T123",
"event_id": "Ev123",
"event": {
"type": "app_mention",
"channel": "C123",
"user": "U123",
"text": "<@U_BOT> hello",
"ts": "1717000000.000100",
},
}
request = _signed_slack_request(payload)
response = await routes.slack_webhook(request=request, session=session)
assert response.status_code == 200
persist.assert_awaited_once()
assert persist.await_args.kwargs["event_dedupe_key"] == "slack_event:Ev123"
assert persist.await_args.kwargs["platform"] == ExternalChatPlatform.SLACK
session.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_slack_webhook_ignores_self_event(monkeypatch, mocker):
monkeypatch.setattr(routes.config, "GATEWAY_SLACK_SIGNING_SECRET", "signing-secret")
session = mocker.AsyncMock()
monkeypatch.setattr(routes, "get_slack_account_by_team", mocker.AsyncMock(return_value=_slack_account()))
persist = mocker.AsyncMock(return_value=100)
monkeypatch.setattr(routes, "persist_inbound_event", persist)
request = _signed_slack_request(
{
"type": "event_callback",
"team_id": "T123",
"event_id": "Ev123",
"event": {
"type": "app_mention",
"channel": "C123",
"user": "U_BOT",
"text": "loop",
"ts": "1717000000.000100",
},
}
)
response = await routes.slack_webhook(request=request, session=session)
assert response.status_code == 200
persist.assert_not_awaited()