mirror of
https://github.com/VectifyAI/PageIndex.git
synced 2026-06-27 20:29:41 +02:00
fix(pifs): roll back add pageindex cache
This commit is contained in:
parent
8cdddb5e5b
commit
eca5edd8a9
2 changed files with 168 additions and 0 deletions
|
|
@ -215,6 +215,7 @@ class PageIndexFileSystem:
|
||||||
final_dir_created = False
|
final_dir_created = False
|
||||||
catalog_inserted = False
|
catalog_inserted = False
|
||||||
records: list[dict[str, Any]] = []
|
records: list[dict[str, Any]] = []
|
||||||
|
preexisting_pageindex_doc_ids = self._pageindex_cache_doc_ids()
|
||||||
|
|
||||||
uploads_dir.mkdir(parents=True, exist_ok=True)
|
uploads_dir.mkdir(parents=True, exist_ok=True)
|
||||||
with tempfile.TemporaryDirectory(prefix=f".add-{file_ref}-", dir=uploads_dir) as tmp:
|
with tempfile.TemporaryDirectory(prefix=f".add-{file_ref}-", dir=uploads_dir) as tmp:
|
||||||
|
|
@ -263,6 +264,7 @@ class PageIndexFileSystem:
|
||||||
self._cleanup_add_catalog_record(file_ref)
|
self._cleanup_add_catalog_record(file_ref)
|
||||||
self._cleanup_add_summary_projection(records)
|
self._cleanup_add_summary_projection(records)
|
||||||
self._cleanup_failed_register_artifacts(records)
|
self._cleanup_failed_register_artifacts(records)
|
||||||
|
self._cleanup_add_pageindex_cache(records, preexisting_pageindex_doc_ids)
|
||||||
self._cleanup_add_created_folders(add_created_folder_paths)
|
self._cleanup_add_created_folders(add_created_folder_paths)
|
||||||
if final_dir_created:
|
if final_dir_created:
|
||||||
shutil.rmtree(final_dir, ignore_errors=True)
|
shutil.rmtree(final_dir, ignore_errors=True)
|
||||||
|
|
@ -1735,6 +1737,64 @@ class PageIndexFileSystem:
|
||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
def _pageindex_cache_doc_ids(self) -> set[str]:
    """Return the document ids currently present in the PageIndex client cache.

    Ids are collected from two places under ``self.pageindex_client_workspace``:
    the stems of its ``*.json`` files (``_meta.json`` excluded) and, when
    ``_meta.json`` parses to a JSON object, that object's top-level keys
    (converted with ``str``).

    The lookup is best-effort: a missing, unreadable, non-UTF-8 or malformed
    ``_meta.json`` is ignored and only the file-derived ids are returned.

    Returns:
        A new set of document id strings; empty when nothing is cached.
    """
    workspace = self.pageindex_client_workspace
    doc_ids = {path.stem for path in workspace.glob("*.json") if path.name != "_meta.json"}
    try:
        payload = json.loads((workspace / "_meta.json").read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError, so a corrupt index
        # cannot make this snapshot raise.
        return doc_ids
    if isinstance(payload, dict):
        doc_ids.update(str(doc_id) for doc_id in payload)
    return doc_ids
|
||||||
|
|
||||||
|
def _cleanup_add_pageindex_cache(
    self,
    records: list[dict[str, Any]],
    preexisting_doc_ids: set[str],
) -> None:
    """Remove PageIndex cache entries that a failed add created.

    For every record whose ``pageindex_doc_id`` is non-empty (after
    stripping whitespace) and not in ``preexisting_doc_ids``, delete
    ``<doc_id>.json`` from ``self.pageindex_client_workspace`` and drop the
    matching top-level key from ``_meta.json``.

    This runs while rolling back a failed add, so it is strictly
    best-effort and never raises for filesystem or decoding problems:
    anything that cannot be cleaned up is left in place.

    Args:
        records: Records produced by the add; only ``pageindex_doc_id`` is read.
        preexisting_doc_ids: Ids that were cached before the add started and
            therefore must not be touched.
    """
    stripped_ids = (str(record.get("pageindex_doc_id") or "").strip() for record in records)
    # dict.fromkeys de-duplicates while keeping first-seen order.
    doc_ids = [
        doc_id
        for doc_id in dict.fromkeys(stripped_ids)
        if doc_id and doc_id not in preexisting_doc_ids
    ]
    if not doc_ids:
        return

    workspace = self.pageindex_client_workspace
    for doc_id in doc_ids:
        try:
            (workspace / f"{doc_id}.json").unlink(missing_ok=True)
        except Exception:
            # Deliberately broad: a failure to delete one cache file must not
            # abort the rollback or hide the error that triggered it.
            continue

    meta_path = workspace / "_meta.json"
    try:
        payload = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable. ValueError: invalid JSON or bytes
        # that are not valid UTF-8. Either way there is nothing safe to rewrite.
        return
    if not isinstance(payload, dict):
        return

    stale_ids = [doc_id for doc_id in doc_ids if doc_id in payload]
    if not stale_ids:
        # Avoid rewriting the index when nothing changed.
        return
    for doc_id in stale_ids:
        del payload[doc_id]

    try:
        meta_path.write_text(
            json.dumps(payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    except OSError:
        return
|
||||||
|
|
||||||
@staticmethod
def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool:
    """Tell whether a metadata policy selects batch processing.

    A policy counts as batch when its ``"batch"`` entry is truthy or its
    ``"mode"`` entry equals ``"batch"``.
    """
    if policy.get("batch"):
        return True
    return policy.get("mode") == "batch"
|
||||||
|
|
|
||||||
|
|
@ -233,6 +233,114 @@ def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monke
|
||||||
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
|
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_markdown_insert_failure_removes_pageindex_cache(tmp_path, monkeypatch):
    """A failed catalog insert must not leave the new doc in the PageIndex cache.

    ``PageIndexClient.index`` is replaced with a stub that writes a cache
    document, ``store.insert_files`` is forced to raise, and the test then
    checks that the cache entry, catalog listing, summary projection and
    on-disk artifacts are all gone.
    """
    from pageindex import PageIndexClient

    failed_doc_id = "doc_failed_add_md"
    markdown_body = "# Failed\n\nbody"

    def fake_index(self, file_path, mode="auto"):
        root_node = {
            "title": "Failed",
            "node_id": "0001",
            "line_num": 1,
            "text": markdown_body,
            "nodes": [],
        }
        cached_doc = {
            "id": failed_doc_id,
            "type": "md",
            "path": str(Path(file_path).resolve()),
            "doc_name": "failed.md",
            "doc_description": "",
            "line_count": 3,
            "structure": [root_node],
        }
        write_pageindex_client_doc(self.workspace, failed_doc_id, cached_doc)
        self.documents[failed_doc_id] = cached_doc
        return failed_doc_id

    def fail_insert(records):
        raise RuntimeError("catalog insert failed")

    monkeypatch.setattr(PageIndexClient, "index", fake_index)

    source = tmp_path / "failed.md"
    source.write_text(markdown_body, encoding="utf-8")
    workspace = tmp_path / "workspace"
    filesystem = make_filesystem(workspace)
    monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)

    with pytest.raises(RuntimeError, match="catalog insert failed"):
        filesystem.add_file(source, "/documents/reports")

    # The cache file and its index entry must both be rolled back.
    artifacts_dir = workspace / "artifacts"
    cache_dir = artifacts_dir / "pageindex_client"
    assert not (cache_dir / f"{failed_doc_id}.json").exists()
    meta_file = cache_dir / "_meta.json"
    if meta_file.exists():
        assert failed_doc_id not in json.loads(meta_file.read_text(encoding="utf-8"))

    # Nothing is visible in the catalog or the summary projection.
    tree = filesystem.browse("/", recursive=True)
    assert tree["files"] == []
    assert tree["folders"] == []
    assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0

    # No leftover artifacts on disk.
    for subdir, pattern in (("uploads", "**/*"), ("text", "*.txt"), ("raw", "*.json")):
        assert not list((artifacts_dir / subdir).glob(pattern))
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_markdown_failure_preserves_unrelated_pageindex_cache(tmp_path, monkeypatch):
    """Rolling back a failed add must leave pre-existing cache docs untouched.

    An unrelated document is written to the PageIndex cache before the add;
    after ``store.insert_files`` fails, only the document created by the
    stubbed ``PageIndexClient.index`` may be removed.
    """
    from pageindex import PageIndexClient

    failed_doc_id = "doc_failed_add_md"
    unrelated_doc_id = "doc_unrelated"

    def fake_index(self, file_path, mode="auto"):
        cached_doc = {
            "id": failed_doc_id,
            "type": "md",
            "path": str(Path(file_path).resolve()),
            "doc_name": "failed.md",
            "doc_description": "",
            "line_count": 3,
            "structure": [{"title": "Failed", "node_id": "0001", "nodes": []}],
        }
        self.documents[failed_doc_id] = cached_doc
        self._save_doc(failed_doc_id)
        return failed_doc_id

    def fail_insert(records):
        raise RuntimeError("catalog insert failed")

    monkeypatch.setattr(PageIndexClient, "index", fake_index)

    source = tmp_path / "failed.md"
    source.write_text("# Failed\n\nbody", encoding="utf-8")
    workspace = tmp_path / "workspace"
    filesystem = make_filesystem(workspace)

    # Seed the cache with a document that the rollback must not remove.
    cache_dir = workspace / "artifacts" / "pageindex_client"
    unrelated_doc = {
        "id": unrelated_doc_id,
        "type": "md",
        "path": str((tmp_path / "unrelated.md").resolve()),
        "doc_name": "unrelated.md",
        "doc_description": "",
        "line_count": 1,
        "structure": [{"title": "Unrelated", "node_id": "0001", "nodes": []}],
    }
    write_pageindex_client_doc(cache_dir, unrelated_doc_id, unrelated_doc)

    monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)

    with pytest.raises(RuntimeError, match="catalog insert failed"):
        filesystem.add_file(source, "/documents")

    assert not (cache_dir / f"{failed_doc_id}.json").exists()
    assert (cache_dir / f"{unrelated_doc_id}.json").exists()
    meta_index = json.loads((cache_dir / "_meta.json").read_text(encoding="utf-8"))
    assert failed_doc_id not in meta_index
    assert unrelated_doc_id in meta_index
|
||||||
|
|
||||||
|
|
||||||
def test_add_failure_after_summary_vector_rolls_back_catalog_and_vector(
|
def test_add_failure_after_summary_vector_rolls_back_catalog_and_vector(
|
||||||
tmp_path, monkeypatch
|
tmp_path, monkeypatch
|
||||||
):
|
):
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue