feat: updated file management for main agent

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-04-28 04:32:52 -07:00
parent 8d50f90060
commit 05ca4c0b9f
27 changed files with 5054 additions and 1803 deletions

View file

@ -0,0 +1,198 @@
"""Tests for canonical virtual-path resolver helpers."""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.agents.new_chat.path_resolver import (
DOCUMENTS_ROOT,
PathIndex,
doc_to_virtual_path,
parse_doc_id_suffix,
parse_documents_path,
safe_filename,
safe_folder_segment,
virtual_path_to_doc,
)
pytestmark = pytest.mark.unit
class TestSafeFilename:
def test_appends_xml_extension(self):
assert safe_filename("notes").endswith(".xml")
def test_strips_invalid_chars(self):
assert "/" not in safe_filename("a/b\\c.xml")
def test_falls_back_when_empty(self):
assert safe_filename("").endswith(".xml")
assert safe_filename("///") == "untitled.xml" or safe_filename("///").endswith(
".xml"
)
class TestSafeFolderSegment:
def test_strips_path_separators(self):
assert "/" not in safe_folder_segment("a/b")
def test_falls_back(self):
assert safe_folder_segment("") == "folder"
class TestParseDocIdSuffix:
def test_parses_suffix(self):
stem, doc_id = parse_doc_id_suffix("My Doc (42).xml")
assert stem == "My Doc"
assert doc_id == 42
def test_no_suffix_returns_none(self):
stem, doc_id = parse_doc_id_suffix("My Doc.xml")
assert stem == "My Doc"
assert doc_id is None
def test_no_xml_extension(self):
stem, doc_id = parse_doc_id_suffix("plain")
assert stem == "plain"
assert doc_id is None
class TestDocToVirtualPath:
def test_root_when_no_folder(self):
index = PathIndex()
path = doc_to_virtual_path(doc_id=1, title="Hello", folder_id=None, index=index)
assert path == f"{DOCUMENTS_ROOT}/Hello.xml"
assert index.occupants[path] == 1
def test_collision_picks_doc_id_suffix(self):
index = PathIndex(occupants={f"{DOCUMENTS_ROOT}/Hello.xml": 7})
path = doc_to_virtual_path(doc_id=8, title="Hello", folder_id=None, index=index)
assert path == f"{DOCUMENTS_ROOT}/Hello (8).xml"
assert index.occupants[path] == 8
def test_uses_folder_path_when_known(self):
index = PathIndex(folder_paths={5: f"{DOCUMENTS_ROOT}/notes"})
path = doc_to_virtual_path(doc_id=2, title="A", folder_id=5, index=index)
assert path == f"{DOCUMENTS_ROOT}/notes/A.xml"
class TestParseDocumentsPath:
def test_extracts_folder_parts_and_title(self):
parts, title = parse_documents_path(f"{DOCUMENTS_ROOT}/foo/bar/baz.xml")
assert parts == ["foo", "bar"]
assert title == "baz"
def test_strips_doc_id_suffix(self):
parts, title = parse_documents_path(f"{DOCUMENTS_ROOT}/foo/My Doc (12).xml")
assert parts == ["foo"]
assert title == "My Doc"
def test_non_documents_returns_empty(self):
assert parse_documents_path("/other/x.xml") == ([], "")
def _result_from_scalars(rows: list):
"""Build a fake SQLAlchemy ``Result`` whose ``.scalars().all()`` and
``.scalars().first()`` yield ``rows``."""
scalars = MagicMock()
scalars.all.return_value = list(rows)
scalars.first.return_value = rows[0] if rows else None
result = MagicMock()
result.scalars.return_value = scalars
result.scalar_one_or_none.return_value = None
result.first.return_value = None
return result
def _result_from_one(value):
result = MagicMock()
result.scalar_one_or_none.return_value = value
return result
class TestVirtualPathToDoc:
"""Lookup must round-trip through ``safe_filename``'s lossy encoding.
The workspace tree displays ``safe_filename(title)`` as the basename, so
when the agent passes that basename back to a tool (move/edit/read) the
resolver must find the original document even though characters like
``:`` were replaced with ``_``.
"""
@pytest.mark.asyncio
async def test_falls_back_to_safe_filename_match_when_title_lossy(self):
# A Google Calendar-style title that contains a colon — safe_filename
# rewrites the colon to ``_``, so the literal title-equality lookup
# would miss this row.
original_title = "Calendar: Happy birthday!"
encoded_basename = safe_filename(original_title)
assert encoded_basename == "Calendar_ Happy birthday!.xml"
target_doc = SimpleNamespace(id=42, title=original_title, folder_id=None)
session = MagicMock()
# Each ``await session.execute(...)`` returns a fresh canned result.
# Order matches the resolver's lookup steps:
# 1) unique_identifier_hash → no match
# 2) literal title match → no match (lossy encoding)
# 3) folder scan → returns the row whose title encodes to basename
session.execute = AsyncMock(
side_effect=[
_result_from_one(None),
_result_from_scalars([]),
_result_from_scalars([target_doc]),
]
)
document = await virtual_path_to_doc(
session,
search_space_id=5,
virtual_path=f"{DOCUMENTS_ROOT}/{encoded_basename}",
)
assert document is target_doc
@pytest.mark.asyncio
async def test_returns_none_when_no_doc_matches_safe_filename(self):
session = MagicMock()
session.execute = AsyncMock(
side_effect=[
_result_from_one(None),
_result_from_scalars([]),
_result_from_scalars(
[SimpleNamespace(id=1, title="Something else", folder_id=None)]
),
]
)
document = await virtual_path_to_doc(
session,
search_space_id=5,
virtual_path=f"{DOCUMENTS_ROOT}/Calendar_ Happy birthday!.xml",
)
assert document is None
@pytest.mark.asyncio
async def test_literal_title_match_short_circuits_fallback(self):
# When the literal title query hits, the folder-scan fallback must
# NOT run (saves a query and avoids picking the wrong doc when two
# rows share a lossy encoding).
target_doc = SimpleNamespace(id=7, title="Plain Note", folder_id=None)
session = MagicMock()
session.execute = AsyncMock(
side_effect=[
_result_from_one(None),
_result_from_scalars([target_doc]),
]
)
document = await virtual_path_to_doc(
session,
search_space_id=5,
virtual_path=f"{DOCUMENTS_ROOT}/Plain Note.xml",
)
assert document is target_doc
assert session.execute.await_count == 2

View file

@ -0,0 +1,107 @@
"""Tests for SurfSense filesystem state reducers."""
from __future__ import annotations
import pytest
from app.agents.new_chat.state_reducers import (
_CLEAR,
_add_unique_reducer,
_dict_merge_with_tombstones_reducer,
_initial_filesystem_state,
_list_append_reducer,
_replace_reducer,
)
pytestmark = pytest.mark.unit
class TestReplaceReducer:
def test_right_wins_outright(self):
assert _replace_reducer("a", "b") == "b"
def test_none_right_returns_none(self):
assert _replace_reducer("a", None) is None
def test_none_left_returns_right(self):
assert _replace_reducer(None, "b") == "b"
class TestAddUniqueReducer:
def test_appends_unique_items(self):
assert _add_unique_reducer(["a"], ["b", "c"]) == ["a", "b", "c"]
def test_dedupes_against_left(self):
assert _add_unique_reducer(["a", "b"], ["b", "c"]) == ["a", "b", "c"]
def test_dedupes_within_right(self):
assert _add_unique_reducer([], ["a", "a", "b"]) == ["a", "b"]
def test_clear_anywhere_resets_and_reseeds_with_after_items(self):
# _CLEAR semantics: only items AFTER the LAST _CLEAR are kept.
result = _add_unique_reducer(["x", "y"], ["a", _CLEAR, "b", "c"])
assert result == ["b", "c"]
def test_multiple_clears_use_last(self):
result = _add_unique_reducer(["x"], [_CLEAR, "a", _CLEAR, "b"])
assert result == ["b"]
def test_clear_only_resets_to_empty(self):
assert _add_unique_reducer(["x", "y"], [_CLEAR]) == []
def test_empty_right_keeps_left(self):
assert _add_unique_reducer(["a"], []) == ["a"]
assert _add_unique_reducer(["a"], None) == ["a"]
class TestListAppendReducer:
def test_preserves_order_and_duplicates(self):
result = _list_append_reducer([{"a": 1}], [{"b": 2}, {"a": 1}])
assert result == [{"a": 1}, {"b": 2}, {"a": 1}]
def test_clear_resets_keeping_after_items(self):
result = _list_append_reducer([{"a": 1}], [{"old": 1}, _CLEAR, {"new": 2}])
assert result == [{"new": 2}]
class TestDictMergeWithTombstones:
def test_merges_keys(self):
assert _dict_merge_with_tombstones_reducer({"a": 1}, {"b": 2}) == {
"a": 1,
"b": 2,
}
def test_none_value_deletes_key(self):
result = _dict_merge_with_tombstones_reducer({"a": 1, "b": 2}, {"a": None})
assert result == {"b": 2}
def test_clear_resets_then_merges(self):
result = _dict_merge_with_tombstones_reducer(
{"a": 1, "b": 2}, {_CLEAR: True, "c": 3}
)
assert result == {"c": 3}
def test_clear_keeps_only_post_clear_non_none(self):
result = _dict_merge_with_tombstones_reducer(
{"a": 1}, {_CLEAR: True, "b": 2, "c": None}
)
assert result == {"b": 2}
def test_none_left_handled(self):
assert _dict_merge_with_tombstones_reducer(None, {"a": 1, "b": None}) == {
"a": 1
}
class TestInitialFilesystemState:
def test_default_shape(self):
state = _initial_filesystem_state()
assert state["cwd"] == "/documents"
assert state["staged_dirs"] == []
assert state["pending_moves"] == []
assert state["doc_id_by_path"] == {}
assert state["dirty_paths"] == []
assert state["kb_priority"] == []
assert state["kb_matched_chunk_ids"] == {}
assert state["kb_anon_doc"] is None
assert state["tree_version"] == 0

View file

@ -36,11 +36,18 @@ def test_backend_resolver_returns_multi_root_backend_for_single_root(tmp_path: P
def test_backend_resolver_uses_cloud_mode_by_default():
resolver = build_backend_resolver(FilesystemSelection())
backend = resolver(_RuntimeStub())
# StateBackend class name check keeps this test decoupled
# from internal deepagents runtime class identity.
# When no search_space_id is provided we fall back to StateBackend so
# sub-agents / tests without DB access still work.
assert backend.__class__.__name__ == "StateBackend"
def test_backend_resolver_uses_kb_postgres_in_cloud_with_search_space():
resolver = build_backend_resolver(FilesystemSelection(), search_space_id=42)
backend = resolver(_RuntimeStub())
assert backend.__class__.__name__ == "KBPostgresBackend"
assert backend.search_space_id == 42
def test_backend_resolver_returns_multi_root_backend_for_multiple_roots(tmp_path: Path):
root_one = tmp_path / "resume"
root_two = tmp_path / "notes"

View file

@ -0,0 +1,204 @@
"""Unit tests for the SurfSense filesystem middleware new behaviors.
Covers:
* cloud cwd defaults to ``/documents`` and relative paths resolve under it
* cloud writes outside ``/documents/`` are rejected unless basename starts
with ``temp_``
* cloud writes/edits to the anonymous document are rejected (read-only)
* helper methods on the middleware (``_resolve_relative``,
``_check_cloud_write_namespace``, ``_default_cwd``)
These tests use ``__new__`` to bypass the heavy ``__init__`` and exercise
the helper methods directly so the test surface stays narrow and fast.
"""
from __future__ import annotations
from types import SimpleNamespace
import pytest
from app.agents.new_chat.filesystem_selection import FilesystemMode
from app.agents.new_chat.middleware.filesystem import (
SurfSenseFilesystemMiddleware,
_build_filesystem_system_prompt,
_build_tool_descriptions,
)
pytestmark = pytest.mark.unit
def _make_middleware(mode: FilesystemMode = FilesystemMode.CLOUD):
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
middleware._filesystem_mode = mode
return middleware
def _runtime(state: dict | None = None) -> SimpleNamespace:
return SimpleNamespace(state=state or {})
class TestCloudCwdDefaults:
def test_default_cwd_in_cloud_is_documents_root(self):
m = _make_middleware()
assert m._default_cwd() == "/documents"
def test_default_cwd_in_desktop_is_root(self):
m = _make_middleware(FilesystemMode.DESKTOP_LOCAL_FOLDER)
assert m._default_cwd() == "/"
def test_current_cwd_uses_state_when_set(self):
m = _make_middleware()
runtime = _runtime({"cwd": "/documents/notes"})
assert m._current_cwd(runtime) == "/documents/notes"
def test_current_cwd_falls_back_to_default(self):
m = _make_middleware()
runtime = _runtime({})
assert m._current_cwd(runtime) == "/documents"
def test_current_cwd_ignores_invalid(self):
m = _make_middleware()
runtime = _runtime({"cwd": "not-absolute"})
assert m._current_cwd(runtime) == "/documents"
class TestRelativePathResolution:
def test_relative_path_resolves_against_cwd(self):
m = _make_middleware()
runtime = _runtime({"cwd": "/documents/projects"})
assert (
m._resolve_relative("notes.md", runtime) == "/documents/projects/notes.md"
)
def test_relative_path_with_dotdot(self):
m = _make_middleware()
runtime = _runtime({"cwd": "/documents/a/b"})
assert m._resolve_relative("../c.md", runtime) == "/documents/a/c.md"
def test_absolute_path_is_kept(self):
m = _make_middleware()
runtime = _runtime({"cwd": "/documents"})
assert m._resolve_relative("/other/x.md", runtime) == "/other/x.md"
def test_empty_path_returns_cwd(self):
m = _make_middleware()
runtime = _runtime({"cwd": "/documents/projects"})
assert m._resolve_relative("", runtime) == "/documents/projects"
class TestCloudWriteNamespacePolicy:
def test_documents_path_allowed(self):
m = _make_middleware()
runtime = _runtime()
assert m._check_cloud_write_namespace("/documents/foo.md", runtime) is None
def test_documents_root_allowed(self):
m = _make_middleware()
runtime = _runtime()
assert m._check_cloud_write_namespace("/documents", runtime) is None
def test_temp_basename_anywhere_allowed(self):
m = _make_middleware()
runtime = _runtime()
assert m._check_cloud_write_namespace("/temp_scratch.md", runtime) is None
assert m._check_cloud_write_namespace("/foo/temp_x.md", runtime) is None
assert m._check_cloud_write_namespace("/documents/temp_x.md", runtime) is None
def test_other_paths_rejected(self):
m = _make_middleware()
runtime = _runtime()
err = m._check_cloud_write_namespace("/foo/bar.md", runtime)
assert err is not None
assert "must target /documents" in err
def test_anon_doc_path_is_read_only(self):
m = _make_middleware()
runtime = _runtime(
{
"kb_anon_doc": {
"path": "/documents/uploaded.xml",
"title": "uploaded",
"content": "",
"chunks": [],
}
}
)
err = m._check_cloud_write_namespace("/documents/uploaded.xml", runtime)
assert err is not None
assert "read-only" in err
def test_desktop_mode_skips_namespace_policy(self):
m = _make_middleware(FilesystemMode.DESKTOP_LOCAL_FOLDER)
runtime = _runtime()
assert m._check_cloud_write_namespace("/random/path.md", runtime) is None
class TestModeSpecificPrompts:
"""The prompt and tool descriptions must only describe the active mode.
Cross-mode noise wastes tokens and confuses the model with rules it
cannot use this session.
"""
def test_cloud_prompt_omits_desktop_section(self):
prompt = _build_filesystem_system_prompt(
FilesystemMode.CLOUD, sandbox_available=False
)
assert "Local Folder Mode" not in prompt
assert "mount-prefixed" not in prompt
assert "Persistence Rules" in prompt
assert "/documents" in prompt
assert "temp_" in prompt
def test_desktop_prompt_omits_cloud_persistence_rules(self):
prompt = _build_filesystem_system_prompt(
FilesystemMode.DESKTOP_LOCAL_FOLDER, sandbox_available=False
)
assert "Persistence Rules" not in prompt
assert "Workspace Tree" not in prompt
assert "<priority_documents>" not in prompt
assert "Local Folder Mode" in prompt
assert "mount-prefixed" in prompt
def test_cloud_tool_descs_omit_desktop_phrases(self):
descs = _build_tool_descriptions(FilesystemMode.CLOUD)
for name in (
"write_file",
"edit_file",
"move_file",
"mkdir",
"list_tree",
"grep",
):
text = descs[name]
assert "Desktop" not in text, f"{name} leaks desktop hints"
assert "Cloud mode:" not in text, f"{name} qualifies a cloud-only desc"
def test_desktop_tool_descs_omit_cloud_phrases(self):
descs = _build_tool_descriptions(FilesystemMode.DESKTOP_LOCAL_FOLDER)
for name in (
"write_file",
"edit_file",
"move_file",
"mkdir",
"list_tree",
"grep",
):
text = descs[name]
assert "Cloud" not in text, f"{name} leaks cloud hints"
assert "/documents/" not in text, f"{name} mentions cloud namespace"
assert "temp_" not in text, f"{name} mentions cloud temp_ semantics"
def test_sandbox_addendum_appended_when_available(self):
prompt = _build_filesystem_system_prompt(
FilesystemMode.CLOUD, sandbox_available=True
)
assert "execute_code" in prompt
assert "Code Execution" in prompt
def test_sandbox_addendum_absent_when_unavailable(self):
prompt = _build_filesystem_system_prompt(
FilesystemMode.CLOUD, sandbox_available=False
)
assert "execute_code" not in prompt

View file

@ -11,25 +11,6 @@ from app.agents.new_chat.middleware.multi_root_local_folder_backend import (
pytestmark = pytest.mark.unit
class _BackendWithRawRead:
def __init__(self, content: str) -> None:
self._content = content
def read(self, file_path: str, offset: int = 0, limit: int = 200000) -> str:
del file_path, offset, limit
return " 1\tline1\n 2\tline2"
async def aread(self, file_path: str, offset: int = 0, limit: int = 200000) -> str:
return self.read(file_path, offset, limit)
def read_raw(self, file_path: str) -> str:
del file_path
return self._content
async def aread_raw(self, file_path: str) -> str:
return self.read_raw(file_path)
class _RuntimeNoSuggestedPath:
state = {"file_operation_contract": {}}
@ -39,40 +20,19 @@ class _RuntimeWithSuggestedPath:
self.state = {"file_operation_contract": {"suggested_path": suggested_path}}
def test_verify_written_content_prefers_raw_sync() -> None:
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
expected = "line1\nline2"
backend = _BackendWithRawRead(expected)
verify_error = middleware._verify_written_content_sync(
backend=backend,
path="/note.md",
expected_content=expected,
)
assert verify_error is None
def test_contract_suggested_path_falls_back_to_notes_md() -> None:
def test_contract_suggested_path_falls_back_to_documents_notes_md() -> None:
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
middleware._filesystem_mode = FilesystemMode.CLOUD
suggested = middleware._get_contract_suggested_path(_RuntimeNoSuggestedPath()) # type: ignore[arg-type]
assert suggested == "/notes.md"
# Cloud default cwd is /documents so the fallback lands in the KB.
assert suggested == "/documents/notes.md"
@pytest.mark.asyncio
async def test_verify_written_content_prefers_raw_async() -> None:
def test_contract_suggested_path_falls_back_to_root_notes_md_in_desktop() -> None:
middleware = SurfSenseFilesystemMiddleware.__new__(SurfSenseFilesystemMiddleware)
expected = "line1\nline2"
backend = _BackendWithRawRead(expected)
verify_error = await middleware._verify_written_content_async(
backend=backend,
path="/note.md",
expected_content=expected,
)
assert verify_error is None
middleware._filesystem_mode = FilesystemMode.DESKTOP_LOCAL_FOLDER
suggested = middleware._get_contract_suggested_path(_RuntimeNoSuggestedPath()) # type: ignore[arg-type]
assert suggested == "/notes.md"
def test_normalize_local_mount_path_prefixes_default_mount(tmp_path: Path) -> None:

View file

@ -5,10 +5,10 @@ import json
import pytest
from langchain_core.messages import AIMessage, HumanMessage
from app.agents.new_chat.document_xml import build_document_xml as _build_document_xml
from app.agents.new_chat.middleware.knowledge_search import (
KBSearchPlan,
KnowledgeBaseSearchMiddleware,
_build_document_xml,
_normalize_optional_date_range,
_parse_kb_search_plan_response,
_render_recent_conversation,
@ -248,17 +248,10 @@ class TestKnowledgeBaseSearchMiddlewarePlanner:
captured.update(kwargs)
return []
async def fake_build_scoped_filesystem(**kwargs):
return {}, {}
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.search_knowledge_base",
fake_search_knowledge_base,
)
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.build_scoped_filesystem",
fake_build_scoped_filesystem,
)
llm = FakeLLM(
json.dumps(
@ -298,17 +291,10 @@ class TestKnowledgeBaseSearchMiddlewarePlanner:
captured.update(kwargs)
return []
async def fake_build_scoped_filesystem(**kwargs):
return {}, {}
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.search_knowledge_base",
fake_search_knowledge_base,
)
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.build_scoped_filesystem",
fake_build_scoped_filesystem,
)
middleware = KnowledgeBaseSearchMiddleware(
llm=FakeLLM("not json"),
@ -334,17 +320,10 @@ class TestKnowledgeBaseSearchMiddlewarePlanner:
captured.update(kwargs)
return []
async def fake_build_scoped_filesystem(**kwargs):
return {}, {}
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.search_knowledge_base",
fake_search_knowledge_base,
)
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.build_scoped_filesystem",
fake_build_scoped_filesystem,
)
middleware = KnowledgeBaseSearchMiddleware(
llm=FakeLLM(
@ -386,9 +365,6 @@ class TestKnowledgeBaseSearchMiddlewarePlanner:
search_called = True
return []
async def fake_build_scoped_filesystem(**kwargs):
return {}, {}
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.browse_recent_documents",
fake_browse_recent_documents,
@ -397,10 +373,6 @@ class TestKnowledgeBaseSearchMiddlewarePlanner:
"app.agents.new_chat.middleware.knowledge_search.search_knowledge_base",
fake_search_knowledge_base,
)
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.build_scoped_filesystem",
fake_build_scoped_filesystem,
)
llm = FakeLLM(
json.dumps(
@ -440,9 +412,6 @@ class TestKnowledgeBaseSearchMiddlewarePlanner:
search_captured.update(kwargs)
return []
async def fake_build_scoped_filesystem(**kwargs):
return {}, {}
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.browse_recent_documents",
fake_browse_recent_documents,
@ -451,10 +420,6 @@ class TestKnowledgeBaseSearchMiddlewarePlanner:
"app.agents.new_chat.middleware.knowledge_search.search_knowledge_base",
fake_search_knowledge_base,
)
monkeypatch.setattr(
"app.agents.new_chat.middleware.knowledge_search.build_scoped_filesystem",
fake_build_scoped_filesystem,
)
llm = FakeLLM(
json.dumps(