From 1392abf5b1d9cbb2309c7780edeacf9c0c72f205 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 6 May 2026 20:08:48 +0200 Subject: [PATCH] Add chat tool streaming registry with shared, default, and connector tools. --- .../chat/streaming/handlers/tools/__init__.py | 23 +++++ .../handlers/tools/connector/__init__.py | 0 .../tools/connector/shared/__init__.py | 0 .../tools/connector/shared/emission.py | 15 ++++ .../tools/connector/shared/thinking.py | 22 +++++ .../tools/connector/shared/tool_names.py | 31 +++++++ .../handlers/tools/default/__init__.py | 3 + .../handlers/tools/default/emission.py | 24 +++++ .../handlers/tools/default/thinking.py | 23 +++++ .../handlers/tools/emission_context.py | 34 +++++++ .../chat/streaming/handlers/tools/registry.py | 88 +++++++++++++++++++ .../handlers/tools/shared/__init__.py | 0 .../streaming/handlers/tools/shared/model.py | 12 +++ 13 files changed, 275 insertions(+) create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/emission.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/thinking.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/tool_names.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/emission.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/thinking.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/emission_context.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/registry.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/shared/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/handlers/tools/shared/model.py diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/__init__.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/__init__.py new file mode 100644 index 000000000..4b191c100 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/__init__.py @@ -0,0 +1,23 @@ +"""Per-tool streaming: thinking-step and completion emission.""" + +from __future__ import annotations + +from app.tasks.chat.streaming.handlers.tools.emission_context import ( + ToolCompletionEmissionContext, +) +from app.tasks.chat.streaming.handlers.tools.registry import ( + iter_tool_completion_emission_frames, + resolve_tool_completed_thinking_step, + resolve_tool_start_thinking, +) +from app.tasks.chat.streaming.handlers.tools.shared.model import ( + ToolStartThinking, +) + +__all__ = [ + "ToolCompletionEmissionContext", + "ToolStartThinking", + "iter_tool_completion_emission_frames", + "resolve_tool_completed_thinking_step", + "resolve_tool_start_thinking", +] diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/__init__.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/__init__.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/emission.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/emission.py new file mode 100644 index 000000000..8e19dc224 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/emission.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from collections.abc import Iterator + +from app.tasks.chat.streaming.handlers.tools.emission_context import ( + ToolCompletionEmissionContext, +) + + +def iter_completion_emission_frames( + ctx: ToolCompletionEmissionContext, +) -> Iterator[str]: + out = ctx.tool_output + payload = out if isinstance(out, dict) else {"result": out} + yield ctx.emit_tool_output_card(payload) diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/thinking.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/thinking.py new file mode 100644 index 000000000..7e9dd8b96 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/thinking.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from typing import Any + +from app.tasks.chat.streaming.handlers.tools.default import ( + thinking as default_thinking, +) +from app.tasks.chat.streaming.handlers.tools.shared.model import ( + ToolStartThinking, +) + + +def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking: + return default_thinking.resolve_start_thinking(tool_name, tool_input) + + +def resolve_completed_thinking( + tool_name: str, tool_output: Any, last_items: list[str], +) -> tuple[str, list[str]]: + return default_thinking.resolve_completed_thinking( + tool_name, tool_output, last_items + ) diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/tool_names.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/tool_names.py new file mode 100644 index 000000000..ab698b32d --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/connector/shared/tool_names.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +SHARED_CONNECTOR_TOOLS: frozenset[str] = frozenset( + { + "create_calendar_event", + "create_confluence_page", + "create_dropbox_file", + "create_gmail_draft", + "create_google_drive_file", + "create_jira_issue", + "create_linear_issue", + "create_notion_page", + "create_onedrive_file", + "delete_calendar_event", + "delete_confluence_page", + "delete_dropbox_file", + "delete_google_drive_file", + "delete_jira_issue", + "delete_linear_issue", + "delete_notion_page", + "delete_onedrive_file", + "send_gmail_email", + "trash_gmail_email", + "update_calendar_event", + "update_confluence_page", + "update_gmail_draft", + "update_jira_issue", + "update_linear_issue", + "update_notion_page", + } +) diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/__init__.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/__init__.py new file mode 100644 index 000000000..5e84a37f4 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/__init__.py @@ -0,0 +1,3 @@ +"""Fallback tool package.""" + +from __future__ import annotations diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/emission.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/emission.py new file mode 100644 index 000000000..e24c619a7 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/emission.py @@ -0,0 +1,24 @@ +"""Default tool-output card and a short completion terminal line.""" + +from __future__ import annotations + +from collections.abc import Iterator + +from app.tasks.chat.streaming.handlers.tools.emission_context import ( + ToolCompletionEmissionContext, +) + + +def iter_completion_emission_frames( + ctx: ToolCompletionEmissionContext, +) -> Iterator[str]: + yield ctx.emit_tool_output_card( + { + "status": "completed", + "result_length": len(str(ctx.tool_output)), + }, + ) + yield ctx.streaming_service.format_terminal_info( + f"Tool {ctx.tool_name} completed", + "success", + ) diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/thinking.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/thinking.py new file mode 100644 index 000000000..46d15a4e7 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/default/thinking.py @@ -0,0 +1,23 @@ +"""Fallback thinking-step copy for unknown tools and connectors without custom UI.""" + +from __future__ import annotations + +from typing import Any + +from app.tasks.chat.streaming.handlers.tools.shared.model import ( + ToolStartThinking, +) + + +def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking: + del tool_input + title = tool_name.replace("_", " ").strip().capitalize() or tool_name + return ToolStartThinking(title=title, items=[], include_items_on_frame=False) + + +def resolve_completed_thinking( + tool_name: str, tool_output: Any, last_items: list[str] +) -> tuple[str, list[str]]: + del tool_output + title = tool_name.replace("_", " ").strip().capitalize() or tool_name + return (title, last_items) diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/emission_context.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/emission_context.py new file mode 100644 index 000000000..d9ff796c0 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/emission_context.py @@ -0,0 +1,34 @@ +"""Context for one tool-completion emission pass.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from app.tasks.chat.streaming.handlers.tool_output_frame import ( + emit_tool_output_available_frame, +) + + +@dataclass +class ToolCompletionEmissionContext: + """Streaming service, tool output, and ids for completion frames.""" + + tool_name: str + tool_call_id: str + tool_output: Any + streaming_service: Any + content_builder: Any | None + langchain_tool_call_id_holder: dict[str, str | None] + stream_result: Any + langgraph_config: dict[str, Any] + staged_workspace_file_path: str | None + + def emit_tool_output_card(self, payload: Any) -> str: + return emit_tool_output_available_frame( + streaming_service=self.streaming_service, + content_builder=self.content_builder, + langchain_id_holder=self.langchain_tool_call_id_holder, + call_id=self.tool_call_id, + output=payload, + ) diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/registry.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/registry.py new file mode 100644 index 000000000..c0568f870 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/registry.py @@ -0,0 +1,88 @@ +"""Resolve thinking and emission modules by tool name.""" + +from __future__ import annotations + +import importlib +from collections.abc import Iterator +from typing import Any + +from app.tasks.chat.streaming.handlers.tools.connector.shared.tool_names import ( + SHARED_CONNECTOR_TOOLS, +) +from app.tasks.chat.streaming.handlers.tools.deliverables.tool_names import ( + DELIVERABLE_TOOLS, +) +from app.tasks.chat.streaming.handlers.tools.emission_context import ( + ToolCompletionEmissionContext, +) +from app.tasks.chat.streaming.handlers.tools.filesystem.tool_names import ( + FILESYSTEM_TOOLS, +) +from app.tasks.chat.streaming.handlers.tools.shared.model import ( + ToolStartThinking, +) + +_BASE = "app.tasks.chat.streaming.handlers.tools" +_CONNECTOR_SHARED = "connector.shared" + +_THINKING_ALIAS: dict[str, str] = { + "execute_code": "filesystem.execute", +} +_EMISSION_ALIAS: dict[str, str] = { + "edit_file": "filesystem.write_file", + "execute_code": "filesystem.execute", +} + + +def _thinking_module(tool_name: str) -> str: + if tool_name in SHARED_CONNECTOR_TOOLS: + return _CONNECTOR_SHARED + if tool_name in FILESYSTEM_TOOLS: + return f"filesystem.{tool_name}" + if tool_name in DELIVERABLE_TOOLS: + return f"deliverables.{tool_name}" + return _THINKING_ALIAS.get(tool_name, tool_name) + + +def _emission_module(tool_name: str) -> str: + if tool_name in _EMISSION_ALIAS: + return _EMISSION_ALIAS[tool_name] + if tool_name in SHARED_CONNECTOR_TOOLS: + return _CONNECTOR_SHARED + if tool_name in DELIVERABLE_TOOLS: + return f"deliverables.{tool_name}" + if tool_name in FILESYSTEM_TOOLS: + return f"filesystem.{tool_name}" + return tool_name + + +def _import_thinking(tool_name: str): + try: + return importlib.import_module(f"{_BASE}.{_thinking_module(tool_name)}.thinking") + except ModuleNotFoundError: + return importlib.import_module(f"{_BASE}.default.thinking") + + +def _import_emission(tool_name: str): + try: + return importlib.import_module(f"{_BASE}.{_emission_module(tool_name)}.emission") + except ModuleNotFoundError: + return importlib.import_module(f"{_BASE}.default.emission") + + +def resolve_tool_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking: + return _import_thinking(tool_name).resolve_start_thinking(tool_name, tool_input) + + +def resolve_tool_completed_thinking_step( + tool_name: str, tool_output: Any, last_items: list[str] +) -> tuple[str, list[str]]: + return _import_thinking(tool_name).resolve_completed_thinking( + tool_name, tool_output, last_items + ) + + +def iter_tool_completion_emission_frames( + ctx: ToolCompletionEmissionContext, +) -> Iterator[str]: + yield from _import_emission(ctx.tool_name).iter_completion_emission_frames(ctx) diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/shared/__init__.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/shared/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/tasks/chat/streaming/handlers/tools/shared/model.py b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/shared/model.py new file mode 100644 index 000000000..047a84374 --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/handlers/tools/shared/model.py @@ -0,0 +1,12 @@ +"""In-progress thinking-step title and bullet lines.""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class ToolStartThinking: + title: str + items: list[str] + include_items_on_frame: bool = True