diff --git a/pageindex/filesystem/cli.py b/pageindex/filesystem/cli.py index 7f91cda..bb01f80 100644 --- a/pageindex/filesystem/cli.py +++ b/pageindex/filesystem/cli.py @@ -279,6 +279,23 @@ def _run_passthrough( return 0 +def _run_add(argv: list[str], *, workspace: str) -> int: + parser = argparse.ArgumentParser( + prog="pifs add", + description="Add a local file to a PageIndex FileSystem workspace", + ) + parser.add_argument("physical_path") + parser.add_argument("virtual_target") + args = parser.parse_args(argv) + + filesystem = _filesystem_from_workspace(workspace) + info = filesystem.add_file(args.physical_path, args.virtual_target) + print(f"added: {info.get('path') or '/' + str(info.get('source_path') or '').strip('/')}") + print(f"file_ref: {info['file_ref']}") + print(f"storage_uri: {info['storage_uri']}") + return 0 + + def _run_set(argv: list[str]) -> int: parser = argparse.ArgumentParser( prog="pifs set", @@ -326,6 +343,10 @@ def main(argv: list[str] | None = None) -> int: return _run_ask(command_args, workspace_default=args.workspace) if command_name == "chat": return _run_chat(command_args, workspace_default=args.workspace) + if command_name == "add": + if not args.workspace: + parser.error("--workspace is required unless PIFS_WORKSPACE is set or `pifs set workspace ` has been run") + return _run_add(command_args, workspace=args.workspace) if "--json" in command_tokens: command_tokens = [token for token in command_tokens if token != "--json"] diff --git a/pageindex/filesystem/core.py b/pageindex/filesystem/core.py index 557b4e1..d8f6310 100644 --- a/pageindex/filesystem/core.py +++ b/pageindex/filesystem/core.py @@ -2,7 +2,9 @@ from __future__ import annotations import json import os -from pathlib import Path +import shutil +import tempfile +from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING, Any, Optional, Union from urllib.parse import unquote, urlparse @@ -91,6 +93,13 @@ PAGEINDEX_DOCUMENT_CONTENT_TYPES = { } TEXT_ARTIFACT_SUFFIXES = {".txt", ".text"} TEXT_ARTIFACT_CONTENT_TYPES = {"text/plain"} +ADD_FILE_CONTENT_TYPES = { + ".pdf": "application/pdf", + ".md": "text/markdown", + ".markdown": "text/markdown", + ".txt": "text/plain", + ".text": "text/plain", +} class PageIndexFileSystem: @@ -171,6 +180,100 @@ class PageIndexFileSystem: self._ensure_register_completion_defaults() return self.register_file(**kwargs) + def add_file( + self, + physical_path: Union[str, Path], + virtual_target: Union[str, Path], + ) -> dict[str, Any]: + source = Path(physical_path).expanduser() + if not source.is_file(): + raise FileNotFoundError(f"Source file not found: {source}") + suffix = source.suffix.lower() + content_type = ADD_FILE_CONTENT_TYPES.get(suffix) + if content_type is None: + supported = ", ".join(sorted(ADD_FILE_CONTENT_TYPES)) + raise ValueError( + f"Unsupported file type: {suffix or ''}; supported: {supported}" + ) + + folder_path, filename, virtual_path = self._resolve_add_target( + virtual_target, + physical_basename=source.name, + physical_suffix=suffix, + ) + if self.store.file_basename_exists_in_folder(folder_path, filename): + raise FileExistsError(f"File already exists at {virtual_path}") + if not self.summary_projection_index: + raise RuntimeError("pifs add requires the summary projection index") + + self._ensure_add_completion_defaults() + add_created_folder_paths = self._add_created_folder_paths(folder_path) + file_ref = make_file_ref(virtual_path.strip("/")) + uploads_dir = self.workspace / "artifacts" / "uploads" + final_dir = uploads_dir / file_ref + final_path = final_dir / filename + final_dir_created = False + catalog_inserted = False + records: list[dict[str, Any]] = [] + preexisting_pageindex_doc_ids = self._pageindex_cache_doc_ids() + + uploads_dir.mkdir(parents=True, exist_ok=True) + with tempfile.TemporaryDirectory(prefix=f".add-{file_ref}-", dir=uploads_dir) as tmp: + temp_path = Path(tmp) / filename + try: + shutil.copy2(source, temp_path) + if final_dir.exists(): + raise FileExistsError( + f"Workspace artifact already exists for {virtual_path}: {final_dir}" + ) + final_dir.mkdir(parents=True) + final_dir_created = True + os.replace(temp_path, final_path) + + record = self._prepare_file_record( + { + "storage_uri": final_path.as_uri(), + "source_path": virtual_path.strip("/"), + "folder_path": folder_path, + "metadata": {}, + "external_id": None, + "title": filename, + "content": self._add_file_content(final_path, content_type), + "content_type": content_type, + "metadata_policy": self._add_metadata_policy(), + } + ) + records = [record] + self._require_add_pageindex_ready(record) + self._generate_register_metadata(record) + self._require_add_metadata_ready(record) + self._register_generation_policy_schema(records) + self.store.insert_files(records) + catalog_inserted = True + if self._complete_summary_projection_index(record): + self.store.update_file_metadata_status( + record["file_ref"], + metadata=record["metadata"], + metadata_status=record["metadata_status"], + ) + self._require_add_summary_projection_ready(record) + self._sync_owned_raw_artifact(record) + self._ensure_add_semantic_retrieval_ready() + except Exception: + if catalog_inserted: + self._cleanup_add_catalog_record(file_ref) + self._cleanup_add_summary_projection(records) + self._cleanup_failed_register_artifacts(records) + self._cleanup_add_pageindex_cache(records, preexisting_pageindex_doc_ids) + self._cleanup_add_created_folders(add_created_folder_paths) + if final_dir_created: + shutil.rmtree(final_dir, ignore_errors=True) + raise + + info = self.store.file_info(file_ref) + info["path"] = virtual_path + return info + def register_files(self, files: list[dict[str, Any]]) -> list[str]: records = [self._prepare_file_record(file) for file in files] try: @@ -250,6 +353,97 @@ class PageIndexFileSystem: embedding_timeout=self.summary_projection_embedding_timeout, ) + def _ensure_add_completion_defaults(self) -> None: + if self.metadata_generator is None: + self.metadata_generator = MetadataGenerator( + provider=self.metadata_provider, + model=self.metadata_model, + base_url=self.metadata_base_url, + max_text_chars=self.metadata_max_text_chars, + ) + if self.summary_projection_index and self.summary_projection_indexer is None: + from .projection_indexing import SummaryProjectionIndexer + + self.summary_projection_indexer = SummaryProjectionIndexer.from_provider( + self.summary_projection_index_dir, + embedding_provider=self.summary_projection_embedding_provider, + embedding_model=self.summary_projection_embedding_model, + embedding_dimensions=self.summary_projection_embedding_dimensions, + embedding_timeout=self.summary_projection_embedding_timeout, + ) + + def _ensure_add_semantic_retrieval_ready(self) -> None: + indexer = self.summary_projection_indexer + if indexer is None: + raise RuntimeError("pifs add requires a summary projection indexer") + from .hybrid_projection import HybridProjectionSearchBackend + + index_dir = Path(getattr(indexer, "index_dir", self.summary_projection_index_dir)) + embedder = getattr(indexer, "embedder", None) + if embedder is None: + self.configure_hybrid_projection_retrieval( + index_dir, + embedding_provider=str( + getattr( + indexer, + "embedding_provider", + self.summary_projection_embedding_provider, + ) + ), + embedding_model=str( + getattr(indexer, "embedding_model", self.summary_projection_embedding_model) + ), + embedding_dimensions=int( + getattr( + indexer, + "embedding_dimensions", + self.summary_projection_embedding_dimensions, + ) + ), + embedding_timeout=self.summary_projection_embedding_timeout, + ) + else: + embedding_cache = getattr(indexer, "embedding_cache", None) + self.semantic_retrieval_backend = HybridProjectionSearchBackend( + index_dir, + embedder=embedder, + embedding_provider=str( + getattr( + indexer, + "embedding_provider", + self.summary_projection_embedding_provider, + ) + ), + embedding_model=str( + getattr(indexer, "embedding_model", self.summary_projection_embedding_model) + ), + embedding_dimensions=int( + getattr( + indexer, + "embedding_dimensions", + self.summary_projection_embedding_dimensions, + ) + ), + embedding_cache_path=getattr(embedding_cache, "db_path", None), + ) + if "summary" not in self.semantic_retrieval_channels(): + raise RuntimeError("pifs add failed to configure summary semantic retrieval") + + def _add_created_folder_paths(self, folder_path: str) -> list[str]: + paths = self._folder_ancestor_paths(folder_path) + return [path for path in paths if not self.store.folder_exists(path)] + + @staticmethod + def _folder_ancestor_paths(folder_path: str) -> list[str]: + normalized = normalize_path(folder_path) + if normalized == "/": + return [] + segments = [segment for segment in normalized.strip("/").split("/") if segment] + paths: list[str] = [] + for index in range(1, len(segments) + 1): + paths.append("/" + "/".join(segments[:index])) + return paths + def configure_existing_projection_retrieval(self) -> bool: """Attach semantic retrieval to already-built projection indexes. @@ -1075,6 +1269,117 @@ class PageIndexFileSystem: def _create_folder(self, path: str) -> str: return self.create_folder(path) + @classmethod + def _resolve_add_target( + cls, + virtual_target: Union[str, Path], + *, + physical_basename: str, + physical_suffix: str, + ) -> tuple[str, str, str]: + raw_target = str(virtual_target).strip() + if not raw_target: + raise ValueError("pifs add target is required") + normalized = normalize_path(raw_target) + posix_target = PurePosixPath(normalized) + raw_looks_like_folder = raw_target.replace("\\", "/").endswith("/") + target_suffix = posix_target.suffix.lower() + if raw_looks_like_folder or target_suffix not in ADD_FILE_CONTENT_TYPES: + folder_path = normalized + filename = physical_basename + else: + if target_suffix != physical_suffix: + raise ValueError( + "pifs add target file extension must match the physical file extension" + ) + folder_path = normalize_path(str(posix_target.parent)) + filename = posix_target.name + cls._validate_add_filename(filename) + virtual_path = cls._join_virtual_file_path(folder_path, filename) + return folder_path, filename, virtual_path + + @staticmethod + def _validate_add_filename(filename: str) -> None: + if not filename or filename in {".", ".."}: + raise ValueError("pifs add target filename is required") + if "/" in filename or "\\" in filename: + raise ValueError("pifs add target filename must be a basename") + + @staticmethod + def _join_virtual_file_path(folder_path: str, filename: str) -> str: + folder_path = normalize_path(folder_path) + if folder_path == "/": + return f"/{filename}" + return f"{folder_path}/{filename}" + + @staticmethod + def _add_metadata_policy() -> dict[str, Any]: + return { + "fields": { + "summary": True, + "doc_type": False, + "domain": False, + "topic": False, + "entity": False, + "relation": False, + }, + "projection_indexes": {"summary": True}, + "batch": False, + } + + def _add_file_content(self, path: Path, content_type: str) -> str: + if self._source_format(str(path), content_type) in {"markdown", "text"}: + return path.read_text(encoding="utf-8") + return "" + + def _require_add_pageindex_ready(self, record: dict[str, Any]) -> None: + if self._source_format(record["source_path"], record["content_type"]) not in { + "pdf", + "markdown", + }: + return + if record.get("pageindex_tree_status") == "built" and record.get("pageindex_doc_id"): + return + message = self._pageindex_tree_failure_message(record.get("metadata_status")) or ( + "PageIndex tree was not built" + ) + raise RuntimeError(f"pifs add failed to build PageIndex tree: {message}") + + @staticmethod + def _require_add_metadata_ready(record: dict[str, Any]) -> None: + metadata = record.get("metadata") or {} + summary = str(metadata.get("summary") or "").strip() + if not summary: + raise MetadataGenerationError( + "pifs add requires synchronous generated summary metadata" + ) + status = record.get("metadata_status") or {} + summary_status = (status.get("fields") or {}).get("summary") or {} + if summary_status.get("status") != "generated": + raise MetadataGenerationError( + "pifs add requires generated summary metadata before registration" + ) + if status.get("status") == "failed": + raise MetadataGenerationError( + "pifs add metadata generation failed before registration" + ) + + def _require_add_summary_projection_ready(self, record: dict[str, Any]) -> None: + if not self.summary_projection_index: + return + summary_projection = ( + (record.get("metadata_status") or {}) + .get("projection_indexes", {}) + .get("summary") + ) + if not summary_projection or not summary_projection.get("requested"): + raise RuntimeError("pifs add requires a requested summary projection index") + if summary_projection.get("status") != "ready": + detail = summary_projection.get("error") or summary_projection.get("status") + raise RuntimeError( + f"pifs add failed to build summary projection index: {detail}" + ) + def _prepare_file_record(self, file: dict[str, Any]) -> dict[str, Any]: storage_uri = file["storage_uri"] raw_source_path = str(file["source_path"]) @@ -1399,6 +1704,98 @@ class PageIndexFileSystem: if record.get("_pifs_owned_raw_artifact") and record.get("raw_artifact_path"): self._unlink_artifact(record["raw_artifact_path"]) + def _cleanup_add_catalog_record(self, file_ref: str) -> None: + try: + self.store.delete_file(file_ref) + except Exception: + return + + def _cleanup_add_summary_projection(self, records: list[dict[str, Any]]) -> None: + indexer = self.summary_projection_indexer + if indexer is None: + return + delete_summary = getattr(indexer, "delete_summary", None) + for record in records: + file_ref = str(record.get("file_ref") or "") + if not file_ref: + continue + try: + if callable(delete_summary): + delete_summary(file_ref) + continue + index = getattr(indexer, "index", None) + delete_file_refs = getattr(index, "delete_file_refs", None) + if callable(delete_file_refs): + delete_file_refs([file_ref]) + except Exception: + continue + + def _cleanup_add_created_folders(self, folder_paths: list[str]) -> None: + for folder_path in reversed(folder_paths): + try: + self.store.delete_empty_folder(folder_path) + except Exception: + continue + + def _pageindex_cache_doc_ids(self) -> set[str]: + workspace = self.pageindex_client_workspace + doc_ids = {path.stem for path in workspace.glob("*.json") if path.name != "_meta.json"} + meta_path = workspace / "_meta.json" + if not meta_path.exists(): + return doc_ids + try: + payload = json.loads(meta_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return doc_ids + if isinstance(payload, dict): + doc_ids.update(str(doc_id) for doc_id in payload) + return doc_ids + + def _cleanup_add_pageindex_cache( + self, + records: list[dict[str, Any]], + preexisting_doc_ids: set[str], + ) -> None: + doc_ids = sorted(self._pageindex_cache_doc_ids() - preexisting_doc_ids) + for record in records: + doc_id = str(record.get("pageindex_doc_id") or "").strip() + if doc_id and doc_id not in preexisting_doc_ids: + doc_ids.append(doc_id) + doc_ids = sorted(set(doc_ids)) + if not doc_ids: + return + workspace = self.pageindex_client_workspace + for doc_id in doc_ids: + try: + (workspace / f"{doc_id}.json").unlink() + except FileNotFoundError: + pass + except Exception: + continue + meta_path = workspace / "_meta.json" + if not meta_path.exists(): + return + try: + payload = json.loads(meta_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return + if not isinstance(payload, dict): + return + changed = False + for doc_id in doc_ids: + if doc_id in payload: + payload.pop(doc_id, None) + changed = True + if not changed: + return + try: + meta_path.write_text( + json.dumps(payload, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + except OSError: + return + @staticmethod def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool: return bool(policy.get("batch")) or policy.get("mode") == "batch" diff --git a/pageindex/filesystem/projection_indexing.py b/pageindex/filesystem/projection_indexing.py index 63802d5..06a1a93 100644 --- a/pageindex/filesystem/projection_indexing.py +++ b/pageindex/filesystem/projection_indexing.py @@ -104,6 +104,9 @@ class SummaryProjectionIndexer: "embedding_dimensions": self.embedding_dimensions, } + def delete_summary(self, file_ref: str) -> int: + return self.index.delete_file_refs([file_ref]) + def _ensure_index(self) -> None: if not self.index.db_path.exists(): self.index.reset( diff --git a/pageindex/filesystem/semantic_index.py b/pageindex/filesystem/semantic_index.py index 4a29551..5b3e393 100644 --- a/pageindex/filesystem/semantic_index.py +++ b/pageindex/filesystem/semantic_index.py @@ -146,6 +146,35 @@ class SQLiteVecSemanticIndex: conn.commit() return inserted + def delete_file_refs(self, file_refs: list[str]) -> int: + refs = [str(file_ref) for file_ref in file_refs if str(file_ref)] + if not refs: + return 0 + placeholders = ", ".join("?" for _ in refs) + with self.connect() as conn: + rows = conn.execute( + f""" + SELECT rowid + FROM semantic_index_docs + WHERE file_ref IN ({placeholders}) + """, + refs, + ).fetchall() + rowids = [int(row["rowid"]) for row in rows] + if not rowids: + return 0 + rowid_placeholders = ", ".join("?" for _ in rowids) + conn.execute( + f"DELETE FROM semantic_index_vec WHERE rowid IN ({rowid_placeholders})", + rowids, + ) + conn.execute( + f"DELETE FROM semantic_index_docs WHERE rowid IN ({rowid_placeholders})", + rowids, + ) + conn.commit() + return len(rowids) + def search( self, vector: list[float], diff --git a/pageindex/filesystem/store.py b/pageindex/filesystem/store.py index 30a7d32..10e1e7a 100644 --- a/pageindex/filesystem/store.py +++ b/pageindex/filesystem/store.py @@ -1056,6 +1056,55 @@ class SQLiteFileSystemStore: (metadata_text_value, file_ref), ) + def delete_file(self, target: str) -> None: + with self.connect() as conn: + file_ref = self._resolve_file_ref(conn, target) + conn.execute("DELETE FROM file_fts WHERE file_ref = ?", (file_ref,)) + conn.execute("DELETE FROM metadata_values WHERE file_ref = ?", (file_ref,)) + conn.execute("DELETE FROM files WHERE file_ref = ?", (file_ref,)) + + def folder_exists(self, path: str) -> bool: + path = normalize_path(path) + with self.connect() as conn: + row = conn.execute( + "SELECT 1 FROM folders WHERE path = ?", + (path,), + ).fetchone() + return row is not None + + def delete_empty_folder(self, path: str) -> bool: + path = normalize_path(path) + if path == "/": + return False + with self.connect() as conn: + folder = self._folder_by_path(conn, path) + if folder is None: + return False + has_files = conn.execute( + """ + SELECT 1 + FROM file_folders + WHERE folder_id = ? + LIMIT 1 + """, + (folder["folder_id"],), + ).fetchone() + if has_files is not None: + return False + has_children = conn.execute( + """ + SELECT 1 + FROM folders + WHERE parent_id = ? + LIMIT 1 + """, + (folder["folder_id"],), + ).fetchone() + if has_children is not None: + return False + conn.execute("DELETE FROM folders WHERE folder_id = ?", (folder["folder_id"],)) + return True + def resolve_file_ref(self, target: str) -> str: with self.connect() as conn: return self._resolve_file_ref(conn, target) @@ -1412,6 +1461,36 @@ class SQLiteFileSystemStore: ).fetchone() return int(row["count"] or 0) + def file_basename_exists_in_folder(self, path: str, basename: str) -> bool: + path = normalize_path(path) + basename = str(basename).strip() + if not basename: + return False + with self.connect() as conn: + row = conn.execute( + """ + SELECT 1 + FROM files f + JOIN file_folders ff ON ff.file_ref = f.file_ref + JOIN folders fo ON fo.folder_id = ff.folder_id + WHERE f.deleted_at IS NULL + AND fo.path = ? + AND ( + f.title = ? + OR f.source_path = ? + OR f.source_path LIKE ? ESCAPE '\\' + ) + LIMIT 1 + """, + ( + path, + basename, + basename, + "%/" + self._like_escape(basename), + ), + ).fetchone() + return row is not None + def folder_subtree_thresholds( self, path: str, diff --git a/tests/test_pifs_add_command.py b/tests/test_pifs_add_command.py new file mode 100644 index 0000000..d2b8f9c --- /dev/null +++ b/tests/test_pifs_add_command.py @@ -0,0 +1,486 @@ +import json +from pathlib import Path + +import pytest + + +class GeneratedMetadata: + def __init__(self): + self.calls = [] + + def generate(self, request, *, fields): + self.calls.append((request, list(fields))) + values = { + "summary": f"Summary for {request.title}: {request.text[:60]}", + "doc_type": "uploaded_file", + "domain": "workspace", + "topic": "pifs add", + } + return {field: values[field] for field in fields if field in values} + + +class StaticEmbedder: + def embed(self, texts): + return [[1.0, 0.0, 0.0] for _ in texts] + + +def make_summary_indexer(workspace: Path): + from pageindex.filesystem.projection_indexing import SummaryProjectionIndexer + + return SummaryProjectionIndexer( + workspace / "artifacts" / "projection_indexes", + embedder=StaticEmbedder(), + embedding_provider="test", + embedding_model="static", + embedding_dimensions=3, + ) + + +def make_filesystem(workspace: Path): + from pageindex.filesystem import PageIndexFileSystem + + return PageIndexFileSystem( + workspace=workspace, + metadata_generator=GeneratedMetadata(), + summary_projection_indexer=make_summary_indexer(workspace), + summary_projection_embedding_provider="test", + summary_projection_embedding_model="static", + summary_projection_embedding_dimensions=3, + ) + + +def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None: + workspace.mkdir(parents=True, exist_ok=True) + (workspace / f"{doc_id}.json").write_text( + json.dumps(doc, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + meta = { + doc_id: { + "type": doc.get("type", ""), + "doc_name": doc.get("doc_name", ""), + "doc_description": doc.get("doc_description", ""), + "path": doc.get("path", ""), + "line_count": doc.get("line_count"), + } + } + (workspace / "_meta.json").write_text( + json.dumps(meta, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + +def test_add_text_folder_target_copies_artifact_indexes_summary_and_is_readable(tmp_path): + from pageindex.filesystem import PIFSCommandExecutor + + source = tmp_path / "filing.txt" + source.write_text("alpha filing text for pifs add", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + + info = filesystem.add_file(str(source), "/documents/reports") + + assert info["source_path"] == "documents/reports/filing.txt" + assert info["folder_path"] == "/documents/reports" + assert filesystem.folder_info("/documents/reports")["path"] == "/documents/reports" + assert info["storage_uri"] != source.as_uri() + assert "/artifacts/uploads/" in info["storage_uri"] + copied_path = Path(info["storage_uri"].removeprefix("file://")) + assert copied_path.read_text(encoding="utf-8") == "alpha filing text for pifs add" + assert copied_path.resolve() != source.resolve() + + executor = PIFSCommandExecutor(filesystem, json_output=True) + rendered = json.loads(executor.execute("cat /documents/reports/filing.txt --all")) + + assert rendered["data"]["text"] == "alpha filing text for pifs add" + assert info["metadata"]["summary"].startswith("Summary for filing.txt") + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 1 + + +def test_add_rejects_same_folder_same_basename_without_overwrite(tmp_path): + from pageindex.filesystem import PIFSCommandExecutor + + source = tmp_path / "conflict.txt" + source.write_text("first body", encoding="utf-8") + filesystem = make_filesystem(tmp_path / "workspace") + + filesystem.add_file(source, "/documents") + source.write_text("second body must not overwrite", encoding="utf-8") + + with pytest.raises(FileExistsError, match="already exists"): + filesystem.add_file(source, "/documents") + + executor = PIFSCommandExecutor(filesystem, json_output=True) + rendered = json.loads(executor.execute("cat /documents/conflict.txt --all")) + assert rendered["data"]["text"] == "first body" + + +def test_add_rejects_unsupported_type_before_registration(tmp_path): + source = tmp_path / "payload.json" + source.write_text('{"unsupported": true}', encoding="utf-8") + filesystem = make_filesystem(tmp_path / "workspace") + + with pytest.raises(ValueError, match="Unsupported file type"): + filesystem.add_file(source, "/documents") + + assert filesystem.browse("/", recursive=True)["files"] == [] + assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*")) + + +def test_add_rejects_disabled_summary_projection_before_registration(tmp_path): + from pageindex.filesystem import PageIndexFileSystem + + source = tmp_path / "disabled.txt" + source.write_text("must not register without summary vector", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = PageIndexFileSystem( + workspace=workspace, + metadata_generator=GeneratedMetadata(), + summary_projection_index=False, + ) + + with pytest.raises(RuntimeError, match="summary projection index"): + filesystem.add_file(source, "/documents") + + assert filesystem.browse("/", recursive=True)["files"] == [] + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_configures_semantic_retrieval_in_same_filesystem_instance(tmp_path): + source = tmp_path / "semantic.txt" + source.write_text("alpha semantic recall text", encoding="utf-8") + filesystem = make_filesystem(tmp_path / "workspace") + + assert filesystem.semantic_retrieval_channels() == () + + filesystem.add_file(source, "/documents") + + assert filesystem.semantic_retrieval_channels() == ("summary",) + results = filesystem.search_semantic_channel( + "summary", + "semantic recall", + scope={"folder_path": "/documents", "recursive": True}, + limit=5, + ) + assert [result.source_path for result in results] == ["documents/semantic.txt"] + + +def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monkeypatch): + from pageindex import PageIndexClient + from pageindex.filesystem import PIFSCommandExecutor + + indexed_paths = [] + + def fake_index(self, file_path, mode="auto"): + indexed_paths.append(Path(file_path)) + doc_id = "doc_added_md" + doc = { + "id": doc_id, + "type": "md", + "path": str(Path(file_path).resolve()), + "doc_name": "notes.md", + "doc_description": "", + "line_count": 3, + "structure": [ + { + "title": "Notes", + "node_id": "0001", + "line_num": 1, + "text": "# Notes\n\ncopied markdown body", + "nodes": [], + } + ], + } + write_pageindex_client_doc(self.workspace, doc_id, doc) + self.documents[doc_id] = doc + return doc_id + + monkeypatch.setattr(PageIndexClient, "index", fake_index) + source = tmp_path / "notes.md" + source.write_text("# Notes\n\ncopied markdown body", encoding="utf-8") + filesystem = make_filesystem(tmp_path / "workspace") + + info = filesystem.add_file(source, "/documents") + executor = PIFSCommandExecutor(filesystem, json_output=True) + structure = json.loads(executor.execute("cat /documents/notes.md --structure")) + + assert structure["data"]["available"] is True + assert structure["data"]["structure"][0]["title"] == "Notes" + assert indexed_paths == [Path(info["storage_uri"].removeprefix("file://"))] + assert indexed_paths[0].resolve() != source.resolve() + + +def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monkeypatch): + source = tmp_path / "atomic.txt" + source.write_text("atomic body", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + + def fail_insert(records): + raise RuntimeError("catalog insert failed") + + monkeypatch.setattr(filesystem.store, "insert_files", fail_insert) + + with pytest.raises(RuntimeError, match="catalog insert failed"): + filesystem.add_file(source, "/documents") + + assert filesystem.browse("/", recursive=True)["files"] == [] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_markdown_insert_failure_removes_pageindex_cache(tmp_path, monkeypatch): + from pageindex import PageIndexClient + + def fake_index(self, file_path, mode="auto"): + doc_id = "doc_failed_add_md" + doc = { + "id": doc_id, + "type": "md", + "path": str(Path(file_path).resolve()), + "doc_name": "failed.md", + "doc_description": "", + "line_count": 3, + "structure": [ + { + "title": "Failed", + "node_id": "0001", + "line_num": 1, + "text": "# Failed\n\nbody", + "nodes": [], + } + ], + } + write_pageindex_client_doc(self.workspace, doc_id, doc) + self.documents[doc_id] = doc + return doc_id + + monkeypatch.setattr(PageIndexClient, "index", fake_index) + source = tmp_path / "failed.md" + source.write_text("# Failed\n\nbody", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + + def fail_insert(records): + raise RuntimeError("catalog insert failed") + + monkeypatch.setattr(filesystem.store, "insert_files", fail_insert) + + with pytest.raises(RuntimeError, match="catalog insert failed"): + filesystem.add_file(source, "/documents/reports") + + pageindex_workspace = workspace / "artifacts" / "pageindex_client" + assert not (pageindex_workspace / "doc_failed_add_md.json").exists() + meta_path = pageindex_workspace / "_meta.json" + if meta_path.exists(): + meta = json.loads(meta_path.read_text(encoding="utf-8")) + assert "doc_failed_add_md" not in meta + listing = filesystem.browse("/", recursive=True) + assert listing["files"] == [] + assert listing["folders"] == [] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_markdown_index_failure_removes_pageindex_cache_delta(tmp_path, monkeypatch): + from pageindex import PageIndexClient + + def fake_index(self, file_path, mode="auto"): + doc_id = "doc_partial_before_raise" + doc = { + "id": doc_id, + "type": "md", + "path": str(Path(file_path).resolve()), + "doc_name": "partial.md", + "doc_description": "", + "line_count": 3, + "structure": [{"title": "Partial", "node_id": "0001", "nodes": []}], + } + self.documents[doc_id] = doc + self._save_doc(doc_id) + raise RuntimeError("index failed after cache write") + + monkeypatch.setattr(PageIndexClient, "index", fake_index) + source = tmp_path / "partial.md" + source.write_text("# Partial\n\nbody", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + pageindex_workspace = workspace / "artifacts" / "pageindex_client" + + with pytest.raises(RuntimeError, match="failed to build PageIndex tree"): + filesystem.add_file(source, "/documents/reports") + + assert not (pageindex_workspace / "doc_partial_before_raise.json").exists() + meta_path = pageindex_workspace / "_meta.json" + if meta_path.exists(): + meta = json.loads(meta_path.read_text(encoding="utf-8")) + assert "doc_partial_before_raise" not in meta + listing = filesystem.browse("/", recursive=True) + assert listing["files"] == [] + assert listing["folders"] == [] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_markdown_failure_preserves_unrelated_pageindex_cache(tmp_path, monkeypatch): + from pageindex import PageIndexClient + + def fake_index(self, file_path, mode="auto"): + doc_id = "doc_failed_add_md" + doc = { + "id": doc_id, + "type": "md", + "path": str(Path(file_path).resolve()), + "doc_name": "failed.md", + "doc_description": "", + "line_count": 3, + "structure": [{"title": "Failed", "node_id": "0001", "nodes": []}], + } + self.documents[doc_id] = doc + self._save_doc(doc_id) + return doc_id + + monkeypatch.setattr(PageIndexClient, "index", fake_index) + source = tmp_path / "failed.md" + source.write_text("# Failed\n\nbody", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + pageindex_workspace = workspace / "artifacts" / "pageindex_client" + write_pageindex_client_doc( + pageindex_workspace, + "doc_unrelated", + { + "id": "doc_unrelated", + "type": "md", + "path": str((tmp_path / "unrelated.md").resolve()), + "doc_name": "unrelated.md", + "doc_description": "", + "line_count": 1, + "structure": [{"title": "Unrelated", "node_id": "0001", "nodes": []}], + }, + ) + + def fail_insert(records): + raise RuntimeError("catalog insert failed") + + monkeypatch.setattr(filesystem.store, "insert_files", fail_insert) + + with pytest.raises(RuntimeError, match="catalog insert failed"): + filesystem.add_file(source, "/documents") + + assert not (pageindex_workspace / "doc_failed_add_md.json").exists() + assert (pageindex_workspace / "doc_unrelated.json").exists() + meta = json.loads((pageindex_workspace / "_meta.json").read_text(encoding="utf-8")) + assert "doc_failed_add_md" not in meta + assert "doc_unrelated" in meta + + +def test_add_failure_after_summary_vector_rolls_back_catalog_and_vector( + tmp_path, monkeypatch +): + source = tmp_path / "post_vector.txt" + source.write_text("post vector rollback body", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + + def fail_status_update(*args, **kwargs): + raise RuntimeError("metadata status update failed") + + monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update) + + with pytest.raises(RuntimeError, match="metadata status update failed"): + filesystem.add_file(source, "/documents") + + assert filesystem.browse("/", recursive=True)["files"] == [] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_failure_removes_nested_folders_created_only_for_add(tmp_path, monkeypatch): + source = tmp_path / "nested.txt" + source.write_text("nested rollback body", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + + def fail_status_update(*args, **kwargs): + raise RuntimeError("metadata status update failed") + + monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update) + + with pytest.raises(RuntimeError, match="metadata status update failed"): + filesystem.add_file(source, "/documents/reports") + + listing = filesystem.browse("/", recursive=True) + assert listing["files"] == [] + assert listing["folders"] == [] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_failure_preserves_preexisting_parent_folder(tmp_path, monkeypatch): + source = tmp_path / "nested.txt" + source.write_text("nested rollback body", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + filesystem.create_folder("/documents") + + def fail_status_update(*args, **kwargs): + raise RuntimeError("metadata status update failed") + + monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update) + + with pytest.raises(RuntimeError, match="metadata status update failed"): + filesystem.add_file(source, "/documents/reports") + + listing = filesystem.browse("/", recursive=True) + assert listing["files"] == [] + assert [folder["path"] for folder in listing["folders"]] == ["/documents"] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + + +def test_cli_add_uses_workspace_and_prints_added_file(monkeypatch, capsys, tmp_path): + from pageindex.filesystem import cli + + source = tmp_path / "cli.txt" + source.write_text("cli body", encoding="utf-8") + calls = [] + + class FakeAddFileSystem: + def __init__(self, workspace): + self.workspace = Path(workspace) + + def configure_existing_projection_retrieval(self): + return False + + def add_file(self, physical_path, virtual_target): + calls.append((self.workspace, physical_path, virtual_target)) + return { + "file_ref": "file_cli", + "path": "/documents/cli.txt", + "source_path": "documents/cli.txt", + "storage_uri": "file:///workspace/artifacts/uploads/file_cli/cli.txt", + } + + monkeypatch.setattr(cli, "PageIndexFileSystem", FakeAddFileSystem) + + status = cli.main(["--workspace", str(tmp_path / "workspace"), "add", str(source), "/documents"]) + + assert status == 0 + assert calls == [(tmp_path / "workspace", str(source), "/documents")] + assert capsys.readouterr().out == ( + "added: /documents/cli.txt\n" + "file_ref: file_cli\n" + "storage_uri: file:///workspace/artifacts/uploads/file_cli/cli.txt\n" + )