feat(gateway): enhance logging and event handling in agent and Telegram translator

This commit is contained in:
Anish Sarkar 2026-05-28 00:07:37 +05:30
parent 5f9d16530d
commit 708e3a9120
4 changed files with 38 additions and 6 deletions

View file

@ -22,6 +22,7 @@ logger = logging.getLogger(__name__)
async def _events_from_sse(chunks: AsyncIterator[str]) -> AsyncIterator[GatewayStreamEvent]:
saw_text = False
async for chunk in chunks:
for raw_line in chunk.splitlines():
line = raw_line.strip()
@ -29,6 +30,7 @@ async def _events_from_sse(chunks: AsyncIterator[str]) -> AsyncIterator[GatewayS
continue
payload = line.removeprefix("data:").strip()
if payload == "[DONE]":
logger.info("Gateway SSE normalized: done")
yield GatewayStreamEvent(type="done")
continue
try:
@ -37,12 +39,16 @@ async def _events_from_sse(chunks: AsyncIterator[str]) -> AsyncIterator[GatewayS
continue
event_type = str(data.get("type") or "")
if event_type == "text-delta":
yield GatewayStreamEvent(type="text-delta", data={"delta": data.get("delta", "")})
elif event_type == "text-end":
yield GatewayStreamEvent(type="text-end", data=data)
elif event_type == "finish":
delta = data.get("delta", "")
if delta and not saw_text:
logger.info("Gateway SSE normalized: text stream started")
saw_text = True
yield GatewayStreamEvent(type="text-delta", data={"delta": delta})
elif event_type in {"finish", "done"}:
logger.info("Gateway SSE normalized: %s", event_type)
yield GatewayStreamEvent(type="finish", data=data)
elif event_type == "data-interrupt-request":
logger.info("Gateway SSE normalized: interrupt request")
yield GatewayStreamEvent(type="data-interrupt-request", data=data)

View file

@ -24,7 +24,7 @@ def _lock_key(token: str) -> int:
class GatewayRunner:
async def run(self) -> None:
print("Gateway runner started. Waiting for inbound events...", flush=True)
logger.info("Gateway runner started. Waiting for inbound events.")
tasks = [asyncio.create_task(self._process_inbox_forever())]
async with async_session_maker() as session:

View file

@ -54,7 +54,7 @@ class TelegramStreamTranslator(BaseStreamTranslator):
elif event.type in {"data-interrupt-request", "interrupt"}:
await self._handle_hitl_interrupt()
return
elif event.type in {"text-end", "finish", "done"}:
elif event.type in {"finish", "done"}:
break
await self._flush(final=True)
@ -100,6 +100,11 @@ class TelegramStreamTranslator(BaseStreamTranslator):
async def _send_text(self, text: str) -> PlatformSendResult:
await self._throttle()
parse_mode = None if self._plaintext_mode else ParseMode.MARKDOWN_V2
logger.info(
"Telegram gateway sending message peer=%s chars=%d",
self.external_peer_id,
len(text),
)
try:
result = await retry_plaintext_on_bad_markdown(
self.adapter.send_message,
@ -110,12 +115,23 @@ class TelegramStreamTranslator(BaseStreamTranslator):
except Exception:
record_gateway_outbound(platform="telegram", kind="send", status="failed")
raise
logger.info(
"Telegram gateway sent message peer=%s message_id=%s",
self.external_peer_id,
result.external_message_id,
)
record_gateway_outbound(platform="telegram", kind="send", status="sent")
return result
async def _edit_text(self, message_id: str, text: str) -> PlatformSendResult:
await self._throttle()
parse_mode = None if self._plaintext_mode else ParseMode.MARKDOWN_V2
logger.info(
"Telegram gateway editing message peer=%s message_id=%s chars=%d",
self.external_peer_id,
message_id,
len(text),
)
try:
result = await retry_plaintext_on_bad_markdown(
self.adapter.edit_message,
@ -127,6 +143,11 @@ class TelegramStreamTranslator(BaseStreamTranslator):
except Exception:
record_gateway_outbound(platform="telegram", kind="edit", status="failed")
raise
logger.info(
"Telegram gateway edited message peer=%s message_id=%s",
self.external_peer_id,
result.external_message_id,
)
record_gateway_outbound(platform="telegram", kind="edit", status="edited")
return result

View file

@ -3,9 +3,14 @@
from __future__ import annotations
import asyncio
import logging
from app.gateway.runner import GatewayRunner
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
asyncio.run(GatewayRunner().run())