From adce9704e1f4f8e45dc3ca2fa557ca88d6f93eea Mon Sep 17 00:00:00 2001 From: BukeLy Date: Mon, 1 Jun 2026 15:53:41 +0800 Subject: [PATCH] feat(filesystem): add pifs semantic folder build --- pageindex/filesystem/cli.py | 34 ++ pageindex/filesystem/core.py | 216 +++++++++++- pageindex/filesystem/semantic_folder.py | 361 +++++++++++++++++++ pageindex/filesystem/store.py | 360 ++++++++++++++++++- tests/test_pifs_semantic_folder.py | 450 ++++++++++++++++++++++++ 5 files changed, 1411 insertions(+), 10 deletions(-) create mode 100644 pageindex/filesystem/semantic_folder.py create mode 100644 tests/test_pifs_semantic_folder.py diff --git a/pageindex/filesystem/cli.py b/pageindex/filesystem/cli.py index 8e13d1d..85da91d 100644 --- a/pageindex/filesystem/cli.py +++ b/pageindex/filesystem/cli.py @@ -295,6 +295,36 @@ def _run_add(argv: list[str], *, workspace: str) -> int: return 0 +def _run_semantic_folder(argv: list[str], *, workspace: str) -> int: + parser = argparse.ArgumentParser( + prog="pifs semantic-folder", + description="Build PIFS Semantic Folders", + ) + subparsers = parser.add_subparsers(dest="semantic_folder_command", required=True) + build_parser = subparsers.add_parser("build") + build_parser.add_argument("source_scope", nargs="?", default="/") + args = parser.parse_args(argv) + + if args.semantic_folder_command == "build": + filesystem = _filesystem_from_workspace(workspace) + result = filesystem.build_semantic_folder(args.source_scope) + print(f"source: {result['source']}") + print(f"mount: {result['mount']}") + print(f"template: {result['template']}") + print(f"files: {result['files']}") + print(f"memberships: {result['memberships']}") + print(f"skipped: {result['skipped']}") + print( + "metadata: " + f"cached={result['metadata_cached']} " + f"generating={result['metadata_generating']} " + f"failed={result['metadata_failed']}" + ) + print(f"planning: {result['planning']}") + return 0 + raise ValueError(f"unknown semantic-folder command: 
def build_semantic_folder(
    self,
    source_scope: str = "/",
    *,
    planner: SemanticFolderPlanner | None = None,
) -> dict[str, Any]:
    """Plan and materialize a Semantic Folder for the files under ``source_scope``.

    Steps, in order:

    1. Normalize the scope and refuse scopes that are a semantic mount or
       live inside one.
    2. Check the scope folder and the mount path with the store.
    3. Ensure every file has the candidate metadata fields, generating the
       missing ones.
    4. Hand a payload of transient item ids plus title/summary/domain/topic
       to the planner.
    5. Validate the returned plan and pass the memberships and a manifest
       to the store.

    Args:
        source_scope: Virtual folder whose files are organised. Empty values
            are treated as ``"/"``.
        planner: Object with a ``plan(payload)`` method. When ``None`` an
            ``OpenAISemanticFolderPlanner`` is created.

    Returns:
        A summary dict with the keys ``source``, ``mount``, ``template``,
        ``files``, ``memberships``, ``skipped``, ``metadata_cached``,
        ``metadata_generating``, ``metadata_failed`` and ``planning``.

    Raises:
        ValueError: If the scope is inside a semantic mount or contains no
            files. Plan validation errors raised by
            ``validate_semantic_folder_plan`` also propagate.
    """
    # Imported here so the limit advertised to the planner is always the one
    # validate_semantic_folder_plan enforces, instead of a duplicated literal.
    from .semantic_folder import MEMBERSHIP_LIMIT

    source_scope = normalize_path(source_scope or "/")
    blocked_mount = self.store.semantic_generated_mount_containing(source_scope)
    if blocked_mount is not None:
        raise ValueError(
            "Semantic Folder source scope must not be a semantic mount path "
            f"or descendant: {source_scope}"
        )
    # Called for its validation side effect only; the result is not used.
    self.store.folder_info(source_scope)
    mount_path = semantic_mount_path(source_scope)
    self.store.validate_semantic_mount_available(
        source_scope=source_scope,
        mount_path=mount_path,
    )
    entries = self.store.semantic_source_file_entries(source_scope)
    if not entries:
        raise ValueError(f"No files found in Semantic Folder source scope: {source_scope}")

    records = [self._record_from_file_entry(entry) for entry in entries]
    metadata_stats = self._ensure_semantic_folder_candidate_metadata(records)

    # The planner only ever sees transient ids; this map translates them back.
    item_file_refs: dict[str, str] = {}
    items: list[SemanticFolderBuildItem] = []
    for index, record in enumerate(records, 1):
        item_id = f"item_{index:04d}"
        item_file_refs[item_id] = record["file_ref"]
        metadata = record.get("metadata") or {}
        items.append(
            SemanticFolderBuildItem(
                item_id=item_id,
                title=str(record.get("title") or ""),
                summary=str(metadata.get("summary") or ""),
                domain=metadata.get("domain"),
                topic=metadata.get("topic"),
            )
        )

    planning_payload = {
        "feature": "PIFS Semantic Folder",
        "candidate_fields": list(SEMANTIC_FOLDER_CANDIDATE_FIELDS),
        "membership_limit": MEMBERSHIP_LIMIT,
        "path_contract": "relative field/value segments under semantic mount path",
        "items": [
            {
                "item_id": item.item_id,
                "title": item.title,
                "summary": item.summary,
                "domain": item.domain,
                "topic": item.topic,
            }
            for item in items
        ],
    }
    # Identity check: a caller-supplied planner is used even if it is falsy.
    if planner is None:
        planner = OpenAISemanticFolderPlanner()
    raw_plan = planner.plan(planning_payload)
    validated = validate_semantic_folder_plan(
        raw_plan,
        item_file_refs=item_file_refs,
    )
    memberships = [
        {
            "file_ref": membership.file_ref,
            "item_id": membership.item_id,
            "relative_path": membership.relative_path,
            "confidence": membership.confidence,
            "canonical_segments": membership.canonical_segments,
        }
        for membership in validated.memberships
    ]
    build_id = f"semantic_folder_{uuid.uuid4().hex}"

    # Items the plan neither placed nor explicitly skipped are recorded as
    # skipped so that every input item is accounted for in the manifest.
    skipped = list(validated.skipped)
    planned_item_ids = {membership.item_id for membership in validated.memberships}
    explicitly_skipped = {item["item_id"] for item in skipped}
    for item in items:
        if item.item_id not in planned_item_ids and item.item_id not in explicitly_skipped:
            skipped.append({"item_id": item.item_id, "reason": "not included in plan"})

    manifest = {
        "build_id": build_id,
        "source_scope": source_scope,
        "mount_path": mount_path,
        "template": validated.template,
        "candidate_fields": list(SEMANTIC_FOLDER_CANDIDATE_FIELDS),
        "canonical_values": validated.canonical_values,
        "memberships": memberships,
        "skipped": skipped,
        "items": [
            {
                "item_id": item.item_id,
                "file_ref": item_file_refs[item.item_id],
                "title": item.title,
                "domain": item.domain,
                "topic": item.topic,
            }
            for item in items
        ],
        "planner": {
            "type": planner.__class__.__name__,
        },
    }
    self.store.apply_semantic_folder_build(
        source_scope=source_scope,
        mount_path=mount_path,
        memberships=memberships,
        manifest=manifest,
    )
    return {
        "source": source_scope,
        "mount": mount_path,
        "template": "/".join(validated.template),
        "files": len(items),
        "memberships": len(memberships),
        "skipped": len(skipped),
        "metadata_cached": metadata_stats["cached"],
        "metadata_generating": metadata_stats["generating"],
        "metadata_failed": metadata_stats["failed"],
        "planning": "generated",
    }
policy_fields = status.setdefault("policy", {}).setdefault("fields", {}) + status_fields = status.setdefault("fields", {}) + for field in fields: + policy_fields[field] = True + status_fields[field] = { + "requested": True, + "status": "pending_generate", + "owner": "pifs", + "source": "llm", + } + if self.metadata_generator is None: + self.metadata_generator = MetadataGenerator( + provider=self.metadata_provider, + model=self.metadata_model, + base_url=self.metadata_base_url, + max_text_chars=self.metadata_max_text_chars, + ) + self._generate_register_metadata(record, force=True) + self.store.update_file_metadata_status( + record["file_ref"], + metadata=record["metadata"], + metadata_status=record["metadata_status"], + ) + for field in fields: + if self._semantic_candidate_field_ready(record, field): + continue + failed += 1 + return {"cached": cached, "generating": generating, "failed": failed} + + @staticmethod + def _semantic_candidate_field_ready(record: dict[str, Any], field: str) -> bool: + value = (record.get("metadata") or {}).get(field) + if value is None or value == "" or value == []: + return False + field_status = ( + (record.get("metadata_status") or {}) + .get("fields", {}) + .get(field, {}) + ) + status = field_status.get("status") + if status is None: + return True + return status == "generated" + def _ensure_register_completion_defaults(self) -> None: if self.metadata_generator is None: self.metadata_generator = MetadataGenerator( @@ -606,6 +807,7 @@ class PageIndexFileSystem: path, entry.folder_path, ) + display_title = self.store.membership_display_name(file_ref, folder_path) or entry.title rank = len(rows) + 1 rows.append( { @@ -620,7 +822,8 @@ class PageIndexFileSystem: "file_ref": file_ref, "document_id": entry.external_id, "external_id": entry.external_id, - "title": entry.title, + "title": display_title, + "original_title": entry.title, "folder_path": folder_path, "folder_paths": folder_paths, "summary": str((entry.metadata or 
# Metadata fields a Semantic Folder template may be built from.
CANDIDATE_FIELDS = ("domain", "topic")
# Maximum number of semantic memberships a single item may receive.
MEMBERSHIP_LIMIT = 3
# A path segment: starts with an ASCII letter or digit, at most 128 characters.
SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")


class SemanticFolderPlanError(ValueError):
    """Raised when a Semantic Folder plan is missing, malformed, or unsafe."""


@dataclass(frozen=True)
class SemanticFolderBuildItem:
    """One document as presented to the planner."""

    item_id: str
    title: str
    summary: str
    domain: Any = None
    topic: Any = None


@dataclass(frozen=True)
class SemanticFolderMembership:
    """A validated placement of one item at one relative path."""

    item_id: str
    file_ref: str
    relative_path: str
    confidence: float | None = None
    canonical_segments: list[dict[str, str]] = field(default_factory=list)


@dataclass(frozen=True)
class SemanticFolderValidatedPlan:
    """The result of ``validate_semantic_folder_plan``."""

    template: list[str]
    canonical_values: list[dict[str, str]]
    memberships: list[SemanticFolderMembership]
    skipped: list[dict[str, str]]
    raw_plan: dict[str, Any]


class SemanticFolderPlanner(Protocol):
    """Structural interface for anything that can produce a raw plan."""

    def plan(self, payload: dict[str, Any]) -> dict[str, Any]:
        ...


class OpenAISemanticFolderPlanner:
    """Planner that asks an OpenAI-compatible chat endpoint for a plan."""

    def __init__(
        self,
        *,
        model: str | None = None,
        base_url: str | None = None,
    ):
        """Resolve the model and base URL from arguments or the environment.

        An explicit ``base_url`` (including ``""``) takes precedence over the
        ``PIFS_METADATA_BASE_URL`` / ``OPENAI_BASE_URL`` variables.
        """
        self.model = (
            model
            or os.environ.get("PIFS_SEMANTIC_FOLDER_MODEL")
            or os.environ.get("PIFS_METADATA_MODEL")
            or "gpt-5-nano"
        )
        self.base_url = (
            base_url
            if base_url is not None
            else os.environ.get("PIFS_METADATA_BASE_URL") or os.environ.get("OPENAI_BASE_URL")
        )

    def plan(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Send ``payload`` to the model and return the decoded JSON plan.

        Raises:
            SemanticFolderPlanError: If no API key is configured or the
                response body is not valid JSON.
        """
        api_key = (
            os.environ.get("PIFS_SEMANTIC_FOLDER_API_KEY")
            or os.environ.get("PIFS_METADATA_API_KEY")
            or os.environ.get("OPENAI_API_KEY")
        )
        if not api_key:
            raise SemanticFolderPlanError(
                "PIFS_SEMANTIC_FOLDER_API_KEY, PIFS_METADATA_API_KEY, or OPENAI_API_KEY "
                "is required for PIFS Semantic Folder planning"
            )

        # Imported lazily so the module can be used with a custom planner
        # without the openai package being imported.
        from openai import OpenAI

        client = OpenAI(api_key=api_key, base_url=self.base_url or None)
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Plan a PIFS Semantic Folder from document-level metadata. "
                        "Use only the provided transient item ids, title, summary, domain, and topic. "
                        "Do not infer from storage paths or original folders. "
                        "Choose a useful field/value folder template using domain and topic, "
                        "canonicalize display values, provide path-safe slugs, and reduce each "
                        "document to at most three semantic memberships. Return strict JSON only."
                    ),
                },
                {
                    "role": "user",
                    "content": json.dumps(payload, ensure_ascii=False),
                },
            ],
            response_format=self._response_format(),
        )
        content = response.choices[0].message.content or "{}"
        try:
            return json.loads(content)
        except json.JSONDecodeError as exc:
            # Surface malformed output as a plan error (still a ValueError).
            raise SemanticFolderPlanError(
                "Semantic Folder planner returned invalid JSON"
            ) from exc

    @staticmethod
    def _response_format() -> dict[str, Any]:
        """Return the strict JSON-schema ``response_format`` for the plan."""
        return {
            "type": "json_schema",
            "json_schema": {
                "name": "pifs_semantic_folder_plan",
                "strict": True,
                "schema": {
                    "type": "object",
                    "additionalProperties": False,
                    "required": ["template", "canonical_values", "memberships", "skipped"],
                    "properties": {
                        "template": {
                            "type": "array",
                            "items": {"type": "string", "enum": list(CANDIDATE_FIELDS)},
                        },
                        "canonical_values": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "additionalProperties": False,
                                "required": ["field", "display", "slug"],
                                "properties": {
                                    "field": {"type": "string", "enum": list(CANDIDATE_FIELDS)},
                                    "display": {"type": "string"},
                                    "slug": {"type": "string"},
                                },
                            },
                        },
                        "memberships": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "additionalProperties": False,
                                # Strict mode requires every property to be
                                # listed; optionality is expressed by the
                                # nullable type on "confidence".
                                "required": ["item_id", "paths", "confidence"],
                                "properties": {
                                    "item_id": {"type": "string"},
                                    "paths": {
                                        "type": "array",
                                        "items": {"type": "string"},
                                    },
                                    "confidence": {"type": ["number", "null"]},
                                },
                            },
                        },
                        "skipped": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "additionalProperties": False,
                                "required": ["item_id", "reason"],
                                "properties": {
                                    "item_id": {"type": "string"},
                                    "reason": {"type": "string"},
                                },
                            },
                        },
                    },
                },
            },
        }


def semantic_mount_path(source_scope: str) -> str:
    """Return the mount path for ``source_scope``: ``<scope>/semantic``."""
    source_scope = _normalize_path(source_scope)
    return "/semantic" if source_scope == "/" else f"{source_scope}/semantic"


def validate_semantic_folder_plan(
    plan: dict[str, Any],
    *,
    item_file_refs: dict[str, str],
) -> SemanticFolderValidatedPlan:
    """Validate a raw planner result and resolve item ids to file refs.

    Args:
        plan: Raw planner output with ``template``, ``canonical_values``,
            ``memberships`` and ``skipped``.
        item_file_refs: Mapping of the transient item ids sent to the planner
            to their file refs.

    Returns:
        The validated plan, with one membership per (item, path) pair.

    Raises:
        SemanticFolderPlanError: On any structural problem, unknown item id,
            unsafe or undeclared path segment, duplicate membership, more
            than ``MEMBERSHIP_LIMIT`` paths for an item, a non-numeric or
            non-finite confidence, or a plan without memberships.
    """
    if not isinstance(plan, dict):
        raise SemanticFolderPlanError("Semantic Folder planner returned a non-object plan")
    template = _validate_template(plan.get("template"))
    canonical_values = _validate_canonical_values(plan.get("canonical_values"))
    canonical_lookup = {
        (item["field"], item["slug"]): item for item in canonical_values
    }
    memberships: list[SemanticFolderMembership] = []
    seen_item_paths: set[tuple[str, str]] = set()
    per_item_count: dict[str, int] = {}
    for item in _required_list(plan.get("memberships"), "memberships"):
        if not isinstance(item, dict):
            raise SemanticFolderPlanError("Semantic Folder membership entries must be objects")
        item_id = str(item.get("item_id") or "").strip()
        if item_id not in item_file_refs:
            raise SemanticFolderPlanError(f"Unknown Semantic Folder build item: {item_id}")
        paths = item.get("paths")
        if not isinstance(paths, list):
            raise SemanticFolderPlanError(f"Semantic Folder membership {item_id} paths must be a list")
        confidence = _optional_float(item.get("confidence"))
        for raw_path in paths:
            relative_path, canonical_segments = _validate_membership_path(
                raw_path,
                template=template,
                canonical_lookup=canonical_lookup,
            )
            key = (item_id, relative_path)
            if key in seen_item_paths:
                raise SemanticFolderPlanError(
                    f"Duplicate Semantic Folder membership for {item_id}: {relative_path}"
                )
            seen_item_paths.add(key)
            # Counted across all entries for the item, not per entry.
            per_item_count[item_id] = per_item_count.get(item_id, 0) + 1
            if per_item_count[item_id] > MEMBERSHIP_LIMIT:
                raise SemanticFolderPlanError(
                    f"Semantic Folder membership limit exceeded for {item_id}: "
                    f"max {MEMBERSHIP_LIMIT}"
                )
            memberships.append(
                SemanticFolderMembership(
                    item_id=item_id,
                    file_ref=item_file_refs[item_id],
                    relative_path=relative_path,
                    confidence=confidence,
                    canonical_segments=canonical_segments,
                )
            )
    skipped = _validate_skipped(plan.get("skipped"), item_file_refs)
    if not memberships:
        raise SemanticFolderPlanError("No useful Semantic Folder hierarchy was planned")
    return SemanticFolderValidatedPlan(
        template=template,
        canonical_values=canonical_values,
        memberships=memberships,
        skipped=skipped,
        raw_plan=plan,
    )


def _validate_template(value: Any) -> list[str]:
    """Return the template as a non-empty list of distinct candidate fields."""
    if not isinstance(value, list) or not value:
        raise SemanticFolderPlanError("Semantic Folder plan template must select at least one field")
    template: list[str] = []
    # "field_name" rather than "field": the latter is dataclasses.field here.
    for raw_field in value:
        field_name = str(raw_field)
        if field_name not in CANDIDATE_FIELDS:
            raise SemanticFolderPlanError(f"Unsupported Semantic Folder field: {field_name}")
        if field_name in template:
            raise SemanticFolderPlanError(f"Duplicate Semantic Folder template field: {field_name}")
        template.append(field_name)
    return template


def _validate_canonical_values(value: Any) -> list[dict[str, str]]:
    """Validate the declared field/display/slug rows.

    A (field, slug) pair may be repeated only with the same display value.
    """
    rows = _required_list(value, "canonical_values")
    seen_slug: dict[tuple[str, str], str] = {}
    canonical_values: list[dict[str, str]] = []
    for row in rows:
        if not isinstance(row, dict):
            raise SemanticFolderPlanError("Semantic Folder canonical values must be objects")
        field_name = str(row.get("field") or "").strip()
        display = str(row.get("display") or "").strip()
        slug = str(row.get("slug") or "").strip()
        if field_name not in CANDIDATE_FIELDS:
            raise SemanticFolderPlanError(f"Unsupported Semantic Folder canonical field: {field_name}")
        if not display:
            raise SemanticFolderPlanError("Semantic Folder canonical display value is required")
        _validate_segment(slug, label=f"{field_name} slug")
        key = (field_name, slug)
        previous = seen_slug.get(key)
        if previous is not None and previous != display:
            raise SemanticFolderPlanError(
                f"Semantic Folder segment collision for {field_name}/{slug}: "
                f"{previous!r} and {display!r}"
            )
        seen_slug[key] = display
        canonical_values.append({"field": field_name, "display": display, "slug": slug})
    return canonical_values


def _validate_membership_path(
    value: Any,
    *,
    template: list[str],
    canonical_lookup: dict[tuple[str, str], dict[str, str]],
) -> tuple[str, list[dict[str, str]]]:
    """Validate one relative ``field/value[/field/value...]`` path.

    The fields must be a prefix of ``template`` in order, and every
    (field, value) pair must be declared in ``canonical_lookup``.

    Returns:
        The path and the canonical rows matching its segments, in order.
    """
    raw_path = str(value or "").strip()
    if not raw_path:
        raise SemanticFolderPlanError("Semantic Folder membership path is required")
    if raw_path.startswith("/"):
        raise SemanticFolderPlanError(f"Semantic Folder membership path must be relative: {raw_path}")
    parts = raw_path.split("/")
    if len(parts) % 2:
        raise SemanticFolderPlanError(
            f"Semantic Folder membership path must use field/value segments: {raw_path}"
        )
    canonical_segments: list[dict[str, str]] = []
    # Even positions are field names, odd positions are value slugs.
    fields = parts[0::2]
    values = parts[1::2]
    if fields != template[: len(fields)]:
        raise SemanticFolderPlanError(
            f"Semantic Folder membership path does not match selected template: {raw_path}"
        )
    for field_name, slug in zip(fields, values):
        _validate_segment(field_name, label="field segment")
        _validate_segment(slug, label=f"{field_name} value segment")
        if field_name not in CANDIDATE_FIELDS:
            raise SemanticFolderPlanError(f"Unsupported Semantic Folder field segment: {field_name}")
        canonical = canonical_lookup.get((field_name, slug))
        if canonical is None:
            raise SemanticFolderPlanError(
                f"Semantic Folder path uses undeclared canonical value: {field_name}/{slug}"
            )
        canonical_segments.append(canonical)
    return "/".join(parts), canonical_segments


def _validate_segment(segment: str, *, label: str) -> None:
    """Raise ``SemanticFolderPlanError`` unless ``segment`` is path-safe.

    Rejects empty, ``.``/``..``, separator or ``=`` characters, the
    placeholder names ``unknown``/``misc``/``uncategorized`` (any case), and
    anything not matching ``SEGMENT_RE``.
    """
    if not segment or segment in {".", ".."}:
        raise SemanticFolderPlanError(f"Unsafe Semantic Folder {label}: {segment!r}")
    if "/" in segment or "\\" in segment or "=" in segment:
        raise SemanticFolderPlanError(f"Unsafe Semantic Folder {label}: {segment!r}")
    if segment.lower() in {"unknown", "misc", "uncategorized"}:
        raise SemanticFolderPlanError(
            f"Semantic Folder plan must skip missing values instead of using {segment!r}"
        )
    if not SEGMENT_RE.fullmatch(segment):
        raise SemanticFolderPlanError(f"Unsafe Semantic Folder {label}: {segment!r}")


def _validate_skipped(value: Any, item_file_refs: dict[str, str]) -> list[dict[str, str]]:
    """Validate the skipped list; a blank reason becomes ``"skipped"``."""
    skipped: list[dict[str, str]] = []
    for row in _required_list(value, "skipped"):
        if not isinstance(row, dict):
            raise SemanticFolderPlanError("Semantic Folder skipped entries must be objects")
        item_id = str(row.get("item_id") or "").strip()
        if item_id not in item_file_refs:
            raise SemanticFolderPlanError(f"Unknown skipped Semantic Folder build item: {item_id}")
        reason = str(row.get("reason") or "").strip() or "skipped"
        skipped.append({"item_id": item_id, "reason": reason})
    return skipped


def _required_list(value: Any, name: str) -> list[Any]:
    """Return ``value`` if it is a list, otherwise raise a plan error."""
    if not isinstance(value, list):
        raise SemanticFolderPlanError(f"Semantic Folder plan {name} must be a list")
    return value


def _optional_float(value: Any) -> float | None:
    """Convert an optional confidence to a finite float.

    ``None`` is passed through. NaN and infinities are rejected because they
    cannot be represented in standard JSON when the membership is stored.
    """
    import math

    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError) as exc:
        # OverflowError: an integer too large to be represented as a float.
        raise SemanticFolderPlanError("Semantic Folder confidence must be numeric") from exc
    if not math.isfinite(number):
        raise SemanticFolderPlanError("Semantic Folder confidence must be a finite number")
    return number


def _normalize_path(path: str) -> str:
    """Return ``path`` as an absolute ``/``-separated path.

    Backslashes become ``/``; empty and ``.`` segments are dropped.
    """
    parts = [part for part in str(path or "/").replace("\\", "/").split("/") if part and part != "."]
    return "/" + "/".join(parts) if parts else "/"
ON folders(parent_id); CREATE INDEX IF NOT EXISTS idx_file_folders_folder ON file_folders(folder_id); + CREATE INDEX IF NOT EXISTS idx_semantic_folder_manifests_scope + ON semantic_folder_manifests(source_scope, created_at); CREATE INDEX IF NOT EXISTS idx_metadata_fields_name ON metadata_fields(name); CREATE INDEX IF NOT EXISTS idx_metadata_values_field_text ON metadata_values(field_id, value_text); CREATE INDEX IF NOT EXISTS idx_metadata_values_field_number ON metadata_values(field_id, value_number); @@ -392,6 +402,326 @@ class SQLiteFileSystemStore: ), ) + def semantic_generated_mount_containing(self, path: str) -> str | None: + path = normalize_path(path) + with self.connect() as conn: + row = conn.execute( + f""" + SELECT path + FROM folders + WHERE kind = 'generated' + AND json_extract(metadata_json, '$.generator') = 'pifs_semantic_folder' + AND json_extract(metadata_json, '$.mount_role') = 'semantic_mount' + AND (path = ? OR ? LIKE {self._descendant_like_sql_expr("path")} ESCAPE '\\') + ORDER BY LENGTH(path) DESC, path DESC + LIMIT 1 + """, + (path, path), + ).fetchone() + return None if row is None else str(row["path"]) + + def semantic_source_file_entries(self, source_scope: str) -> list[FileEntry]: + source_scope = normalize_path(source_scope) + with self.connect() as conn: + folder = self._folder_by_path(conn, source_scope) + if folder is None: + raise KeyError(f"Unknown folder path: {source_scope}") + rows = conn.execute( + f""" + SELECT + f.file_ref, + f.external_id, + f.storage_uri, + f.title, + f.descriptor, + f.content_type, + f.source_type, + f.fingerprint, + f.text_artifact_path, + f.raw_artifact_path, + f.pageindex_doc_id, + f.pageindex_tree_status, + f.metadata_json, + f.metadata_status_json, + MIN(scope_folder.path) AS folder_path + FROM files f + JOIN file_folders scope_ff ON scope_ff.file_ref = f.file_ref + JOIN folders scope_folder ON scope_folder.folder_id = scope_ff.folder_id + WHERE f.deleted_at IS NULL + AND ( + scope_folder.path = ? 
+ OR scope_folder.path LIKE ? ESCAPE '\\' + ) + AND NOT EXISTS ( + SELECT 1 + FROM folders excluded + WHERE excluded.kind = 'generated' + AND json_extract(excluded.metadata_json, '$.generator') = 'pifs_semantic_folder' + AND json_extract(excluded.metadata_json, '$.mount_role') = 'semantic_mount' + AND ( + scope_folder.path = excluded.path + OR scope_folder.path LIKE {self._descendant_like_sql_expr("excluded.path")} ESCAPE '\\' + ) + ) + GROUP BY f.file_ref + ORDER BY f.file_ref + """, + (source_scope, self._descendant_like(source_scope)), + ).fetchall() + return [self._file_entry(row) for row in rows] + + def apply_semantic_folder_build( + self, + *, + source_scope: str, + mount_path: str, + memberships: list[dict[str, Any]], + manifest: dict[str, Any], + ) -> None: + source_scope = normalize_path(source_scope) + mount_path = normalize_path(mount_path) + build_id = str(manifest["build_id"]) + with self.connect() as conn: + source = self._folder_by_path(conn, source_scope) + if source is None: + raise KeyError(f"Unknown folder path: {source_scope}") + self._validate_semantic_mount_conflict( + conn, + source_scope=source_scope, + mount_path=mount_path, + ) + self._delete_semantic_mount_tree( + conn, + source_scope=source_scope, + mount_path=mount_path, + ) + mount_metadata = { + "generator": "pifs_semantic_folder", + "mount_role": "semantic_mount", + "source_scope": source_scope, + "mount_path": mount_path, + "build_id": build_id, + } + self._ensure_generated_folder_path( + conn, + mount_path, + stop_parent=source_scope, + metadata=mount_metadata, + ) + leaf_groups: dict[str, list[dict[str, Any]]] = {} + for membership in memberships: + leaf_path = normalize_path(f"{mount_path}/{membership['relative_path']}") + leaf_groups.setdefault(leaf_path, []).append(membership) + display_names: dict[tuple[str, str], str] = {} + for leaf_path, items in leaf_groups.items(): + titles: dict[str, list[str]] = {} + for item in items: + title = self._file_title(conn, 
str(item["file_ref"])) + titles.setdefault(title, []).append(str(item["file_ref"])) + for item in items: + title = self._file_title(conn, str(item["file_ref"])) + display = title + if len(titles[title]) > 1: + display = self._semantic_display_name(title, str(item["file_ref"])) + display_names[(str(item["file_ref"]), leaf_path)] = display + + for leaf_path, items in leaf_groups.items(): + folder_metadata = { + "generator": "pifs_semantic_folder", + "mount_role": "semantic_branch", + "source_scope": source_scope, + "mount_path": mount_path, + "build_id": build_id, + } + self._ensure_generated_folder_path( + conn, + leaf_path, + stop_parent=mount_path, + metadata=folder_metadata, + ) + folder_id = self._resolve_or_create_folder(conn, leaf_path) + used_display_names: set[str] = set() + for item in items: + file_ref = self._resolve_file_ref(conn, str(item["file_ref"])) + display_name = display_names[(file_ref, leaf_path)] + if display_name in used_display_names: + raise FileExistsError(f"Semantic Folder display name collision at {leaf_path}") + used_display_names.add(display_name) + membership_metadata = { + "generator": "pifs_semantic_folder", + "source_scope": source_scope, + "mount_path": mount_path, + "build_id": build_id, + "relative_path": item["relative_path"], + "display_name": display_name, + "canonical_segments": item.get("canonical_segments") or [], + } + if item.get("confidence") is not None: + membership_metadata["confidence"] = item["confidence"] + conn.execute( + """ + INSERT INTO file_folders(file_ref, folder_id, metadata_json) + VALUES (?, ?, ?) + ON CONFLICT(file_ref, folder_id) DO UPDATE SET + metadata_json = excluded.metadata_json + """, + ( + file_ref, + folder_id, + json.dumps(membership_metadata, ensure_ascii=False), + ), + ) + conn.execute( + """ + INSERT INTO semantic_folder_manifests( + build_id, source_scope, mount_path, manifest_json + ) VALUES (?, ?, ?, ?) 
+ """, + ( + build_id, + source_scope, + mount_path, + json.dumps(manifest, ensure_ascii=False), + ), + ) + + def validate_semantic_mount_available(self, *, source_scope: str, mount_path: str) -> None: + with self.connect() as conn: + self._validate_semantic_mount_conflict( + conn, + source_scope=normalize_path(source_scope), + mount_path=normalize_path(mount_path), + ) + + def membership_display_name(self, file_ref: str, folder_path: str) -> str | None: + folder_path = normalize_path(folder_path) + with self.connect() as conn: + row = conn.execute( + """ + SELECT ff.metadata_json, f.title + FROM file_folders ff + JOIN folders fo ON fo.folder_id = ff.folder_id + JOIN files f ON f.file_ref = ff.file_ref + WHERE ff.file_ref = ? + AND fo.path = ? + AND f.deleted_at IS NULL + LIMIT 1 + """, + (file_ref, folder_path), + ).fetchone() + if row is None: + return None + metadata = self._json_object(row["metadata_json"]) + return str(metadata.get("display_name") or row["title"] or "").strip() or None + + def _validate_semantic_mount_conflict( + self, + conn: sqlite3.Connection, + *, + source_scope: str, + mount_path: str, + ) -> None: + row = self._folder_by_path(conn, mount_path) + if row is None: + return + metadata = self._json_object(row["metadata_json"]) + if ( + row["kind"] == "generated" + and metadata.get("generator") == "pifs_semantic_folder" + and metadata.get("mount_role") == "semantic_mount" + and metadata.get("source_scope") == source_scope + and metadata.get("mount_path") == mount_path + ): + return + raise FileExistsError( + f"Semantic mount path already exists as a non-generated folder: {mount_path}" + ) + + def _delete_semantic_mount_tree( + self, + conn: sqlite3.Connection, + *, + source_scope: str, + mount_path: str, + ) -> None: + rows = conn.execute( + """ + SELECT path, kind, metadata_json + FROM folders + WHERE path = ? OR path LIKE ? 
ESCAPE '\\' + ORDER BY LENGTH(path) DESC + """, + (mount_path, self._descendant_like(mount_path)), + ).fetchall() + for row in rows: + metadata = self._json_object(row["metadata_json"]) + if not ( + row["kind"] == "generated" + and metadata.get("generator") == "pifs_semantic_folder" + and metadata.get("source_scope") == source_scope + and metadata.get("mount_path") == mount_path + ): + raise FileExistsError( + f"Semantic mount path contains non-generated content: {row['path']}" + ) + for row in rows: + conn.execute("DELETE FROM folders WHERE path = ?", (row["path"],)) + + def _ensure_generated_folder_path( + self, + conn: sqlite3.Connection, + path: str, + *, + stop_parent: str, + metadata: dict[str, Any], + ) -> str: + path = normalize_path(path) + stop_parent = normalize_path(stop_parent) + if path == stop_parent: + row = self._folder_by_path(conn, path) + if row is None: + raise KeyError(f"Unknown semantic folder parent: {stop_parent}") + return row["folder_id"] + parent_path = normalize_path(str(Path(path).parent)) + if parent_path != stop_parent: + parent_id = self._ensure_generated_folder_path( + conn, + parent_path, + stop_parent=stop_parent, + metadata={ + "generator": "pifs_semantic_folder", + "mount_role": "semantic_branch", + "source_scope": metadata["source_scope"], + "mount_path": metadata["mount_path"], + "build_id": metadata["build_id"], + }, + ) + else: + parent = self._folder_by_path(conn, parent_path) + if parent is None: + raise KeyError(f"Unknown semantic folder parent: {parent_path}") + parent_id = parent["folder_id"] + folder_id = self.folder_id(path) + self._upsert_folder_row( + conn, + folder_id=folder_id, + parent_id=parent_id, + name=path.rsplit("/", 1)[-1], + path=path, + kind="generated", + description="PIFS Semantic Folder", + metadata_json=json.dumps(metadata, ensure_ascii=False), + ) + return folder_id + + @staticmethod + def _semantic_display_name(title: str, file_ref: str) -> str: + suffix = file_ref.replace("file_", "")[:8] + path 
= Path(title) + if path.suffix: + return f"{path.stem} [{suffix}]{path.suffix}" + return f"{title} [{suffix}]" + def _ensure_title_available_in_folder( self, conn: sqlite3.Connection, @@ -1208,9 +1538,19 @@ class SQLiteFileSystemStore: f.file_ref, f.external_id, f.title, + COALESCE( + NULLIF(json_extract(ff.metadata_json, '$.display_name'), ''), + f.title + ) AS display_title, pf.path AS folder_path, (CASE WHEN pf.path = '/' THEN '/' ELSE pf.path || '/' END) - || ltrim(f.title, '/') AS title_virtual_path + || ltrim( + COALESCE( + NULLIF(json_extract(ff.metadata_json, '$.display_name'), ''), + f.title + ), + '/' + ) AS title_virtual_path FROM files f JOIN file_folders ff ON ff.file_ref = f.file_ref JOIN folders pf ON pf.folder_id = ff.folder_id @@ -1219,11 +1559,11 @@ class SQLiteFileSystemStore: SELECT file_ref, external_id, - title, + display_title AS title, MIN(folder_path) AS folder_path FROM virtual_matches WHERE title_virtual_path = ? - GROUP BY file_ref, external_id, title + GROUP BY file_ref, external_id, display_title ORDER BY file_ref LIMIT 2 """, @@ -1629,7 +1969,13 @@ class SQLiteFileSystemStore: f.metadata_status_json, f.created_at, MIN(pf.folder_id) AS folder_id, - MIN(pf.path) AS folder_path + MIN(pf.path) AS folder_path, + MIN( + COALESCE( + NULLIF(json_extract(ff.metadata_json, '$.display_name'), ''), + f.title + ) + ) AS display_title FROM files f JOIN file_folders ff ON ff.file_ref = f.file_ref JOIN folders pf ON pf.folder_id = ff.folder_id @@ -1823,13 +2169,15 @@ class SQLiteFileSystemStore: @classmethod def _file_summary(cls, row: sqlite3.Row) -> dict[str, Any]: external_id = row["external_id"] + display_title = cls._row_value(row, "display_title", row["title"]) return { "file_ref": row["file_ref"], "id": external_id or row["file_ref"], "document_id": external_id, "external_id": external_id, - "name": row["title"], - "title": row["title"], + "name": display_title, + "title": display_title, + "original_title": row["title"], "description": 
cls._row_value(row, "descriptor", row["title"]), "status": cls._row_value(row, "pageindex_tree_status", "not_built"), "pageNum": None, diff --git a/tests/test_pifs_semantic_folder.py b/tests/test_pifs_semantic_folder.py new file mode 100644 index 0000000..bb781ef --- /dev/null +++ b/tests/test_pifs_semantic_folder.py @@ -0,0 +1,450 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from typing import Any + +import pytest + + +class MetadataGenerator: + def __init__(self, values_by_title: dict[str, dict[str, Any]]): + self.values_by_title = values_by_title + self.calls: list[tuple[str, tuple[str, ...]]] = [] + + def generate(self, request, *, fields): + self.calls.append((request.title, tuple(fields))) + values = self.values_by_title[request.title] + return {field: values[field] for field in fields} + + +class TitlePlanner: + def __init__(self, paths_by_title: dict[str, list[str]], *, template=None): + self.paths_by_title = paths_by_title + self.template = template or ["domain", "topic"] + self.payloads: list[dict[str, Any]] = [] + + def plan(self, payload): + self.payloads.append(payload) + canonical_values = [ + {"field": "domain", "display": "Finance", "slug": "finance"}, + {"field": "domain", "display": "Technology", "slug": "technology"}, + {"field": "topic", "display": "Rates", "slug": "rates"}, + {"field": "topic", "display": "GPU Accelerators", "slug": "gpu-accelerators"}, + {"field": "topic", "display": "Credit", "slug": "credit"}, + ] + memberships = [] + skipped = [] + for item in payload["items"]: + paths = self.paths_by_title.get(item["title"], []) + if paths: + memberships.append({"item_id": item["item_id"], "paths": paths, "confidence": 0.91}) + else: + skipped.append({"item_id": item["item_id"], "reason": "missing first field"}) + return { + "template": self.template, + "canonical_values": canonical_values, + "memberships": memberships, + "skipped": skipped, + } + + +@dataclass +class Candidate: + document_id: 
str + score: float = 0.8 + snippet: str = "" + sources: list[dict[str, Any]] | None = None + + +class BrowseBackend: + semantic_tool_channels = ("summary",) + + def __init__(self, document_ids): + self.document_ids = document_ids + + def available_channels(self): + return ("summary",) + + def search_channel(self, channel, query, *, limit, filters=None): + rows = [] + for document_id in self.document_ids: + rows.append(Candidate(document_id=document_id, sources=[{"distance": 0.25}])) + return rows[:limit] + + +def _filesystem(tmp_path, values_by_title=None): + from pageindex.filesystem import PageIndexFileSystem + + return PageIndexFileSystem( + tmp_path / "workspace", + metadata_generator=MetadataGenerator(values_by_title or {}), + summary_projection_index=False, + ) + + +def _register_generated_file(filesystem, title, *, folder="/documents", external_id=None): + values = filesystem.metadata_generator.values_by_title + values.setdefault( + title, + { + "summary": f"Summary for {title}", + "domain": "Finance", + "topic": "Rates", + }, + ) + return filesystem.register_file( + storage_uri=f"file:///tmp/{title}.txt", + folder_path=folder, + external_id=external_id or title.lower().replace(" ", "_"), + title=title, + content=f"{title} evidence about rates and GPUs.", + content_type="text/plain", + metadata_policy={ + "fields": { + "summary": True, + "doc_type": False, + "domain": True, + "topic": True, + "entity": False, + "relation": False, + }, + "projection_indexes": {"summary": False}, + "batch": False, + }, + ) + + +def test_semantic_folder_build_materializes_scope_relative_mount_and_memberships(tmp_path): + filesystem = _filesystem( + tmp_path, + { + "Rates": {"summary": "Central bank rate summary", "domain": "Finance", "topic": "Rates"}, + "GPU": {"summary": "Accelerator summary", "domain": "Technology", "topic": "GPU Accelerators"}, + }, + ) + rates_ref = _register_generated_file(filesystem, "Rates", external_id="doc_rates") + gpu_ref = 
_register_generated_file(filesystem, "GPU", folder="/documents/sec-filings", external_id="doc_gpu") + planner = TitlePlanner( + { + "Rates": ["domain/finance/topic/rates"], + "GPU": ["domain/technology/topic/gpu-accelerators"], + } + ) + + result = filesystem.build_semantic_folder("/", planner=planner) + + assert result == { + "source": "/", + "mount": "/semantic", + "template": "domain/topic", + "files": 2, + "memberships": 2, + "skipped": 0, + "metadata_cached": 4, + "metadata_generating": 0, + "metadata_failed": 0, + "planning": "generated", + } + assert filesystem.store.resolve_file_ref("/semantic/domain/finance/topic/rates/Rates") == rates_ref + assert ( + filesystem.store.resolve_file_ref( + "/semantic/domain/technology/topic/gpu-accelerators/GPU" + ) + == gpu_ref + ) + assert filesystem.store.get_file(rates_ref).file_ref == rates_ref + memberships = filesystem.store.folder_memberships(rates_ref) + assert sorted(folder["path"] for folder in memberships) == [ + "/documents", + "/semantic/domain/finance/topic/rates", + ] + + payload_item = planner.payloads[0]["items"][0] + assert set(payload_item) == {"item_id", "title", "summary", "domain", "topic"} + assert "file_ref" not in json.dumps(planner.payloads[0]) + assert "storage_uri" not in json.dumps(planner.payloads[0]) + assert "/documents" not in json.dumps(planner.payloads[0]) + + +def test_semantic_folder_build_uses_scope_relative_mount_and_rejects_conflict(tmp_path): + filesystem = _filesystem( + tmp_path, + { + "Report": {"summary": "Report summary", "domain": "Finance", "topic": "Credit"}, + }, + ) + _register_generated_file(filesystem, "Report", folder="/documents/sec-filings") + planner = TitlePlanner({"Report": ["domain/finance/topic/credit"]}) + + result = filesystem.build_semantic_folder("/documents/sec-filings", planner=planner) + + assert result["mount"] == "/documents/sec-filings/semantic" + assert filesystem.store.folder_info("/documents/sec-filings/semantic")["kind"] == "generated" + + 
filesystem.create_folder("/documents/manual/semantic") + with pytest.raises(FileExistsError, match="non-generated"): + filesystem.build_semantic_folder( + "/documents/manual", + planner=TitlePlanner({"Report": ["domain/finance/topic/credit"]}), + ) + + +def test_semantic_folder_rebuild_is_atomic_and_replaces_only_own_mount(tmp_path): + filesystem = _filesystem( + tmp_path, + { + "Report": {"summary": "Report summary", "domain": "Finance", "topic": "Rates"}, + }, + ) + _register_generated_file(filesystem, "Report", external_id="doc_report") + filesystem.build_semantic_folder( + "/", + planner=TitlePlanner({"Report": ["domain/finance/topic/rates"]}), + ) + assert filesystem.store.resolve_file_ref("/semantic/domain/finance/topic/rates/Report") + + class InvalidPlanner: + def plan(self, payload): + return { + "template": ["domain"], + "canonical_values": [{"field": "domain", "display": "Finance", "slug": "finance"}], + "memberships": [{"item_id": payload["items"][0]["item_id"], "paths": ["/domain/finance"]}], + "skipped": [], + } + + with pytest.raises(ValueError, match="must be relative"): + filesystem.build_semantic_folder("/", planner=InvalidPlanner()) + assert filesystem.store.resolve_file_ref("/semantic/domain/finance/topic/rates/Report") + + filesystem.build_semantic_folder( + "/", + planner=TitlePlanner({"Report": ["domain/finance"]}, template=["domain"]), + ) + assert filesystem.store.resolve_file_ref("/semantic/domain/finance/Report") + with pytest.raises(KeyError): + filesystem.store.folder_info("/semantic/domain/finance/topic/rates") + + +def test_semantic_source_scan_excludes_descendant_semantic_mounts(tmp_path): + filesystem = _filesystem( + tmp_path, + { + "Report": {"summary": "Report summary", "domain": "Finance", "topic": "Rates"}, + }, + ) + file_ref = _register_generated_file(filesystem, "Report", external_id="doc_report") + filesystem.build_semantic_folder( + "/", + planner=TitlePlanner({"Report": ["domain/finance/topic/rates"]}), + ) + + entries = 
filesystem.store.semantic_source_file_entries("/") + + assert [entry.file_ref for entry in entries] == [file_ref] + with pytest.raises(ValueError, match="semantic mount path"): + filesystem.build_semantic_folder( + "/semantic/domain", + planner=TitlePlanner({"Report": ["domain/finance/topic/rates"]}), + ) + + +def test_semantic_folder_generates_missing_candidate_metadata_without_overwriting_canonicalization(tmp_path): + filesystem = _filesystem( + tmp_path, + { + "Report": { + "summary": "Cached report summary", + "domain": "Financial Services", + "topic": "Central Bank Rates", + }, + }, + ) + filesystem.register_file( + storage_uri="file:///tmp/report.txt", + folder_path="/documents", + external_id="doc_report", + title="Report", + content="Report evidence", + content_type="text/plain", + metadata_policy={ + "fields": { + "summary": True, + "doc_type": False, + "domain": False, + "topic": False, + "entity": False, + "relation": False, + }, + "projection_indexes": {"summary": False}, + "batch": False, + }, + ) + + filesystem.build_semantic_folder( + "/", + planner=TitlePlanner({"Report": ["domain/finance/topic/rates"]}), + ) + + metadata = filesystem.store.get_file(filesystem.store.resolve_file_ref("doc_report")).metadata + assert metadata["domain"] == "Financial Services" + assert metadata["topic"] == "Central Bank Rates" + assert ("Report", ("summary",)) in filesystem.metadata_generator.calls + assert ("Report", ("domain", "topic")) in filesystem.metadata_generator.calls + + +def test_browse_inside_semantic_folder_returns_navigation_local_locators(tmp_path): + filesystem = _filesystem( + tmp_path, + { + "Report": {"summary": "Report summary", "domain": "Finance", "topic": "Rates"}, + }, + ) + file_ref = _register_generated_file(filesystem, "Report", external_id="doc_report") + filesystem.build_semantic_folder( + "/", + planner=TitlePlanner({"Report": ["domain/finance/topic/rates"]}), + ) + filesystem.semantic_retrieval_backend = BrowseBackend([file_ref]) + + 
result = filesystem.browse_semantic_files( + "/semantic/domain/finance", + "rates", + recursive=True, + ) + + assert result["data"][0]["path"] == "/semantic/domain/finance/topic/rates/Report" + assert filesystem.store.resolve_file_ref(result["data"][0]["path"]) == filesystem.store.resolve_file_ref( + "/documents/Report" + ) + + +def test_semantic_folder_display_names_disambiguate_same_title_memberships(tmp_path): + filesystem = _filesystem( + tmp_path, + { + "Report": {"summary": "Report summary", "domain": "Finance", "topic": "Rates"}, + }, + ) + first_ref = _register_generated_file( + filesystem, + "Report", + folder="/first", + external_id="doc_first", + ) + second_ref = _register_generated_file( + filesystem, + "Report", + folder="/second", + external_id="doc_second", + ) + + filesystem.build_semantic_folder( + "/", + planner=TitlePlanner({"Report": ["domain/finance/topic/rates"]}), + ) + + listing = filesystem.browse("/semantic/domain/finance/topic/rates") + paths = sorted(f"{item['folder_path']}/{item['title']}" for item in listing["files"]) + assert paths == [ + f"/semantic/domain/finance/topic/rates/Report [{first_ref.replace('file_', '')[:8]}]", + f"/semantic/domain/finance/topic/rates/Report [{second_ref.replace('file_', '')[:8]}]", + ] + assert filesystem.store.resolve_file_ref(paths[0]) in {first_ref, second_ref} + assert filesystem.store.resolve_file_ref(paths[1]) in {first_ref, second_ref} + assert filesystem.store.resolve_file_ref(paths[0]) != filesystem.store.resolve_file_ref(paths[1]) + + +def test_semantic_folder_validation_rejects_taxonomy_repairs_and_limits(): + from pageindex.filesystem.semantic_folder import validate_semantic_folder_plan + + base = { + "template": ["domain"], + "canonical_values": [ + {"field": "domain", "display": "Finance", "slug": "finance"}, + ], + "memberships": [{"item_id": "item_0001", "paths": ["domain/finance"]}], + "skipped": [], + } + assert validate_semantic_folder_plan(base, item_file_refs={"item_0001": 
"file_a"}).memberships + + with pytest.raises(ValueError, match="collision"): + validate_semantic_folder_plan( + { + **base, + "canonical_values": [ + {"field": "domain", "display": "Finance", "slug": "finance"}, + {"field": "domain", "display": "Financial Services", "slug": "finance"}, + ], + }, + item_file_refs={"item_0001": "file_a"}, + ) + with pytest.raises(ValueError, match="limit exceeded"): + validate_semantic_folder_plan( + { + **base, + "canonical_values": [ + {"field": "domain", "display": "Finance", "slug": "finance"}, + {"field": "domain", "display": "Technology", "slug": "technology"}, + {"field": "domain", "display": "Healthcare", "slug": "healthcare"}, + {"field": "domain", "display": "Energy", "slug": "energy"}, + ], + "memberships": [ + { + "item_id": "item_0001", + "paths": [ + "domain/finance", + "domain/technology", + "domain/healthcare", + "domain/energy", + ], + } + ], + }, + item_file_refs={"item_0001": "file_a"}, + ) + with pytest.raises(ValueError, match="unknown"): + validate_semantic_folder_plan( + {**base, "memberships": [{"item_id": "item_0001", "paths": ["domain/unknown"]}]}, + item_file_refs={"item_0001": "file_a"}, + ) + + +def test_cli_semantic_folder_build_is_user_surface_not_agent_surface(monkeypatch, capsys, tmp_path): + from pageindex.filesystem import cli + from pageindex.filesystem.commands import PIFSCommandError, PIFSCommandExecutor + + class FakeFileSystem: + def __init__(self, workspace): + self.workspace = workspace + + def configure_existing_projection_retrieval(self): + return False + + def build_semantic_folder(self, source_scope="/"): + return { + "source": source_scope, + "mount": "/documents/semantic", + "template": "domain/topic", + "files": 3, + "memberships": 4, + "skipped": 1, + "metadata_cached": 5, + "metadata_generating": 1, + "metadata_failed": 0, + "planning": "generated", + } + + monkeypatch.setattr(cli, "PageIndexFileSystem", FakeFileSystem) + + status = cli.main(["--workspace", str(tmp_path), 
"semantic-folder", "build", "/documents"]) + + assert status == 0 + output = capsys.readouterr().out + assert "source: /documents" in output + assert "mount: /documents/semantic" in output + assert "metadata: cached=5 generating=1 failed=0" in output + executor = PIFSCommandExecutor(FakeFileSystem(tmp_path)) + assert "semantic-folder" not in executor.allowed_commands() + with pytest.raises(PIFSCommandError, match="Unsupported command"): + executor.execute("semantic-folder build /documents")