multi_agent_chat/filesystem: extract dedicated FS middleware package

This commit is contained in:
CREDO23 2026-05-12 10:43:45 +02:00
parent df2afed18d
commit 3adfa37565
61 changed files with 2689 additions and 2 deletions

View file

@ -0,0 +1,11 @@
"""SurfSense filesystem middleware (multi-agent flavour)."""
from __future__ import annotations
from .index import build_filesystem_mw
from .middleware import SurfSenseFilesystemMiddleware
__all__ = [
"SurfSenseFilesystemMiddleware",
"build_filesystem_mw",
]

View file

@ -1,11 +1,12 @@
"""SurfSense filesystem tools/middleware."""
"""Public composition factory for the filesystem middleware."""
from __future__ import annotations
from typing import Any
from app.agents.new_chat.filesystem_selection import FilesystemMode
from app.agents.new_chat.middleware import SurfSenseFilesystemMiddleware
from .middleware import SurfSenseFilesystemMiddleware
def build_filesystem_mw(

View file

@ -0,0 +1,33 @@
"""SurfSense filesystem middleware: class + focused-responsibility helpers."""
from __future__ import annotations
from .index import (
SurfSenseFilesystemMiddleware,
check_cloud_write_namespace,
current_cwd,
default_cwd,
get_contract_suggested_path,
is_cloud,
normalize_local_mount_path,
resolve_list_target_path,
resolve_move_target_path,
resolve_relative,
resolve_write_target_path,
run_async_blocking,
)
__all__ = [
"SurfSenseFilesystemMiddleware",
"check_cloud_write_namespace",
"current_cwd",
"default_cwd",
"get_contract_suggested_path",
"is_cloud",
"normalize_local_mount_path",
"resolve_list_target_path",
"resolve_move_target_path",
"resolve_relative",
"resolve_write_target_path",
"run_async_blocking",
]

View file

@ -0,0 +1,22 @@
"""Sync/async dispatcher: drive an async tool body from a sync entry-point."""
from __future__ import annotations
import asyncio
from typing import Any
def run_async_blocking(coro: Any) -> Any:
"""Run ``coro`` to completion, blocking the current thread.
Returns an error string instead of raising if the current thread is
already inside a running event loop keeps sync tool entry-points
safe to call from any context.
"""
try:
loop = asyncio.get_running_loop()
if loop.is_running():
return "Error: sync filesystem operation not supported inside an active event loop."
except RuntimeError:
pass
return asyncio.run(coro)

View file

@ -0,0 +1,32 @@
"""Public surface of the middleware package: class + helpers used by tool factories."""
from __future__ import annotations
from .async_dispatch import run_async_blocking
from .middleware import SurfSenseFilesystemMiddleware
from .mode import default_cwd, is_cloud
from .namespace_policy import check_cloud_write_namespace
from .path_resolution import (
current_cwd,
get_contract_suggested_path,
normalize_local_mount_path,
resolve_list_target_path,
resolve_move_target_path,
resolve_relative,
resolve_write_target_path,
)
__all__ = [
"SurfSenseFilesystemMiddleware",
"check_cloud_write_namespace",
"current_cwd",
"default_cwd",
"get_contract_suggested_path",
"is_cloud",
"normalize_local_mount_path",
"resolve_list_target_path",
"resolve_move_target_path",
"resolve_relative",
"resolve_write_target_path",
"run_async_blocking",
]

View file

@ -0,0 +1,97 @@
"""``SurfSenseFilesystemMiddleware``: per-session state + tool registration."""
from __future__ import annotations
from typing import Any
from deepagents import FilesystemMiddleware
from langchain_core.tools import BaseTool
from app.agents.new_chat.filesystem_selection import FilesystemMode
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.sandbox import is_sandbox_enabled
from ..system_prompt import build_system_prompt
from ..tools import (
create_cd_tool,
create_edit_file_tool,
create_execute_code_tool,
create_list_tree_tool,
create_ls_tool,
create_mkdir_tool,
create_move_file_tool,
create_pwd_tool,
create_read_file_tool,
create_rm_tool,
create_rmdir_tool,
create_write_file_tool,
)
from ..tools.glob.description import select_description as glob_description
from ..tools.grep.description import select_description as grep_description
class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
"""SurfSense-specific filesystem middleware (cloud + desktop)."""
state_schema = SurfSenseFilesystemState
def __init__(
self,
*,
backend: Any = None,
filesystem_mode: FilesystemMode = FilesystemMode.CLOUD,
search_space_id: int | None = None,
created_by_id: str | None = None,
thread_id: int | str | None = None,
tool_token_limit_before_evict: int | None = 20000,
) -> None:
self._filesystem_mode = filesystem_mode
self._search_space_id = search_space_id
self._created_by_id = created_by_id
self._thread_id = thread_id
self._sandbox_available = is_sandbox_enabled() and thread_id is not None
system_prompt = build_system_prompt(
filesystem_mode,
sandbox_available=self._sandbox_available,
)
super().__init__(
backend=backend,
system_prompt=system_prompt,
tool_token_limit_before_evict=tool_token_limit_before_evict,
)
self.tools = [t for t in self.tools if t.name != "execute"]
self.tools.append(create_mkdir_tool(self))
self.tools.append(create_cd_tool(self))
self.tools.append(create_pwd_tool(self))
self.tools.append(create_move_file_tool(self))
self.tools.append(create_rm_tool(self))
self.tools.append(create_rmdir_tool(self))
self.tools.append(create_list_tree_tool(self))
if self._sandbox_available:
self.tools.append(create_execute_code_tool(self))
# ----------------------------------------- base-class tool overrides
def _create_ls_tool(self) -> BaseTool:
return create_ls_tool(self)
def _create_read_file_tool(self) -> BaseTool:
return create_read_file_tool(self)
def _create_write_file_tool(self) -> BaseTool:
return create_write_file_tool(self)
def _create_edit_file_tool(self) -> BaseTool:
return create_edit_file_tool(self)
def _create_glob_tool(self) -> BaseTool:
tool = super()._create_glob_tool()
tool.description = glob_description(self._filesystem_mode).rstrip()
return tool
def _create_grep_tool(self) -> BaseTool:
tool = super()._create_grep_tool()
tool.description = grep_description(self._filesystem_mode).rstrip()
return tool

View file

@ -0,0 +1,15 @@
"""Mode-derived facts: ``is_cloud`` and ``default_cwd``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
from app.agents.new_chat.path_resolver import DOCUMENTS_ROOT
def is_cloud(mode: FilesystemMode) -> bool:
return mode == FilesystemMode.CLOUD
def default_cwd(mode: FilesystemMode) -> str:
"""``/documents`` on cloud; ``/`` on desktop (mounts are children of ``/``)."""
return DOCUMENTS_ROOT if is_cloud(mode) else "/"

View file

@ -0,0 +1,51 @@
"""Cloud-only write namespace policy.
A write is allowed iff it lands under ``/documents/`` OR its basename uses
the ``temp_`` scratch prefix. The anonymous uploaded document is read-only
even when its path is under ``/documents/``.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from langchain.tools import ToolRuntime
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.path_resolver import DOCUMENTS_ROOT
from ..shared.paths import TEMP_PREFIX, basename
from .mode import is_cloud
if TYPE_CHECKING:
from .middleware import SurfSenseFilesystemMiddleware
def check_cloud_write_namespace(
mw: "SurfSenseFilesystemMiddleware",
path: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str | None:
"""Return an error string if cloud writes to ``path`` are not allowed.
Order matters:
1. Reject writes to the anonymous read-only doc.
2. Allow ``/documents/*``.
3. Allow ``temp_*`` basename anywhere.
4. Reject everything else.
"""
if not is_cloud(mw._filesystem_mode):
return None
anon = runtime.state.get("kb_anon_doc") or {}
if isinstance(anon, dict):
anon_path = str(anon.get("path") or "")
if anon_path and anon_path == path:
return "Error: the anonymous uploaded document is read-only."
if path.startswith(DOCUMENTS_ROOT + "/") or path == DOCUMENTS_ROOT:
return None
if basename(path).startswith(TEMP_PREFIX):
return None
return (
"Error: cloud writes must target /documents/<...> or use a 'temp_' "
f"basename for scratch (got '{path}')."
)

View file

@ -0,0 +1,174 @@
"""Resolve user-supplied paths to absolute paths the backends accept."""
from __future__ import annotations
import posixpath
from typing import TYPE_CHECKING
from langchain.tools import ToolRuntime
from app.agents.new_chat.filesystem_selection import FilesystemMode
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.multi_root_local_folder_backend import (
MultiRootLocalFolderBackend,
)
from ..shared.paths import (
extract_mount_from_path,
local_parent_path,
normalize_absolute_path,
)
from .mode import default_cwd
if TYPE_CHECKING:
from .middleware import SurfSenseFilesystemMiddleware
def current_cwd(
mw: "SurfSenseFilesystemMiddleware",
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
cwd = runtime.state.get("cwd") if hasattr(runtime, "state") else None
if isinstance(cwd, str) and cwd.startswith("/"):
return cwd
return default_cwd(mw._filesystem_mode)
def get_contract_suggested_path(
mw: "SurfSenseFilesystemMiddleware",
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
"""Read the planner's suggested write path; otherwise default to ``notes.md``."""
contract = runtime.state.get("file_operation_contract") or {}
suggested = contract.get("suggested_path")
if isinstance(suggested, str) and suggested.strip():
return normalize_absolute_path(suggested)
return default_cwd(mw._filesystem_mode).rstrip("/") + "/notes.md"
def resolve_relative(
mw: "SurfSenseFilesystemMiddleware",
path: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
"""Resolve ``path`` against cwd (no-op if already absolute)."""
candidate = path.strip()
if not candidate:
return current_cwd(mw, runtime)
if candidate.startswith("/"):
return normalize_absolute_path(candidate)
cwd = current_cwd(mw, runtime)
joined = posixpath.normpath(posixpath.join(cwd, candidate))
return normalize_absolute_path(joined)
def resolve_write_target_path(
mw: "SurfSenseFilesystemMiddleware",
file_path: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
"""Empty → contract suggestion; desktop → mount-prefix; cloud → cwd-relative."""
candidate = file_path.strip()
if not candidate:
return get_contract_suggested_path(mw, runtime)
if mw._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
return normalize_local_mount_path(mw, candidate, runtime)
return resolve_relative(mw, candidate, runtime)
def resolve_move_target_path(
mw: "SurfSenseFilesystemMiddleware",
file_path: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
"""Empty → empty (caller validates); desktop → mount-prefix; cloud → cwd-relative."""
candidate = file_path.strip()
if not candidate:
return ""
if mw._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
return normalize_local_mount_path(mw, candidate, runtime)
return resolve_relative(mw, candidate, runtime)
def resolve_list_target_path(
mw: "SurfSenseFilesystemMiddleware",
path: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
"""Root stays root; desktop → mount-prefix; cloud → cwd-relative."""
candidate = path.strip() or current_cwd(mw, runtime)
if candidate == "/":
return "/"
if mw._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
return normalize_local_mount_path(mw, candidate, runtime)
return resolve_relative(mw, candidate, runtime)
def normalize_local_mount_path(
mw: "SurfSenseFilesystemMiddleware",
candidate: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
"""Desktop only: prepend a mount prefix when the path doesn't already have one.
Resolution order: explicit mount prefix single available mount
contract-suggested mount mount where the path exists mount where the
parent exists backend default mount.
"""
normalized = normalize_absolute_path(candidate)
backend = mw._get_backend(runtime)
if not isinstance(backend, MultiRootLocalFolderBackend):
return normalized
mounts = backend.list_mounts()
explicit_mount = extract_mount_from_path(normalized, mounts)
if explicit_mount:
return normalized
if len(mounts) == 1:
return f"/{mounts[0]}{normalized}"
suggested_mount: str | None = None
contract = runtime.state.get("file_operation_contract") or {}
suggested_path = contract.get("suggested_path")
if isinstance(suggested_path, str) and suggested_path.strip():
normalized_suggested = normalize_absolute_path(suggested_path)
suggested_mount = extract_mount_from_path(normalized_suggested, mounts)
matching_mounts = [
mount
for mount in mounts
if _path_exists_under_mount(backend, mount, normalized)
]
if len(matching_mounts) == 1:
return f"/{matching_mounts[0]}{normalized}"
parent_path = local_parent_path(normalized)
if parent_path != "/":
parent_matching_mounts = [
mount
for mount in mounts
if _path_exists_under_mount(backend, mount, parent_path)
]
if len(parent_matching_mounts) == 1:
return f"/{parent_matching_mounts[0]}{normalized}"
if suggested_mount:
return f"/{suggested_mount}{normalized}"
return f"/{backend.default_mount()}{normalized}"
def _path_exists_under_mount(
backend: MultiRootLocalFolderBackend,
mount: str,
local_path: str,
) -> bool:
result = backend.list_tree(
f"/{mount}{local_path}",
max_depth=0,
page_size=1,
include_files=True,
include_dirs=True,
)
return not bool(result.get("error"))

View file

@ -0,0 +1,21 @@
"""Stateless utilities shared by the middleware and tool factories."""
from __future__ import annotations
from .paths import (
TEMP_PREFIX,
basename,
extract_mount_from_path,
is_ancestor_of,
local_parent_path,
normalize_absolute_path,
)
__all__ = [
"TEMP_PREFIX",
"basename",
"extract_mount_from_path",
"is_ancestor_of",
"local_parent_path",
"normalize_absolute_path",
]

View file

@ -0,0 +1,51 @@
"""Stateless path utilities shared by the middleware class and tool factories."""
from __future__ import annotations
import re
TEMP_PREFIX = "temp_"
def normalize_absolute_path(candidate: str) -> str:
"""Collapse slashes / backslashes and force an absolute path."""
normalized = re.sub(r"/+", "/", candidate.strip().replace("\\", "/"))
if not normalized:
return "/"
if normalized.startswith("/"):
return normalized
return f"/{normalized.lstrip('/')}"
def extract_mount_from_path(path: str, mounts: tuple[str, ...]) -> str | None:
"""Return the leading mount segment if it's in ``mounts``, else None."""
rel = path.lstrip("/")
if not rel:
return None
mount, _, _ = rel.partition("/")
if mount in mounts:
return mount
return None
def local_parent_path(path: str) -> str:
"""Posix-style parent path (root = ``/``)."""
rel = path.lstrip("/")
if "/" not in rel:
return "/"
parent = rel.rsplit("/", 1)[0].strip("/")
if not parent:
return "/"
return f"/{parent}"
def basename(path: str) -> str:
return path.rsplit("/", 1)[-1]
def is_ancestor_of(candidate: str, target: str) -> bool:
"""True iff ``candidate`` is a strict-or-equal ancestor of ``target``."""
if candidate == "/":
return target != "/"
cand = candidate.rstrip("/")
return target == cand or target.startswith(cand + "/")

View file

@ -0,0 +1,7 @@
"""Filesystem-middleware system prompt (cloud + desktop modes)."""
from __future__ import annotations
from .index import build_system_prompt
__all__ = ["build_system_prompt"]

View file

@ -0,0 +1,71 @@
"""Cloud-mode filesystem system prompt body."""
from __future__ import annotations
BODY = """
## Filesystem Tools
All file paths must start with `/`. Relative paths resolve against the
current working directory (`cwd`, default `/documents`).
- ls(path, offset=0, limit=200): list files and directories at the given path.
- read_file(path, offset, limit): read a file (paginated) from the filesystem.
- write_file(path, content): create a new text file in the workspace.
- edit_file(path, old, new): exact string-replacement edit (lazy-loads KB
documents on first edit).
- glob(pattern, path): find files matching a glob pattern.
- grep(pattern, path, glob): substring search across files.
- mkdir(path): create a folder under `/documents/` (committed at end of turn).
- cd(path): change the current working directory.
- pwd(): print the current working directory.
- move_file(source, dest): move/rename a file under `/documents/`.
- rm(path): delete a single file under `/documents/` (no `-r`).
- rmdir(path): delete an empty directory under `/documents/`.
- list_tree(path, max_depth, page_size): recursively list files/folders.
## Persistence Rules
- Files written under `/documents/<...>` are **persisted** at end of turn as
Documents in the user's knowledge base.
- Files whose **basename** starts with `temp_` (e.g. `temp_plan.md` or
`/documents/temp_scratch.md`) are **discarded** at end of turn use this
prefix for any scratch/working content you do NOT want saved.
- All other paths (outside `/documents/` and not `temp_*`) are rejected.
- mkdir/move_file/rm/rmdir are staged this turn and committed at end of
turn alongside any new/edited documents. Snapshot/revert is enabled
for every destructive operation when action logging is on.
## Reading Documents Efficiently
Documents are formatted as XML. Each document contains:
- `<document_metadata>` title, type, URL, etc.
- `<chunk_index>` a table of every chunk with its **line range** and a
`matched="true"` flag for chunks that matched the search query.
- `<document_content>` the actual chunks in original document order.
**Workflow**: when reading a large document, read the first ~20 lines to see
the `<chunk_index>`, identify chunks marked `matched="true"`, then use
`read_file(path, offset=<start_line>, limit=<lines>)` to jump directly to
those sections instead of reading the entire file sequentially.
Use `<chunk id='...'>` values as citation IDs in your answers.
## Priority List
You receive a `<priority_documents>` system message each turn listing the
top-K paths most relevant to the user's query (by hybrid search). Read those
first matched sections are flagged inside each document's `<chunk_index>`.
## Workspace Tree
You receive a `<workspace_tree>` system message each turn with the current
folder/document layout. The tree may be truncated past a hard cap; in that
case, drill into specific folders with `ls(...)` or `list_tree(...)`.
## grep Line Numbers
`grep` searches across both your in-memory edits and the indexed chunks in
Postgres. State-cached files return real line numbers; database hits return
`line=0` because their position depends on per-document XML layout call
`read_file(path)` to find the exact line.
"""

View file

@ -0,0 +1,22 @@
"""Mode-agnostic prompt fragments: header conventions + sandbox addendum."""
from __future__ import annotations
HEADER = """## Following Conventions
- Read files before editing understand existing content before making changes.
- Mimic existing style, naming conventions, and patterns.
- Never claim a file was created/updated unless filesystem tool output confirms success.
- If a file write/edit fails, explicitly report the failure.
"""
SANDBOX_ADDENDUM = (
"\n- execute_code: run Python code in an isolated sandbox."
"\n\n## Code Execution"
"\n\nUse execute_code whenever a task benefits from running code."
" Never perform arithmetic manually."
"\n\nDocuments here are XML-wrapped markdown, not raw data files."
" To work with them programmatically, read the document first,"
" extract the data, write it as a clean file (CSV, JSON, etc.),"
" and then run your code against it."
)

View file

@ -0,0 +1,49 @@
"""Desktop-mode filesystem system prompt body."""
from __future__ import annotations
BODY = """
## Local Folder Mode
This chat operates directly on the user's local folders. Writes and edits
hit disk immediately there is no end-of-turn staging, no `/documents/`
namespace, and no `temp_` semantics.
## Filesystem Tools
All file paths must start with `/` and use mount-prefixed absolute paths
like `/<mount>/file.ext`. Relative paths resolve against the current working
directory (`cwd`).
- ls(path, offset=0, limit=200): list files and directories at the given path.
- read_file(path, offset, limit): read a file (paginated) from disk.
- write_file(path, content): write a file to disk.
- edit_file(path, old, new): exact string-replacement edit on disk.
- glob(pattern, path): find files matching a glob pattern.
- grep(pattern, path, glob): substring search across files.
- mkdir(path): create a directory on disk.
- cd(path): change the current working directory.
- pwd(): print the current working directory.
- move_file(source, dest): move/rename a file.
- rm(path): delete a single file from disk (no `-r`). NOT reversible.
- rmdir(path): delete an empty directory from disk. NOT reversible.
- list_tree(path, max_depth, page_size): recursively list files/folders.
## Workflow Tips
- If you are unsure which mounts are available, call `ls('/')` first.
- For large trees, prefer `list_tree` then `grep` then `read_file` over
brute-force directory traversal.
- Cross-mount moves are not supported.
- Desktop deletes hit disk immediately and cannot be undone via the
agent's revert flow — confirm before calling `rm`/`rmdir`.
## Priority List
You may receive a `<priority_documents>` system message listing the top-K
documents from the user's SurfSense knowledge base — these are cloud-ingested
via connectors (Notion, Slack, etc.), not local files. Treat it as a hint:
consult it when the task spans both local and cloud sources (e.g. drafting a
local note from a Notion summary); skip when the task is purely about local
files.
"""

View file

@ -0,0 +1,20 @@
"""Public assembly of the FS system prompt for a given session."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
from .cloud import BODY as CLOUD_BODY
from .common import HEADER, SANDBOX_ADDENDUM
from .desktop import BODY as DESKTOP_BODY
def build_system_prompt(
mode: FilesystemMode, *, sandbox_available: bool
) -> str:
"""Assemble the FS prompt: common header + mode body + optional sandbox section."""
body = CLOUD_BODY if mode == FilesystemMode.CLOUD else DESKTOP_BODY
base = HEADER + body
if sandbox_available:
base += SANDBOX_ADDENDUM
return base

View file

@ -0,0 +1,31 @@
"""Filesystem tool factories — one vertical slice per tool."""
from __future__ import annotations
from .cd import create_cd_tool
from .edit_file import create_edit_file_tool
from .execute_code import create_execute_code_tool
from .list_tree import create_list_tree_tool
from .ls import create_ls_tool
from .mkdir import create_mkdir_tool
from .move_file import create_move_file_tool
from .pwd import create_pwd_tool
from .read_file import create_read_file_tool
from .rm import create_rm_tool
from .rmdir import create_rmdir_tool
from .write_file import create_write_file_tool
__all__ = [
"create_cd_tool",
"create_edit_file_tool",
"create_execute_code_tool",
"create_list_tree_tool",
"create_ls_tool",
"create_mkdir_tool",
"create_move_file_tool",
"create_pwd_tool",
"create_read_file_tool",
"create_rm_tool",
"create_rmdir_tool",
"create_write_file_tool",
]

View file

@ -0,0 +1,7 @@
"""Tool: ``cd`` — change the current working directory (cwd)."""
from __future__ import annotations
from .index import create_cd_tool
__all__ = ["create_cd_tool"]

View file

@ -0,0 +1,19 @@
"""Description string for ``cd`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_DESCRIPTION = """Changes the current working directory (cwd).
Args:
- path: absolute or relative directory path. Relative paths resolve against
the current cwd.
The new cwd is used by other filesystem tools whenever a relative path is
given. Returns the resolved cwd.
"""
def select_description(mode: FilesystemMode) -> str:
return _DESCRIPTION

View file

@ -0,0 +1,80 @@
"""``cd`` factory: resolve target, verify existence (staged + on-disk), update cwd."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.path_resolver import DOCUMENTS_ROOT
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_cd_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_cd(
path: Annotated[str, "Absolute or relative directory path to switch into."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
target = resolve_relative(mw, path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
backend = mw._get_backend(runtime)
try:
infos = await backend.als_info(validated)
except Exception as exc: # pragma: no cover - defensive
return f"Error: {exc}"
staged_dirs = list(runtime.state.get("staged_dirs") or [])
files = runtime.state.get("files") or {}
cwd_exists = (
bool(infos)
or validated in staged_dirs
or any(p == validated for p in files)
or any(
isinstance(p, str) and p.startswith(validated.rstrip("/") + "/")
for p in files
)
or validated == "/"
or validated == DOCUMENTS_ROOT
)
if not cwd_exists:
return f"Error: directory '{validated}' not found."
return Command(
update={
"cwd": validated,
"messages": [
ToolMessage(
content=f"cwd changed to {validated}",
tool_call_id=runtime.tool_call_id,
)
],
}
)
def sync_cd(
path: Annotated[str, "Absolute or relative directory path to switch into."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
return run_async_blocking(async_cd(path, runtime))
return StructuredTool.from_function(
name="cd",
description=description,
func=sync_cd,
coroutine=async_cd,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``edit_file`` — exact string replacement on a file."""
from __future__ import annotations
from .index import create_edit_file_tool
__all__ = ["create_edit_file_tool"]

View file

@ -0,0 +1,28 @@
"""Mode-specific description strings for ``edit_file``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Performs exact string replacements in files.
IMPORTANT:
- Read the file before editing.
- Preserve exact indentation and formatting.
- Edits to documents under `/documents/` are persisted at end of turn.
- Edits to `temp_*` files are discarded at end of turn.
"""
_DESKTOP_DESCRIPTION = """Performs exact string replacements in files on disk.
IMPORTANT:
- Read the file before editing.
- Preserve exact indentation and formatting.
- Edits hit disk immediately.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,132 @@
"""``edit_file`` factory: lazy-load KB doc, enforce cloud namespace, dispatch to backend."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.protocol import EditResult
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.kb_postgres_backend import KBPostgresBackend
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.namespace_policy import check_cloud_write_namespace
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_edit_file_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_edit_file(
file_path: Annotated[
str,
"Absolute path to the file to edit. Relative paths resolve against the current cwd.",
],
old_string: Annotated[
str,
"Exact text to replace. Must be unique unless replace_all is True.",
],
new_string: Annotated[
str,
"Replacement text. Must differ from old_string.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
*,
replace_all: Annotated[
bool,
"If True, replace all occurrences of old_string. Defaults to False.",
] = False,
) -> Command | str:
target = resolve_relative(mw, file_path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
namespace_error = check_cloud_write_namespace(mw, validated, runtime)
if namespace_error:
return namespace_error
backend = mw._get_backend(runtime)
files_state = runtime.state.get("files") or {}
doc_id_to_attach: int | None = None
if (
is_cloud(mw._filesystem_mode)
and validated not in files_state
and isinstance(backend, KBPostgresBackend)
):
loaded = await backend._load_file_data(validated)
if loaded is None:
return f"Error: File '{validated}' not found"
_, doc_id_to_attach = loaded
res: EditResult = await backend.aedit(
validated, old_string, new_string, replace_all=replace_all
)
if res.error:
return res.error
path = res.path or validated
files_update = res.files_update or {}
update: dict[str, Any] = {
"files": files_update,
"messages": [
ToolMessage(
content=(
f"Successfully replaced {res.occurrences} instance(s) "
f"of the string in '{path}'"
),
tool_call_id=runtime.tool_call_id,
)
],
}
if is_cloud(mw._filesystem_mode):
update["dirty_paths"] = [path]
update["dirty_path_tool_calls"] = {path: runtime.tool_call_id}
if doc_id_to_attach is not None:
update["doc_id_by_path"] = {path: doc_id_to_attach}
return Command(update=update)
def sync_edit_file(
file_path: Annotated[
str,
"Absolute path to the file to edit. Relative paths resolve against the current cwd.",
],
old_string: Annotated[
str,
"Exact text to replace. Must be unique unless replace_all is True.",
],
new_string: Annotated[
str,
"Replacement text. Must differ from old_string.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
*,
replace_all: Annotated[
bool,
"If True, replace all occurrences of old_string. Defaults to False.",
] = False,
) -> Command | str:
return run_async_blocking(
async_edit_file(
file_path, old_string, new_string, runtime, replace_all=replace_all
)
)
return StructuredTool.from_function(
name="edit_file",
description=description,
func=sync_edit_file,
coroutine=async_edit_file,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``execute_code`` — run Python code in an isolated sandbox."""
from __future__ import annotations
from .index import create_execute_code_tool
__all__ = ["create_execute_code_tool"]

View file

@ -0,0 +1,21 @@
"""Description string for ``execute_code`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_DESCRIPTION = """Executes Python code in an isolated sandbox environment.
Common data-science packages are pre-installed (pandas, numpy, matplotlib,
scipy, scikit-learn).
Usage notes:
- No outbound network access.
- Returns combined stdout/stderr with exit code.
- Use print() to produce output.
- Use the optional timeout parameter to override the default timeout.
"""
def select_description(mode: FilesystemMode) -> str:
return _DESCRIPTION

View file

@ -0,0 +1,89 @@
"""Sandbox-execution helpers for ``execute_code``.
Wraps user-supplied code in a heredoc and dispatches it to the Daytona
sandbox associated with the current chat thread, with a single retry on
sandbox failure.
"""
from __future__ import annotations
import logging
import secrets
from typing import TYPE_CHECKING
from daytona.common.errors import DaytonaError
from langchain.tools import ToolRuntime
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.sandbox import (
_evict_sandbox_cache,
delete_sandbox,
get_or_create_sandbox,
)
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
logger = logging.getLogger(__name__)
MAX_EXECUTE_TIMEOUT = 300
def wrap_as_python(code: str) -> str:
"""Wrap ``code`` in a unique-sentinel heredoc for shell execution."""
sentinel = f"_PYEOF_{secrets.token_hex(8)}"
return f"python3 << '{sentinel}'\n{code}\n{sentinel}"
async def execute_in_sandbox(
mw: "SurfSenseFilesystemMiddleware",
command: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
timeout: int | None,
) -> str:
"""Top-level entry: wraps + retries once on sandbox failure."""
assert mw._thread_id is not None
command = wrap_as_python(command)
try:
return await _try_sandbox_execute(mw, command, runtime, timeout)
except (DaytonaError, Exception) as first_err:
logger.warning(
"Sandbox execute failed for thread %s, retrying: %s",
mw._thread_id,
first_err,
)
try:
await delete_sandbox(mw._thread_id)
except Exception:
_evict_sandbox_cache(mw._thread_id)
try:
return await _try_sandbox_execute(mw, command, runtime, timeout)
except Exception:
logger.exception(
"Sandbox retry also failed for thread %s", mw._thread_id
)
return "Error: Code execution is temporarily unavailable. Please try again."
async def _try_sandbox_execute(
mw: "SurfSenseFilesystemMiddleware",
command: str,
runtime: ToolRuntime[None, SurfSenseFilesystemState],
timeout: int | None,
) -> str:
"""One sandbox-execute attempt: get/create sandbox, run, format output."""
sandbox, _is_new = await get_or_create_sandbox(mw._thread_id)
result = await sandbox.aexecute(command, timeout=timeout)
output = (result.output or "").strip()
if not output and result.exit_code == 0:
return (
"[Code executed successfully but produced no output. "
"Use print() to display results, then try again.]"
)
parts = [result.output]
if result.exit_code is not None:
status = "succeeded" if result.exit_code == 0 else "failed"
parts.append(f"\n[Command {status} with exit code {result.exit_code}]")
if result.truncated:
parts.append("\n[Output was truncated due to size limits]")
return "".join(parts)

View file

@ -0,0 +1,64 @@
"""``execute_code`` factory: bounds-check timeout, dispatch to the sandbox."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from .description import select_description
from .helpers import MAX_EXECUTE_TIMEOUT, execute_in_sandbox
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_execute_code_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
def sync_execute_code(
command: Annotated[
str, "Python code to execute. Use print() to see output."
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
timeout: Annotated[
int | None,
"Optional timeout in seconds.",
] = None,
) -> str:
if timeout is not None:
if timeout < 0:
return f"Error: timeout must be non-negative, got {timeout}."
if timeout > MAX_EXECUTE_TIMEOUT:
return f"Error: timeout {timeout}s exceeds maximum ({MAX_EXECUTE_TIMEOUT}s)."
return run_async_blocking(
execute_in_sandbox(mw, command, runtime, timeout)
)
async def async_execute_code(
command: Annotated[
str, "Python code to execute. Use print() to see output."
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
timeout: Annotated[
int | None,
"Optional timeout in seconds.",
] = None,
) -> str:
if timeout is not None:
if timeout < 0:
return f"Error: timeout must be non-negative, got {timeout}."
if timeout > MAX_EXECUTE_TIMEOUT:
return f"Error: timeout {timeout}s exceeds maximum ({MAX_EXECUTE_TIMEOUT}s)."
return await execute_in_sandbox(mw, command, runtime, timeout)
return StructuredTool.from_function(
name="execute_code",
description=description,
func=sync_execute_code,
coroutine=async_execute_code,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``glob`` — description override (the tool comes from the base middleware)."""
from __future__ import annotations
from .description import select_description
__all__ = ["select_description"]

View file

@ -0,0 +1,15 @@
"""Description string for ``glob`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_DESCRIPTION = """Find files matching a glob pattern.
Supports standard glob patterns: `*`, `**`, `?`.
Returns absolute file paths.
"""
def select_description(mode: FilesystemMode) -> str:
return _DESCRIPTION

View file

@ -0,0 +1,7 @@
"""Tool: ``grep`` — description override (the tool comes from the base middleware)."""
from __future__ import annotations
from .description import select_description
__all__ = ["select_description"]

View file

@ -0,0 +1,24 @@
"""Mode-specific description strings for ``grep``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Search for a literal text pattern across files.
Searches both your in-memory edits and the indexed chunks in Postgres.
State-cached file matches include real line numbers; database hits return
`line=0` because their position depends on per-document XML layout call
`read_file(path)` afterwards to find the exact line.
"""
_DESKTOP_DESCRIPTION = """Search for a literal text pattern across files.
Searches files on disk and any in-memory edits. Returns real line numbers.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,7 @@
"""Tool: ``list_tree`` — recursively list files / folders in one bounded call."""
from __future__ import annotations
from .index import create_list_tree_tool
__all__ = ["create_list_tree_tool"]

View file

@ -0,0 +1,37 @@
"""Mode-specific description strings for ``list_tree``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Lists files/folders recursively in a single bounded call.
Args:
- path: absolute path to start from. Defaults to `/documents`.
- max_depth: recursion depth limit (default 8).
- page_size: maximum number of entries returned (max 1000).
- include_files / include_dirs: filter returned entry types.
Returns JSON with:
- entries: [{path, is_dir, size, modified_at, depth}]
- truncated: true when additional entries were omitted due to page_size.
"""
_DESKTOP_DESCRIPTION = """Lists files/folders recursively in a single bounded call.
Args:
- path: absolute path to start from. Defaults to `/`.
- max_depth: recursion depth limit (default 8).
- page_size: maximum number of entries returned (max 1000).
- include_files / include_dirs: filter returned entry types.
Returns JSON with:
- entries: [{path, is_dir, size, modified_at, depth}]
- truncated: true when additional entries were omitted due to page_size.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,101 @@
"""``list_tree`` factory: bounded recursive listing across cloud / desktop backends."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.kb_postgres_backend import KBPostgresBackend
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_list_target_path
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_list_tree_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_list_tree(
runtime: ToolRuntime[None, SurfSenseFilesystemState],
path: Annotated[
str,
"Absolute path to start from. Defaults to /documents in cloud mode.",
] = "",
max_depth: Annotated[int, "Recursion depth limit. Default 8."] = 8,
page_size: Annotated[int, "Maximum entries returned. Max 1000."] = 500,
include_files: Annotated[bool, "Include file entries."] = True,
include_dirs: Annotated[bool, "Include directory entries."] = True,
) -> str:
if max_depth < 0:
return "Error: max_depth must be >= 0."
if page_size < 1:
return "Error: page_size must be >= 1."
if not include_files and not include_dirs:
return "Error: include_files and include_dirs cannot both be false."
target = resolve_list_target_path(mw, path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
backend = mw._get_backend(runtime)
if isinstance(backend, KBPostgresBackend):
result = await backend.alist_tree_listing(
validated,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
elif hasattr(backend, "alist_tree"):
result = await backend.alist_tree(
validated,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
else:
return "Error: list_tree is not supported by the active backend."
if isinstance(result, dict) and isinstance(result.get("error"), str):
return result["error"]
return json.dumps(result, ensure_ascii=True)
def sync_list_tree(
runtime: ToolRuntime[None, SurfSenseFilesystemState],
path: Annotated[
str,
"Absolute path to start from. Defaults to /documents in cloud mode.",
] = "",
max_depth: Annotated[int, "Recursion depth limit. Default 8."] = 8,
page_size: Annotated[int, "Maximum entries returned. Max 1000."] = 500,
include_files: Annotated[bool, "Include file entries."] = True,
include_dirs: Annotated[bool, "Include directory entries."] = True,
) -> str:
return run_async_blocking(
async_list_tree(
runtime,
path=path,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
)
return StructuredTool.from_function(
name="list_tree",
description=description,
func=sync_list_tree,
coroutine=async_list_tree,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``ls`` — list files and directories at a path."""
from __future__ import annotations
from .index import create_ls_tool
__all__ = ["create_ls_tool"]

View file

@ -0,0 +1,29 @@
"""Mode-specific description strings for ``ls``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Lists files and directories at the given path.
Usage:
- Provide an absolute path under `/documents` (relative paths resolve under
the current cwd, which defaults to `/documents`).
- For very large folders, use `offset` and `limit` to paginate the listing.
- Returns one entry per line; directories end with a trailing `/`.
"""
_DESKTOP_DESCRIPTION = """Lists files and directories at the given path.
Usage:
- Provide an absolute path using a mount prefix (e.g. `/<mount>/sub/dir`).
Use `ls('/')` to discover available mounts.
- For very large folders, use `offset` and `limit` to paginate the listing.
- Returns one entry per line; directories end with a trailing `/`.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,96 @@
"""``ls`` factory: resolve target, page through backend listing."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.kb_postgres_backend import paginate_listing
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_list_target_path
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_ls_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_ls(
runtime: ToolRuntime[None, SurfSenseFilesystemState],
path: Annotated[
str,
"Absolute path to the directory to list. Relative paths resolve against the current cwd.",
] = "",
offset: Annotated[
int,
"Number of entries to skip. Use for paginating large folders. Defaults to 0.",
] = 0,
limit: Annotated[
int,
"Maximum number of entries to return. Defaults to 200.",
] = 200,
) -> str:
target = resolve_list_target_path(mw, path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
if offset < 0:
offset = 0
if limit < 1:
limit = 1
backend = mw._get_backend(runtime)
infos = await backend.als_info(validated)
page = paginate_listing(infos, offset=offset, limit=limit)
paths = [
f"{fi.get('path', '')}/" if fi.get("is_dir") else fi.get("path", "")
for fi in page
]
total = len(infos)
shown = len(page)
header = (
f"{validated} ({shown} of {total} entries"
f"{f', offset={offset}' if offset else ''})"
)
if not paths:
return f"{header}\n(empty)"
body = "\n".join(paths)
if total > offset + shown:
body += (
f"\n... {total - offset - shown} more — call ls("
f"'{validated}', offset={offset + shown}, limit={limit})"
)
return f"{header}\n{body}"
def sync_ls(
runtime: ToolRuntime[None, SurfSenseFilesystemState],
path: Annotated[
str,
"Absolute path to the directory to list. Relative paths resolve against the current cwd.",
] = "",
offset: Annotated[
int,
"Number of entries to skip. Use for paginating large folders. Defaults to 0.",
] = 0,
limit: Annotated[
int,
"Maximum number of entries to return. Defaults to 200.",
] = 200,
) -> str:
return run_async_blocking(
async_ls(runtime, path=path, offset=offset, limit=limit)
)
return StructuredTool.from_function(
name="ls",
description=description,
func=sync_ls,
coroutine=async_ls,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``mkdir`` — create a directory."""
from __future__ import annotations
from .index import create_mkdir_tool
__all__ = ["create_mkdir_tool"]

View file

@ -0,0 +1,33 @@
"""Mode-specific description strings for ``mkdir``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Creates a directory under `/documents/`.
Stages the folder for end-of-turn commit; the Folder row is inserted only
after the agent's turn finishes successfully.
Args:
- path: absolute path of the new directory (must start with
`/documents/`).
Notes:
- Parent folders are created as needed.
"""
_DESKTOP_DESCRIPTION = """Creates a directory on disk.
Args:
- path: absolute mount-prefixed path of the new directory.
Notes:
- Parent folders are created as needed.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,94 @@
"""``mkdir`` factory: cloud stages for end-of-turn; desktop hits disk immediately."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.path_resolver import DOCUMENTS_ROOT
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_mkdir_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_mkdir(
path: Annotated[str, "Absolute or relative directory path to create."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
target = resolve_relative(mw, path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
if is_cloud(mw._filesystem_mode):
if not (
validated.startswith(DOCUMENTS_ROOT + "/")
or validated == DOCUMENTS_ROOT
):
return (
"Error: cloud mkdir must target a path under /documents/ "
f"(got '{validated}')."
)
return Command(
update={
"staged_dirs": [validated],
"staged_dir_tool_calls": {
validated: runtime.tool_call_id,
},
"messages": [
ToolMessage(
content=(
f"Staged directory '{validated}' (will be created "
"at end of turn)."
),
tool_call_id=runtime.tool_call_id,
)
],
}
)
backend = mw._get_backend(runtime)
local_method = getattr(backend, "amkdir", None) or getattr(
backend, "mkdir", None
)
if callable(local_method):
try:
res: Any = local_method(validated, parents=True, exist_ok=True)
if asyncio.iscoroutine(res):
await res
except TypeError:
res = local_method(validated)
if asyncio.iscoroutine(res):
await res
except Exception as exc: # pragma: no cover
return f"Error: {exc}"
return f"Created directory {validated}"
def sync_mkdir(
path: Annotated[str, "Absolute or relative directory path to create."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
return run_async_blocking(async_mkdir(path, runtime))
return StructuredTool.from_function(
name="mkdir",
description=description,
func=sync_mkdir,
coroutine=async_mkdir,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``move_file`` — move or rename a file."""
from __future__ import annotations
from .index import create_move_file_tool
__all__ = ["create_move_file_tool"]

View file

@ -0,0 +1,33 @@
"""Mode-specific description strings for ``move_file``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Moves or renames a file or folder.
Use absolute paths for both source and destination.
Notes:
- `move_file` is staged this turn and committed at end of turn.
- The agent cannot overwrite an existing destination pass a fresh dest
path or move the existing destination away first.
- The anonymous uploaded document is read-only and cannot be moved.
- Rename is a special case of move (same folder, different filename).
"""
_DESKTOP_DESCRIPTION = """Moves or renames a file or folder on disk.
Use mount-prefixed absolute paths for both source and destination
(e.g. `/<mount>/old.txt` -> `/<mount>/new.txt`).
Notes:
- Cross-mount moves are not supported.
- Rename is a special case of move (same folder, different filename).
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,111 @@
"""Cloud-mode move helper: stages source/dest into pending_moves + files."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.kb_postgres_backend import KBPostgresBackend
from app.agents.new_chat.path_resolver import DOCUMENTS_ROOT
from app.agents.new_chat.state_reducers import _CLEAR
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
async def cloud_move_file(
mw: "SurfSenseFilesystemMiddleware",
runtime: ToolRuntime[None, SurfSenseFilesystemState],
source: str,
dest: str,
*,
overwrite: bool,
) -> Command | str:
"""Stage a source/dest move in cloud mode (commit at end of turn)."""
backend = mw._get_backend(runtime)
if not isinstance(backend, KBPostgresBackend):
return "Error: cloud move requires KBPostgresBackend."
if source == dest:
return f"Moved '{source}' to '{dest}' (no-op)"
if overwrite:
return (
"Error: overwrite=True is not supported in cloud mode. Move/edit "
"the destination doc explicitly first."
)
if not source.startswith(DOCUMENTS_ROOT + "/"):
return (
"Error: cloud move_file source must be under /documents/ (got "
f"'{source}')."
)
if not dest.startswith(DOCUMENTS_ROOT + "/"):
return (
"Error: cloud move_file destination must be under /documents/ (got "
f"'{dest}')."
)
anon = runtime.state.get("kb_anon_doc") or {}
if isinstance(anon, dict):
anon_path = str(anon.get("path") or "")
if anon_path and (anon_path in (source, dest)):
return "Error: the anonymous uploaded document is read-only."
files = runtime.state.get("files") or {}
doc_id_by_path = runtime.state.get("doc_id_by_path") or {}
pending_moves = list(runtime.state.get("pending_moves") or [])
if dest in files:
return f"Error: destination '{dest}' already exists."
if any(move.get("dest") == dest for move in pending_moves):
return f"Error: destination '{dest}' already exists."
if dest != source:
existing_dest = await backend._load_file_data(dest)
if existing_dest is not None:
return f"Error: destination '{dest}' already exists."
source_file_data = files.get(source)
source_doc_id = doc_id_by_path.get(source)
if source_file_data is None:
loaded = await backend._load_file_data(source)
if loaded is None:
return f"Error: source '{source}' not found."
source_file_data, loaded_doc_id = loaded
if source_doc_id is None:
source_doc_id = loaded_doc_id
files_update: dict[str, Any] = {source: None, dest: source_file_data}
update: dict[str, Any] = {
"files": files_update,
"pending_moves": [
{
"source": source,
"dest": dest,
"overwrite": False,
"tool_call_id": runtime.tool_call_id,
}
],
"messages": [
ToolMessage(
content=(
f"Moved '{source}' to '{dest}' (will commit at end of turn)."
),
tool_call_id=runtime.tool_call_id,
)
],
}
doc_id_update: dict[str, int | None] = {source: None}
if source_doc_id is not None:
doc_id_update[dest] = source_doc_id
update["doc_id_by_path"] = doc_id_update
dirty_paths = list(runtime.state.get("dirty_paths") or [])
if source in dirty_paths:
new_dirty: list[Any] = [_CLEAR]
for entry in dirty_paths:
new_dirty.append(dest if entry == source else entry)
update["dirty_paths"] = new_dirty
return Command(update=update)

View file

@ -0,0 +1,98 @@
"""``move_file`` factory: dispatches cloud (staged) vs desktop (direct disk) moves."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.protocol import WriteResult
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_move_target_path
from .description import select_description
from .helpers import cloud_move_file
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_move_file_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_move_file(
source_path: Annotated[str, "Absolute or relative source path."],
destination_path: Annotated[str, "Absolute or relative destination path."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
*,
overwrite: Annotated[
bool,
"If True, replace existing destination. Cloud mode rejects True. Defaults to False.",
] = False,
) -> Command | str:
if not source_path.strip() or not destination_path.strip():
return "Error: source_path and destination_path are required."
source = resolve_move_target_path(mw, source_path, runtime)
dest = resolve_move_target_path(mw, destination_path, runtime)
try:
validated_source = validate_path(source)
validated_dest = validate_path(dest)
except ValueError as exc:
return f"Error: {exc}"
if is_cloud(mw._filesystem_mode):
return await cloud_move_file(
mw,
runtime,
validated_source,
validated_dest,
overwrite=overwrite,
)
backend = mw._get_backend(runtime)
res: WriteResult = await backend.amove(
validated_source, validated_dest, overwrite=overwrite
)
if res.error:
return res.error
update: dict[str, Any] = {
"messages": [
ToolMessage(
content=f"Moved '{validated_source}' to '{res.path or validated_dest}'",
tool_call_id=runtime.tool_call_id,
)
],
}
if res.files_update is not None:
update["files"] = res.files_update
return Command(update=update)
def sync_move_file(
source_path: Annotated[str, "Absolute or relative source path."],
destination_path: Annotated[str, "Absolute or relative destination path."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
*,
overwrite: Annotated[
bool,
"If True, replace existing destination. Cloud mode rejects True. Defaults to False.",
] = False,
) -> Command | str:
return run_async_blocking(
async_move_file(
source_path, destination_path, runtime, overwrite=overwrite
)
)
return StructuredTool.from_function(
name="move_file",
description=description,
func=sync_move_file,
coroutine=async_move_file,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``pwd`` — print the current working directory."""
from __future__ import annotations
from .index import create_pwd_tool
__all__ = ["create_pwd_tool"]

View file

@ -0,0 +1,11 @@
"""Description string for ``pwd`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_DESCRIPTION = """Prints the current working directory."""
def select_description(mode: FilesystemMode) -> str:
return _DESCRIPTION

View file

@ -0,0 +1,37 @@
"""``pwd`` factory: read the cwd from state."""
from __future__ import annotations
from typing import TYPE_CHECKING
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from ...middleware.path_resolution import current_cwd
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_pwd_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
def sync_pwd(
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
return current_cwd(mw, runtime)
async def async_pwd(
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
return current_cwd(mw, runtime)
return StructuredTool.from_function(
name="pwd",
description=description,
func=sync_pwd,
coroutine=async_pwd,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``read_file`` — read a file (paginated) from the filesystem."""
from __future__ import annotations
from .index import create_read_file_tool
__all__ = ["create_read_file_tool"]

View file

@ -0,0 +1,22 @@
"""Description string for ``read_file`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_DESCRIPTION = """Reads a file from the filesystem.
Usage:
- By default, reads up to 100 lines from the beginning.
- Use `offset` and `limit` for pagination when files are large.
- Results include line numbers.
- Documents contain a `<chunk_index>` near the top listing every chunk with
its line range and a `matched="true"` flag for search-relevant chunks.
Read the index first, then jump to matched chunks with
`read_file(path, offset=<start_line>, limit=<num_lines>)`.
- Use chunk IDs (`<chunk id='...'>`) as citations in answers.
"""
def select_description(mode: FilesystemMode) -> str:
return _DESCRIPTION

View file

@ -0,0 +1,102 @@
"""``read_file`` factory: state-cache lookup, then lazy KB load, then disk read."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.utils import format_read_response, validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.kb_postgres_backend import KBPostgresBackend
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_read_file_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_read_file(
file_path: Annotated[
str,
"Absolute path to the file to read. Relative paths resolve against the current cwd.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
offset: Annotated[
int,
"Line number to start reading from (0-indexed).",
] = 0,
limit: Annotated[
int,
"Maximum number of lines to read.",
] = 100,
) -> Command | str:
target = resolve_relative(mw, file_path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
files = runtime.state.get("files") or {}
if validated in files:
return format_read_response(files[validated], offset, limit)
backend = mw._get_backend(runtime)
if isinstance(backend, KBPostgresBackend):
loaded = await backend._load_file_data(validated)
if loaded is None:
return f"Error: File '{validated}' not found"
file_data, doc_id = loaded
rendered = format_read_response(file_data, offset, limit)
update: dict[str, Any] = {
"files": {validated: file_data},
"messages": [
ToolMessage(
content=rendered,
tool_call_id=runtime.tool_call_id,
)
],
}
if doc_id is not None:
update["doc_id_by_path"] = {validated: doc_id}
return Command(update=update)
try:
rendered = await backend.aread(validated, offset=offset, limit=limit)
except Exception as exc: # pragma: no cover - defensive
return f"Error: {exc}"
return rendered
def sync_read_file(
file_path: Annotated[
str,
"Absolute path to the file to read. Relative paths resolve against the current cwd.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
offset: Annotated[
int,
"Line number to start reading from (0-indexed).",
] = 0,
limit: Annotated[
int,
"Maximum number of lines to read.",
] = 100,
) -> Command | str:
return run_async_blocking(
async_read_file(file_path, runtime, offset, limit)
)
return StructuredTool.from_function(
name="read_file",
description=description,
func=sync_read_file,
coroutine=async_read_file,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``rm`` — delete a single file."""
from __future__ import annotations
from .index import create_rm_tool
__all__ = ["create_rm_tool"]

View file

@ -0,0 +1,38 @@
"""Mode-specific description strings for ``rm``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Deletes a single file under `/documents/`.
Mirrors POSIX `rm path` (no `-r`, no glob expansion). Stages the deletion
for end-of-turn commit; the row is removed only after the agent's turn
finishes successfully.
Args:
- path: absolute or relative file path. Cannot point at a directory use
`rmdir` for empty folders. Cannot target the root or `/documents`.
Notes:
- The action is reversible via the per-action revert flow when action
logging is enabled.
- The anonymous uploaded document is read-only and cannot be deleted.
"""
_DESKTOP_DESCRIPTION = """Deletes a single file from disk.
Mirrors POSIX `rm path` (no `-r`, no glob expansion). The deletion hits
disk immediately. Desktop deletes are NOT reversible via the agent's
revert flow.
Args:
- path: absolute mount-prefixed file path. Cannot point at a directory
use `rmdir` for empty folders.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,139 @@
"""Cloud and desktop ``rm`` branches.
Both branches receive an already-resolved + validated absolute path.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from deepagents.backends.protocol import WriteResult
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.kb_postgres_backend import KBPostgresBackend
from app.agents.new_chat.path_resolver import DOCUMENTS_ROOT
from app.agents.new_chat.state_reducers import _CLEAR
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
async def cloud_rm(
mw: "SurfSenseFilesystemMiddleware",
runtime: ToolRuntime[None, SurfSenseFilesystemState],
validated: str,
) -> Command | str:
"""Stage a deletion in cloud mode (commit at end of turn)."""
if validated in ("/", DOCUMENTS_ROOT):
return f"Error: refusing to rm '{validated}'."
if not validated.startswith(DOCUMENTS_ROOT + "/"):
return (
"Error: cloud rm must target a path under /documents/ "
f"(got '{validated}')."
)
anon = runtime.state.get("kb_anon_doc") or {}
if isinstance(anon, dict) and str(anon.get("path") or "") == validated:
return "Error: the anonymous uploaded document is read-only."
staged_dirs = list(runtime.state.get("staged_dirs") or [])
if validated in staged_dirs:
return (
f"Error: '{validated}' is a directory. Use rmdir for "
"empty directories."
)
pending_dir_deletes = list(runtime.state.get("pending_dir_deletes") or [])
if any(
isinstance(d, dict) and d.get("path") == validated
for d in pending_dir_deletes
):
return f"Error: '{validated}' is already queued for rmdir."
backend = mw._get_backend(runtime)
if isinstance(backend, KBPostgresBackend):
children = await backend.als_info(validated)
if children:
return (
f"Error: '{validated}' is a directory. Use rmdir for "
"empty directories."
)
pending_deletes = list(runtime.state.get("pending_deletes") or [])
if any(
isinstance(d, dict) and d.get("path") == validated for d in pending_deletes
):
return f"'{validated}' is already queued for deletion."
files_state = runtime.state.get("files") or {}
doc_id_by_path = runtime.state.get("doc_id_by_path") or {}
resolved_doc_id: int | None = doc_id_by_path.get(validated)
if (
validated not in files_state
and resolved_doc_id is None
and isinstance(backend, KBPostgresBackend)
):
loaded = await backend._load_file_data(validated)
if loaded is None:
return f"Error: file '{validated}' not found."
_, resolved_doc_id = loaded
files_update: dict[str, Any] = {validated: None}
update: dict[str, Any] = {
"pending_deletes": [
{
"path": validated,
"tool_call_id": runtime.tool_call_id,
}
],
"files": files_update,
"doc_id_by_path": {validated: None},
"messages": [
ToolMessage(
content=(
f"Staged delete of '{validated}' (will commit at "
"end of turn)."
),
tool_call_id=runtime.tool_call_id,
)
],
}
dirty_paths = list(runtime.state.get("dirty_paths") or [])
if validated in dirty_paths:
new_dirty: list[Any] = [_CLEAR]
for entry in dirty_paths:
if entry != validated:
new_dirty.append(entry)
update["dirty_paths"] = new_dirty
update["dirty_path_tool_calls"] = {validated: None}
return Command(update=update)
async def desktop_rm(
mw: "SurfSenseFilesystemMiddleware",
runtime: ToolRuntime[None, SurfSenseFilesystemState],
validated: str,
) -> Command | str:
"""Hit disk immediately in desktop mode."""
backend = mw._get_backend(runtime)
adelete = getattr(backend, "adelete_file", None)
if not callable(adelete):
return "Error: rm is not supported by the active backend."
res: WriteResult = await adelete(validated)
if res.error:
return res.error
return Command(
update={
"files": {validated: None},
"messages": [
ToolMessage(
content=f"Deleted file '{res.path or validated}'",
tool_call_id=runtime.tool_call_id,
)
],
}
)

View file

@ -0,0 +1,61 @@
"""``rm`` factory: resolve + validate the path, then dispatch to cloud / desktop."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_relative
from .description import select_description
from .helpers import cloud_rm, desktop_rm
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_rm_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_rm(
path: Annotated[
str,
"Absolute or relative path to the file to delete.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
if not path or not path.strip():
return "Error: path is required."
target = resolve_relative(mw, path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
if is_cloud(mw._filesystem_mode):
return await cloud_rm(mw, runtime, validated)
return await desktop_rm(mw, runtime, validated)
def sync_rm(
path: Annotated[
str,
"Absolute or relative path to the file to delete.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
return run_async_blocking(async_rm(path, runtime))
return StructuredTool.from_function(
name="rm",
description=description,
func=sync_rm,
coroutine=async_rm,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``rmdir`` — delete an empty directory."""
from __future__ import annotations
from .index import create_rmdir_tool
__all__ = ["create_rmdir_tool"]

View file

@ -0,0 +1,42 @@
"""Mode-specific description strings for ``rmdir``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Deletes an empty directory under `/documents/`.
Mirrors POSIX `rmdir path`: refuses non-empty directories. Recursive
deletion (`rm -r`) is intentionally NOT supported clear contents with
`rm` first.
Args:
- path: absolute or relative directory path. Cannot target the root,
`/documents`, the current cwd, or any ancestor of cwd (use `cd` to
move out first).
Notes:
- Emptiness is evaluated against the post-staged view, so a same-turn
`rm /a/x.md` followed by `rmdir /a` is fine.
- If the directory was added in this same turn via `mkdir` and never
committed, the staged mkdir is dropped instead of issuing a delete.
- The action is reversible via the per-action revert flow when action
logging is enabled.
"""
_DESKTOP_DESCRIPTION = """Deletes an empty directory from disk.
Mirrors POSIX `rmdir path`: refuses non-empty directories. Recursive
deletion is NOT supported. The deletion hits disk immediately and is
NOT reversible via the agent's revert flow.
Args:
- path: absolute mount-prefixed directory path. Cannot target the mount
root or any directory containing files/subfolders.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,144 @@
"""Cloud and desktop ``rmdir`` branches.
Both branches receive an already-resolved + validated absolute path.
"""
from __future__ import annotations
import posixpath
from typing import TYPE_CHECKING
from deepagents.backends.protocol import WriteResult
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from app.agents.new_chat.middleware.kb_postgres_backend import KBPostgresBackend
from app.agents.new_chat.path_resolver import DOCUMENTS_ROOT
from app.agents.new_chat.state_reducers import _CLEAR
from ...middleware.path_resolution import current_cwd
from ...shared.paths import is_ancestor_of
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
async def cloud_rmdir(
mw: "SurfSenseFilesystemMiddleware",
runtime: ToolRuntime[None, SurfSenseFilesystemState],
validated: str,
) -> Command | str:
"""Stage an empty-folder delete in cloud mode (commit at end of turn)."""
if validated in ("/", DOCUMENTS_ROOT):
return f"Error: refusing to rmdir '{validated}'."
if not validated.startswith(DOCUMENTS_ROOT + "/"):
return (
"Error: cloud rmdir must target a path under /documents/ "
f"(got '{validated}')."
)
cwd = current_cwd(mw, runtime)
if validated == cwd or is_ancestor_of(validated, cwd):
return (
f"Error: cannot rmdir '{validated}' because the current "
"cwd is at or under it. cd out first."
)
staged_dirs = list(runtime.state.get("staged_dirs") or [])
pending_dir_deletes = list(runtime.state.get("pending_dir_deletes") or [])
if any(
isinstance(d, dict) and d.get("path") == validated
for d in pending_dir_deletes
):
return f"'{validated}' is already queued for deletion."
backend = mw._get_backend(runtime)
exists_in_staged = validated in staged_dirs
children: list = []
if isinstance(backend, KBPostgresBackend):
children = list(await backend.als_info(validated))
if (
isinstance(backend, KBPostgresBackend)
and not children
and not exists_in_staged
):
loaded = await backend._load_file_data(validated)
if loaded is not None:
return f"Error: '{validated}' is a file. Use rm to delete files."
parent = posixpath.dirname(validated) or "/"
parent_listing = await backend.als_info(parent)
parent_has_dir = any(
info.get("path") == validated and info.get("is_dir")
for info in parent_listing
)
if not parent_has_dir:
return f"Error: directory '{validated}' not found."
if children:
return (
f"Error: directory '{validated}' is not empty. Remove contents first."
)
if exists_in_staged:
rest = [d for d in staged_dirs if d != validated]
return Command(
update={
"staged_dirs": [_CLEAR, *rest],
"staged_dir_tool_calls": {validated: None},
"messages": [
ToolMessage(
content=(f"Un-staged directory '{validated}'."),
tool_call_id=runtime.tool_call_id,
)
],
}
)
return Command(
update={
"pending_dir_deletes": [
{
"path": validated,
"tool_call_id": runtime.tool_call_id,
}
],
"messages": [
ToolMessage(
content=(
f"Staged rmdir of '{validated}' (will commit "
"at end of turn)."
),
tool_call_id=runtime.tool_call_id,
)
],
}
)
async def desktop_rmdir(
mw: "SurfSenseFilesystemMiddleware",
runtime: ToolRuntime[None, SurfSenseFilesystemState],
validated: str,
) -> Command | str:
"""Hit disk immediately in desktop mode."""
backend = mw._get_backend(runtime)
armdir = getattr(backend, "armdir", None)
if not callable(armdir):
return "Error: rmdir is not supported by the active backend."
res: WriteResult = await armdir(validated)
if res.error:
return res.error
return Command(
update={
"messages": [
ToolMessage(
content=f"Deleted directory '{res.path or validated}'",
tool_call_id=runtime.tool_call_id,
)
],
}
)

View file

@ -0,0 +1,61 @@
"""``rmdir`` factory: resolve + validate the path, then dispatch to cloud / desktop."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_relative
from .description import select_description
from .helpers import cloud_rmdir, desktop_rmdir
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_rmdir_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_rmdir(
path: Annotated[
str,
"Absolute or relative path of the empty directory to delete.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
if not path or not path.strip():
return "Error: path is required."
target = resolve_relative(mw, path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
if is_cloud(mw._filesystem_mode):
return await cloud_rmdir(mw, runtime, validated)
return await desktop_rmdir(mw, runtime, validated)
def sync_rmdir(
path: Annotated[
str,
"Absolute or relative path of the empty directory to delete.",
],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
return run_async_blocking(async_rmdir(path, runtime))
return StructuredTool.from_function(
name="rmdir",
description=description,
func=sync_rmdir,
coroutine=async_rmdir,
)

View file

@ -0,0 +1,7 @@
"""Tool: ``write_file`` — create or overwrite a text file."""
from __future__ import annotations
from .index import create_write_file_tool
__all__ = ["create_write_file_tool"]

View file

@ -0,0 +1,35 @@
"""Mode-specific description strings for ``write_file``."""
from __future__ import annotations
from app.agents.new_chat.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Writes a new text file to the workspace.
Usage:
- Files written under `/documents/<...>` are persisted as Documents at end
of turn.
- Use a `temp_` filename prefix (e.g. `temp_plan.md` or `/documents/temp_x.md`)
for scratch/working files; they are automatically discarded at end of turn.
- Writes outside `/documents/` are rejected unless the basename starts with
`temp_`.
- Supported outputs include common LLM-friendly text formats like markdown,
json, yaml, csv, xml, html, css, sql, and code files.
- Avoid placeholders; produce concrete and useful text.
"""
_DESKTOP_DESCRIPTION = """Writes a text file to disk.
Usage:
- Use mount-prefixed absolute paths like `/<mount>/sub/file.ext`.
- Writes hit disk immediately. There is no end-of-turn staging.
- Supported outputs include common LLM-friendly text formats like markdown,
json, yaml, csv, xml, html, css, sql, and code files.
- Avoid placeholders; produce concrete and useful text.
"""
def select_description(mode: FilesystemMode) -> str:
if mode == FilesystemMode.CLOUD:
return _CLOUD_DESCRIPTION
return _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,85 @@
"""``write_file`` factory: resolve target, enforce cloud namespace, dispatch to backend."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.protocol import WriteResult
from deepagents.backends.utils import create_file_data, validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.new_chat.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.namespace_policy import check_cloud_write_namespace
from ...middleware.path_resolution import resolve_write_target_path
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_write_file_tool(mw: "SurfSenseFilesystemMiddleware") -> BaseTool:
description = select_description(mw._filesystem_mode)
async def async_write_file(
file_path: Annotated[
str,
"Absolute path where the file should be created. Relative paths resolve against the current cwd.",
],
content: Annotated[str, "Text content to write to the file."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
target = resolve_write_target_path(mw, file_path, runtime)
try:
validated = validate_path(target)
except ValueError as exc:
return f"Error: {exc}"
namespace_error = check_cloud_write_namespace(mw, validated, runtime)
if namespace_error:
return namespace_error
backend = mw._get_backend(runtime)
res: WriteResult = await backend.awrite(validated, content)
if res.error:
return res.error
path = res.path or validated
files_update = res.files_update or {path: create_file_data(content)}
update: dict[str, Any] = {
"files": files_update,
"messages": [
ToolMessage(
content=f"Updated file {path}",
tool_call_id=runtime.tool_call_id,
)
],
}
if is_cloud(mw._filesystem_mode):
update["dirty_paths"] = [path]
update["dirty_path_tool_calls"] = {path: runtime.tool_call_id}
return Command(update=update)
def sync_write_file(
file_path: Annotated[
str,
"Absolute path where the file should be created. Relative paths resolve against the current cwd.",
],
content: Annotated[str, "Text content to write to the file."],
runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> Command | str:
return run_async_blocking(
async_write_file(file_path, content, runtime)
)
return StructuredTool.from_function(
name="write_file",
description=description,
func=sync_write_file,
coroutine=async_write_file,
)