From 51bf2a8361b24e9a97ad77e788a282b324e82c79 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 29 May 2026 10:20:25 +0530 Subject: [PATCH] feat(gateway): wire WhatsApp bridge runtime --- docker/docker-compose.yml | 22 +++++ .../app/gateway/byo_long_poll.py | 97 +++++++++++++++---- 2 files changed, 102 insertions(+), 17 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 06a3ac79a..12b7a6a33 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -118,6 +118,7 @@ services: UNSTRUCTURED_HAS_PATCHED_LOOP: "1" NEXT_FRONTEND_URL: ${NEXT_FRONTEND_URL:-http://localhost:${FRONTEND_PORT:-3929}} SEARXNG_DEFAULT_HOST: ${SEARXNG_DEFAULT_HOST:-http://searxng:8080} + WHATSAPP_BRIDGE_URL: ${WHATSAPP_BRIDGE_URL:-http://whatsapp-bridge:3000} # Daytona Sandbox – uncomment and set credentials to enable cloud code execution # DAYTONA_SANDBOX_ENABLED: "TRUE" # DAYTONA_API_KEY: ${DAYTONA_API_KEY:-} @@ -143,6 +144,23 @@ services: retries: 30 start_period: 200s + whatsapp-bridge: + build: ../surfsense_backend/scripts/whatsapp-bridge + profiles: + - whatsapp + volumes: + - whatsapp_sessions:/data/sessions + environment: + WHATSAPP_MODE: ${WHATSAPP_MODE:-self-chat} + WHATSAPP_SESSION_DIR: /data/sessions + mem_limit: 512m + restart: unless-stopped + healthcheck: + test: ["CMD", "wget", "-qO-", "http://localhost:3000/health"] + interval: 30s + timeout: 5s + retries: 5 + celery_worker: image: ghcr.io/modsetter/surfsense-backend:${SURFSENSE_VERSION:-latest} volumes: @@ -264,6 +282,8 @@ services: NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE: ${AUTH_TYPE:-LOCAL} NEXT_PUBLIC_ETL_SERVICE: ${ETL_SERVICE:-DOCLING} NEXT_PUBLIC_DEPLOYMENT_MODE: ${DEPLOYMENT_MODE:-self-hosted} + NEXT_PUBLIC_GATEWAY_WHATSAPP_INTAKE_MODE: ${GATEWAY_WHATSAPP_INTAKE_MODE:-disabled} + NEXT_PUBLIC_WHATSAPP_DISPLAY_PHONE_NUMBER: ${WHATSAPP_SHARED_DISPLAY_PHONE_NUMBER:-} FASTAPI_BACKEND_INTERNAL_URL: ${FASTAPI_BACKEND_INTERNAL_URL:-http://backend:8000} labels: - "com.centurylinklabs.watchtower.enable=true" @@ -285,3 +305,5 @@ volumes: name: surfsense-zero-cache zero_init: name: surfsense-zero-init + whatsapp_sessions: + name: surfsense-whatsapp-sessions diff --git a/surfsense_backend/app/gateway/byo_long_poll.py b/surfsense_backend/app/gateway/byo_long_poll.py index 0be448ae3..bb7ba53ad 100644 --- a/surfsense_backend/app/gateway/byo_long_poll.py +++ b/surfsense_backend/app/gateway/byo_long_poll.py @@ -8,9 +8,17 @@ import logging from sqlalchemy import select from app.config import config -from app.db import ExternalChatPlatform, ExternalChatAccount, async_session_maker +from app.db import ( + ExternalChatAccount, + ExternalChatAccountMode, + ExternalChatPlatform, + async_session_maker, +) from app.gateway.accounts import account_token +from app.gateway.inbox import persist_inbound_event from app.gateway.runner import _run_telegram_account +from app.gateway.whatsapp.adapter_baileys import WhatsAppBaileysAdapter +from app.observability.metrics import record_gateway_inbox_write logger = logging.getLogger(__name__) @@ -42,37 +50,92 @@ async def _byo_account_supervisor(account_id: int, token: str) -> None: await _sleep_or_shutdown(30) +async def _whatsapp_baileys_supervisor() -> None: + adapter = WhatsAppBaileysAdapter() + while _shutdown_event is None or not _shutdown_event.is_set(): + try: + async for raw_event in adapter.fetch_updates(offset=None): + async with async_session_maker() as session: + result = await session.execute( + select(ExternalChatAccount).where( + ExternalChatAccount.platform == ExternalChatPlatform.WHATSAPP, + ExternalChatAccount.mode == ExternalChatAccountMode.SELF_HOST_BYO, + ExternalChatAccount.is_system_account.is_(False), + ExternalChatAccount.suspended_at.is_(None), + ) + ) + account = result.scalars().first() + if account is None: + continue + message_id = str(raw_event.get("messageId") or "") + if not message_id: + continue + inbox_id = await persist_inbound_event( + session, + account_id=account.id, + platform=ExternalChatPlatform.WHATSAPP, + event_dedupe_key=f"baileys:{message_id}", + external_event_id=message_id, + external_message_id=message_id, + event_kind="message", + raw_payload=raw_event, + ) + await session.commit() + record_gateway_inbox_write( + platform="whatsapp", + dedup_skipped=inbox_id is None, + ) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("WhatsApp Baileys intake failed; retrying in 10s") + await _sleep_or_shutdown(10) + + async def start_byo_long_poll_supervisors() -> None: """Start one BYO long-poll supervisor per active non-system Telegram account.""" global _shutdown_event - if config.GATEWAY_TELEGRAM_INTAKE_MODE != "longpoll": + if ( + config.GATEWAY_TELEGRAM_INTAKE_MODE != "longpoll" + and config.GATEWAY_WHATSAPP_INTAKE_MODE != "baileys" + ): return if _tasks: return _shutdown_event = asyncio.Event() - async with async_session_maker() as session: - result = await session.execute( - select(ExternalChatAccount).where( - ExternalChatAccount.platform == ExternalChatPlatform.TELEGRAM, - ExternalChatAccount.is_system_account.is_(False), - ExternalChatAccount.suspended_at.is_(None), + if config.GATEWAY_TELEGRAM_INTAKE_MODE == "longpoll": + async with async_session_maker() as session: + result = await session.execute( + select(ExternalChatAccount).where( + ExternalChatAccount.platform == ExternalChatPlatform.TELEGRAM, + ExternalChatAccount.is_system_account.is_(False), + ExternalChatAccount.suspended_at.is_(None), + ) ) - ) - accounts = list(result.scalars()) + accounts = list(result.scalars()) - for account in accounts: - token = account_token(account) - if not token: - continue + for account in accounts: + token = account_token(account) + if not token: + continue + task = asyncio.create_task( + _byo_account_supervisor(int(account.id), token), + name=f"gateway-byo-telegram-{account.id}", + ) + _tasks.add(task) + task.add_done_callback(_tasks.discard) + logger.info("Started BYO Telegram long-poll supervisor account_id=%s", account.id) + + if config.GATEWAY_WHATSAPP_INTAKE_MODE == "baileys": task = asyncio.create_task( - _byo_account_supervisor(int(account.id), token), - name=f"gateway-byo-telegram-{account.id}", + _whatsapp_baileys_supervisor(), + name="gateway-byo-whatsapp-baileys", ) _tasks.add(task) task.add_done_callback(_tasks.discard) - logger.info("Started BYO Telegram long-poll supervisor account_id=%s", account.id) + logger.info("Started WhatsApp Baileys bridge intake supervisor") async def stop_byo_long_poll_supervisors() -> None: