Merge pull request #1314 from AnishSarkar22/fix/swappable-filesystem

fix: improve swappable filesystem architecture
This commit is contained in:
Rohan Verma 2026-04-27 13:32:56 -07:00 committed by GitHub
commit 19f4668e8b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 2364 additions and 407 deletions

View file

@ -109,37 +109,6 @@ def _sanitize_path_segment(value: str) -> str:
return segment
def _infer_text_file_extension(user_text: str) -> str:
lowered = user_text.lower()
if any(token in lowered for token in ("json", ".json")):
return ".json"
if any(token in lowered for token in ("yaml", "yml", ".yaml", ".yml")):
return ".yaml"
if any(token in lowered for token in ("csv", ".csv")):
return ".csv"
if any(token in lowered for token in ("python", ".py")):
return ".py"
if any(token in lowered for token in ("typescript", ".ts", ".tsx")):
return ".ts"
if any(token in lowered for token in ("javascript", ".js", ".mjs", ".cjs")):
return ".js"
if any(token in lowered for token in ("html", ".html")):
return ".html"
if any(token in lowered for token in ("css", ".css")):
return ".css"
if any(token in lowered for token in ("sql", ".sql")):
return ".sql"
if any(token in lowered for token in ("toml", ".toml")):
return ".toml"
if any(token in lowered for token in ("ini", ".ini")):
return ".ini"
if any(token in lowered for token in ("xml", ".xml")):
return ".xml"
if any(token in lowered for token in ("markdown", ".md", "readme")):
return ".md"
return ".md"
def _normalize_directory(value: str) -> str:
raw = value.strip().replace("\\", "/")
raw = raw.strip("/")
@ -193,7 +162,6 @@ def _fallback_path(
suggested_path: str | None = None,
user_text: str,
) -> str:
default_extension = _infer_text_file_extension(user_text)
inferred_dir = _infer_directory_from_user_text(user_text)
sanitized_filename = ""
@ -202,9 +170,9 @@ def _fallback_path(
if sanitized_filename.lower().endswith(".txt"):
sanitized_filename = f"{sanitized_filename[:-4]}.md"
if not sanitized_filename:
sanitized_filename = f"notes{default_extension}"
sanitized_filename = "notes.md"
elif "." not in sanitized_filename:
sanitized_filename = f"{sanitized_filename}{default_extension}"
sanitized_filename = f"{sanitized_filename}.md"
normalized_suggested_path = (
_normalize_file_path(suggested_path) if suggested_path else ""

View file

@ -7,6 +7,7 @@ This middleware customizes prompts and persists write/edit operations for
from __future__ import annotations
import asyncio
import json
import logging
import re
import secrets
@ -141,6 +142,31 @@ IMPORTANT:
content.
"""
SURFSENSE_MOVE_FILE_TOOL_DESCRIPTION = """Moves or renames a file or folder.
Use absolute paths for both source and destination.
Notes:
- In local-folder mode, paths should use mount prefixes (e.g., /<mount>/foo.txt).
- Rename is a special case of move (same folder, different filename).
- Cross-mount moves are not supported.
"""
SURFSENSE_LIST_TREE_TOOL_DESCRIPTION = """Lists files/folders recursively in a single bounded call.
Use this in desktop local-folder mode to discover nested files at scale.
Args:
- path: absolute mount-prefixed path (e.g., /<mount>/src) or "/" for mount roots.
- max_depth: recursion depth limit (default 8).
- page_size: maximum number of entries returned (max 1000).
- include_files/include_dirs: filter returned entry types.
Returns JSON with:
- entries: [{path, is_dir, size, modified_at, depth}]
- truncated: true when additional entries were omitted due to page_size
"""
SURFSENSE_GLOB_TOOL_DESCRIPTION = """Find files matching a glob pattern.
Supports standard glob patterns: `*`, `**`, `?`.
@ -222,11 +248,14 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
)
if filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
system_prompt += (
"\n- move_file: move or rename files/folders in local-folder mode."
"\n- list_tree: recursively list nested local paths in one bounded response."
"\n\n## Local Folder Mode"
"\n\nThis chat is running in desktop local-folder mode."
" Keep all file operations local. Do not use save_document."
" Always use mount-prefixed absolute paths like /<folder>/file.ext."
" If you are unsure which mounts are available, call ls('/') first."
" For big trees: use list_tree, then grep, then read_file."
)
super().__init__(
@ -237,6 +266,8 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
"read_file": SURFSENSE_READ_FILE_TOOL_DESCRIPTION,
"write_file": SURFSENSE_WRITE_FILE_TOOL_DESCRIPTION,
"edit_file": SURFSENSE_EDIT_FILE_TOOL_DESCRIPTION,
"move_file": SURFSENSE_MOVE_FILE_TOOL_DESCRIPTION,
"list_tree": SURFSENSE_LIST_TREE_TOOL_DESCRIPTION,
"glob": SURFSENSE_GLOB_TOOL_DESCRIPTION,
"grep": SURFSENSE_GREP_TOOL_DESCRIPTION,
},
@ -244,6 +275,9 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
max_execute_timeout=self._MAX_EXECUTE_TIMEOUT,
)
self.tools = [t for t in self.tools if t.name != "execute"]
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
self.tools.append(self._create_move_file_tool())
self.tools.append(self._create_list_tree_tool())
if self._should_persist_documents():
self.tools.append(self._create_save_document_tool())
if self._sandbox_available:
@ -776,35 +810,97 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
"""Only cloud mode persists file content to Document/Chunk tables."""
return self._filesystem_mode == FilesystemMode.CLOUD
def _default_mount_prefix(self, runtime: ToolRuntime[None, FilesystemState]) -> str:
backend = self._get_backend(runtime)
if isinstance(backend, MultiRootLocalFolderBackend):
return f"/{backend.default_mount()}"
return ""
@staticmethod
def _normalize_absolute_path(candidate: str) -> str:
normalized = re.sub(r"/+", "/", candidate.strip().replace("\\", "/"))
if not normalized:
return "/"
if normalized.startswith("/"):
return normalized
return f"/{normalized.lstrip('/')}"
@staticmethod
def _extract_mount_from_path(path: str, mounts: tuple[str, ...]) -> str | None:
rel = path.lstrip("/")
if not rel:
return None
mount, _, _ = rel.partition("/")
if mount in mounts:
return mount
return None
@staticmethod
def _local_parent_path(path: str) -> str:
rel = path.lstrip("/")
if "/" not in rel:
return "/"
parent = rel.rsplit("/", 1)[0].strip("/")
if not parent:
return "/"
return f"/{parent}"
@staticmethod
def _path_exists_under_mount(
backend: MultiRootLocalFolderBackend,
mount: str,
local_path: str,
) -> bool:
result = backend.list_tree(
f"/{mount}{local_path}",
max_depth=0,
page_size=1,
include_files=True,
include_dirs=True,
)
return not bool(result.get("error"))
def _normalize_local_mount_path(
self, candidate: str, runtime: ToolRuntime[None, FilesystemState]
self,
candidate: str,
runtime: ToolRuntime[None, FilesystemState],
) -> str:
normalized = self._normalize_absolute_path(candidate)
backend = self._get_backend(runtime)
mount_prefix = self._default_mount_prefix(runtime)
normalized_candidate = re.sub(r"/+", "/", candidate.strip().replace("\\", "/"))
if not mount_prefix or not isinstance(backend, MultiRootLocalFolderBackend):
if normalized_candidate.startswith("/"):
return normalized_candidate
return f"/{normalized_candidate.lstrip('/')}"
if not isinstance(backend, MultiRootLocalFolderBackend):
return normalized
mount_names = set(backend.list_mounts())
if normalized_candidate.startswith("/"):
first_segment = normalized_candidate.lstrip("/").split("/", 1)[0]
if first_segment in mount_names:
return normalized_candidate
return f"{mount_prefix}{normalized_candidate}"
mounts = backend.list_mounts()
explicit_mount = self._extract_mount_from_path(normalized, mounts)
if explicit_mount:
return normalized
relative = normalized_candidate.lstrip("/")
first_segment = relative.split("/", 1)[0]
if first_segment in mount_names:
return f"/{relative}"
return f"{mount_prefix}/{relative}"
if len(mounts) == 1:
return f"/{mounts[0]}{normalized}"
suggested_mount: str | None = None
contract = runtime.state.get("file_operation_contract") or {}
suggested_path = contract.get("suggested_path")
if isinstance(suggested_path, str) and suggested_path.strip():
normalized_suggested = self._normalize_absolute_path(suggested_path)
suggested_mount = self._extract_mount_from_path(normalized_suggested, mounts)
matching_mounts = [
mount
for mount in mounts
if self._path_exists_under_mount(backend, mount, normalized)
]
if len(matching_mounts) == 1:
return f"/{matching_mounts[0]}{normalized}"
parent_path = self._local_parent_path(normalized)
if parent_path != "/":
parent_matching_mounts = [
mount
for mount in mounts
if self._path_exists_under_mount(backend, mount, parent_path)
]
if len(parent_matching_mounts) == 1:
return f"/{parent_matching_mounts[0]}{normalized}"
if suggested_mount:
return f"/{suggested_mount}{normalized}"
return f"/{backend.default_mount()}{normalized}"
def _get_contract_suggested_path(
self, runtime: ToolRuntime[None, FilesystemState]
@ -812,14 +908,7 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
contract = runtime.state.get("file_operation_contract") or {}
suggested = contract.get("suggested_path")
if isinstance(suggested, str) and suggested.strip():
cleaned = suggested.strip()
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
return self._normalize_local_mount_path(cleaned, runtime)
return cleaned
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
mount_prefix = self._default_mount_prefix(runtime)
if mount_prefix:
return f"{mount_prefix}/notes.md"
return self._normalize_absolute_path(suggested)
return "/notes.md"
def _resolve_write_target_path(
@ -836,6 +925,34 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
return f"/{candidate.lstrip('/')}"
return candidate
def _resolve_move_target_path(
self,
file_path: str,
runtime: ToolRuntime[None, FilesystemState],
) -> str:
candidate = file_path.strip()
if not candidate:
return ""
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
return self._normalize_local_mount_path(candidate, runtime)
if not candidate.startswith("/"):
return f"/{candidate.lstrip('/')}"
return candidate
def _resolve_list_target_path(
self,
path: str,
runtime: ToolRuntime[None, FilesystemState],
) -> str:
candidate = path.strip() or "/"
if candidate == "/":
return "/"
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
return self._normalize_local_mount_path(candidate, runtime)
if not candidate.startswith("/"):
return f"/{candidate.lstrip('/')}"
return candidate
@staticmethod
def _is_error_text(value: str) -> bool:
return value.startswith("Error:")
@ -930,6 +1047,246 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
)
return None, updated_content
def _create_move_file_tool(self) -> BaseTool:
"""Create move_file for desktop local-folder mode."""
tool_description = (
self._custom_tool_descriptions.get("move_file")
or SURFSENSE_MOVE_FILE_TOOL_DESCRIPTION
)
def sync_move_file(
source_path: Annotated[
str,
"Absolute source path to move from.",
],
destination_path: Annotated[
str,
"Absolute destination path to move to.",
],
runtime: ToolRuntime[None, FilesystemState],
*,
overwrite: Annotated[
bool,
"If True, replace an existing destination file. Defaults to False.",
] = False,
) -> Command | str:
if self._filesystem_mode != FilesystemMode.DESKTOP_LOCAL_FOLDER:
return "Error: move_file is only available in desktop local-folder mode."
if not source_path.strip() or not destination_path.strip():
return "Error: source_path and destination_path are required."
resolved_backend = self._get_backend(runtime)
source_target = self._resolve_move_target_path(source_path, runtime)
destination_target = self._resolve_move_target_path(destination_path, runtime)
try:
validated_source = validate_path(source_target)
validated_destination = validate_path(destination_target)
except ValueError as exc:
return f"Error: {exc}"
res: WriteResult = resolved_backend.move(
validated_source,
validated_destination,
overwrite=overwrite,
)
if res.error:
return res.error
if res.files_update is not None:
return Command(
update={
"files": res.files_update,
"messages": [
ToolMessage(
content=(
f"Moved '{validated_source}' to "
f"'{res.path or validated_destination}'"
),
tool_call_id=runtime.tool_call_id,
)
],
}
)
return f"Moved '{validated_source}' to '{res.path or validated_destination}'"
async def async_move_file(
source_path: Annotated[
str,
"Absolute source path to move from.",
],
destination_path: Annotated[
str,
"Absolute destination path to move to.",
],
runtime: ToolRuntime[None, FilesystemState],
*,
overwrite: Annotated[
bool,
"If True, replace an existing destination file. Defaults to False.",
] = False,
) -> Command | str:
if self._filesystem_mode != FilesystemMode.DESKTOP_LOCAL_FOLDER:
return "Error: move_file is only available in desktop local-folder mode."
if not source_path.strip() or not destination_path.strip():
return "Error: source_path and destination_path are required."
resolved_backend = self._get_backend(runtime)
source_target = self._resolve_move_target_path(source_path, runtime)
destination_target = self._resolve_move_target_path(destination_path, runtime)
try:
validated_source = validate_path(source_target)
validated_destination = validate_path(destination_target)
except ValueError as exc:
return f"Error: {exc}"
res: WriteResult = await resolved_backend.amove(
validated_source,
validated_destination,
overwrite=overwrite,
)
if res.error:
return res.error
if res.files_update is not None:
return Command(
update={
"files": res.files_update,
"messages": [
ToolMessage(
content=(
f"Moved '{validated_source}' to "
f"'{res.path or validated_destination}'"
),
tool_call_id=runtime.tool_call_id,
)
],
}
)
return f"Moved '{validated_source}' to '{res.path or validated_destination}'"
return StructuredTool.from_function(
name="move_file",
description=tool_description,
func=sync_move_file,
coroutine=async_move_file,
)
def _create_list_tree_tool(self) -> BaseTool:
"""Create list_tree for desktop local-folder mode."""
tool_description = (
self._custom_tool_descriptions.get("list_tree")
or SURFSENSE_LIST_TREE_TOOL_DESCRIPTION
)
def sync_list_tree(
runtime: ToolRuntime[None, FilesystemState],
*,
path: Annotated[
str,
"Absolute path to list from. Use '/' for mount roots.",
] = "/",
max_depth: Annotated[
int,
"Maximum recursion depth to traverse. Defaults to 8.",
] = 8,
page_size: Annotated[
int,
"Maximum number of entries to return. Defaults to 500 (max 1000).",
] = 500,
include_files: Annotated[
bool,
"Whether file entries should be included.",
] = True,
include_dirs: Annotated[
bool,
"Whether directory entries should be included.",
] = True,
) -> str:
if self._filesystem_mode != FilesystemMode.DESKTOP_LOCAL_FOLDER:
return "Error: list_tree is only available in desktop local-folder mode."
if max_depth < 0:
return "Error: max_depth must be >= 0."
if page_size < 1:
return "Error: page_size must be >= 1."
if not include_files and not include_dirs:
return "Error: include_files and include_dirs cannot both be false."
resolved_backend = self._get_backend(runtime)
target_path = self._resolve_list_target_path(path, runtime)
try:
validated_path = validate_path(target_path)
except ValueError as exc:
return f"Error: {exc}"
result = resolved_backend.list_tree(
validated_path,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
error = result.get("error") if isinstance(result, dict) else None
if isinstance(error, str) and error:
return error
return json.dumps(result, ensure_ascii=True)
async def async_list_tree(
runtime: ToolRuntime[None, FilesystemState],
*,
path: Annotated[
str,
"Absolute path to list from. Use '/' for mount roots.",
] = "/",
max_depth: Annotated[
int,
"Maximum recursion depth to traverse. Defaults to 8.",
] = 8,
page_size: Annotated[
int,
"Maximum number of entries to return. Defaults to 500 (max 1000).",
] = 500,
include_files: Annotated[
bool,
"Whether file entries should be included.",
] = True,
include_dirs: Annotated[
bool,
"Whether directory entries should be included.",
] = True,
) -> str:
if self._filesystem_mode != FilesystemMode.DESKTOP_LOCAL_FOLDER:
return "Error: list_tree is only available in desktop local-folder mode."
if max_depth < 0:
return "Error: max_depth must be >= 0."
if page_size < 1:
return "Error: page_size must be >= 1."
if not include_files and not include_dirs:
return "Error: include_files and include_dirs cannot both be false."
resolved_backend = self._get_backend(runtime)
target_path = self._resolve_list_target_path(path, runtime)
try:
validated_path = validate_path(target_path)
except ValueError as exc:
return f"Error: {exc}"
result = await resolved_backend.alist_tree(
validated_path,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
error = result.get("error") if isinstance(result, dict) else None
if isinstance(error, str) and error:
return error
return json.dumps(result, ensure_ascii=True)
return StructuredTool.from_function(
name="list_tree",
description=tool_description,
func=sync_list_tree,
coroutine=async_list_tree,
)
def _create_edit_file_tool(self) -> BaseTool:
"""Create edit_file with DB persistence (skipped for KB documents)."""
tool_description = (

View file

@ -6,7 +6,10 @@ import asyncio
import fnmatch
import os
import threading
from collections import deque
from contextlib import ExitStack
from pathlib import Path
from typing import Any
from deepagents.backends.protocol import (
EditResult,
@ -71,6 +74,44 @@ class LocalFolderBackend:
temp_path.write_text(content, encoding="utf-8")
os.replace(temp_path, path)
def _acquire_path_locks(self, *paths: str) -> ExitStack:
ordered_paths = sorted(set(paths))
stack = ExitStack()
for path in ordered_paths:
stack.enter_context(self._lock_for(path))
return stack
@staticmethod
def _clamp_page_size(page_size: int) -> int:
return max(1, min(page_size, 1000))
def _read_dir_entries(self, directory_path: str) -> list[dict[str, Any]]:
directory = Path(directory_path)
try:
children = sorted(
directory.iterdir(),
key=lambda p: (not p.is_dir(), p.name.lower()),
)
except OSError:
return []
entries: list[dict[str, Any]] = []
for child in children:
try:
stat_result = child.stat()
except OSError:
continue
entries.append(
{
"path": self._to_virtual(child, self._root),
"is_dir": child.is_dir(),
"size": stat_result.st_size if child.is_file() else 0,
"modified_at": str(stat_result.st_mtime),
"absolute_path": str(child),
}
)
return entries
def ls_info(self, path: str) -> list[FileInfo]:
try:
target = self._resolve_virtual(path, allow_root=True)
@ -139,12 +180,178 @@ class LocalFolderBackend:
"Read and then make an edit, or write to a new path."
)
)
parent = path.parent
if not parent.exists() or not parent.is_dir():
return WriteResult(
error=(
f"Error: parent directory for '{file_path}' does not exist. "
"Create the folder first or write to an existing directory."
)
)
self._write_text_atomic(path, content)
return WriteResult(path=file_path, files_update=None)
async def awrite(self, file_path: str, content: str) -> WriteResult:
return await asyncio.to_thread(self.write, file_path, content)
def list_tree(
self,
path: str = "/",
*,
max_depth: int | None = 8,
page_size: int = 500,
include_files: bool = True,
include_dirs: bool = True,
) -> dict[str, Any]:
if not include_files and not include_dirs:
return {
"entries": [],
"truncated": False,
}
normalized_depth = None if max_depth is None else max(0, int(max_depth))
page_limit = self._clamp_page_size(int(page_size))
try:
start = self._resolve_virtual(path, allow_root=True)
except ValueError:
return {"error": f"Error: invalid path '{path}'"}
if not start.exists():
return {"error": f"Error: path '{path}' not found"}
if start.is_file():
stat_result = start.stat()
if include_files:
return {
"entries": [
{
"path": self._to_virtual(start, self._root),
"is_dir": False,
"size": stat_result.st_size,
"modified_at": str(stat_result.st_mtime),
"depth": 0,
}
],
"truncated": False,
}
return {
"entries": [],
"truncated": False,
}
pending_dirs: deque[tuple[str, int]] = deque([(str(start), 0)])
entries: list[dict[str, Any]] = []
truncated = False
while pending_dirs and not truncated:
next_dir_path, next_depth = pending_dirs.popleft()
active_entries = self._read_dir_entries(next_dir_path)
for item in active_entries:
item_depth = next_depth + 1
if normalized_depth is not None and item_depth > normalized_depth:
continue
if item["is_dir"]:
if normalized_depth is None or item_depth <= normalized_depth:
pending_dirs.append((item["absolute_path"], item_depth))
if include_dirs:
entries.append(
{
"path": item["path"],
"is_dir": True,
"size": 0,
"modified_at": item["modified_at"],
"depth": item_depth,
}
)
elif include_files:
entries.append(
{
"path": item["path"],
"is_dir": False,
"size": item["size"],
"modified_at": item["modified_at"],
"depth": item_depth,
}
)
if len(entries) >= page_limit:
truncated = True
break
return {
"entries": entries,
"truncated": truncated,
}
async def alist_tree(
self,
path: str = "/",
*,
max_depth: int | None = 8,
page_size: int = 500,
include_files: bool = True,
include_dirs: bool = True,
) -> dict[str, Any]:
return await asyncio.to_thread(
self.list_tree,
path,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
def move(
self,
source_path: str,
destination_path: str,
overwrite: bool = False,
) -> WriteResult:
try:
source = self._resolve_virtual(source_path)
destination = self._resolve_virtual(destination_path)
except ValueError:
return WriteResult(
error=(
f"Error: invalid source '{source_path}' or destination "
f"'{destination_path}' path"
)
)
if source == destination:
return WriteResult(error="Error: source and destination paths are the same")
with self._acquire_path_locks(source_path, destination_path):
if not source.exists():
return WriteResult(error=f"Error: source path '{source_path}' not found")
if destination.exists():
if not overwrite:
return WriteResult(
error=(
f"Error: destination path '{destination_path}' already exists. "
"Set overwrite=True to replace files."
)
)
if source.is_dir() or destination.is_dir():
return WriteResult(
error=(
"Error: overwrite=True is only supported for file-to-file moves."
)
)
destination.parent.mkdir(parents=True, exist_ok=True)
try:
if overwrite:
os.replace(source, destination)
else:
source.rename(destination)
except OSError as exc:
return WriteResult(error=f"Error: failed to move '{source_path}': {exc}")
return WriteResult(path=self._to_virtual(destination, self._root), files_update=None)
async def amove(
self,
source_path: str,
destination_path: str,
overwrite: bool = False,
) -> WriteResult:
return await asyncio.to_thread(
self.move, source_path, destination_path, overwrite
)
def edit(
self,
file_path: str,

View file

@ -132,6 +132,82 @@ class MultiRootLocalFolderBackend:
async def als_info(self, path: str) -> list[FileInfo]:
return await asyncio.to_thread(self.ls_info, path)
def list_tree(
self,
path: str = "/",
*,
max_depth: int | None = 8,
page_size: int = 500,
include_files: bool = True,
include_dirs: bool = True,
) -> dict[str, Any]:
if path == "/":
entries = [
{
"path": f"/{mount}",
"is_dir": True,
"size": 0,
"modified_at": "0",
"depth": 0,
}
for mount in self._mount_order
]
return {
"entries": entries if include_dirs else [],
"truncated": False,
}
try:
mount, local_path = self._split_mount_path(path)
except ValueError as exc:
return {"error": f"Error: {exc}"}
result = self._mount_to_backend[mount].list_tree(
local_path,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
if result.get("error"):
return result
entries: list[dict[str, Any]] = []
for entry in result.get("entries", []):
raw_path = self._get_str(entry, "path")
entries.append(
{
"path": self._prefix_mount_path(mount, raw_path),
"is_dir": self._get_bool(entry, "is_dir"),
"size": self._get_int(entry, "size"),
"modified_at": self._get_str(entry, "modified_at"),
"depth": self._get_int(entry, "depth"),
}
)
return {
"entries": entries,
"truncated": self._get_bool(result, "truncated"),
}
async def alist_tree(
self,
path: str = "/",
*,
max_depth: int | None = 8,
page_size: int = 500,
include_files: bool = True,
include_dirs: bool = True,
) -> dict[str, Any]:
return await asyncio.to_thread(
self.list_tree,
path,
max_depth=max_depth,
page_size=page_size,
include_files=include_files,
include_dirs=include_dirs,
)
def read(self, file_path: str, offset: int = 0, limit: int = 2000) -> str:
try:
mount, local_path = self._split_mount_path(file_path)
@ -165,6 +241,48 @@ class MultiRootLocalFolderBackend:
async def awrite(self, file_path: str, content: str) -> WriteResult:
return await asyncio.to_thread(self.write, file_path, content)
def move(
self,
source_path: str,
destination_path: str,
overwrite: bool = False,
) -> WriteResult:
try:
source_mount, source_local_path = self._split_mount_path(source_path)
destination_mount, destination_local_path = self._split_mount_path(
destination_path
)
except ValueError as exc:
return WriteResult(error=f"Error: {exc}")
if source_mount != destination_mount:
return WriteResult(
error=(
"Error: cross-mount moves are not supported. "
"Source and destination must be under the same mounted root."
)
)
result = self._mount_to_backend[source_mount].move(
source_local_path,
destination_local_path,
overwrite=overwrite,
)
if result.path:
result.path = self._prefix_mount_path(source_mount, result.path)
return result
async def amove(
self,
source_path: str,
destination_path: str,
overwrite: bool = False,
) -> WriteResult:
return await asyncio.to_thread(
self.move,
source_path,
destination_path,
overwrite,
)
def edit(
self,
file_path: str,

View file

@ -79,7 +79,7 @@ async def test_file_write_null_filename_uses_semantic_default_path():
@pytest.mark.asyncio
async def test_file_write_null_filename_infers_json_extension():
async def test_file_write_null_filename_defaults_to_markdown_path():
llm = _FakeLLM(
'{"intent":"file_write","confidence":0.71,"suggested_filename":null}'
)
@ -94,7 +94,7 @@ async def test_file_write_null_filename_infers_json_extension():
assert result is not None
contract = result["file_operation_contract"]
assert contract["intent"] == FileOperationIntent.FILE_WRITE.value
assert contract["suggested_path"] == "/notes.json"
assert contract["suggested_path"] == "/notes.md"
@pytest.mark.asyncio

View file

@ -30,6 +30,7 @@ def test_backend_resolver_returns_multi_root_backend_for_single_root(tmp_path: P
backend = resolver(_RuntimeStub())
assert isinstance(backend, MultiRootLocalFolderBackend)
assert backend.list_mounts() == ("tmp",)
def test_backend_resolver_uses_cloud_mode_by_default():
@ -57,3 +58,4 @@ def test_backend_resolver_returns_multi_root_backend_for_multiple_roots(tmp_path
backend = resolver(_RuntimeStub())
assert isinstance(backend, MultiRootLocalFolderBackend)
assert backend.list_mounts() == ("resume", "notes")

View file

@ -34,6 +34,11 @@ class _RuntimeNoSuggestedPath:
state = {"file_operation_contract": {}}
class _RuntimeWithSuggestedPath:
def __init__(self, suggested_path: str) -> None:
self.state = {"file_operation_contract": {"suggested_path": suggested_path}}
def test_verify_written_content_prefers_raw_sync() -> None:
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
expected = "line1\nline2"
@ -162,3 +167,47 @@ def test_normalize_local_mount_path_prefixes_posix_absolute_path_for_linux_and_m
resolved = middleware._normalize_local_mount_path("/var/log/app.log", runtime) # type: ignore[arg-type]
assert resolved == "/pc_backups/var/log/app.log"
def test_normalize_local_mount_path_prefers_unique_existing_parent_mount(
tmp_path: Path,
) -> None:
root_a = tmp_path / "RootA"
root_b = tmp_path / "RootB"
(root_a / "other").mkdir(parents=True)
(root_b / "nested" / "deep").mkdir(parents=True)
backend = MultiRootLocalFolderBackend(
(("root_a", str(root_a)), ("root_b", str(root_b)))
)
runtime = _RuntimeNoSuggestedPath()
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
middleware._get_backend = lambda _runtime: backend # type: ignore[method-assign]
resolved = middleware._normalize_local_mount_path( # type: ignore[arg-type]
"/nested/deep/new-note.md",
runtime,
)
assert resolved == "/root_b/nested/deep/new-note.md"
def test_normalize_local_mount_path_uses_suggested_mount_when_ambiguous(
tmp_path: Path,
) -> None:
root_a = tmp_path / "RootA"
root_b = tmp_path / "RootB"
root_a.mkdir(parents=True)
root_b.mkdir(parents=True)
backend = MultiRootLocalFolderBackend(
(("root_a", str(root_a)), ("root_b", str(root_b)))
)
runtime = _RuntimeWithSuggestedPath("/root_b/notes/context.md")
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
middleware._get_backend = lambda _runtime: backend # type: ignore[method-assign]
resolved = middleware._normalize_local_mount_path( # type: ignore[arg-type]
"/brand-new-note.md",
runtime,
)
assert resolved == "/root_b/brand-new-note.md"

View file

@ -9,6 +9,7 @@ pytestmark = pytest.mark.unit
def test_local_backend_write_read_edit_roundtrip(tmp_path: Path):
backend = LocalFolderBackend(str(tmp_path))
(tmp_path / "notes").mkdir()
write = backend.write("/notes/test.md", "line1\nline2")
assert write.error is None
@ -51,9 +52,20 @@ def test_local_backend_glob_and_grep(tmp_path: Path):
def test_local_backend_read_raw_returns_exact_content(tmp_path: Path):
backend = LocalFolderBackend(str(tmp_path))
(tmp_path / "notes").mkdir()
expected = "# Title\n\nline 1\nline 2\n"
write = backend.write("/notes/raw.md", expected)
assert write.error is None
raw = backend.read_raw("/notes/raw.md")
assert raw == expected
def test_local_backend_write_rejects_missing_parent_directory(tmp_path: Path):
backend = LocalFolderBackend(str(tmp_path))
write = backend.write("/tempoo/new-note.md", "# New note")
assert write.error is not None
assert "parent directory" in write.error
assert not (tmp_path / "tempoo").exists()

View file

@ -26,3 +26,12 @@ def test_mount_ids_preserve_client_mapping_order(tmp_path: Path) -> None:
)
assert backend.list_mounts() == ("pc_backups", "pc_backups_2", "notes_2026")
def test_mount_id_is_authoritative_not_folder_name(tmp_path: Path) -> None:
root = tmp_path / "Resume Folder"
root.mkdir()
backend = MultiRootLocalFolderBackend((("custom_resume_mount", str(root)),))
assert backend.list_mounts() == ("custom_resume_mount",)