From 3b529a3ab27a8ea1ad6556c90bb2556a6f7a79bc Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 29 May 2026 10:19:13 +0530 Subject: [PATCH] feat(gateway): add Baileys WhatsApp adapter --- .../app/gateway/whatsapp/adapter_baileys.py | 118 +++++++++++++++++ .../gateway/whatsapp/translator_baileys.py | 123 ++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 surfsense_backend/app/gateway/whatsapp/adapter_baileys.py create mode 100644 surfsense_backend/app/gateway/whatsapp/translator_baileys.py diff --git a/surfsense_backend/app/gateway/whatsapp/adapter_baileys.py b/surfsense_backend/app/gateway/whatsapp/adapter_baileys.py new file mode 100644 index 000000000..99489e27b --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/adapter_baileys.py @@ -0,0 +1,118 @@ +"""Baileys bridge platform adapter.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any + +import httpx + +from app.config import config +from app.gateway.base.adapter import ( + BasePlatformAdapter, + ParsedInboundEvent, + PlatformSendResult, +) + + +class WhatsAppBaileysAdapter(BasePlatformAdapter): + platform = "whatsapp" + + def __init__(self, bridge_url: str | None = None) -> None: + self.bridge_url = (bridge_url or config.WHATSAPP_BRIDGE_URL).rstrip("/") + + def parse_inbound(self, raw_payload: dict[str, Any]) -> ParsedInboundEvent: + chat_id = str(raw_payload.get("chatId") or "") + sender_id = str(raw_payload.get("senderId") or chat_id) + message_id = str(raw_payload.get("messageId") or "") + body = raw_payload.get("body") + is_group = bool(raw_payload.get("isGroup")) + return ParsedInboundEvent( + platform=self.platform, + event_kind="message", + external_peer_id=chat_id or None, + external_peer_kind="group" if is_group else "direct", + external_message_id=message_id or None, + external_user_id=sender_id or None, + text=str(body) if body is not None else None, + raw_payload=raw_payload, + display_name=str(raw_payload.get("chatName") or sender_id or chat_id) or None, + username=None, + metadata={ + "sender_id": sender_id, + "from_me": bool(raw_payload.get("fromMe")), + "timestamp": raw_payload.get("timestamp"), + }, + ) + + async def send_message( + self, + *, + external_peer_id: str, + text: str, + parse_mode: str | None = None, + reply_to_message_id: str | None = None, + ) -> PlatformSendResult: + payload: dict[str, Any] = {"chatId": external_peer_id, "message": text} + if reply_to_message_id: + payload["replyTo"] = reply_to_message_id + data = await self._post("/send", payload) + return PlatformSendResult( + external_message_id=str(data.get("messageId") or ""), + raw_response=data, + ) + + async def edit_message( + self, + *, + external_peer_id: str, + external_message_id: str, + text: str, + parse_mode: str | None = None, + ) -> PlatformSendResult: + data = await self._post( + "/edit", + { + "chatId": external_peer_id, + "messageId": external_message_id, + "message": text, + }, + ) + return PlatformSendResult( + external_message_id=str(data.get("messageId") or external_message_id), + raw_response=data, + ) + + async def send_typing_indicator(self, *, external_peer_id: str) -> None: + await self._post("/typing", {"chatId": external_peer_id}, expect_json=False) + + async def validate_credentials(self) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=10) as client: + response = await client.get(f"{self.bridge_url}/health") + response.raise_for_status() + return response.json() + + async def fetch_updates(self, *, offset: int | None) -> AsyncIterator[dict[str, Any]]: + async with httpx.AsyncClient(timeout=35) as client: + response = await client.get(f"{self.bridge_url}/messages") + response.raise_for_status() + for message in response.json(): + if isinstance(message, dict): + yield message + + async def request_pairing_code(self, *, phone_number: str) -> dict[str, Any]: + return await self._post("/pair", {"phoneNumber": phone_number}) + + async def _post( + self, + path: str, + payload: dict[str, Any], + *, + expect_json: bool = True, + ) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(f"{self.bridge_url}{path}", json=payload) + response.raise_for_status() + if not expect_json or response.status_code == 204: + return {} + return response.json() diff --git a/surfsense_backend/app/gateway/whatsapp/translator_baileys.py b/surfsense_backend/app/gateway/whatsapp/translator_baileys.py new file mode 100644 index 000000000..8a4c8acfa --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/translator_baileys.py @@ -0,0 +1,123 @@ +"""Translate agent stream events into Baileys bridge messages.""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import AsyncIterator + +from app.gateway.base.adapter import BasePlatformAdapter, PlatformSendResult +from app.gateway.base.formatting import split_text_message +from app.gateway.base.translator import BaseStreamTranslator, GatewayStreamEvent +from app.gateway.whatsapp.adapter_baileys import WhatsAppBaileysAdapter +from app.observability.metrics import ( + record_gateway_hitl_aborted, + record_gateway_outbound, +) + +logger = logging.getLogger(__name__) + +HITL_UNSUPPORTED_MESSAGE = ( + "This action requires approval and is not yet supported from WhatsApp. " + "Try again with a different request." +) + + +class WhatsAppBaileysStreamTranslator(BaseStreamTranslator): + def __init__( + self, + *, + adapter: BasePlatformAdapter, + external_peer_id: str, + debounce_seconds: float = 1.5, + ) -> None: + self.adapter = adapter + self.external_peer_id = external_peer_id + self.debounce_seconds = debounce_seconds + self._buffer = "" + self._last_flush_at = 0.0 + self._external_message_ids: list[str] = [] + + async def translate(self, events: AsyncIterator[GatewayStreamEvent]) -> None: + await self._send_typing_indicator() + async for event in events: + if event.type in {"text-delta", "text_delta", "text"}: + self._buffer += str(event.data.get("text") or event.data.get("delta") or "") + await self._maybe_flush() + elif event.type in {"data-interrupt-request", "interrupt"}: + await self._handle_hitl_interrupt() + return + elif event.type in {"finish", "done"}: + break + + await self._flush(final=True) + + async def _maybe_flush(self) -> None: + now = asyncio.get_running_loop().time() + if now - self._last_flush_at < self.debounce_seconds: + return + await self._flush(final=False) + self._last_flush_at = now + + async def _flush(self, *, final: bool) -> None: + if not self._buffer: + return + + chunks = split_text_message(self._buffer) + if len(chunks) > 1: + for chunk in chunks[:-1]: + result = await self._send_text(chunk) + self._external_message_ids.append(result.external_message_id) + self._buffer = chunks[-1] + + if self._external_message_ids: + await self._edit_text(self._external_message_ids[-1], self._buffer) + else: + result = await self._send_text(self._buffer) + self._external_message_ids.append(result.external_message_id) + + if final: + logger.debug( + "WhatsApp Baileys finalized external_ids=%s", + self._external_message_ids, + ) + + async def _send_typing_indicator(self) -> None: + if not isinstance(self.adapter, WhatsAppBaileysAdapter): + return + try: + await self.adapter.send_typing_indicator(external_peer_id=self.external_peer_id) + record_gateway_outbound(platform="whatsapp", kind="typing", status="sent") + except Exception: + logger.debug("WhatsApp Baileys typing indicator failed", exc_info=True) + + async def _send_text(self, text: str) -> PlatformSendResult: + try: + result = await self.adapter.send_message( + external_peer_id=self.external_peer_id, + text=text, + ) + except Exception: + record_gateway_outbound(platform="whatsapp", kind="send", status="failed") + raise + record_gateway_outbound(platform="whatsapp", kind="send", status="sent") + return result + + async def _edit_text(self, message_id: str, text: str) -> PlatformSendResult: + try: + result = await self.adapter.edit_message( + external_peer_id=self.external_peer_id, + external_message_id=message_id, + text=text, + ) + except Exception: + record_gateway_outbound(platform="whatsapp", kind="edit", status="failed") + raise + record_gateway_outbound(platform="whatsapp", kind="edit", status="edited") + return result + + async def _handle_hitl_interrupt(self) -> None: + if self._buffer: + await self._flush(final=False) + await self._send_text(HITL_UNSUPPORTED_MESSAGE) + record_gateway_hitl_aborted(platform="whatsapp")