feat(pifs): add find maxdepth traversal limit

This commit is contained in:
BukeLy 2026-05-26 15:17:35 +08:00
parent 5a78131509
commit 144e8ba325
4 changed files with 303 additions and 14 deletions

View file

@ -89,6 +89,7 @@ class PIFSCommandExecutor:
"- mode: read-only inspection",
"- ls/tree: folder browsing",
"- find --where: exact/canonical metadata DSL filtering",
"- find <folder> -maxdepth N -type f|d: bounded folder traversal for find",
"- grep -R: recursive lexical/FTS search only; semantic vector prefilter is disabled",
"- cat <ref> --structure/--node/--page: cached PageIndex reads for PDF/Markdown files",
"- cat <ref> --all: full text artifact reads for txt/text files",
@ -230,6 +231,7 @@ class PIFSCommandExecutor:
relation = None
limit = 10
file_type = None
max_depth = None
i = 0
while i < len(args):
arg = args[i]
@ -248,6 +250,9 @@ class PIFSCommandExecutor:
elif arg == "-type":
i += 1
file_type = args[i]
elif arg == "-maxdepth":
i += 1
max_depth = self._parse_find_maxdepth(args[i] if i < len(args) else None)
elif arg.startswith("-"):
raise PIFSCommandError(f"Unsupported find option: {arg}")
else:
@ -259,8 +264,26 @@ class PIFSCommandExecutor:
raise PIFSCommandError("find supports only one of --name or --relation")
if file_type == "d":
if where:
return self.filesystem.find_folders(path, metadata_filter=where, limit=limit)
return self.filesystem.browse(path, recursive=True, limit=limit)["folders"]
return self.filesystem.find_folders(
path,
metadata_filter=where,
limit=limit,
max_depth=max_depth,
)
folders = self.filesystem.browse(
path,
recursive=True,
limit=limit,
max_depth=max_depth,
)["folders"]
if max_depth is not None and limit != 0:
return [self.filesystem.folder_info(path), *folders][:limit]
return folders
scope = {"folder_path": path, "recursive": True}
if max_depth is not None:
if max_depth == 0:
return []
scope["max_depth"] = max_depth
if relation:
if not self.filesystem.has_semantic_channel("relation"):
raise PIFSCommandError(
@ -269,7 +292,7 @@ class PIFSCommandExecutor:
return self.filesystem.search_semantic_channel(
"relation",
self._semantic_retrieval_query(relation),
scope={"folder_path": path, "recursive": True},
scope=scope,
metadata_filter=where,
limit=limit,
)
@ -277,13 +300,13 @@ class PIFSCommandExecutor:
return self.filesystem.search_semantic_channel(
"entity",
self._semantic_retrieval_query(name),
scope={"folder_path": path, "recursive": True},
scope=scope,
metadata_filter=where,
limit=limit,
)
return self.filesystem.search(
query=name,
scope={"folder_path": path, "recursive": True},
scope=scope,
metadata_filter=where,
limit=limit,
semantic=False,
@ -1466,6 +1489,18 @@ class PIFSCommandExecutor:
raise PIFSCommandError(f"{label} must be non-negative")
return parsed
@staticmethod
def _parse_find_maxdepth(value: str | None) -> int:
if value is None:
raise PIFSCommandError("find -maxdepth requires an integer >= 0")
try:
parsed = int(value)
except ValueError as exc:
raise PIFSCommandError("find -maxdepth requires an integer >= 0") from exc
if parsed < 0:
raise PIFSCommandError("find -maxdepth requires an integer >= 0")
return parsed
@staticmethod
def _try_json_loads(input_text: str) -> Any | None:
try:

View file

@ -238,17 +238,32 @@ class PageIndexFileSystem:
path: str = "/",
recursive: bool = False,
limit: int = 100,
max_depth: int | None = None,
) -> dict[str, list[dict[str, Any]]]:
return self.store.list_folder(path, recursive=recursive, limit=limit)
return self.store.list_folder(
path,
recursive=recursive,
limit=limit,
max_depth=max_depth,
)
def folder_info(self, path: str = "/") -> dict[str, Any]:
return self.store.folder_info(path)
def find_folders(
self,
path: str = "/",
metadata_filter: Optional[dict[str, Any] | str] = None,
limit: int = 100,
max_depth: int | None = None,
) -> list[dict[str, Any]]:
parsed_filter = self.metadata.parse_filter(metadata_filter)
return self.store.find_folders(path, metadata_filter=parsed_filter, limit=limit)
return self.store.find_folders(
path,
metadata_filter=parsed_filter,
limit=limit,
max_depth=max_depth,
)
def create_folder(
self,
@ -484,6 +499,7 @@ class PageIndexFileSystem:
"grep_recursive": True,
"grep_recursive_semantic_prefilter": False,
"grep_recursive_guard": "bounded broad-folder notice",
"find_maxdepth": True,
},
"semantic": {
"backend_configured": self.semantic_retrieval_backend is not None,

View file

@ -731,15 +731,33 @@ class SQLiteFileSystemStore:
for row in rows
]
def list_folder(self, path: str = "/", recursive: bool = False, limit: int = 100) -> dict[str, Any]:
def list_folder(
self,
path: str = "/",
recursive: bool = False,
limit: int = 100,
max_depth: int | None = None,
) -> dict[str, Any]:
path = normalize_path(path)
if max_depth is not None and max_depth < 0:
raise ValueError("max_depth must be non-negative")
with self.connect() as conn:
folder = self._folder_by_path(conn, path)
if folder is None:
raise KeyError(f"Unknown folder path: {path}")
if recursive:
folder_depth_clause = ""
folder_depth_params: list[Any] = []
if max_depth is not None:
if max_depth == 0:
folder_depth_clause = "AND 0"
else:
folder_depth_clause = (
f"AND ({self._folder_depth_sql('fo.path')} - ?) <= ?"
)
folder_depth_params = [self._folder_depth(path), max_depth]
folder_rows = conn.execute(
"""
f"""
SELECT
fo.folder_id,
fo.parent_id,
@ -765,12 +783,19 @@ class SQLiteFileSystemStore:
) AS children_count
FROM folders fo
WHERE fo.path != ? AND (fo.path LIKE ?)
{folder_depth_clause}
ORDER BY fo.path
LIMIT ?
""",
(path, self._descendant_like(path), limit),
(path, self._descendant_like(path), *folder_depth_params, limit),
).fetchall()
file_rows = self._file_rows_for_scope(conn, path, True, limit)
file_rows = self._file_rows_for_scope(
conn,
path,
True,
limit,
max_depth=max_depth,
)
else:
folder_rows = conn.execute(
"""
@ -810,16 +835,64 @@ class SQLiteFileSystemStore:
"files": [self._file_summary(row) for row in file_rows],
}
def folder_info(self, path: str = "/") -> dict[str, Any]:
"""Return the summary dict for the single folder stored at ``path``.

The path is normalized, then matched exactly against ``folders.path``.
Besides the folder's own columns the query computes two counters:
``file_count`` (distinct, non-deleted files linked directly to this
folder through ``file_folders``) and ``children_count`` (folders whose
``parent_id`` is this folder). The row is converted with
``_folder_row_to_dict`` before being returned.

Raises:
KeyError: If no folder row has the normalized path.
"""
path = normalize_path(path)
with self.connect() as conn:
row = conn.execute(
"""
SELECT
fo.folder_id,
fo.parent_id,
fo.name,
fo.path,
fo.description,
fo.kind,
fo.metadata_json,
fo.created_at,
fo.updated_at,
(
SELECT COUNT(DISTINCT child_ff.file_ref)
FROM file_folders child_ff
JOIN files child_file
ON child_file.file_ref = child_ff.file_ref
AND child_file.deleted_at IS NULL
WHERE child_ff.folder_id = fo.folder_id
) AS file_count,
(
SELECT COUNT(*)
FROM folders child_folder
WHERE child_folder.parent_id = fo.folder_id
) AS children_count
FROM folders fo
WHERE fo.path = ?
""",
(path,),
).fetchone()
# fetchone() yields None when the exact-path lookup matched nothing.
if row is None:
raise KeyError(f"Unknown folder path: {path}")
return self._folder_row_to_dict(row)
def find_folders(
self,
path: str = "/",
*,
metadata_filter: Optional[dict[str, Any]] = None,
limit: int = 100,
max_depth: int | None = None,
) -> list[dict[str, Any]]:
path = normalize_path(path)
if max_depth is not None and max_depth < 0:
raise ValueError("max_depth must be non-negative")
metadata_sql, metadata_params = self._metadata_filter_sql(metadata_filter)
metadata_clause = f"AND {' AND '.join(metadata_sql)}" if metadata_sql else ""
folder_depth_clause = ""
folder_depth_params: list[Any] = []
if max_depth is not None:
if max_depth == 0:
folder_depth_clause = "AND 0"
else:
folder_depth_clause = f"AND ({self._folder_depth_sql('fo.path')} - ?) <= ?"
folder_depth_params = [self._folder_depth(path), max_depth]
sql = f"""
SELECT *
FROM (
@ -865,12 +938,19 @@ class SQLiteFileSystemStore:
) AS matched_files
FROM folders fo
WHERE fo.path != ? AND fo.path LIKE ?
{folder_depth_clause}
)
WHERE matched_files > 0
ORDER BY path
LIMIT ?
"""
params = [*metadata_params, path, self._descendant_like(path), limit]
params = [
*metadata_params,
path,
self._descendant_like(path),
*folder_depth_params,
limit,
]
with self.connect() as conn:
folder = self._folder_by_path(conn, path)
if folder is None:
@ -1577,6 +1657,7 @@ class SQLiteFileSystemStore:
path: str,
recursive: bool,
limit: int,
max_depth: int | None = None,
) -> list[sqlite3.Row]:
sql = """
SELECT
@ -1601,6 +1682,12 @@ class SQLiteFileSystemStore:
if recursive:
sql += " AND (pf.path = ? OR pf.path LIKE ?)"
params = [path, self._descendant_like(path)]
if max_depth is not None:
if max_depth <= 0:
sql += " AND 0"
else:
sql += f" AND ({self._folder_depth_sql('pf.path')} - ?) <= ?"
params.extend([self._folder_depth(path), max_depth - 1])
else:
sql += " AND pf.path = ?"
params = [path]
@ -1612,14 +1699,30 @@ class SQLiteFileSystemStore:
if not scope:
return "", []
recursive = scope.get("recursive", True)
max_depth = scope.get("max_depth")
if max_depth is not None:
max_depth = int(max_depth)
if max_depth < 0:
raise ValueError("max_depth must be non-negative")
folder_id = scope.get("folder_id")
if folder_id:
if folder_id == "root":
folder_path = "/"
else:
if recursive:
if max_depth == 0:
return "0", []
depth_clause = ""
depth_params: list[Any] = []
if max_depth is not None:
depth_clause = (
"AND "
f"({self._folder_depth_sql('scope_folder.path')} - "
f"{self._folder_depth_sql('base_folder.path')}) <= ?"
)
depth_params = [max_depth - 1]
return (
"""
f"""
EXISTS (
SELECT 1
FROM file_folders scope_ff
@ -1635,9 +1738,10 @@ class SQLiteFileSystemStore:
ELSE base_folder.path || '/%'
END
)
{depth_clause}
)
""",
[folder_id],
[folder_id, *depth_params],
)
return (
"""
@ -1654,12 +1758,18 @@ class SQLiteFileSystemStore:
folder_path = normalize_path(scope.get("folder_path") or scope.get("path"))
else:
return "", []
if recursive and max_depth == 0:
return "0", []
path_clause = (
"(scope_folder.path = ? OR scope_folder.path LIKE ?)"
if recursive
else "scope_folder.path = ?"
)
params = [folder_path, self._descendant_like(folder_path)] if recursive else [folder_path]
depth_clause = ""
if recursive and max_depth is not None:
depth_clause = f"AND ({self._folder_depth_sql('scope_folder.path')} - ?) <= ?"
params.extend([self._folder_depth(folder_path), max_depth - 1])
return (
f"""
EXISTS (
@ -1669,6 +1779,7 @@ class SQLiteFileSystemStore:
ON scope_folder.folder_id = scope_ff.folder_id
WHERE scope_ff.file_ref = f.file_ref
AND {path_clause}
{depth_clause}
)
""",
params,
@ -1702,6 +1813,16 @@ class SQLiteFileSystemStore:
stripped = normalize_path(path).strip("/")
return 0 if not stripped else len(stripped.split("/"))
@staticmethod
def _folder_depth_sql(path_expr: str) -> str:
return (
"(CASE "
f"WHEN TRIM({path_expr}, '/') = '' THEN 0 "
f"ELSE LENGTH(TRIM({path_expr}, '/')) "
f"- LENGTH(REPLACE(TRIM({path_expr}, '/'), '/', '')) + 1 "
"END)"
)
@classmethod
def _folder_row_to_dict(cls, row: sqlite3.Row) -> dict[str, Any]:
return {

View file

@ -0,0 +1,117 @@
import json
from pathlib import Path
import pytest
def _register_find_fixture(tmp_path: Path):
    """Create a command executor over a small nested folder fixture.

    Four text files are registered, in this order:

    * ``doc_root``  in ``/documents``            (department ``ops``)
    * ``doc_child`` in ``/documents/team``       (department ``ops``)
    * ``doc_deep``  in ``/documents/team/deep``  (department ``ops``)
    * ``doc_other`` in ``/documents/team``       (department ``finance``)

    Source files live under ``tmp_path / "source"`` and the filesystem
    workspace under ``tmp_path / "workspace"``. The returned executor is
    built with ``json_output=True``.
    """
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

    source_dir = tmp_path / "source"
    source_dir.mkdir()
    filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace")
    filesystem.metadata.register_schema({"fields": {"department": "string"}})

    # (filename, folder_path, external_id, title, department)
    documents = (
        ("root.txt", "/documents", "doc_root", "Root document", "ops"),
        ("child.txt", "/documents/team", "doc_child", "Child document", "ops"),
        ("deep.txt", "/documents/team/deep", "doc_deep", "Deep document", "ops"),
        ("other.txt", "/documents/team", "doc_other", "Other document", "finance"),
    )
    for filename, folder_path, external_id, title, department in documents:
        source = source_dir / filename
        source.write_text(f"{title} fixture text", encoding="utf-8")
        filesystem.register_file(
            storage_uri=source.as_uri(),
            source_path=f"docs/{filename}",
            folder_path=folder_path,
            external_id=external_id,
            title=title,
            # Read back from disk so the registered content is what was stored.
            content=source.read_text(encoding="utf-8"),
            metadata={"department": department},
        )

    return PIFSCommandExecutor(filesystem, json_output=True)
def _data(output: str):
return json.loads(output)["data"]
def test_find_maxdepth_one_returns_direct_files_only(tmp_path):
    """``-maxdepth 1 -type f`` returns only the file registered in the start folder."""
    output = _register_find_fixture(tmp_path).execute("find /documents -maxdepth 1 -type f")
    external_ids = [entry["external_id"] for entry in _data(output)]
    assert external_ids == ["doc_root"]
def test_find_maxdepth_zero_type_directory_returns_start_folder(tmp_path):
    """``-maxdepth 0 -type d`` returns exactly the start folder."""
    output = _register_find_fixture(tmp_path).execute("find /documents -maxdepth 0 -type d")
    paths = [entry["path"] for entry in _data(output)]
    assert paths == ["/documents"]
def test_find_maxdepth_combines_with_where_and_limit(tmp_path):
    """``-maxdepth`` composes with ``--where`` filtering and ``--limit``.

    With depth 2, department ``ops`` and a limit of 1, the single result must
    be an ``ops`` file from ``/documents`` or ``/documents/team``.
    """
    command = """find /documents -maxdepth 2 -type f --where '{"department":"ops"}' --limit 1"""
    rows = _data(_register_find_fixture(tmp_path).execute(command))

    assert len(rows) == 1
    match = rows[0]
    assert match["metadata"]["department"] == "ops"
    assert match["folder_path"] in {"/documents", "/documents/team"}
def test_find_maxdepth_rejects_invalid_values_and_unsupported_options(tmp_path):
    """Bad ``-maxdepth`` operands and unknown options raise ``PIFSCommandError``."""
    from pageindex.filesystem.commands import PIFSCommandError

    executor = _register_find_fixture(tmp_path)
    # (command, expected error-message pattern), checked in this order.
    rejected = (
        ("find /documents -maxdepth nope -type f", "find -maxdepth requires an integer >= 0"),
        ("find /documents -maxdepth -1 -type f", "find -maxdepth requires an integer >= 0"),
        ("find /documents -maxdepth 1 -type f -exec", "Unsupported find option: -exec"),
    )
    for command, message in rejected:
        with pytest.raises(PIFSCommandError, match=message):
            executor.execute(command)
def test_find_maxdepth_is_advertised_to_agents(tmp_path):
    """The option appears in the command-surface text and the capabilities map."""
    executor = _register_find_fixture(tmp_path)

    surfaces = executor.describe_available_command_surfaces()
    assert "-maxdepth N -type f|d" in surfaces

    lexical = executor.command_capabilities()["retrieval"]["lexical"]
    assert lexical["find_maxdepth"] is True