fix(pifs): keep add imports semantically atomic

This commit is contained in:
BukeLy 2026-05-31 17:33:50 +08:00
parent 7096ba1388
commit 1c5ed63ef8
5 changed files with 243 additions and 47 deletions

View file

@ -203,6 +203,8 @@ class PageIndexFileSystem:
)
if self.store.file_basename_exists_in_folder(folder_path, filename):
raise FileExistsError(f"File already exists at {virtual_path}")
if not self.summary_projection_index:
raise RuntimeError("pifs add requires the summary projection index")
self._ensure_add_completion_defaults()
file_ref = make_file_ref(virtual_path.strip("/"))
@ -210,6 +212,7 @@ class PageIndexFileSystem:
final_dir = uploads_dir / file_ref
final_path = final_dir / filename
final_dir_created = False
catalog_inserted = False
records: list[dict[str, Any]] = []
uploads_dir.mkdir(parents=True, exist_ok=True)
@ -242,12 +245,22 @@ class PageIndexFileSystem:
self._require_add_pageindex_ready(record)
self._generate_register_metadata(record)
self._require_add_metadata_ready(record)
self._complete_summary_projection_index(record)
self._require_add_summary_projection_ready(record)
self._register_generation_policy_schema(records)
self._sync_owned_raw_artifact(record)
self.store.insert_files(records)
catalog_inserted = True
if self._complete_summary_projection_index(record):
self.store.update_file_metadata_status(
record["file_ref"],
metadata=record["metadata"],
metadata_status=record["metadata_status"],
)
self._require_add_summary_projection_ready(record)
self._sync_owned_raw_artifact(record)
self._ensure_add_semantic_retrieval_ready()
except Exception:
if catalog_inserted:
self._cleanup_add_catalog_record(file_ref)
self._cleanup_add_summary_projection(records)
self._cleanup_failed_register_artifacts(records)
if final_dir_created:
shutil.rmtree(final_dir, ignore_errors=True)
@ -355,6 +368,63 @@ class PageIndexFileSystem:
embedding_timeout=self.summary_projection_embedding_timeout,
)
def _ensure_add_semantic_retrieval_ready(self) -> None:
indexer = self.summary_projection_indexer
if indexer is None:
raise RuntimeError("pifs add requires a summary projection indexer")
from .hybrid_projection import HybridProjectionSearchBackend
index_dir = Path(getattr(indexer, "index_dir", self.summary_projection_index_dir))
embedder = getattr(indexer, "embedder", None)
if embedder is None:
self.configure_hybrid_projection_retrieval(
index_dir,
embedding_provider=str(
getattr(
indexer,
"embedding_provider",
self.summary_projection_embedding_provider,
)
),
embedding_model=str(
getattr(indexer, "embedding_model", self.summary_projection_embedding_model)
),
embedding_dimensions=int(
getattr(
indexer,
"embedding_dimensions",
self.summary_projection_embedding_dimensions,
)
),
embedding_timeout=self.summary_projection_embedding_timeout,
)
else:
embedding_cache = getattr(indexer, "embedding_cache", None)
self.semantic_retrieval_backend = HybridProjectionSearchBackend(
index_dir,
embedder=embedder,
embedding_provider=str(
getattr(
indexer,
"embedding_provider",
self.summary_projection_embedding_provider,
)
),
embedding_model=str(
getattr(indexer, "embedding_model", self.summary_projection_embedding_model)
),
embedding_dimensions=int(
getattr(
indexer,
"embedding_dimensions",
self.summary_projection_embedding_dimensions,
)
),
embedding_cache_path=getattr(embedding_cache, "db_path", None),
)
if "summary" not in self.semantic_retrieval_channels():
raise RuntimeError("pifs add failed to configure summary semantic retrieval")
def configure_existing_projection_retrieval(self) -> bool:
"""Attach semantic retrieval to already-built projection indexes.
@ -1615,6 +1685,32 @@ class PageIndexFileSystem:
if record.get("_pifs_owned_raw_artifact") and record.get("raw_artifact_path"):
self._unlink_artifact(record["raw_artifact_path"])
def _cleanup_add_catalog_record(self, file_ref: str) -> None:
try:
self.store.delete_file(file_ref)
except Exception:
return
def _cleanup_add_summary_projection(self, records: list[dict[str, Any]]) -> None:
indexer = self.summary_projection_indexer
if indexer is None:
return
delete_summary = getattr(indexer, "delete_summary", None)
for record in records:
file_ref = str(record.get("file_ref") or "")
if not file_ref:
continue
try:
if callable(delete_summary):
delete_summary(file_ref)
continue
index = getattr(indexer, "index", None)
delete_file_refs = getattr(index, "delete_file_refs", None)
if callable(delete_file_refs):
delete_file_refs([file_ref])
except Exception:
continue
@staticmethod
def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool:
return bool(policy.get("batch")) or policy.get("mode") == "batch"

View file

@ -104,6 +104,9 @@ class SummaryProjectionIndexer:
"embedding_dimensions": self.embedding_dimensions,
}
def delete_summary(self, file_ref: str) -> int:
return self.index.delete_file_refs([file_ref])
def _ensure_index(self) -> None:
if not self.index.db_path.exists():
self.index.reset(

View file

@ -146,6 +146,35 @@ class SQLiteVecSemanticIndex:
conn.commit()
return inserted
def delete_file_refs(self, file_refs: list[str]) -> int:
refs = [str(file_ref) for file_ref in file_refs if str(file_ref)]
if not refs:
return 0
placeholders = ", ".join("?" for _ in refs)
with self.connect() as conn:
rows = conn.execute(
f"""
SELECT rowid
FROM semantic_index_docs
WHERE file_ref IN ({placeholders})
""",
refs,
).fetchall()
rowids = [int(row["rowid"]) for row in rows]
if not rowids:
return 0
rowid_placeholders = ", ".join("?" for _ in rowids)
conn.execute(
f"DELETE FROM semantic_index_vec WHERE rowid IN ({rowid_placeholders})",
rowids,
)
conn.execute(
f"DELETE FROM semantic_index_docs WHERE rowid IN ({rowid_placeholders})",
rowids,
)
conn.commit()
return len(rowids)
def search(
self,
vector: list[float],

View file

@ -1056,6 +1056,13 @@ class SQLiteFileSystemStore:
(metadata_text_value, file_ref),
)
def delete_file(self, target: str) -> None:
with self.connect() as conn:
file_ref = self._resolve_file_ref(conn, target)
conn.execute("DELETE FROM file_fts WHERE file_ref = ?", (file_ref,))
conn.execute("DELETE FROM metadata_values WHERE file_ref = ?", (file_ref,))
conn.execute("DELETE FROM files WHERE file_ref = ?", (file_ref,))
def resolve_file_ref(self, target: str) -> str:
with self.connect() as conn:
return self._resolve_file_ref(conn, target)

View file

@ -19,13 +19,34 @@ class GeneratedMetadata:
return {field: values[field] for field in fields if field in values}
class RecordingSummaryIndexer:
def __init__(self):
self.upserted = []
class StaticEmbedder:
def embed(self, texts):
return [[1.0, 0.0, 0.0] for _ in texts]
def upsert_summary(self, record):
self.upserted.append(dict(record))
return {"status": "ready", "indexed_rows": 1}
def make_summary_indexer(workspace: Path):
from pageindex.filesystem.projection_indexing import SummaryProjectionIndexer
return SummaryProjectionIndexer(
workspace / "artifacts" / "projection_indexes",
embedder=StaticEmbedder(),
embedding_provider="test",
embedding_model="static",
embedding_dimensions=3,
)
def make_filesystem(workspace: Path):
from pageindex.filesystem import PageIndexFileSystem
return PageIndexFileSystem(
workspace=workspace,
metadata_generator=GeneratedMetadata(),
summary_projection_indexer=make_summary_indexer(workspace),
summary_projection_embedding_provider="test",
summary_projection_embedding_model="static",
summary_projection_embedding_dimensions=3,
)
def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None:
@ -50,16 +71,12 @@ def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None:
def test_add_text_folder_target_copies_artifact_indexes_summary_and_is_readable(tmp_path):
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
from pageindex.filesystem import PIFSCommandExecutor
source = tmp_path / "filing.txt"
source.write_text("alpha filing text for pifs add", encoding="utf-8")
indexer = RecordingSummaryIndexer()
filesystem = PageIndexFileSystem(
workspace=tmp_path / "workspace",
metadata_generator=GeneratedMetadata(),
summary_projection_indexer=indexer,
)
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
info = filesystem.add_file(str(source), "/documents/reports")
@ -77,20 +94,15 @@ def test_add_text_folder_target_copies_artifact_indexes_summary_and_is_readable(
assert rendered["data"]["text"] == "alpha filing text for pifs add"
assert info["metadata"]["summary"].startswith("Summary for filing.txt")
assert indexer.upserted[0]["file_ref"] == info["file_ref"]
assert indexer.upserted[0]["metadata"]["summary"] == info["metadata"]["summary"]
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 1
def test_add_rejects_same_folder_same_basename_without_overwrite(tmp_path):
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
from pageindex.filesystem import PIFSCommandExecutor
source = tmp_path / "conflict.txt"
source.write_text("first body", encoding="utf-8")
filesystem = PageIndexFileSystem(
workspace=tmp_path / "workspace",
metadata_generator=GeneratedMetadata(),
summary_projection_indexer=RecordingSummaryIndexer(),
)
filesystem = make_filesystem(tmp_path / "workspace")
filesystem.add_file(source, "/documents")
source.write_text("second body must not overwrite", encoding="utf-8")
@ -104,15 +116,9 @@ def test_add_rejects_same_folder_same_basename_without_overwrite(tmp_path):
def test_add_rejects_unsupported_type_before_registration(tmp_path):
from pageindex.filesystem import PageIndexFileSystem
source = tmp_path / "payload.json"
source.write_text('{"unsupported": true}', encoding="utf-8")
filesystem = PageIndexFileSystem(
workspace=tmp_path / "workspace",
metadata_generator=GeneratedMetadata(),
summary_projection_indexer=RecordingSummaryIndexer(),
)
filesystem = make_filesystem(tmp_path / "workspace")
with pytest.raises(ValueError, match="Unsupported file type"):
filesystem.add_file(source, "/documents")
@ -121,9 +127,49 @@ def test_add_rejects_unsupported_type_before_registration(tmp_path):
assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*"))
def test_add_rejects_disabled_summary_projection_before_registration(tmp_path):
from pageindex.filesystem import PageIndexFileSystem
source = tmp_path / "disabled.txt"
source.write_text("must not register without summary vector", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = PageIndexFileSystem(
workspace=workspace,
metadata_generator=GeneratedMetadata(),
summary_projection_index=False,
)
with pytest.raises(RuntimeError, match="summary projection index"):
filesystem.add_file(source, "/documents")
assert filesystem.browse("/", recursive=True)["files"] == []
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_configures_semantic_retrieval_in_same_filesystem_instance(tmp_path):
source = tmp_path / "semantic.txt"
source.write_text("alpha semantic recall text", encoding="utf-8")
filesystem = make_filesystem(tmp_path / "workspace")
assert filesystem.semantic_retrieval_channels() == ()
filesystem.add_file(source, "/documents")
assert filesystem.semantic_retrieval_channels() == ("summary",)
results = filesystem.search_semantic_channel(
"summary",
"semantic recall",
scope={"folder_path": "/documents", "recursive": True},
limit=5,
)
assert [result.source_path for result in results] == ["documents/semantic.txt"]
def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monkeypatch):
from pageindex import PageIndexClient
from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
from pageindex.filesystem import PIFSCommandExecutor
indexed_paths = []
@ -154,11 +200,7 @@ def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monke
monkeypatch.setattr(PageIndexClient, "index", fake_index)
source = tmp_path / "notes.md"
source.write_text("# Notes\n\ncopied markdown body", encoding="utf-8")
filesystem = PageIndexFileSystem(
workspace=tmp_path / "workspace",
metadata_generator=GeneratedMetadata(),
summary_projection_indexer=RecordingSummaryIndexer(),
)
filesystem = make_filesystem(tmp_path / "workspace")
info = filesystem.add_file(source, "/documents")
executor = PIFSCommandExecutor(filesystem, json_output=True)
@ -171,15 +213,10 @@ def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monke
def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monkeypatch):
from pageindex.filesystem import PageIndexFileSystem
source = tmp_path / "atomic.txt"
source.write_text("atomic body", encoding="utf-8")
filesystem = PageIndexFileSystem(
workspace=tmp_path / "workspace",
metadata_generator=GeneratedMetadata(),
summary_projection_indexer=RecordingSummaryIndexer(),
)
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
def fail_insert(records):
raise RuntimeError("catalog insert failed")
@ -190,9 +227,33 @@ def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monke
filesystem.add_file(source, "/documents")
assert filesystem.browse("/", recursive=True)["files"] == []
assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*"))
assert not list((tmp_path / "workspace" / "artifacts" / "text").glob("*.txt"))
assert not list((tmp_path / "workspace" / "artifacts" / "raw").glob("*.json"))
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_failure_after_summary_vector_rolls_back_catalog_and_vector(
tmp_path, monkeypatch
):
source = tmp_path / "post_vector.txt"
source.write_text("post vector rollback body", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
def fail_status_update(*args, **kwargs):
raise RuntimeError("metadata status update failed")
monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update)
with pytest.raises(RuntimeError, match="metadata status update failed"):
filesystem.add_file(source, "/documents")
assert filesystem.browse("/", recursive=True)["files"] == []
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_cli_add_uses_workspace_and_prints_added_file(monkeypatch, capsys, tmp_path):