From d139181c86fd92a7768dbd7f294250e0ae345a4c Mon Sep 17 00:00:00 2001
From: Bukely_
Date: Tue, 26 May 2026 20:31:13 +0800
Subject: [PATCH] fix(filesystem): delay register side effects until insert

Write projection and raw side effects only after a successful catalog
insert, and clean owned artifacts when registration fails.
---
Review notes (text in this area is not part of the commit message):

* Line structure and indentation of this patch were reconstructed from a
  whitespace-collapsed copy. Hunk line counts were re-checked against the
  @@ headers, but the indentation of the "}" context line that opens the
  last core.py hunk is a best guess; use --ignore-whitespace if the hunk
  does not apply cleanly.
* register_files: _prepare_file_record() still runs before the new try
  block and writes text artifacts, so artifacts written for earlier
  records are not cleaned up if preparing a later record raises.
* _cleanup_failed_register_artifacts: _unlink_artifact() only swallows
  FileNotFoundError; any other error raised while cleaning up propagates
  instead of the original registration error.
* _prepare_file_record: the text artifact check changed from a truthiness
  test ("or") to "is None", so a falsy non-None text_artifact_path such as
  "" no longer triggers write_text_artifact().
* The records handed to store.insert_files() now carry the
  "_pifs_owned_text_artifact" / "_pifs_owned_raw_artifact" keys;
  TODO confirm the store ignores unknown keys.
* raw_artifact_path is now predicted as raw_dir / f"{file_ref}.json"
  instead of being returned by write_raw_artifact(); presumably
  _sync_owned_raw_artifact() writes to that same path - TODO confirm.

 pageindex/filesystem/core.py             | 66 +++++++++++++++---------
 tests/test_pifs_register_side_effects.py | 60 +++++++++++++++++++++
 2 files changed, 103 insertions(+), 23 deletions(-)
 create mode 100644 tests/test_pifs_register_side_effects.py

diff --git a/pageindex/filesystem/core.py b/pageindex/filesystem/core.py
index e2c6136..72833b7 100644
--- a/pageindex/filesystem/core.py
+++ b/pageindex/filesystem/core.py
@@ -168,12 +168,22 @@ class PageIndexFileSystem:
 
     def register_files(self, files: list[dict[str, Any]]) -> list[str]:
         records = [self._prepare_file_record(file) for file in files]
+        try:
+            for record in records:
+                self._generate_register_metadata(record)
+            self._register_generation_policy_schema(records)
+            self.store.insert_files(records)
+        except Exception:
+            self._cleanup_failed_register_artifacts(records)
+            raise
         for record in records:
-            self._generate_register_metadata(record)
-            self._complete_summary_projection_index(record)
+            if self._complete_summary_projection_index(record):
+                self.store.update_file_metadata_status(
+                    record["file_ref"],
+                    metadata=record["metadata"],
+                    metadata_status=record["metadata_status"],
+                )
             self._sync_owned_raw_artifact(record)
-        self._register_generation_policy_schema(records)
-        self.store.insert_files(records)
         return [record["file_ref"] for record in records]
 
     def batch_generate(self, *, limit: int | None = None) -> dict[str, Any]:
@@ -997,22 +1007,15 @@ class PageIndexFileSystem:
         folder_path = normalize_path(file.get("folder_path") or "/")
         title = file.get("title") or metadata.get("title") or Path(source_path).stem
         file_ref = make_file_ref(external_id or source_path)
-        text_artifact_path = file.get("text_artifact_path") or self.store.write_text_artifact(
-            file_ref,
-            artifact_content,
-        )
+        text_artifact_path = file.get("text_artifact_path")
+        owns_text_artifact = text_artifact_path is None
+        if text_artifact_path is None:
+            text_artifact_path = self.store.write_text_artifact(file_ref, artifact_content)
         raw_artifact_path = file.get("raw_artifact_path")
+        owns_raw_artifact = False
         if raw_artifact_path is None and file.get("write_raw_artifact", True):
-            raw_artifact_path = self.store.write_raw_artifact(
-                file_ref,
-                self._raw_artifact_payload(
-                    storage_uri=storage_uri,
-                    source_path=source_path,
-                    folder_path=folder_path,
-                    metadata=metadata,
-                    metadata_status=metadata_status,
-                ),
-            )
+            raw_artifact_path = self.store.raw_dir / f"{file_ref}.json"
+            owns_raw_artifact = True
         descriptor = self._build_descriptor(title, metadata)
         return {
             "file_ref": file_ref,
@@ -1037,6 +1040,8 @@ class PageIndexFileSystem:
             "folder_path": folder_path,
             "content": fts_content,
             "skip_fts": bool(file.get("skip_fts", False)),
+            "_pifs_owned_text_artifact": owns_text_artifact,
+            "_pifs_owned_raw_artifact": owns_raw_artifact,
         }
 
     def _registration_text_artifact_content(
@@ -1234,29 +1239,44 @@ class PageIndexFileSystem:
             }
         self._refresh_record_metadata_status(record)
 
-    def _complete_summary_projection_index(self, record: dict[str, Any]) -> None:
+    def _complete_summary_projection_index(self, record: dict[str, Any]) -> bool:
         metadata_status = record["metadata_status"]
         summary_index = metadata_status.get("projection_indexes", {}).get("summary")
         if not summary_index or not summary_index.get("requested"):
-            return
+            return False
         summary = str(record.get("metadata", {}).get("summary") or "").strip()
         if not summary:
-            return
+            return False
         if self.summary_projection_indexer is None:
             self._refresh_record_metadata_status(record)
-            return
+            return True
         try:
             result = self.summary_projection_indexer.upsert_summary(record)
         except Exception as exc:
             summary_index["status"] = "failed"
             summary_index["error"] = str(exc)
             self._refresh_record_metadata_status(record)
-            return
+            return True
         summary_index.clear()
         summary_index.update({"requested": True, **result})
         if summary_index.get("status") != "ready":
             summary_index["status"] = "ready"
         self._refresh_record_metadata_status(record)
+        return True
+
+    @staticmethod
+    def _unlink_artifact(path: Any) -> None:
+        try:
+            Path(path).unlink()
+        except FileNotFoundError:
+            return
+
+    def _cleanup_failed_register_artifacts(self, records: list[dict[str, Any]]) -> None:
+        for record in records:
+            if record.get("_pifs_owned_text_artifact"):
+                self._unlink_artifact(record["text_artifact_path"])
+            if record.get("_pifs_owned_raw_artifact") and record.get("raw_artifact_path"):
+                self._unlink_artifact(record["raw_artifact_path"])
 
     @staticmethod
     def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool:
diff --git a/tests/test_pifs_register_side_effects.py b/tests/test_pifs_register_side_effects.py
new file mode 100644
index 0000000..867dd6b
--- /dev/null
+++ b/tests/test_pifs_register_side_effects.py
@@ -0,0 +1,60 @@
+from pathlib import Path
+
+import pytest
+
+
+class SummaryGenerator:
+    def generate(self, document, *, fields):
+        return {field: "Generated registration summary." for field in fields}
+
+
+class RecordingSummaryIndexer:
+    def __init__(self):
+        self.upserted = []
+
+    def upsert_summary(self, record):
+        self.upserted.append(dict(record))
+        return {"status": "ready"}
+
+
+def test_register_insert_failure_cleans_owned_artifacts_and_skips_projection(
+    tmp_path: Path, monkeypatch
+):
+    from pageindex.filesystem import PageIndexFileSystem
+
+    workspace = tmp_path / "workspace"
+    source = tmp_path / "source.txt"
+    source.write_text("Plain text content for registration.", encoding="utf-8")
+    indexer = RecordingSummaryIndexer()
+    filesystem = PageIndexFileSystem(
+        workspace=workspace,
+        metadata_generator=SummaryGenerator(),
+        summary_projection_indexer=indexer,
+    )
+
+    def fail_insert(records):
+        raise RuntimeError("catalog insert failed")
+
+    monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)
+
+    with pytest.raises(RuntimeError, match="catalog insert failed"):
+        filesystem.register_file(
+            storage_uri=source.as_uri(),
+            source_path="docs/source.txt",
+            folder_path="/documents",
+            external_id="doc_insert_failure",
+            title="Insert failure",
+            content=source.read_text(encoding="utf-8"),
+            metadata_policy={
+                "fields": {
+                    "summary": True,
+                    "doc_type": False,
+                    "domain": False,
+                    "topic": False,
+                }
+            },
+        )
+
+    assert indexer.upserted == []
+    assert list((workspace / "artifacts" / "raw").glob("*.json")) == []
+    assert list((workspace / "artifacts" / "text").glob("*.txt")) == []