From 112ef99d479ba89cdbd2f22732cc8a08c484631e Mon Sep 17 00:00:00 2001 From: Bukely_ Date: Tue, 26 May 2026 20:30:33 +0800 Subject: [PATCH] fix(filesystem): escape sqlite like wildcards Escape wildcard characters in recursive folder LIKE filters and metadata contains queries. --- pageindex/filesystem/store.py | 56 ++++++++++------ tests/test_pifs_like_escape.py | 115 +++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+), 19 deletions(-) create mode 100644 tests/test_pifs_like_escape.py diff --git a/pageindex/filesystem/store.py b/pageindex/filesystem/store.py index 85de66b..b88b54e 100644 --- a/pageindex/filesystem/store.py +++ b/pageindex/filesystem/store.py @@ -555,7 +555,7 @@ class SQLiteFileSystemStore: WHERE child_folder.parent_id = fo.folder_id ) AS children_count FROM folders fo - WHERE fo.path != ? AND (fo.path LIKE ?) + WHERE fo.path != ? AND (fo.path LIKE ? ESCAPE '\\') {folder_depth_clause} ORDER BY fo.path LIMIT ? @@ -702,15 +702,12 @@ class SQLiteFileSystemStore: WHERE f.deleted_at IS NULL AND ( matched_folder.folder_id = fo.folder_id - OR matched_folder.path LIKE CASE - WHEN fo.path = '/' THEN '/%' - ELSE fo.path || '/%' - END + OR matched_folder.path LIKE {self._descendant_like_sql_expr("fo.path")} ESCAPE '\\' ) {metadata_clause} ) AS matched_files FROM folders fo - WHERE fo.path != ? AND fo.path LIKE ? + WHERE fo.path != ? AND fo.path LIKE ? ESCAPE '\\' {folder_depth_clause} ) WHERE matched_files > 0 @@ -909,10 +906,10 @@ class SQLiteFileSystemStore: SELECT 1 FROM metadata_values mv WHERE mv.file_ref = f.file_ref AND mv.field_id = ? - AND lower(mv.value_text) LIKE '%' || lower(?) || '%' + AND lower(mv.value_text) LIKE lower(?) ESCAPE '\\' ) """, - [field_id, self._metadata_compare_text(expected)], + [field_id, self._contains_like(self._metadata_compare_text(expected))], ) if operator in {"$gt", "$gte", "$lt", "$lte"}: comparator = { @@ -1353,7 +1350,7 @@ class SQLiteFileSystemStore: JOIN file_folders ff ON ff.file_ref = f.file_ref JOIN folders fo ON fo.folder_id = ff.folder_id WHERE f.deleted_at IS NULL - AND (fo.path = ? OR fo.path LIKE ?) + AND (fo.path = ? OR fo.path LIKE ? ESCAPE '\\') """, (path, self._descendant_like(path)), ).fetchone() @@ -1389,7 +1386,7 @@ class SQLiteFileSystemStore: SELECT path FROM folders WHERE path != ? - AND path LIKE ? + AND path LIKE ? ESCAPE '\\' AND ( CASE WHEN TRIM(path, '/') = '' THEN 0 @@ -1407,7 +1404,7 @@ class SQLiteFileSystemStore: JOIN file_folders ff ON ff.file_ref = f.file_ref JOIN folders fo ON fo.folder_id = ff.folder_id WHERE f.deleted_at IS NULL - AND (fo.path = ? OR fo.path LIKE ?) + AND (fo.path = ? OR fo.path LIKE ? ESCAPE '\\') LIMIT ? """, (path, self._descendant_like(path), file_limit + 1), @@ -1486,7 +1483,7 @@ class SQLiteFileSystemStore: """ params: list[Any] if recursive: - sql += " AND (pf.path = ? OR pf.path LIKE ?)" + sql += " AND (pf.path = ? OR pf.path LIKE ? ESCAPE '\\')" params = [path, self._descendant_like(path)] if max_depth is not None: if max_depth <= 0: @@ -1539,10 +1536,7 @@ class SQLiteFileSystemStore: WHERE scope_ff.file_ref = f.file_ref AND ( scope_folder.folder_id = base_folder.folder_id - OR scope_folder.path LIKE CASE - WHEN base_folder.path = '/' THEN '/%' - ELSE base_folder.path || '/%' - END + OR scope_folder.path LIKE {self._descendant_like_sql_expr("base_folder.path")} ESCAPE '\\' ) {depth_clause} ) @@ -1567,7 +1561,7 @@ class SQLiteFileSystemStore: if recursive and max_depth == 0: return "0", [] path_clause = ( - "(scope_folder.path = ? OR scope_folder.path LIKE ?)" + "(scope_folder.path = ? OR scope_folder.path LIKE ? ESCAPE '\\')" if recursive else "scope_folder.path = ?" ) @@ -1610,9 +1604,33 @@ class SQLiteFileSystemStore: (path,), ).fetchone() + @classmethod + def _descendant_like(cls, path: str) -> str: + return "/%" if path == "/" else f"{cls._like_escape(path)}/%" + @staticmethod - def _descendant_like(path: str) -> str: - return "/%" if path == "/" else f"{path}/%" + def _descendant_like_sql_expr(path_expr: str) -> str: + escaped_expr = SQLiteFileSystemStore._like_escape_sql_expr(path_expr) + return f"CASE WHEN {path_expr} = '/' THEN '/%' ELSE {escaped_expr} || '/%' END" + + @staticmethod + def _contains_like(value: str) -> str: + return f"%{SQLiteFileSystemStore._like_escape(value)}%" + + @staticmethod + def _like_escape(value: str) -> str: + return ( + value.replace("\\", "\\\\") + .replace("%", "\\%") + .replace("_", "\\_") + ) + + @staticmethod + def _like_escape_sql_expr(value_expr: str) -> str: + return ( + f"replace(replace(replace({value_expr}, '\\', '\\\\'), " + "'%', '\\%'), '_', '\\_')" + ) @staticmethod def _folder_depth(path: str) -> int: diff --git a/tests/test_pifs_like_escape.py b/tests/test_pifs_like_escape.py new file mode 100644 index 0000000..82e7ef9 --- /dev/null +++ b/tests/test_pifs_like_escape.py @@ -0,0 +1,115 @@ +from pathlib import Path + + +def _register_file( + filesystem, + tmp_path: Path, + filename: str, + *, + folder_path: str, + external_id: str, + metadata: dict[str, str] | None = None, +) -> None: + source = tmp_path / filename + source.write_text(f"{external_id} fixture text", encoding="utf-8") + filesystem.register_file( + storage_uri=source.as_uri(), + source_path=f"docs/{filename}", + folder_path=folder_path, + external_id=external_id, + title=external_id, + content=source.read_text(encoding="utf-8"), + metadata=metadata or {}, + ) + + +def test_descendant_folder_filter_treats_underscore_literally(tmp_path): + from pageindex.filesystem import PageIndexFileSystem + + filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace") + _register_file( + filesystem, + tmp_path, + "literal.txt", + folder_path="/proj_1/docs", + external_id="literal_underscore", + ) + _register_file( + filesystem, + tmp_path, + "wildcard.txt", + folder_path="/projA1/docs", + external_id="wildcard_neighbor", + ) + + recursive = filesystem.browse("/proj_1", recursive=True, limit=10) + folder_id = filesystem.folder_info("/proj_1")["folder_id"] + scoped_results = filesystem.search( + scope={"folder_id": folder_id, "recursive": True}, + semantic=False, + limit=10, + ) + ranked_folders = { + folder["path"]: folder + for folder in filesystem.find_folders("/", max_depth=1, limit=10) + } + + assert {folder["path"] for folder in recursive["folders"]} == {"/proj_1/docs"} + assert {file["external_id"] for file in recursive["files"]} == {"literal_underscore"} + assert {result.external_id for result in scoped_results} == {"literal_underscore"} + assert ranked_folders["/proj_1"]["matched_files"] == 1 + assert ranked_folders["/projA1"]["matched_files"] == 1 + assert filesystem.store.count_files_in_folder("/proj_1", recursive=True) == 1 + + +def test_metadata_contains_treats_percent_and_underscore_literally(tmp_path): + from pageindex.filesystem import PageIndexFileSystem + + filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace") + filesystem.metadata.register_schema({"fields": {"status": "string"}}) + _register_file( + filesystem, + tmp_path, + "percent.txt", + folder_path="/documents", + external_id="literal_percent", + metadata={"status": "100% done"}, + ) + _register_file( + filesystem, + tmp_path, + "percent-neighbor.txt", + folder_path="/documents", + external_id="percent_neighbor", + metadata={"status": "100X done"}, + ) + _register_file( + filesystem, + tmp_path, + "underscore.txt", + folder_path="/documents", + external_id="literal_underscore", + metadata={"status": "build_alpha"}, + ) + _register_file( + filesystem, + tmp_path, + "underscore-neighbor.txt", + folder_path="/documents", + external_id="underscore_neighbor", + metadata={"status": "buildXalpha"}, + ) + + percent_results = filesystem.search( + metadata_filter={"status": {"$contains": "100% done"}}, + semantic=False, + limit=10, + ) + underscore_results = filesystem.search( + metadata_filter={"status": {"$contains": "build_alpha"}}, + semantic=False, + limit=10, + ) + + assert {result.external_id for result in percent_results} == {"literal_percent"} + assert {result.external_id for result in underscore_results} == {"literal_underscore"}