diff --git a/surfsense_backend/app/gateway/agent_invoke.py b/surfsense_backend/app/gateway/agent_invoke.py index b876d1977..39d1fb299 100644 --- a/surfsense_backend/app/gateway/agent_invoke.py +++ b/surfsense_backend/app/gateway/agent_invoke.py @@ -1,4 +1,4 @@ -"""Invoke SurfSense chat agent for gateway channels.""" +"""Invoke SurfSense chat agent for external chat surfaces.""" from __future__ import annotations @@ -6,9 +6,10 @@ import json import logging from collections.abc import AsyncIterator +from sqlalchemy import update from sqlalchemy.ext.asyncio import AsyncSession -from app.db import GatewayConversationBinding +from app.db import ExternalChatBinding, NewChatMessage from app.gateway.auth_invariant import assert_authorization_invariant from app.gateway.base.translator import GatewayStreamEvent from app.gateway.bindings import get_or_create_thread_for_binding @@ -55,7 +56,7 @@ async def _events_from_sse(chunks: AsyncIterator[str]) -> AsyncIterator[GatewayS async def call_agent_for_gateway( *, session: AsyncSession, - binding: GatewayConversationBinding, + binding: ExternalChatBinding, user_text: str, translator: TelegramStreamTranslator, request_id: str | None = None, @@ -85,6 +86,12 @@ async def call_agent_for_gateway( finally: await events.aclose() await stream.aclose() + await session.execute( + update(NewChatMessage) + .where(NewChatMessage.thread_id == thread.id, NewChatMessage.source == "web") + .values(source="telegram") + ) + await session.commit() record_gateway_turn_latency(0, platform="telegram") finally: release_thread_lock(thread.id) diff --git a/surfsense_backend/app/gateway/inbox_processor.py b/surfsense_backend/app/gateway/inbox_processor.py index 3e3f962b7..c40a6c47c 100644 --- a/surfsense_backend/app/gateway/inbox_processor.py +++ b/surfsense_backend/app/gateway/inbox_processor.py @@ -1,8 +1,7 @@ -"""Long-lived gateway inbox processing. +"""Long-lived external chat inbox processing. -This module owns the agent-turn execution path for messaging gateways. It is -intentionally independent of Celery so LangGraph, async Postgres, Redis, and -Telegram clients all run on one stable event loop in ``GatewayRunner``. +This module owns the agent-turn execution path for external chat surfaces. +FastAPI calls into it after webhook and BYO long-poll intake persist inbox rows. """ from __future__ import annotations @@ -16,12 +15,12 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from app.config import config from app.db import ( - GatewayBindingState, - GatewayConversationBinding, - GatewayEventStatus, - GatewayInboundEvent, - GatewayPeerKind, - GatewayPlatformAccount, + ExternalChatBindingState, + ExternalChatBinding, + ExternalChatEventStatus, + ExternalChatInboundEvent, + ExternalChatPeerKind, + ExternalChatAccount, NewChatThread, async_session_maker, ) @@ -54,16 +53,16 @@ async def claim_next_inbound_event( async with session_maker() as session: result = await session.execute( - select(GatewayInboundEvent) - .where(GatewayInboundEvent.status == GatewayEventStatus.RECEIVED) - .order_by(GatewayInboundEvent.received_at.asc()) + select(ExternalChatInboundEvent) + .where(ExternalChatInboundEvent.status == ExternalChatEventStatus.RECEIVED) + .order_by(ExternalChatInboundEvent.received_at.asc()) .with_for_update(skip_locked=True) .limit(1) ) event = result.scalars().first() if event is None: return None - event.status = GatewayEventStatus.PROCESSING + event.status = ExternalChatEventStatus.PROCESSING event.attempt_count += 1 await session.commit() return int(event.id) @@ -73,22 +72,22 @@ async def process_inbound_event( inbox_id: int, session_maker: SessionMaker = async_session_maker, ) -> None: - """Process one gateway inbox row and mark its terminal status.""" + """Process one external chat inbox row and mark its terminal status.""" async with session_maker() as session: result = await session.execute( - select(GatewayInboundEvent) - .where(GatewayInboundEvent.id == inbox_id) + select(ExternalChatInboundEvent) + .where(ExternalChatInboundEvent.id == inbox_id) .with_for_update(skip_locked=True) ) event = result.scalars().first() if event is None or event.status in { - GatewayEventStatus.PROCESSED, - GatewayEventStatus.IGNORED, + ExternalChatEventStatus.PROCESSED, + ExternalChatEventStatus.IGNORED, }: return - if event.status == GatewayEventStatus.RECEIVED: - event.status = GatewayEventStatus.PROCESSING + if event.status == ExternalChatEventStatus.RECEIVED: + event.status = ExternalChatEventStatus.PROCESSING event.attempt_count += 1 await session.commit() @@ -98,15 +97,15 @@ async def process_inbound_event( if str(exc) == "gateway_thread_busy": async with session_maker() as session: await session.execute( - update(GatewayInboundEvent) - .where(GatewayInboundEvent.id == inbox_id) + update(ExternalChatInboundEvent) + .where(ExternalChatInboundEvent.id == inbox_id) .values( - status=GatewayEventStatus.RECEIVED, + status=ExternalChatEventStatus.RECEIVED, last_error="gateway_thread_busy", ) ) await session.commit() - return + raise await _mark_failed(inbox_id, str(exc), session_maker) raise except Exception as exc: @@ -114,9 +113,9 @@ async def process_inbound_event( raise async with session_maker() as session: - event = await session.get(GatewayInboundEvent, inbox_id) - if event is not None and event.status == GatewayEventStatus.PROCESSING: - event.status = GatewayEventStatus.PROCESSED + event = await session.get(ExternalChatInboundEvent, inbox_id) + if event is not None and event.status == ExternalChatEventStatus.PROCESSING: + event.status = ExternalChatEventStatus.PROCESSED event.processed_at = datetime.now(UTC) await session.commit() record_gateway_inbox_processed(platform=event.platform.value, status="processed") @@ -129,9 +128,9 @@ async def _mark_failed( ) -> None: async with session_maker() as session: await session.execute( - update(GatewayInboundEvent) - .where(GatewayInboundEvent.id == inbox_id) - .values(status=GatewayEventStatus.FAILED, last_error=error) + update(ExternalChatInboundEvent) + .where(ExternalChatInboundEvent.id == inbox_id) + .values(status=ExternalChatEventStatus.FAILED, last_error=error) ) await session.commit() @@ -141,19 +140,19 @@ async def _dispatch_inbound_event( session_maker: SessionMaker, ) -> None: async with session_maker() as session: - event = await session.get(GatewayInboundEvent, inbox_id) + event = await session.get(ExternalChatInboundEvent, inbox_id) if event is None: return - account = await session.get(GatewayPlatformAccount, event.account_id) + account = await session.get(ExternalChatAccount, event.account_id) if account is None: - event.status = GatewayEventStatus.IGNORED + event.status = ExternalChatEventStatus.IGNORED event.last_error = "account_missing" await session.commit() return token = account_token(account) if not token: - event.status = GatewayEventStatus.FAILED + event.status = ExternalChatEventStatus.FAILED event.last_error = "missing_telegram_token" await session.commit() return @@ -161,7 +160,7 @@ async def _dispatch_inbound_event( adapter = TelegramAdapter(token) parsed = adapter.parse_inbound(event.raw_payload or {}) if parsed.external_peer_id is None: - event.status = GatewayEventStatus.IGNORED + event.status = ExternalChatEventStatus.IGNORED event.last_error = "missing_external_peer_id" await session.commit() return @@ -169,19 +168,19 @@ async def _dispatch_inbound_event( _update_account_cursor(account, parsed.metadata.get("update_id")) result = await session.execute( - select(GatewayConversationBinding).where( - GatewayConversationBinding.account_id == account.id, - GatewayConversationBinding.external_peer_id == parsed.external_peer_id, - GatewayConversationBinding.state.in_( - [GatewayBindingState.BOUND, GatewayBindingState.SUSPENDED] + select(ExternalChatBinding).where( + ExternalChatBinding.account_id == account.id, + ExternalChatBinding.external_peer_id == parsed.external_peer_id, + ExternalChatBinding.state.in_( + [ExternalChatBindingState.BOUND, ExternalChatBindingState.SUSPENDED] ), ) ) binding = result.scalars().first() - if parsed.external_peer_kind != GatewayPeerKind.DIRECT.value: + if parsed.external_peer_kind != ExternalChatPeerKind.DIRECT.value: await adapter.leave_chat(external_peer_id=parsed.external_peer_id) - event.status = GatewayEventStatus.IGNORED + event.status = ExternalChatEventStatus.IGNORED event.last_error = "group_rejected" await session.commit() return @@ -201,30 +200,30 @@ async def _dispatch_inbound_event( event=parsed, dashboard_url=_dashboard_url(), ) - event.status = GatewayEventStatus.IGNORED + event.status = ExternalChatEventStatus.IGNORED event.last_error = "unbound_chat" await session.commit() return - event.binding_id = binding.id + event.external_chat_binding_id = binding.id if cmd == "/help": await handle_help_command(adapter=adapter, event=parsed) - event.status = GatewayEventStatus.PROCESSED + event.status = ExternalChatEventStatus.PROCESSED await session.commit() return if cmd == "/new": - binding.active_thread_id = None + binding.new_chat_thread_id = None await adapter.send_message( external_peer_id=parsed.external_peer_id, text="Started a new SurfSense conversation.", ) - event.status = GatewayEventStatus.PROCESSED + event.status = ExternalChatEventStatus.PROCESSED await session.commit() return if not parsed.text: - event.status = GatewayEventStatus.IGNORED + event.status = ExternalChatEventStatus.IGNORED event.last_error = "empty_message" await session.commit() return @@ -241,7 +240,7 @@ async def _dispatch_inbound_event( binding=binding, user_text=parsed.text, translator=translator, - request_id=f"gateway:{inbox_id}", + request_id=event.request_id or f"gateway:{inbox_id}", ) thread = await session.get(NewChatThread, thread.id) @@ -250,7 +249,7 @@ async def _dispatch_inbound_event( await session.commit() -def _update_account_cursor(account: GatewayPlatformAccount, update_id: object) -> None: +def _update_account_cursor(account: ExternalChatAccount, update_id: object) -> None: if update_id is None: return account.cursor_state = {