mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-02 19:55:18 +02:00
feat(gateway): handle Slack thread replies
This commit is contained in:
parent
78315eb55b
commit
61a3586caf
2 changed files with 150 additions and 0 deletions
64
surfsense_backend/app/gateway/slack/commands.py
Normal file
64
surfsense_backend/app/gateway/slack/commands.py
Normal file
|
|
@ -0,0 +1,64 @@
|
||||||
|
"""Slack command/onboarding handlers."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from app.gateway.base.adapter import ParsedInboundEvent
|
||||||
|
from app.gateway.base.commands import BaseGatewayCommands
|
||||||
|
from app.gateway.ratelimit import acquire_token
|
||||||
|
from app.gateway.slack.adapter import SlackAdapter
|
||||||
|
|
||||||
|
HELP_TEXT = (
|
||||||
|
"SurfSense Slack commands:\n"
|
||||||
|
"`/new` - start a fresh SurfSense conversation in this thread\n"
|
||||||
|
"`/help` - show this help\n\n"
|
||||||
|
"Mention the SurfSense bot in a channel thread to ask your agent a question."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class SlackGatewayCommands(BaseGatewayCommands):
|
||||||
|
async def handle_help_command(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
adapter: SlackAdapter,
|
||||||
|
event: ParsedInboundEvent,
|
||||||
|
) -> bool:
|
||||||
|
channel_id = event.metadata.get("channel_id")
|
||||||
|
thread_ts = event.metadata.get("thread_ts")
|
||||||
|
if not channel_id or not thread_ts:
|
||||||
|
return True
|
||||||
|
await adapter.send_message(
|
||||||
|
external_peer_id=channel_id,
|
||||||
|
text=HELP_TEXT,
|
||||||
|
reply_to_message_id=thread_ts,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def send_unbound_onboarding(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
adapter: SlackAdapter,
|
||||||
|
event: ParsedInboundEvent,
|
||||||
|
dashboard_url: str,
|
||||||
|
) -> None:
|
||||||
|
channel_id = event.metadata.get("channel_id")
|
||||||
|
thread_ts = event.metadata.get("thread_ts")
|
||||||
|
slack_user_id = event.metadata.get("slack_user_id")
|
||||||
|
if not channel_id or not thread_ts:
|
||||||
|
return
|
||||||
|
|
||||||
|
wait_ms = await acquire_token(
|
||||||
|
f"slack:onboarded:{event.metadata.get('team_id')}:{slack_user_id}",
|
||||||
|
capacity=1,
|
||||||
|
refill_per_sec=1 / 3600,
|
||||||
|
)
|
||||||
|
if wait_ms > 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
await adapter.send_message(
|
||||||
|
external_peer_id=channel_id,
|
||||||
|
reply_to_message_id=thread_ts,
|
||||||
|
text=(
|
||||||
|
"Hi! Connect your Slack user to SurfSense before using the bot here: "
|
||||||
|
f"{dashboard_url}"
|
||||||
|
),
|
||||||
|
)
|
||||||
86
surfsense_backend/app/gateway/slack/translator.py
Normal file
86
surfsense_backend/app/gateway/slack/translator.py
Normal file
|
|
@ -0,0 +1,86 @@
|
||||||
|
"""Translate agent stream events into Slack thread replies."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from collections.abc import AsyncIterator
|
||||||
|
|
||||||
|
from app.gateway.base.adapter import PlatformSendResult
|
||||||
|
from app.gateway.base.formatting import split_text_message
|
||||||
|
from app.gateway.base.translator import BaseStreamTranslator, GatewayStreamEvent
|
||||||
|
from app.gateway.ratelimit import wait_for_token
|
||||||
|
from app.gateway.slack.adapter import SlackAdapter
|
||||||
|
from app.observability.metrics import (
|
||||||
|
record_gateway_hitl_aborted,
|
||||||
|
record_gateway_outbound,
|
||||||
|
record_gateway_rate_limit_hit,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
SLACK_MAX_MESSAGE_CHARS = 35000
|
||||||
|
HITL_UNSUPPORTED_MESSAGE = (
|
||||||
|
"This action requires approval and is not yet supported from Slack. "
|
||||||
|
"Try again with a different request."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class SlackStreamTranslator(BaseStreamTranslator):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
adapter: SlackAdapter,
|
||||||
|
channel_id: str,
|
||||||
|
thread_ts: str,
|
||||||
|
) -> None:
|
||||||
|
self.adapter = adapter
|
||||||
|
self.channel_id = channel_id
|
||||||
|
self.thread_ts = thread_ts
|
||||||
|
self._buffer = ""
|
||||||
|
|
||||||
|
async def translate(self, events: AsyncIterator[GatewayStreamEvent]) -> None:
|
||||||
|
async for event in events:
|
||||||
|
if event.type in {"text-delta", "text_delta", "text"}:
|
||||||
|
self._buffer += str(event.data.get("text") or event.data.get("delta") or "")
|
||||||
|
elif event.type in {"data-interrupt-request", "interrupt"}:
|
||||||
|
await self._handle_hitl_interrupt()
|
||||||
|
return
|
||||||
|
elif event.type in {"finish", "done"}:
|
||||||
|
break
|
||||||
|
|
||||||
|
await self._flush_final()
|
||||||
|
|
||||||
|
async def _flush_final(self) -> None:
|
||||||
|
if not self._buffer:
|
||||||
|
return
|
||||||
|
for chunk in split_text_message(self._buffer, max_chars=SLACK_MAX_MESSAGE_CHARS):
|
||||||
|
await self._send_text(chunk)
|
||||||
|
|
||||||
|
async def _send_text(self, text: str) -> PlatformSendResult:
|
||||||
|
await self._throttle()
|
||||||
|
try:
|
||||||
|
result = await self.adapter.send_message(
|
||||||
|
external_peer_id=self.channel_id,
|
||||||
|
text=text,
|
||||||
|
reply_to_message_id=self.thread_ts,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
record_gateway_outbound(platform="slack", kind="send", status="failed")
|
||||||
|
raise
|
||||||
|
record_gateway_outbound(platform="slack", kind="send", status="sent")
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def _throttle(self) -> None:
|
||||||
|
chat_wait = await wait_for_token(
|
||||||
|
f"slack:channel:{self.channel_id}",
|
||||||
|
capacity=1,
|
||||||
|
refill_per_sec=1.0,
|
||||||
|
)
|
||||||
|
if chat_wait:
|
||||||
|
record_gateway_rate_limit_hit(bucket="slack:channel")
|
||||||
|
|
||||||
|
async def _handle_hitl_interrupt(self) -> None:
|
||||||
|
if self._buffer:
|
||||||
|
await self._flush_final()
|
||||||
|
await self._send_text(HITL_UNSUPPORTED_MESSAGE)
|
||||||
|
record_gateway_hitl_aborted(platform="slack")
|
||||||
Loading…
Add table
Add a link
Reference in a new issue