mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-21 18:55:16 +02:00
feat(filesystem): enhance local mount path normalization and add error handling for missing parent directories
This commit is contained in:
parent
8c06709295
commit
c238a671c8
5 changed files with 160 additions and 5 deletions
|
|
@ -28,6 +28,9 @@ from langgraph.types import Command
|
|||
from sqlalchemy import delete, select
|
||||
|
||||
from app.agents.new_chat.filesystem_selection import FilesystemMode
|
||||
from app.agents.new_chat.middleware.multi_root_local_folder_backend import (
|
||||
MultiRootLocalFolderBackend,
|
||||
)
|
||||
from app.agents.new_chat.sandbox import (
|
||||
_evict_sandbox_cache,
|
||||
delete_sandbox,
|
||||
|
|
@ -816,6 +819,89 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
|
|||
return normalized
|
||||
return f"/{normalized.lstrip('/')}"
|
||||
|
||||
@staticmethod
|
||||
def _extract_mount_from_path(path: str, mounts: tuple[str, ...]) -> str | None:
|
||||
rel = path.lstrip("/")
|
||||
if not rel:
|
||||
return None
|
||||
mount, _, _ = rel.partition("/")
|
||||
if mount in mounts:
|
||||
return mount
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _local_parent_path(path: str) -> str:
|
||||
rel = path.lstrip("/")
|
||||
if "/" not in rel:
|
||||
return "/"
|
||||
parent = rel.rsplit("/", 1)[0].strip("/")
|
||||
if not parent:
|
||||
return "/"
|
||||
return f"/{parent}"
|
||||
|
||||
@staticmethod
|
||||
def _path_exists_under_mount(
|
||||
backend: MultiRootLocalFolderBackend,
|
||||
mount: str,
|
||||
local_path: str,
|
||||
) -> bool:
|
||||
result = backend.list_tree(
|
||||
f"/{mount}{local_path}",
|
||||
max_depth=0,
|
||||
page_size=1,
|
||||
include_files=True,
|
||||
include_dirs=True,
|
||||
)
|
||||
return not bool(result.get("error"))
|
||||
|
||||
def _normalize_local_mount_path(
|
||||
self,
|
||||
candidate: str,
|
||||
runtime: ToolRuntime[None, FilesystemState],
|
||||
) -> str:
|
||||
normalized = self._normalize_absolute_path(candidate)
|
||||
backend = self._get_backend(runtime)
|
||||
if not isinstance(backend, MultiRootLocalFolderBackend):
|
||||
return normalized
|
||||
|
||||
mounts = backend.list_mounts()
|
||||
explicit_mount = self._extract_mount_from_path(normalized, mounts)
|
||||
if explicit_mount:
|
||||
return normalized
|
||||
|
||||
if len(mounts) == 1:
|
||||
return f"/{mounts[0]}{normalized}"
|
||||
|
||||
suggested_mount: str | None = None
|
||||
contract = runtime.state.get("file_operation_contract") or {}
|
||||
suggested_path = contract.get("suggested_path")
|
||||
if isinstance(suggested_path, str) and suggested_path.strip():
|
||||
normalized_suggested = self._normalize_absolute_path(suggested_path)
|
||||
suggested_mount = self._extract_mount_from_path(normalized_suggested, mounts)
|
||||
|
||||
matching_mounts = [
|
||||
mount
|
||||
for mount in mounts
|
||||
if self._path_exists_under_mount(backend, mount, normalized)
|
||||
]
|
||||
if len(matching_mounts) == 1:
|
||||
return f"/{matching_mounts[0]}{normalized}"
|
||||
|
||||
parent_path = self._local_parent_path(normalized)
|
||||
if parent_path != "/":
|
||||
parent_matching_mounts = [
|
||||
mount
|
||||
for mount in mounts
|
||||
if self._path_exists_under_mount(backend, mount, parent_path)
|
||||
]
|
||||
if len(parent_matching_mounts) == 1:
|
||||
return f"/{parent_matching_mounts[0]}{normalized}"
|
||||
|
||||
if suggested_mount:
|
||||
return f"/{suggested_mount}{normalized}"
|
||||
|
||||
return f"/{backend.default_mount()}{normalized}"
|
||||
|
||||
def _get_contract_suggested_path(
|
||||
self, runtime: ToolRuntime[None, FilesystemState]
|
||||
) -> str:
|
||||
|
|
@ -834,7 +920,7 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
|
|||
if not candidate:
|
||||
return self._get_contract_suggested_path(runtime)
|
||||
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
|
||||
return self._normalize_absolute_path(candidate)
|
||||
return self._normalize_local_mount_path(candidate, runtime)
|
||||
if not candidate.startswith("/"):
|
||||
return f"/{candidate.lstrip('/')}"
|
||||
return candidate
|
||||
|
|
@ -848,7 +934,7 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
|
|||
if not candidate:
|
||||
return ""
|
||||
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
|
||||
return self._normalize_absolute_path(candidate)
|
||||
return self._normalize_local_mount_path(candidate, runtime)
|
||||
if not candidate.startswith("/"):
|
||||
return f"/{candidate.lstrip('/')}"
|
||||
return candidate
|
||||
|
|
@ -862,7 +948,7 @@ class SurfSenseFilesystemMiddleware(FilesystemMiddleware):
|
|||
if candidate == "/":
|
||||
return "/"
|
||||
if self._filesystem_mode == FilesystemMode.DESKTOP_LOCAL_FOLDER:
|
||||
return self._normalize_absolute_path(candidate)
|
||||
return self._normalize_local_mount_path(candidate, runtime)
|
||||
if not candidate.startswith("/"):
|
||||
return f"/{candidate.lstrip('/')}"
|
||||
return candidate
|
||||
|
|
|
|||
|
|
@ -180,6 +180,14 @@ class LocalFolderBackend:
|
|||
"Read and then make an edit, or write to a new path."
|
||||
)
|
||||
)
|
||||
parent = path.parent
|
||||
if not parent.exists() or not parent.is_dir():
|
||||
return WriteResult(
|
||||
error=(
|
||||
f"Error: parent directory for '{file_path}' does not exist. "
|
||||
"Create the folder first or write to an existing directory."
|
||||
)
|
||||
)
|
||||
self._write_text_atomic(path, content)
|
||||
return WriteResult(path=file_path, files_update=None)
|
||||
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ async def test_file_write_null_filename_uses_semantic_default_path():
|
|||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_write_null_filename_infers_json_extension():
|
||||
async def test_file_write_null_filename_defaults_to_markdown_path():
|
||||
llm = _FakeLLM(
|
||||
'{"intent":"file_write","confidence":0.71,"suggested_filename":null}'
|
||||
)
|
||||
|
|
@ -94,7 +94,7 @@ async def test_file_write_null_filename_infers_json_extension():
|
|||
assert result is not None
|
||||
contract = result["file_operation_contract"]
|
||||
assert contract["intent"] == FileOperationIntent.FILE_WRITE.value
|
||||
assert contract["suggested_path"] == "/notes.json"
|
||||
assert contract["suggested_path"] == "/notes.md"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
|||
|
|
@ -34,6 +34,11 @@ class _RuntimeNoSuggestedPath:
|
|||
state = {"file_operation_contract": {}}
|
||||
|
||||
|
||||
class _RuntimeWithSuggestedPath:
|
||||
def __init__(self, suggested_path: str) -> None:
|
||||
self.state = {"file_operation_contract": {"suggested_path": suggested_path}}
|
||||
|
||||
|
||||
def test_verify_written_content_prefers_raw_sync() -> None:
|
||||
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
|
||||
expected = "line1\nline2"
|
||||
|
|
@ -162,3 +167,47 @@ def test_normalize_local_mount_path_prefixes_posix_absolute_path_for_linux_and_m
|
|||
resolved = middleware._normalize_local_mount_path("/var/log/app.log", runtime) # type: ignore[arg-type]
|
||||
|
||||
assert resolved == "/pc_backups/var/log/app.log"
|
||||
|
||||
|
||||
def test_normalize_local_mount_path_prefers_unique_existing_parent_mount(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
root_a = tmp_path / "RootA"
|
||||
root_b = tmp_path / "RootB"
|
||||
(root_a / "other").mkdir(parents=True)
|
||||
(root_b / "nested" / "deep").mkdir(parents=True)
|
||||
backend = MultiRootLocalFolderBackend(
|
||||
(("root_a", str(root_a)), ("root_b", str(root_b)))
|
||||
)
|
||||
runtime = _RuntimeNoSuggestedPath()
|
||||
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
|
||||
middleware._get_backend = lambda _runtime: backend # type: ignore[method-assign]
|
||||
|
||||
resolved = middleware._normalize_local_mount_path( # type: ignore[arg-type]
|
||||
"/nested/deep/new-note.md",
|
||||
runtime,
|
||||
)
|
||||
|
||||
assert resolved == "/root_b/nested/deep/new-note.md"
|
||||
|
||||
|
||||
def test_normalize_local_mount_path_uses_suggested_mount_when_ambiguous(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
root_a = tmp_path / "RootA"
|
||||
root_b = tmp_path / "RootB"
|
||||
root_a.mkdir(parents=True)
|
||||
root_b.mkdir(parents=True)
|
||||
backend = MultiRootLocalFolderBackend(
|
||||
(("root_a", str(root_a)), ("root_b", str(root_b)))
|
||||
)
|
||||
runtime = _RuntimeWithSuggestedPath("/root_b/notes/context.md")
|
||||
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
|
||||
middleware._get_backend = lambda _runtime: backend # type: ignore[method-assign]
|
||||
|
||||
resolved = middleware._normalize_local_mount_path( # type: ignore[arg-type]
|
||||
"/brand-new-note.md",
|
||||
runtime,
|
||||
)
|
||||
|
||||
assert resolved == "/root_b/brand-new-note.md"
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ pytestmark = pytest.mark.unit
|
|||
|
||||
def test_local_backend_write_read_edit_roundtrip(tmp_path: Path):
|
||||
backend = LocalFolderBackend(str(tmp_path))
|
||||
(tmp_path / "notes").mkdir()
|
||||
|
||||
write = backend.write("/notes/test.md", "line1\nline2")
|
||||
assert write.error is None
|
||||
|
|
@ -51,9 +52,20 @@ def test_local_backend_glob_and_grep(tmp_path: Path):
|
|||
|
||||
def test_local_backend_read_raw_returns_exact_content(tmp_path: Path):
|
||||
backend = LocalFolderBackend(str(tmp_path))
|
||||
(tmp_path / "notes").mkdir()
|
||||
expected = "# Title\n\nline 1\nline 2\n"
|
||||
write = backend.write("/notes/raw.md", expected)
|
||||
assert write.error is None
|
||||
|
||||
raw = backend.read_raw("/notes/raw.md")
|
||||
assert raw == expected
|
||||
|
||||
|
||||
def test_local_backend_write_rejects_missing_parent_directory(tmp_path: Path):
|
||||
backend = LocalFolderBackend(str(tmp_path))
|
||||
|
||||
write = backend.write("/tempoo/new-note.md", "# New note")
|
||||
|
||||
assert write.error is not None
|
||||
assert "parent directory" in write.error
|
||||
assert not (tmp_path / "tempoo").exists()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue