mirror of
https://github.com/VectifyAI/PageIndex.git
synced 2026-06-12 19:55:17 +02:00
361 lines
14 KiB
Python
361 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Protocol
|
|
|
|
|
|
# Metadata fields that may appear in a folder template, in a canonical value
# declaration, or as the "field" half of a field/value path segment pair.
CANDIDATE_FIELDS = ("domain", "topic")

# Upper bound on the number of distinct folder paths one build item may be
# placed under; enforced while validating memberships.
MEMBERSHIP_LIMIT = 3

# A path-safe segment: one ASCII alphanumeric character followed by up to 127
# characters drawn from ASCII alphanumerics, ".", "_" and "-".
SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{0,127}$")
|
|
|
|
|
|
class SemanticFolderPlanError(ValueError):
    """Raised when a Semantic Folder plan cannot be requested or fails validation."""
|
|
|
|
|
|
@dataclass(frozen=True)
class SemanticFolderBuildItem:
    """Immutable record of one item's document-level metadata.

    NOTE(review): nothing in the visible part of this module constructs or
    reads this class; presumably callers serialize it into the planner
    payload -- confirm against the call sites.
    """

    # Identifier of the item.
    item_id: str
    # Document title.
    title: str
    # Document summary.
    summary: str
    # Optional "domain" metadata; untyped, defaults to None when absent.
    domain: Any = None
    # Optional "topic" metadata; untyped, defaults to None when absent.
    topic: Any = None
|
|
|
|
|
|
@dataclass(frozen=True)
class SemanticFolderMembership:
    """Immutable record placing one build item under one validated folder path."""

    # Identifier of the build item being placed.
    item_id: str
    # File reference the validator looked up for ``item_id``.
    file_ref: str
    # Validated relative path made of alternating field/value segments.
    relative_path: str
    # Planner-supplied confidence, or None when none was given.
    confidence: float | None = None
    # Canonical value rows ({"field", "display", "slug"}) matching each
    # field/value pair of ``relative_path``, in path order.
    canonical_segments: list[dict[str, str]] = field(default_factory=list)
|
|
|
|
|
|
@dataclass(frozen=True)
class SemanticFolderValidatedPlan:
    """Immutable result of validating a planner's raw Semantic Folder plan."""

    # Ordered, duplicate-free list of selected candidate fields.
    template: list[str]
    # Declared canonical values, each a {"field", "display", "slug"} mapping.
    canonical_values: list[dict[str, str]]
    # One entry per (item, path) placement that passed validation.
    memberships: list[SemanticFolderMembership]
    # Items the planner chose to skip, each a {"item_id", "reason"} mapping.
    skipped: list[dict[str, str]]
    # The planner's plan object exactly as it was handed to the validator.
    raw_plan: dict[str, Any]
|
|
|
|
|
|
class SemanticFolderPlanner(Protocol):
    """Structural interface for anything that can turn a payload into a raw plan."""

    def plan(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Return a raw plan mapping for *payload*."""
        ...
|
|
|
|
|
|
class OpenAISemanticFolderPlanner:
    """Semantic Folder planner that asks an OpenAI chat-completions endpoint for a plan.

    The model name and base URL are resolved once at construction time; the
    API key is resolved from the environment on every ``plan`` call.
    """

    def __init__(
        self,
        *,
        model: str | None = None,
        base_url: str | None = None,
    ):
        """Resolve the model name and base URL.

        Args:
            model: Model name. When falsy, falls back to the
                ``PIFS_SEMANTIC_FOLDER_MODEL`` then ``PIFS_METADATA_MODEL``
                environment variables, then to ``"gpt-5-nano"``.
            base_url: API base URL. Any non-``None`` value (including ``""``)
                is stored as given; ``None`` falls back to the
                ``PIFS_METADATA_BASE_URL`` then ``OPENAI_BASE_URL``
                environment variables.
        """
        self.model = (
            model
            or os.environ.get("PIFS_SEMANTIC_FOLDER_MODEL")
            or os.environ.get("PIFS_METADATA_MODEL")
            or "gpt-5-nano"
        )
        self.base_url = (
            base_url
            if base_url is not None
            else os.environ.get("PIFS_METADATA_BASE_URL") or os.environ.get("OPENAI_BASE_URL")
        )

    def plan(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Request a plan for *payload* and return the decoded JSON response.

        Args:
            payload: JSON-serializable mapping sent verbatim as the user message.

        Returns:
            The decoded message content of the first choice; an empty message
            decodes to ``{}``.

        Raises:
            SemanticFolderPlanError: If none of the API key environment
                variables is set.
            json.JSONDecodeError: If the message content is not valid JSON.
        """
        api_key = (
            os.environ.get("PIFS_SEMANTIC_FOLDER_API_KEY")
            or os.environ.get("PIFS_METADATA_API_KEY")
            or os.environ.get("OPENAI_API_KEY")
        )
        if not api_key:
            raise SemanticFolderPlanError(
                "PIFS_SEMANTIC_FOLDER_API_KEY, PIFS_METADATA_API_KEY, or OPENAI_API_KEY "
                "is required for PIFS Semantic Folder planning"
            )

        # Imported lazily so the module can be imported (and plans validated)
        # without the openai package installed.
        from openai import OpenAI

        # An empty base URL is passed as None so the client uses its default.
        client = OpenAI(api_key=api_key, base_url=self.base_url or None)
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Plan a PIFS Semantic Folder from document-level metadata. "
                        "Use only the provided transient item ids, title, summary, domain, and topic. "
                        "Do not infer from storage paths or original folders. "
                        "Choose a useful field/value folder template using domain and topic, "
                        "canonicalize display values, provide path-safe slugs, and reduce each "
                        "document to at most three semantic memberships. Return strict JSON only."
                    ),
                },
                {
                    "role": "user",
                    "content": json.dumps(payload, ensure_ascii=False),
                },
            ],
            response_format=self._response_format(),
        )
        return json.loads(response.choices[0].message.content or "{}")

    @staticmethod
    def _response_format() -> dict[str, Any]:
        """Return the strict JSON-schema ``response_format`` describing a plan."""
        return {
            "type": "json_schema",
            "json_schema": {
                "name": "pifs_semantic_folder_plan",
                "strict": True,
                "schema": {
                    "type": "object",
                    "additionalProperties": False,
                    "required": ["template", "canonical_values", "memberships", "skipped"],
                    "properties": {
                        "template": {
                            "type": "array",
                            "items": {"type": "string", "enum": list(CANDIDATE_FIELDS)},
                        },
                        "canonical_values": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "additionalProperties": False,
                                "required": ["field", "display", "slug"],
                                "properties": {
                                    "field": {"type": "string", "enum": list(CANDIDATE_FIELDS)},
                                    "display": {"type": "string"},
                                    "slug": {"type": "string"},
                                },
                            },
                        },
                        "memberships": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "additionalProperties": False,
                                # Strict mode requires every declared property
                                # to be listed here; "confidence" stays
                                # optional in practice via its null union.
                                "required": ["item_id", "paths", "confidence"],
                                "properties": {
                                    "item_id": {"type": "string"},
                                    "paths": {
                                        "type": "array",
                                        "items": {"type": "string"},
                                    },
                                    "confidence": {"type": ["number", "null"]},
                                },
                            },
                        },
                        "skipped": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "additionalProperties": False,
                                "required": ["item_id", "reason"],
                                "properties": {
                                    "item_id": {"type": "string"},
                                    "reason": {"type": "string"},
                                },
                            },
                        },
                    },
                },
            },
        }
|
|
|
|
|
|
def semantic_mount_path(source_scope: str) -> str:
    """Return the path of the ``semantic`` folder directly under *source_scope*.

    The scope is normalized first; the root scope maps to ``/semantic``.
    """
    scope = _normalize_path(source_scope)
    if scope == "/":
        return "/semantic"
    return f"{scope}/semantic"
|
|
|
|
|
|
def validate_semantic_folder_plan(
    plan: dict[str, Any],
    *,
    item_file_refs: dict[str, str],
) -> SemanticFolderValidatedPlan:
    """Validate a raw planner plan and convert it into typed memberships.

    Args:
        plan: Raw plan object with ``template``, ``canonical_values``,
            ``memberships`` and ``skipped`` entries.
        item_file_refs: Maps every known build item id to its file reference.

    Returns:
        The validated plan; ``raw_plan`` holds *plan* itself.

    Raises:
        SemanticFolderPlanError: If the plan is not a dict, any section is
            malformed, an item id is unknown, an item repeats a path or
            exceeds ``MEMBERSHIP_LIMIT`` paths, or no membership results.
    """
    if not isinstance(plan, dict):
        raise SemanticFolderPlanError("Semantic Folder planner returned a non-object plan")

    template = _validate_template(plan.get("template"))
    canonical_values = _validate_canonical_values(plan.get("canonical_values"))
    canonical_lookup = {(row["field"], row["slug"]): row for row in canonical_values}

    memberships: list[SemanticFolderMembership] = []
    claimed: set[tuple[str, str]] = set()
    # Counted per item id across all entries, so splitting one item over
    # several membership entries cannot bypass the limit.
    paths_per_item: dict[str, int] = {}

    for entry in _required_list(plan.get("memberships"), "memberships"):
        if not isinstance(entry, dict):
            raise SemanticFolderPlanError("Semantic Folder membership entries must be objects")

        item_id = str(entry.get("item_id") or "").strip()
        if item_id not in item_file_refs:
            raise SemanticFolderPlanError(f"Unknown Semantic Folder build item: {item_id}")

        raw_paths = entry.get("paths")
        if not isinstance(raw_paths, list):
            raise SemanticFolderPlanError(f"Semantic Folder membership {item_id} paths must be a list")

        confidence = _optional_float(entry.get("confidence"))
        file_ref = item_file_refs[item_id]

        for raw_path in raw_paths:
            relative_path, canonical_segments = _validate_membership_path(
                raw_path,
                template=template,
                canonical_lookup=canonical_lookup,
            )

            placement = (item_id, relative_path)
            if placement in claimed:
                raise SemanticFolderPlanError(
                    f"Duplicate Semantic Folder membership for {item_id}: {relative_path}"
                )
            claimed.add(placement)

            count = paths_per_item.get(item_id, 0) + 1
            paths_per_item[item_id] = count
            if count > MEMBERSHIP_LIMIT:
                raise SemanticFolderPlanError(
                    f"Semantic Folder membership limit exceeded for {item_id}: "
                    f"max {MEMBERSHIP_LIMIT}"
                )

            memberships.append(
                SemanticFolderMembership(
                    item_id=item_id,
                    file_ref=file_ref,
                    relative_path=relative_path,
                    confidence=confidence,
                    canonical_segments=canonical_segments,
                )
            )

    # The skipped section is validated before the emptiness check so that a
    # malformed skipped list is reported ahead of "nothing was planned".
    skipped = _validate_skipped(plan.get("skipped"), item_file_refs)
    if not memberships:
        raise SemanticFolderPlanError("No useful Semantic Folder hierarchy was planned")

    return SemanticFolderValidatedPlan(
        template=template,
        canonical_values=canonical_values,
        memberships=memberships,
        skipped=skipped,
        raw_plan=plan,
    )
|
|
|
|
|
|
def _validate_template(value: Any) -> list[str]:
    """Return the template as a list of distinct candidate field names.

    Raises:
        SemanticFolderPlanError: If *value* is not a non-empty list, or an
            entry is not a candidate field, or a field is repeated.
    """
    if not (isinstance(value, list) and value):
        raise SemanticFolderPlanError("Semantic Folder plan template must select at least one field")

    chosen: list[str] = []
    for raw_field in value:
        name = str(raw_field)
        if name not in CANDIDATE_FIELDS:
            raise SemanticFolderPlanError(f"Unsupported Semantic Folder field: {name}")
        if name in chosen:
            raise SemanticFolderPlanError(f"Duplicate Semantic Folder template field: {name}")
        chosen.append(name)
    return chosen
|
|
|
|
|
|
def _validate_canonical_values(value: Any) -> list[dict[str, str]]:
    """Return the declared canonical values as cleaned ``field``/``display``/``slug`` rows.

    Each of the three entries is stringified and stripped. Rows are kept in
    input order, including repeats of an identical declaration.

    Raises:
        SemanticFolderPlanError: If *value* is not a list, a row is not a
            dict, the field is not a candidate field, the display is empty,
            the slug is not a safe segment, or one field/slug pair is given
            two different display values.
    """
    display_by_slug: dict[tuple[str, str], str] = {}
    validated: list[dict[str, str]] = []

    for entry in _required_list(value, "canonical_values"):
        if not isinstance(entry, dict):
            raise SemanticFolderPlanError("Semantic Folder canonical values must be objects")

        field_name, display, slug = (
            str(entry.get(key) or "").strip() for key in ("field", "display", "slug")
        )

        if field_name not in CANDIDATE_FIELDS:
            raise SemanticFolderPlanError(f"Unsupported Semantic Folder canonical field: {field_name}")
        if not display:
            raise SemanticFolderPlanError("Semantic Folder canonical display value is required")
        _validate_segment(slug, label=f"{field_name} slug")

        # The first display seen for a field/slug pair wins; a later row may
        # repeat it but must not contradict it.
        known_display = display_by_slug.setdefault((field_name, slug), display)
        if known_display != display:
            raise SemanticFolderPlanError(
                f"Semantic Folder segment collision for {field_name}/{slug}: "
                f"{known_display!r} and {display!r}"
            )

        validated.append({"field": field_name, "display": display, "slug": slug})

    return validated
|
|
|
|
|
|
def _validate_membership_path(
    value: Any,
    *,
    template: list[str],
    canonical_lookup: dict[tuple[str, str], dict[str, str]],
) -> tuple[str, list[dict[str, str]]]:
    """Validate one relative ``field/value[/field/value...]`` membership path.

    Args:
        value: Raw path; stringified and stripped before checking.
        template: Selected fields; the path's fields must be a prefix of it.
        canonical_lookup: Declared canonical rows keyed by ``(field, slug)``.

    Returns:
        The path and the canonical rows for its field/value pairs, in order.

    Raises:
        SemanticFolderPlanError: If the path is empty, absolute, has an odd
            number of segments, deviates from the template, contains an
            unsafe segment, or uses an undeclared canonical value.
    """
    raw_path = str(value or "").strip()
    if not raw_path:
        raise SemanticFolderPlanError("Semantic Folder membership path is required")
    if raw_path.startswith("/"):
        raise SemanticFolderPlanError(f"Semantic Folder membership path must be relative: {raw_path}")

    parts = raw_path.split("/")
    if len(parts) % 2 != 0:
        raise SemanticFolderPlanError(
            f"Semantic Folder membership path must use field/value segments: {raw_path}"
        )

    # Even positions hold field names, odd positions hold their value slugs.
    field_names, slugs = parts[0::2], parts[1::2]
    if template[: len(field_names)] != field_names:
        raise SemanticFolderPlanError(
            f"Semantic Folder membership path does not match selected template: {raw_path}"
        )

    segments: list[dict[str, str]] = []
    for field_name, slug in zip(field_names, slugs):
        _validate_segment(field_name, label="field segment")
        _validate_segment(slug, label=f"{field_name} value segment")
        if field_name not in CANDIDATE_FIELDS:
            raise SemanticFolderPlanError(f"Unsupported Semantic Folder field segment: {field_name}")

        declared = canonical_lookup.get((field_name, slug))
        if declared is None:
            raise SemanticFolderPlanError(
                f"Semantic Folder path uses undeclared canonical value: {field_name}/{slug}"
            )
        segments.append(declared)

    return "/".join(parts), segments
|
|
|
|
|
|
def _validate_segment(segment: str, *, label: str) -> None:
    """Reject a path segment that is unsafe or a placeholder value.

    Args:
        segment: Candidate path segment.
        label: Human-readable description used in the error message.

    Raises:
        SemanticFolderPlanError: If the segment is empty, is ``.`` or ``..``,
            contains ``/``, ``\\`` or ``=``, is a placeholder such as
            ``unknown`` (case-insensitive), or does not match ``SEGMENT_RE``.
    """
    is_dot_or_empty = not segment or segment in {".", ".."}
    if is_dot_or_empty or any(char in segment for char in ("/", "\\", "=")):
        raise SemanticFolderPlanError(f"Unsafe Semantic Folder {label}: {segment!r}")

    # Placeholder buckets get their own message: the planner is expected to
    # list such items under "skipped" instead.
    if segment.lower() in {"unknown", "misc", "uncategorized"}:
        raise SemanticFolderPlanError(
            f"Semantic Folder plan must skip missing values instead of using {segment!r}"
        )

    if SEGMENT_RE.fullmatch(segment) is None:
        raise SemanticFolderPlanError(f"Unsafe Semantic Folder {label}: {segment!r}")
|
|
|
|
|
|
def _validate_skipped(value: Any, item_file_refs: dict[str, str]) -> list[dict[str, str]]:
    """Return the skipped entries as cleaned ``item_id``/``reason`` rows.

    A blank or missing reason is replaced by ``"skipped"``.

    Raises:
        SemanticFolderPlanError: If *value* is not a list, a row is not a
            dict, or a row names an item id absent from *item_file_refs*.
    """
    validated: list[dict[str, str]] = []
    for entry in _required_list(value, "skipped"):
        if not isinstance(entry, dict):
            raise SemanticFolderPlanError("Semantic Folder skipped entries must be objects")

        item_id = str(entry.get("item_id") or "").strip()
        if item_id not in item_file_refs:
            raise SemanticFolderPlanError(f"Unknown skipped Semantic Folder build item: {item_id}")

        reason = str(entry.get("reason") or "").strip()
        validated.append({"item_id": item_id, "reason": reason if reason else "skipped"})
    return validated
|
|
|
|
|
|
def _required_list(value: Any, name: str) -> list[Any]:
|
|
if not isinstance(value, list):
|
|
raise SemanticFolderPlanError(f"Semantic Folder plan {name} must be a list")
|
|
return value
|
|
|
|
|
|
def _optional_float(value: Any) -> float | None:
|
|
if value is None:
|
|
return None
|
|
try:
|
|
return float(value)
|
|
except (TypeError, ValueError) as exc:
|
|
raise SemanticFolderPlanError("Semantic Folder confidence must be numeric") from exc
|
|
|
|
|
|
def _normalize_path(path: str) -> str:
|
|
parts = [part for part in str(path or "/").replace("\\", "/").split("/") if part and part != "."]
|
|
return "/" + "/".join(parts) if parts else "/"
|