fix(filesystem): delay register side effects until insert

Write projection and raw side effects only after a successful catalog insert, and clean owned artifacts when registration fails.
This commit is contained in:
Bukely_ 2026-05-26 20:31:13 +08:00 committed by BukeLy
parent c86d5727ed
commit d139181c86
2 changed files with 103 additions and 23 deletions

View file

@ -168,12 +168,22 @@ class PageIndexFileSystem:
def register_files(self, files: list[dict[str, Any]]) -> list[str]:
records = [self._prepare_file_record(file) for file in files]
try:
for record in records:
self._generate_register_metadata(record)
self._register_generation_policy_schema(records)
self.store.insert_files(records)
except Exception:
self._cleanup_failed_register_artifacts(records)
raise
for record in records:
self._generate_register_metadata(record)
self._complete_summary_projection_index(record)
if self._complete_summary_projection_index(record):
self.store.update_file_metadata_status(
record["file_ref"],
metadata=record["metadata"],
metadata_status=record["metadata_status"],
)
self._sync_owned_raw_artifact(record)
self._register_generation_policy_schema(records)
self.store.insert_files(records)
return [record["file_ref"] for record in records]
def batch_generate(self, *, limit: int | None = None) -> dict[str, Any]:
@ -997,22 +1007,15 @@ class PageIndexFileSystem:
folder_path = normalize_path(file.get("folder_path") or "/")
title = file.get("title") or metadata.get("title") or Path(source_path).stem
file_ref = make_file_ref(external_id or source_path)
text_artifact_path = file.get("text_artifact_path") or self.store.write_text_artifact(
file_ref,
artifact_content,
)
text_artifact_path = file.get("text_artifact_path")
owns_text_artifact = text_artifact_path is None
if text_artifact_path is None:
text_artifact_path = self.store.write_text_artifact(file_ref, artifact_content)
raw_artifact_path = file.get("raw_artifact_path")
owns_raw_artifact = False
if raw_artifact_path is None and file.get("write_raw_artifact", True):
raw_artifact_path = self.store.write_raw_artifact(
file_ref,
self._raw_artifact_payload(
storage_uri=storage_uri,
source_path=source_path,
folder_path=folder_path,
metadata=metadata,
metadata_status=metadata_status,
),
)
raw_artifact_path = self.store.raw_dir / f"{file_ref}.json"
owns_raw_artifact = True
descriptor = self._build_descriptor(title, metadata)
return {
"file_ref": file_ref,
@ -1037,6 +1040,8 @@ class PageIndexFileSystem:
"folder_path": folder_path,
"content": fts_content,
"skip_fts": bool(file.get("skip_fts", False)),
"_pifs_owned_text_artifact": owns_text_artifact,
"_pifs_owned_raw_artifact": owns_raw_artifact,
}
def _registration_text_artifact_content(
@ -1234,29 +1239,44 @@ class PageIndexFileSystem:
}
self._refresh_record_metadata_status(record)
def _complete_summary_projection_index(self, record: dict[str, Any]) -> None:
def _complete_summary_projection_index(self, record: dict[str, Any]) -> bool:
metadata_status = record["metadata_status"]
summary_index = metadata_status.get("projection_indexes", {}).get("summary")
if not summary_index or not summary_index.get("requested"):
return
return False
summary = str(record.get("metadata", {}).get("summary") or "").strip()
if not summary:
return
return False
if self.summary_projection_indexer is None:
self._refresh_record_metadata_status(record)
return
return True
try:
result = self.summary_projection_indexer.upsert_summary(record)
except Exception as exc:
summary_index["status"] = "failed"
summary_index["error"] = str(exc)
self._refresh_record_metadata_status(record)
return
return True
summary_index.clear()
summary_index.update({"requested": True, **result})
if summary_index.get("status") != "ready":
summary_index["status"] = "ready"
self._refresh_record_metadata_status(record)
return True
@staticmethod
def _unlink_artifact(path: Any) -> None:
try:
Path(path).unlink()
except FileNotFoundError:
return
def _cleanup_failed_register_artifacts(self, records: list[dict[str, Any]]) -> None:
for record in records:
if record.get("_pifs_owned_text_artifact"):
self._unlink_artifact(record["text_artifact_path"])
if record.get("_pifs_owned_raw_artifact") and record.get("raw_artifact_path"):
self._unlink_artifact(record["raw_artifact_path"])
@staticmethod
def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool:

View file

@ -0,0 +1,60 @@
from pathlib import Path
import pytest
class SummaryGenerator:
    """Metadata generator stub that answers every requested field with fixed text."""

    def generate(self, document, *, fields):
        """Return a mapping of each name in ``fields`` to the same canned summary."""
        return dict.fromkeys(fields, "Generated registration summary.")
class RecordingSummaryIndexer:
    """Summary indexer stub that records every upsert and always reports ready."""

    def __init__(self):
        # Shallow copies of the records passed to upsert_summary, in call order.
        self.upserted = []

    def upsert_summary(self, record):
        """Store a shallow copy of ``record`` and return a ready status."""
        snapshot = dict(record)
        self.upserted.append(snapshot)
        return {"status": "ready"}
def test_register_insert_failure_cleans_owned_artifacts_and_skips_projection(
    tmp_path: Path, monkeypatch
):
    """A failing catalog insert must leave no artifacts and no projection upserts."""
    from pageindex.filesystem import PageIndexFileSystem

    workspace = tmp_path / "workspace"
    source = tmp_path / "source.txt"
    source.write_text("Plain text content for registration.", encoding="utf-8")
    indexer = RecordingSummaryIndexer()
    filesystem = PageIndexFileSystem(
        workspace=workspace,
        metadata_generator=SummaryGenerator(),
        summary_projection_indexer=indexer,
    )

    def fail_insert(records):
        raise RuntimeError("catalog insert failed")

    # Force the catalog insert to fail so the cleanup path is exercised.
    monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)

    # Only the summary field is requested from the metadata generator.
    summary_only_policy = {
        "fields": {
            "summary": True,
            "doc_type": False,
            "domain": False,
            "topic": False,
        }
    }
    registration = {
        "storage_uri": source.as_uri(),
        "source_path": "docs/source.txt",
        "folder_path": "/documents",
        "external_id": "doc_insert_failure",
        "title": "Insert failure",
        "content": source.read_text(encoding="utf-8"),
        "metadata_policy": summary_only_policy,
    }

    with pytest.raises(RuntimeError, match="catalog insert failed"):
        filesystem.register_file(**registration)

    assert indexer.upserted == []
    # NOTE(review): these globs assume the store keeps artifacts under
    # <workspace>/artifacts/{raw,text}; if the layout differs they pass vacuously.
    artifacts_root = workspace / "artifacts"
    assert list((artifacts_root / "raw").glob("*.json")) == []
    assert list((artifacts_root / "text").glob("*.txt")) == []