feat(filesystem): add move and list_tree functionalities to enhance local folder operations

This commit is contained in:
Anish Sarkar 2026-04-27 22:32:37 +05:30
parent 1190ee9449
commit 3fa8c790f5
3 changed files with 758 additions and 0 deletions

View file

@ -7,6 +7,7 @@ This middleware customizes prompts and persists write/edit operations for
from __future__ import annotations
import asyncio
import json
import logging
import re
import secrets
@ -141,6 +142,33 @@ IMPORTANT:
content.
"""
# --- Tool descriptions (runtime strings shown to the LLM) -------------------
# Description for the desktop local-folder `move_file` tool. This text is part
# of the tool's runtime contract with the model — edit wording deliberately.
SURFSENSE_MOVE_FILE_TOOL_DESCRIPTION = """Moves or renames a file or folder.
Use absolute paths for both source and destination.
Notes:
- In local-folder mode, paths should use mount prefixes (e.g., /<mount>/foo.txt).
- Rename is a special case of move (same folder, different filename).
- Cross-mount moves are not supported.
"""
# Description for the cursor-paginated `list_tree` tool used to browse large
# nested directory trees in desktop local-folder mode. Also a runtime string.
SURFSENSE_LIST_TREE_TOOL_DESCRIPTION = """Lists files/folders recursively with cursor pagination.
Use this in desktop local-folder mode to discover nested files at scale.
Args:
- path: absolute mount-prefixed path (e.g., /<mount>/src) or "/" for mount roots.
- max_depth: recursion depth limit (default 8).
- page_size: number of entries to return per page (max 1000).
- cursor: opaque continuation token from a previous call.
- include_files/include_dirs: filter returned entry types.
Returns JSON with:
- entries: [{path, is_dir, size, modified_at, depth}]
- next_cursor: continuation token or null
- has_more: whether additional pages exist
"""
SURFSENSE_GLOB_TOOL_DESCRIPTION = """Find files matching a glob pattern.
Supports standard glob patterns: `*`, `**`, `?`.
@ -222,11 +250,14 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
)
if filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
system_prompt += (
"\n- move_file: move or rename files/folders in local-folder mode."
"\n- list_tree: recursively list nested local paths with cursor pagination."
"\n\n## Local Folder Mode"
"\n\nThis chat is running in desktop local-folder mode."
" Keep all file operations local. Do not use save_document."
" Always use mount-prefixed absolute paths like /<folder>/file.ext."
" If you are unsure which mounts are available, call ls('/') first."
" For big trees: use list_tree pages, then grep, then read_file."
)
super().__init__(
@ -237,6 +268,8 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
"read_file": SURFSENSE_READ_FILE_TOOL_DESCRIPTION,
"write_file": SURFSENSE_WRITE_FILE_TOOL_DESCRIPTION,
"edit_file": SURFSENSE_EDIT_FILE_TOOL_DESCRIPTION,
"move_file": SURFSENSE_MOVE_FILE_TOOL_DESCRIPTION,
"list_tree": SURFSENSE_LIST_TREE_TOOL_DESCRIPTION,
"glob": SURFSENSE_GLOB_TOOL_DESCRIPTION,
"grep": SURFSENSE_GREP_TOOL_DESCRIPTION,
},
@ -244,6 +277,9 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
max_execute_timeout=self._MAX_EXECUTE_TIMEOUT,
)
self.tools = [t for t in self.tools if t.name != "execute"]
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
self.tools.append(self._create_move_file_tool())
self.tools.append(self._create_list_tree_tool())
if self._should_persist_documents():
self.tools.append(self._create_save_document_tool())
if self._sandbox_available:
@ -836,6 +872,34 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
return f"/{candidate.lstrip('/')}"
return candidate
def _resolve_move_target_path(
    self,
    file_path: str,
    runtime: ToolRuntime[None, FilesystemState],
) -> str:
    """Normalize a user-supplied move path for the active filesystem mode.

    Local-folder mode routes through mount-prefix normalization; any other
    mode simply coerces the path to an absolute ("/"-rooted) form. An
    empty/whitespace-only input normalizes to "".
    """
    trimmed = file_path.strip()
    if not trimmed:
        return ""
    if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
        return self._normalize_local_mount_path(trimmed, runtime)
    return trimmed if trimmed.startswith("/") else f"/{trimmed.lstrip('/')}"
def _resolve_list_target_path(
    self,
    path: str,
    runtime: ToolRuntime[None, FilesystemState],
) -> str:
    """Normalize a list_tree path; blank input and "/" both mean the root.

    Local-folder mode routes non-root paths through mount-prefix
    normalization; other modes just force an absolute form.
    """
    trimmed = path.strip() or "/"
    if trimmed == "/":
        return "/"
    if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
        return self._normalize_local_mount_path(trimmed, runtime)
    return trimmed if trimmed.startswith("/") else f"/{trimmed.lstrip('/')}"
@staticmethod
def _is_error_text(value: str) -> bool:
return value.startswith("Error:")
@ -930,6 +994,256 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
)
return None, updated_content
def _create_move_file_tool(self) -> BaseTool:
    """Create the move_file tool (desktop local-folder mode only).

    The sync and async tool variants previously duplicated all validation
    and response formatting; they now share it through closures so the two
    code paths cannot drift apart. Returns a StructuredTool with both a
    sync func and an async coroutine.
    """
    tool_description = (
        self._custom_tool_descriptions.get("move_file")
        or SURFSENSE_MOVE_FILE_TOOL_DESCRIPTION
    )

    def _prepare_move(
        source_path: str,
        destination_path: str,
        runtime: ToolRuntime[None, FilesystemState],
    ):
        """Validate mode and paths.

        Returns an error string, or (backend, validated_source,
        validated_destination) on success.
        """
        if self._filesystem_mode != FilesystemMode.DESKTOP_LOCAL_FOLDER:
            return "Error: move_file is only available in desktop local-folder mode."
        if not source_path.strip() or not destination_path.strip():
            return "Error: source_path and destination_path are required."
        resolved_backend = self._get_backend(runtime)
        source_target = self._resolve_move_target_path(source_path, runtime)
        destination_target = self._resolve_move_target_path(
            destination_path, runtime
        )
        try:
            validated_source = validate_path(source_target)
            validated_destination = validate_path(destination_target)
        except ValueError as exc:
            return f"Error: {exc}"
        return resolved_backend, validated_source, validated_destination

    def _format_move_result(
        res: WriteResult,
        validated_source: str,
        validated_destination: str,
        runtime: ToolRuntime[None, FilesystemState],
    ) -> Command | str:
        """Convert the backend's WriteResult into the tool response."""
        if res.error:
            return res.error
        moved_message = (
            f"Moved '{validated_source}' to "
            f"'{res.path or validated_destination}'"
        )
        if res.files_update is not None:
            # Propagate the files diff into agent state alongside the message.
            return Command(
                update={
                    "files": res.files_update,
                    "messages": [
                        ToolMessage(
                            content=moved_message,
                            tool_call_id=runtime.tool_call_id,
                        )
                    ],
                }
            )
        return moved_message

    def sync_move_file(
        source_path: Annotated[
            str,
            "Absolute source path to move from.",
        ],
        destination_path: Annotated[
            str,
            "Absolute destination path to move to.",
        ],
        runtime: ToolRuntime[None, FilesystemState],
        *,
        overwrite: Annotated[
            bool,
            "If True, replace an existing destination file. Defaults to False.",
        ] = False,
    ) -> Command | str:
        prepared = _prepare_move(source_path, destination_path, runtime)
        if isinstance(prepared, str):
            return prepared
        resolved_backend, validated_source, validated_destination = prepared
        res: WriteResult = resolved_backend.move(
            validated_source,
            validated_destination,
            overwrite=overwrite,
        )
        return _format_move_result(
            res, validated_source, validated_destination, runtime
        )

    async def async_move_file(
        source_path: Annotated[
            str,
            "Absolute source path to move from.",
        ],
        destination_path: Annotated[
            str,
            "Absolute destination path to move to.",
        ],
        runtime: ToolRuntime[None, FilesystemState],
        *,
        overwrite: Annotated[
            bool,
            "If True, replace an existing destination file. Defaults to False.",
        ] = False,
    ) -> Command | str:
        prepared = _prepare_move(source_path, destination_path, runtime)
        if isinstance(prepared, str):
            return prepared
        resolved_backend, validated_source, validated_destination = prepared
        res: WriteResult = await resolved_backend.amove(
            validated_source,
            validated_destination,
            overwrite=overwrite,
        )
        return _format_move_result(
            res, validated_source, validated_destination, runtime
        )

    return StructuredTool.from_function(
        name="move_file",
        description=tool_description,
        func=sync_move_file,
        coroutine=async_move_file,
    )
def _create_list_tree_tool(self) -> BaseTool:
    """Create the list_tree tool (desktop local-folder mode only).

    The sync and async tool variants previously duplicated all argument
    validation and result rendering; they now share it through closures so
    the two paths stay in lockstep. Returns a StructuredTool with both a
    sync func and an async coroutine.
    """
    tool_description = (
        self._custom_tool_descriptions.get("list_tree")
        or SURFSENSE_LIST_TREE_TOOL_DESCRIPTION
    )

    def _prepare_list(
        path: str,
        max_depth: int,
        page_size: int,
        include_files: bool,
        include_dirs: bool,
        runtime: ToolRuntime[None, FilesystemState],
    ):
        """Validate mode and arguments.

        Returns an error string, or (backend, validated_path) on success.
        """
        if self._filesystem_mode != FilesystemMode.DESKTOP_LOCAL_FOLDER:
            return "Error: list_tree is only available in desktop local-folder mode."
        if max_depth < 0:
            return "Error: max_depth must be >= 0."
        if page_size < 1:
            return "Error: page_size must be >= 1."
        if not include_files and not include_dirs:
            return "Error: include_files and include_dirs cannot both be false."
        resolved_backend = self._get_backend(runtime)
        target_path = self._resolve_list_target_path(path, runtime)
        try:
            validated_path = validate_path(target_path)
        except ValueError as exc:
            return f"Error: {exc}"
        return resolved_backend, validated_path

    def _render_list_result(result) -> str:
        """Pass backend error strings through; JSON-serialize everything else."""
        error = result.get("error") if isinstance(result, dict) else None
        if isinstance(error, str) and error:
            return error
        return json.dumps(result, ensure_ascii=True)

    def sync_list_tree(
        runtime: ToolRuntime[None, FilesystemState],
        *,
        path: Annotated[
            str,
            "Absolute path to list from. Use '/' for mount roots.",
        ] = "/",
        max_depth: Annotated[
            int,
            "Maximum recursion depth to traverse. Defaults to 8.",
        ] = 8,
        page_size: Annotated[
            int,
            "Number of entries to return per page. Defaults to 500 (max 1000).",
        ] = 500,
        cursor: Annotated[
            str | None,
            "Opaque cursor from a previous list_tree call.",
        ] = None,
        include_files: Annotated[
            bool,
            "Whether file entries should be included.",
        ] = True,
        include_dirs: Annotated[
            bool,
            "Whether directory entries should be included.",
        ] = True,
    ) -> str:
        prepared = _prepare_list(
            path, max_depth, page_size, include_files, include_dirs, runtime
        )
        if isinstance(prepared, str):
            return prepared
        resolved_backend, validated_path = prepared
        result = resolved_backend.list_tree(
            validated_path,
            max_depth=max_depth,
            page_size=page_size,
            cursor=cursor,
            include_files=include_files,
            include_dirs=include_dirs,
        )
        return _render_list_result(result)

    async def async_list_tree(
        runtime: ToolRuntime[None, FilesystemState],
        *,
        path: Annotated[
            str,
            "Absolute path to list from. Use '/' for mount roots.",
        ] = "/",
        max_depth: Annotated[
            int,
            "Maximum recursion depth to traverse. Defaults to 8.",
        ] = 8,
        page_size: Annotated[
            int,
            "Number of entries to return per page. Defaults to 500 (max 1000).",
        ] = 500,
        cursor: Annotated[
            str | None,
            "Opaque cursor from a previous list_tree call.",
        ] = None,
        include_files: Annotated[
            bool,
            "Whether file entries should be included.",
        ] = True,
        include_dirs: Annotated[
            bool,
            "Whether directory entries should be included.",
        ] = True,
    ) -> str:
        prepared = _prepare_list(
            path, max_depth, page_size, include_files, include_dirs, runtime
        )
        if isinstance(prepared, str):
            return prepared
        resolved_backend, validated_path = prepared
        result = await resolved_backend.alist_tree(
            validated_path,
            max_depth=max_depth,
            page_size=page_size,
            cursor=cursor,
            include_files=include_files,
            include_dirs=include_dirs,
        )
        return _render_list_result(result)

    return StructuredTool.from_function(
        name="list_tree",
        description=tool_description,
        func=sync_list_tree,
        coroutine=async_list_tree,
    )
def _create_edit_file_tool(self) -> BaseTool:
"""Create edit_file with DB persistence (skipped for KB documents)."""
tool_description = (

View file

@ -6,7 +6,12 @@ import asyncio
import fnmatch
import os
import threading
from collections import deque
from contextlib import ExitStack
from pathlib import Path
from time import time
from typing import Any
from uuid import uuid4
from deepagents.backends.protocol import (
EditResult,
@ -38,6 +43,8 @@ class LocalFolderBackend:
self._root = root
self._locks: dict[str, threading.Lock] = {}
self._locks_mu = threading.Lock()
self._tree_sessions: dict[str, dict[str, Any]] = {}
self._tree_sessions_ttl_s = 900
def _lock_for(self, path: str) -> threading.Lock:
with self._locks_mu:
@ -71,6 +78,54 @@ class LocalFolderBackend:
temp_path.write_text(content, encoding="utf-8")
os.replace(temp_path, path)
def _acquire_path_locks(self, *paths: str) -> ExitStack:
ordered_paths = sorted(set(paths))
stack = ExitStack()
for path in ordered_paths:
stack.enter_context(self._lock_for(path))
return stack
@staticmethod
def _clamp_page_size(page_size: int) -> int:
return max(1, min(page_size, 1000))
def _prune_expired_tree_sessions(self) -> None:
now = time()
expired = [
cursor
for cursor, session in self._tree_sessions.items()
if now - float(session.get("last_accessed_at", now)) > self._tree_sessions_ttl_s
]
for cursor in expired:
self._tree_sessions.pop(cursor, None)
def _read_dir_entries(self, directory_path: str) -> list[dict[str, Any]]:
directory = Path(directory_path)
try:
children = sorted(
directory.iterdir(),
key=lambda p: (not p.is_dir(), p.name.lower()),
)
except OSError:
return []
entries: list[dict[str, Any]] = []
for child in children:
try:
stat_result = child.stat()
except OSError:
continue
entries.append(
{
"path": self._to_virtual(child, self._root),
"is_dir": child.is_dir(),
"size": stat_result.st_size if child.is_file() else 0,
"modified_at": str(stat_result.st_mtime),
"absolute_path": str(child),
}
)
return entries
def ls_info(self, path: str) -> list[FileInfo]:
try:
target = self._resolve_virtual(path, allow_root=True)
@ -145,6 +200,232 @@ class LocalFolderBackend:
async def awrite(self, file_path: str, content: str) -> WriteResult:
    """Async facade over write(); the blocking I/O runs in a worker thread."""
    return await asyncio.to_thread(lambda: self.write(file_path, content))
def list_tree(
    self,
    path: str = "/",
    *,
    max_depth: int | None = 8,
    page_size: int = 500,
    cursor: str | None = None,
    include_files: bool = True,
    include_dirs: bool = True,
) -> dict[str, Any]:
    """Recursively list entries under *path*, returning one page per call.

    Traversal is breadth-first: a deque of pending directories plus the
    directory currently being drained. That state lives in an in-memory
    session keyed by an opaque cursor so follow-up calls resume exactly
    where the previous page stopped.

    Returns a dict with keys "entries", "next_cursor", "has_more",
    "truncated" — or {"error": ...} for invalid input or cursors.
    Depth is relative to *path*: direct children are depth 1; the start
    directory itself is never listed.
    """
    # Best-effort cleanup of stale pagination sessions before doing work.
    self._prune_expired_tree_sessions()
    if not include_files and not include_dirs:
        # Nothing can match; short-circuit with an empty, final page.
        return {
            "entries": [],
            "next_cursor": None,
            "has_more": False,
            "truncated": False,
        }
    normalized_depth = None if max_depth is None else max(0, int(max_depth))
    page_limit = self._clamp_page_size(int(page_size))
    now = time()
    if cursor:
        # Resume: the session must exist and must have been created with the
        # same options, otherwise pages would be inconsistent.
        session = self._tree_sessions.get(cursor)
        if not session:
            return {"error": "Invalid or expired cursor"}
        if (
            session.get("path") != path
            or session.get("max_depth") != normalized_depth
            or session.get("include_files") != include_files
            or session.get("include_dirs") != include_dirs
        ):
            return {"error": "Cursor options do not match request options"}
        state = session
    else:
        # Fresh listing: resolve and validate the start path.
        try:
            start = self._resolve_virtual(path, allow_root=True)
        except ValueError:
            return {"error": f"Error: invalid path '{path}'"}
        if not start.exists():
            return {"error": f"Error: path '{path}' not found"}
        if start.is_file():
            # Listing a single file is a one-entry (or empty) final page.
            stat_result = start.stat()
            if include_files:
                return {
                    "entries": [
                        {
                            "path": self._to_virtual(start, self._root),
                            "is_dir": False,
                            "size": stat_result.st_size,
                            "modified_at": str(stat_result.st_mtime),
                            "depth": 0,
                        }
                    ],
                    "next_cursor": None,
                    "has_more": False,
                    "truncated": False,
                }
            return {
                "entries": [],
                "next_cursor": None,
                "has_more": False,
                "truncated": False,
            }
        # Seed the BFS with the start directory at depth 0.
        state = {
            "path": path,
            "max_depth": normalized_depth,
            "include_files": include_files,
            "include_dirs": include_dirs,
            "pending_dirs": deque([(str(start), 0)]),
            "active_dir": None,
            "active_depth": 0,
            "active_entries": [],
            "active_index": 0,
        }
    entries: list[dict[str, Any]] = []
    truncated = False
    while len(entries) < page_limit:
        active_entries = state.get("active_entries", [])
        active_index = int(state.get("active_index", 0))
        if active_index >= len(active_entries):
            # Current directory drained: pull the next pending directory,
            # or finish when the queue is empty.
            pending_dirs = state.get("pending_dirs", [])
            if not pending_dirs:
                state["active_entries"] = []
                state["active_index"] = 0
                break
            next_dir_path, next_depth = pending_dirs.popleft()
            state["active_dir"] = next_dir_path
            state["active_depth"] = next_depth
            state["active_entries"] = self._read_dir_entries(next_dir_path)
            state["active_index"] = 0
            active_entries = state["active_entries"]
            active_index = 0
        if active_index >= len(active_entries):
            # Freshly loaded directory was empty/unreadable; try the next one.
            continue
        item = active_entries[active_index]
        state["active_index"] = active_index + 1
        # Children sit one level below the directory being drained.
        item_depth = int(state.get("active_depth", 0)) + 1
        if normalized_depth is not None and item_depth > normalized_depth:
            continue
        if item["is_dir"]:
            if normalized_depth is None or item_depth <= normalized_depth:
                # Queue the subdirectory so its children are visited later.
                state["pending_dirs"].append((item["absolute_path"], item_depth))
            if include_dirs:
                entries.append(
                    {
                        "path": item["path"],
                        "is_dir": True,
                        "size": 0,
                        "modified_at": item["modified_at"],
                        "depth": item_depth,
                    }
                )
        elif include_files:
            entries.append(
                {
                    "path": item["path"],
                    "is_dir": False,
                    "size": item["size"],
                    "modified_at": item["modified_at"],
                    "depth": item_depth,
                }
            )
        if len(entries) >= page_limit:
            # Page is full; remaining work stays in the session.
            truncated = True
            break
    # More work remains if directories are still queued or the active
    # directory has unconsumed entries.
    has_more = bool(state.get("pending_dirs")) or (
        int(state.get("active_index", 0)) < len(state.get("active_entries", []))
    )
    if has_more:
        # Persist (or refresh) the session under a stable cursor token.
        next_cursor = cursor or uuid4().hex
        state["last_accessed_at"] = now
        self._tree_sessions[next_cursor] = state
    else:
        next_cursor = None
        if cursor:
            # Traversal finished: release the session eagerly.
            self._tree_sessions.pop(cursor, None)
    return {
        "entries": entries,
        "next_cursor": next_cursor,
        "has_more": has_more,
        "truncated": truncated,
    }
async def alist_tree(
    self,
    path: str = "/",
    *,
    max_depth: int | None = 8,
    page_size: int = 500,
    cursor: str | None = None,
    include_files: bool = True,
    include_dirs: bool = True,
) -> dict[str, Any]:
    """Async facade over list_tree(); the sync walk runs in a worker thread."""
    options = {
        "max_depth": max_depth,
        "page_size": page_size,
        "cursor": cursor,
        "include_files": include_files,
        "include_dirs": include_dirs,
    }
    return await asyncio.to_thread(lambda: self.list_tree(path, **options))
def move(
    self,
    source_path: str,
    destination_path: str,
    overwrite: bool = False,
) -> WriteResult:
    """Move or rename a file/directory inside this backend's root.

    Overwrite is honored only for file-to-file moves; directories are
    never replaced. Both virtual paths are locked (in sorted order via
    _acquire_path_locks) for the duration of the operation.

    Returns a WriteResult whose .error is set on any failure; on success
    .path holds the destination's virtual path.
    """
    try:
        source = self._resolve_virtual(source_path)
        destination = self._resolve_virtual(destination_path)
    except ValueError:
        return WriteResult(
            error=(
                f"Error: invalid source '{source_path}' or destination "
                f"'{destination_path}' path"
            )
        )
    if source == destination:
        return WriteResult(error="Error: source and destination paths are the same")
    with self._acquire_path_locks(source_path, destination_path):
        if not source.exists():
            return WriteResult(error=f"Error: source path '{source_path}' not found")
        if destination.exists():
            if not overwrite:
                return WriteResult(
                    error=(
                        f"Error: destination path '{destination_path}' already exists. "
                        "Set overwrite=True to replace files."
                    )
                )
            if source.is_dir() or destination.is_dir():
                return WriteResult(
                    error=(
                        "Error: overwrite=True is only supported for file-to-file moves."
                    )
                )
        try:
            # Fix: creating the destination's parent can itself raise (a path
            # component is an existing file, permission denied, ...). Keep it
            # inside the try so the failure surfaces as a WriteResult error
            # instead of an unhandled OSError escaping the tool.
            destination.parent.mkdir(parents=True, exist_ok=True)
            if overwrite:
                # os.replace atomically overwrites an existing destination file.
                os.replace(source, destination)
            else:
                source.rename(destination)
        except OSError as exc:
            return WriteResult(error=f"Error: failed to move '{source_path}': {exc}")
        return WriteResult(
            path=self._to_virtual(destination, self._root), files_update=None
        )
async def amove(
    self,
    source_path: str,
    destination_path: str,
    overwrite: bool = False,
) -> WriteResult:
    """Async facade over move(); the blocking work runs in a worker thread."""
    return await asyncio.to_thread(
        lambda: self.move(source_path, destination_path, overwrite)
    )
def edit(
self,
file_path: str,

View file

@ -3,6 +3,8 @@
from __future__ import annotations
import asyncio
import base64
import json
from pathlib import Path
from typing import Any
@ -107,6 +109,28 @@ class MultiRootLocalFolderBackend:
for mount in self._mount_order
]
@staticmethod
def _encode_tree_cursor(mount: str, local_cursor: str) -> str:
payload = json.dumps(
{"mount": mount, "cursor": local_cursor},
separators=(",", ":"),
).encode("utf-8")
return base64.urlsafe_b64encode(payload).decode("ascii")
@staticmethod
def _decode_tree_cursor(cursor: str) -> tuple[str, str]:
try:
padded = cursor + "=" * ((4 - len(cursor) % 4) % 4)
data = base64.urlsafe_b64decode(padded.encode("ascii"))
parsed = json.loads(data.decode("utf-8"))
except Exception as exc:
raise ValueError("Invalid cursor") from exc
mount = parsed.get("mount")
local_cursor = parsed.get("cursor")
if not isinstance(mount, str) or not isinstance(local_cursor, str):
raise ValueError("Invalid cursor")
return mount, local_cursor
def _transform_infos(self, mount: str, infos: list[FileInfo]) -> list[FileInfo]:
transformed: list[FileInfo] = []
for info in infos:
@ -132,6 +156,103 @@ class MultiRootLocalFolderBackend:
async def als_info(self, path: str) -> list[FileInfo]:
    """Async facade over ls_info(); runs the sync listing in a worker thread."""
    return await asyncio.to_thread(lambda: self.ls_info(path))
def list_tree(
    self,
    path: str = "/",
    *,
    max_depth: int | None = 8,
    page_size: int = 500,
    cursor: str | None = None,
    include_files: bool = True,
    include_dirs: bool = True,
) -> dict[str, Any]:
    """Recursively list entries across mounted roots with cursor pagination.

    "/" without a cursor lists the mount roots themselves; any other path
    is delegated to the owning mount's backend and the returned entry
    paths are re-prefixed with the mount name. Cursors are mount-scoped
    tokens wrapping the local backend's cursor.
    """
    if path == "/" and not cursor:
        # Virtual root: synthesize one depth-0 directory entry per mount.
        entries = [
            {
                "path": f"/{mount}",
                "is_dir": True,
                "size": 0,
                "modified_at": "0",
                "depth": 0,
            }
            for mount in self._mount_order
        ]
        return {
            "entries": entries if include_dirs else [],
            "next_cursor": None,
            "has_more": False,
            "truncated": False,
        }
    try:
        if cursor:
            mount, local_cursor = self._decode_tree_cursor(cursor)
            if mount not in self._mount_to_backend:
                return {"error": "Invalid or expired cursor"}
            # Bug fix: the local backend rejects a cursor whose stored
            # session path differs from the `path` argument, so hard-coding
            # "/" here broke resumption of every non-root listing. Re-derive
            # the mount-local path from the caller-supplied path (callers
            # resend the same path alongside the cursor) and verify it
            # targets the cursor's mount.
            if path and path != "/":
                path_mount, local_path = self._split_mount_path(path)
                if path_mount != mount:
                    return {"error": "Cursor does not match the requested path"}
            else:
                local_path = "/"
        else:
            mount, local_path = self._split_mount_path(path)
            local_cursor = None
    except ValueError as exc:
        return {"error": f"Error: {exc}"}
    result = self._mount_to_backend[mount].list_tree(
        local_path,
        max_depth=max_depth,
        page_size=page_size,
        cursor=local_cursor,
        include_files=include_files,
        include_dirs=include_dirs,
    )
    if result.get("error"):
        return result
    # Re-prefix every entry path with its mount so callers see virtual paths.
    entries: list[dict[str, Any]] = []
    for entry in result.get("entries", []):
        raw_path = self._get_str(entry, "path")
        entries.append(
            {
                "path": self._prefix_mount_path(mount, raw_path),
                "is_dir": self._get_bool(entry, "is_dir"),
                "size": self._get_int(entry, "size"),
                "modified_at": self._get_str(entry, "modified_at"),
                "depth": self._get_int(entry, "depth"),
            }
        )
    local_next_cursor = self._get_str(result, "next_cursor")
    next_cursor = (
        self._encode_tree_cursor(mount, local_next_cursor)
        if local_next_cursor
        else None
    )
    return {
        "entries": entries,
        "next_cursor": next_cursor,
        "has_more": self._get_bool(result, "has_more"),
        "truncated": self._get_bool(result, "truncated"),
    }
async def alist_tree(
    self,
    path: str = "/",
    *,
    max_depth: int | None = 8,
    page_size: int = 500,
    cursor: str | None = None,
    include_files: bool = True,
    include_dirs: bool = True,
) -> dict[str, Any]:
    """Async facade over list_tree(); delegation runs in a worker thread."""
    options = {
        "max_depth": max_depth,
        "page_size": page_size,
        "cursor": cursor,
        "include_files": include_files,
        "include_dirs": include_dirs,
    }
    return await asyncio.to_thread(lambda: self.list_tree(path, **options))
def read(self, file_path: str, offset: int = 0, limit: int = 2000) -> str:
try:
mount, local_path = self._split_mount_path(file_path)
@ -165,6 +286,48 @@ class MultiRootLocalFolderBackend:
async def awrite(self, file_path: str, content: str) -> WriteResult:
    """Async facade over write(); the blocking work runs in a worker thread."""
    return await asyncio.to_thread(lambda: self.write(file_path, content))
def move(
    self,
    source_path: str,
    destination_path: str,
    overwrite: bool = False,
) -> WriteResult:
    """Move within a single mount, delegating to that mount's backend.

    Cross-mount moves are rejected. On success the backend's result path
    is rewritten back into mount-prefixed virtual form.
    """
    try:
        source_mount, source_local = self._split_mount_path(source_path)
        destination_mount, destination_local = self._split_mount_path(
            destination_path
        )
    except ValueError as exc:
        return WriteResult(error=f"Error: {exc}")
    if source_mount != destination_mount:
        return WriteResult(
            error=(
                "Error: cross-mount moves are not supported. "
                "Source and destination must be under the same mounted root."
            )
        )
    outcome = self._mount_to_backend[source_mount].move(
        source_local,
        destination_local,
        overwrite=overwrite,
    )
    if outcome.path:
        outcome.path = self._prefix_mount_path(source_mount, outcome.path)
    return outcome
async def amove(
    self,
    source_path: str,
    destination_path: str,
    overwrite: bool = False,
) -> WriteResult:
    """Async facade over move(); the delegation runs in a worker thread."""
    return await asyncio.to_thread(
        lambda: self.move(source_path, destination_path, overwrite)
    )
def edit(
self,
file_path: str,