fix(filesystem): escape sqlite like wildcards

Escape wildcard characters in recursive folder LIKE filters and metadata contains queries.
This commit is contained in:
Bukely_ 2026-05-26 20:30:33 +08:00 committed by BukeLy
parent 8e0f295464
commit 112ef99d47
2 changed files with 152 additions and 19 deletions

View file

@ -555,7 +555,7 @@ class SQLiteFileSystemStore:
WHERE child_folder.parent_id = fo.folder_id
) AS children_count
FROM folders fo
WHERE fo.path != ? AND (fo.path LIKE ?)
WHERE fo.path != ? AND (fo.path LIKE ? ESCAPE '\\')
{folder_depth_clause}
ORDER BY fo.path
LIMIT ?
@ -702,15 +702,12 @@ class SQLiteFileSystemStore:
WHERE f.deleted_at IS NULL
AND (
matched_folder.folder_id = fo.folder_id
OR matched_folder.path LIKE CASE
WHEN fo.path = '/' THEN '/%'
ELSE fo.path || '/%'
END
OR matched_folder.path LIKE {self._descendant_like_sql_expr("fo.path")} ESCAPE '\\'
)
{metadata_clause}
) AS matched_files
FROM folders fo
WHERE fo.path != ? AND fo.path LIKE ?
WHERE fo.path != ? AND fo.path LIKE ? ESCAPE '\\'
{folder_depth_clause}
)
WHERE matched_files > 0
@ -909,10 +906,10 @@ class SQLiteFileSystemStore:
SELECT 1 FROM metadata_values mv
WHERE mv.file_ref = f.file_ref
AND mv.field_id = ?
AND lower(mv.value_text) LIKE '%' || lower(?) || '%'
AND lower(mv.value_text) LIKE lower(?) ESCAPE '\\'
)
""",
[field_id, self._metadata_compare_text(expected)],
[field_id, self._contains_like(self._metadata_compare_text(expected))],
)
if operator in {"$gt", "$gte", "$lt", "$lte"}:
comparator = {
@ -1353,7 +1350,7 @@ class SQLiteFileSystemStore:
JOIN file_folders ff ON ff.file_ref = f.file_ref
JOIN folders fo ON fo.folder_id = ff.folder_id
WHERE f.deleted_at IS NULL
AND (fo.path = ? OR fo.path LIKE ?)
AND (fo.path = ? OR fo.path LIKE ? ESCAPE '\\')
""",
(path, self._descendant_like(path)),
).fetchone()
@ -1389,7 +1386,7 @@ class SQLiteFileSystemStore:
SELECT path
FROM folders
WHERE path != ?
AND path LIKE ?
AND path LIKE ? ESCAPE '\\'
AND (
CASE
WHEN TRIM(path, '/') = '' THEN 0
@ -1407,7 +1404,7 @@ class SQLiteFileSystemStore:
JOIN file_folders ff ON ff.file_ref = f.file_ref
JOIN folders fo ON fo.folder_id = ff.folder_id
WHERE f.deleted_at IS NULL
AND (fo.path = ? OR fo.path LIKE ?)
AND (fo.path = ? OR fo.path LIKE ? ESCAPE '\\')
LIMIT ?
""",
(path, self._descendant_like(path), file_limit + 1),
@ -1486,7 +1483,7 @@ class SQLiteFileSystemStore:
"""
params: list[Any]
if recursive:
sql += " AND (pf.path = ? OR pf.path LIKE ?)"
sql += " AND (pf.path = ? OR pf.path LIKE ? ESCAPE '\\')"
params = [path, self._descendant_like(path)]
if max_depth is not None:
if max_depth <= 0:
@ -1539,10 +1536,7 @@ class SQLiteFileSystemStore:
WHERE scope_ff.file_ref = f.file_ref
AND (
scope_folder.folder_id = base_folder.folder_id
OR scope_folder.path LIKE CASE
WHEN base_folder.path = '/' THEN '/%'
ELSE base_folder.path || '/%'
END
OR scope_folder.path LIKE {self._descendant_like_sql_expr("base_folder.path")} ESCAPE '\\'
)
{depth_clause}
)
@ -1567,7 +1561,7 @@ class SQLiteFileSystemStore:
if recursive and max_depth == 0:
return "0", []
path_clause = (
"(scope_folder.path = ? OR scope_folder.path LIKE ?)"
"(scope_folder.path = ? OR scope_folder.path LIKE ? ESCAPE '\\')"
if recursive
else "scope_folder.path = ?"
)
@ -1610,9 +1604,33 @@ class SQLiteFileSystemStore:
(path,),
).fetchone()
@classmethod
def _descendant_like(cls, path: str) -> str:
return "/%" if path == "/" else f"{cls._like_escape(path)}/%"
@staticmethod
def _descendant_like(path: str) -> str:
return "/%" if path == "/" else f"{path}/%"
def _descendant_like_sql_expr(path_expr: str) -> str:
escaped_expr = SQLiteFileSystemStore._like_escape_sql_expr(path_expr)
return f"CASE WHEN {path_expr} = '/' THEN '/%' ELSE {escaped_expr} || '/%' END"
@staticmethod
def _contains_like(value: str) -> str:
return f"%{SQLiteFileSystemStore._like_escape(value)}%"
@staticmethod
def _like_escape(value: str) -> str:
return (
value.replace("\\", "\\\\")
.replace("%", "\\%")
.replace("_", "\\_")
)
@staticmethod
def _like_escape_sql_expr(value_expr: str) -> str:
return (
f"replace(replace(replace({value_expr}, '\\', '\\\\'), "
"'%', '\\%'), '_', '\\_')"
)
@staticmethod
def _folder_depth(path: str) -> int:

View file

@ -0,0 +1,115 @@
from pathlib import Path
def _register_file(
filesystem,
tmp_path: Path,
filename: str,
*,
folder_path: str,
external_id: str,
metadata: dict[str, str] | None = None,
) -> None:
source = tmp_path / filename
source.write_text(f"{external_id} fixture text", encoding="utf-8")
filesystem.register_file(
storage_uri=source.as_uri(),
source_path=f"docs/{filename}",
folder_path=folder_path,
external_id=external_id,
title=external_id,
content=source.read_text(encoding="utf-8"),
metadata=metadata or {},
)
def test_descendant_folder_filter_treats_underscore_literally(tmp_path):
from pageindex.filesystem import PageIndexFileSystem
filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace")
_register_file(
filesystem,
tmp_path,
"literal.txt",
folder_path="/proj_1/docs",
external_id="literal_underscore",
)
_register_file(
filesystem,
tmp_path,
"wildcard.txt",
folder_path="/projA1/docs",
external_id="wildcard_neighbor",
)
recursive = filesystem.browse("/proj_1", recursive=True, limit=10)
folder_id = filesystem.folder_info("/proj_1")["folder_id"]
scoped_results = filesystem.search(
scope={"folder_id": folder_id, "recursive": True},
semantic=False,
limit=10,
)
ranked_folders = {
folder["path"]: folder
for folder in filesystem.find_folders("/", max_depth=1, limit=10)
}
assert {folder["path"] for folder in recursive["folders"]} == {"/proj_1/docs"}
assert {file["external_id"] for file in recursive["files"]} == {"literal_underscore"}
assert {result.external_id for result in scoped_results} == {"literal_underscore"}
assert ranked_folders["/proj_1"]["matched_files"] == 1
assert ranked_folders["/projA1"]["matched_files"] == 1
assert filesystem.store.count_files_in_folder("/proj_1", recursive=True) == 1
def test_metadata_contains_treats_percent_and_underscore_literally(tmp_path):
from pageindex.filesystem import PageIndexFileSystem
filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace")
filesystem.metadata.register_schema({"fields": {"status": "string"}})
_register_file(
filesystem,
tmp_path,
"percent.txt",
folder_path="/documents",
external_id="literal_percent",
metadata={"status": "100% done"},
)
_register_file(
filesystem,
tmp_path,
"percent-neighbor.txt",
folder_path="/documents",
external_id="percent_neighbor",
metadata={"status": "100X done"},
)
_register_file(
filesystem,
tmp_path,
"underscore.txt",
folder_path="/documents",
external_id="literal_underscore",
metadata={"status": "build_alpha"},
)
_register_file(
filesystem,
tmp_path,
"underscore-neighbor.txt",
folder_path="/documents",
external_id="underscore_neighbor",
metadata={"status": "buildXalpha"},
)
percent_results = filesystem.search(
metadata_filter={"status": {"$contains": "100% done"}},
semantic=False,
limit=10,
)
underscore_results = filesystem.search(
metadata_filter={"status": {"$contains": "build_alpha"}},
semantic=False,
limit=10,
)
assert {result.external_id for result in percent_results} == {"literal_percent"}
assert {result.external_id for result in underscore_results} == {"literal_underscore"}