Add chat tool streaming registry with shared, default, and connector tools.

This commit is contained in:
CREDO23 2026-05-06 20:08:48 +02:00
parent ee16e1d5f9
commit 1392abf5b1
13 changed files with 275 additions and 0 deletions

View file

@@ -0,0 +1,23 @@
"""Per-tool streaming: thinking-step and completion emission."""
from __future__ import annotations
from app.tasks.chat.streaming.handlers.tools.emission_context import (
ToolCompletionEmissionContext,
)
from app.tasks.chat.streaming.handlers.tools.registry import (
iter_tool_completion_emission_frames,
resolve_tool_completed_thinking_step,
resolve_tool_start_thinking,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
# Public API of the streaming-tools package, re-exported from submodules above.
__all__ = [
    "ToolCompletionEmissionContext",
    "ToolStartThinking",
    "iter_tool_completion_emission_frames",
    "resolve_tool_completed_thinking_step",
    "resolve_tool_start_thinking",
]

View file

@@ -0,0 +1,15 @@
from __future__ import annotations
from collections.abc import Iterator
from app.tasks.chat.streaming.handlers.tools.emission_context import (
ToolCompletionEmissionContext,
)
def iter_completion_emission_frames(
    ctx: ToolCompletionEmissionContext,
) -> Iterator[str]:
    """Yield a single tool-output card for a shared connector tool.

    Dict outputs are forwarded as-is; anything else is wrapped under a
    ``"result"`` key so the card payload is always a mapping.
    """
    result = ctx.tool_output
    if not isinstance(result, dict):
        result = {"result": result}
    yield ctx.emit_tool_output_card(result)

View file

@@ -0,0 +1,22 @@
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.default import (
thinking as default_thinking,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
    """Shared connector tools reuse the default start-thinking copy."""
    resolved = default_thinking.resolve_start_thinking(tool_name, tool_input)
    return resolved
def resolve_completed_thinking(
    tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
    """Shared connector tools reuse the default completed-thinking copy."""
    resolved = default_thinking.resolve_completed_thinking(
        tool_name,
        tool_output,
        last_items,
    )
    return resolved

View file

@@ -0,0 +1,31 @@
from __future__ import annotations
# Connector tool names routed through the shared connector streaming modules,
# grouped here by connector for easier maintenance.
SHARED_CONNECTOR_TOOLS: frozenset[str] = frozenset(
    {
        # Calendar
        "create_calendar_event",
        "update_calendar_event",
        "delete_calendar_event",
        # Confluence
        "create_confluence_page",
        "update_confluence_page",
        "delete_confluence_page",
        # Dropbox
        "create_dropbox_file",
        "delete_dropbox_file",
        # Gmail
        "create_gmail_draft",
        "update_gmail_draft",
        "send_gmail_email",
        "trash_gmail_email",
        # Google Drive
        "create_google_drive_file",
        "delete_google_drive_file",
        # Jira
        "create_jira_issue",
        "update_jira_issue",
        "delete_jira_issue",
        # Linear
        "create_linear_issue",
        "update_linear_issue",
        "delete_linear_issue",
        # Notion
        "create_notion_page",
        "update_notion_page",
        "delete_notion_page",
        # OneDrive
        "create_onedrive_file",
        "delete_onedrive_file",
    }
)

View file

@@ -0,0 +1,3 @@
"""Fallback tool package."""
from __future__ import annotations

View file

@@ -0,0 +1,24 @@
"""Default tool-output card and a short completion terminal line."""
from __future__ import annotations
from collections.abc import Iterator
from app.tasks.chat.streaming.handlers.tools.emission_context import (
ToolCompletionEmissionContext,
)
def iter_completion_emission_frames(
    ctx: ToolCompletionEmissionContext,
) -> Iterator[str]:
    """Yield the generic completion frames for a tool with no custom UI.

    Emits a summary card (status plus the stringified output's length)
    followed by a short success line for the terminal view.
    """
    summary = {
        "status": "completed",
        "result_length": len(str(ctx.tool_output)),
    }
    yield ctx.emit_tool_output_card(summary)
    terminal_line = f"Tool {ctx.tool_name} completed"
    yield ctx.streaming_service.format_terminal_info(terminal_line, "success")

View file

@@ -0,0 +1,23 @@
"""Fallback thinking-step copy for unknown tools and connectors without custom UI."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
    """Build a bare thinking step titled after the humanized tool name.

    The tool input is intentionally ignored; the generic copy cannot
    interpret arbitrary arguments.
    """
    del tool_input
    humanized = tool_name.replace("_", " ").strip().capitalize()
    return ToolStartThinking(
        title=humanized or tool_name,
        items=[],
        include_items_on_frame=False,
    )
def resolve_completed_thinking(
    tool_name: str, tool_output: Any, last_items: list[str]
) -> tuple[str, list[str]]:
    """Return a humanized title and the previously shown bullet items.

    The tool output is intentionally ignored; the generic copy simply
    carries the in-progress items through to the completed step.
    """
    del tool_output
    humanized = tool_name.replace("_", " ").strip().capitalize()
    return (humanized or tool_name, last_items)

View file

@@ -0,0 +1,34 @@
"""Context for one tool-completion emission pass."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from app.tasks.chat.streaming.handlers.tool_output_frame import (
emit_tool_output_available_frame,
)
@dataclass
class ToolCompletionEmissionContext:
    """Streaming service, tool output, and ids for completion frames."""
    # Name of the tool that just completed.
    tool_name: str
    # Tool-call id forwarded as ``call_id`` on emitted frames.
    tool_call_id: str
    # Raw output returned by the tool; shape varies per tool — TODO confirm.
    tool_output: Any
    # Service used to format outgoing stream frames.
    streaming_service: Any
    # Optional content builder accumulating streamed content (None when unused).
    content_builder: Any | None
    # Mutable holder for the LangChain tool-call id, shared with the caller.
    langchain_tool_call_id_holder: dict[str, str | None]
    # Overall stream result object (opaque here).
    stream_result: Any
    # LangGraph invocation config (opaque here).
    langgraph_config: dict[str, Any]
    # Workspace path of a staged file, if any — presumably workspace-relative; verify against caller.
    staged_workspace_file_path: str | None
    def emit_tool_output_card(self, payload: Any) -> str:
        """Emit a tool-output-available frame for ``payload`` and return it."""
        return emit_tool_output_available_frame(
            streaming_service=self.streaming_service,
            content_builder=self.content_builder,
            langchain_id_holder=self.langchain_tool_call_id_holder,
            call_id=self.tool_call_id,
            output=payload,
        )

View file

@@ -0,0 +1,88 @@
"""Resolve thinking and emission modules by tool name."""
from __future__ import annotations
import importlib
from collections.abc import Iterator
from typing import Any
from app.tasks.chat.streaming.handlers.tools.connector.shared.tool_names import (
SHARED_CONNECTOR_TOOLS,
)
from app.tasks.chat.streaming.handlers.tools.deliverables.tool_names import (
DELIVERABLE_TOOLS,
)
from app.tasks.chat.streaming.handlers.tools.emission_context import (
ToolCompletionEmissionContext,
)
from app.tasks.chat.streaming.handlers.tools.filesystem.tool_names import (
FILESYSTEM_TOOLS,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
# Root package under which per-tool thinking/emission modules live.
_BASE = "app.tasks.chat.streaming.handlers.tools"
# Sub-package shared by all connector tools without a custom UI.
_CONNECTOR_SHARED = "connector.shared"
# Tools whose thinking module lives under a different name than the tool's own.
_THINKING_ALIAS: dict[str, str] = {
    "execute_code": "filesystem.execute",
}
# Tools whose emission module is borrowed from another tool's sub-package.
_EMISSION_ALIAS: dict[str, str] = {
    "edit_file": "filesystem.write_file",
    "execute_code": "filesystem.execute",
}
def _thinking_module(tool_name: str) -> str:
    """Resolve the dotted sub-package whose ``thinking`` module covers the tool.

    Precedence: shared connector tools, filesystem tools, deliverable tools,
    then explicit aliases; unknown names map to themselves.
    """
    if tool_name in SHARED_CONNECTOR_TOOLS:
        return _CONNECTOR_SHARED
    for family, prefix in (
        (FILESYSTEM_TOOLS, "filesystem"),
        (DELIVERABLE_TOOLS, "deliverables"),
    ):
        if tool_name in family:
            return f"{prefix}.{tool_name}"
    return _THINKING_ALIAS.get(tool_name, tool_name)
def _emission_module(tool_name: str) -> str:
    """Resolve the dotted sub-package whose ``emission`` module covers the tool.

    Precedence: explicit aliases, shared connector tools, deliverable tools,
    filesystem tools; unknown names map to themselves.
    """
    aliased = _EMISSION_ALIAS.get(tool_name)
    if aliased is not None:
        return aliased
    if tool_name in SHARED_CONNECTOR_TOOLS:
        return _CONNECTOR_SHARED
    for family, prefix in (
        (DELIVERABLE_TOOLS, "deliverables"),
        (FILESYSTEM_TOOLS, "filesystem"),
    ):
        if tool_name in family:
            return f"{prefix}.{tool_name}"
    return tool_name
def _import_thinking(tool_name: str):
    """Import the tool-specific thinking module, or the default fallback.

    Falls back to ``default.thinking`` only when the tool's own module is
    absent. A ``ModuleNotFoundError`` raised by a *dependency inside* an
    existing tool module is re-raised instead of being silently masked by
    the fallback (the original broad except hid such bugs).
    """
    module_path = f"{_BASE}.{_thinking_module(tool_name)}.thinking"
    try:
        return importlib.import_module(module_path)
    except ModuleNotFoundError as err:
        # err.name is the module that could not be found; only swallow the
        # error when it is the target module or one of its ancestor packages.
        target_missing = err.name is not None and (
            module_path == err.name or module_path.startswith(err.name + ".")
        )
        if not target_missing:
            raise
        return importlib.import_module(f"{_BASE}.default.thinking")
def _import_emission(tool_name: str):
    """Import the tool-specific emission module, or the default fallback.

    Falls back to ``default.emission`` only when the tool's own module is
    absent. A ``ModuleNotFoundError`` raised by a *dependency inside* an
    existing tool module is re-raised instead of being silently masked by
    the fallback (the original broad except hid such bugs).
    """
    module_path = f"{_BASE}.{_emission_module(tool_name)}.emission"
    try:
        return importlib.import_module(module_path)
    except ModuleNotFoundError as err:
        # err.name is the module that could not be found; only swallow the
        # error when it is the target module or one of its ancestor packages.
        target_missing = err.name is not None and (
            module_path == err.name or module_path.startswith(err.name + ".")
        )
        if not target_missing:
            raise
        return importlib.import_module(f"{_BASE}.default.emission")
def resolve_tool_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
    """Build the start-of-tool thinking step via the tool's thinking module."""
    module = _import_thinking(tool_name)
    return module.resolve_start_thinking(tool_name, tool_input)
def resolve_tool_completed_thinking_step(
    tool_name: str, tool_output: Any, last_items: list[str]
) -> tuple[str, list[str]]:
    """Produce the completed-step title and items via the tool's thinking module."""
    module = _import_thinking(tool_name)
    return module.resolve_completed_thinking(tool_name, tool_output, last_items)
def iter_tool_completion_emission_frames(
    ctx: ToolCompletionEmissionContext,
) -> Iterator[str]:
    """Stream completion frames from the resolved per-tool emission module."""
    emission = _import_emission(ctx.tool_name)
    yield from emission.iter_completion_emission_frames(ctx)

View file

@@ -0,0 +1,12 @@
"""In-progress thinking-step title and bullet lines."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class ToolStartThinking:
    """Immutable copy for an in-progress tool thinking step."""
    # Headline shown for the thinking step.
    title: str
    # Bullet lines rendered beneath the title.
    items: list[str]
    # Whether the start frame should carry ``items`` — NOTE(review): frame
    # semantics not visible here; confirm against the frame emitter.
    include_items_on_frame: bool = True