From eca5edd8a9b0475ae0b061c5d3eaee5441b33971 Mon Sep 17 00:00:00 2001
From: BukeLy
Date: Sun, 31 May 2026 20:58:45 +0800
Subject: [PATCH] fix(pifs): roll back add pageindex cache

---
Note: line structure reconstructed from a whitespace-collapsed copy. The
indentation of the context lines in the core.py hunks is a best guess, and
the post-image blob id on the core.py "index" line predates the docstring and
except-clause edits below. If context fails to match, apply with
`git apply --ignore-whitespace` rather than a 3-way merge.

 pageindex/filesystem/core.py   |  75 +++++++++++++++++++++++
 tests/test_pifs_add_command.py | 108 +++++++++++++++++++++++++++++++++
 2 files changed, 183 insertions(+)

diff --git a/pageindex/filesystem/core.py b/pageindex/filesystem/core.py
index c38e137..ce2c336 100644
--- a/pageindex/filesystem/core.py
+++ b/pageindex/filesystem/core.py
@@ -215,6 +215,7 @@ class PageIndexFileSystem:
         final_dir_created = False
         catalog_inserted = False
         records: list[dict[str, Any]] = []
+        preexisting_pageindex_doc_ids = self._pageindex_cache_doc_ids()
 
         uploads_dir.mkdir(parents=True, exist_ok=True)
         with tempfile.TemporaryDirectory(prefix=f".add-{file_ref}-", dir=uploads_dir) as tmp:
@@ -263,6 +264,7 @@ class PageIndexFileSystem:
                     self._cleanup_add_catalog_record(file_ref)
                 self._cleanup_add_summary_projection(records)
                 self._cleanup_failed_register_artifacts(records)
+                self._cleanup_add_pageindex_cache(records, preexisting_pageindex_doc_ids)
                 self._cleanup_add_created_folders(add_created_folder_paths)
                 if final_dir_created:
                     shutil.rmtree(final_dir, ignore_errors=True)
@@ -1735,6 +1737,79 @@ class PageIndexFileSystem:
             except Exception:
                 continue
 
+    def _pageindex_cache_doc_ids(self) -> set[str]:
+        """Return the doc ids found in the PageIndex client cache workspace.
+
+        Combines the stems of ``*.json`` files (except ``_meta.json``) with the
+        keys of ``_meta.json`` when that file holds a JSON object. An unreadable
+        or malformed ``_meta.json`` is ignored.
+        """
+        workspace = self.pageindex_client_workspace
+        doc_ids = {path.stem for path in workspace.glob("*.json") if path.name != "_meta.json"}
+        meta_path = workspace / "_meta.json"
+        if not meta_path.exists():
+            return doc_ids
+        try:
+            payload = json.loads(meta_path.read_text(encoding="utf-8"))
+        except (OSError, ValueError):
+            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
+            return doc_ids
+        if isinstance(payload, dict):
+            doc_ids.update(str(doc_id) for doc_id in payload)
+        return doc_ids
+
+    def _cleanup_add_pageindex_cache(
+        self,
+        records: list[dict[str, Any]],
+        preexisting_doc_ids: set[str],
+    ) -> None:
+        """Best-effort removal of PageIndex cache entries left by a failed add.
+
+        Only ``pageindex_doc_id`` values from ``records`` that are absent from
+        ``preexisting_doc_ids`` are touched: each ``{doc_id}.json`` file is
+        deleted and its key is dropped from ``_meta.json``. I/O and parse
+        errors are swallowed because this runs while rolling back a failure.
+        """
+        doc_ids: list[str] = []
+        for record in records:
+            doc_id = str(record.get("pageindex_doc_id") or "").strip()
+            if doc_id and doc_id not in preexisting_doc_ids:
+                doc_ids.append(doc_id)
+        if not doc_ids:
+            return
+        workspace = self.pageindex_client_workspace
+        for doc_id in doc_ids:
+            try:
+                (workspace / f"{doc_id}.json").unlink()
+            except FileNotFoundError:
+                pass
+            except Exception:
+                continue
+        meta_path = workspace / "_meta.json"
+        if not meta_path.exists():
+            return
+        try:
+            payload = json.loads(meta_path.read_text(encoding="utf-8"))
+        except (OSError, ValueError):
+            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
+            return
+        if not isinstance(payload, dict):
+            return
+        changed = False
+        for doc_id in doc_ids:
+            if doc_id in payload:
+                payload.pop(doc_id, None)
+                changed = True
+        if not changed:
+            return
+        try:
+            meta_path.write_text(
+                json.dumps(payload, ensure_ascii=False, indent=2),
+                encoding="utf-8",
+            )
+        except OSError:
+            return
+
     @staticmethod
     def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool:
         return bool(policy.get("batch")) or policy.get("mode") == "batch"
diff --git a/tests/test_pifs_add_command.py b/tests/test_pifs_add_command.py
index f608c89..b221ae0 100644
--- a/tests/test_pifs_add_command.py
+++ b/tests/test_pifs_add_command.py
@@ -233,6 +233,114 @@ def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monke
     assert not list((workspace / "artifacts" / "raw").glob("*.json"))
 
 
+def test_add_markdown_insert_failure_removes_pageindex_cache(tmp_path, monkeypatch):
+    from pageindex import PageIndexClient
+
+    def fake_index(self, file_path, mode="auto"):
+        doc_id = "doc_failed_add_md"
+        doc = {
+            "id": doc_id,
+            "type": "md",
+            "path": str(Path(file_path).resolve()),
+            "doc_name": "failed.md",
+            "doc_description": "",
+            "line_count": 3,
+            "structure": [
+                {
+                    "title": "Failed",
+                    "node_id": "0001",
+                    "line_num": 1,
+                    "text": "# Failed\n\nbody",
+                    "nodes": [],
+                }
+            ],
+        }
+        write_pageindex_client_doc(self.workspace, doc_id, doc)
+        self.documents[doc_id] = doc
+        return doc_id
+
+    monkeypatch.setattr(PageIndexClient, "index", fake_index)
+    source = tmp_path / "failed.md"
+    source.write_text("# Failed\n\nbody", encoding="utf-8")
+    workspace = tmp_path / "workspace"
+    filesystem = make_filesystem(workspace)
+
+    def fail_insert(records):
+        raise RuntimeError("catalog insert failed")
+
+    monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)
+
+    with pytest.raises(RuntimeError, match="catalog insert failed"):
+        filesystem.add_file(source, "/documents/reports")
+
+    pageindex_workspace = workspace / "artifacts" / "pageindex_client"
+    assert not (pageindex_workspace / "doc_failed_add_md.json").exists()
+    meta_path = pageindex_workspace / "_meta.json"
+    if meta_path.exists():
+        meta = json.loads(meta_path.read_text(encoding="utf-8"))
+        assert "doc_failed_add_md" not in meta
+    listing = filesystem.browse("/", recursive=True)
+    assert listing["files"] == []
+    assert listing["folders"] == []
+    assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
+    assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
+    assert not list((workspace / "artifacts" / "text").glob("*.txt"))
+    assert not list((workspace / "artifacts" / "raw").glob("*.json"))
+
+
+def test_add_markdown_failure_preserves_unrelated_pageindex_cache(tmp_path, monkeypatch):
+    from pageindex import PageIndexClient
+
+    def fake_index(self, file_path, mode="auto"):
+        doc_id = "doc_failed_add_md"
+        doc = {
+            "id": doc_id,
+            "type": "md",
+            "path": str(Path(file_path).resolve()),
+            "doc_name": "failed.md",
+            "doc_description": "",
+            "line_count": 3,
+            "structure": [{"title": "Failed", "node_id": "0001", "nodes": []}],
+        }
+        self.documents[doc_id] = doc
+        self._save_doc(doc_id)
+        return doc_id
+
+    monkeypatch.setattr(PageIndexClient, "index", fake_index)
+    source = tmp_path / "failed.md"
+    source.write_text("# Failed\n\nbody", encoding="utf-8")
+    workspace = tmp_path / "workspace"
+    filesystem = make_filesystem(workspace)
+    pageindex_workspace = workspace / "artifacts" / "pageindex_client"
+    write_pageindex_client_doc(
+        pageindex_workspace,
+        "doc_unrelated",
+        {
+            "id": "doc_unrelated",
+            "type": "md",
+            "path": str((tmp_path / "unrelated.md").resolve()),
+            "doc_name": "unrelated.md",
+            "doc_description": "",
+            "line_count": 1,
+            "structure": [{"title": "Unrelated", "node_id": "0001", "nodes": []}],
+        },
+    )
+
+    def fail_insert(records):
+        raise RuntimeError("catalog insert failed")
+
+    monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)
+
+    with pytest.raises(RuntimeError, match="catalog insert failed"):
+        filesystem.add_file(source, "/documents")
+
+    assert not (pageindex_workspace / "doc_failed_add_md.json").exists()
+    assert (pageindex_workspace / "doc_unrelated.json").exists()
+    meta = json.loads((pageindex_workspace / "_meta.json").read_text(encoding="utf-8"))
+    assert "doc_failed_add_md" not in meta
+    assert "doc_unrelated" in meta
+
+
 def test_add_failure_after_summary_vector_rolls_back_catalog_and_vector(
     tmp_path, monkeypatch
 ):