Add filesystem tool streaming handlers for chat runs.

This commit is contained in:
CREDO23 2026-05-06 20:08:48 +02:00
parent 1392abf5b1
commit a322eedaa1
30 changed files with 507 additions and 0 deletions

View file

@ -0,0 +1,27 @@
"""edit_file: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
truncate_path,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
fp = d.get("file_path", "") if isinstance(tool_input, dict) else str(tool_input)
return ToolStartThinking(title="Editing file", items=[truncate_path(fp)])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Editing file", last_items)

View file

@ -0,0 +1,40 @@
"""execute: exit code, stdout, sandbox file hints."""
from __future__ import annotations
import re
from collections.abc import Iterator
from app.tasks.chat.streaming.handlers.tools.emission_context import (
ToolCompletionEmissionContext,
)
def iter_completion_emission_frames(
ctx: ToolCompletionEmissionContext,
) -> Iterator[str]:
out = ctx.tool_output
raw_text = out.get("result", "") if isinstance(out, dict) else str(out)
exit_code: int | None = None
output_text = raw_text
m = re.match(r"^Exit code:\s*(\d+)", raw_text)
if m:
exit_code = int(m.group(1))
om = re.search(r"\nOutput:\n([\s\S]*)", raw_text)
output_text = om.group(1) if om else ""
thread_id_str = ctx.langgraph_config.get("configurable", {}).get("thread_id", "")
for sf_match in re.finditer(
r"^SANDBOX_FILE:\s*(.+)$", output_text, re.MULTILINE
):
fpath = sf_match.group(1).strip()
if fpath and fpath not in ctx.stream_result.sandbox_files:
ctx.stream_result.sandbox_files.append(fpath)
yield ctx.emit_tool_output_card(
{
"exit_code": exit_code,
"output": output_text,
"thread_id": thread_id_str,
},
)

View file

@ -0,0 +1,42 @@
"""execute: sandbox command thinking + completion lines."""
from __future__ import annotations
import re
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
cmd = d.get("command", "") if isinstance(tool_input, dict) else str(tool_input)
display_cmd = cmd[:80] + ("" if len(cmd) > 80 else "")
return ToolStartThinking(title="Running command", items=[f"$ {display_cmd}"])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_name
items = last_items
raw_text = (
tool_output.get("result", "")
if isinstance(tool_output, dict)
else str(tool_output)
)
m = re.match(r"^Exit code:\s*(\d+)", raw_text)
exit_code_val = int(m.group(1)) if m else None
if exit_code_val is not None and exit_code_val == 0:
completed = [*items, "Completed successfully"]
elif exit_code_val is not None:
completed = [*items, f"Exit code: {exit_code_val}"]
else:
completed = [*items, "Finished"]
return ("Running command", completed)

View file

@ -0,0 +1,27 @@
"""glob: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
pat = d.get("pattern", "") if isinstance(tool_input, dict) else str(tool_input)
base = d.get("path", "/") if isinstance(tool_input, dict) else "/"
return ToolStartThinking(title="Searching files", items=[f"{pat} in {base}"])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Searching files", last_items)

View file

@ -0,0 +1,31 @@
"""grep: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
pat = d.get("pattern", "") if isinstance(tool_input, dict) else str(tool_input)
grep_path = d.get("path", "") if isinstance(tool_input, dict) else ""
display_pat = pat[:60] + ("" if len(pat) > 60 else "")
return ToolStartThinking(
title="Searching content",
items=[f'"{display_pat}"' + (f" in {grep_path}" if grep_path else "")],
)
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Searching content", last_items)

View file

@ -0,0 +1,59 @@
"""ls: thinking-step copy for directory listing."""
from __future__ import annotations
import ast
from typing import Any
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
if isinstance(tool_input, dict):
path = tool_input.get("path", "/")
else:
path = str(tool_input)
return ToolStartThinking(title="Listing files", items=[path])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_name
if isinstance(tool_output, dict):
ls_output = tool_output.get("result", "")
elif isinstance(tool_output, str):
ls_output = tool_output
else:
ls_output = str(tool_output) if tool_output else ""
file_names: list[str] = []
if ls_output:
paths: list[str] = []
try:
parsed = ast.literal_eval(ls_output)
if isinstance(parsed, list):
paths = [str(p) for p in parsed]
except (ValueError, SyntaxError):
paths = [
line.strip()
for line in ls_output.strip().split("\n")
if line.strip()
]
for p in paths:
name = p.rstrip("/").split("/")[-1]
if name and len(name) <= 40:
file_names.append(name)
elif name:
file_names.append(name[:37] + "...")
if file_names:
if len(file_names) <= 5:
completed = [f"[{name}]" for name in file_names]
else:
completed = [f"[{name}]" for name in file_names[:4]]
completed.append(f"(+{len(file_names) - 4} more)")
else:
completed = ["No files found"]
return ("Listing files", completed)

View file

@ -0,0 +1,27 @@
"""mkdir: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
p = d.get("path", "") if isinstance(tool_input, dict) else str(tool_input)
display = p if len(p) <= 80 else "" + p[-77:]
return ToolStartThinking(title="Creating folder", items=[display] if display else [])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Creating folder", last_items)

View file

@ -0,0 +1,33 @@
"""move_file: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
truncate_middle,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
src = d.get("source_path", "") if isinstance(tool_input, dict) else ""
dst = d.get("destination_path", "") if isinstance(tool_input, dict) else ""
display_src = truncate_middle(src, max_len=60)
display_dst = truncate_middle(dst, max_len=60)
return ToolStartThinking(
title="Moving file",
items=[f"{display_src}{display_dst}"] if src or dst else [],
)
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Moving file", last_items)

View file

@ -0,0 +1,27 @@
"""read_file: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
truncate_path,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
fp = d.get("file_path", "") if isinstance(tool_input, dict) else str(tool_input)
return ToolStartThinking(title="Reading file", items=[truncate_path(fp)])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Reading file", last_items)

View file

@ -0,0 +1,28 @@
"""rm: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
truncate_path,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
rm_path = d.get("path", "") if isinstance(tool_input, dict) else str(tool_input)
display = truncate_path(rm_path)
return ToolStartThinking(title="Deleting file", items=[display] if display else [])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Deleting file", last_items)

View file

@ -0,0 +1,27 @@
"""rmdir: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
p = d.get("path", "") if isinstance(tool_input, dict) else str(tool_input)
display = p if len(p) <= 80 else "" + p[-77:]
return ToolStartThinking(title="Deleting folder", items=[display] if display else [])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Deleting folder", last_items)

View file

@ -0,0 +1,17 @@
"""Tool-call args + display truncation for filesystem thinking modules."""
from __future__ import annotations
from typing import Any
def as_tool_input_dict(tool_input: Any) -> dict[str, Any]:
return tool_input if isinstance(tool_input, dict) else {}
def truncate_path(fp: str, *, max_len: int = 80) -> str:
return fp if len(fp) <= max_len else "" + fp[-(max_len - 3) :]
def truncate_middle(s: str, *, max_len: int = 60) -> str:
return s if len(s) <= max_len else "" + s[-(max_len - 3) :]

View file

@ -0,0 +1,18 @@
from __future__ import annotations
FILESYSTEM_TOOLS: frozenset[str] = frozenset(
{
"read_file",
"glob",
"grep",
"ls",
"mkdir",
"move_file",
"rm",
"rmdir",
"write_todos",
"write_file",
"edit_file",
"execute",
}
)

View file

@ -0,0 +1,43 @@
"""write_file: path + status envelope on the tool-output card."""
from __future__ import annotations
from collections.abc import Iterator
from app.tasks.chat.streaming.handlers.tools.emission_context import (
ToolCompletionEmissionContext,
)
from app.tasks.chat.streaming.helpers.tool_output import (
extract_resolved_file_path,
tool_output_has_error,
tool_output_to_text,
)
def iter_completion_emission_frames(
ctx: ToolCompletionEmissionContext,
) -> Iterator[str]:
resolved_path = extract_resolved_file_path(
tool_name=ctx.tool_name,
tool_output=ctx.tool_output,
tool_input={"file_path": ctx.staged_workspace_file_path}
if ctx.staged_workspace_file_path
else None,
)
result_text = tool_output_to_text(ctx.tool_output)
if tool_output_has_error(ctx.tool_output):
yield ctx.emit_tool_output_card(
{
"status": "error",
"error": result_text,
"path": resolved_path,
},
)
else:
yield ctx.emit_tool_output_card(
{
"status": "completed",
"path": resolved_path,
"result": result_text,
},
)

View file

@ -0,0 +1,27 @@
"""write_file: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
truncate_path,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
fp = d.get("file_path", "") if isinstance(tool_input, dict) else str(tool_input)
return ToolStartThinking(title="Writing file", items=[truncate_path(fp)])
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Writing file", last_items)

View file

@ -0,0 +1,34 @@
"""write_todos: thinking-step copy."""
from __future__ import annotations
from typing import Any
from app.tasks.chat.streaming.handlers.tools.filesystem.shared.tool_input import (
as_tool_input_dict,
)
from app.tasks.chat.streaming.handlers.tools.shared.model import (
ToolStartThinking,
)
def resolve_start_thinking(tool_name: str, tool_input: Any) -> ToolStartThinking:
del tool_name
d = as_tool_input_dict(tool_input)
todos = d.get("todos", []) if isinstance(tool_input, dict) else []
todo_count = len(todos) if isinstance(todos, list) else 0
return ToolStartThinking(
title="Planning tasks",
items=(
[f"{todo_count} task{'s' if todo_count != 1 else ''}"]
if todo_count
else []
),
)
def resolve_completed_thinking(
tool_name: str, tool_output: Any, last_items: list[str],
) -> tuple[str, list[str]]:
del tool_output, tool_name
return ("Planning tasks", last_items)