diff --git a/surfsense_backend/app/gateway/agent_invoke.py b/surfsense_backend/app/gateway/agent_invoke.py index b0cccddaa..4faf3711f 100644 --- a/surfsense_backend/app/gateway/agent_invoke.py +++ b/surfsense_backend/app/gateway/agent_invoke.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) async def _events_from_sse(chunks: AsyncIterator[str]) -> AsyncIterator[GatewayStreamEvent]: + saw_text = False async for chunk in chunks: for raw_line in chunk.splitlines(): line = raw_line.strip() @@ -29,6 +30,7 @@ async def _events_from_sse(chunks: AsyncIterator[str]) -> AsyncIterator[GatewayS continue payload = line.removeprefix("data:").strip() if payload == "[DONE]": + logger.info("Gateway SSE normalized: done") yield GatewayStreamEvent(type="done") continue try: @@ -37,12 +39,16 @@ async def _events_from_sse(chunks: AsyncIterator[str]) -> AsyncIterator[GatewayS continue event_type = str(data.get("type") or "") if event_type == "text-delta": - yield GatewayStreamEvent(type="text-delta", data={"delta": data.get("delta", "")}) - elif event_type == "text-end": - yield GatewayStreamEvent(type="text-end", data=data) - elif event_type == "finish": + delta = data.get("delta", "") + if delta and not saw_text: + logger.info("Gateway SSE normalized: text stream started") + saw_text = True + yield GatewayStreamEvent(type="text-delta", data={"delta": delta}) + elif event_type in {"finish", "done"}: + logger.info("Gateway SSE normalized: %s", event_type) yield GatewayStreamEvent(type="finish", data=data) elif event_type == "data-interrupt-request": + logger.info("Gateway SSE normalized: interrupt request") yield GatewayStreamEvent(type="data-interrupt-request", data=data) diff --git a/surfsense_backend/app/gateway/runner.py b/surfsense_backend/app/gateway/runner.py index 1e56ff25d..8ebd89253 100644 --- a/surfsense_backend/app/gateway/runner.py +++ b/surfsense_backend/app/gateway/runner.py @@ -24,7 +24,7 @@ def _lock_key(token: str) -> int: class GatewayRunner: async def run(self) -> None: - print("Gateway runner started. Waiting for inbound events...", flush=True) + logger.info("Gateway runner started. Waiting for inbound events.") tasks = [asyncio.create_task(self._process_inbox_forever())] async with async_session_maker() as session: diff --git a/surfsense_backend/app/gateway/telegram/translator.py b/surfsense_backend/app/gateway/telegram/translator.py index d98f208c4..96903bea0 100644 --- a/surfsense_backend/app/gateway/telegram/translator.py +++ b/surfsense_backend/app/gateway/telegram/translator.py @@ -54,7 +54,7 @@ class TelegramStreamTranslator(BaseStreamTranslator): elif event.type in {"data-interrupt-request", "interrupt"}: await self._handle_hitl_interrupt() return - elif event.type in {"text-end", "finish", "done"}: + elif event.type in {"finish", "done"}: break await self._flush(final=True) @@ -100,6 +100,11 @@ class TelegramStreamTranslator(BaseStreamTranslator): async def _send_text(self, text: str) -> PlatformSendResult: await self._throttle() parse_mode = None if self._plaintext_mode else ParseMode.MARKDOWN_V2 + logger.info( + "Telegram gateway sending message peer=%s chars=%d", + self.external_peer_id, + len(text), + ) try: result = await retry_plaintext_on_bad_markdown( self.adapter.send_message, @@ -110,12 +115,23 @@ class TelegramStreamTranslator(BaseStreamTranslator): except Exception: record_gateway_outbound(platform="telegram", kind="send", status="failed") raise + logger.info( + "Telegram gateway sent message peer=%s message_id=%s", + self.external_peer_id, + result.external_message_id, + ) record_gateway_outbound(platform="telegram", kind="send", status="sent") return result async def _edit_text(self, message_id: str, text: str) -> PlatformSendResult: await self._throttle() parse_mode = None if self._plaintext_mode else ParseMode.MARKDOWN_V2 + logger.info( + "Telegram gateway editing message peer=%s message_id=%s chars=%d", + self.external_peer_id, + message_id, + len(text), + ) try: result = await retry_plaintext_on_bad_markdown( self.adapter.edit_message, @@ -127,6 +143,11 @@ class TelegramStreamTranslator(BaseStreamTranslator): except Exception: record_gateway_outbound(platform="telegram", kind="edit", status="failed") raise + logger.info( + "Telegram gateway edited message peer=%s message_id=%s", + self.external_peer_id, + result.external_message_id, + ) record_gateway_outbound(platform="telegram", kind="edit", status="edited") return result diff --git a/surfsense_backend/gateway_runner.py b/surfsense_backend/gateway_runner.py index 72a1749a9..27077ef48 100644 --- a/surfsense_backend/gateway_runner.py +++ b/surfsense_backend/gateway_runner.py @@ -3,9 +3,14 @@ from __future__ import annotations import asyncio +import logging from app.gateway.runner import GatewayRunner if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s [%(name)s] %(message)s", + ) asyncio.run(GatewayRunner().run())