mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-02 19:55:18 +02:00
feat(gateway): add Baileys WhatsApp adapter
This commit is contained in:
parent
dbd9966219
commit
3b529a3ab2
2 changed files with 241 additions and 0 deletions
118
surfsense_backend/app/gateway/whatsapp/adapter_baileys.py
Normal file
118
surfsense_backend/app/gateway/whatsapp/adapter_baileys.py
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
"""Baileys bridge platform adapter."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from app.config import config
|
||||
from app.gateway.base.adapter import (
|
||||
BasePlatformAdapter,
|
||||
ParsedInboundEvent,
|
||||
PlatformSendResult,
|
||||
)
|
||||
|
||||
|
||||
class WhatsAppBaileysAdapter(BasePlatformAdapter):
|
||||
platform = "whatsapp"
|
||||
|
||||
def __init__(self, bridge_url: str | None = None) -> None:
|
||||
self.bridge_url = (bridge_url or config.WHATSAPP_BRIDGE_URL).rstrip("/")
|
||||
|
||||
def parse_inbound(self, raw_payload: dict[str, Any]) -> ParsedInboundEvent:
|
||||
chat_id = str(raw_payload.get("chatId") or "")
|
||||
sender_id = str(raw_payload.get("senderId") or chat_id)
|
||||
message_id = str(raw_payload.get("messageId") or "")
|
||||
body = raw_payload.get("body")
|
||||
is_group = bool(raw_payload.get("isGroup"))
|
||||
return ParsedInboundEvent(
|
||||
platform=self.platform,
|
||||
event_kind="message",
|
||||
external_peer_id=chat_id or None,
|
||||
external_peer_kind="group" if is_group else "direct",
|
||||
external_message_id=message_id or None,
|
||||
external_user_id=sender_id or None,
|
||||
text=str(body) if body is not None else None,
|
||||
raw_payload=raw_payload,
|
||||
display_name=str(raw_payload.get("chatName") or sender_id or chat_id) or None,
|
||||
username=None,
|
||||
metadata={
|
||||
"sender_id": sender_id,
|
||||
"from_me": bool(raw_payload.get("fromMe")),
|
||||
"timestamp": raw_payload.get("timestamp"),
|
||||
},
|
||||
)
|
||||
|
||||
async def send_message(
|
||||
self,
|
||||
*,
|
||||
external_peer_id: str,
|
||||
text: str,
|
||||
parse_mode: str | None = None,
|
||||
reply_to_message_id: str | None = None,
|
||||
) -> PlatformSendResult:
|
||||
payload: dict[str, Any] = {"chatId": external_peer_id, "message": text}
|
||||
if reply_to_message_id:
|
||||
payload["replyTo"] = reply_to_message_id
|
||||
data = await self._post("/send", payload)
|
||||
return PlatformSendResult(
|
||||
external_message_id=str(data.get("messageId") or ""),
|
||||
raw_response=data,
|
||||
)
|
||||
|
||||
async def edit_message(
|
||||
self,
|
||||
*,
|
||||
external_peer_id: str,
|
||||
external_message_id: str,
|
||||
text: str,
|
||||
parse_mode: str | None = None,
|
||||
) -> PlatformSendResult:
|
||||
data = await self._post(
|
||||
"/edit",
|
||||
{
|
||||
"chatId": external_peer_id,
|
||||
"messageId": external_message_id,
|
||||
"message": text,
|
||||
},
|
||||
)
|
||||
return PlatformSendResult(
|
||||
external_message_id=str(data.get("messageId") or external_message_id),
|
||||
raw_response=data,
|
||||
)
|
||||
|
||||
async def send_typing_indicator(self, *, external_peer_id: str) -> None:
|
||||
await self._post("/typing", {"chatId": external_peer_id}, expect_json=False)
|
||||
|
||||
async def validate_credentials(self) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
response = await client.get(f"{self.bridge_url}/health")
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def fetch_updates(self, *, offset: int | None) -> AsyncIterator[dict[str, Any]]:
|
||||
async with httpx.AsyncClient(timeout=35) as client:
|
||||
response = await client.get(f"{self.bridge_url}/messages")
|
||||
response.raise_for_status()
|
||||
for message in response.json():
|
||||
if isinstance(message, dict):
|
||||
yield message
|
||||
|
||||
async def request_pairing_code(self, *, phone_number: str) -> dict[str, Any]:
|
||||
return await self._post("/pair", {"phoneNumber": phone_number})
|
||||
|
||||
async def _post(
|
||||
self,
|
||||
path: str,
|
||||
payload: dict[str, Any],
|
||||
*,
|
||||
expect_json: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
response = await client.post(f"{self.bridge_url}{path}", json=payload)
|
||||
response.raise_for_status()
|
||||
if not expect_json or response.status_code == 204:
|
||||
return {}
|
||||
return response.json()
|
||||
123
surfsense_backend/app/gateway/whatsapp/translator_baileys.py
Normal file
123
surfsense_backend/app/gateway/whatsapp/translator_baileys.py
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
"""Translate agent stream events into Baileys bridge messages."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from collections.abc import AsyncIterator
|
||||
|
||||
from app.gateway.base.adapter import BasePlatformAdapter, PlatformSendResult
|
||||
from app.gateway.base.formatting import split_text_message
|
||||
from app.gateway.base.translator import BaseStreamTranslator, GatewayStreamEvent
|
||||
from app.gateway.whatsapp.adapter_baileys import WhatsAppBaileysAdapter
|
||||
from app.observability.metrics import (
|
||||
record_gateway_hitl_aborted,
|
||||
record_gateway_outbound,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HITL_UNSUPPORTED_MESSAGE = (
|
||||
"This action requires approval and is not yet supported from WhatsApp. "
|
||||
"Try again with a different request."
|
||||
)
|
||||
|
||||
|
||||
class WhatsAppBaileysStreamTranslator(BaseStreamTranslator):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
adapter: BasePlatformAdapter,
|
||||
external_peer_id: str,
|
||||
debounce_seconds: float = 1.5,
|
||||
) -> None:
|
||||
self.adapter = adapter
|
||||
self.external_peer_id = external_peer_id
|
||||
self.debounce_seconds = debounce_seconds
|
||||
self._buffer = ""
|
||||
self._last_flush_at = 0.0
|
||||
self._external_message_ids: list[str] = []
|
||||
|
||||
async def translate(self, events: AsyncIterator[GatewayStreamEvent]) -> None:
|
||||
await self._send_typing_indicator()
|
||||
async for event in events:
|
||||
if event.type in {"text-delta", "text_delta", "text"}:
|
||||
self._buffer += str(event.data.get("text") or event.data.get("delta") or "")
|
||||
await self._maybe_flush()
|
||||
elif event.type in {"data-interrupt-request", "interrupt"}:
|
||||
await self._handle_hitl_interrupt()
|
||||
return
|
||||
elif event.type in {"finish", "done"}:
|
||||
break
|
||||
|
||||
await self._flush(final=True)
|
||||
|
||||
async def _maybe_flush(self) -> None:
|
||||
now = asyncio.get_running_loop().time()
|
||||
if now - self._last_flush_at < self.debounce_seconds:
|
||||
return
|
||||
await self._flush(final=False)
|
||||
self._last_flush_at = now
|
||||
|
||||
async def _flush(self, *, final: bool) -> None:
|
||||
if not self._buffer:
|
||||
return
|
||||
|
||||
chunks = split_text_message(self._buffer)
|
||||
if len(chunks) > 1:
|
||||
for chunk in chunks[:-1]:
|
||||
result = await self._send_text(chunk)
|
||||
self._external_message_ids.append(result.external_message_id)
|
||||
self._buffer = chunks[-1]
|
||||
|
||||
if self._external_message_ids:
|
||||
await self._edit_text(self._external_message_ids[-1], self._buffer)
|
||||
else:
|
||||
result = await self._send_text(self._buffer)
|
||||
self._external_message_ids.append(result.external_message_id)
|
||||
|
||||
if final:
|
||||
logger.debug(
|
||||
"WhatsApp Baileys finalized external_ids=%s",
|
||||
self._external_message_ids,
|
||||
)
|
||||
|
||||
async def _send_typing_indicator(self) -> None:
|
||||
if not isinstance(self.adapter, WhatsAppBaileysAdapter):
|
||||
return
|
||||
try:
|
||||
await self.adapter.send_typing_indicator(external_peer_id=self.external_peer_id)
|
||||
record_gateway_outbound(platform="whatsapp", kind="typing", status="sent")
|
||||
except Exception:
|
||||
logger.debug("WhatsApp Baileys typing indicator failed", exc_info=True)
|
||||
|
||||
async def _send_text(self, text: str) -> PlatformSendResult:
|
||||
try:
|
||||
result = await self.adapter.send_message(
|
||||
external_peer_id=self.external_peer_id,
|
||||
text=text,
|
||||
)
|
||||
except Exception:
|
||||
record_gateway_outbound(platform="whatsapp", kind="send", status="failed")
|
||||
raise
|
||||
record_gateway_outbound(platform="whatsapp", kind="send", status="sent")
|
||||
return result
|
||||
|
||||
async def _edit_text(self, message_id: str, text: str) -> PlatformSendResult:
|
||||
try:
|
||||
result = await self.adapter.edit_message(
|
||||
external_peer_id=self.external_peer_id,
|
||||
external_message_id=message_id,
|
||||
text=text,
|
||||
)
|
||||
except Exception:
|
||||
record_gateway_outbound(platform="whatsapp", kind="edit", status="failed")
|
||||
raise
|
||||
record_gateway_outbound(platform="whatsapp", kind="edit", status="edited")
|
||||
return result
|
||||
|
||||
async def _handle_hitl_interrupt(self) -> None:
|
||||
if self._buffer:
|
||||
await self._flush(final=False)
|
||||
await self._send_text(HITL_UNSUPPORTED_MESSAGE)
|
||||
record_gateway_hitl_aborted(platform="whatsapp")
|
||||
Loading…
Add table
Add a link
Reference in a new issue