diff --git a/surfsense_backend/app/gateway/whatsapp/commands.py b/surfsense_backend/app/gateway/whatsapp/commands.py new file mode 100644 index 000000000..28b765347 --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/commands.py @@ -0,0 +1,123 @@ +"""WhatsApp command handlers.""" + +from __future__ import annotations + +from app.gateway.base.adapter import BasePlatformAdapter, ParsedInboundEvent +from app.gateway.base.commands import BaseGatewayCommands +from app.gateway.pairing import redeem_pairing_code +from app.gateway.ratelimit import acquire_token + +HELP_TEXT = ( + "SurfSense WhatsApp commands:\n" + "/start - pair this chat\n" + "/new - start a fresh conversation\n" + "/help - show this help" +) + + +async def handle_start_command( + *, + session, + adapter: BasePlatformAdapter, + event: ParsedInboundEvent, +) -> bool: + text = event.text or "" + parts = text.split(maxsplit=1) + if len(parts) != 2 or not event.external_peer_id: + await adapter.send_message( + external_peer_id=event.external_peer_id or "", + text=( + "Generate a pairing code in SurfSense Settings > Messaging Channels, " + "then send /start CODE here." + ), + ) + return True + + binding = await redeem_pairing_code( + session, + code=parts[1].strip(), + external_peer_id=event.external_peer_id, + external_peer_kind=event.external_peer_kind, + external_display_name=event.display_name, + external_username=event.username, + external_metadata=event.metadata, + ) + if binding is None: + await adapter.send_message( + external_peer_id=event.external_peer_id, + text="That pairing code is invalid or expired. Generate a new code in SurfSense.", + ) + return True + + await adapter.send_message( + external_peer_id=event.external_peer_id, + text="SurfSense is connected. Send a message here to chat with your agent.", + ) + return True + + +async def handle_help_command( + *, + adapter: BasePlatformAdapter, + event: ParsedInboundEvent, +) -> bool: + if not event.external_peer_id: + return True + await adapter.send_message(external_peer_id=event.external_peer_id, text=HELP_TEXT) + return True + + +async def send_unbound_onboarding( + *, + adapter: BasePlatformAdapter, + event: ParsedInboundEvent, + dashboard_url: str, +) -> None: + if not event.external_peer_id: + return + wait_ms = await acquire_token( + f"wa:onboarded:{event.external_peer_id}", + capacity=1, + refill_per_sec=1 / 3600, + ) + if wait_ms > 0: + return + await adapter.send_message( + external_peer_id=event.external_peer_id, + text=( + "Hi! To use SurfSense via WhatsApp, generate a pairing code at " + f"{dashboard_url} and send /start CODE here." + ), + ) + + +class WhatsAppGatewayCommands(BaseGatewayCommands): + async def handle_start_command( + self, + *, + session, + adapter: BasePlatformAdapter, + event: ParsedInboundEvent, + ) -> bool: + return await handle_start_command(session=session, adapter=adapter, event=event) + + async def handle_help_command( + self, + *, + adapter: BasePlatformAdapter, + event: ParsedInboundEvent, + ) -> bool: + return await handle_help_command(adapter=adapter, event=event) + + async def send_unbound_onboarding( + self, + *, + adapter: BasePlatformAdapter, + event: ParsedInboundEvent, + dashboard_url: str, + ) -> None: + await send_unbound_onboarding( + adapter=adapter, + event=event, + dashboard_url=dashboard_url, + ) diff --git a/surfsense_backend/app/gateway/whatsapp/translator.py b/surfsense_backend/app/gateway/whatsapp/translator.py new file mode 100644 index 000000000..deef8b452 --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/translator.py @@ -0,0 +1,90 @@ +"""Translate agent stream events into WhatsApp Cloud API messages.""" + +from __future__ import annotations + +import logging +from collections.abc import AsyncIterator + +from app.gateway.base.adapter import BasePlatformAdapter, PlatformSendResult +from app.gateway.base.formatting import split_text_message +from app.gateway.base.translator import BaseStreamTranslator, GatewayStreamEvent +from app.gateway.whatsapp.adapter_cloud import WhatsAppCloudAdapter +from app.observability.metrics import ( + record_gateway_hitl_aborted, + record_gateway_outbound, +) + +logger = logging.getLogger(__name__) + +HITL_UNSUPPORTED_MESSAGE = ( + "This action requires approval and is not yet supported from WhatsApp. " + "Try again with a different request." +) + + +class WhatsAppCloudStreamTranslator(BaseStreamTranslator): + def __init__( + self, + *, + adapter: BasePlatformAdapter, + external_peer_id: str, + inbound_message_id: str | None = None, + ) -> None: + self.adapter = adapter + self.external_peer_id = external_peer_id + self.inbound_message_id = inbound_message_id + self._buffer = "" + self._typing_sent = False + + async def translate(self, events: AsyncIterator[GatewayStreamEvent]) -> None: + async for event in events: + if event.type in {"text-delta", "text_delta", "text"}: + if not self._typing_sent: + await self._send_typing_indicator() + self._buffer += str(event.data.get("text") or event.data.get("delta") or "") + elif event.type in {"data-interrupt-request", "interrupt"}: + await self._handle_hitl_interrupt() + return + elif event.type in {"finish", "done"}: + break + + await self._flush_final() + + async def _flush_final(self) -> None: + if not self._buffer: + return + for chunk in split_text_message(self._buffer): + await self._send_text(chunk) + + async def _send_typing_indicator(self) -> None: + self._typing_sent = True + if not self.inbound_message_id: + return + if not isinstance(self.adapter, WhatsAppCloudAdapter): + return + try: + await self.adapter.send_typing_indicator( + inbound_message_id=self.inbound_message_id + ) + record_gateway_outbound(platform="whatsapp", kind="typing", status="sent") + except Exception: + logger.debug("WhatsApp typing indicator failed", exc_info=True) + record_gateway_outbound(platform="whatsapp", kind="typing", status="failed") + + async def _send_text(self, text: str) -> PlatformSendResult: + try: + result = await self.adapter.send_message( + external_peer_id=self.external_peer_id, + text=text, + ) + except Exception: + record_gateway_outbound(platform="whatsapp", kind="send", status="failed") + raise + record_gateway_outbound(platform="whatsapp", kind="send", status="sent") + return result + + async def _handle_hitl_interrupt(self) -> None: + if self._buffer: + await self._flush_final() + await self._send_text(HITL_UNSUPPORTED_MESSAGE) + record_gateway_hitl_aborted(platform="whatsapp")