diff --git a/examples/pifs_demo.py b/examples/pifs_demo.py index 230d586..f6f9b51 100644 --- a/examples/pifs_demo.py +++ b/examples/pifs_demo.py @@ -287,7 +287,6 @@ def register_demo_metadata_schema(filesystem: PageIndexFileSystem) -> None: def backfill_registered_metadata_values(filesystem: PageIndexFileSystem, file_ref: str) -> None: entry = filesystem.store.get_file(file_ref) indexed_metadata = dict(entry.metadata or {}) - indexed_metadata.update(entry.derived_metadata or {}) with filesystem.store.connect() as conn: filesystem.store.replace_metadata_values(conn, file_ref, indexed_metadata) @@ -314,12 +313,12 @@ def has_ready_register_outputs(filesystem: PageIndexFileSystem, external_id: str entry = filesystem.store.get_file(file_ref) except KeyError: return False - generation = entry.metadata_generation or {} - fields = generation.get("fields") or {} + status = entry.metadata_status or {} + fields = status.get("fields") or {} required = ("summary", "doc_type", "domain", "topic") if any(fields.get(field, {}).get("status") != "generated" for field in required): return False - summary_projection = (generation.get("projection_indexes") or {}).get("summary") or {} + summary_projection = (status.get("projection_indexes") or {}).get("summary") or {} return summary_projection.get("status") == "ready" @@ -394,17 +393,17 @@ def register_documents( entry = filesystem.store.get_file(file_ref) field_status = { field: state.get("status") - for field, state in (entry.metadata_generation.get("fields") or {}).items() + for field, state in (entry.metadata_status.get("fields") or {}).items() } summary_projection = ( - entry.metadata_generation.get("projection_indexes", {}).get("summary", {}) + entry.metadata_status.get("projection_indexes", {}).get("summary", {}) ) log_progress( f"PIFS register: done file_ref={file_ref} ({register_seconds:.2f}s)", indent=1, ) log_progress( - f"metadata: {entry.metadata_generation.get('status', 'unknown')} fields={field_status}", + f"metadata: {entry.metadata_status.get('status', 'unknown')} fields={field_status}", indent=1, ) log_progress( @@ -418,7 +417,7 @@ def register_documents( "file_ref": file_ref, "external_id": external_id, "path": str(document_path), - "status": entry.metadata_generation.get("status", "unknown"), + "status": entry.metadata_status.get("status", "unknown"), "pageindex_tree_status": entry.pageindex_tree_status, "pageindex_doc_id": entry.pageindex_doc_id, } @@ -642,7 +641,7 @@ def run_smoke_commands( command=command, result=( f"{stat_data.get('title')} | tree={stat_data.get('pageindex_tree_status')} | " - f"metadata_status={(stat_data.get('metadata_generation') or {}).get('status')}" + f"metadata_status={(stat_data.get('metadata_status') or {}).get('status')}" ), raw=shell_executor.execute(command) if verbose else "", verbose=verbose, diff --git a/pageindex/filesystem/commands.py b/pageindex/filesystem/commands.py index 416acec..fc2f47c 100644 --- a/pageindex/filesystem/commands.py +++ b/pageindex/filesystem/commands.py @@ -858,19 +858,16 @@ class PIFSCommandExecutor: lines.append(f" {key}: {self._compact_value(value)}") if len(metadata) > self.MAX_STAT_METADATA_FIELDS: lines.append(f" ... {len(metadata) - self.MAX_STAT_METADATA_FIELDS} more fields") - derived_metadata = data.get("derived_metadata") or {} - if derived_metadata: - lines.append("generated_metadata:") - derived_items = sorted(derived_metadata.items())[: self.MAX_STAT_METADATA_FIELDS] - for key, value in derived_items: - lines.append(f" {key}: {self._compact_value(value)}") - if len(derived_metadata) > self.MAX_STAT_METADATA_FIELDS: + metadata_status = data.get("metadata_status") or {} + if metadata_status: + lines.append(f"metadata_status: {metadata_status.get('status', '-')}") + summary_projection = ( + metadata_status.get("projection_indexes", {}).get("summary", {}) + ) + if summary_projection: lines.append( - f" ... {len(derived_metadata) - self.MAX_STAT_METADATA_FIELDS} more fields" + f"summary_projection_status: {summary_projection.get('status', '-')}" ) - generation = data.get("metadata_generation") or {} - if generation: - lines.append(f"metadata_generation_status: {generation.get('status', '-')}") return "\n".join(lines) def _file_row_text(self, item: dict[str, Any]) -> str: diff --git a/pageindex/filesystem/core.py b/pageindex/filesystem/core.py index 35af513..bbf81f1 100644 --- a/pageindex/filesystem/core.py +++ b/pageindex/filesystem/core.py @@ -47,7 +47,7 @@ DEFAULT_METADATA_GENERATION_FIELDS = { "relation": False, } -DEFAULT_DERIVED_METADATA_FIELD_TYPES = { +DEFAULT_METADATA_FIELD_TYPES = { "summary": "string", "doc_type": "string", "domain": "string", @@ -56,7 +56,8 @@ DEFAULT_DERIVED_METADATA_FIELD_TYPES = { "relation": "string", } -METADATA_GENERATION_STATUSES = { +METADATA_STATUSES = { + "skipped", "pending_submit", "pending_generate", "generated", @@ -128,9 +129,8 @@ class PageIndexFileSystem: content: str = "", content_type: str = "text/plain", source_type: Optional[str] = None, - derived_metadata: Optional[dict[str, Any]] = None, - metadata_generation_policy: Optional[dict[str, Any]] = None, - metadata_generation_status: Optional[str] = None, + metadata_policy: Optional[dict[str, Any]] = None, + metadata_status: Optional[str] = None, ) -> str: return self.register_files( [ @@ -144,15 +144,14 @@ class PageIndexFileSystem: "content": content, "content_type": content_type, "source_type": source_type, - "derived_metadata": derived_metadata, - "metadata_generation_policy": metadata_generation_policy, - "metadata_generation_status": metadata_generation_status, + "metadata_policy": metadata_policy, + "metadata_status": metadata_status, } ] )[0] def register(self, **kwargs: Any) -> str: - if not self._register_uses_deferred_metadata(kwargs.get("metadata_generation_policy")): + if not self._register_uses_deferred_metadata(kwargs.get("metadata_policy")): self._ensure_register_completion_defaults() return self.register_file(**kwargs) @@ -171,7 +170,7 @@ class PageIndexFileSystem: raise MetadataGenerationError( "metadata_generator is required to generate pending PIFS metadata" ) - rows = self.store.list_pending_metadata_generation(limit=limit) + rows = self.store.list_pending_metadata_status(limit=limit) generated = 0 failed = 0 file_refs: list[str] = [] @@ -180,14 +179,14 @@ class PageIndexFileSystem: self._generate_register_metadata(record, force=True) self._complete_summary_projection_index(record) self._register_generation_policy_schema([record]) - self.store.update_file_metadata_generation( + self.store.update_file_metadata_status( record["file_ref"], - derived_metadata=record["derived_metadata"], - metadata_generation=record["metadata_generation"], + metadata=record["metadata"], + metadata_status=record["metadata_status"], ) self._sync_owned_raw_artifact(record) file_refs.append(record["file_ref"]) - if record["metadata_generation"]["status"] == "failed": + if record["metadata_status"]["status"] == "failed": failed += 1 else: generated += 1 @@ -283,7 +282,7 @@ class PageIndexFileSystem: return bool(policy.get("batch")) or policy.get("mode") == "batch" @classmethod - def default_metadata_generation_policy(cls) -> dict[str, Any]: + def default_metadata_policy(cls) -> dict[str, Any]: return { "fields": dict(DEFAULT_METADATA_GENERATION_FIELDS), "projection_indexes": {"summary": True}, @@ -459,8 +458,7 @@ class PageIndexFileSystem: folder_path=folder_path, folder_paths=folder_paths, metadata=row["metadata"], - derived_metadata=row["derived_metadata"], - metadata_generation=row["metadata_generation"], + metadata_status=row["metadata_status"], source_path=row["source_path"], id=row["id"], document_id=row["document_id"], @@ -885,11 +883,18 @@ class PageIndexFileSystem: raw_source_path = str(file["source_path"]) source_path = raw_source_path.strip("/") metadata = file.get("metadata") or {} - derived_metadata = file.get("derived_metadata") or {} if not isinstance(metadata, dict): raise ValueError("metadata must be a JSON object") - if not isinstance(derived_metadata, dict): - raise ValueError("derived_metadata must be a JSON object") + legacy_value_key = "derived_" + "metadata" + legacy_policy_key = "metadata_" + "generation_policy" + legacy_status_key = "metadata_" + "generation_status" + if legacy_value_key in file: + raise ValueError("legacy generated metadata map has been removed; put values in metadata") + if legacy_policy_key in file: + raise ValueError("legacy metadata policy key has been renamed to metadata_policy") + if legacy_status_key in file: + raise ValueError("legacy metadata status key has been renamed to metadata_status") + self._validate_register_metadata(metadata) external_id = file.get("external_id") content = file.get("content") or "" content_type = file.get("content_type") or "text/plain" @@ -907,21 +912,17 @@ class PageIndexFileSystem: ) fts_content = file.get("fts_content", artifact_content) source_type = file.get("source_type") or self._infer_source_type(source_path) - generation_policy = self._normalize_metadata_generation_policy( - file.get("metadata_generation_policy"), - derived_metadata=derived_metadata, + metadata_policy = self._normalize_metadata_policy( + file.get("metadata_policy"), + metadata=metadata, ) - generation_state = self._metadata_generation_state( - generation_policy, - derived_metadata=derived_metadata, - status=file.get("metadata_generation_status"), + metadata_status = self._metadata_status_state( + metadata_policy, + metadata=metadata, + status=file.get("metadata_status"), ) - indexed_metadata = SQLiteFileSystemStore.indexed_metadata_values( - metadata, - derived_metadata, - generation_state, - ) - searchable_metadata = self._merge_metadata_values(metadata, derived_metadata) + indexed_metadata = SQLiteFileSystemStore.indexed_metadata_values(metadata) + searchable_metadata = dict(metadata) folder_path = normalize_path(file.get("folder_path") or "/") title = file.get("title") or metadata.get("title") or Path(source_path).stem file_ref = make_file_ref(external_id or source_path) @@ -938,8 +939,7 @@ class PageIndexFileSystem: source_path=source_path, folder_path=folder_path, metadata=metadata, - derived_metadata=derived_metadata, - metadata_generation=generation_state, + metadata_status=metadata_status, ), ) descriptor = self._build_descriptor(title, metadata) @@ -959,10 +959,8 @@ class PageIndexFileSystem: "pageindex_tree_status": pageindex_tree_status, "metadata": metadata, "metadata_json": json.dumps(metadata, ensure_ascii=False), - "derived_metadata": derived_metadata, - "derived_metadata_json": json.dumps(derived_metadata, ensure_ascii=False), - "metadata_generation": generation_state, - "metadata_generation_json": json.dumps(generation_state, ensure_ascii=False), + "metadata_status": metadata_status, + "metadata_status_json": json.dumps(metadata_status, ensure_ascii=False), "indexed_metadata": indexed_metadata, "metadata_text": metadata_text(searchable_metadata), "folder_path": folder_path, @@ -1035,16 +1033,14 @@ class PageIndexFileSystem: source_path: str, folder_path: str, metadata: dict[str, Any], - derived_metadata: dict[str, Any], - metadata_generation: dict[str, Any], + metadata_status: dict[str, Any], ) -> dict[str, Any]: return { "storage_uri": storage_uri, "source_path": source_path, "folder_path": folder_path, "metadata": metadata, - "derived_metadata": derived_metadata, - "metadata_generation": metadata_generation, + "metadata_status": metadata_status, } def _sync_owned_raw_artifact(self, record: dict[str, Any]) -> None: @@ -1064,22 +1060,21 @@ class PageIndexFileSystem: source_path=record["source_path"], folder_path=record["folder_path"], metadata=record["metadata"], - derived_metadata=record["derived_metadata"], - metadata_generation=record["metadata_generation"], + metadata_status=record["metadata_status"], ), ) ) def _record_from_file_entry(self, entry: Any) -> dict[str, Any]: content = self.store.read_text(entry.file_ref) - generation_policy = self._normalize_metadata_generation_policy( - entry.metadata_generation.get("policy", {}), - derived_metadata=entry.derived_metadata, + metadata_policy = self._normalize_metadata_policy( + entry.metadata_status.get("policy", {}), + metadata=entry.metadata, ) - generation_state = self._metadata_generation_state( - generation_policy, - derived_metadata=entry.derived_metadata, - status=entry.metadata_generation.get("status"), + metadata_status = self._metadata_status_state( + metadata_policy, + metadata=entry.metadata, + status=entry.metadata_status.get("status"), ) return { "file_ref": entry.file_ref, @@ -1097,35 +1092,29 @@ class PageIndexFileSystem: "pageindex_tree_status": entry.pageindex_tree_status, "metadata": dict(entry.metadata), "metadata_json": json.dumps(entry.metadata, ensure_ascii=False), - "derived_metadata": dict(entry.derived_metadata), - "derived_metadata_json": json.dumps(entry.derived_metadata, ensure_ascii=False), - "metadata_generation": generation_state, - "metadata_generation_json": json.dumps(generation_state, ensure_ascii=False), - "indexed_metadata": SQLiteFileSystemStore.indexed_metadata_values( - entry.metadata, - entry.derived_metadata, - generation_state, - ), - "metadata_text": metadata_text(self._merge_metadata_values(entry.metadata, entry.derived_metadata)), + "metadata_status": metadata_status, + "metadata_status_json": json.dumps(metadata_status, ensure_ascii=False), + "indexed_metadata": SQLiteFileSystemStore.indexed_metadata_values(entry.metadata), + "metadata_text": metadata_text(entry.metadata), "folder_path": entry.folder_path, "content": content, "skip_fts": False, } def _generate_register_metadata(self, record: dict[str, Any], *, force: bool = False) -> None: - generation = record["metadata_generation"] - policy = generation.get("policy", {}) - if self._metadata_generation_is_batch(policy) and not force: + status = record["metadata_status"] + policy = status.get("policy", {}) + if self._metadata_policy_is_batch(policy) and not force: self._mark_requested_generation_status(record, "pending_submit") return fields = self._metadata_fields_to_generate(record) if not fields: return if self.metadata_generator is None: - if self._metadata_generation_requires_sync(policy): + if self._metadata_policy_requires_sync(policy): raise MetadataGenerationError( "metadata_generator is required for synchronous PIFS metadata generation; " - "set metadata_generation_policy batch=true to defer" + "set metadata_policy batch=true to defer" ) return try: @@ -1146,112 +1135,117 @@ class PageIndexFileSystem: if isinstance(result, dict): result = MetadataGenerationResult(values=result) except Exception as exc: - self._apply_metadata_generation_failures(record, fields, str(exc)) + self._apply_metadata_status_failures(record, fields, str(exc)) return failures = dict(result.failures) for field in fields: if field in result.values: - record["derived_metadata"][field] = result.values[field] - generation["fields"][field] = {"requested": True, "status": "generated"} + record["metadata"][field] = result.values[field] + status["fields"][field] = { + "requested": True, + "status": "generated", + "owner": "pifs", + "source": "llm", + } else: failures.setdefault(field, "metadata generator did not return field") for field, reason in failures.items(): - generation["fields"][field] = { + status["fields"][field] = { "requested": True, "status": "failed", + "owner": "pifs", + "source": "llm", "error": str(reason), } - self._refresh_record_metadata_generation(record) + self._refresh_record_metadata_status(record) def _complete_summary_projection_index(self, record: dict[str, Any]) -> None: - generation = record["metadata_generation"] - summary_index = generation.get("projection_indexes", {}).get("summary") + metadata_status = record["metadata_status"] + summary_index = metadata_status.get("projection_indexes", {}).get("summary") if not summary_index or not summary_index.get("requested"): return - summary = str(record.get("derived_metadata", {}).get("summary") or "").strip() + summary = str(record.get("metadata", {}).get("summary") or "").strip() if not summary: return if self.summary_projection_indexer is None: - self._refresh_record_metadata_generation(record) + self._refresh_record_metadata_status(record) return try: result = self.summary_projection_indexer.upsert_summary(record) except Exception as exc: summary_index["status"] = "failed" summary_index["error"] = str(exc) - self._refresh_record_metadata_generation(record) + self._refresh_record_metadata_status(record) return summary_index.clear() summary_index.update({"requested": True, **result}) if summary_index.get("status") != "ready": summary_index["status"] = "ready" - self._refresh_record_metadata_generation(record) + self._refresh_record_metadata_status(record) @staticmethod - def _metadata_generation_is_batch(policy: dict[str, Any]) -> bool: + def _metadata_policy_is_batch(policy: dict[str, Any]) -> bool: return bool(policy.get("batch")) or policy.get("mode") == "batch" @staticmethod - def _metadata_generation_requires_sync(policy: dict[str, Any]) -> bool: + def _metadata_policy_requires_sync(policy: dict[str, Any]) -> bool: return policy.get("batch") is False or policy.get("mode") == "sync" def _metadata_fields_to_generate(self, record: dict[str, Any]) -> list[str]: fields: list[str] = [] - for name, state in record["metadata_generation"].get("fields", {}).items(): + for name, state in record["metadata_status"].get("fields", {}).items(): if not state.get("requested"): continue - if state.get("status") == "generated" and name in record["derived_metadata"]: + if state.get("status") == "generated" and name in record["metadata"]: continue fields.append(name) return fields def _mark_requested_generation_status(self, record: dict[str, Any], status: str) -> None: - for name, field in record["metadata_generation"].get("fields", {}).items(): + for name, field in record["metadata_status"].get("fields", {}).items(): if field.get("requested") and field.get("status") != "generated": - record["metadata_generation"]["fields"][name] = { + record["metadata_status"]["fields"][name] = { "requested": True, "status": status, + "owner": "pifs", + "source": "llm", } - self._refresh_record_metadata_generation(record, explicit_status=status) + self._refresh_record_metadata_status(record, explicit_status=status) - def _apply_metadata_generation_failures( + def _apply_metadata_status_failures( self, record: dict[str, Any], fields: list[str], reason: str, ) -> None: for field in fields: - record["metadata_generation"]["fields"][field] = { + record["metadata_status"]["fields"][field] = { "requested": True, "status": "failed", + "owner": "pifs", + "source": "llm", "error": reason, } - self._refresh_record_metadata_generation(record, explicit_status="failed") + self._refresh_record_metadata_status(record, explicit_status="failed") - def _refresh_record_metadata_generation( + def _refresh_record_metadata_status( self, record: dict[str, Any], *, explicit_status: str | None = None, ) -> None: - generation = record["metadata_generation"] + metadata_status = record["metadata_status"] statuses = [ field.get("status") - for field in generation.get("fields", {}).values() + for field in metadata_status.get("fields", {}).values() if field.get("requested") and field.get("status") ] - generation["status"] = explicit_status or self._aggregate_generation_status(statuses) - self._refresh_projection_index_statuses(generation, record["derived_metadata"]) - record["derived_metadata_json"] = json.dumps(record["derived_metadata"], ensure_ascii=False) - record["metadata_generation_json"] = json.dumps(generation, ensure_ascii=False) - record["indexed_metadata"] = SQLiteFileSystemStore.indexed_metadata_values( - record["metadata"], - record["derived_metadata"], - generation, - ) - record["metadata_text"] = metadata_text( - self._merge_metadata_values(record["metadata"], record["derived_metadata"]) - ) + metadata_status["status"] = explicit_status or self._aggregate_metadata_status(statuses) + self._refresh_projection_index_statuses(metadata_status, record["metadata"]) + record["metadata_json"] = json.dumps(record["metadata"], ensure_ascii=False) + record["metadata_status_json"] = json.dumps(metadata_status, ensure_ascii=False) + record["indexed_metadata"] = SQLiteFileSystemStore.indexed_metadata_values(record["metadata"]) + record["metadata_text"] = metadata_text(record["metadata"]) def _open_lines(self, file_ref: str, start: int, end: int) -> OpenResult: entry = self.store.get_file(file_ref) @@ -1381,8 +1375,7 @@ class PageIndexFileSystem: folder_path=folder_path, folder_paths=folder_paths, metadata=entry.metadata, - derived_metadata=entry.derived_metadata, - metadata_generation=entry.metadata_generation, + metadata_status=entry.metadata_status, source_path=entry.source_path, id=entry.external_id or file_ref, document_id=entry.external_id, @@ -1403,31 +1396,49 @@ class PageIndexFileSystem: source = metadata.get("source_type") or metadata.get("repo") or metadata.get("channel") return f"{title} ({source})" if source else title + @staticmethod + def _validate_register_metadata(metadata: dict[str, Any]) -> None: + pifs_owned_fields = set(DEFAULT_METADATA_GENERATION_FIELDS) + conflicts = sorted(pifs_owned_fields.intersection(metadata)) + if conflicts: + raise ValueError( + "metadata contains PIFS-owned generated field(s): " + + ", ".join(conflicts) + + "; configure metadata_policy instead of passing generated fields" + ) + def _register_generation_policy_schema(self, records: list[dict[str, Any]]) -> None: - fields: dict[str, dict[str, str]] = {} + pifs_fields: dict[str, dict[str, str]] = {} + user_fields: dict[str, dict[str, str]] = {} for record in records: - policy_fields = record["metadata_generation"]["policy"]["fields"] + policy_fields = record["metadata_status"]["policy"]["fields"] + generated_names = {str(name) for name, requested in policy_fields.items() if requested} for name, requested in policy_fields.items(): if requested: - fields[name] = { - "type": DEFAULT_DERIVED_METADATA_FIELD_TYPES.get( + pifs_fields[name] = { + "type": DEFAULT_METADATA_FIELD_TYPES.get( name, self._infer_metadata_field_type( - record.get("derived_metadata", {}).get(name) + record.get("metadata", {}).get(name) ), ) } - for name, value in record.get("derived_metadata", {}).items(): - fields.setdefault(name, {"type": self._infer_metadata_field_type(value)}) - if fields: - self.metadata.register_schema({"fields": fields}, source="derived") + for name, value in record.get("metadata", {}).items(): + if name in generated_names: + pifs_fields.setdefault(name, {"type": self._infer_metadata_field_type(value)}) + else: + user_fields.setdefault(name, {"type": self._infer_metadata_field_type(value)}) + if pifs_fields: + self.metadata.register_schema({"fields": pifs_fields}, source="pifs") + if user_fields: + self.metadata.register_schema({"fields": user_fields}, source="user") @classmethod - def _normalize_metadata_generation_policy( + def _normalize_metadata_policy( cls, policy: Optional[dict[str, Any]], *, - derived_metadata: dict[str, Any], + metadata: dict[str, Any], ) -> dict[str, Any]: fields = dict(DEFAULT_METADATA_GENERATION_FIELDS) field_statuses: dict[str, str] = {} @@ -1438,7 +1449,7 @@ class PageIndexFileSystem: top_level_status = None if policy is not None: if not isinstance(policy, dict): - raise ValueError("metadata_generation_policy must be a JSON object") + raise ValueError("metadata_policy must be a JSON object") raw_fields = policy.get("fields") if raw_fields is None: raw_fields = { @@ -1447,7 +1458,7 @@ class PageIndexFileSystem: if name not in {"batch", "mode", "status", "projection_indexes"} } if not isinstance(raw_fields, dict): - raise ValueError("metadata_generation_policy fields must be a JSON object") + raise ValueError("metadata_policy fields must be a JSON object") for name, declaration in raw_fields.items(): name = str(name) if isinstance(declaration, bool): @@ -1459,7 +1470,7 @@ class PageIndexFileSystem: ) field_status = declaration.get("status") if field_status is not None: - cls._validate_metadata_generation_status(str(field_status)) + cls._validate_metadata_status(str(field_status)) field_statuses[name] = str(field_status) continue raise ValueError(f"Invalid metadata generation policy for field: {name}") @@ -1470,13 +1481,11 @@ class PageIndexFileSystem: batch = True top_level_status = policy.get("status") if top_level_status is not None: - cls._validate_metadata_generation_status(str(top_level_status)) + cls._validate_metadata_status(str(top_level_status)) if "projection_indexes" in policy: projection_indexes, projection_index_statuses = ( cls._normalize_projection_index_policy(policy["projection_indexes"]) ) - for name in derived_metadata: - fields.setdefault(name, True) normalized: dict[str, Any] = { "fields": fields, "projection_indexes": ( @@ -1498,36 +1507,46 @@ class PageIndexFileSystem: return normalized @classmethod - def _metadata_generation_state( + def _metadata_status_state( cls, policy: dict[str, Any], *, - derived_metadata: dict[str, Any], + metadata: dict[str, Any], status: Optional[str], ) -> dict[str, Any]: explicit_status = status or policy.get("status") if explicit_status is not None: explicit_status = str(explicit_status) - cls._validate_metadata_generation_status(explicit_status) + cls._validate_metadata_status(explicit_status) field_statuses = policy.get("field_statuses", {}) fields: dict[str, dict[str, Any]] = {} for name, requested in policy["fields"].items(): if not requested: - fields[name] = {"requested": False} + fields[name] = { + "requested": False, + "status": "skipped", + "owner": "pifs", + "source": "llm", + } continue field_status = field_statuses.get(name) if field_status is None: field_status = explicit_status if field_status is None: - field_status = "generated" if name in derived_metadata else "pending_generate" - fields[name] = {"requested": True, "status": field_status} + field_status = "generated" if name in metadata else "pending_generate" + fields[name] = { + "requested": True, + "status": field_status, + "owner": "pifs", + "source": "llm", + } requested_statuses = [ item["status"] for item in fields.values() if item.get("requested") and item.get("status") ] - aggregate_status = explicit_status or cls._aggregate_generation_status(requested_statuses) + aggregate_status = explicit_status or cls._aggregate_metadata_status(requested_statuses) policy_summary = { "fields": dict(policy["fields"]), "projection_indexes": dict(policy.get("projection_indexes", {})), @@ -1549,12 +1568,14 @@ class PageIndexFileSystem: state["projection_indexes"][name] = { "requested": True, "status": projection_statuses.get(name, "not_indexed"), + "owner": "pifs", + "source": "index", } - cls._refresh_projection_index_statuses(state, derived_metadata) + cls._refresh_projection_index_statuses(state, metadata) return state @staticmethod - def _aggregate_generation_status(statuses: list[str]) -> str: + def _aggregate_metadata_status(statuses: list[str]) -> str: if not statuses: return "generated" for status in ("failed", "pending_submit", "pending_generate"): @@ -1563,9 +1584,9 @@ class PageIndexFileSystem: return "generated" @staticmethod - def _validate_metadata_generation_status(status: str) -> None: - if status not in METADATA_GENERATION_STATUSES: - raise ValueError(f"Unsupported metadata generation status: {status}") + def _validate_metadata_status(status: str) -> None: + if status not in METADATA_STATUSES: + raise ValueError(f"Unsupported metadata status: {status}") @classmethod def _normalize_projection_index_policy( @@ -1575,7 +1596,7 @@ class PageIndexFileSystem: if projection_policy is None: return {}, {} if not isinstance(projection_policy, dict): - raise ValueError("metadata_generation_policy projection_indexes must be a JSON object") + raise ValueError("metadata_policy projection_indexes must be a JSON object") projection_indexes: dict[str, bool] = {} projection_index_statuses: dict[str, str] = {} for name, declaration in projection_policy.items(): @@ -1604,43 +1625,17 @@ class PageIndexFileSystem: @classmethod def _refresh_projection_index_statuses( cls, - generation: dict[str, Any], - derived_metadata: dict[str, Any], + metadata_status: dict[str, Any], + metadata: dict[str, Any], ) -> None: - summary_index = generation.get("projection_indexes", {}).get("summary") + summary_index = metadata_status.get("projection_indexes", {}).get("summary") if not summary_index or not summary_index.get("requested"): return - if "summary" not in derived_metadata: + if "summary" not in metadata: return if summary_index.get("status", "not_indexed") == "not_indexed": summary_index["status"] = "pending_index" - @classmethod - def _merge_metadata_values( - cls, - metadata: dict[str, Any], - derived_metadata: dict[str, Any], - ) -> dict[str, Any]: - merged = dict(metadata) - for name, value in derived_metadata.items(): - if name not in merged: - merged[name] = value - continue - if merged[name] == value: - continue - merged[name] = cls._merge_metadata_value(merged[name], value) - return merged - - @staticmethod - def _merge_metadata_value(raw_value: Any, derived_value: Any) -> Any: - values = raw_value if isinstance(raw_value, list) else [raw_value] - derived_values = derived_value if isinstance(derived_value, list) else [derived_value] - merged = list(values) - for item in derived_values: - if item not in merged: - merged.append(item) - return merged - @staticmethod def _infer_metadata_field_type(value: Any) -> str: if isinstance(value, bool): diff --git a/pageindex/filesystem/projection_indexing.py b/pageindex/filesystem/projection_indexing.py index 5c07ca0..ed77465 100644 --- a/pageindex/filesystem/projection_indexing.py +++ b/pageindex/filesystem/projection_indexing.py @@ -68,7 +68,7 @@ class SummaryProjectionIndexer: ) def upsert_summary(self, record: dict[str, Any]) -> dict[str, Any]: - summary = str((record.get("derived_metadata") or {}).get("summary") or "").strip() + summary = str((record.get("metadata") or {}).get("summary") or "").strip() if not summary: return {"status": "skipped", "reason": "missing_summary"} vector = self.embedding_cache.embed_texts( @@ -79,7 +79,6 @@ class SummaryProjectionIndexer: batch_size=1, )[0] metadata = dict(record.get("metadata") or {}) - metadata.update(record.get("derived_metadata") or {}) count = self.index.upsert_many( [ SemanticIndexRecord( diff --git a/pageindex/filesystem/store.py b/pageindex/filesystem/store.py index 9ef90ad..a0dbe6a 100644 --- a/pageindex/filesystem/store.py +++ b/pageindex/filesystem/store.py @@ -9,7 +9,7 @@ from typing import Any, Iterable, Optional from .types import FileEntry, MetadataField -SCHEMA_VERSION = 4 +SCHEMA_VERSION = 5 class SQLiteFileSystemStore: @@ -47,6 +47,10 @@ class SQLiteFileSystemStore: version = 3 if version < 4: self._migrate_to_v4(conn) + conn.execute("PRAGMA user_version = 4") + version = 4 + if version < 5: + self._migrate_to_v5(conn) conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") def _migrate_to_v1(self, conn: sqlite3.Connection) -> None: @@ -68,8 +72,7 @@ class SQLiteFileSystemStore: pageindex_doc_id TEXT, pageindex_tree_status TEXT NOT NULL DEFAULT 'not_built', metadata_json TEXT NOT NULL DEFAULT '{}', - derived_metadata_json TEXT NOT NULL DEFAULT '{}', - metadata_generation_json TEXT NOT NULL DEFAULT '{}', + metadata_status_json TEXT NOT NULL DEFAULT '{}', created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, deleted_at TEXT @@ -228,10 +231,58 @@ class SQLiteFileSystemStore: if "files" not in self._tables(conn): return columns = self._columns(conn, "files") - if "derived_metadata_json" not in columns: - conn.execute("ALTER TABLE files ADD COLUMN derived_metadata_json TEXT NOT NULL DEFAULT '{}'") - if "metadata_generation_json" not in columns: - conn.execute("ALTER TABLE files ADD COLUMN metadata_generation_json TEXT NOT NULL DEFAULT '{}'") + if "metadata_status_json" not in columns: + conn.execute("ALTER TABLE files ADD COLUMN metadata_status_json TEXT NOT NULL DEFAULT '{}'") + self._backfill_metadata_values(conn) + + def _migrate_to_v5(self, conn: sqlite3.Connection) -> None: + if "files" not in self._tables(conn): + return + columns = self._columns(conn, "files") + if "metadata_status_json" not in columns: + conn.execute("ALTER TABLE files ADD COLUMN metadata_status_json TEXT NOT NULL DEFAULT '{}'") + columns = self._columns(conn, "files") + derived_select = ( + "derived_metadata_json" + if "derived_metadata_json" in columns + else "'{}' AS derived_metadata_json" + ) + legacy_status_select = ( + "metadata_generation_json" + if "metadata_generation_json" in columns + else "'{}' AS metadata_generation_json" + ) + rows = conn.execute( + f""" + SELECT file_ref, metadata_json, metadata_status_json, {derived_select}, {legacy_status_select} + FROM files + WHERE deleted_at IS NULL + """ + ).fetchall() + for row in rows: + metadata = self._json_object(row["metadata_json"]) + legacy_generated_values = self._json_object(row["derived_metadata_json"]) + metadata_status = self._json_object(row["metadata_status_json"]) + legacy_status = self._json_object(row["metadata_generation_json"]) + if not metadata_status and legacy_status: + metadata_status = legacy_status + metadata_status = self._normalize_metadata_status(metadata_status) + if legacy_generated_values: + metadata.update(legacy_generated_values) + conn.execute( + """ + UPDATE files + SET metadata_json = ?, + metadata_status_json = ?, + updated_at = CURRENT_TIMESTAMP + WHERE file_ref = ? AND deleted_at IS NULL + """, + ( + json.dumps(metadata, ensure_ascii=False), + json.dumps(metadata_status, ensure_ascii=False), + row["file_ref"], + ), + ) self._backfill_metadata_values(conn) def _migrate_legacy_tables(self, conn: sqlite3.Connection) -> None: @@ -271,43 +322,61 @@ class SQLiteFileSystemStore: def _backfill_metadata_values(self, conn: sqlite3.Connection) -> None: if "files" not in self._tables(conn): return - columns = self._columns(conn, "files") - derived_select = ( - "derived_metadata_json" - if "derived_metadata_json" in columns - else "'{}' AS derived_metadata_json" - ) - generation_select = ( - "metadata_generation_json" - if "metadata_generation_json" in columns - else "'{}' AS metadata_generation_json" - ) rows = conn.execute( - f""" - SELECT file_ref, metadata_json, {derived_select}, {generation_select} + """ + SELECT file_ref, metadata_json FROM files WHERE deleted_at IS NULL """ ).fetchall() for row in rows: - try: - metadata = json.loads(row["metadata_json"] or "{}") - except json.JSONDecodeError: - metadata = {} - try: - derived_metadata = json.loads(row["derived_metadata_json"] or "{}") - except json.JSONDecodeError: - derived_metadata = {} - try: - metadata_generation = json.loads(row["metadata_generation_json"] or "{}") - except json.JSONDecodeError: - metadata_generation = {} self.replace_metadata_values( conn, row["file_ref"], - self.indexed_metadata_values(metadata, derived_metadata, metadata_generation), + self.indexed_metadata_values(self._json_object(row["metadata_json"])), ) + @staticmethod + def _json_object(value: Any) -> dict[str, Any]: + try: + parsed = json.loads(value or "{}") if isinstance(value, str) else value + except json.JSONDecodeError: + return {} + return parsed if isinstance(parsed, dict) else {} + + @staticmethod + def _normalize_metadata_status(metadata_status: dict[str, Any]) -> dict[str, Any]: + normalized = dict(metadata_status) + fields = normalized.get("fields") + if isinstance(fields, dict): + normalized["fields"] = { + name: ( + { + **state, + "owner": state.get("owner", "pifs"), + "source": state.get("source", "llm"), + } + if isinstance(state, dict) + else state + ) + for name, state in fields.items() + } + projection_indexes = normalized.get("projection_indexes") + if isinstance(projection_indexes, dict): + normalized["projection_indexes"] = { + name: ( + { + **state, + "owner": state.get("owner", "pifs"), + "source": state.get("source", "index"), + } + if isinstance(state, dict) + else state + ) + for name, state in projection_indexes.items() + } + return normalized + @staticmethod def _tables(conn: sqlite3.Connection) -> set[str]: rows = conn.execute("SELECT name FROM sqlite_master WHERE type IN ('table', 'virtual table')").fetchall() @@ -421,8 +490,7 @@ class SQLiteFileSystemStore: "pageindex_doc_id", "pageindex_tree_status", "metadata_json", - "derived_metadata_json", - "metadata_generation_json", + "metadata_status_json", ] if include_folder_path: columns.append("folder_path") @@ -450,8 +518,7 @@ class SQLiteFileSystemStore: record.get("pageindex_doc_id"), record.get("pageindex_tree_status", "not_built"), record["metadata_json"], - record.get("derived_metadata_json", "{}"), - record.get("metadata_generation_json", "{}"), + record.get("metadata_status_json", "{}"), ] if include_folder_path: values.append(record["folder_path"]) @@ -560,8 +627,7 @@ class SQLiteFileSystemStore: "pageindex_doc_id", "pageindex_tree_status", "metadata_json", - "derived_metadata_json", - "metadata_generation_json", + "metadata_status_json", "deleted_at", "updated_at", ] @@ -580,8 +646,7 @@ class SQLiteFileSystemStore: record.get("pageindex_doc_id"), record.get("pageindex_tree_status", "not_built"), record["metadata_json"], - record.get("derived_metadata_json", "{}"), - record.get("metadata_generation_json", "{}"), + record.get("metadata_status_json", "{}"), None, current_timestamp, ] @@ -999,8 +1064,7 @@ class SQLiteFileSystemStore: "f.descriptor", "f.pageindex_tree_status", "f.metadata_json", - "f.derived_metadata_json", - "f.metadata_generation_json", + "f.metadata_status_json", "f.created_at", """ ( @@ -1182,7 +1246,7 @@ class SQLiteFileSystemStore: raise KeyError(f"Unknown file_ref: {file_ref}") return self._file_entry(row) - def list_pending_metadata_generation(self, *, limit: int | None = None) -> list[FileEntry]: + def list_pending_metadata_status(self, *, limit: int | None = None) -> list[FileEntry]: sql = """ SELECT f.file_ref, @@ -1199,16 +1263,15 @@ class SQLiteFileSystemStore: f.pageindex_doc_id, f.pageindex_tree_status, f.metadata_json, - f.derived_metadata_json, - f.metadata_generation_json, + f.metadata_status_json, COALESCE(primary_folder.path, '/') AS folder_path FROM files f LEFT JOIN file_folders ff ON ff.file_ref = f.file_ref LEFT JOIN folders primary_folder ON primary_folder.folder_id = ff.folder_id WHERE f.deleted_at IS NULL AND ( - f.metadata_generation_json LIKE '%pending_generate%' - OR f.metadata_generation_json LIKE '%pending_submit%' + f.metadata_status_json LIKE '%pending_generate%' + OR f.metadata_status_json LIKE '%pending_submit%' ) GROUP BY f.file_ref ORDER BY f.created_at, f.file_ref @@ -1221,39 +1284,36 @@ class SQLiteFileSystemStore: rows = conn.execute(sql, params).fetchall() return [self._file_entry(row) for row in rows] - def update_file_metadata_generation( + def update_file_metadata_status( self, file_ref: str, *, - derived_metadata: dict[str, Any], - metadata_generation: dict[str, Any], + metadata: dict[str, Any], + metadata_status: dict[str, Any], ) -> None: with self.connect() as conn: row = self._file_entry_row(conn, file_ref) if row is None: raise KeyError(f"Unknown file_ref: {file_ref}") - metadata = json.loads(row["metadata_json"] or "{}") - metadata_text_value = metadata_text( - self._merge_metadata_values(metadata, derived_metadata) - ) + metadata_text_value = metadata_text(metadata) conn.execute( """ UPDATE files - SET derived_metadata_json = ?, - metadata_generation_json = ?, + SET metadata_json = ?, + metadata_status_json = ?, updated_at = CURRENT_TIMESTAMP WHERE file_ref = ? AND deleted_at IS NULL """, ( - json.dumps(derived_metadata, ensure_ascii=False), - json.dumps(metadata_generation, ensure_ascii=False), + json.dumps(metadata, ensure_ascii=False), + json.dumps(metadata_status, ensure_ascii=False), file_ref, ), ) self.replace_metadata_values( conn, file_ref, - self.indexed_metadata_values(metadata, derived_metadata, metadata_generation), + self.indexed_metadata_values(metadata), ) conn.execute( """ @@ -1631,8 +1691,7 @@ class SQLiteFileSystemStore: f.pageindex_doc_id, f.pageindex_tree_status, f.metadata_json, - f.derived_metadata_json, - f.metadata_generation_json, + f.metadata_status_json, COALESCE( ( SELECT display_folder.path @@ -1668,8 +1727,7 @@ class SQLiteFileSystemStore: f.source_path, f.pageindex_tree_status, f.metadata_json, - f.derived_metadata_json, - f.metadata_generation_json, + f.metadata_status_json, f.created_at, MIN(pf.folder_id) AS folder_id, MIN(pf.path) AS folder_path @@ -1860,9 +1918,8 @@ class SQLiteFileSystemStore: "source_path": row["source_path"], "folder_path": row["folder_path"], "metadata": json.loads(row["metadata_json"] or "{}"), - "derived_metadata": json.loads(cls._row_value(row, "derived_metadata_json", "{}") or "{}"), - "metadata_generation": json.loads( - cls._row_value(row, "metadata_generation_json", "{}") or "{}" + "metadata_status": json.loads( + cls._row_value(row, "metadata_status_json", "{}") or "{}" ), } @@ -1885,9 +1942,8 @@ class SQLiteFileSystemStore: "snippet": row["snippet"] or row["title"], "folder_path": row["folder_path"], "metadata": json.loads(row["metadata_json"] or "{}"), - "derived_metadata": json.loads(cls._row_value(row, "derived_metadata_json", "{}") or "{}"), - "metadata_generation": json.loads( - cls._row_value(row, "metadata_generation_json", "{}") or "{}" + "metadata_status": json.loads( + cls._row_value(row, "metadata_status_json", "{}") or "{}" ), } @@ -1913,11 +1969,8 @@ class SQLiteFileSystemStore: pageindex_tree_status=row["pageindex_tree_status"], metadata=json.loads(row["metadata_json"] or "{}"), folder_path=row["folder_path"], - derived_metadata=json.loads( - SQLiteFileSystemStore._row_value(row, "derived_metadata_json", "{}") or "{}" - ), - metadata_generation=json.loads( - SQLiteFileSystemStore._row_value(row, "metadata_generation_json", "{}") or "{}" + metadata_status=json.loads( + SQLiteFileSystemStore._row_value(row, "metadata_status_json", "{}") or "{}" ), ) @@ -1944,8 +1997,7 @@ class SQLiteFileSystemStore: "pageindex_doc_id": entry.pageindex_doc_id, "pageindex_tree_status": entry.pageindex_tree_status, "metadata": entry.metadata, - "derived_metadata": entry.derived_metadata, - "metadata_generation": entry.metadata_generation, + "metadata_status": entry.metadata_status, "folder_path": entry.folder_path, } @@ -2041,57 +2093,9 @@ class SQLiteFileSystemStore: return json.dumps(value, ensure_ascii=False, sort_keys=True) return "" if value is None else str(value) - @classmethod - def _merge_metadata_values( - cls, - metadata: dict[str, Any], - derived_metadata: dict[str, Any], - ) -> dict[str, Any]: - merged = dict(metadata) - for name, value in derived_metadata.items(): - if name not in merged: - merged[name] = value - continue - if merged[name] == value: - continue - merged[name] = cls._merge_metadata_value(merged[name], value) - return merged - @staticmethod - def _merge_metadata_value(raw_value: Any, derived_value: Any) -> Any: - values = raw_value if isinstance(raw_value, list) else [raw_value] - derived_values = derived_value if isinstance(derived_value, list) else [derived_value] - merged = list(values) - for item in derived_values: - if item not in merged: - merged.append(item) - return merged - - @classmethod - def indexed_metadata_values( - cls, - metadata: dict[str, Any], - derived_metadata: dict[str, Any], - metadata_generation: dict[str, Any] | None, - ) -> dict[str, Any]: - generated_fields = set(derived_metadata) - if isinstance(metadata_generation, dict): - policy = metadata_generation.get("policy", {}) - if isinstance(policy, dict): - fields = policy.get("fields", {}) - if isinstance(fields, dict): - generated_fields.update( - str(name) - for name in fields - ) - - indexed = { - name: value - for name, value in metadata.items() - if name not in generated_fields - } - indexed.update(derived_metadata) - return indexed + def indexed_metadata_values(metadata: dict[str, Any]) -> dict[str, Any]: + return dict(metadata) @staticmethod def _valid_field_name(name: str) -> bool: diff --git a/pageindex/filesystem/types.py b/pageindex/filesystem/types.py index 4cd573f..103d28d 100644 --- a/pageindex/filesystem/types.py +++ b/pageindex/filesystem/types.py @@ -22,8 +22,7 @@ class SearchResult: pageNum: Optional[int] = None createdAt: Optional[str] = None folderId: Optional[str] = None - derived_metadata: dict[str, Any] = field(default_factory=dict) - metadata_generation: dict[str, Any] = field(default_factory=dict) + metadata_status: dict[str, Any] = field(default_factory=dict) @dataclass(frozen=True) @@ -63,8 +62,7 @@ class FileEntry: pageindex_tree_status: str metadata: dict[str, Any] folder_path: str - derived_metadata: dict[str, Any] = field(default_factory=dict) - metadata_generation: dict[str, Any] = field(default_factory=dict) + metadata_status: dict[str, Any] = field(default_factory=dict) @dataclass(frozen=True) diff --git a/tests/test_pifs_find_maxdepth.py b/tests/test_pifs_find_maxdepth.py index 304964f..e301248 100644 --- a/tests/test_pifs_find_maxdepth.py +++ b/tests/test_pifs_find_maxdepth.py @@ -98,12 +98,23 @@ def test_stable_path_targets_work_without_session_refs(tmp_path): assert "Root document fixture text" in text -def test_stat_shell_output_includes_generated_metadata(tmp_path): +def test_stat_shell_output_includes_unified_metadata_status(tmp_path): from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem + from pageindex.filesystem.metadata_generation import MetadataGenerationResult source = tmp_path / "source.txt" source.write_text("fixture text", encoding="utf-8") - filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace") + + class SummaryGenerator: + def generate(self, document, *, fields): + return MetadataGenerationResult( + values={field: "Generated summary for retrieval." for field in fields} + ) + + filesystem = PageIndexFileSystem( + workspace=tmp_path / "workspace", + metadata_generator=SummaryGenerator(), + ) filesystem.register_file( storage_uri=source.as_uri(), source_path="docs/source.txt", @@ -112,8 +123,7 @@ def test_stat_shell_output_includes_generated_metadata(tmp_path): title="Generated metadata document", content=source.read_text(encoding="utf-8"), metadata={"department": "ops"}, - derived_metadata={"summary": "Generated summary for retrieval."}, - metadata_generation_policy={ + metadata_policy={ "fields": { "summary": True, "doc_type": False, @@ -128,9 +138,126 @@ def test_stat_shell_output_includes_generated_metadata(tmp_path): assert "metadata:" in stat assert " department: ops" in stat - assert "generated_metadata:" in stat assert " summary: Generated summary for retrieval." in stat - assert "metadata_generation_status: generated" in stat + assert "metadata_status: generated" in stat + + +def test_register_rejects_pifs_owned_metadata_fields(tmp_path): + from pageindex.filesystem import PageIndexFileSystem + + source = tmp_path / "source.txt" + source.write_text("fixture text", encoding="utf-8") + filesystem = PageIndexFileSystem(workspace=tmp_path / "workspace") + + with pytest.raises(ValueError, match="PIFS-owned generated field"): + filesystem.register_file( + storage_uri=source.as_uri(), + source_path="docs/source.txt", + folder_path="/documents", + external_id="doc_conflict", + title="Conflict document", + content=source.read_text(encoding="utf-8"), + metadata={"summary": "caller summary"}, + ) + + +def test_batch_metadata_status_generates_into_unified_metadata(tmp_path): + from pageindex.filesystem import PageIndexFileSystem + from pageindex.filesystem.metadata_generation import MetadataGenerationResult + + source = tmp_path / "source.txt" + source.write_text("fixture text", encoding="utf-8") + + class SummaryGenerator: + def generate(self, document, *, fields): + return MetadataGenerationResult(values={"summary": "Batch generated summary."}) + + filesystem = PageIndexFileSystem( + workspace=tmp_path / "workspace", + metadata_generator=SummaryGenerator(), + ) + file_ref = filesystem.register_file( + storage_uri=source.as_uri(), + source_path="docs/source.txt", + folder_path="/documents", + external_id="doc_batch", + title="Batch document", + content=source.read_text(encoding="utf-8"), + metadata={"department": "ops"}, + metadata_policy={ + "batch": True, + "fields": { + "summary": True, + "doc_type": False, + "domain": False, + "topic": False, + }, + }, + ) + + before = filesystem.store.get_file(file_ref) + assert "summary" not in before.metadata + assert before.metadata_status["fields"]["summary"]["status"] == "pending_submit" + + result = filesystem.batch_generate() + after = filesystem.store.get_file(file_ref) + + assert result["generated"] == 1 + assert after.metadata["summary"] == "Batch generated summary." + assert after.metadata["department"] == "ops" + assert after.metadata_status["fields"]["summary"]["status"] == "generated" + + +def test_v4_metadata_columns_migrate_to_unified_metadata_status(tmp_path): + from pageindex.filesystem import PageIndexFileSystem + + source = tmp_path / "source.txt" + source.write_text("fixture text", encoding="utf-8") + workspace = tmp_path / "workspace" + filesystem = PageIndexFileSystem(workspace=workspace) + file_ref = filesystem.register_file( + storage_uri=source.as_uri(), + source_path="docs/source.txt", + folder_path="/documents", + external_id="doc_migrate", + title="Migrated document", + content=source.read_text(encoding="utf-8"), + metadata={"department": "ops"}, + ) + + with filesystem.store.connect() as conn: + conn.execute( + "ALTER TABLE files ADD COLUMN derived_metadata_json TEXT NOT NULL DEFAULT '{}'" + ) + conn.execute( + "ALTER TABLE files ADD COLUMN metadata_generation_json TEXT NOT NULL DEFAULT '{}'" + ) + conn.execute( + """ + UPDATE files + SET metadata_json = ?, + derived_metadata_json = ?, + metadata_generation_json = ?, + metadata_status_json = '{}' + WHERE file_ref = ? + """, + ( + '{"department":"ops","summary":"raw summary"}', + '{"summary":"generated summary"}', + '{"status":"generated","fields":{"summary":{"requested":true,"status":"generated"}}}', + file_ref, + ), + ) + conn.execute("PRAGMA user_version = 4") + + migrated = PageIndexFileSystem(workspace=workspace) + entry = migrated.store.get_file(file_ref) + + assert entry.metadata["department"] == "ops" + assert entry.metadata["summary"] == "generated summary" + assert entry.metadata_status["status"] == "generated" + assert entry.metadata_status["fields"]["summary"]["owner"] == "pifs" + assert entry.metadata_status["fields"]["summary"]["source"] == "llm" def test_find_maxdepth_zero_type_directory_returns_start_folder(tmp_path): diff --git a/tests/test_semantic_index.py b/tests/test_semantic_index.py index a500d9b..a06641e 100644 --- a/tests/test_semantic_index.py +++ b/tests/test_semantic_index.py @@ -51,3 +51,39 @@ def test_sqlite_vec_semantic_index_round_trip(tmp_path): filters={"source_type": "slack"}, ) assert [item.external_id for item in filtered] == ["doc_b"] + + +def test_summary_projection_indexes_unified_metadata_summary(tmp_path): + from pageindex.filesystem.projection_indexing import SummaryProjectionIndexer + + class FakeEmbedder: + def embed(self, texts): + return [[1.0, 0.0, 0.0] for _ in texts] + + indexer = SummaryProjectionIndexer( + tmp_path / "projection", + embedder=FakeEmbedder(), + embedding_provider="test", + embedding_model="fake", + embedding_dimensions=3, + ) + + result = indexer.upsert_summary( + { + "file_ref": "file_a", + "external_id": "doc_a", + "source_type": "documents", + "source_path": "docs/a.pdf", + "title": "A", + "metadata": { + "summary": "Unified metadata summary.", + "department": "ops", + }, + } + ) + + assert result["status"] == "ready" + hits = indexer.index.search([1.0, 0.0, 0.0], limit=1) + assert hits[0].external_id == "doc_a" + assert hits[0].metadata["summary"] == "Unified metadata summary." + assert hits[0].metadata["department"] == "ops"