Merge Goal 5: workspace file import command

Merge pifs add command and atomic import handling into feat/pageindex-filesystem.
This commit is contained in:
Bukely_ 2026-05-31 21:42:53 +08:00 committed by GitHub
commit d3034fa1b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1016 additions and 1 deletions

View file

@ -279,6 +279,23 @@ def _run_passthrough(
return 0
def _run_add(argv: list[str], *, workspace: str) -> int:
parser = argparse.ArgumentParser(
prog="pifs add",
description="Add a local file to a PageIndex FileSystem workspace",
)
parser.add_argument("physical_path")
parser.add_argument("virtual_target")
args = parser.parse_args(argv)
filesystem = _filesystem_from_workspace(workspace)
info = filesystem.add_file(args.physical_path, args.virtual_target)
print(f"added: {info.get('path') or '/' + str(info.get('source_path') or '').strip('/')}")
print(f"file_ref: {info['file_ref']}")
print(f"storage_uri: {info['storage_uri']}")
return 0
def _run_set(argv: list[str]) -> int:
parser = argparse.ArgumentParser(
prog="pifs set",
@ -326,6 +343,10 @@ def main(argv: list[str] | None = None) -> int:
return _run_ask(command_args, workspace_default=args.workspace)
if command_name == "chat":
return _run_chat(command_args, workspace_default=args.workspace)
if command_name == "add":
if not args.workspace:
parser.error("--workspace is required unless PIFS_WORKSPACE is set or `pifs set workspace <path>` has been run")
return _run_add(command_args, workspace=args.workspace)
if "--json" in command_tokens:
command_tokens = [token for token in command_tokens if token != "--json"]

View file

@ -2,7 +2,9 @@ from __future__ import annotations
import json
import os
from pathlib import Path
import shutil
import tempfile
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, Optional, Union
from urllib.parse import unquote, urlparse
@ -91,6 +93,13 @@ PAGEINDEX_DOCUMENT_CONTENT_TYPES = {
}
TEXT_ARTIFACT_SUFFIXES = {".txt", ".text"}
TEXT_ARTIFACT_CONTENT_TYPES = {"text/plain"}
ADD_FILE_CONTENT_TYPES = {
".pdf": "application/pdf",
".md": "text/markdown",
".markdown": "text/markdown",
".txt": "text/plain",
".text": "text/plain",
}
class PageIndexFileSystem:
@ -171,6 +180,100 @@ class PageIndexFileSystem:
self._ensure_register_completion_defaults()
return self.register_file(**kwargs)
def add_file(
self,
physical_path: Union[str, Path],
virtual_target: Union[str, Path],
) -> dict[str, Any]:
source = Path(physical_path).expanduser()
if not source.is_file():
raise FileNotFoundError(f"Source file not found: {source}")
suffix = source.suffix.lower()
content_type = ADD_FILE_CONTENT_TYPES.get(suffix)
if content_type is None:
supported = ", ".join(sorted(ADD_FILE_CONTENT_TYPES))
raise ValueError(
f"Unsupported file type: {suffix or '<none>'}; supported: {supported}"
)
folder_path, filename, virtual_path = self._resolve_add_target(
virtual_target,
physical_basename=source.name,
physical_suffix=suffix,
)
if self.store.file_basename_exists_in_folder(folder_path, filename):
raise FileExistsError(f"File already exists at {virtual_path}")
if not self.summary_projection_index:
raise RuntimeError("pifs add requires the summary projection index")
self._ensure_add_completion_defaults()
add_created_folder_paths = self._add_created_folder_paths(folder_path)
file_ref = make_file_ref(virtual_path.strip("/"))
uploads_dir = self.workspace / "artifacts" / "uploads"
final_dir = uploads_dir / file_ref
final_path = final_dir / filename
final_dir_created = False
catalog_inserted = False
records: list[dict[str, Any]] = []
preexisting_pageindex_doc_ids = self._pageindex_cache_doc_ids()
uploads_dir.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory(prefix=f".add-{file_ref}-", dir=uploads_dir) as tmp:
temp_path = Path(tmp) / filename
try:
shutil.copy2(source, temp_path)
if final_dir.exists():
raise FileExistsError(
f"Workspace artifact already exists for {virtual_path}: {final_dir}"
)
final_dir.mkdir(parents=True)
final_dir_created = True
os.replace(temp_path, final_path)
record = self._prepare_file_record(
{
"storage_uri": final_path.as_uri(),
"source_path": virtual_path.strip("/"),
"folder_path": folder_path,
"metadata": {},
"external_id": None,
"title": filename,
"content": self._add_file_content(final_path, content_type),
"content_type": content_type,
"metadata_policy": self._add_metadata_policy(),
}
)
records = [record]
self._require_add_pageindex_ready(record)
self._generate_register_metadata(record)
self._require_add_metadata_ready(record)
self._register_generation_policy_schema(records)
self.store.insert_files(records)
catalog_inserted = True
if self._complete_summary_projection_index(record):
self.store.update_file_metadata_status(
record["file_ref"],
metadata=record["metadata"],
metadata_status=record["metadata_status"],
)
self._require_add_summary_projection_ready(record)
self._sync_owned_raw_artifact(record)
self._ensure_add_semantic_retrieval_ready()
except Exception:
if catalog_inserted:
self._cleanup_add_catalog_record(file_ref)
self._cleanup_add_summary_projection(records)
self._cleanup_failed_register_artifacts(records)
self._cleanup_add_pageindex_cache(records, preexisting_pageindex_doc_ids)
self._cleanup_add_created_folders(add_created_folder_paths)
if final_dir_created:
shutil.rmtree(final_dir, ignore_errors=True)
raise
info = self.store.file_info(file_ref)
info["path"] = virtual_path
return info
def register_files(self, files: list[dict[str, Any]]) -> list[str]:
records = [self._prepare_file_record(file) for file in files]
try:
@ -250,6 +353,97 @@ class PageIndexFileSystem:
embedding_timeout=self.summary_projection_embedding_timeout,
)
def _ensure_add_completion_defaults(self) -> None:
if self.metadata_generator is None:
self.metadata_generator = MetadataGenerator(
provider=self.metadata_provider,
model=self.metadata_model,
base_url=self.metadata_base_url,
max_text_chars=self.metadata_max_text_chars,
)
if self.summary_projection_index and self.summary_projection_indexer is None:
from .projection_indexing import SummaryProjectionIndexer
self.summary_projection_indexer = SummaryProjectionIndexer.from_provider(
self.summary_projection_index_dir,
embedding_provider=self.summary_projection_embedding_provider,
embedding_model=self.summary_projection_embedding_model,
embedding_dimensions=self.summary_projection_embedding_dimensions,
embedding_timeout=self.summary_projection_embedding_timeout,
)
def _ensure_add_semantic_retrieval_ready(self) -> None:
indexer = self.summary_projection_indexer
if indexer is None:
raise RuntimeError("pifs add requires a summary projection indexer")
from .hybrid_projection import HybridProjectionSearchBackend
index_dir = Path(getattr(indexer, "index_dir", self.summary_projection_index_dir))
embedder = getattr(indexer, "embedder", None)
if embedder is None:
self.configure_hybrid_projection_retrieval(
index_dir,
embedding_provider=str(
getattr(
indexer,
"embedding_provider",
self.summary_projection_embedding_provider,
)
),
embedding_model=str(
getattr(indexer, "embedding_model", self.summary_projection_embedding_model)
),
embedding_dimensions=int(
getattr(
indexer,
"embedding_dimensions",
self.summary_projection_embedding_dimensions,
)
),
embedding_timeout=self.summary_projection_embedding_timeout,
)
else:
embedding_cache = getattr(indexer, "embedding_cache", None)
self.semantic_retrieval_backend = HybridProjectionSearchBackend(
index_dir,
embedder=embedder,
embedding_provider=str(
getattr(
indexer,
"embedding_provider",
self.summary_projection_embedding_provider,
)
),
embedding_model=str(
getattr(indexer, "embedding_model", self.summary_projection_embedding_model)
),
embedding_dimensions=int(
getattr(
indexer,
"embedding_dimensions",
self.summary_projection_embedding_dimensions,
)
),
embedding_cache_path=getattr(embedding_cache, "db_path", None),
)
if "summary" not in self.semantic_retrieval_channels():
raise RuntimeError("pifs add failed to configure summary semantic retrieval")
def _add_created_folder_paths(self, folder_path: str) -> list[str]:
paths = self._folder_ancestor_paths(folder_path)
return [path for path in paths if not self.store.folder_exists(path)]
@staticmethod
def _folder_ancestor_paths(folder_path: str) -> list[str]:
normalized = normalize_path(folder_path)
if normalized == "/":
return []
segments = [segment for segment in normalized.strip("/").split("/") if segment]
paths: list[str] = []
for index in range(1, len(segments) + 1):
paths.append("/" + "/".join(segments[:index]))
return paths
def configure_existing_projection_retrieval(self) -> bool:
"""Attach semantic retrieval to already-built projection indexes.
@ -1075,6 +1269,117 @@ class PageIndexFileSystem:
def _create_folder(self, path: str) -> str:
return self.create_folder(path)
@classmethod
def _resolve_add_target(
cls,
virtual_target: Union[str, Path],
*,
physical_basename: str,
physical_suffix: str,
) -> tuple[str, str, str]:
raw_target = str(virtual_target).strip()
if not raw_target:
raise ValueError("pifs add target is required")
normalized = normalize_path(raw_target)
posix_target = PurePosixPath(normalized)
raw_looks_like_folder = raw_target.replace("\\", "/").endswith("/")
target_suffix = posix_target.suffix.lower()
if raw_looks_like_folder or target_suffix not in ADD_FILE_CONTENT_TYPES:
folder_path = normalized
filename = physical_basename
else:
if target_suffix != physical_suffix:
raise ValueError(
"pifs add target file extension must match the physical file extension"
)
folder_path = normalize_path(str(posix_target.parent))
filename = posix_target.name
cls._validate_add_filename(filename)
virtual_path = cls._join_virtual_file_path(folder_path, filename)
return folder_path, filename, virtual_path
@staticmethod
def _validate_add_filename(filename: str) -> None:
if not filename or filename in {".", ".."}:
raise ValueError("pifs add target filename is required")
if "/" in filename or "\\" in filename:
raise ValueError("pifs add target filename must be a basename")
@staticmethod
def _join_virtual_file_path(folder_path: str, filename: str) -> str:
folder_path = normalize_path(folder_path)
if folder_path == "/":
return f"/{filename}"
return f"{folder_path}/{filename}"
@staticmethod
def _add_metadata_policy() -> dict[str, Any]:
return {
"fields": {
"summary": True,
"doc_type": False,
"domain": False,
"topic": False,
"entity": False,
"relation": False,
},
"projection_indexes": {"summary": True},
"batch": False,
}
def _add_file_content(self, path: Path, content_type: str) -> str:
if self._source_format(str(path), content_type) in {"markdown", "text"}:
return path.read_text(encoding="utf-8")
return ""
def _require_add_pageindex_ready(self, record: dict[str, Any]) -> None:
if self._source_format(record["source_path"], record["content_type"]) not in {
"pdf",
"markdown",
}:
return
if record.get("pageindex_tree_status") == "built" and record.get("pageindex_doc_id"):
return
message = self._pageindex_tree_failure_message(record.get("metadata_status")) or (
"PageIndex tree was not built"
)
raise RuntimeError(f"pifs add failed to build PageIndex tree: {message}")
@staticmethod
def _require_add_metadata_ready(record: dict[str, Any]) -> None:
metadata = record.get("metadata") or {}
summary = str(metadata.get("summary") or "").strip()
if not summary:
raise MetadataGenerationError(
"pifs add requires synchronous generated summary metadata"
)
status = record.get("metadata_status") or {}
summary_status = (status.get("fields") or {}).get("summary") or {}
if summary_status.get("status") != "generated":
raise MetadataGenerationError(
"pifs add requires generated summary metadata before registration"
)
if status.get("status") == "failed":
raise MetadataGenerationError(
"pifs add metadata generation failed before registration"
)
def _require_add_summary_projection_ready(self, record: dict[str, Any]) -> None:
if not self.summary_projection_index:
return
summary_projection = (
(record.get("metadata_status") or {})
.get("projection_indexes", {})
.get("summary")
)
if not summary_projection or not summary_projection.get("requested"):
raise RuntimeError("pifs add requires a requested summary projection index")
if summary_projection.get("status") != "ready":
detail = summary_projection.get("error") or summary_projection.get("status")
raise RuntimeError(
f"pifs add failed to build summary projection index: {detail}"
)
def _prepare_file_record(self, file: dict[str, Any]) -> dict[str, Any]:
storage_uri = file["storage_uri"]
raw_source_path = str(file["source_path"])
@ -1399,6 +1704,98 @@ class PageIndexFileSystem:
if record.get("_pifs_owned_raw_artifact") and record.get("raw_artifact_path"):
self._unlink_artifact(record["raw_artifact_path"])
def _cleanup_add_catalog_record(self, file_ref: str) -> None:
try:
self.store.delete_file(file_ref)
except Exception:
return
def _cleanup_add_summary_projection(self, records: list[dict[str, Any]]) -> None:
indexer = self.summary_projection_indexer
if indexer is None:
return
delete_summary = getattr(indexer, "delete_summary", None)
for record in records:
file_ref = str(record.get("file_ref") or "")
if not file_ref:
continue
try:
if callable(delete_summary):
delete_summary(file_ref)
continue
index = getattr(indexer, "index", None)
delete_file_refs = getattr(index, "delete_file_refs", None)
if callable(delete_file_refs):
delete_file_refs([file_ref])
except Exception:
continue
def _cleanup_add_created_folders(self, folder_paths: list[str]) -> None:
for folder_path in reversed(folder_paths):
try:
self.store.delete_empty_folder(folder_path)
except Exception:
continue
def _pageindex_cache_doc_ids(self) -> set[str]:
workspace = self.pageindex_client_workspace
doc_ids = {path.stem for path in workspace.glob("*.json") if path.name != "_meta.json"}
meta_path = workspace / "_meta.json"
if not meta_path.exists():
return doc_ids
try:
payload = json.loads(meta_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return doc_ids
if isinstance(payload, dict):
doc_ids.update(str(doc_id) for doc_id in payload)
return doc_ids
def _cleanup_add_pageindex_cache(
self,
records: list[dict[str, Any]],
preexisting_doc_ids: set[str],
) -> None:
doc_ids = sorted(self._pageindex_cache_doc_ids() - preexisting_doc_ids)
for record in records:
doc_id = str(record.get("pageindex_doc_id") or "").strip()
if doc_id and doc_id not in preexisting_doc_ids:
doc_ids.append(doc_id)
doc_ids = sorted(set(doc_ids))
if not doc_ids:
return
workspace = self.pageindex_client_workspace
for doc_id in doc_ids:
try:
(workspace / f"{doc_id}.json").unlink()
except FileNotFoundError:
pass
except Exception:
continue
meta_path = workspace / "_meta.json"
if not meta_path.exists():
return
try:
payload = json.loads(meta_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return
if not isinstance(payload, dict):
return
changed = False
for doc_id in doc_ids:
if doc_id in payload:
payload.pop(doc_id, None)
changed = True
if not changed:
return
try:
meta_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
except OSError:
return
@staticmethod
def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool:
return bool(policy.get("batch")) or policy.get("mode") == "batch"

View file

@ -104,6 +104,9 @@ class SummaryProjectionIndexer:
"embedding_dimensions": self.embedding_dimensions,
}
def delete_summary(self, file_ref: str) -> int:
return self.index.delete_file_refs([file_ref])
def _ensure_index(self) -> None:
if not self.index.db_path.exists():
self.index.reset(

View file

@ -146,6 +146,35 @@ class SQLiteVecSemanticIndex:
conn.commit()
return inserted
def delete_file_refs(self, file_refs: list[str]) -> int:
refs = [str(file_ref) for file_ref in file_refs if str(file_ref)]
if not refs:
return 0
placeholders = ", ".join("?" for _ in refs)
with self.connect() as conn:
rows = conn.execute(
f"""
SELECT rowid
FROM semantic_index_docs
WHERE file_ref IN ({placeholders})
""",
refs,
).fetchall()
rowids = [int(row["rowid"]) for row in rows]
if not rowids:
return 0
rowid_placeholders = ", ".join("?" for _ in rowids)
conn.execute(
f"DELETE FROM semantic_index_vec WHERE rowid IN ({rowid_placeholders})",
rowids,
)
conn.execute(
f"DELETE FROM semantic_index_docs WHERE rowid IN ({rowid_placeholders})",
rowids,
)
conn.commit()
return len(rowids)
def search(
self,
vector: list[float],

View file

@ -1056,6 +1056,55 @@ class SQLiteFileSystemStore:
(metadata_text_value, file_ref),
)
def delete_file(self, target: str) -> None:
with self.connect() as conn:
file_ref = self._resolve_file_ref(conn, target)
conn.execute("DELETE FROM file_fts WHERE file_ref = ?", (file_ref,))
conn.execute("DELETE FROM metadata_values WHERE file_ref = ?", (file_ref,))
conn.execute("DELETE FROM files WHERE file_ref = ?", (file_ref,))
def folder_exists(self, path: str) -> bool:
path = normalize_path(path)
with self.connect() as conn:
row = conn.execute(
"SELECT 1 FROM folders WHERE path = ?",
(path,),
).fetchone()
return row is not None
def delete_empty_folder(self, path: str) -> bool:
path = normalize_path(path)
if path == "/":
return False
with self.connect() as conn:
folder = self._folder_by_path(conn, path)
if folder is None:
return False
has_files = conn.execute(
"""
SELECT 1
FROM file_folders
WHERE folder_id = ?
LIMIT 1
""",
(folder["folder_id"],),
).fetchone()
if has_files is not None:
return False
has_children = conn.execute(
"""
SELECT 1
FROM folders
WHERE parent_id = ?
LIMIT 1
""",
(folder["folder_id"],),
).fetchone()
if has_children is not None:
return False
conn.execute("DELETE FROM folders WHERE folder_id = ?", (folder["folder_id"],))
return True
def resolve_file_ref(self, target: str) -> str:
with self.connect() as conn:
return self._resolve_file_ref(conn, target)
@ -1412,6 +1461,36 @@ class SQLiteFileSystemStore:
).fetchone()
return int(row["count"] or 0)
def file_basename_exists_in_folder(self, path: str, basename: str) -> bool:
path = normalize_path(path)
basename = str(basename).strip()
if not basename:
return False
with self.connect() as conn:
row = conn.execute(
"""
SELECT 1
FROM files f
JOIN file_folders ff ON ff.file_ref = f.file_ref
JOIN folders fo ON fo.folder_id = ff.folder_id
WHERE f.deleted_at IS NULL
AND fo.path = ?
AND (
f.title = ?
OR f.source_path = ?
OR f.source_path LIKE ? ESCAPE '\\'
)
LIMIT 1
""",
(
path,
basename,
basename,
"%/" + self._like_escape(basename),
),
).fetchone()
return row is not None
def folder_subtree_thresholds(
self,
path: str,

View file

@ -0,0 +1,486 @@
import json
from pathlib import Path
import pytest
class GeneratedMetadata:
def __init__(self):
self.calls = []
def generate(self, request, *, fields):
self.calls.append((request, list(fields)))
values = {
"summary": f"Summary for {request.title}: {request.text[:60]}",
"doc_type": "uploaded_file",
"domain": "workspace",
"topic": "pifs add",
}
return {field: values[field] for field in fields if field in values}
class StaticEmbedder:
def embed(self, texts):
return [[1.0, 0.0, 0.0] for _ in texts]
def make_summary_indexer(workspace: Path):
from pageindex.filesystem.projection_indexing import SummaryProjectionIndexer
return SummaryProjectionIndexer(
workspace / "artifacts" / "projection_indexes",
embedder=StaticEmbedder(),
embedding_provider="test",
embedding_model="static",
embedding_dimensions=3,
)
def make_filesystem(workspace: Path):
from pageindex.filesystem import PageIndexFileSystem
return PageIndexFileSystem(
workspace=workspace,
metadata_generator=GeneratedMetadata(),
summary_projection_indexer=make_summary_indexer(workspace),
summary_projection_embedding_provider="test",
summary_projection_embedding_model="static",
summary_projection_embedding_dimensions=3,
)
def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None:
workspace.mkdir(parents=True, exist_ok=True)
(workspace / f"{doc_id}.json").write_text(
json.dumps(doc, ensure_ascii=False, indent=2),
encoding="utf-8",
)
meta = {
doc_id: {
"type": doc.get("type", ""),
"doc_name": doc.get("doc_name", ""),
"doc_description": doc.get("doc_description", ""),
"path": doc.get("path", ""),
"line_count": doc.get("line_count"),
}
}
(workspace / "_meta.json").write_text(
json.dumps(meta, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def test_add_text_folder_target_copies_artifact_indexes_summary_and_is_readable(tmp_path):
from pageindex.filesystem import PIFSCommandExecutor
source = tmp_path / "filing.txt"
source.write_text("alpha filing text for pifs add", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
info = filesystem.add_file(str(source), "/documents/reports")
assert info["source_path"] == "documents/reports/filing.txt"
assert info["folder_path"] == "/documents/reports"
assert filesystem.folder_info("/documents/reports")["path"] == "/documents/reports"
assert info["storage_uri"] != source.as_uri()
assert "/artifacts/uploads/" in info["storage_uri"]
copied_path = Path(info["storage_uri"].removeprefix("file://"))
assert copied_path.read_text(encoding="utf-8") == "alpha filing text for pifs add"
assert copied_path.resolve() != source.resolve()
executor = PIFSCommandExecutor(filesystem, json_output=True)
rendered = json.loads(executor.execute("cat /documents/reports/filing.txt --all"))
assert rendered["data"]["text"] == "alpha filing text for pifs add"
assert info["metadata"]["summary"].startswith("Summary for filing.txt")
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 1
def test_add_rejects_same_folder_same_basename_without_overwrite(tmp_path):
from pageindex.filesystem import PIFSCommandExecutor
source = tmp_path / "conflict.txt"
source.write_text("first body", encoding="utf-8")
filesystem = make_filesystem(tmp_path / "workspace")
filesystem.add_file(source, "/documents")
source.write_text("second body must not overwrite", encoding="utf-8")
with pytest.raises(FileExistsError, match="already exists"):
filesystem.add_file(source, "/documents")
executor = PIFSCommandExecutor(filesystem, json_output=True)
rendered = json.loads(executor.execute("cat /documents/conflict.txt --all"))
assert rendered["data"]["text"] == "first body"
def test_add_rejects_unsupported_type_before_registration(tmp_path):
source = tmp_path / "payload.json"
source.write_text('{"unsupported": true}', encoding="utf-8")
filesystem = make_filesystem(tmp_path / "workspace")
with pytest.raises(ValueError, match="Unsupported file type"):
filesystem.add_file(source, "/documents")
assert filesystem.browse("/", recursive=True)["files"] == []
assert not list((tmp_path / "workspace" / "artifacts" / "uploads").glob("**/*"))
def test_add_rejects_disabled_summary_projection_before_registration(tmp_path):
from pageindex.filesystem import PageIndexFileSystem
source = tmp_path / "disabled.txt"
source.write_text("must not register without summary vector", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = PageIndexFileSystem(
workspace=workspace,
metadata_generator=GeneratedMetadata(),
summary_projection_index=False,
)
with pytest.raises(RuntimeError, match="summary projection index"):
filesystem.add_file(source, "/documents")
assert filesystem.browse("/", recursive=True)["files"] == []
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_configures_semantic_retrieval_in_same_filesystem_instance(tmp_path):
source = tmp_path / "semantic.txt"
source.write_text("alpha semantic recall text", encoding="utf-8")
filesystem = make_filesystem(tmp_path / "workspace")
assert filesystem.semantic_retrieval_channels() == ()
filesystem.add_file(source, "/documents")
assert filesystem.semantic_retrieval_channels() == ("summary",)
results = filesystem.search_semantic_channel(
"summary",
"semantic recall",
scope={"folder_path": "/documents", "recursive": True},
limit=5,
)
assert [result.source_path for result in results] == ["documents/semantic.txt"]
def test_add_markdown_builds_pageindex_tree_from_copied_artifact(tmp_path, monkeypatch):
from pageindex import PageIndexClient
from pageindex.filesystem import PIFSCommandExecutor
indexed_paths = []
def fake_index(self, file_path, mode="auto"):
indexed_paths.append(Path(file_path))
doc_id = "doc_added_md"
doc = {
"id": doc_id,
"type": "md",
"path": str(Path(file_path).resolve()),
"doc_name": "notes.md",
"doc_description": "",
"line_count": 3,
"structure": [
{
"title": "Notes",
"node_id": "0001",
"line_num": 1,
"text": "# Notes\n\ncopied markdown body",
"nodes": [],
}
],
}
write_pageindex_client_doc(self.workspace, doc_id, doc)
self.documents[doc_id] = doc
return doc_id
monkeypatch.setattr(PageIndexClient, "index", fake_index)
source = tmp_path / "notes.md"
source.write_text("# Notes\n\ncopied markdown body", encoding="utf-8")
filesystem = make_filesystem(tmp_path / "workspace")
info = filesystem.add_file(source, "/documents")
executor = PIFSCommandExecutor(filesystem, json_output=True)
structure = json.loads(executor.execute("cat /documents/notes.md --structure"))
assert structure["data"]["available"] is True
assert structure["data"]["structure"][0]["title"] == "Notes"
assert indexed_paths == [Path(info["storage_uri"].removeprefix("file://"))]
assert indexed_paths[0].resolve() != source.resolve()
def test_add_failure_does_not_leave_visible_catalog_or_artifacts(tmp_path, monkeypatch):
source = tmp_path / "atomic.txt"
source.write_text("atomic body", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
def fail_insert(records):
raise RuntimeError("catalog insert failed")
monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)
with pytest.raises(RuntimeError, match="catalog insert failed"):
filesystem.add_file(source, "/documents")
assert filesystem.browse("/", recursive=True)["files"] == []
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_markdown_insert_failure_removes_pageindex_cache(tmp_path, monkeypatch):
from pageindex import PageIndexClient
def fake_index(self, file_path, mode="auto"):
doc_id = "doc_failed_add_md"
doc = {
"id": doc_id,
"type": "md",
"path": str(Path(file_path).resolve()),
"doc_name": "failed.md",
"doc_description": "",
"line_count": 3,
"structure": [
{
"title": "Failed",
"node_id": "0001",
"line_num": 1,
"text": "# Failed\n\nbody",
"nodes": [],
}
],
}
write_pageindex_client_doc(self.workspace, doc_id, doc)
self.documents[doc_id] = doc
return doc_id
monkeypatch.setattr(PageIndexClient, "index", fake_index)
source = tmp_path / "failed.md"
source.write_text("# Failed\n\nbody", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
def fail_insert(records):
raise RuntimeError("catalog insert failed")
monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)
with pytest.raises(RuntimeError, match="catalog insert failed"):
filesystem.add_file(source, "/documents/reports")
pageindex_workspace = workspace / "artifacts" / "pageindex_client"
assert not (pageindex_workspace / "doc_failed_add_md.json").exists()
meta_path = pageindex_workspace / "_meta.json"
if meta_path.exists():
meta = json.loads(meta_path.read_text(encoding="utf-8"))
assert "doc_failed_add_md" not in meta
listing = filesystem.browse("/", recursive=True)
assert listing["files"] == []
assert listing["folders"] == []
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_markdown_index_failure_removes_pageindex_cache_delta(tmp_path, monkeypatch):
from pageindex import PageIndexClient
def fake_index(self, file_path, mode="auto"):
doc_id = "doc_partial_before_raise"
doc = {
"id": doc_id,
"type": "md",
"path": str(Path(file_path).resolve()),
"doc_name": "partial.md",
"doc_description": "",
"line_count": 3,
"structure": [{"title": "Partial", "node_id": "0001", "nodes": []}],
}
self.documents[doc_id] = doc
self._save_doc(doc_id)
raise RuntimeError("index failed after cache write")
monkeypatch.setattr(PageIndexClient, "index", fake_index)
source = tmp_path / "partial.md"
source.write_text("# Partial\n\nbody", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
pageindex_workspace = workspace / "artifacts" / "pageindex_client"
with pytest.raises(RuntimeError, match="failed to build PageIndex tree"):
filesystem.add_file(source, "/documents/reports")
assert not (pageindex_workspace / "doc_partial_before_raise.json").exists()
meta_path = pageindex_workspace / "_meta.json"
if meta_path.exists():
meta = json.loads(meta_path.read_text(encoding="utf-8"))
assert "doc_partial_before_raise" not in meta
listing = filesystem.browse("/", recursive=True)
assert listing["files"] == []
assert listing["folders"] == []
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_markdown_failure_preserves_unrelated_pageindex_cache(tmp_path, monkeypatch):
from pageindex import PageIndexClient
def fake_index(self, file_path, mode="auto"):
doc_id = "doc_failed_add_md"
doc = {
"id": doc_id,
"type": "md",
"path": str(Path(file_path).resolve()),
"doc_name": "failed.md",
"doc_description": "",
"line_count": 3,
"structure": [{"title": "Failed", "node_id": "0001", "nodes": []}],
}
self.documents[doc_id] = doc
self._save_doc(doc_id)
return doc_id
monkeypatch.setattr(PageIndexClient, "index", fake_index)
source = tmp_path / "failed.md"
source.write_text("# Failed\n\nbody", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
pageindex_workspace = workspace / "artifacts" / "pageindex_client"
write_pageindex_client_doc(
pageindex_workspace,
"doc_unrelated",
{
"id": "doc_unrelated",
"type": "md",
"path": str((tmp_path / "unrelated.md").resolve()),
"doc_name": "unrelated.md",
"doc_description": "",
"line_count": 1,
"structure": [{"title": "Unrelated", "node_id": "0001", "nodes": []}],
},
)
def fail_insert(records):
raise RuntimeError("catalog insert failed")
monkeypatch.setattr(filesystem.store, "insert_files", fail_insert)
with pytest.raises(RuntimeError, match="catalog insert failed"):
filesystem.add_file(source, "/documents")
assert not (pageindex_workspace / "doc_failed_add_md.json").exists()
assert (pageindex_workspace / "doc_unrelated.json").exists()
meta = json.loads((pageindex_workspace / "_meta.json").read_text(encoding="utf-8"))
assert "doc_failed_add_md" not in meta
assert "doc_unrelated" in meta
def test_add_failure_after_summary_vector_rolls_back_catalog_and_vector(
tmp_path, monkeypatch
):
source = tmp_path / "post_vector.txt"
source.write_text("post vector rollback body", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
def fail_status_update(*args, **kwargs):
raise RuntimeError("metadata status update failed")
monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update)
with pytest.raises(RuntimeError, match="metadata status update failed"):
filesystem.add_file(source, "/documents")
assert filesystem.browse("/", recursive=True)["files"] == []
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_failure_removes_nested_folders_created_only_for_add(tmp_path, monkeypatch):
source = tmp_path / "nested.txt"
source.write_text("nested rollback body", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
def fail_status_update(*args, **kwargs):
raise RuntimeError("metadata status update failed")
monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update)
with pytest.raises(RuntimeError, match="metadata status update failed"):
filesystem.add_file(source, "/documents/reports")
listing = filesystem.browse("/", recursive=True)
assert listing["files"] == []
assert listing["folders"] == []
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
assert not list((workspace / "artifacts" / "uploads").glob("**/*"))
assert not list((workspace / "artifacts" / "text").glob("*.txt"))
assert not list((workspace / "artifacts" / "raw").glob("*.json"))
def test_add_failure_preserves_preexisting_parent_folder(tmp_path, monkeypatch):
source = tmp_path / "nested.txt"
source.write_text("nested rollback body", encoding="utf-8")
workspace = tmp_path / "workspace"
filesystem = make_filesystem(workspace)
filesystem.create_folder("/documents")
def fail_status_update(*args, **kwargs):
raise RuntimeError("metadata status update failed")
monkeypatch.setattr(filesystem.store, "update_file_metadata_status", fail_status_update)
with pytest.raises(RuntimeError, match="metadata status update failed"):
filesystem.add_file(source, "/documents/reports")
listing = filesystem.browse("/", recursive=True)
assert listing["files"] == []
assert [folder["path"] for folder in listing["folders"]] == ["/documents"]
assert filesystem.summary_projection_indexer.index.info()["document_count"] == 0
def test_cli_add_uses_workspace_and_prints_added_file(monkeypatch, capsys, tmp_path):
from pageindex.filesystem import cli
source = tmp_path / "cli.txt"
source.write_text("cli body", encoding="utf-8")
calls = []
class FakeAddFileSystem:
def __init__(self, workspace):
self.workspace = Path(workspace)
def configure_existing_projection_retrieval(self):
return False
def add_file(self, physical_path, virtual_target):
calls.append((self.workspace, physical_path, virtual_target))
return {
"file_ref": "file_cli",
"path": "/documents/cli.txt",
"source_path": "documents/cli.txt",
"storage_uri": "file:///workspace/artifacts/uploads/file_cli/cli.txt",
}
monkeypatch.setattr(cli, "PageIndexFileSystem", FakeAddFileSystem)
status = cli.main(["--workspace", str(tmp_path / "workspace"), "add", str(source), "/documents"])
assert status == 0
assert calls == [(tmp_path / "workspace", str(source), "/documents")]
assert capsys.readouterr().out == (
"added: /documents/cli.txt\n"
"file_ref: file_cli\n"
"storage_uri: file:///workspace/artifacts/uploads/file_cli/cli.txt\n"
)