From 61a3586caf0729f76a10ae5c40281c2f5b4bda50 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Mon, 1 Jun 2026 12:36:39 +0530 Subject: [PATCH] feat(gateway): handle Slack thread replies --- .../app/gateway/slack/commands.py | 64 ++++++++++++++ .../app/gateway/slack/translator.py | 86 +++++++++++++++++++ 2 files changed, 150 insertions(+) create mode 100644 surfsense_backend/app/gateway/slack/commands.py create mode 100644 surfsense_backend/app/gateway/slack/translator.py diff --git a/surfsense_backend/app/gateway/slack/commands.py b/surfsense_backend/app/gateway/slack/commands.py new file mode 100644 index 000000000..ffbd5863b --- /dev/null +++ b/surfsense_backend/app/gateway/slack/commands.py @@ -0,0 +1,64 @@ +"""Slack command/onboarding handlers.""" + +from __future__ import annotations + +from app.gateway.base.adapter import ParsedInboundEvent +from app.gateway.base.commands import BaseGatewayCommands +from app.gateway.ratelimit import acquire_token +from app.gateway.slack.adapter import SlackAdapter + +HELP_TEXT = ( + "SurfSense Slack commands:\n" + "`/new` - start a fresh SurfSense conversation in this thread\n" + "`/help` - show this help\n\n" + "Mention the SurfSense bot in a channel thread to ask your agent a question." +) + + +class SlackGatewayCommands(BaseGatewayCommands): + async def handle_help_command( + self, + *, + adapter: SlackAdapter, + event: ParsedInboundEvent, + ) -> bool: + channel_id = event.metadata.get("channel_id") + thread_ts = event.metadata.get("thread_ts") + if not channel_id or not thread_ts: + return True + await adapter.send_message( + external_peer_id=channel_id, + text=HELP_TEXT, + reply_to_message_id=thread_ts, + ) + return True + + async def send_unbound_onboarding( + self, + *, + adapter: SlackAdapter, + event: ParsedInboundEvent, + dashboard_url: str, + ) -> None: + channel_id = event.metadata.get("channel_id") + thread_ts = event.metadata.get("thread_ts") + slack_user_id = event.metadata.get("slack_user_id") + if not channel_id or not thread_ts: + return + + wait_ms = await acquire_token( + f"slack:onboarded:{event.metadata.get('team_id')}:{slack_user_id}", + capacity=1, + refill_per_sec=1 / 3600, + ) + if wait_ms > 0: + return + + await adapter.send_message( + external_peer_id=channel_id, + reply_to_message_id=thread_ts, + text=( + "Hi! Connect your Slack user to SurfSense before using the bot here: " + f"{dashboard_url}" + ), + ) diff --git a/surfsense_backend/app/gateway/slack/translator.py b/surfsense_backend/app/gateway/slack/translator.py new file mode 100644 index 000000000..658b0cac7 --- /dev/null +++ b/surfsense_backend/app/gateway/slack/translator.py @@ -0,0 +1,86 @@ +"""Translate agent stream events into Slack thread replies.""" + +from __future__ import annotations + +import logging +from collections.abc import AsyncIterator + +from app.gateway.base.adapter import PlatformSendResult +from app.gateway.base.formatting import split_text_message +from app.gateway.base.translator import BaseStreamTranslator, GatewayStreamEvent +from app.gateway.ratelimit import wait_for_token +from app.gateway.slack.adapter import SlackAdapter +from app.observability.metrics import ( + record_gateway_hitl_aborted, + record_gateway_outbound, + record_gateway_rate_limit_hit, +) + +logger = logging.getLogger(__name__) + +SLACK_MAX_MESSAGE_CHARS = 35000 +HITL_UNSUPPORTED_MESSAGE = ( + "This action requires approval and is not yet supported from Slack. " + "Try again with a different request." +) + + +class SlackStreamTranslator(BaseStreamTranslator): + def __init__( + self, + *, + adapter: SlackAdapter, + channel_id: str, + thread_ts: str, + ) -> None: + self.adapter = adapter + self.channel_id = channel_id + self.thread_ts = thread_ts + self._buffer = "" + + async def translate(self, events: AsyncIterator[GatewayStreamEvent]) -> None: + async for event in events: + if event.type in {"text-delta", "text_delta", "text"}: + self._buffer += str(event.data.get("text") or event.data.get("delta") or "") + elif event.type in {"data-interrupt-request", "interrupt"}: + await self._handle_hitl_interrupt() + return + elif event.type in {"finish", "done"}: + break + + await self._flush_final() + + async def _flush_final(self) -> None: + if not self._buffer: + return + for chunk in split_text_message(self._buffer, max_chars=SLACK_MAX_MESSAGE_CHARS): + await self._send_text(chunk) + + async def _send_text(self, text: str) -> PlatformSendResult: + await self._throttle() + try: + result = await self.adapter.send_message( + external_peer_id=self.channel_id, + text=text, + reply_to_message_id=self.thread_ts, + ) + except Exception: + record_gateway_outbound(platform="slack", kind="send", status="failed") + raise + record_gateway_outbound(platform="slack", kind="send", status="sent") + return result + + async def _throttle(self) -> None: + chat_wait = await wait_for_token( + f"slack:channel:{self.channel_id}", + capacity=1, + refill_per_sec=1.0, + ) + if chat_wait: + record_gateway_rate_limit_hit(bucket="slack:channel") + + async def _handle_hitl_interrupt(self) -> None: + if self._buffer: + await self._flush_final() + await self._send_text(HITL_UNSUPPORTED_MESSAGE) + record_gateway_hitl_aborted(platform="slack")