refactor(agents): colocate middleware into vertical slices

Eliminate the top-level multi_agent_chat/middleware/ package so each slice
owns its middleware (vertical-slice colocation):

- middleware/shared/   -> shared/middleware/        (cross-slice middleware)
- middleware/subagent/ -> subagents/shared/middleware/ (subagent stack)
- main_agent/middleware/ already colocated in Slice A

The moved shared/ subtree is internally consistent (all relative imports
stay within it), so only external absolute refs were rewritten. The
subagent stack's ..shared.* relatives were promoted to absolute paths to
the new shared/middleware/ location.

multi_agent_chat/ root is now: main_agent/, shared/, subagents/.
Verified: 2430 unit tests pass, 1 skipped (baseline unchanged).
This commit is contained in:
CREDO23 2026-06-04 18:13:47 +02:00
parent 9c845d562e
commit add9e14694
117 changed files with 49 additions and 45 deletions

View file

@ -0,0 +1,9 @@
"""Anthropic prompt caching annotations on system/tool/message blocks."""
from __future__ import annotations
from langchain_anthropic.middleware import AnthropicPromptCachingMiddleware
def build_anthropic_cache_mw() -> AnthropicPromptCachingMiddleware:
    """Build the Anthropic prompt-caching middleware for the agent stack.

    The middleware is constructed with ``unsupported_model_behavior="ignore"``.

    Returns:
        A freshly constructed ``AnthropicPromptCachingMiddleware``.
    """
    caching_mw = AnthropicPromptCachingMiddleware(
        unsupported_model_behavior="ignore",
    )
    return caching_mw

View file

@ -0,0 +1,14 @@
"""Context-window summarization with SurfSense protected sections."""
from __future__ import annotations
from typing import Any
from deepagents.backends import StateBackend
from langchain_core.language_models import BaseChatModel
from app.agents.shared.middleware import create_surfsense_compaction_middleware
def build_compaction_mw(llm: BaseChatModel) -> Any:
    """Build the SurfSense compaction (context summarization) middleware.

    Args:
        llm: Chat model handed to ``create_surfsense_compaction_middleware``.

    Returns:
        Whatever ``create_surfsense_compaction_middleware`` returns when given
        ``llm`` and the ``StateBackend`` class.
    """
    compaction_mw = create_surfsense_compaction_middleware(llm, StateBackend)
    return compaction_mw

View file

@ -0,0 +1,11 @@
"""SurfSense filesystem middleware (multi-agent flavour)."""
from __future__ import annotations
from .index import build_filesystem_mw
from .middleware import SurfSenseFilesystemMiddleware
__all__ = [
"SurfSenseFilesystemMiddleware",
"build_filesystem_mw",
]

View file

@ -0,0 +1,28 @@
"""Public composition factory for the filesystem middleware."""
from __future__ import annotations
from typing import Any
from app.agents.shared.filesystem_selection import FilesystemMode
from .middleware import SurfSenseFilesystemMiddleware
def build_filesystem_mw(
    *,
    backend_resolver: Any,
    filesystem_mode: FilesystemMode,
    search_space_id: int,
    user_id: str | None,
    thread_id: int | None,
    read_only: bool = False,
) -> SurfSenseFilesystemMiddleware:
    """Compose a ``SurfSenseFilesystemMiddleware`` from session-level inputs.

    This is a thin adapter: it only renames two arguments on the way through
    (``backend_resolver`` -> ``backend`` and ``user_id`` -> ``created_by_id``).

    Args:
        backend_resolver: Passed to the middleware as ``backend``.
        filesystem_mode: Filesystem mode for the session.
        search_space_id: Search space the session is scoped to.
        user_id: Passed to the middleware as ``created_by_id``.
        thread_id: Chat thread identifier, if any.
        read_only: Forwarded unchanged to the middleware.

    Returns:
        The configured middleware instance.
    """
    middleware_kwargs: dict[str, Any] = {
        "backend": backend_resolver,
        "filesystem_mode": filesystem_mode,
        "search_space_id": search_space_id,
        "created_by_id": user_id,
        "thread_id": thread_id,
        "read_only": read_only,
    }
    return SurfSenseFilesystemMiddleware(**middleware_kwargs)

View file

@ -0,0 +1,33 @@
"""SurfSense filesystem middleware: class + focused-responsibility helpers."""
from __future__ import annotations
from .index import (
SurfSenseFilesystemMiddleware,
check_cloud_write_namespace,
current_cwd,
default_cwd,
get_contract_suggested_path,
is_cloud,
normalize_local_mount_path,
resolve_list_target_path,
resolve_move_target_path,
resolve_relative,
resolve_write_target_path,
run_async_blocking,
)
__all__ = [
"SurfSenseFilesystemMiddleware",
"check_cloud_write_namespace",
"current_cwd",
"default_cwd",
"get_contract_suggested_path",
"is_cloud",
"normalize_local_mount_path",
"resolve_list_target_path",
"resolve_move_target_path",
"resolve_relative",
"resolve_write_target_path",
"run_async_blocking",
]

View file

@ -0,0 +1,22 @@
"""Sync/async dispatcher: drive an async tool body from a sync entry-point."""
from __future__ import annotations
import asyncio
from typing import Any
def run_async_blocking(coro: Any) -> Any:
    """Run ``coro`` to completion, blocking the current thread.

    If the current thread is already inside a running event loop, the
    coroutine cannot be driven synchronously. In that case an error string is
    returned instead of raising, which keeps sync tool entry-points safe to
    call from any context. The un-run coroutine is closed first so it is not
    leaked (and does not trigger a "coroutine ... was never awaited" warning).

    Args:
        coro: The coroutine (or other awaitable accepted by ``asyncio.run``)
            to execute.

    Returns:
        The coroutine's result, or an ``"Error: ..."`` string when called from
        inside an active event loop.
    """
    try:
        # ``get_running_loop`` only ever returns a loop that is running, so a
        # successful call is sufficient evidence that we must not block here.
        asyncio.get_running_loop()
    except RuntimeError:
        loop_active = False
    else:
        loop_active = True

    if not loop_active:
        return asyncio.run(coro)

    # We will never await ``coro``; release it explicitly when possible.
    close = getattr(coro, "close", None)
    if callable(close):
        close()
    return "Error: sync filesystem operation not supported inside an active event loop."

View file

@ -0,0 +1,32 @@
"""Public surface of the middleware package: class + helpers used by tool factories."""
from __future__ import annotations
from .async_dispatch import run_async_blocking
from .middleware import SurfSenseFilesystemMiddleware
from .mode import default_cwd, is_cloud
from .namespace_policy import check_cloud_write_namespace
from .path_resolution import (
current_cwd,
get_contract_suggested_path,
normalize_local_mount_path,
resolve_list_target_path,
resolve_move_target_path,
resolve_relative,
resolve_write_target_path,
)
__all__ = [
"SurfSenseFilesystemMiddleware",
"check_cloud_write_namespace",
"current_cwd",
"default_cwd",
"get_contract_suggested_path",
"is_cloud",
"normalize_local_mount_path",
"resolve_list_target_path",
"resolve_move_target_path",
"resolve_relative",
"resolve_write_target_path",
"run_async_blocking",
]

View file

@ -0,0 +1,105 @@
"""``SurfSenseFilesystemMiddleware``: per-session state + tool registration."""
from __future__ import annotations
from typing import Any
from deepagents import FilesystemMiddleware
from langchain_core.tools import BaseTool
from app.agents.shared.filesystem_selection import FilesystemMode
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.sandbox import is_sandbox_enabled
from ..system_prompt import build_system_prompt
from ..tools import (
create_cd_tool,
create_edit_file_tool,
create_execute_code_tool,
create_list_tree_tool,
create_ls_tool,
create_mkdir_tool,
create_move_file_tool,
create_pwd_tool,
create_read_file_tool,
create_rm_tool,
create_rmdir_tool,
create_write_file_tool,
)
from ..tools.glob.description import select_description as glob_description
from ..tools.grep.description import select_description as grep_description
from .read_only_policy import READ_ONLY_TOOL_NAMES
class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
    """Filesystem middleware specialised for SurfSense (cloud + desktop).

    On top of the base ``FilesystemMiddleware`` it:

    * swaps the state schema for ``SurfSenseFilesystemState``;
    * builds a mode-specific system prompt;
    * drops the base ``execute`` tool and registers the SurfSense tools
      (``mkdir``, ``cd``, ``pwd``, ``move_file``, ``rm``, ``rmdir``,
      ``list_tree`` and, when the sandbox is available, ``execute_code``);
    * restricts the tool list to ``READ_ONLY_TOOL_NAMES`` when ``read_only``.
    """

    state_schema = SurfSenseFilesystemState

    def __init__(
        self,
        *,
        backend: Any = None,
        filesystem_mode: FilesystemMode = FilesystemMode.CLOUD,
        search_space_id: int | None = None,
        created_by_id: str | None = None,
        thread_id: int | str | None = None,
        tool_token_limit_before_evict: int | None = 20000,
        read_only: bool = False,
    ) -> None:
        """Store per-session settings and assemble the tool list.

        Args:
            backend: Forwarded to the base middleware.
            filesystem_mode: Selects the prompt body and tool descriptions.
            search_space_id: Stored on the instance for the tool factories.
            created_by_id: Stored on the instance for the tool factories.
            thread_id: Stored on the instance; also gates sandbox availability.
            tool_token_limit_before_evict: Forwarded to the base middleware.
            read_only: When true, disables the sandbox and filters the tools
                down to the read-only allowlist.
        """
        self._filesystem_mode = filesystem_mode
        self._search_space_id = search_space_id
        self._created_by_id = created_by_id
        self._thread_id = thread_id
        self._read_only = read_only
        # Code execution needs the feature flag, a thread, and write access.
        self._sandbox_available = (
            is_sandbox_enabled() and thread_id is not None and not read_only
        )

        prompt_text = build_system_prompt(
            filesystem_mode,
            sandbox_available=self._sandbox_available,
        )
        super().__init__(
            backend=backend,
            system_prompt=prompt_text,
            tool_token_limit_before_evict=tool_token_limit_before_evict,
        )

        # The base "execute" tool is removed; ``execute_code`` is added below
        # only when the sandbox is available.
        self.tools = [tool for tool in self.tools if tool.name != "execute"]

        extra_tool_factories = (
            create_mkdir_tool,
            create_cd_tool,
            create_pwd_tool,
            create_move_file_tool,
            create_rm_tool,
            create_rmdir_tool,
            create_list_tree_tool,
        )
        for make_tool in extra_tool_factories:
            self.tools.append(make_tool(self))

        if self._sandbox_available:
            self.tools.append(create_execute_code_tool(self))

        if read_only:
            self.tools = [
                tool for tool in self.tools if tool.name in READ_ONLY_TOOL_NAMES
            ]

    # ----------------------------------------- base-class tool overrides

    def _create_ls_tool(self) -> BaseTool:
        """Return the SurfSense ``ls`` tool in place of the base one."""
        return create_ls_tool(self)

    def _create_read_file_tool(self) -> BaseTool:
        """Return the SurfSense ``read_file`` tool in place of the base one."""
        return create_read_file_tool(self)

    def _create_write_file_tool(self) -> BaseTool:
        """Return the SurfSense ``write_file`` tool in place of the base one."""
        return create_write_file_tool(self)

    def _create_edit_file_tool(self) -> BaseTool:
        """Return the SurfSense ``edit_file`` tool in place of the base one."""
        return create_edit_file_tool(self)

    def _create_glob_tool(self) -> BaseTool:
        """Keep the base ``glob`` tool but use the mode-specific description."""
        glob_tool = super()._create_glob_tool()
        glob_tool.description = glob_description(self._filesystem_mode).rstrip()
        return glob_tool

    def _create_grep_tool(self) -> BaseTool:
        """Keep the base ``grep`` tool but use the mode-specific description."""
        grep_tool = super()._create_grep_tool()
        grep_tool.description = grep_description(self._filesystem_mode).rstrip()
        return grep_tool

View file

@ -0,0 +1,15 @@
"""Mode-derived facts: ``is_cloud`` and ``default_cwd``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
from app.agents.shared.path_resolver import DOCUMENTS_ROOT
def is_cloud(mode: FilesystemMode) -> bool:
    """Return whether ``mode`` is ``FilesystemMode.CLOUD``."""
    cloud_selected = mode == FilesystemMode.CLOUD
    return cloud_selected
def default_cwd(mode: FilesystemMode) -> str:
    """Return the initial working directory for ``mode``.

    Cloud sessions start in ``DOCUMENTS_ROOT``; desktop sessions start at
    ``/`` (mounts are children of ``/``).
    """
    if is_cloud(mode):
        return DOCUMENTS_ROOT
    return "/"

View file

@ -0,0 +1,51 @@
"""Cloud-only write namespace policy.
A write is allowed iff it lands under ``/documents/`` OR its basename uses
the ``temp_`` scratch prefix. The anonymous uploaded document is read-only
even when its path is under ``/documents/``.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from langchain.tools import ToolRuntime
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.path_resolver import DOCUMENTS_ROOT
from ..shared.paths import TEMP_PREFIX, basename
from .mode import is_cloud
if TYPE_CHECKING:
from .middleware import SurfSenseFilesystemMiddleware
def check_cloud_write_namespace(
    mw: SurfSenseFilesystemMiddleware,
    path: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str | None:
    """Validate a cloud-mode write target against the namespace policy.

    Outside cloud mode no policy applies and ``None`` is returned. In cloud
    mode the checks run in this order:

    1. The anonymous uploaded document (``kb_anon_doc`` in state) is read-only.
    2. ``DOCUMENTS_ROOT`` itself and anything beneath it is allowed.
    3. Any path whose basename starts with ``TEMP_PREFIX`` is allowed.
    4. Everything else is rejected.

    Args:
        mw: Middleware whose filesystem mode decides whether the policy applies.
        path: Absolute path the caller wants to write to.
        runtime: Tool runtime whose state may hold ``kb_anon_doc``.

    Returns:
        ``None`` when the write is allowed, otherwise an error string.
    """
    if not is_cloud(mw._filesystem_mode):
        return None

    anon_doc = runtime.state.get("kb_anon_doc") or {}
    if isinstance(anon_doc, dict):
        anon_doc_path = str(anon_doc.get("path") or "")
        if anon_doc_path and anon_doc_path == path:
            return "Error: the anonymous uploaded document is read-only."

    under_documents = path == DOCUMENTS_ROOT or path.startswith(DOCUMENTS_ROOT + "/")
    if under_documents or basename(path).startswith(TEMP_PREFIX):
        return None

    return (
        "Error: cloud writes must target /documents/<...> or use a 'temp_' "
        f"basename for scratch (got '{path}')."
    )

View file

@ -0,0 +1,174 @@
"""Resolve user-supplied paths to absolute paths the backends accept."""
from __future__ import annotations
import posixpath
from typing import TYPE_CHECKING
from langchain.tools import ToolRuntime
from app.agents.shared.filesystem_selection import FilesystemMode
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.multi_root_local_folder_backend import (
MultiRootLocalFolderBackend,
)
from ..shared.paths import (
extract_mount_from_path,
local_parent_path,
normalize_absolute_path,
)
from .mode import default_cwd
if TYPE_CHECKING:
from .middleware import SurfSenseFilesystemMiddleware
def current_cwd(
    mw: SurfSenseFilesystemMiddleware,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
    """Return the session's working directory.

    The ``cwd`` entry in the runtime state is used when it is a string that
    starts with ``/``; otherwise the mode's default cwd is returned.
    """
    state_cwd = runtime.state.get("cwd") if hasattr(runtime, "state") else None
    if not isinstance(state_cwd, str) or not state_cwd.startswith("/"):
        return default_cwd(mw._filesystem_mode)
    return state_cwd
def get_contract_suggested_path(
    mw: SurfSenseFilesystemMiddleware,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
    """Return the write path suggested by ``file_operation_contract``.

    When the state carries a non-blank string under
    ``file_operation_contract["suggested_path"]`` it is normalized and
    returned. Otherwise the result is ``notes.md`` inside the mode's default
    working directory.
    """
    contract = runtime.state.get("file_operation_contract") or {}
    suggestion = contract.get("suggested_path")
    if not (isinstance(suggestion, str) and suggestion.strip()):
        fallback_dir = default_cwd(mw._filesystem_mode).rstrip("/")
        return fallback_dir + "/notes.md"
    return normalize_absolute_path(suggestion)
def resolve_relative(
    mw: SurfSenseFilesystemMiddleware,
    path: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
    """Turn ``path`` into a normalized absolute path.

    * Absolute input is only normalized.
    * Blank input yields the current working directory unchanged.
    * Anything else is joined onto the cwd, collapsed with
      ``posixpath.normpath`` and then normalized.
    """
    stripped = path.strip()
    if stripped.startswith("/"):
        return normalize_absolute_path(stripped)

    base_dir = current_cwd(mw, runtime)
    if not stripped:
        return base_dir

    combined = posixpath.join(base_dir, stripped)
    return normalize_absolute_path(posixpath.normpath(combined))
def resolve_write_target_path(
    mw: SurfSenseFilesystemMiddleware,
    file_path: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
    """Resolve the destination of a write.

    Blank input falls back to the contract-suggested path. In
    ``DESKTOP_LOCAL_FOLDER`` mode the path goes through mount-prefix
    normalization; in every other mode it is resolved against the cwd.
    """
    candidate = file_path.strip()
    if not candidate:
        return get_contract_suggested_path(mw, runtime)

    desktop = mw._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER
    resolver = normalize_local_mount_path if desktop else resolve_relative
    return resolver(mw, candidate, runtime)
def resolve_move_target_path(
    mw: SurfSenseFilesystemMiddleware,
    file_path: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
    """Resolve one endpoint of a move.

    Blank input is returned as ``""`` so the caller can validate it. In
    ``DESKTOP_LOCAL_FOLDER`` mode the path goes through mount-prefix
    normalization; in every other mode it is resolved against the cwd.
    """
    candidate = file_path.strip()
    if not candidate:
        return ""

    desktop = mw._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER
    resolver = normalize_local_mount_path if desktop else resolve_relative
    return resolver(mw, candidate, runtime)
def resolve_list_target_path(
    mw: SurfSenseFilesystemMiddleware,
    path: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
    """Resolve the directory a listing tool should operate on.

    Blank input means "the current working directory". The root ``/`` is
    returned untouched. Otherwise ``DESKTOP_LOCAL_FOLDER`` mode applies
    mount-prefix normalization and every other mode resolves against the cwd.
    """
    candidate = path.strip() or current_cwd(mw, runtime)
    if candidate == "/":
        return "/"

    desktop = mw._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER
    resolver = normalize_local_mount_path if desktop else resolve_relative
    return resolver(mw, candidate, runtime)
def normalize_local_mount_path(
    mw: SurfSenseFilesystemMiddleware,
    candidate: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
) -> str:
    """Prepend a mount prefix to ``candidate`` when it does not carry one.

    If the resolved backend is not a ``MultiRootLocalFolderBackend`` the
    normalized path is returned as-is. Otherwise the first rule that applies
    wins:

    1. The path already starts with a known mount: keep it.
    2. Exactly one mount is available: use it.
    3. The path exists under exactly one mount: use that mount.
    4. The parent (when it is not ``/``) exists under exactly one mount: use
       that mount.
    5. The contract's ``suggested_path`` names a known mount: use that mount.
    6. Fall back to the backend's default mount.

    Args:
        mw: Middleware used to resolve the backend.
        candidate: User-supplied path.
        runtime: Tool runtime; its state may hold ``file_operation_contract``.

    Returns:
        The normalized absolute path, mount-prefixed where applicable.
    """
    normalized = normalize_absolute_path(candidate)
    backend = mw._get_backend(runtime)
    if not isinstance(backend, MultiRootLocalFolderBackend):
        return normalized

    mounts = backend.list_mounts()
    if extract_mount_from_path(normalized, mounts):
        return normalized
    if len(mounts) == 1:
        return f"/{mounts[0]}{normalized}"

    # Read the contract hint up front; it is only used as a late fallback.
    hinted_mount: str | None = None
    contract = runtime.state.get("file_operation_contract") or {}
    hinted_path = contract.get("suggested_path")
    if isinstance(hinted_path, str) and hinted_path.strip():
        hinted_mount = extract_mount_from_path(
            normalize_absolute_path(hinted_path), mounts
        )

    path_hits = [
        name for name in mounts if _path_exists_under_mount(backend, name, normalized)
    ]
    if len(path_hits) == 1:
        return f"/{path_hits[0]}{normalized}"

    parent_dir = local_parent_path(normalized)
    if parent_dir != "/":
        parent_hits = [
            name
            for name in mounts
            if _path_exists_under_mount(backend, name, parent_dir)
        ]
        if len(parent_hits) == 1:
            return f"/{parent_hits[0]}{normalized}"

    chosen_mount = hinted_mount or backend.default_mount()
    return f"/{chosen_mount}{normalized}"
def _path_exists_under_mount(
    backend: MultiRootLocalFolderBackend,
    mount: str,
    local_path: str,
) -> bool:
    """Probe whether ``local_path`` can be listed under ``mount``.

    A minimal ``list_tree`` call (depth 0, one entry per page) is issued for
    ``/<mount><local_path>``; the path counts as existing when the result has
    no truthy ``"error"`` entry.
    """
    probe_target = f"/{mount}{local_path}"
    listing = backend.list_tree(
        probe_target,
        max_depth=0,
        page_size=1,
        include_files=True,
        include_dirs=True,
    )
    return not listing.get("error")

View file

@ -0,0 +1,7 @@
"""Allowlist consulted by ``SurfSenseFilesystemMiddleware`` when ``read_only=True``."""
from __future__ import annotations
# Names of the tools that survive the read-only filter in
# ``SurfSenseFilesystemMiddleware``; any tool whose name is not listed here is
# removed from the middleware's tool list when ``read_only=True``.
READ_ONLY_TOOL_NAMES = frozenset(
    {"ls", "read_file", "glob", "grep", "list_tree", "pwd", "cd"}
)

View file

@ -0,0 +1,21 @@
"""Stateless utilities shared by the middleware and tool factories."""
from __future__ import annotations
from .paths import (
TEMP_PREFIX,
basename,
extract_mount_from_path,
is_ancestor_of,
local_parent_path,
normalize_absolute_path,
)
__all__ = [
"TEMP_PREFIX",
"basename",
"extract_mount_from_path",
"is_ancestor_of",
"local_parent_path",
"normalize_absolute_path",
]

View file

@ -0,0 +1,51 @@
"""Stateless path utilities shared by the middleware class and tool factories."""
from __future__ import annotations
import re
# Basename prefix that marks a file as scratch content.
TEMP_PREFIX = "temp_"
def normalize_absolute_path(candidate: str) -> str:
    """Return ``candidate`` as an absolute, forward-slash path.

    Surrounding whitespace is stripped, backslashes become ``/``, runs of
    slashes collapse to one, and a leading ``/`` is added when missing. Blank
    input yields ``/``.
    """
    forward_slashed = candidate.strip().replace("\\", "/")
    collapsed = re.sub(r"/+", "/", forward_slashed)
    if not collapsed:
        return "/"
    return collapsed if collapsed.startswith("/") else f"/{collapsed}"
def extract_mount_from_path(path: str, mounts: tuple[str, ...]) -> str | None:
    """Return the first path segment when it names one of ``mounts``.

    Leading slashes are ignored. ``None`` is returned when the path has no
    segments or when its first segment is not a known mount.
    """
    first_segment = path.lstrip("/").split("/", 1)[0]
    if first_segment and first_segment in mounts:
        return first_segment
    return None
def local_parent_path(path: str) -> str:
    """Return the POSIX-style parent of ``path``; the parent of a top-level
    entry (or of the root itself) is ``/``.
    """
    relative = path.lstrip("/")
    head, separator, _tail = relative.rpartition("/")
    if not separator:
        # Zero or one segment: the parent is the root.
        return "/"
    parent = head.strip("/")
    return f"/{parent}" if parent else "/"
def basename(path: str) -> str:
    """Return everything after the last ``/`` in ``path`` (the whole string
    when there is no ``/``).
    """
    _head, _separator, tail = path.rpartition("/")
    return tail
def is_ancestor_of(candidate: str, target: str) -> bool:
    """Report whether ``candidate`` is ``target`` itself or one of its ancestors.

    The root is a special case: ``/`` counts as an ancestor of every path
    except ``/`` itself. For any other candidate, trailing slashes are ignored
    and an exact match counts as well as a true prefix match on a segment
    boundary.
    """
    if candidate == "/":
        return target != "/"
    prefix = candidate.rstrip("/")
    if target == prefix:
        return True
    return target.startswith(f"{prefix}/")

View file

@ -0,0 +1,7 @@
"""Filesystem-middleware system prompt (cloud + desktop modes)."""
from __future__ import annotations
from .index import build_system_prompt
__all__ = ["build_system_prompt"]

View file

@ -0,0 +1,71 @@
"""Cloud-mode filesystem system prompt body."""
from __future__ import annotations
BODY = """
## Filesystem Tools
All file paths must start with `/`. Relative paths resolve against the
current working directory (`cwd`, default `/documents`).
- ls(path, offset=0, limit=200): list files and directories at the given path.
- read_file(path, offset, limit): read a file (paginated) from the filesystem.
- write_file(path, content): create a new text file in the workspace.
- edit_file(path, old, new): exact string-replacement edit (lazy-loads KB
documents on first edit).
- glob(pattern, path): find files matching a glob pattern.
- grep(pattern, path, glob): substring search across files.
- mkdir(path): create a folder under `/documents/` (committed at end of turn).
- cd(path): change the current working directory.
- pwd(): print the current working directory.
- move_file(source, dest): move/rename a file under `/documents/`.
- rm(path): delete a single file under `/documents/` (no `-r`).
- rmdir(path): delete an empty directory under `/documents/`.
- list_tree(path, max_depth, page_size): recursively list files/folders.
## Persistence Rules
- Files written under `/documents/<...>` are **persisted** at end of turn as
Documents in the user's knowledge base.
- Files whose **basename** starts with `temp_` (e.g. `temp_plan.md` or
`/documents/temp_scratch.md`) are **discarded** at end of turn use this
prefix for any scratch/working content you do NOT want saved.
- All other paths (outside `/documents/` and not `temp_*`) are rejected.
- mkdir/move_file/rm/rmdir are staged this turn and committed at end of
turn alongside any new/edited documents. Snapshot/revert is enabled
for every destructive operation when action logging is on.
## Reading Documents Efficiently
Documents are formatted as XML. Each document contains:
- `<document_metadata>` title, type, URL, etc.
- `<chunk_index>` a table of every chunk with its **line range** and a
`matched="true"` flag for chunks that matched the search query.
- `<document_content>` the actual chunks in original document order.
**Workflow**: when reading a large document, read the first ~20 lines to see
the `<chunk_index>`, identify chunks marked `matched="true"`, then use
`read_file(path, offset=<start_line>, limit=<lines>)` to jump directly to
those sections instead of reading the entire file sequentially.
Use `<chunk id='...'>` values as citation IDs in your answers.
## Priority List
You receive a `<priority_documents>` system message each turn listing the
top-K paths most relevant to the user's query (by hybrid search). Read those
first matched sections are flagged inside each document's `<chunk_index>`.
## Workspace Tree
You receive a `<workspace_tree>` system message each turn with the current
folder/document layout. The tree may be truncated past a hard cap; in that
case, drill into specific folders with `ls(...)` or `list_tree(...)`.
## grep Line Numbers
`grep` searches across both your in-memory edits and the indexed chunks in
Postgres. State-cached files return real line numbers; database hits return
`line=0` because their position depends on per-document XML layout call
`read_file(path)` to find the exact line.
"""

View file

@ -0,0 +1,22 @@
"""Mode-agnostic prompt fragments: header conventions + sandbox addendum."""
from __future__ import annotations
HEADER = """## Following Conventions
- Read files before editing understand existing content before making changes.
- Mimic existing style, naming conventions, and patterns.
- Never claim a file was created/updated unless filesystem tool output confirms success.
- If a file write/edit fails, explicitly report the failure.
"""
SANDBOX_ADDENDUM = (
"\n- execute_code: run Python code in an isolated sandbox."
"\n\n## Code Execution"
"\n\nUse execute_code whenever a task benefits from running code."
" Never perform arithmetic manually."
"\n\nDocuments here are XML-wrapped markdown, not raw data files."
" To work with them programmatically, read the document first,"
" extract the data, write it as a clean file (CSV, JSON, etc.),"
" and then run your code against it."
)

View file

@ -0,0 +1,49 @@
"""Desktop-mode filesystem system prompt body."""
from __future__ import annotations
BODY = """
## Local Folder Mode
This chat operates directly on the user's local folders. Writes and edits
hit disk immediately there is no end-of-turn staging, no `/documents/`
namespace, and no `temp_` semantics.
## Filesystem Tools
All file paths must start with `/` and use mount-prefixed absolute paths
like `/<mount>/file.ext`. Relative paths resolve against the current working
directory (`cwd`).
- ls(path, offset=0, limit=200): list files and directories at the given path.
- read_file(path, offset, limit): read a file (paginated) from disk.
- write_file(path, content): write a file to disk.
- edit_file(path, old, new): exact string-replacement edit on disk.
- glob(pattern, path): find files matching a glob pattern.
- grep(pattern, path, glob): substring search across files.
- mkdir(path): create a directory on disk.
- cd(path): change the current working directory.
- pwd(): print the current working directory.
- move_file(source, dest): move/rename a file.
- rm(path): delete a single file from disk (no `-r`). NOT reversible.
- rmdir(path): delete an empty directory from disk. NOT reversible.
- list_tree(path, max_depth, page_size): recursively list files/folders.
## Workflow Tips
- If you are unsure which mounts are available, call `ls('/')` first.
- For large trees, prefer `list_tree` then `grep` then `read_file` over
brute-force directory traversal.
- Cross-mount moves are not supported.
- Desktop deletes hit disk immediately and cannot be undone via the
agent's revert flow — confirm before calling `rm`/`rmdir`.
## Priority List
You may receive a `<priority_documents>` system message listing the top-K
documents from the user's SurfSense knowledge base — these are cloud-ingested
via connectors (Notion, Slack, etc.), not local files. Treat it as a hint:
consult it when the task spans both local and cloud sources (e.g. drafting a
local note from a Notion summary); skip when the task is purely about local
files.
"""

View file

@ -0,0 +1,18 @@
"""Public assembly of the FS system prompt for a given session."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
from .cloud import BODY as CLOUD_BODY
from .common import HEADER, SANDBOX_ADDENDUM
from .desktop import BODY as DESKTOP_BODY
def build_system_prompt(mode: FilesystemMode, *, sandbox_available: bool) -> str:
    """Assemble the filesystem system prompt for a session.

    The prompt is the common header, followed by the cloud body for
    ``FilesystemMode.CLOUD`` or the desktop body for any other mode, followed
    by the sandbox addendum when ``sandbox_available`` is true.
    """
    mode_body = CLOUD_BODY if mode == FilesystemMode.CLOUD else DESKTOP_BODY
    sections = [HEADER, mode_body]
    if sandbox_available:
        sections.append(SANDBOX_ADDENDUM)
    return "".join(sections)

View file

@ -0,0 +1,31 @@
"""Filesystem tool factories — one vertical slice per tool."""
from __future__ import annotations
from .cd import create_cd_tool
from .edit_file import create_edit_file_tool
from .execute_code import create_execute_code_tool
from .list_tree import create_list_tree_tool
from .ls import create_ls_tool
from .mkdir import create_mkdir_tool
from .move_file import create_move_file_tool
from .pwd import create_pwd_tool
from .read_file import create_read_file_tool
from .rm import create_rm_tool
from .rmdir import create_rmdir_tool
from .write_file import create_write_file_tool
__all__ = [
"create_cd_tool",
"create_edit_file_tool",
"create_execute_code_tool",
"create_list_tree_tool",
"create_ls_tool",
"create_mkdir_tool",
"create_move_file_tool",
"create_pwd_tool",
"create_read_file_tool",
"create_rm_tool",
"create_rmdir_tool",
"create_write_file_tool",
]

View file

@ -0,0 +1,7 @@
"""Tool: ``cd`` — change the current working directory (cwd)."""
from __future__ import annotations
from .index import create_cd_tool
__all__ = ["create_cd_tool"]

View file

@ -0,0 +1,19 @@
"""Description string for ``cd`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_DESCRIPTION = """Changes the current working directory (cwd).
Args:
- path: absolute or relative directory path. Relative paths resolve against
the current cwd.
The new cwd is used by other filesystem tools whenever a relative path is
given. Returns the resolved cwd.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``cd`` tool description.

    ``mode`` is accepted but not consulted: the same text is returned for
    every mode.
    """
    description = _DESCRIPTION
    return description

View file

@ -0,0 +1,80 @@
"""``cd`` factory: resolve target, verify existence (staged + on-disk), update cwd."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.path_resolver import DOCUMENTS_ROOT
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_cd_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``cd`` tool bound to ``mw``.

    The tool resolves the requested path against the current cwd, validates
    it, and accepts it as a directory when any of the following holds: the
    backend lists something at that path, it is in ``staged_dirs``, a path in
    the ``files`` state equals it or lives beneath it, or it is ``/`` or
    ``DOCUMENTS_ROOT``. On success the tool returns a ``Command`` that updates
    ``cwd`` and posts a confirmation ``ToolMessage``; on failure it returns an
    error string.
    """
    description = select_description(mw._filesystem_mode)

    async def async_cd(
        path: Annotated[str, "Absolute or relative directory path to switch into."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        resolved = resolve_relative(mw, path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        backend = mw._get_backend(runtime)
        try:
            infos = await backend.als_info(validated)
        except Exception as exc:  # pragma: no cover - defensive
            return f"Error: {exc}"

        staged_dirs = list(runtime.state.get("staged_dirs") or [])
        files = runtime.state.get("files") or {}
        child_prefix = validated.rstrip("/") + "/"

        def _known_from_state() -> bool:
            # A path in ``files`` that equals the target, or sits underneath
            # it, is enough to accept the target.
            if validated in staged_dirs:
                return True
            return any(
                known == validated
                or (isinstance(known, str) and known.startswith(child_prefix))
                for known in files
            )

        directory_known = (
            bool(infos)
            or _known_from_state()
            or validated in ("/", DOCUMENTS_ROOT)
        )
        if not directory_known:
            return f"Error: directory '{validated}' not found."

        confirmation = ToolMessage(
            content=f"cwd changed to {validated}",
            tool_call_id=runtime.tool_call_id,
        )
        return Command(update={"cwd": validated, "messages": [confirmation]})

    def sync_cd(
        path: Annotated[str, "Absolute or relative directory path to switch into."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        # Sync entry-point: drive the async body to completion.
        return run_async_blocking(async_cd(path, runtime))

    return StructuredTool.from_function(
        name="cd",
        description=description,
        func=sync_cd,
        coroutine=async_cd,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``edit_file`` — exact string replacement on a file."""
from __future__ import annotations
from .index import create_edit_file_tool
__all__ = ["create_edit_file_tool"]

View file

@ -0,0 +1,28 @@
"""Mode-specific description strings for ``edit_file``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Performs exact string replacements in files.
IMPORTANT:
- Read the file before editing.
- Preserve exact indentation and formatting.
- Edits to documents under `/documents/` are persisted at end of turn.
- Edits to `temp_*` files are discarded at end of turn.
"""
_DESKTOP_DESCRIPTION = """Performs exact string replacements in files on disk.
IMPORTANT:
- Read the file before editing.
- Preserve exact indentation and formatting.
- Edits hit disk immediately.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``edit_file`` description for ``mode``.

    ``FilesystemMode.CLOUD`` gets the cloud text; every other mode gets the
    desktop text.
    """
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,132 @@
"""``edit_file`` factory: lazy-load KB doc, enforce cloud namespace, dispatch to backend."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.protocol import EditResult
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.kb_postgres_backend import KBPostgresBackend
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.namespace_policy import check_cloud_write_namespace
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_edit_file_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``edit_file`` tool bound to ``mw``.

    The async body resolves and validates the path, applies the cloud write
    namespace policy, and, in cloud mode with a ``KBPostgresBackend`` and a
    path not yet in the ``files`` state, asks the backend to load the file
    first so its document id can be attached. It then delegates the
    replacement to ``backend.aedit``. On success it returns a ``Command``
    carrying the file update and a confirmation message; in cloud mode it
    also records the path under ``dirty_paths`` / ``dirty_path_tool_calls``
    and, when a document id was loaded, under ``doc_id_by_path``. Failures
    are returned as error strings.
    """
    description = select_description(mw._filesystem_mode)

    async def async_edit_file(
        file_path: Annotated[
            str,
            "Absolute path to the file to edit. Relative paths resolve against the current cwd.",
        ],
        old_string: Annotated[
            str,
            "Exact text to replace. Must be unique unless replace_all is True.",
        ],
        new_string: Annotated[
            str,
            "Replacement text. Must differ from old_string.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        *,
        replace_all: Annotated[
            bool,
            "If True, replace all occurrences of old_string. Defaults to False.",
        ] = False,
    ) -> Command | str:
        resolved = resolve_relative(mw, file_path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        policy_error = check_cloud_write_namespace(mw, validated, runtime)
        if policy_error:
            return policy_error

        backend = mw._get_backend(runtime)
        known_files = runtime.state.get("files") or {}
        cloud_mode = is_cloud(mw._filesystem_mode)

        doc_id_to_attach: int | None = None
        needs_lazy_load = (
            cloud_mode
            and validated not in known_files
            and isinstance(backend, KBPostgresBackend)
        )
        if needs_lazy_load:
            loaded = await backend._load_file_data(validated)
            if loaded is None:
                return f"Error: File '{validated}' not found"
            _file_data, doc_id_to_attach = loaded

        result: EditResult = await backend.aedit(
            validated, old_string, new_string, replace_all=replace_all
        )
        if result.error:
            return result.error

        path = result.path or validated
        confirmation = ToolMessage(
            content=(
                f"Successfully replaced {result.occurrences} instance(s) "
                f"of the string in '{path}'"
            ),
            tool_call_id=runtime.tool_call_id,
        )
        update: dict[str, Any] = {
            "files": result.files_update or {},
            "messages": [confirmation],
        }
        if cloud_mode:
            update["dirty_paths"] = [path]
            update["dirty_path_tool_calls"] = {path: runtime.tool_call_id}
            if doc_id_to_attach is not None:
                update["doc_id_by_path"] = {path: doc_id_to_attach}
        return Command(update=update)

    def sync_edit_file(
        file_path: Annotated[
            str,
            "Absolute path to the file to edit. Relative paths resolve against the current cwd.",
        ],
        old_string: Annotated[
            str,
            "Exact text to replace. Must be unique unless replace_all is True.",
        ],
        new_string: Annotated[
            str,
            "Replacement text. Must differ from old_string.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        *,
        replace_all: Annotated[
            bool,
            "If True, replace all occurrences of old_string. Defaults to False.",
        ] = False,
    ) -> Command | str:
        # Sync entry-point: drive the async body to completion.
        pending = async_edit_file(
            file_path, old_string, new_string, runtime, replace_all=replace_all
        )
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="edit_file",
        description=description,
        func=sync_edit_file,
        coroutine=async_edit_file,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``execute_code`` — run Python code in an isolated sandbox."""
from __future__ import annotations
from .index import create_execute_code_tool
__all__ = ["create_execute_code_tool"]

View file

@ -0,0 +1,21 @@
"""Description string for ``execute_code`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_DESCRIPTION = """Executes Python code in an isolated sandbox environment.
Common data-science packages are pre-installed (pandas, numpy, matplotlib,
scipy, scikit-learn).
Usage notes:
- No outbound network access.
- Returns combined stdout/stderr with exit code.
- Use print() to produce output.
- Use the optional timeout parameter to override the default timeout.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``execute_code`` tool description.

    ``mode`` is accepted but not consulted: the same text is returned for
    every filesystem mode.
    """
    return _DESCRIPTION

View file

@ -0,0 +1,87 @@
"""Sandbox-execution helpers for ``execute_code``.
Wraps user-supplied code in a heredoc and dispatches it to the Daytona
sandbox associated with the current chat thread, with a single retry on
sandbox failure.
"""
from __future__ import annotations
import logging
import secrets
from typing import TYPE_CHECKING
from daytona.common.errors import DaytonaError
from langchain.tools import ToolRuntime
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.sandbox import (
_evict_sandbox_cache,
delete_sandbox,
get_or_create_sandbox,
)
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
# Module-level logger used by the sandbox execution helpers below.
logger = logging.getLogger(__name__)
# Upper bound for a caller-supplied ``timeout``; the ``execute_code`` tool
# rejects larger values and reports this limit in seconds.
MAX_EXECUTE_TIMEOUT = 300
def wrap_as_python(code: str) -> str:
    """Return a shell command that feeds ``code`` to ``python3`` via a heredoc.

    The heredoc delimiter is ``_PYEOF_`` followed by 16 random hex characters
    and is single-quoted in the command line.
    """
    delimiter = "_PYEOF_" + secrets.token_hex(8)
    script_lines = [f"python3 << '{delimiter}'", code, delimiter]
    return "\n".join(script_lines)
async def execute_in_sandbox(
    mw: SurfSenseFilesystemMiddleware,
    command: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
    timeout: int | None,
) -> str:
    """Run ``command`` as Python in the thread's sandbox, retrying once.

    Args:
        mw: Middleware whose ``_thread_id`` selects the sandbox.
        command: Raw Python source; it is wrapped with ``wrap_as_python``.
        runtime: Tool runtime, forwarded to the execution attempt.
        timeout: Optional timeout forwarded to the sandbox call.

    Returns:
        The formatted execution output, or a generic error string when both
        the first attempt and the retry fail.
    """
    assert mw._thread_id is not None
    command = wrap_as_python(command)
    try:
        return await _try_sandbox_execute(mw, command, runtime, timeout)
    # ``Exception`` already covers ``DaytonaError``; naming both in a tuple
    # was redundant.
    except Exception as first_err:
        logger.warning(
            "Sandbox execute failed for thread %s, retrying: %s",
            mw._thread_id,
            first_err,
        )
        # Drop the failed sandbox before retrying. If deletion itself fails,
        # fall back to evicting the cache entry and record why.
        try:
            await delete_sandbox(mw._thread_id)
        except Exception:
            logger.debug(
                "Sandbox delete failed for thread %s; evicting cache entry",
                mw._thread_id,
                exc_info=True,
            )
            _evict_sandbox_cache(mw._thread_id)
        try:
            return await _try_sandbox_execute(mw, command, runtime, timeout)
        except Exception:
            logger.exception("Sandbox retry also failed for thread %s", mw._thread_id)
            return "Error: Code execution is temporarily unavailable. Please try again."
async def _try_sandbox_execute(
    mw: SurfSenseFilesystemMiddleware,
    command: str,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
    timeout: int | None,
) -> str:
    """Make one sandbox-execute attempt and format its result.

    Args:
        mw: Middleware whose ``_thread_id`` selects the sandbox.
        command: Shell command to execute (already wrapped by the caller).
        runtime: Tool runtime; not used by this attempt itself.
        timeout: Optional timeout forwarded to ``sandbox.aexecute``.

    Returns:
        The raw output followed by an exit-code line (when an exit code is
        present) and a truncation notice (when flagged), or a hint message
        when a successful run printed nothing.
    """
    sandbox, _is_new = await get_or_create_sandbox(mw._thread_id)
    result = await sandbox.aexecute(command, timeout=timeout)

    # Normalise once: the emptiness check below already tolerated a missing
    # output, but the join at the end did not and raised ``TypeError`` for
    # ``None`` output on a failed command.
    raw_output = result.output or ""
    if not raw_output.strip() and result.exit_code == 0:
        return (
            "[Code executed successfully but produced no output. "
            "Use print() to display results, then try again.]"
        )

    parts = [raw_output]
    if result.exit_code is not None:
        status = "succeeded" if result.exit_code == 0 else "failed"
        parts.append(f"\n[Command {status} with exit code {result.exit_code}]")
    if result.truncated:
        parts.append("\n[Output was truncated due to size limits]")
    return "".join(parts)

View file

@ -0,0 +1,58 @@
"""``execute_code`` factory: bounds-check timeout, dispatch to the sandbox."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from .description import select_description
from .helpers import MAX_EXECUTE_TIMEOUT, execute_in_sandbox
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_execute_code_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``execute_code`` tool bound to the middleware ``mw``.

    Both entry points reject a negative timeout or one above
    ``MAX_EXECUTE_TIMEOUT`` with an error string, and otherwise hand the code
    to ``execute_in_sandbox``.
    """
    tool_description = select_description(mw._filesystem_mode)

    def _timeout_error(timeout: int | None) -> str | None:
        # Shared bounds check: the error text, or None when acceptable.
        if timeout is None:
            return None
        if timeout < 0:
            return f"Error: timeout must be non-negative, got {timeout}."
        if timeout > MAX_EXECUTE_TIMEOUT:
            return f"Error: timeout {timeout}s exceeds maximum ({MAX_EXECUTE_TIMEOUT}s)."
        return None

    # No docstrings on the tool functions: their signatures are handed to
    # ``StructuredTool.from_function`` alongside an explicit description.
    def sync_execute_code(
        command: Annotated[str, "Python code to execute. Use print() to see output."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        timeout: Annotated[
            int | None,
            "Optional timeout in seconds.",
        ] = None,
    ) -> str:
        problem = _timeout_error(timeout)
        if problem is not None:
            return problem
        return run_async_blocking(execute_in_sandbox(mw, command, runtime, timeout))

    async def async_execute_code(
        command: Annotated[str, "Python code to execute. Use print() to see output."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        timeout: Annotated[
            int | None,
            "Optional timeout in seconds.",
        ] = None,
    ) -> str:
        problem = _timeout_error(timeout)
        if problem is not None:
            return problem
        return await execute_in_sandbox(mw, command, runtime, timeout)

    return StructuredTool.from_function(
        name="execute_code",
        description=tool_description,
        func=sync_execute_code,
        coroutine=async_execute_code,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``glob`` — description override (the tool comes from the base middleware)."""
from __future__ import annotations
from .description import select_description
__all__ = ["select_description"]

View file

@ -0,0 +1,15 @@
"""Description string for ``glob`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_DESCRIPTION = """Find files matching a glob pattern.
Supports standard glob patterns: `*`, `**`, `?`.
Returns absolute file paths.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``glob`` tool description.

    ``mode`` is accepted but not consulted: the same text is returned for
    every filesystem mode.
    """
    return _DESCRIPTION

View file

@ -0,0 +1,7 @@
"""Tool: ``grep`` — description override (the tool comes from the base middleware)."""
from __future__ import annotations
from .description import select_description
__all__ = ["select_description"]

View file

@ -0,0 +1,24 @@
"""Mode-specific description strings for ``grep``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
# Cloud-mode text shown to the model for ``grep``.
_CLOUD_DESCRIPTION = """Search for a literal text pattern across files.
Searches both your in-memory edits and the indexed chunks in Postgres.
State-cached file matches include real line numbers; database hits return
`line=0` because their position depends on per-document XML layout — call
`read_file(path)` afterwards to find the exact line.
"""
_DESKTOP_DESCRIPTION = """Search for a literal text pattern across files.
Searches files on disk and any in-memory edits. Returns real line numbers.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``grep`` description: cloud text for CLOUD, desktop text otherwise."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,7 @@
"""Tool: ``list_tree`` — recursively list files / folders in one bounded call."""
from __future__ import annotations
from .index import create_list_tree_tool
__all__ = ["create_list_tree_tool"]

View file

@ -0,0 +1,37 @@
"""Mode-specific description strings for ``list_tree``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Lists files/folders recursively in a single bounded call.
Args:
- path: absolute path to start from. Defaults to `/documents`.
- max_depth: recursion depth limit (default 8).
- page_size: maximum number of entries returned (max 1000).
- include_files / include_dirs: filter returned entry types.
Returns JSON with:
- entries: [{path, is_dir, size, modified_at, depth}]
- truncated: true when additional entries were omitted due to page_size.
"""
_DESKTOP_DESCRIPTION = """Lists files/folders recursively in a single bounded call.
Args:
- path: absolute path to start from. Defaults to `/`.
- max_depth: recursion depth limit (default 8).
- page_size: maximum number of entries returned (max 1000).
- include_files / include_dirs: filter returned entry types.
Returns JSON with:
- entries: [{path, is_dir, size, modified_at, depth}]
- truncated: true when additional entries were omitted due to page_size.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``list_tree`` description: cloud text for CLOUD, desktop text otherwise."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,101 @@
"""``list_tree`` factory: bounded recursive listing across cloud / desktop backends."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.kb_postgres_backend import KBPostgresBackend
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_list_target_path
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_list_tree_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``list_tree`` tool bound to the middleware ``mw``.

    The tool validates its arguments, resolves the start path, and delegates
    to ``alist_tree_listing`` on a ``KBPostgresBackend`` or to ``alist_tree``
    on any other backend that provides it. The result is returned as an
    ASCII-escaped JSON string; validation and backend errors come back as
    plain strings.
    """
    description = select_description(mw._filesystem_mode)
    # Ceiling advertised to the model in the argument hint and the tool
    # description ("Max 1000"); enforced here so an oversized request cannot
    # reach the backend.
    max_page_size = 1000

    # No docstrings on the tool functions: their signatures are handed to
    # ``StructuredTool.from_function`` alongside an explicit description.
    async def async_list_tree(
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        path: Annotated[
            str,
            "Absolute path to start from. Defaults to /documents in cloud mode.",
        ] = "",
        max_depth: Annotated[int, "Recursion depth limit. Default 8."] = 8,
        page_size: Annotated[int, "Maximum entries returned. Max 1000."] = 500,
        include_files: Annotated[bool, "Include file entries."] = True,
        include_dirs: Annotated[bool, "Include directory entries."] = True,
    ) -> str:
        if max_depth < 0:
            return "Error: max_depth must be >= 0."
        if page_size < 1:
            return "Error: page_size must be >= 1."
        if not include_files and not include_dirs:
            return "Error: include_files and include_dirs cannot both be false."
        # Clamp rather than reject: larger requests still get a full page.
        page_size = min(page_size, max_page_size)

        target = resolve_list_target_path(mw, path, runtime)
        try:
            validated = validate_path(target)
        except ValueError as exc:
            return f"Error: {exc}"

        backend = mw._get_backend(runtime)
        if isinstance(backend, KBPostgresBackend):
            result = await backend.alist_tree_listing(
                validated,
                max_depth=max_depth,
                page_size=page_size,
                include_files=include_files,
                include_dirs=include_dirs,
            )
        elif hasattr(backend, "alist_tree"):
            result = await backend.alist_tree(
                validated,
                max_depth=max_depth,
                page_size=page_size,
                include_files=include_files,
                include_dirs=include_dirs,
            )
        else:
            return "Error: list_tree is not supported by the active backend."

        # Backends signal failure with a dict carrying a string "error".
        if isinstance(result, dict) and isinstance(result.get("error"), str):
            return result["error"]
        return json.dumps(result, ensure_ascii=True)

    def sync_list_tree(
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        path: Annotated[
            str,
            "Absolute path to start from. Defaults to /documents in cloud mode.",
        ] = "",
        max_depth: Annotated[int, "Recursion depth limit. Default 8."] = 8,
        page_size: Annotated[int, "Maximum entries returned. Max 1000."] = 500,
        include_files: Annotated[bool, "Include file entries."] = True,
        include_dirs: Annotated[bool, "Include directory entries."] = True,
    ) -> str:
        # Blocking entry point: drives the coroutine above to completion.
        return run_async_blocking(
            async_list_tree(
                runtime,
                path=path,
                max_depth=max_depth,
                page_size=page_size,
                include_files=include_files,
                include_dirs=include_dirs,
            )
        )

    return StructuredTool.from_function(
        name="list_tree",
        description=description,
        func=sync_list_tree,
        coroutine=async_list_tree,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``ls`` — list files and directories at a path."""
from __future__ import annotations
from .index import create_ls_tool
__all__ = ["create_ls_tool"]

View file

@ -0,0 +1,29 @@
"""Mode-specific description strings for ``ls``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Lists files and directories at the given path.
Usage:
- Provide an absolute path under `/documents` (relative paths resolve under
the current cwd, which defaults to `/documents`).
- For very large folders, use `offset` and `limit` to paginate the listing.
- Returns one entry per line; directories end with a trailing `/`.
"""
_DESKTOP_DESCRIPTION = """Lists files and directories at the given path.
Usage:
- Provide an absolute path using a mount prefix (e.g. `/<mount>/sub/dir`).
Use `ls('/')` to discover available mounts.
- For very large folders, use `offset` and `limit` to paginate the listing.
- Returns one entry per line; directories end with a trailing `/`.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``ls`` description: cloud text for CLOUD, desktop text otherwise."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,96 @@
"""``ls`` factory: resolve target, page through backend listing."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.kb_postgres_backend import paginate_listing
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_list_target_path
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_ls_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``ls`` tool bound to the middleware ``mw``.

    The tool lists one directory through the active backend's ``als_info``,
    pages the result with ``paginate_listing`` and renders a header line
    followed by one path per line (directories get a trailing ``/``).
    """
    tool_description = select_description(mw._filesystem_mode)

    # No docstrings on the tool functions: their signatures are handed to
    # ``StructuredTool.from_function`` alongside an explicit description.
    async def async_ls(
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        path: Annotated[
            str,
            "Absolute path to the directory to list. Relative paths resolve against the current cwd.",
        ] = "",
        offset: Annotated[
            int,
            "Number of entries to skip. Use for paginating large folders. Defaults to 0.",
        ] = 0,
        limit: Annotated[
            int,
            "Maximum number of entries to return. Defaults to 200.",
        ] = 200,
    ) -> str:
        resolved = resolve_list_target_path(mw, path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        # Out-of-range paging arguments are clamped instead of rejected.
        offset = max(offset, 0)
        limit = max(limit, 1)

        backend = mw._get_backend(runtime)
        entries = await backend.als_info(validated)
        page = paginate_listing(entries, offset=offset, limit=limit)

        rendered_paths = []
        for info in page:
            entry_path = info.get("path", "")
            rendered_paths.append(f"{entry_path}/" if info.get("is_dir") else entry_path)

        total = len(entries)
        shown = len(page)
        offset_note = f", offset={offset}" if offset else ""
        header = f"{validated} ({shown} of {total} entries{offset_note})"
        if not rendered_paths:
            return f"{header}\n(empty)"

        body = "\n".join(rendered_paths)
        remaining = total - offset - shown
        if remaining > 0:
            # Tell the caller how to fetch the next page.
            body += (
                f"\n... {remaining} more — call ls("
                f"'{validated}', offset={offset + shown}, limit={limit})"
            )
        return f"{header}\n{body}"

    def sync_ls(
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        path: Annotated[
            str,
            "Absolute path to the directory to list. Relative paths resolve against the current cwd.",
        ] = "",
        offset: Annotated[
            int,
            "Number of entries to skip. Use for paginating large folders. Defaults to 0.",
        ] = 0,
        limit: Annotated[
            int,
            "Maximum number of entries to return. Defaults to 200.",
        ] = 200,
    ) -> str:
        # Blocking entry point: drives the coroutine above to completion.
        pending = async_ls(runtime, path=path, offset=offset, limit=limit)
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="ls",
        description=tool_description,
        func=sync_ls,
        coroutine=async_ls,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``mkdir`` — create a directory."""
from __future__ import annotations
from .index import create_mkdir_tool
__all__ = ["create_mkdir_tool"]

View file

@ -0,0 +1,33 @@
"""Mode-specific description strings for ``mkdir``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_CLOUD_DESCRIPTION = """Creates a directory under `/documents/`.
Stages the folder for end-of-turn commit; the Folder row is inserted only
after the agent's turn finishes successfully.
Args:
- path: absolute path of the new directory (must start with
`/documents/`).
Notes:
- Parent folders are created as needed.
"""
_DESKTOP_DESCRIPTION = """Creates a directory on disk.
Args:
- path: absolute mount-prefixed path of the new directory.
Notes:
- Parent folders are created as needed.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``mkdir`` description: cloud text for CLOUD, desktop text otherwise."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,94 @@
"""``mkdir`` factory: cloud stages for end-of-turn; desktop hits disk immediately."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.path_resolver import DOCUMENTS_ROOT
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_mkdir_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``mkdir`` tool bound to the middleware ``mw``.

    In cloud mode the directory is only staged: a ``Command`` records it in
    ``staged_dirs`` / ``staged_dir_tool_calls`` together with a confirmation
    message. Otherwise the backend's ``amkdir`` or ``mkdir`` is invoked, if
    it has one, and a plain success string is returned.
    """
    tool_description = select_description(mw._filesystem_mode)

    # No docstrings on the tool functions: their signatures are handed to
    # ``StructuredTool.from_function`` alongside an explicit description.
    async def async_mkdir(
        path: Annotated[str, "Absolute or relative directory path to create."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        resolved = resolve_relative(mw, path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        if is_cloud(mw._filesystem_mode):
            under_documents = validated == DOCUMENTS_ROOT or validated.startswith(
                DOCUMENTS_ROOT + "/"
            )
            if not under_documents:
                return (
                    "Error: cloud mkdir must target a path under /documents/ "
                    f"(got '{validated}')."
                )
            staged_notice = ToolMessage(
                content=(
                    f"Staged directory '{validated}' (will be created "
                    "at end of turn)."
                ),
                tool_call_id=runtime.tool_call_id,
            )
            return Command(
                update={
                    "staged_dirs": [validated],
                    "staged_dir_tool_calls": {
                        validated: runtime.tool_call_id,
                    },
                    "messages": [staged_notice],
                }
            )

        backend = mw._get_backend(runtime)
        make_dir = getattr(backend, "amkdir", None) or getattr(backend, "mkdir", None)
        if not callable(make_dir):
            # A backend without a mkdir hook is still reported as success.
            # NOTE(review): presumably such backends create folders
            # implicitly on write — confirm.
            return f"Created directory {validated}"

        try:
            outcome: Any = make_dir(validated, parents=True, exist_ok=True)
            if asyncio.iscoroutine(outcome):
                await outcome
        except TypeError:
            # Retry with the bare signature when the keyword form is rejected.
            outcome = make_dir(validated)
            if asyncio.iscoroutine(outcome):
                await outcome
        except Exception as exc:  # pragma: no cover
            return f"Error: {exc}"
        return f"Created directory {validated}"

    def sync_mkdir(
        path: Annotated[str, "Absolute or relative directory path to create."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        # Blocking entry point: drives the coroutine above to completion.
        pending = async_mkdir(path, runtime)
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="mkdir",
        description=tool_description,
        func=sync_mkdir,
        coroutine=async_mkdir,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``move_file`` — move or rename a file."""
from __future__ import annotations
from .index import create_move_file_tool
__all__ = ["create_move_file_tool"]

View file

@ -0,0 +1,33 @@
"""Mode-specific description strings for ``move_file``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
# Cloud-mode text shown to the model for ``move_file``.
_CLOUD_DESCRIPTION = """Moves or renames a file or folder.
Use absolute paths for both source and destination.
Notes:
- `move_file` is staged this turn and committed at end of turn.
- The agent cannot overwrite an existing destination — pass a fresh dest
path or move the existing destination away first.
- The anonymous uploaded document is read-only and cannot be moved.
- Rename is a special case of move (same folder, different filename).
"""
_DESKTOP_DESCRIPTION = """Moves or renames a file or folder on disk.
Use mount-prefixed absolute paths for both source and destination
(e.g. `/<mount>/old.txt` -> `/<mount>/new.txt`).
Notes:
- Cross-mount moves are not supported.
- Rename is a special case of move (same folder, different filename).
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``move_file`` description: cloud text for CLOUD, desktop text otherwise."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,108 @@
"""Cloud-mode move helper: stages source/dest into pending_moves + files."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.kb_postgres_backend import KBPostgresBackend
from app.agents.shared.path_resolver import DOCUMENTS_ROOT
from app.agents.shared.state_reducers import _CLEAR
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
async def cloud_move_file(
    mw: SurfSenseFilesystemMiddleware,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
    source: str,
    dest: str,
    *,
    overwrite: bool,
) -> Command | str:
    """Stage a source/dest move in cloud mode (commit at end of turn).

    Args:
        mw: Middleware used to obtain the active backend.
        runtime: Tool runtime providing state and the tool call id.
        source: Path of the file to move.
        dest: Path to move it to.
        overwrite: Must be False; cloud mode rejects True.

    Returns:
        A ``Command`` that rewrites ``files``, ``pending_moves``,
        ``doc_id_by_path`` (and ``dirty_paths`` when the source was dirty),
        or an error/no-op string when the move is not staged.
    """
    backend = mw._get_backend(runtime)
    if not isinstance(backend, KBPostgresBackend):
        return "Error: cloud move requires KBPostgresBackend."
    if source == dest:
        return f"Moved '{source}' to '{dest}' (no-op)"
    if overwrite:
        return (
            "Error: overwrite=True is not supported in cloud mode. Move/edit "
            "the destination doc explicitly first."
        )
    if not source.startswith(DOCUMENTS_ROOT + "/"):
        return (
            f"Error: cloud move_file source must be under /documents/ (got '{source}')."
        )
    if not dest.startswith(DOCUMENTS_ROOT + "/"):
        return (
            "Error: cloud move_file destination must be under /documents/ (got "
            f"'{dest}')."
        )

    # The anonymous upload may be neither the source nor the destination.
    anon = runtime.state.get("kb_anon_doc") or {}
    if isinstance(anon, dict):
        anon_path = str(anon.get("path") or "")
        if anon_path and (anon_path in (source, dest)):
            return "Error: the anonymous uploaded document is read-only."

    files = runtime.state.get("files") or {}
    doc_id_by_path = runtime.state.get("doc_id_by_path") or {}
    pending_moves = list(runtime.state.get("pending_moves") or [])

    # The destination must be free in state, in moves already staged, and in
    # the backend. ``source == dest`` returned above, so the backend lookup
    # no longer needs its own (always-true) ``dest != source`` guard.
    if dest in files or any(move.get("dest") == dest for move in pending_moves):
        return f"Error: destination '{dest}' already exists."
    if await backend._load_file_data(dest) is not None:
        return f"Error: destination '{dest}' already exists."

    # Prefer the state copy of the source; fall back to loading it.
    source_file_data = files.get(source)
    source_doc_id = doc_id_by_path.get(source)
    if source_file_data is None:
        loaded = await backend._load_file_data(source)
        if loaded is None:
            return f"Error: source '{source}' not found."
        source_file_data, loaded_doc_id = loaded
        if source_doc_id is None:
            source_doc_id = loaded_doc_id

    files_update: dict[str, Any] = {source: None, dest: source_file_data}
    update: dict[str, Any] = {
        "files": files_update,
        "pending_moves": [
            {
                "source": source,
                "dest": dest,
                "overwrite": False,
                "tool_call_id": runtime.tool_call_id,
            }
        ],
        "messages": [
            ToolMessage(
                content=(f"Moved '{source}' to '{dest}' (will commit at end of turn)."),
                tool_call_id=runtime.tool_call_id,
            )
        ],
    }

    # Clear the source's doc id and carry it over to the destination.
    doc_id_update: dict[str, int | None] = {source: None}
    if source_doc_id is not None:
        doc_id_update[dest] = source_doc_id
    update["doc_id_by_path"] = doc_id_update

    # A dirty source stays dirty under its new path: emit ``_CLEAR`` followed
    # by the list with the source entry renamed.
    dirty_paths = list(runtime.state.get("dirty_paths") or [])
    if source in dirty_paths:
        new_dirty: list[Any] = [_CLEAR]
        new_dirty.extend(dest if entry == source else entry for entry in dirty_paths)
        update["dirty_paths"] = new_dirty
    return Command(update=update)

View file

@ -0,0 +1,96 @@
"""``move_file`` factory: dispatches cloud (staged) vs desktop (direct disk) moves."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.protocol import WriteResult
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_move_target_path
from .description import select_description
from .helpers import cloud_move_file
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_move_file_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``move_file`` tool bound to the middleware ``mw``.

    Cloud mode delegates to ``cloud_move_file``. Otherwise the backend's
    ``amove`` is called and its outcome is turned into a ``Command`` (or the
    backend's error string).
    """
    tool_description = select_description(mw._filesystem_mode)

    # No docstrings on the tool functions: their signatures are handed to
    # ``StructuredTool.from_function`` alongside an explicit description.
    async def async_move_file(
        source_path: Annotated[str, "Absolute or relative source path."],
        destination_path: Annotated[str, "Absolute or relative destination path."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        *,
        overwrite: Annotated[
            bool,
            "If True, replace existing destination. Cloud mode rejects True. Defaults to False.",
        ] = False,
    ) -> Command | str:
        if not (source_path.strip() and destination_path.strip()):
            return "Error: source_path and destination_path are required."

        resolved_source = resolve_move_target_path(mw, source_path, runtime)
        resolved_dest = resolve_move_target_path(mw, destination_path, runtime)
        try:
            validated_source = validate_path(resolved_source)
            validated_dest = validate_path(resolved_dest)
        except ValueError as exc:
            return f"Error: {exc}"

        if is_cloud(mw._filesystem_mode):
            return await cloud_move_file(
                mw,
                runtime,
                validated_source,
                validated_dest,
                overwrite=overwrite,
            )

        backend = mw._get_backend(runtime)
        outcome: WriteResult = await backend.amove(
            validated_source, validated_dest, overwrite=overwrite
        )
        if outcome.error:
            return outcome.error

        confirmation = ToolMessage(
            content=f"Moved '{validated_source}' to '{outcome.path or validated_dest}'",
            tool_call_id=runtime.tool_call_id,
        )
        state_update: dict[str, Any] = {"messages": [confirmation]}
        # Only touch ``files`` when the backend reported a state change.
        if outcome.files_update is not None:
            state_update["files"] = outcome.files_update
        return Command(update=state_update)

    def sync_move_file(
        source_path: Annotated[str, "Absolute or relative source path."],
        destination_path: Annotated[str, "Absolute or relative destination path."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        *,
        overwrite: Annotated[
            bool,
            "If True, replace existing destination. Cloud mode rejects True. Defaults to False.",
        ] = False,
    ) -> Command | str:
        # Blocking entry point: drives the coroutine above to completion.
        pending = async_move_file(
            source_path, destination_path, runtime, overwrite=overwrite
        )
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="move_file",
        description=tool_description,
        func=sync_move_file,
        coroutine=async_move_file,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``pwd`` — print the current working directory."""
from __future__ import annotations
from .index import create_pwd_tool
__all__ = ["create_pwd_tool"]

View file

@ -0,0 +1,11 @@
"""Description string for ``pwd`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_DESCRIPTION = """Prints the current working directory."""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``pwd`` tool description.

    ``mode`` is accepted but not consulted: the same text is returned for
    every filesystem mode.
    """
    return _DESCRIPTION

View file

@ -0,0 +1,37 @@
"""``pwd`` factory: read the cwd from state."""
from __future__ import annotations
from typing import TYPE_CHECKING
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from ...middleware.path_resolution import current_cwd
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_pwd_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``pwd`` tool, which reports ``current_cwd(mw, runtime)``."""
    tool_description = select_description(mw._filesystem_mode)

    # No docstrings on the tool functions: their signatures are handed to
    # ``StructuredTool.from_function`` alongside an explicit description.
    def sync_pwd(
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> str:
        cwd = current_cwd(mw, runtime)
        return cwd

    async def async_pwd(
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> str:
        # Nothing to await; same lookup as the sync variant.
        cwd = current_cwd(mw, runtime)
        return cwd

    return StructuredTool.from_function(
        name="pwd",
        description=tool_description,
        func=sync_pwd,
        coroutine=async_pwd,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``read_file`` — read a file (paginated) from the filesystem."""
from __future__ import annotations
from .index import create_read_file_tool
__all__ = ["create_read_file_tool"]

View file

@ -0,0 +1,22 @@
"""Description string for ``read_file`` (mode-agnostic)."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
_DESCRIPTION = """Reads a file from the filesystem.
Usage:
- By default, reads up to 100 lines from the beginning.
- Use `offset` and `limit` for pagination when files are large.
- Results include line numbers.
- Documents contain a `<chunk_index>` near the top listing every chunk with
its line range and a `matched="true"` flag for search-relevant chunks.
Read the index first, then jump to matched chunks with
`read_file(path, offset=<start_line>, limit=<num_lines>)`.
- Use chunk IDs (`<chunk id='...'>`) as citations in answers.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``read_file`` tool description.

    ``mode`` is accepted but not consulted: the same text is returned for
    every filesystem mode.
    """
    return _DESCRIPTION

View file

@ -0,0 +1,100 @@
"""``read_file`` factory: state-cache lookup, then lazy KB load, then disk read."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.utils import format_read_response, validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.kb_postgres_backend import KBPostgresBackend
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.path_resolution import resolve_relative
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_read_file_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``read_file`` tool bound to the middleware ``mw``.

    Lookup order: the ``files`` entry in state, then a ``KBPostgresBackend``
    load (which also records the file, and its document id when present, in
    state via a ``Command``), then the backend's ``aread``.
    """
    tool_description = select_description(mw._filesystem_mode)

    # No docstrings on the tool functions: their signatures are handed to
    # ``StructuredTool.from_function`` alongside an explicit description.
    async def async_read_file(
        file_path: Annotated[
            str,
            "Absolute path to the file to read. Relative paths resolve against the current cwd.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        offset: Annotated[
            int,
            "Line number to start reading from (0-indexed).",
        ] = 0,
        limit: Annotated[
            int,
            "Maximum number of lines to read.",
        ] = 100,
    ) -> Command | str:
        resolved = resolve_relative(mw, file_path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        # 1) Already in state: render straight from there.
        cached_files = runtime.state.get("files") or {}
        if validated in cached_files:
            return format_read_response(cached_files[validated], offset, limit)

        backend = mw._get_backend(runtime)

        # 3) Any other backend: plain read, errors reported as text.
        if not isinstance(backend, KBPostgresBackend):
            try:
                return await backend.aread(validated, offset=offset, limit=limit)
            except Exception as exc:  # pragma: no cover - defensive
                return f"Error: {exc}"

        # 2) KB backend: load, then store the file in state with the reply.
        loaded = await backend._load_file_data(validated)
        if loaded is None:
            return f"Error: File '{validated}' not found"
        file_data, doc_id = loaded
        rendered = format_read_response(file_data, offset, limit)
        reply = ToolMessage(
            content=rendered,
            tool_call_id=runtime.tool_call_id,
        )
        state_update: dict[str, Any] = {
            "files": {validated: file_data},
            "messages": [reply],
        }
        if doc_id is not None:
            state_update["doc_id_by_path"] = {validated: doc_id}
        return Command(update=state_update)

    def sync_read_file(
        file_path: Annotated[
            str,
            "Absolute path to the file to read. Relative paths resolve against the current cwd.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
        offset: Annotated[
            int,
            "Line number to start reading from (0-indexed).",
        ] = 0,
        limit: Annotated[
            int,
            "Maximum number of lines to read.",
        ] = 100,
    ) -> Command | str:
        # Blocking entry point: drives the coroutine above to completion.
        pending = async_read_file(file_path, runtime, offset, limit)
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="read_file",
        description=tool_description,
        func=sync_read_file,
        coroutine=async_read_file,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``rm`` — delete a single file."""
from __future__ import annotations
from .index import create_rm_tool
__all__ = ["create_rm_tool"]

View file

@ -0,0 +1,38 @@
"""Mode-specific description strings for ``rm``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
# Model-facing description of ``rm`` in cloud mode (returned by
# ``select_description`` for ``FilesystemMode.CLOUD``).
_CLOUD_DESCRIPTION = """Deletes a single file under `/documents/`.
Mirrors POSIX `rm path` (no `-r`, no glob expansion). Stages the deletion
for end-of-turn commit; the row is removed only after the agent's turn
finishes successfully.
Args:
- path: absolute or relative file path. Cannot point at a directory — use
`rmdir` for empty folders. Cannot target the root or `/documents`.
Notes:
- The action is reversible via the per-action revert flow when action
logging is enabled.
- The anonymous uploaded document is read-only and cannot be deleted.
"""
# Model-facing description of ``rm`` in desktop mode (returned by
# ``select_description`` for every non-cloud mode).
_DESKTOP_DESCRIPTION = """Deletes a single file from disk.
Mirrors POSIX `rm path` (no `-r`, no glob expansion). The deletion hits
disk immediately. Desktop deletes are NOT reversible via the agent's
revert flow.
Args:
- path: absolute mount-prefixed file path. Cannot point at a directory —
use `rmdir` for empty folders.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``rm`` description for ``mode``; any non-cloud mode gets the desktop text."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,130 @@
"""Cloud and desktop ``rm`` branches.
Both branches receive an already-resolved + validated absolute path.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from deepagents.backends.protocol import WriteResult
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.kb_postgres_backend import KBPostgresBackend
from app.agents.shared.path_resolver import DOCUMENTS_ROOT
from app.agents.shared.state_reducers import _CLEAR
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
async def cloud_rm(
    mw: SurfSenseFilesystemMiddleware,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
    validated: str,
) -> Command | str:
    """Queue ``validated`` for deletion in cloud mode.

    Nothing is deleted here. On success the returned ``Command`` appends a
    ``pending_deletes`` entry and clears the path from ``files``,
    ``doc_id_by_path`` and, when the path was dirty, from the dirty-path
    tracking.

    Args:
        mw: Filesystem middleware used to obtain the active backend.
        runtime: Tool runtime carrying the agent state and the tool-call id.
        validated: Already-resolved, validated absolute path.

    Returns:
        A ``Command`` with the staged-delete state update, or a plain string
        when the request is refused or the path is already queued.
    """
    state = runtime.state

    def _lists_path(entries: list[Any]) -> bool:
        # Only dict entries carrying a matching "path" count; anything else is ignored.
        return any(
            isinstance(entry, dict) and entry.get("path") == validated
            for entry in entries
        )

    # Protected locations.
    if validated in ("/", DOCUMENTS_ROOT):
        return f"Error: refusing to rm '{validated}'."
    if not validated.startswith(DOCUMENTS_ROOT + "/"):
        return (
            f"Error: cloud rm must target a path under /documents/ (got '{validated}')."
        )
    anon_doc = state.get("kb_anon_doc") or {}
    if isinstance(anon_doc, dict) and str(anon_doc.get("path") or "") == validated:
        return "Error: the anonymous uploaded document is read-only."

    # The target must be a file: reject staged, queued or backend-listed directories.
    if validated in list(state.get("staged_dirs") or []):
        return f"Error: '{validated}' is a directory. Use rmdir for empty directories."
    if _lists_path(list(state.get("pending_dir_deletes") or [])):
        return f"Error: '{validated}' is already queued for rmdir."
    backend = mw._get_backend(runtime)
    is_kb_backend = isinstance(backend, KBPostgresBackend)
    if is_kb_backend and await backend.als_info(validated):
        return (
            f"Error: '{validated}' is a directory. Use rmdir for empty directories."
        )

    # A repeated rm of the same path is reported, not treated as an error.
    if _lists_path(list(state.get("pending_deletes") or [])):
        return f"'{validated}' is already queued for deletion."

    # Existence check: only hit the backend when state knows nothing about the path.
    known_in_state = validated in (state.get("files") or {})
    known_doc_id: int | None = (state.get("doc_id_by_path") or {}).get(validated)
    if not known_in_state and known_doc_id is None and is_kb_backend:
        loaded = await backend._load_file_data(validated)
        if loaded is None:
            return f"Error: file '{validated}' not found."
        _, known_doc_id = loaded

    call_id = runtime.tool_call_id
    state_update: dict[str, Any] = {
        "pending_deletes": [{"path": validated, "tool_call_id": call_id}],
        "files": {validated: None},
        "doc_id_by_path": {validated: None},
        "messages": [
            ToolMessage(
                content=(
                    f"Staged delete of '{validated}' (will commit at end of turn)."
                ),
                tool_call_id=call_id,
            )
        ],
    }

    # Drop the path from the dirty list: the _CLEAR marker leads the
    # replacement list, followed by every other dirty path in order.
    dirty_paths = list(state.get("dirty_paths") or [])
    if validated in dirty_paths:
        state_update["dirty_paths"] = [
            _CLEAR,
            *(entry for entry in dirty_paths if entry != validated),
        ]
        state_update["dirty_path_tool_calls"] = {validated: None}
    return Command(update=state_update)
async def desktop_rm(
    mw: SurfSenseFilesystemMiddleware,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
    validated: str,
) -> Command | str:
    """Delete ``validated`` through the active backend right away (desktop mode).

    Returns an error string when the backend exposes no callable
    ``adelete_file`` or when the backend reports an error; otherwise a
    ``Command`` that clears the path from ``files`` and confirms the delete.
    """
    delete_file = getattr(mw._get_backend(runtime), "adelete_file", None)
    if not callable(delete_file):
        return "Error: rm is not supported by the active backend."

    outcome: WriteResult = await delete_file(validated)
    if outcome.error:
        # Backend error text is passed through unchanged.
        return outcome.error

    confirmation = ToolMessage(
        content=f"Deleted file '{outcome.path or validated}'",
        tool_call_id=runtime.tool_call_id,
    )
    return Command(
        update={
            "files": {validated: None},
            "messages": [confirmation],
        }
    )

View file

@ -0,0 +1,61 @@
"""``rm`` factory: resolve + validate the path, then dispatch to cloud / desktop."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_relative
from .description import select_description
from .helpers import cloud_rm, desktop_rm
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_rm_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``rm`` tool bound to the filesystem middleware ``mw``.

    The tool rejects blank paths, resolves the path against the current cwd,
    validates it, and then hands off to ``cloud_rm`` or ``desktop_rm``
    depending on the middleware's filesystem mode.
    """
    tool_description = select_description(mw._filesystem_mode)

    async def async_rm(
        path: Annotated[
            str,
            "Absolute or relative path to the file to delete.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        if not (path and path.strip()):
            return "Error: path is required."

        resolved = resolve_relative(mw, path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        # Mode is re-read on every call, then the matching branch runs.
        handler = cloud_rm if is_cloud(mw._filesystem_mode) else desktop_rm
        return await handler(mw, runtime, validated)

    def sync_rm(
        path: Annotated[
            str,
            "Absolute or relative path to the file to delete.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        # Synchronous entry point: drive the async implementation to completion.
        pending = async_rm(path, runtime)
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="rm",
        description=tool_description,
        func=sync_rm,
        coroutine=async_rm,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``rmdir`` — delete an empty directory."""
from __future__ import annotations
from .index import create_rmdir_tool
__all__ = ["create_rmdir_tool"]

View file

@ -0,0 +1,42 @@
"""Mode-specific description strings for ``rmdir``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
# Model-facing description of ``rmdir`` in cloud mode (returned by
# ``select_description`` for ``FilesystemMode.CLOUD``).
_CLOUD_DESCRIPTION = """Deletes an empty directory under `/documents/`.
Mirrors POSIX `rmdir path`: refuses non-empty directories. Recursive
deletion (`rm -r`) is intentionally NOT supported — clear contents with
`rm` first.
Args:
- path: absolute or relative directory path. Cannot target the root,
`/documents`, the current cwd, or any ancestor of cwd (use `cd` to
move out first).
Notes:
- Emptiness is evaluated against the post-staged view, so a same-turn
`rm /a/x.md` followed by `rmdir /a` is fine.
- If the directory was added in this same turn via `mkdir` and never
committed, the staged mkdir is dropped instead of issuing a delete.
- The action is reversible via the per-action revert flow when action
logging is enabled.
"""
# Model-facing description of ``rmdir`` in desktop mode; ``select_description``
# returns it for every mode other than ``FilesystemMode.CLOUD``.
_DESKTOP_DESCRIPTION = """Deletes an empty directory from disk.
Mirrors POSIX `rmdir path`: refuses non-empty directories. Recursive
deletion is NOT supported. The deletion hits disk immediately and is
NOT reversible via the agent's revert flow.
Args:
- path: absolute mount-prefixed directory path. Cannot target the mount
root or any directory containing files/subfolders.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``rmdir`` description for ``mode``; any non-cloud mode gets the desktop text."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,136 @@
"""Cloud and desktop ``rmdir`` branches.
Both branches receive an already-resolved + validated absolute path.
"""
from __future__ import annotations
import posixpath
from typing import TYPE_CHECKING
from deepagents.backends.protocol import WriteResult
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.kb_postgres_backend import KBPostgresBackend
from app.agents.shared.path_resolver import DOCUMENTS_ROOT
from app.agents.shared.state_reducers import _CLEAR
from ...middleware.path_resolution import current_cwd
from ...shared.paths import is_ancestor_of
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
async def cloud_rmdir(
    mw: SurfSenseFilesystemMiddleware,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
    validated: str,
) -> Command | str:
    """Queue an empty-directory delete in cloud mode, or un-stage a staged directory.

    Args:
        mw: Filesystem middleware used to obtain the cwd and active backend.
        runtime: Tool runtime carrying the agent state and the tool-call id.
        validated: Already-resolved, validated absolute path.

    Returns:
        A ``Command`` that either removes the path from ``staged_dirs`` (when
        it was only staged) or appends a ``pending_dir_deletes`` entry; a
        plain string when the request is refused or already queued.
    """
    state = runtime.state

    # Protected locations.
    if validated in ("/", DOCUMENTS_ROOT):
        return f"Error: refusing to rmdir '{validated}'."
    if not validated.startswith(DOCUMENTS_ROOT + "/"):
        return (
            "Error: cloud rmdir must target a path under /documents/ "
            f"(got '{validated}')."
        )

    # Refuse to remove the cwd or anything above it.
    cwd = current_cwd(mw, runtime)
    if validated == cwd or is_ancestor_of(validated, cwd):
        return (
            f"Error: cannot rmdir '{validated}' because the current "
            "cwd is at or under it. cd out first."
        )

    staged_dirs = list(state.get("staged_dirs") or [])
    queued_dir_deletes = list(state.get("pending_dir_deletes") or [])
    already_queued = any(
        isinstance(entry, dict) and entry.get("path") == validated
        for entry in queued_dir_deletes
    )
    if already_queued:
        return f"'{validated}' is already queued for deletion."

    backend = mw._get_backend(runtime)
    is_staged = validated in staged_dirs
    children: list = []
    if isinstance(backend, KBPostgresBackend):
        children = list(await backend.als_info(validated))
        if not children and not is_staged:
            # Nothing listed under the path: it may be a file, or not exist at all.
            if await backend._load_file_data(validated) is not None:
                return f"Error: '{validated}' is a file. Use rm to delete files."
            parent_dir = posixpath.dirname(validated) or "/"
            siblings = await backend.als_info(parent_dir)
            listed_as_dir = any(
                info.get("path") == validated and info.get("is_dir")
                for info in siblings
            )
            if not listed_as_dir:
                return f"Error: directory '{validated}' not found."

    if children:
        return f"Error: directory '{validated}' is not empty. Remove contents first."

    call_id = runtime.tool_call_id
    if is_staged:
        # The directory is in staged_dirs: drop the staging entry, queue no delete.
        remaining = [entry for entry in staged_dirs if entry != validated]
        unstaged_notice = ToolMessage(
            content=(f"Un-staged directory '{validated}'."),
            tool_call_id=call_id,
        )
        return Command(
            update={
                "staged_dirs": [_CLEAR, *remaining],
                "staged_dir_tool_calls": {validated: None},
                "messages": [unstaged_notice],
            }
        )

    staged_notice = ToolMessage(
        content=(
            f"Staged rmdir of '{validated}' (will commit at end of turn)."
        ),
        tool_call_id=call_id,
    )
    return Command(
        update={
            "pending_dir_deletes": [{"path": validated, "tool_call_id": call_id}],
            "messages": [staged_notice],
        }
    )
async def desktop_rmdir(
    mw: SurfSenseFilesystemMiddleware,
    runtime: ToolRuntime[None, SurfSenseFilesystemState],
    validated: str,
) -> Command | str:
    """Remove the directory ``validated`` through the active backend right away (desktop mode).

    Returns an error string when the backend exposes no callable ``armdir``
    or when the backend reports an error; otherwise a ``Command`` whose only
    update is the confirmation message.
    """
    remove_dir = getattr(mw._get_backend(runtime), "armdir", None)
    if not callable(remove_dir):
        return "Error: rmdir is not supported by the active backend."

    outcome: WriteResult = await remove_dir(validated)
    if outcome.error:
        # Backend error text is passed through unchanged.
        return outcome.error

    confirmation = ToolMessage(
        content=f"Deleted directory '{outcome.path or validated}'",
        tool_call_id=runtime.tool_call_id,
    )
    return Command(update={"messages": [confirmation]})

View file

@ -0,0 +1,61 @@
"""``rmdir`` factory: resolve + validate the path, then dispatch to cloud / desktop."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from deepagents.backends.utils import validate_path
from langchain.tools import ToolRuntime
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.path_resolution import resolve_relative
from .description import select_description
from .helpers import cloud_rmdir, desktop_rmdir
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_rmdir_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``rmdir`` tool bound to the filesystem middleware ``mw``.

    The tool rejects blank paths, resolves the path against the current cwd,
    validates it, and then hands off to ``cloud_rmdir`` or ``desktop_rmdir``
    depending on the middleware's filesystem mode.
    """
    tool_description = select_description(mw._filesystem_mode)

    async def async_rmdir(
        path: Annotated[
            str,
            "Absolute or relative path of the empty directory to delete.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        if not (path and path.strip()):
            return "Error: path is required."

        resolved = resolve_relative(mw, path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        # Mode is re-read on every call, then the matching branch runs.
        handler = cloud_rmdir if is_cloud(mw._filesystem_mode) else desktop_rmdir
        return await handler(mw, runtime, validated)

    def sync_rmdir(
        path: Annotated[
            str,
            "Absolute or relative path of the empty directory to delete.",
        ],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        # Synchronous entry point: drive the async implementation to completion.
        pending = async_rmdir(path, runtime)
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="rmdir",
        description=tool_description,
        func=sync_rmdir,
        coroutine=async_rmdir,
    )

View file

@ -0,0 +1,7 @@
"""Tool: ``write_file`` — create or overwrite a text file."""
from __future__ import annotations
from .index import create_write_file_tool
__all__ = ["create_write_file_tool"]

View file

@ -0,0 +1,35 @@
"""Mode-specific description strings for ``write_file``."""
from __future__ import annotations
from app.agents.shared.filesystem_selection import FilesystemMode
# Model-facing description of ``write_file`` in cloud mode; ``select_description``
# returns it when the mode is ``FilesystemMode.CLOUD``.
_CLOUD_DESCRIPTION = """Writes a new text file to the workspace.
Usage:
- Files written under `/documents/<...>` are persisted as Documents at end
of turn.
- Use a `temp_` filename prefix (e.g. `temp_plan.md` or `/documents/temp_x.md`)
for scratch/working files; they are automatically discarded at end of turn.
- Writes outside `/documents/` are rejected unless the basename starts with
`temp_`.
- Supported outputs include common LLM-friendly text formats like markdown,
json, yaml, csv, xml, html, css, sql, and code files.
- Avoid placeholders; produce concrete and useful text.
"""
# Model-facing description of ``write_file`` in desktop mode; ``select_description``
# returns it for every mode other than ``FilesystemMode.CLOUD``.
_DESKTOP_DESCRIPTION = """Writes a text file to disk.
Usage:
- Use mount-prefixed absolute paths like `/<mount>/sub/file.ext`.
- Writes hit disk immediately. There is no end-of-turn staging.
- Supported outputs include common LLM-friendly text formats like markdown,
json, yaml, csv, xml, html, css, sql, and code files.
- Avoid placeholders; produce concrete and useful text.
"""
def select_description(mode: FilesystemMode) -> str:
    """Return the ``write_file`` description for ``mode``; any non-cloud mode gets the desktop text."""
    return _CLOUD_DESCRIPTION if mode == FilesystemMode.CLOUD else _DESKTOP_DESCRIPTION

View file

@ -0,0 +1,83 @@
"""``write_file`` factory: resolve target, enforce cloud namespace, dispatch to backend."""
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated, Any
from deepagents.backends.protocol import WriteResult
from deepagents.backends.utils import create_file_data, validate_path
from langchain.tools import ToolRuntime
from langchain_core.messages import ToolMessage
from langchain_core.tools import BaseTool, StructuredTool
from langgraph.types import Command
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from ...middleware.async_dispatch import run_async_blocking
from ...middleware.mode import is_cloud
from ...middleware.namespace_policy import check_cloud_write_namespace
from ...middleware.path_resolution import resolve_write_target_path
from .description import select_description
if TYPE_CHECKING:
from ...middleware import SurfSenseFilesystemMiddleware
def create_write_file_tool(mw: SurfSenseFilesystemMiddleware) -> BaseTool:
    """Build the ``write_file`` tool bound to the filesystem middleware ``mw``.

    The tool resolves and validates the target path, applies the cloud
    write-namespace check, writes through the active backend, and returns a
    ``Command`` that records the file in state. In cloud mode the written
    path is also recorded in ``dirty_paths`` / ``dirty_path_tool_calls``.
    """
    tool_description = select_description(mw._filesystem_mode)

    async def async_write_file(
        file_path: Annotated[
            str,
            "Absolute path where the file should be created. Relative paths resolve against the current cwd.",
        ],
        content: Annotated[str, "Text content to write to the file."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        resolved = resolve_write_target_path(mw, file_path, runtime)
        try:
            validated = validate_path(resolved)
        except ValueError as exc:
            return f"Error: {exc}"

        # A non-empty result is the rejection message to return as-is.
        rejection = check_cloud_write_namespace(mw, validated, runtime)
        if rejection:
            return rejection

        outcome: WriteResult = await mw._get_backend(runtime).awrite(validated, content)
        if outcome.error:
            return outcome.error

        written_path = outcome.path or validated
        call_id = runtime.tool_call_id
        # Prefer the backend's own files update; otherwise synthesize one.
        state_update: dict[str, Any] = {
            "files": outcome.files_update
            or {written_path: create_file_data(content)},
            "messages": [
                ToolMessage(
                    content=f"Updated file {written_path}",
                    tool_call_id=call_id,
                )
            ],
        }
        if is_cloud(mw._filesystem_mode):
            # Record the path as dirty, keyed to this tool call.
            state_update["dirty_paths"] = [written_path]
            state_update["dirty_path_tool_calls"] = {written_path: call_id}
        return Command(update=state_update)

    def sync_write_file(
        file_path: Annotated[
            str,
            "Absolute path where the file should be created. Relative paths resolve against the current cwd.",
        ],
        content: Annotated[str, "Text content to write to the file."],
        runtime: ToolRuntime[None, SurfSenseFilesystemState],
    ) -> Command | str:
        # Synchronous entry point: drive the async implementation to completion.
        pending = async_write_file(file_path, content, runtime)
        return run_async_blocking(pending)

    return StructuredTool.from_function(
        name="write_file",
        description=tool_description,
        func=sync_write_file,
        coroutine=async_write_file,
    )

View file

@ -0,0 +1,10 @@
"""Single source of truth for the feature-flag predicate."""
from __future__ import annotations
from app.agents.shared.feature_flags import AgentFeatureFlags
def enabled(flags: AgentFeatureFlags, attr: str) -> bool:
    """``flags.<attr>`` is on AND the new-agent-stack kill switch is off.

    Args:
        flags: Feature-flag object; must expose ``attr`` and
            ``disable_new_agent_stack``.
        attr: Name of the flag attribute to test.

    Returns:
        A real ``bool``. A bare ``x and not y`` would hand back the falsy
        operand itself (e.g. ``None`` or ``0``), so the result is coerced.

    Raises:
        AttributeError: If ``flags`` has no attribute named ``attr``.
    """
    # The flag is read first so an unknown ``attr`` fails even when the kill switch is on.
    return bool(getattr(flags, attr) and not flags.disable_new_agent_stack)

View file

@ -0,0 +1,73 @@
"""Project ``workspace_tree_text`` + ``kb_priority`` from state into SystemMessages."""
from __future__ import annotations
import time
from typing import Any
from langchain.agents.middleware import AgentMiddleware, AgentState
from langchain_core.messages import SystemMessage
from langgraph.runtime import Runtime
from app.agents.shared.filesystem_state import SurfSenseFilesystemState
from app.agents.shared.middleware.knowledge_search import _render_priority_message
from app.utils.perf import get_perf_logger
_perf_log = get_perf_logger()
class KbContextProjectionMiddleware(AgentMiddleware):  # type: ignore[type-arg]
    """Surface ``workspace_tree_text`` and ``kb_priority`` from state as messages.

    ``before_agent`` reads the two state fields and, when at least one is
    set, returns a ``messages`` list with the rendered content inserted just
    before the last message. It performs no database or LLM calls itself.
    """

    tools = ()
    state_schema = SurfSenseFilesystemState

    def before_agent(  # type: ignore[override]
        self,
        state: AgentState,
        runtime: Runtime[Any],
    ) -> dict[str, Any] | None:
        """Return an updated ``messages`` list, or ``None`` when there is nothing to project."""
        del runtime
        started = time.perf_counter()
        tree_text = state.get("workspace_tree_text")
        priority = state.get("kb_priority")

        if not (tree_text or priority):
            _perf_log.info(
                "[kb_context_projection] tree=0 priority=0 elapsed=%.3fs",
                time.perf_counter() - started,
            )
            return None

        history = list(state.get("messages") or [])
        # Both inserts use the same slot (just before the last message), so
        # the priority message ends up ahead of the tree message.
        slot = max(len(history) - 1, 0)

        tree_chars = len(tree_text) if tree_text else 0
        if tree_text:
            history.insert(slot, SystemMessage(content=tree_text))

        if priority:
            # Values without a length count as a single item.
            priority_count = len(priority) if hasattr(priority, "__len__") else 1
            history.insert(slot, _render_priority_message(priority))
        else:
            priority_count = 0

        _perf_log.info(
            "[kb_context_projection] tree_chars=%d priority_items=%d elapsed=%.3fs",
            tree_chars,
            priority_count,
            time.perf_counter() - started,
        )
        return {"messages": history}
def build_kb_context_projection_mw() -> KbContextProjectionMiddleware:
    """Create a fresh :class:`KbContextProjectionMiddleware` (it takes no configuration)."""
    middleware = KbContextProjectionMiddleware()
    return middleware
__all__ = [
"KbContextProjectionMiddleware",
"build_kb_context_projection_mw",
]

View file

@ -0,0 +1,19 @@
"""User/team memory injection prepended to the conversation."""
from __future__ import annotations
from app.agents.shared.middleware import MemoryInjectionMiddleware
from app.db import ChatVisibility
def build_memory_mw(
    *,
    user_id: str | None,
    search_space_id: int,
    visibility: ChatVisibility,
) -> MemoryInjectionMiddleware:
    """Construct the memory-injection middleware for one chat.

    All arguments are keyword-only and forwarded unchanged; ``visibility`` is
    passed to the middleware under the name ``thread_visibility``.
    """
    settings = {
        "user_id": user_id,
        "search_space_id": search_space_id,
        "thread_visibility": visibility,
    }
    return MemoryInjectionMiddleware(**settings)

View file

@ -0,0 +1,9 @@
"""Repair dangling tool-call sequences before each agent turn."""
from __future__ import annotations
from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware
def build_patch_tool_calls_mw() -> PatchToolCallsMiddleware:
    """Create a :class:`PatchToolCallsMiddleware` with its default configuration."""
    middleware = PatchToolCallsMiddleware()
    return middleware

View file

@ -0,0 +1,11 @@
"""Pattern-based allow/deny/ask middleware with HITL fallback (vertical slice).
Public surface (one entry point only — every other symbol is an internal of
the rule engine and stays inside ``middleware/``, ``ask/``, or ``deny.py``):
- :func:`build_permission_mw` — construction recipe shared by every stack.
"""
from .middleware.factory import build_permission_mw
__all__ = ["build_permission_mw"]

View file

@ -0,0 +1,74 @@
"""Translate the unified langchain HITL envelope into permission-domain semantics.
``PermissionMiddleware`` works with the canonical shape
``{decision_type: "once" | "approve_always" | "reject", feedback?: str, edited_args?: dict}``.
The wire envelope arriving from langgraph already lives in the LC HITL shape
(parsed once in :mod:`hitl_wire.decision`); this module performs the small
domain mapping (``approve|edit`` → ``once``, ``approve_always`` →
``approve_always``, anything else → ``reject``) without re-implementing the
envelope walk.
Failing closed: any unrecognised decision becomes ``reject`` (with a warning)
so the middleware never proceeds on ambiguous input.
"""
from __future__ import annotations
import logging
from typing import Any
from app.agents.multi_agent_chat.subagents.shared.hitl.wire import (
LC_DECISION_APPROVE,
LC_DECISION_EDIT,
LC_DECISION_REJECT,
SURFSENSE_DECISION_APPROVE_ALWAYS,
parse_lc_envelope,
)
logger = logging.getLogger(__name__)
# ``approve`` and ``edit`` both mean "let this call go through this once". The
# legacy SurfSense bare-scalar values (``once`` / ``approve_always`` / ``reject``)
# pass through unchanged so historical resume payloads still work.
# Decision types missing from this table are mapped to ``reject`` by
# ``normalize_permission_decision`` (fail closed).
_LC_TO_PERMISSION: dict[str, str] = {
    LC_DECISION_APPROVE: "once",
    LC_DECISION_EDIT: "once",
    SURFSENSE_DECISION_APPROVE_ALWAYS: "approve_always",
    LC_DECISION_REJECT: "reject",
    "once": "once",
    "approve_always": "approve_always",
    "reject": "reject",
}
def normalize_permission_decision(envelope: Any) -> dict[str, Any]:
    """Reduce a resume value to the canonical permission decision dict.

    Args:
        envelope: The raw resume value from langgraph (LC HITL envelope, a
            bare scalar string, or a pre-canonical dict).

    Returns:
        A dict with ``decision_type`` set to ``"once"``, ``"approve_always"``
        or ``"reject"``. ``feedback`` is added when the parsed reply carries a
        non-empty message, and ``edited_args`` when it carries non-empty
        argument overrides. Unrecognised decision types are logged and
        treated as ``"reject"``.
    """
    parsed = parse_lc_envelope(envelope)
    raw_type = parsed.decision_type

    decision_type = _LC_TO_PERMISSION.get(raw_type)
    if decision_type is None:
        # Fail closed on anything the table does not know.
        logger.warning(
            "Unknown permission decision %r; treating as reject",
            raw_type,
        )
        decision_type = "reject"

    result: dict[str, Any] = {"decision_type": decision_type}
    optional_fields = (
        ("feedback", parsed.message),
        ("edited_args", parsed.edited_args),
    )
    for key, value in optional_fields:
        if value:
            result[key] = value
    return result
__all__ = ["normalize_permission_decision"]

View file

@ -0,0 +1,10 @@
"""Apply ``edit`` permission decisions to tool calls.
Edited-arg extraction now lives in :mod:`hitl_wire.decision` (single parser
for all approval paths); this module owns the merge step that produces a
fresh tool-call dict for the orchestrator.
"""
from .merge import merge_edited_args
__all__ = ["merge_edited_args"]

View file

@ -0,0 +1,22 @@
"""Apply edited args to a tool call (shallow merge, no mutation).
Edited values override originals; keys absent from ``edited_args`` keep
their original values, so partial edits are safe. Returns a NEW tool-call
dict so the caller can swap it into ``AIMessage.tool_calls`` without
aliasing the live message object.
"""
from __future__ import annotations
from typing import Any
def merge_edited_args(
    tool_call: dict[str, Any], edited_args: dict[str, Any]
) -> dict[str, Any]:
    """Return a copy of ``tool_call`` whose ``args`` are overlaid with ``edited_args``.

    The merge is shallow: keys present in ``edited_args`` replace the
    original values, and all other original keys are kept. Neither input is
    mutated; both the returned dict and its ``args`` dict are new objects.

    Args:
        tool_call: Tool-call dict; a missing or ``None`` ``args`` entry is
            treated as empty.
        edited_args: Overrides to apply. ``None`` or an empty dict leaves the
            original arguments unchanged instead of raising ``TypeError``.

    Returns:
        A new tool-call dict with the merged ``args``.
    """
    original_args = tool_call.get("args") or {}
    # Tolerate a missing override payload the same way a missing ``args`` is tolerated.
    overrides = edited_args or {}
    return {**tool_call, "args": {**original_args, **overrides}}
__all__ = ["merge_edited_args"]

View file

@ -0,0 +1,89 @@
"""Build the permission-ask interrupt payload (LC HITL wire + SurfSense context)."""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from app.agents.multi_agent_chat.subagents.shared.hitl.wire import (
LC_DECISION_APPROVE,
LC_DECISION_EDIT,
LC_DECISION_REJECT,
SURFSENSE_DECISION_APPROVE_ALWAYS,
build_lc_hitl_payload,
)
from app.agents.shared.permissions import Rule
# Interrupt type tag attached to every permission-ask payload built in this module.
PERMISSION_ASK_INTERRUPT_TYPE = "permission_ask"
# Decisions offered for every tool; ``build_permission_ask_payload`` copies
# this list and appends ``approve_always`` for MCP tools only.
_BASE_PERMISSION_ASK_DECISIONS: list[str] = [
    LC_DECISION_APPROVE,
    LC_DECISION_REJECT,
    LC_DECISION_EDIT,
]
def _is_mcp_tool(tool: BaseTool | None) -> bool:
    """Tell whether ``tool`` carries an ``mcp_connector_id`` in its metadata.

    ``None`` tools and tools with missing or empty metadata are not MCP tools.
    """
    if tool is not None:
        tool_metadata = getattr(tool, "metadata", None) or {}
        return tool_metadata.get("mcp_connector_id") is not None
    return False
def _card_fields_from_tool(tool: BaseTool | None) -> dict[str, Any]:
    """Collect the tool-scoped fields of the permission card from ``tool``.

    Returns an empty dict for ``None``. Otherwise the result may contain
    ``mcp_connector_id`` (when the metadata value is not ``None``),
    ``mcp_server`` (from a truthy ``mcp_connector_name``) and
    ``tool_description`` (from a truthy ``tool.description``).
    """
    card: dict[str, Any] = {}
    if tool is None:
        return card

    tool_metadata = getattr(tool, "metadata", None) or {}

    # An id of 0 is still a valid id, hence the explicit ``is not None``.
    if (connector_id := tool_metadata.get("mcp_connector_id")) is not None:
        card["mcp_connector_id"] = connector_id
    if connector_name := tool_metadata.get("mcp_connector_name"):
        card["mcp_server"] = connector_name
    if tool.description:
        card["tool_description"] = tool.description
    return card
def build_permission_ask_payload(
    *,
    tool_name: str,
    args: dict[str, Any],
    patterns: list[str],
    rules: list[Rule],
    tool: BaseTool | None = None,
) -> dict[str, Any]:
    """Assemble the interrupt payload for a permission ``ask``.

    The decision palette is approve / reject / edit for every tool;
    ``approve_always`` is offered in addition only when ``tool`` is an MCP
    tool. The context carries the matched patterns (also under ``always``),
    a plain-dict view of ``rules`` and any tool-scoped card fields.
    """
    mcp_only = [SURFSENSE_DECISION_APPROVE_ALWAYS] if _is_mcp_tool(tool) else []
    allowed_decisions = [*_BASE_PERMISSION_ASK_DECISIONS, *mcp_only]

    rule_views = []
    for rule in rules:
        rule_views.append(
            {
                "permission": rule.permission,
                "pattern": rule.pattern,
                "action": rule.action,
            }
        )

    context: dict[str, Any] = {
        "patterns": patterns,
        "rules": rule_views,
        "always": patterns,
    }
    # Tool-scoped fields are merged last, as in a trailing ``**`` expansion.
    context.update(_card_fields_from_tool(tool))

    return build_lc_hitl_payload(
        tool_name=tool_name,
        args=args,
        allowed_decisions=allowed_decisions,
        interrupt_type=PERMISSION_ASK_INTERRUPT_TYPE,
        context=context,
    )
__all__ = ["PERMISSION_ASK_INTERRUPT_TYPE", "build_permission_ask_payload"]

View file

@ -0,0 +1,61 @@
"""Side-effectful entry point: pause the graph and return the permission decision.
Wraps :func:`langgraph.types.interrupt` with the OTel spans the SurfSense
dashboard expects, then projects the resume value through
:func:`normalize_permission_decision` so the middleware downstream only
sees the canonical permission-domain shape.
When ``emit_interrupt`` is ``False`` the call short-circuits to ``reject``;
this is used by non-interactive deployments where ``ask`` must not block.
"""
from __future__ import annotations
from typing import Any
from langchain_core.tools import BaseTool
from langgraph.types import interrupt
from app.agents.shared.permissions import Rule
from app.observability import metrics as ot_metrics, otel as ot
from .decision import normalize_permission_decision
from .payload import PERMISSION_ASK_INTERRUPT_TYPE, build_permission_ask_payload
def request_permission_decision(
    *,
    tool_name: str,
    args: dict[str, Any],
    patterns: list[str],
    rules: list[Rule],
    emit_interrupt: bool,
    tool: BaseTool | None = None,
) -> dict[str, Any]:
    """Interrupt the graph for an ``ask`` and return the normalised decision.

    With ``emit_interrupt`` false no interrupt is raised and the call is
    answered with ``{"decision_type": "reject"}`` straight away. Otherwise
    the payload is built, the ask and interrupt are recorded (spans and
    metrics), and the resume value is passed through
    ``normalize_permission_decision``.
    """
    if not emit_interrupt:
        return {"decision_type": "reject"}

    payload = build_permission_ask_payload(
        tool_name=tool_name,
        args=args,
        patterns=patterns,
        rules=rules,
        tool=tool,
    )
    first_pattern = patterns[0] if patterns else None

    # Both spans stay open around the interrupt call itself.
    with ot.permission_asked_span(
        permission=tool_name,
        pattern=first_pattern,
        extra={"permission.patterns": list(patterns)},
    ):
        with ot.interrupt_span(interrupt_type=PERMISSION_ASK_INTERRUPT_TYPE):
            ot_metrics.record_permission_ask(permission=tool_name)
            ot_metrics.record_interrupt(interrupt_type=PERMISSION_ASK_INTERRUPT_TYPE)
            raw_decision = interrupt(payload)

    return normalize_permission_decision(raw_decision)
__all__ = ["request_permission_decision"]

View file

@ -0,0 +1,39 @@
"""Synthesise a ``ToolMessage`` for a denied tool call.
The denied call is replaced with this message so the model sees a typed
``permission_denied`` error in ``ToolMessage.additional_kwargs["error"]``
and can adjust its plan without retrying the same forbidden call.
"""
from __future__ import annotations
from typing import Any
from langchain_core.messages import ToolMessage
from app.agents.shared.errors import StreamingError
from app.agents.shared.permissions import Rule
def build_deny_message(tool_call: dict[str, Any], rule: Rule) -> ToolMessage:
    """Create the error ``ToolMessage`` that stands in for a denied tool call.

    Args:
        tool_call: The denied tool-call dict; its ``id`` and ``name`` are
            copied onto the message (a missing id becomes ``""``).
        rule: The rule that blocked the call; named in both the message text
            and the structured error.

    Returns:
        A ``ToolMessage`` with ``status="error"`` whose
        ``additional_kwargs["error"]`` holds a non-retryable
        ``permission_denied`` error.
    """
    tool_name = tool_call.get("name")
    suggestion = (
        f"rule permission={rule.permission!r} pattern={rule.pattern!r} "
        f"blocked this call"
    )
    denial = StreamingError(
        code="permission_denied",
        retryable=False,
        suggestion=suggestion,
    )
    text = (
        f"Permission denied: rule {rule.permission}/{rule.pattern} "
        f"blocked tool {tool_name!r}."
    )
    return ToolMessage(
        content=text,
        tool_call_id=tool_call.get("id") or "",
        name=tool_name,
        status="error",
        additional_kwargs={"error": denial.model_dump()},
    )
__all__ = ["build_deny_message"]

View file

@ -0,0 +1,13 @@
"""The orchestrator class plus its evaluation and ruleset-view helpers."""
from .core import PermissionMiddleware
from .evaluation import evaluate_tool_call, resolve_patterns
from .ruleset_view import all_rulesets, globally_denied
__all__ = [
"PermissionMiddleware",
"all_rulesets",
"evaluate_tool_call",
"globally_denied",
"resolve_patterns",
]

View file

@ -0,0 +1,225 @@
"""``PermissionMiddleware`` — pattern-based allow/deny/ask with HITL fallback.
LangChain's :class:`HumanInTheLoopMiddleware` only supports a static
"this tool always asks" decision per tool. There's no rule-based
allow/deny/ask, no glob patterns, no per-space/per-thread overrides, and
no auto-deny synthesis.
This middleware layers OpenCode's wildcard-ruleset model on top of the
unified langchain HITL wire format (see :mod:`hitl_wire`), so it sits
beside ``HumanInTheLoopMiddleware`` and self-gated approvals on a single
parallel-HITL routing layer in ``task_tool`` + ``resume_routing``.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from langchain.agents.middleware.types import (
AgentMiddleware,
AgentState,
ContextT,
)
from langchain_core.messages import AIMessage, ToolMessage
from langchain_core.tools import BaseTool
from langgraph.runtime import Runtime
from app.agents.shared.errors import CorrectedError, RejectedError
from app.agents.shared.permissions import Ruleset
from app.services.user_tool_allowlist import TrustedToolSaver
from ..ask.edit import merge_edited_args
from ..ask.request import request_permission_decision
from ..deny import build_deny_message
from .evaluation import evaluate_tool_call
from .pattern_resolver import PatternResolver
from .ruleset_view import all_rulesets
from .runtime_promote import persist_always
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class _AlwaysPromotion:
    """Queued request to record an ``approve_always`` decision in the user's trust list."""

    # Connector id read from the tool's ``mcp_connector_id`` metadata.
    connector_id: int
    # Name of the tool the user chose to always approve.
    tool_name: str
class PermissionMiddleware(AgentMiddleware):  # type: ignore[type-arg]
    """Allow/deny/ask layer over the agent's tool calls.

    Args:
        rulesets: Layered rulesets to evaluate (earliest-to-latest wins).
            Typical layering: ``defaults < global < space < thread < runtime_approved``.
        pattern_resolvers: Optional per-tool callables that map ``args``
            to wildcard patterns. Tools without an entry use the bare
            tool name as the only pattern.
        runtime_ruleset: Mutable :class:`Ruleset` extended in-place when
            the user replies ``"approve_always"``. Reused across calls in
            the same agent instance so newly-allowed rules apply downstream.
        always_emit_interrupt_payload: Set ``False`` to make ``ask``
            collapse to ``deny`` (for non-interactive deployments).
        tools_by_name: Map from tool name to :class:`BaseTool`, used to
            decorate ``ask`` interrupts with the tool's description and
            MCP metadata for the FE card.
        trusted_tool_saver: Async callback invoked on ``approve_always``
            decisions for MCP tools (those whose ``metadata`` carries an
            ``mcp_connector_id``). Without it the promotion only lives
            in-memory for the current agent instance. It is awaited only
            by the async hook; the sync hook logs a warning instead.
    """

    tools = ()

    def __init__(
        self,
        *,
        rulesets: list[Ruleset] | None = None,
        pattern_resolvers: dict[str, PatternResolver] | None = None,
        runtime_ruleset: Ruleset | None = None,
        always_emit_interrupt_payload: bool = True,
        tools_by_name: dict[str, BaseTool] | None = None,
        trusted_tool_saver: TrustedToolSaver | None = None,
    ) -> None:
        super().__init__()
        # Shallow copies: later mutation of the caller's containers does not
        # alter this middleware's configuration.
        self._static_rulesets: list[Ruleset] = list(rulesets or [])
        self._pattern_resolvers: dict[str, PatternResolver] = dict(
            pattern_resolvers or {}
        )
        # Identity check rather than truthiness: a caller-supplied ruleset must
        # be kept (and extended in place) even if it evaluates falsy.
        self._runtime_ruleset: Ruleset = (
            runtime_ruleset
            if runtime_ruleset is not None
            else Ruleset(origin="runtime_approved")
        )
        self._emit_interrupt = always_emit_interrupt_payload
        self._tools_by_name: dict[str, BaseTool] = dict(tools_by_name or {})
        self._trusted_tool_saver: TrustedToolSaver | None = trusted_tool_saver

    @staticmethod
    def _as_call_dict(raw: Any) -> dict[str, Any]:
        """Return a plain-dict copy of a tool call given as a dict or an attribute-style object."""
        if isinstance(raw, dict):
            return dict(raw)
        return {
            "name": getattr(raw, "name", None),
            "args": getattr(raw, "args", {}),
            "id": getattr(raw, "id", None),
            "type": "tool_call",
        }

    def _resolve_ask(
        self,
        call: dict[str, Any],
        name: str,
        args: dict[str, Any],
        patterns: list[str],
        rules: list[Any],
    ) -> tuple[dict[str, Any], _AlwaysPromotion | None]:
        """Obtain the decision for an ``ask`` call and apply it.

        Returns:
            ``(final_call, promotion)``. ``final_call`` is ``call`` itself
            unless the decision carried a non-empty ``edited_args`` dict.
            ``promotion`` is set only for ``approve_always`` on a tool that
            exposes an ``mcp_connector_id``.

        Raises:
            CorrectedError: The decision is ``reject`` with non-blank feedback.
            RejectedError: The decision is ``reject`` without feedback, or
                its type is not recognised.
        """
        decision = request_permission_decision(
            tool_name=name,
            args=args,
            patterns=patterns,
            rules=rules,
            emit_interrupt=self._emit_interrupt,
            tool=self._tools_by_name.get(name),
        )
        # A missing decision type is treated as a rejection.
        kind = str(decision.get("decision_type") or "reject").lower()

        if kind in ("once", "approve_always"):
            edited_args = decision.get("edited_args")
            final_call = (
                merge_edited_args(call, edited_args)
                if isinstance(edited_args, dict) and edited_args
                else call
            )
            promotion: _AlwaysPromotion | None = None
            if kind == "approve_always":
                # In-memory promotion happens here; the durable write is
                # queued for the async hook.
                persist_always(self._runtime_ruleset, name, patterns)
                promotion = self._build_always_promotion(name)
            return final_call, promotion

        if kind == "reject":
            feedback = decision.get("feedback")
            if isinstance(feedback, str) and feedback.strip():
                raise CorrectedError(feedback, tool=name)
            raise RejectedError(tool=name, pattern=patterns[0] if patterns else None)

        logger.warning("Unknown permission decision %r; treating as reject", kind)
        raise RejectedError(tool=name)

    def _process(
        self,
        state: AgentState,
        runtime: Runtime[Any],
    ) -> tuple[dict[str, Any] | None, list[_AlwaysPromotion]]:
        """Pure decision pass: returns ``(state_update, pending_promotions)``.

        Side effects performed here are in-memory only (rule promotion
        into ``runtime_ruleset``). DB writes for ``approve_always``
        decisions are queued as ``_AlwaysPromotion`` and flushed by the
        async hook.

        ``state_update`` is ``None`` when the last message is not an
        ``AIMessage`` with tool calls, or when no call was denied or edited.
        Otherwise it replaces the last message with a copy holding only the
        kept calls, followed by one deny ``ToolMessage`` per denied call.

        Raises:
            CorrectedError, RejectedError: Propagated from an ``ask`` call
                the user rejected (see :meth:`_resolve_ask`).
        """
        del runtime
        messages = state.get("messages") or []
        if not messages:
            return None, []
        last = messages[-1]
        if not isinstance(last, AIMessage) or not last.tool_calls:
            return None, []

        # The list holds the runtime ruleset object itself, so rules promoted
        # while handling one call are visible to the calls that follow it.
        rulesets = all_rulesets(self._static_rulesets, self._runtime_ruleset)
        deny_messages: list[ToolMessage] = []
        kept_calls: list[dict[str, Any]] = []
        promotions: list[_AlwaysPromotion] = []
        any_change = False

        for raw in last.tool_calls:
            call = self._as_call_dict(raw)
            name = call.get("name") or ""
            args = call.get("args") or {}
            action, patterns, rules = evaluate_tool_call(
                name, args, self._pattern_resolvers, rulesets
            )

            if action == "deny":
                # NOTE(review): the ``rules[0]`` default is evaluated eagerly, so
                # this assumes ``rules`` is non-empty whenever the action is deny.
                deny_rule = next((r for r in rules if r.action == "deny"), rules[0])
                deny_messages.append(build_deny_message(call, deny_rule))
                any_change = True
                continue

            if action == "ask":
                final_call, promotion = self._resolve_ask(
                    call, name, args, patterns, rules
                )
                if final_call is not call:
                    any_change = True
                if promotion is not None:
                    promotions.append(promotion)
                kept_calls.append(final_call)
                continue

            kept_calls.append(call)

        if not any_change and len(kept_calls) == len(last.tool_calls):
            return None, promotions

        updated = last.model_copy(update={"tool_calls": kept_calls})
        result_messages: list[Any] = [updated]
        if deny_messages:
            result_messages.extend(deny_messages)
        return {"messages": result_messages}, promotions

    def _build_always_promotion(self, tool_name: str) -> _AlwaysPromotion | None:
        """Return a save request iff the tool exposes an ``mcp_connector_id``."""
        tool = self._tools_by_name.get(tool_name)
        metadata = getattr(tool, "metadata", None) or {}
        connector_id = metadata.get("mcp_connector_id")
        if not isinstance(connector_id, int):
            return None
        return _AlwaysPromotion(connector_id=connector_id, tool_name=tool_name)

    def after_model(  # type: ignore[override]
        self, state: AgentState, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        """Sync hook: run the decision pass and return its state update.

        The saver callback is async and cannot be awaited here, so queued
        promotions are not written durably; a warning records that instead
        of dropping them silently.
        """
        update, promotions = self._process(state, runtime)
        if promotions and self._trusted_tool_saver is not None:
            logger.warning(
                "Sync after_model cannot await trusted_tool_saver; "
                "%d approve_always promotion(s) remain in-memory only",
                len(promotions),
            )
        return update

    async def aafter_model(  # type: ignore[override]
        self, state: AgentState, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        """Async hook: run the decision pass, then flush queued promotions to the saver."""
        update, promotions = self._process(state, runtime)
        if self._trusted_tool_saver is not None:
            for promotion in promotions:
                await self._trusted_tool_saver(
                    promotion.connector_id, promotion.tool_name
                )
        return update
__all__ = ["PermissionMiddleware"]

View file

@ -0,0 +1,60 @@
"""Resolve patterns for a tool call and aggregate the resulting rules.
Two stages run on every tool call:
1. :func:`resolve_patterns` asks the tool's resolver (or the default) for
the wildcard patterns the rule engine should evaluate. Resolver
failures fall back to the bare tool name so a buggy resolver can't
cascade into permission decisions.
2. :func:`evaluate_tool_call` runs the rule engine against those patterns
and collapses the per-pattern rules into a single action
(``deny`` > ``ask`` > ``allow``).
"""
from __future__ import annotations
import logging
from typing import Any
from app.agents.shared.permissions import (
Rule,
RuleAction,
Ruleset,
aggregate_action,
evaluate_many,
)
from .pattern_resolver import PatternResolver, default_pattern_resolver
logger = logging.getLogger(__name__)
def resolve_patterns(
    tool_name: str,
    args: dict[str, Any],
    pattern_resolvers: dict[str, PatternResolver],
) -> list[str]:
    """Return the wildcard patterns to evaluate for one tool call.

    Uses the resolver registered for ``tool_name``, or the default resolver
    when none is registered. A resolver that raises is logged and replaced by
    the bare tool name; an empty result is replaced the same way.
    """
    resolver = pattern_resolvers.get(tool_name, default_pattern_resolver(tool_name))
    try:
        resolved = resolver(args or {})
    except Exception:
        logger.exception("Pattern resolver for %s raised; using bare name", tool_name)
        return [tool_name]
    return resolved if resolved else [tool_name]
def evaluate_tool_call(
    tool_name: str,
    args: dict[str, Any],
    pattern_resolvers: dict[str, PatternResolver],
    rulesets: list[Ruleset],
) -> tuple[RuleAction, list[str], list[Rule]]:
    """Resolve a call's patterns and run them through the rule engine.

    Returns:
        ``(action, patterns, rules)``: the aggregated action, the patterns
        that were evaluated, and the rules ``evaluate_many`` returned for them.
    """
    patterns = resolve_patterns(tool_name, args, pattern_resolvers)
    matched = evaluate_many(tool_name, patterns, *rulesets)
    return aggregate_action(matched), patterns, matched
__all__ = ["evaluate_tool_call", "resolve_patterns"]

View file

@ -0,0 +1,88 @@
"""Construction recipe for :class:`PermissionMiddleware` shared across stacks.
Single source of truth used by both the main-agent stack and every subagent
stack. Rule layers are evaluated earliest-to-latest (last match wins,
matching OpenCode's ``permission/index.ts`` evaluation order):
1. ``surfsense_defaults`` — a single ``allow */*`` rule. Connector tools
already self-gate via :func:`request_approval`, so the rule engine only
needs to *deny* what the user has explicitly forbidden; the default
``ask`` fallback would otherwise double-prompt every safe read-only
call.
2. ``subagent_rulesets`` — caller-supplied rulesets contributed by the
consuming subagent. Each subagent passes its coded rules (KB:
destructive-FS ``ask`` rules; connectors: per-tool ``allow``/``ask``)
plus, when present, the user's persisted allow-list for that subagent.
Connector deny synthesis from ``new_chat._synthesize_connector_deny_rules``
is intentionally NOT replicated: the multi-agent orchestrator already
excludes entire subagents whose required connectors are missing
(``SUBAGENT_TO_REQUIRED_CONNECTOR_MAP``), so the per-tool deny pass is
redundant here.
"""
from __future__ import annotations
from collections.abc import Sequence
from langchain_core.tools import BaseTool
from app.agents.shared.feature_flags import AgentFeatureFlags
from app.agents.shared.permissions import Rule, Ruleset
from app.services.user_tool_allowlist import TrustedToolSaver
from .core import PermissionMiddleware
# Base layer that ``build_permission_mw`` places first in every ruleset list:
# one rule allowing every permission/pattern, so the layers added after it
# only need to contribute the ``ask``/``deny`` exceptions.
_SURFSENSE_DEFAULTS = Ruleset(
rules=[Rule(permission="*", pattern="*", action="allow")],
origin="surfsense_defaults",
)
def build_permission_mw(
    *,
    flags: AgentFeatureFlags,
    subagent_rulesets: list[Ruleset] | None = None,
    tools: Sequence[BaseTool] | None = None,
    trusted_tool_saver: TrustedToolSaver | None = None,
) -> PermissionMiddleware | None:
    """Build the shared :class:`PermissionMiddleware`, or return ``None`` when nothing needs enforcing.

    Args:
        flags: Feature toggles. The engine counts as enabled when
            ``enable_permission`` is set and ``disable_new_agent_stack`` is not.
        subagent_rulesets: Caller-supplied rulesets layered after the
            defaults. A non-empty list forces the middleware on even when
            the flags leave the engine disabled, so an explicit ``ask``
            rule always asks.
        tools: Tools indexed by ``name`` and handed to the middleware so
            ``ask`` interrupts can be decorated with tool metadata. Optional.
        trusted_tool_saver: Async callback forwarded to the middleware for
            persisting ``always`` decisions. Optional.

    Returns:
        ``None`` when the engine is disabled by flags and no subagent
        rulesets were supplied; otherwise a middleware whose rulesets are
        the defaults followed by ``subagent_rulesets``.
    """
    engine_on = flags.enable_permission and not flags.disable_new_agent_stack
    if not engine_on and not subagent_rulesets:
        return None

    # Defaults first; the caller's layers follow it.
    layered: list[Ruleset] = [_SURFSENSE_DEFAULTS, *(subagent_rulesets or [])]

    name_index = {}
    for tool in tools or []:
        name_index[tool.name] = tool

    return PermissionMiddleware(
        rulesets=layered,
        tools_by_name=name_index,
        trusted_tool_saver=trusted_tool_saver,
    )
__all__ = ["build_permission_mw"]

View file

@ -0,0 +1,28 @@
"""Per-tool pattern resolution.
A :data:`PatternResolver` turns a tool's ``args`` dict into a list of
wildcard patterns evaluated against the layered rulesets. The first
pattern is conventionally the bare tool name (catch-all); later entries
narrow down to specific resources (file paths, ids, etc.).
Tools without a custom resolver fall back to :func:`default_pattern_resolver`,
which yields only the bare tool name.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Any
PatternResolver = Callable[[dict[str, Any]], list[str]]
def default_pattern_resolver(name: str) -> PatternResolver:
    """Return a resolver that ignores its ``args`` and always yields ``[name]``."""

    def _bare_name_only(args: dict[str, Any]) -> list[str]:
        # ``args`` is accepted only to match the resolver call signature;
        # a fresh single-element list is built on every call.
        return [name]

    return _bare_name_only
__all__ = ["PatternResolver", "default_pattern_resolver"]

View file

@ -0,0 +1,27 @@
"""Combined view over static + runtime rulesets.
Static rulesets come from the agent factory (defaults, space-scoped,
thread-scoped, etc.). The runtime ruleset is the in-memory one that
:func:`runtime_promote.persist_always` extends when the user replies
``"approve_always"``. Evaluators always see them merged in this order so
newly-promoted rules apply to subsequent calls.
"""
from __future__ import annotations
from app.agents.shared.permissions import Ruleset, aggregate_action, evaluate_many
def all_rulesets(
    static_rulesets: list[Ruleset], runtime_ruleset: Ruleset
) -> list[Ruleset]:
    """Return a new list: the static rulesets in order, then the runtime ruleset last."""
    merged = list(static_rulesets)
    merged.append(runtime_ruleset)
    return merged
def globally_denied(tool_name: str, rulesets: list[Ruleset]) -> bool:
    """Report whether evaluating ``tool_name`` against the bare ``"*"`` pattern aggregates to ``deny``."""
    matched = evaluate_many(tool_name, ["*"], *rulesets)
    verdict = aggregate_action(matched)
    return verdict == "deny"
__all__ = ["all_rulesets", "globally_denied"]

View file

@ -0,0 +1,22 @@
"""Promote an ``"approve_always"`` reply into in-memory allow rules.
Subsequent calls within the same agent instance match these new rules and
proceed without prompting. Durable persistence (to ``agent_permission_rules``)
is the streaming layer's job — this module keeps the in-memory copy only.
"""
from __future__ import annotations
from app.agents.shared.permissions import Rule, Ruleset
def persist_always(
    runtime_ruleset: Ruleset, tool_name: str, patterns: list[str]
) -> None:
    """Append one ``allow`` rule per pattern to ``runtime_ruleset.rules``, in place.

    Each rule uses ``tool_name`` as its permission. Nothing is added when
    ``patterns`` is empty.
    """
    runtime_ruleset.rules.extend(
        Rule(permission=tool_name, pattern=granted, action="allow")
        for granted in patterns
    )
__all__ = ["persist_always"]

View file

@ -0,0 +1,7 @@
"""Resilience middleware shared as the same instances across parent / registry."""
from __future__ import annotations
from .bundle import ResilienceMiddlewares, build_resilience_middlewares
__all__ = ["ResilienceMiddlewares", "build_resilience_middlewares"]

View file

@ -0,0 +1,53 @@
"""Construct each resilience middleware once; same instances flow into every consumer."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from langchain.agents.middleware import (
ModelCallLimitMiddleware,
ToolCallLimitMiddleware,
)
from app.agents.shared.feature_flags import AgentFeatureFlags
from app.agents.shared.middleware import RetryAfterMiddleware
from app.agents.shared.middleware.scoped_model_fallback import (
ScopedModelFallbackMiddleware,
)
from .fallback import build_fallback_mw
from .model_call_limit import build_model_call_limit_mw
from .retry import build_retry_mw
from .tool_call_limit import build_tool_call_limit_mw
@dataclass(frozen=True)
class ResilienceMiddlewares:
    """Immutable bundle of the four resilience middleware slots; a slot is ``None`` when that middleware is disabled."""

    retry: RetryAfterMiddleware | None
    fallback: ScopedModelFallbackMiddleware | None
    model_call_limit: ModelCallLimitMiddleware | None
    tool_call_limit: ToolCallLimitMiddleware | None

    def as_list(self) -> list[Any]:
        """Return the non-``None`` slots in declaration order."""
        slots = (
            self.retry,
            self.fallback,
            self.model_call_limit,
            self.tool_call_limit,
        )
        active: list[Any] = []
        for slot in slots:
            if slot is not None:
                active.append(slot)
        return active
def build_resilience_middlewares(flags: AgentFeatureFlags) -> ResilienceMiddlewares:
    """Call each resilience builder once with ``flags`` and bundle the results."""
    retry_mw = build_retry_mw(flags)
    fallback_mw = build_fallback_mw(flags)
    model_limit_mw = build_model_call_limit_mw(flags)
    tool_limit_mw = build_tool_call_limit_mw(flags)
    return ResilienceMiddlewares(
        retry=retry_mw,
        fallback=fallback_mw,
        model_call_limit=model_limit_mw,
        tool_call_limit=tool_limit_mw,
    )

View file

@ -0,0 +1,27 @@
"""Switch to a fallback model on provider/network errors only."""
from __future__ import annotations
import logging
from app.agents.shared.feature_flags import AgentFeatureFlags
from app.agents.shared.middleware.scoped_model_fallback import (
ScopedModelFallbackMiddleware,
)
from ..flags import enabled
def build_fallback_mw(
    flags: AgentFeatureFlags,
) -> ScopedModelFallbackMiddleware | None:
    """Build the model-fallback middleware, or ``None`` when disabled or unavailable.

    Args:
        flags: Feature toggles; ``enable_model_fallback`` gates construction.

    Returns:
        A :class:`ScopedModelFallbackMiddleware` configured with the two
        hard-coded model identifiers below, or ``None`` when the flag is off
        or construction raised.
    """
    if not enabled(flags, "enable_model_fallback"):
        return None
    try:
        return ScopedModelFallbackMiddleware(
            "openai:gpt-4o-mini",
            "anthropic:claude-3-5-haiku-20241022",
        )
    except Exception:
        # Best-effort: a failed init disables fallback rather than breaking
        # agent construction. The module logger is used instead of the root
        # logger, and the traceback is attached so the cause is not lost.
        logging.getLogger(__name__).warning(
            "ScopedModelFallbackMiddleware init failed; skipping.",
            exc_info=True,
        )
        return None

View file

@ -0,0 +1,21 @@
"""Cap model calls per thread / per run to prevent runaway cost."""
from __future__ import annotations
from langchain.agents.middleware import ModelCallLimitMiddleware
from app.agents.shared.feature_flags import AgentFeatureFlags
from ..flags import enabled
def build_model_call_limit_mw(
    flags: AgentFeatureFlags,
) -> ModelCallLimitMiddleware | None:
    """Return the model-call cap middleware when ``enable_model_call_limit`` is on, else ``None``.

    The middleware is configured with ``thread_limit=120``, ``run_limit=80``
    and ``exit_behavior="end"``.
    """
    if enabled(flags, "enable_model_call_limit"):
        return ModelCallLimitMiddleware(
            thread_limit=120,
            run_limit=80,
            exit_behavior="end",
        )
    return None

View file

@ -0,0 +1,16 @@
"""Retry on transient model errors (e.g. Retry-After-bearing 429s)."""
from __future__ import annotations
from app.agents.shared.feature_flags import AgentFeatureFlags
from app.agents.shared.middleware import RetryAfterMiddleware
from ..flags import enabled
def build_retry_mw(flags: AgentFeatureFlags) -> RetryAfterMiddleware | None:
    """Return a ``RetryAfterMiddleware(max_retries=3)`` when ``enable_retry_after`` is on, else ``None``."""
    if not enabled(flags, "enable_retry_after"):
        return None
    return RetryAfterMiddleware(max_retries=3)

View file

@ -0,0 +1,21 @@
"""Cap tool calls per thread / per run to bound infinite-loop blast radius."""
from __future__ import annotations
from langchain.agents.middleware import ToolCallLimitMiddleware
from app.agents.shared.feature_flags import AgentFeatureFlags
from ..flags import enabled
def build_tool_call_limit_mw(
    flags: AgentFeatureFlags,
) -> ToolCallLimitMiddleware | None:
    """Return the tool-call cap middleware when ``enable_tool_call_limit`` is on, else ``None``.

    The middleware is configured with ``thread_limit=300``, ``run_limit=80``
    and ``exit_behavior="continue"``.
    """
    if enabled(flags, "enable_tool_call_limit"):
        return ToolCallLimitMiddleware(
            thread_limit=300,
            run_limit=80,
            exit_behavior="continue",
        )
    return None

View file

@ -0,0 +1,12 @@
"""Todo-list middleware (each consumer needs its own instance)."""
from __future__ import annotations
from langchain.agents.middleware import TodoListMiddleware
def build_todos_mw(*, system_prompt: str | None = None) -> TodoListMiddleware:
    """Create a new ``TodoListMiddleware`` instance.

    With ``system_prompt=None`` the middleware is built with no arguments;
    any other value, including ``""``, is forwarded as ``system_prompt``.
    Pass ``system_prompt=""`` to suppress the upstream prompt append (the
    main agent uses its own system prompt).
    """
    overrides = {} if system_prompt is None else {"system_prompt": system_prompt}
    return TodoListMiddleware(**overrides)