diff --git a/surfsense_backend/app/gateway/bindings.py b/surfsense_backend/app/gateway/bindings.py index e7205c5f1..971633571 100644 --- a/surfsense_backend/app/gateway/bindings.py +++ b/surfsense_backend/app/gateway/bindings.py @@ -9,8 +9,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import ( ChatVisibility, - ExternalChatBindingState, ExternalChatBinding, + ExternalChatBindingState, NewChatThread, ) @@ -27,12 +27,17 @@ async def get_or_create_thread_for_binding( if thread is not None and not thread.archived: return thread + source = str((binding.external_metadata or {}).get("platform") or "").strip() + if not source: + kind = str((binding.external_metadata or {}).get("kind") or "") + source = "slack" if kind.startswith("slack_") else "telegram" + thread = NewChatThread( - title="Telegram chat", + title=f"{source.title()} chat", search_space_id=binding.search_space_id, created_by_id=binding.user_id, visibility=ChatVisibility.PRIVATE, - source="telegram", + source=source, external_chat_binding_id=binding.id, ) session.add(thread) diff --git a/surfsense_backend/app/gateway/inbox.py b/surfsense_backend/app/gateway/inbox.py index 9bc660b9d..5769c8cc4 100644 --- a/surfsense_backend/app/gateway/inbox.py +++ b/surfsense_backend/app/gateway/inbox.py @@ -12,6 +12,10 @@ def telegram_event_dedupe_key(update_id: int | str) -> str: return f"update:{update_id}" +def slack_event_dedupe_key(event_id: int | str) -> str: + return f"slack_event:{event_id}" + + async def persist_inbound_event( session: AsyncSession, *, diff --git a/surfsense_backend/app/gateway/inbox_processor.py b/surfsense_backend/app/gateway/inbox_processor.py index bdf768d61..3e87b582d 100644 --- a/surfsense_backend/app/gateway/inbox_processor.py +++ b/surfsense_backend/app/gateway/inbox_processor.py @@ -21,6 +21,7 @@ from app.db import ( ExternalChatEventStatus, ExternalChatInboundEvent, ExternalChatPeerKind, + ExternalChatPlatform, NewChatThread, async_session_maker, ) @@ -128,6 +129,86 @@ async def _mark_failed( await session.commit() +async def _resolve_binding_for_event( + session: AsyncSession, + account: ExternalChatAccount, + parsed, +) -> ExternalChatBinding | None: + if account.platform == ExternalChatPlatform.SLACK: + return await _resolve_slack_thread_binding(session, account, parsed) + + result = await session.execute( + select(ExternalChatBinding).where( + ExternalChatBinding.account_id == account.id, + ExternalChatBinding.external_peer_id == parsed.external_peer_id, + ExternalChatBinding.state.in_( + [ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED] + ), + ) + ) + return result.scalars().first() + + +async def _resolve_slack_thread_binding( + session: AsyncSession, + account: ExternalChatAccount, + parsed, +) -> ExternalChatBinding | None: + user_peer_id = parsed.metadata.get("slack_user_peer_id") + thread_peer_id = parsed.metadata.get("slack_thread_peer_id") or parsed.external_peer_id + if not user_peer_id or not thread_peer_id: + return None + + user_result = await session.execute( + select(ExternalChatBinding).where( + ExternalChatBinding.account_id == account.id, + ExternalChatBinding.external_peer_id == user_peer_id, + ExternalChatBinding.state.in_( + [ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED] + ), + ) + ) + user_binding = user_result.scalars().first() + if user_binding is None: + return None + + thread_result = await session.execute( + select(ExternalChatBinding).where( + ExternalChatBinding.account_id == account.id, + ExternalChatBinding.external_peer_id == thread_peer_id, + ExternalChatBinding.state.in_( + [ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED] + ), + ) + ) + thread_binding = thread_result.scalars().first() + if thread_binding is not None: + return thread_binding + + thread_binding = ExternalChatBinding( + account_id=account.id, + user_id=user_binding.user_id, + search_space_id=user_binding.search_space_id, + state=ExternalChatBindingState.BOUND, + external_peer_id=thread_peer_id, + external_peer_kind=ExternalChatPeerKind.CHANNEL, + external_thread_id=parsed.metadata.get("thread_ts"), + external_display_name=parsed.metadata.get("channel_id"), + external_username=parsed.external_user_id, + external_metadata={ + "kind": "slack_thread", + "team_id": parsed.metadata.get("team_id"), + "channel_id": parsed.metadata.get("channel_id"), + "thread_ts": parsed.metadata.get("thread_ts"), + "slack_user_id": parsed.metadata.get("slack_user_id"), + "user_binding_id": user_binding.id, + }, + ) + session.add(thread_binding) + await session.flush() + return thread_binding + + async def _dispatch_inbound_event( inbox_id: int, session_maker: SessionMaker, @@ -161,18 +242,12 @@ async def _dispatch_inbound_event( _update_account_cursor(account, parsed.metadata.get("update_id")) - result = await session.execute( - select(ExternalChatBinding).where( - ExternalChatBinding.account_id == account.id, - ExternalChatBinding.external_peer_id == parsed.external_peer_id, - ExternalChatBinding.state.in_( - [ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED] - ), - ) - ) - binding = result.scalars().first() + binding = await _resolve_binding_for_event(session, account, parsed) - if parsed.external_peer_kind != ExternalChatPeerKind.DIRECT.value: + if ( + account.platform != ExternalChatPlatform.SLACK + and parsed.external_peer_kind != ExternalChatPeerKind.DIRECT.value + ): if hasattr(adapter, "leave_chat"): await adapter.leave_chat(external_peer_id=parsed.external_peer_id) event.status = ExternalChatEventStatus.IGNORED diff --git a/surfsense_backend/app/gateway/registry.py b/surfsense_backend/app/gateway/registry.py index db334b7f1..fc9cb37e5 100644 --- a/surfsense_backend/app/gateway/registry.py +++ b/surfsense_backend/app/gateway/registry.py @@ -6,7 +6,7 @@ from collections.abc import Callable from dataclasses import dataclass from app.db import ExternalChatAccount, ExternalChatAccountMode, ExternalChatPlatform -from app.gateway.accounts import account_token +from app.gateway.accounts import account_token, slack_account_credentials from app.gateway.base.adapter import BasePlatformAdapter, ParsedInboundEvent from app.gateway.base.commands import BaseGatewayCommands from app.gateway.base.translator import BaseStreamTranslator @@ -70,6 +70,23 @@ def _whatsapp_baileys_translator_factory( ) +def _slack_translator_factory( + adapter: BasePlatformAdapter, + event: ParsedInboundEvent, +) -> BaseStreamTranslator: + channel_id = event.metadata.get("channel_id") + thread_ts = event.metadata.get("thread_ts") + if not channel_id or not thread_ts: + raise RuntimeError("missing_slack_thread_metadata") + from app.gateway.slack.translator import SlackStreamTranslator + + return SlackStreamTranslator( + adapter=adapter, # type: ignore[arg-type] + channel_id=channel_id, + thread_ts=thread_ts, + ) + + def resolve_platform_bundle(account: ExternalChatAccount) -> PlatformBundle: if account.platform == ExternalChatPlatform.TELEGRAM: token = account_token(account) @@ -108,4 +125,24 @@ def resolve_platform_bundle(account: ExternalChatAccount) -> PlatformBundle: auto_bind_owner=True, ) + if account.platform == ExternalChatPlatform.SLACK: + from app.gateway.slack.adapter import SlackAdapter + from app.gateway.slack.commands import SlackGatewayCommands + + credentials = slack_account_credentials(account) + bot_token = credentials.get("bot_token") + if not bot_token: + raise RuntimeError("missing_slack_bot_token") + cursor_state = account.cursor_state or {} + return PlatformBundle( + adapter=SlackAdapter( + bot_token, + bot_user_id=cursor_state.get("bot_user_id"), + ), + translator_factory=_slack_translator_factory, + platform_label="slack", + commands=SlackGatewayCommands(), + auto_bind_owner=False, + ) + raise RuntimeError(f"unsupported_gateway_platform:{account.platform.value}:{account.mode.value}")