diff --git a/pageindex/filesystem/core.py b/pageindex/filesystem/core.py index da86dd5..5c963b2 100644 --- a/pageindex/filesystem/core.py +++ b/pageindex/filesystem/core.py @@ -203,6 +203,8 @@ class PageIndexFileSystem: ) if self.store.file_basename_exists_in_folder(folder_path, filename): raise FileExistsError(f"File already exists at {virtual_path}") + if not self.summary_projection_index: + raise RuntimeError("pifs add requires the summary projection index") self._ensure_add_completion_defaults() file_ref = make_file_ref(virtual_path.strip("/")) @@ -210,6 +212,7 @@ class PageIndexFileSystem: final_dir = uploads_dir / file_ref final_path = final_dir / filename final_dir_created = False + catalog_inserted = False records: list[dict[str, Any]] = [] uploads_dir.mkdir(parents=True, exist_ok=True) @@ -242,12 +245,22 @@ class PageIndexFileSystem: self._require_add_pageindex_ready(record) self._generate_register_metadata(record) self._require_add_metadata_ready(record) - self._complete_summary_projection_index(record) - self._require_add_summary_projection_ready(record) self._register_generation_policy_schema(records) - self._sync_owned_raw_artifact(record) self.store.insert_files(records) + catalog_inserted = True + if self._complete_summary_projection_index(record): + self.store.update_file_metadata_status( + record["file_ref"], + metadata=record["metadata"], + metadata_status=record["metadata_status"], + ) + self._require_add_summary_projection_ready(record) + self._sync_owned_raw_artifact(record) + self._ensure_add_semantic_retrieval_ready() except Exception: + if catalog_inserted: + self._cleanup_add_catalog_record(file_ref) + self._cleanup_add_summary_projection(records) self._cleanup_failed_register_artifacts(records) if final_dir_created: shutil.rmtree(final_dir, ignore_errors=True) @@ -355,6 +368,63 @@ class PageIndexFileSystem: embedding_timeout=self.summary_projection_embedding_timeout, ) + def _ensure_add_semantic_retrieval_ready(self) -> None: + indexer = self.summary_projection_indexer + if indexer is None: + raise RuntimeError("pifs add requires a summary projection indexer") + from .hybrid_projection import HybridProjectionSearchBackend + + index_dir = Path(getattr(indexer, "index_dir", self.summary_projection_index_dir)) + embedder = getattr(indexer, "embedder", None) + if embedder is None: + self.configure_hybrid_projection_retrieval( + index_dir, + embedding_provider=str( + getattr( + indexer, + "embedding_provider", + self.summary_projection_embedding_provider, + ) + ), + embedding_model=str( + getattr(indexer, "embedding_model", self.summary_projection_embedding_model) + ), + embedding_dimensions=int( + getattr( + indexer, + "embedding_dimensions", + self.summary_projection_embedding_dimensions, + ) + ), + embedding_timeout=self.summary_projection_embedding_timeout, + ) + else: + embedding_cache = getattr(indexer, "embedding_cache", None) + self.semantic_retrieval_backend = HybridProjectionSearchBackend( + index_dir, + embedder=embedder, + embedding_provider=str( + getattr( + indexer, + "embedding_provider", + self.summary_projection_embedding_provider, + ) + ), + embedding_model=str( + getattr(indexer, "embedding_model", self.summary_projection_embedding_model) + ), + embedding_dimensions=int( + getattr( + indexer, + "embedding_dimensions", + self.summary_projection_embedding_dimensions, + ) + ), + embedding_cache_path=getattr(embedding_cache, "db_path", None), + ) + if "summary" not in self.semantic_retrieval_channels(): + raise RuntimeError("pifs add failed to configure summary semantic retrieval") + def configure_existing_projection_retrieval(self) -> bool: """Attach semantic retrieval to already-built projection indexes. @@ -1615,6 +1685,32 @@ class PageIndexFileSystem: if record.get("_pifs_owned_raw_artifact") and record.get("raw_artifact_path"): self._unlink_artifact(record["raw_artifact_path"]) + def _cleanup_add_catalog_record(self, file_ref: str) -> None: + try: + self.store.delete_file(file_ref) + except Exception: + return + + def _cleanup_add_summary_projection(self, records: list[dict[str, Any]]) -> None: + indexer = self.summary_projection_indexer + if indexer is None: + return + delete_summary = getattr(indexer, "delete_summary", None) + for record in records: + file_ref = str(record.get("file_ref") or "") + if not file_ref: + continue + try: + if callable(delete_summary): + delete_summary(file_ref) + continue + index = getattr(indexer, "index", None) + delete_file_refs = getattr(index, "delete_file_refs", None) + if callable(delete_file_refs): + delete_file_refs([file_ref]) + except Exception: + continue + @staticmethod def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool: return bool(policy.get("batch")) or policy.get("mode") == "batch" diff --git a/pageindex/filesystem/projection_indexing.py b/pageindex/filesystem/projection_indexing.py index 63802d5..06a1a93 100644 --- a/pageindex/filesystem/projection_indexing.py +++ b/pageindex/filesystem/projection_indexing.py @@ -104,6 +104,9 @@ class SummaryProjectionIndexer: "embedding_dimensions": self.embedding_dimensions, } + def delete_summary(self, file_ref: str) -> int: + return self.index.delete_file_refs([file_ref]) + def _ensure_index(self) -> None: if not self.index.db_path.exists(): self.index.reset( diff --git a/pageindex/filesystem/semantic_index.py b/pageindex/filesystem/semantic_index.py index 4a29551..5b3e393 100644 --- a/pageindex/filesystem/semantic_index.py +++ b/pageindex/filesystem/semantic_index.py @@ -146,6 +146,35 @@ class SQLiteVecSemanticIndex: conn.commit() return inserted + def delete_file_refs(self, file_refs: list[str]) -> int: + refs = [str(file_ref) for file_ref in file_refs if str(file_ref)] + if not refs: + return 0 + placeholders = ", ".join("?" for _ in refs) + with self.connect() as conn: + rows = conn.execute( + f""" + SELECT rowid + FROM semantic_index_docs + WHERE file_ref IN ({placeholders}) + """, + refs, + ).fetchall() + rowids = [int(row["rowid"]) for row in rows] + if not rowids: + return 0 + rowid_placeholders = ", ".join("?" for _ in rowids) + conn.execute( + f"DELETE FROM semantic_index_vec WHERE rowid IN ({rowid_placeholders})", + rowids, + ) + conn.execute( + f"DELETE FROM semantic_index_docs WHERE rowid IN ({rowid_placeholders})", + rowids, + ) + conn.commit() + return len(rowids) + def search( self, vector: list[float], diff --git a/pageindex/filesystem/store.py b/pageindex/filesystem/store.py index bb01aa3..347c181 100644 --- a/pageindex/filesystem/store.py +++ b/pageindex/filesystem/store.py @@ -1056,6 +1056,13 @@ class SQLiteFileSystemStore: (metadata_text_value, file_ref), ) + def delete_file(self, target: str) -> None: + with self.connect() as conn: + file_ref = self._resolve_file_ref(conn, target) + conn.execute("DELETE FROM file_fts WHERE file_ref = ?", (file_ref,)) + conn.execute("DELETE FROM metadata_values WHERE file_ref = ?", (file_ref,)) + conn.execute("DELETE FROM files WHERE file_ref = ?", (file_ref,)) + def resolve_file_ref(self, target: str) -> str: with self.connect() as conn: return self._resolve_file_ref(conn, target) diff --git a/tests/test_pifs_add_command.py b/tests/test_pifs_add_command.py index 88e94f0..ac73c96 100644 --- a/tests/test_pifs_add_command.py +++ b/tests/test_pifs_add_command.py @@ -19,13 +19,34 @@ class GeneratedMetadata: return {field: values[field] for field in fields if field in values} -class RecordingSummaryIndexer: - def __init__(self): - self.upserted = [] +class StaticEmbedder: + def embed(self, texts): + return [[1.0, 0.0, 0.0] for _ in texts] - def upsert_summary(self, record): - self.upserted.append(dict(record)) - return {"status": "ready", "indexed_rows": 1} + +def make_summary_indexer(workspace: Path): + from pageindex.filesystem.projection_indexing import SummaryProjectionIndexer + + return SummaryProjectionIndexer( + workspace / "artifacts" / "projection_indexes", + embedder=StaticEmbedder(), + embedding_provider="test", + embedding_model="static", + embedding_dimensions=3, + ) + + +def make_filesystem(workspace: Path): + from pageindex.filesystem import PageIndexFileSystem + + return PageIndexFileSystem( + workspace=workspace, + metadata_generator=GeneratedMetadata(), + summary_projection_indexer=make_summary_indexer(workspace), + summary_projection_embedding_provider="test", + summary_projection_embedding_model="static", + summary_projection_embedding_dimensions=3, + ) def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None: @@ -50,16 +71,12 @@ def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None: def test_add_text_folder_target_copies_artifact_indexes_summary_and_is_readable(tmp_path): - from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem + from pageindex.filesystem import PIFSCommandExecutor source = tmp_path / "filing.txt" source.write_text("alpha filing text for pifs add", encoding="utf-8") - indexer = RecordingSummaryIndexer() - filesystem = PageIndexFileSystem( - workspace=tmp_path / "workspace", - metadata_generator=GeneratedMetadata(), - summary_projection_indexer=indexer, - ) + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) info = filesystem.add_file(str(source), "/documents/reports") @@ -77,20 +94,15 @@ def test_add_text_folder_target_copies_artifact_indexes_summary_and_is_readable( assert rendered["data"]["text"] == "alpha filing text for pifs add" assert info["metadata"]["summary"].startswith("Summary for filing.txt") - assert indexer.upserted[0]["file_ref"] == info["file_ref"] - assert indexer.upserted[0]["metadata"]["summary"] == info["metadata"]["summary"] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 1 def test_add_rejects_same_folder_same_basename_without_overwrite(tmp_path): - from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem + from pageindex.filesystem import PIFSCommandExecutor source = tmp_path / "conflict.txt" source.write_text("first body", encoding="utf-8") - filesystem = PageIndexFileSystem( - workspace=tmp_path / "workspace", - metadata_generator=GeneratedMetadata(), - summary_projection_indexer=RecordingSummaryIndexer(), - ) + filesystem = make_filesystem(tmp_path / "workspace") filesystem.add_file(source, "/documents") source.write_text("second body must not overwrite", encoding="utf-8") @@ -104,15 +116,9 @@ def test_add_rejects_same_folder_same_basename_without_overwrite(tmp_path): def test_add_rejects_unsupported_type_before_registration(tmp_path): - from pageindex.filesystem import PageIndexFileSystem - source = tmp_path / "payload.json" source.write_text('{"unsupported": true}', encoding="utf-8") - filesystem = PageIndexFileSystem( - workspace=tmp_path / "workspace", - metadata_generator=GeneratedMetadata(), - summary_projection_indexer=RecordingSummaryIndexer(), - ) + filesystem = make_filesystem(tmp_path / "workspace") with pytest.raises(ValueError, match="Unsupported file type"): filesystem.add_file(source, "/documents") @@ -121,9 +127,49 @@ def test_add_rejects_unsupported_type_before_registration(tmp_path): assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*")) +def test_add_rejects_disabled_summary_projection_before_registration(tmp_path): + from pageindex.filesystem import PageIndexFileSystem + + source = tmp_path / "disabled.txt" + source.write_text("must not register without summary vector", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = PageIndexFileSystem( + workspace=workspace, + metadata_generator=GeneratedMetadata(), + summary_projection_index=False, + ) + + with pytest.raises(RuntimeError, match="summary projection index"): + filesystem.add_file(source, "/documents") + + assert filesystem.browse("/", recursive=True)["files"] == [] + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_configures_semantic_retrieval_in_same_filesystem_instance(tmp_path): + source = tmp_path / "semantic.txt" + source.write_text("alpha semantic recall text", encoding="utf-8") + filesystem = make_filesystem(tmp_path / "workspace") + + assert filesystem.semantic_retrieval_channels() == () + + filesystem.add_file(source, "/documents") + + assert filesystem.semantic_retrieval_channels() == ("summary",) + results = filesystem.search_semantic_channel( + "summary", + "semantic recall", + scope={"folder_path": "/documents", "recursive": True}, + limit=5, + ) + assert [result.source_path for result in results] == ["documents/semantic.txt"] + + def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monkeypatch): from pageindex import PageIndexClient - from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem + from pageindex.filesystem import PIFSCommandExecutor indexed_paths = [] @@ -154,11 +200,7 @@ def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monke monkeypatch.setattr(PageIndexClient, "index", fake_index) source = tmp_path / "notes.md" source.write_text("# Notes\n\ncopied markdown body", encoding="utf-8") - filesystem = PageIndexFileSystem( - workspace=tmp_path / "workspace", - metadata_generator=GeneratedMetadata(), - summary_projection_indexer=RecordingSummaryIndexer(), - ) + filesystem = make_filesystem(tmp_path / "workspace") info = filesystem.add_file(source, "/documents") executor = PIFSCommandExecutor(filesystem, json_output=True) @@ -171,15 +213,10 @@ def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monke def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monkeypatch): - from pageindex.filesystem import PageIndexFileSystem - source = tmp_path / "atomic.txt" source.write_text("atomic body", encoding="utf-8") - filesystem = PageIndexFileSystem( - workspace=tmp_path / "workspace", - metadata_generator=GeneratedMetadata(), - summary_projection_indexer=RecordingSummaryIndexer(), - ) + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) def fail_insert(records): raise RuntimeError("catalog insert failed") @@ -190,9 +227,33 @@ def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monke filesystem.add_file(source, "/documents") assert filesystem.browse("/", recursive=True)["files"] == [] - assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*")) - assert not list((tmp_path / "workspace" / "artifacts" / "text").glob("*.txt")) - assert not list((tmp_path / "workspace" / "artifacts" / "raw").glob("*.json")) + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) + + +def test_add_failure_after_summary_vector_rolls_back_catalog_and_vector( + tmp_path, monkeypatch +): + source = tmp_path / "post_vector.txt" + source.write_text("post vector rollback body", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = make_filesystem(workspace) + + def fail_status_update(*args, **kwargs): + raise RuntimeError("metadata status update failed") + + monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update) + + with pytest.raises(RuntimeError, match="metadata status update failed"): + filesystem.add_file(source, "/documents") + + assert filesystem.browse("/", recursive=True)["files"] == [] + assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0 + assert not list((workspace / "artifacts" / "uploads").glob("**/*")) + assert not list((workspace / "artifacts" / "text").glob("*.txt")) + assert not list((workspace / "artifacts" / "raw").glob("*.json")) def test_cli_add_uses_workspace_and_prints_added_file(monkeypatch, capsys, tmp_path):