feat(filesystem): add PageIndex filesystem shell

This commit is contained in:
BukeLy 2026-05-26 01:41:57 +08:00
parent 7592163e2a
commit 74d0600261
24 changed files with 11373 additions and 4 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ __pycache__
.env*
.venv/
logs/
examples/pifs_workspace/

752
examples/pifs_demo.py Normal file
View file

@ -0,0 +1,752 @@
"""
PageIndex FileSystem (PIFS) agent demo.
This mirrors examples/agentic_vectorless_rag_demo.py, but exposes a corpus
through the PageIndex FileSystem shell instead of direct PageIndex document
tools. The agent receives one read-only bash-like PIFS tool and must retrieve
evidence through commands such as ls, tree, find, grep, search-summary,
cat --structure, cat --page, and cat --node.
The demo uses PDFs under examples/documents. When a matching
examples/documents/results/*_structure.json file exists, it is loaded into the
PIFS workspace's PageIndexClient cache so register() does not rebuild the tree.
Requirements:
pip install openai-agents
Example:
python examples/pifs_demo.py --stream-mode all --verbose
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import shutil
import sys
import time
from pathlib import Path
from typing import Any
import PyPDF2
sys.path.insert(0, str(Path(__file__).parent.parent))
# Keep the local demo quiet in offline environments.
os.environ.setdefault("LITELLM_LOCAL_MODEL_COST_MAP", "true")
from pageindex import PageIndexClient
from pageindex.filesystem import OpenAIMetadataGenerator, PageIndexFileSystem, PIFSCommandExecutor
from pageindex.filesystem.agent import run_pifs_agent
# Directory that contains this demo script.
EXAMPLES_DIR = Path(__file__).parent
# Default corpus location (the --documents-dir default).
DOCUMENTS_DIR = EXAMPLES_DIR / "documents"
# Default PIFS workspace directory (the --workspace default).
WORKSPACE = EXAMPLES_DIR / "pifs_workspace"
# Default agent model; overridable through the PIFS_DEMO_MODEL environment variable.
DEFAULT_MODEL = os.environ.get("PIFS_DEMO_MODEL", "gpt-5.4-mini")
# Question asked when --question is not supplied on the command line.
DEFAULT_QUESTION = (
"Use the PIFS workspace to find the Federal Reserve annual report. "
"Which section covers supervision and regulation, and what page range "
"should I inspect? Cite the document and evidence you used."
)
PIFS_DEMO_AGENT_PROMPT = """
You are a PageIndex FileSystem retrieval agent for a local demo workspace.
Use only the bash tool. It is a read-only PIFS virtual shell, not a real OS
shell. The workspace contains registered example PDFs.
Retrieval strategy:
- Start with ls or tree to understand the workspace.
- Use refs exactly as listed, such as ref_1, or use a concrete file path from
ls output. Do not invent paths like /documents/ref_1.
- Use search-summary when available to find likely documents.
Quote multi-word queries and include a path, for example:
search-summary "Federal Reserve supervision regulation" /documents
- Use find --where only with JSON metadata DSL, for example:
find /documents --where '{"file_format":"pdf"}'
- Use grep -R only for lexical evidence; do not treat semantic candidates as
literal matches.
- Run one evidence command at a time. Do not chain large commands like
cat --structure, grep, and cat --page in one bash call.
- For PDFs, use cat --structure <ref> to inspect the PageIndex tree, then
cat --page <range> <ref> for evidence, for example:
cat --page 31-35 ref_1
- For page-range questions, use cat --structure to identify the full section
range. Then run cat --page on the smallest useful evidence range, usually the
section start page or first 1-2 pages, before the final answer. Do not print
a broad multi-page section unless the user asks to read the whole section.
- Do not use cat --all on PDFs.
- Answer only from PIFS tool output and cite file refs or document ids.
"""
def parse_args() -> argparse.Namespace:
    """Parse the command-line options that drive the PIFS demo.

    Returns:
        The populated ``argparse.Namespace``. Path and model defaults come
        from the module-level constants; the metadata and embedding model
        defaults can be overridden with the ``PIFS_METADATA_MODEL`` and
        ``PIFS_DEMO_EMBEDDING_MODEL`` environment variables.
    """
    cli = argparse.ArgumentParser(description="Run a PIFS document retrieval agent demo.")
    option = cli.add_argument

    # Corpus and workspace selection.
    option("--workspace", type=Path, default=WORKSPACE)
    option("--documents-dir", type=Path, default=DOCUMENTS_DIR)
    option(
        "--document",
        action="append",
        default=[],
        help="Specific document filename or path to register. May be repeated.",
    )
    option(
        "--max-docs",
        type=int,
        default=0,
        help="Limit number of cached example documents to register. 0 means all.",
    )
    option("--reset", action="store_true", help="Delete and rebuild the demo workspace.")
    option(
        "--prepare-only",
        action="store_true",
        help="Register documents and print PIFS smoke commands without running the agent.",
    )

    # Agent run configuration.
    option("--question", default=DEFAULT_QUESTION)
    option("--model", default=DEFAULT_MODEL)
    option(
        "--metadata-model",
        default=os.environ.get("PIFS_METADATA_MODEL", "gpt-5-nano"),
        help="OpenAI or OpenAI-compatible model used for register-time metadata.",
    )
    option("--stream-mode", default="all", choices=["off", "tools", "model", "all"])
    option("--verbose", action="store_true")
    option("--max-turns", type=int, default=12)
    option("--max-seconds", type=float, default=90)
    option("--reasoning-effort", default=None)
    option("--reasoning-summary", default="auto")

    # Summary-projection embedding configuration.
    option(
        "--embedding-model",
        default=os.environ.get("PIFS_DEMO_EMBEDDING_MODEL", "text-embedding-3-small"),
        help="OpenAI embedding model used for register-time summary projection.",
    )
    option("--embedding-dimensions", type=int, default=256)
    return cli.parse_args()
def require_openai_environment() -> None:
    """Fail fast when no OpenAI API key is configured.

    Raises:
        RuntimeError: If ``OPENAI_API_KEY`` is unset or empty.
    """
    if not os.environ.get("OPENAI_API_KEY"):
        raise RuntimeError(
            "OPENAI_API_KEY is required for this demo: register() generates real "
            "PIFS metadata and the agent uses the OpenAI Agents SDK. Source your "
            ".env or export OPENAI_API_KEY before running."
        )
def discover_cached_documents(documents_dir: Path) -> list[Path]:
    """Find source documents that have a cached PageIndex structure.

    For every ``results/<stem>_structure.json`` file (in sorted order) the
    first existing ``<stem>.pdf``, ``<stem>.md`` or ``<stem>.markdown`` file in
    ``documents_dir`` is selected.

    Args:
        documents_dir: Directory holding the documents and a ``results``
            sub-directory with the cached structures.

    Returns:
        The matching document paths, one per cached structure at most.
    """
    cache_dir = documents_dir / "results"
    discovered: list[Path] = []
    for cache_file in sorted(cache_dir.glob("*_structure.json")):
        base_name = cache_file.name.removesuffix("_structure.json")
        candidates = (
            documents_dir / f"{base_name}{extension}"
            for extension in (".pdf", ".md", ".markdown")
        )
        # Extensions are tried in priority order; only the first hit is kept.
        match = next((path for path in candidates if path.exists()), None)
        if match is not None:
            discovered.append(match)
    return discovered
def resolve_requested_documents(documents_dir: Path, requested: list[str]) -> list[Path]:
    """Turn ``--document`` values into existing paths.

    Args:
        documents_dir: Base directory used for relative entries.
        requested: Filenames or paths given by the user. When empty, the
            documents with a cached structure are discovered instead.

    Returns:
        The resolved document paths in request order.

    Raises:
        FileNotFoundError: If a requested document does not exist.
    """
    if not requested:
        return discover_cached_documents(documents_dir)

    resolved: list[Path] = []
    for raw in requested:
        candidate = Path(raw).expanduser()
        # Relative entries are interpreted against the documents directory.
        target = candidate if candidate.is_absolute() else documents_dir / candidate
        if not target.exists():
            raise FileNotFoundError(f"document not found: {target}")
        resolved.append(target)
    return resolved
def structure_path_for(document_path: Path, documents_dir: Path) -> Path | None:
    """Return the cached structure file for a document, or ``None`` if absent.

    The cache file is expected at ``<documents_dir>/results/<stem>_structure.json``.
    """
    cache_file = documents_dir / "results" / f"{document_path.stem}_structure.json"
    if cache_file.exists():
        return cache_file
    return None
def deterministic_doc_id(document_path: Path) -> str:
    """Derive a stable document id from the document's resolved path.

    The id is ``pifs_demo_`` followed by the first 16 hex characters of the
    SHA-1 digest of the resolved path, so the same file always maps to the
    same id.
    """
    resolved = str(document_path.resolve())
    fingerprint = hashlib.sha1(resolved.encode("utf-8")).hexdigest()
    return "pifs_demo_" + fingerprint[:16]
def read_pdf_pages(document_path: Path) -> list[dict[str, Any]]:
    """Extract the text of every page of a PDF with PyPDF2.

    Returns:
        One ``{"page": <1-based number>, "content": <text>}`` dict per page;
        pages without extractable text get an empty string.
    """
    with document_path.open("rb") as pdf_file:
        pdf = PyPDF2.PdfReader(pdf_file)
        # Extraction happens while the file handle is still open.
        return [
            {"page": number, "content": pdf_page.extract_text() or ""}
            for number, pdf_page in enumerate(pdf.pages, 1)
        ]
def load_structure_json(structure_path: Path) -> dict[str, Any]:
    """Load and validate a cached PageIndex structure file.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the payload is not an object with a list-valued
            ``"structure"`` key.
    """
    payload = json.loads(structure_path.read_text(encoding="utf-8"))
    is_valid = isinstance(payload, dict) and isinstance(payload.get("structure"), list)
    if not is_valid:
        raise ValueError(f"invalid PageIndex structure cache: {structure_path}")
    return payload
def seed_pageindex_cache(
    filesystem: PageIndexFileSystem,
    document_path: Path,
    *,
    documents_dir: Path,
) -> str | None:
    """Preload a cached PageIndex structure into the workspace's client cache.

    Args:
        filesystem: PIFS instance whose ``pageindex_client_workspace`` hosts
            the ``PageIndexClient`` cache.
        document_path: Source document to seed.
        documents_dir: Directory whose ``results`` folder holds cached
            structures.

    Returns:
        The document id already recorded for this path or the newly stored
        deterministic id. ``None`` when no cached structure exists or the
        file extension is neither PDF nor Markdown.
    """
    cached_structure = structure_path_for(document_path, documents_dir)
    if cached_structure is None:
        return None

    client_workspace = filesystem.pageindex_client_workspace
    client_workspace.mkdir(parents=True, exist_ok=True)
    meta_file = client_workspace / "_meta.json"
    if not meta_file.exists():
        # Seed an empty metadata file before the client is constructed.
        meta_file.write_text("{}", encoding="utf-8")
    client = PageIndexClient(workspace=str(client_workspace))

    canonical_path = str(document_path.resolve())
    canonical = Path(canonical_path)
    # Reuse an existing record when the same file has been seeded before.
    for known_id, record in client.documents.items():
        recorded = Path(str(record.get("path") or "")).resolve(strict=False)
        if recorded == canonical:
            return known_id

    payload = load_structure_json(cached_structure)
    doc_id = deterministic_doc_id(document_path)
    extension = document_path.suffix.lower()

    if extension == ".pdf":
        pages = read_pdf_pages(document_path)
        doc_type = "pdf"
        type_specific: dict[str, Any] = {
            "page_count": len(pages),
            "structure": payload["structure"],
            "pages": pages,
        }
    elif extension in {".md", ".markdown"}:
        text = document_path.read_text(encoding="utf-8")
        doc_type = "md"
        type_specific = {
            "line_count": len(text.splitlines()),
            "structure": payload["structure"],
        }
    else:
        return None

    client.documents[doc_id] = {
        "id": doc_id,
        "type": doc_type,
        "path": canonical_path,
        "doc_name": payload.get("doc_name") or document_path.name,
        "doc_description": payload.get("doc_description") or "",
        **type_specific,
    }
    client._save_doc(doc_id)
    return doc_id
def content_type_for(path: Path) -> str:
    """Map a file extension to the MIME type used at registration time.

    PDF and Markdown extensions are recognised case-insensitively; anything
    else is reported as ``text/plain``.
    """
    known_types = {
        ".pdf": "application/pdf",
        ".md": "text/markdown",
        ".markdown": "text/markdown",
    }
    return known_types.get(path.suffix.lower(), "text/plain")
def external_id_for(path: Path) -> str:
    """Build the ``example_<slug>`` external id for a document.

    The slug is the lower-cased file stem with every run of non-alphanumeric
    characters collapsed into a single underscore and no leading or trailing
    underscore.
    """
    normalized = "".join(
        char.lower() if char.isalnum() else "_"
        for char in path.stem
    )
    # Dropping empty pieces both collapses repeated underscores and trims the ends.
    pieces = [piece for piece in normalized.split("_") if piece]
    return "example_" + "_".join(pieces)
def log_progress(message: str, *, indent: int = 0) -> None:
    """Print a progress line immediately, prefixed by ``indent`` spaces."""
    print("{}{}".format(" " * indent, message), flush=True)
def register_demo_metadata_schema(filesystem: PageIndexFileSystem) -> None:
    """Register the two demo-specific metadata fields under source ``"demo"``.

    The fields are ``source_collection`` and ``file_format``, both strings.
    """
    demo_fields = {
        "source_collection": {
            "type": "string",
            "description": "Local example corpus collection.",
        },
        "file_format": {
            "type": "string",
            "description": "Source file extension without the leading dot.",
        },
    }
    filesystem.metadata.register_schema({"fields": demo_fields}, source="demo")
def backfill_registered_metadata_values(filesystem: PageIndexFileSystem, file_ref: str) -> None:
    """Rewrite the stored metadata values of an already registered file.

    The file's ``metadata`` is overlaid with its ``derived_metadata`` (derived
    values win on key clashes) and the merged mapping is passed to
    ``store.replace_metadata_values`` inside one store connection.
    """
    record = filesystem.store.get_file(file_ref)
    merged = dict(record.metadata or {})
    merged |= record.derived_metadata or {}
    with filesystem.store.connect() as connection:
        filesystem.store.replace_metadata_values(connection, file_ref, merged)
def configure_summary_projection_backend(
    filesystem: PageIndexFileSystem,
    *,
    embedding_model: str,
    embedding_dimensions: int,
) -> None:
    """Enable hybrid projection retrieval when a summary index file exists.

    Nothing happens unless ``summary_only_vector.sqlite`` is present in the
    filesystem's summary projection index directory.

    Args:
        filesystem: PIFS instance to configure.
        embedding_model: OpenAI embedding model name to use for retrieval.
        embedding_dimensions: Embedding vector size.
    """
    index_file = filesystem.summary_projection_index_dir / "summary_only_vector.sqlite"
    if index_file.exists():
        filesystem.configure_hybrid_projection_retrieval(
            filesystem.summary_projection_index_dir,
            embedding_provider="openai",
            embedding_model=embedding_model,
            embedding_dimensions=embedding_dimensions,
        )
def has_ready_register_outputs(filesystem: PageIndexFileSystem, external_id: str) -> bool:
    """Tell whether a document is already registered with complete outputs.

    A document counts as ready when the store knows ``external_id``, the
    ``summary``, ``doc_type``, ``domain`` and ``topic`` metadata fields all
    have status ``"generated"``, and the summary projection index has status
    ``"ready"``.

    Args:
        filesystem: PIFS instance whose store is queried.
        external_id: External id of the document.

    Returns:
        ``True`` only when all conditions above hold; ``False`` otherwise,
        including when the external id is unknown to the store.
    """
    try:
        file_ref = filesystem.store.resolve_file_ref(external_id)
        entry = filesystem.store.get_file(file_ref)
    except KeyError:
        return False

    generation = entry.metadata_generation or {}
    fields = generation.get("fields") or {}
    for field in ("summary", "doc_type", "domain", "topic"):
        state = fields.get(field)
        # A missing, null or malformed field state means "not generated"
        # rather than a crash, matching the `or {}` guards used around it.
        if not isinstance(state, dict) or state.get("status") != "generated":
            return False

    summary_projection = (generation.get("projection_indexes") or {}).get("summary") or {}
    return summary_projection.get("status") == "ready"
def register_documents(
    filesystem: PageIndexFileSystem,
    documents: list[Path],
    *,
    documents_dir: Path,
) -> list[dict[str, Any]]:
    """Register every document in the PIFS workspace, reusing cached work.

    For each document the PageIndex tree cache is seeded first. If the store
    already holds complete register outputs for the document, its metadata
    values are backfilled and registration is skipped; otherwise
    ``filesystem.register`` is run. Progress is printed along the way.

    Args:
        filesystem: PIFS instance to register into.
        documents: Documents to register.
        documents_dir: Directory whose ``results`` folder holds cached
            PageIndex structures.

    Returns:
        One record per document with ``file_ref``, ``external_id``, ``path``,
        ``status`` and ``pageindex_doc_id``; freshly registered documents also
        carry ``pageindex_tree_status``.
    """
    registered: list[dict[str, Any]] = []
    total = len(documents)
    for index, document_path in enumerate(documents, 1):
        document_path = document_path.resolve()
        external_id = external_id_for(document_path)
        log_progress(f"[{index}/{total}] {document_path.name}")

        log_progress("PageIndex tree cache: checking examples/documents/results", indent=1)
        cache_started = time.perf_counter()
        cached_doc_id = seed_pageindex_cache(
            filesystem,
            document_path,
            documents_dir=documents_dir,
        )
        cache_seconds = time.perf_counter() - cache_started
        if cached_doc_id:
            log_progress(
                f"PageIndex tree cache: ready doc_id={cached_doc_id} ({cache_seconds:.2f}s)",
                indent=1,
            )
        else:
            log_progress(
                f"PageIndex tree cache: no cached structure; register() will index if supported ({cache_seconds:.2f}s)",
                indent=1,
            )

        if has_ready_register_outputs(filesystem, external_id):
            file_ref = filesystem.store.resolve_file_ref(external_id)
            backfill_registered_metadata_values(filesystem, file_ref)
            log_progress(
                f"PIFS register: cached file_ref={file_ref}; metadata and summary projection already ready",
                indent=1,
            )
            registered.append(
                {
                    "file_ref": file_ref,
                    "external_id": external_id,
                    "path": str(document_path),
                    "status": "cached",
                    "pageindex_doc_id": cached_doc_id,
                }
            )
            continue

        log_progress(
            "PIFS register: running register() -> metadata generation -> summary embedding -> sqlite upsert",
            indent=1,
        )
        register_started = time.perf_counter()
        file_ref = filesystem.register(
            storage_uri=document_path.as_uri(),
            source_path=str(document_path),
            folder_path="/documents",
            external_id=external_id,
            title=document_path.name,
            content_type=content_type_for(document_path),
            source_type="examples-documents",
            metadata={
                "title": document_path.name,
                "source_collection": "examples/documents",
                "file_format": document_path.suffix.lower().lstrip("."),
            },
        )
        register_seconds = time.perf_counter() - register_started

        entry = filesystem.store.get_file(file_ref)
        # metadata_generation (and its nested values) may be missing; use the
        # same `or {}` guards as has_ready_register_outputs so that reporting
        # never crashes a registration that already succeeded.
        generation = entry.metadata_generation or {}
        generation_status = generation.get("status", "unknown")
        field_status = {
            field: (state or {}).get("status")
            for field, state in (generation.get("fields") or {}).items()
        }
        summary_projection = (generation.get("projection_indexes") or {}).get("summary") or {}

        log_progress(
            f"PIFS register: done file_ref={file_ref} ({register_seconds:.2f}s)",
            indent=1,
        )
        log_progress(
            f"metadata: {generation_status} fields={field_status}",
            indent=1,
        )
        log_progress(
            "summary projection: "
            f"{summary_projection.get('status', 'not_requested')} "
            f"index={summary_projection.get('index_path', '')}",
            indent=1,
        )
        registered.append(
            {
                "file_ref": file_ref,
                "external_id": external_id,
                "path": str(document_path),
                "status": generation_status,
                "pageindex_tree_status": entry.pageindex_tree_status,
                "pageindex_doc_id": entry.pageindex_doc_id,
            }
        )
    return registered
def print_section(title: str) -> None:
    """Print a banner: a blank line, a 78-character ``#`` rule, the title, and the rule again."""
    rule = "#" * 78
    for line in ("\n" + rule, f"# {title}", rule):
        print(line, flush=True)
def print_step(title: str, detail: str = "") -> None:
    """Print a ``>>>`` step heading, followed by an indented detail line when given."""
    lines = [f"\n>>> {title}"]
    if detail:
        lines.append(f" {detail}")
    for line in lines:
        print(line, flush=True)
def sanitize_preview_text(text: str) -> str:
    """Normalise raw tool or PDF text for terminal previews.

    Carriage returns and form feeds become newlines, every other control
    character below U+0020 except tab and newline becomes a space, runs of two
    or more spaces/tabs collapse to one space, and each line is stripped.
    """
    # Single translation table: CR/FF -> newline, other control chars -> space.
    table = {code: " " for code in range(32) if code not in (9, 10)}
    table[ord("\r")] = "\n"
    table[ord("\f")] = "\n"
    normalized = str(text).translate(table)

    squeeze = re.compile(r"[ \t]{2,}")
    tidy_lines = [squeeze.sub(" ", line).strip() for line in normalized.splitlines()]
    return "\n".join(tidy_lines)
def compact_lines(text: str, *, max_lines: int = 6, max_chars: int = 900) -> str:
    """Build a short preview of ``text`` for terminal output.

    Blank lines are dropped after sanitising. At most ``max_lines`` lines are
    kept and the joined preview is cut to ``max_chars`` characters (with a
    trailing ``...``); a final note reports how many lines were left out.
    """
    visible = [line for line in sanitize_preview_text(text).splitlines() if line.strip()]
    preview = "\n".join(visible[:max_lines])
    if len(preview) > max_chars:
        preview = preview[:max_chars].rstrip() + "..."
    hidden = max(len(visible) - max_lines, 0)
    if hidden > 0:
        preview = f"{preview}\n ... {hidden} more lines"
    return preview
def find_structure_node(structure: Any, title_fragment: str) -> dict[str, Any] | None:
    """Depth-first search for the first node whose title contains a fragment.

    Args:
        structure: A node dict, a list of nodes, or anything else (ignored).
        title_fragment: Case-insensitive substring to look for in ``"title"``.

    Returns:
        The first matching node in document order, or ``None``.
    """
    if isinstance(structure, dict):
        title = str(structure.get("title", ""))
        if title_fragment.lower() in title.lower():
            return structure
        # No match on this node: continue into its children.
        return find_structure_node(structure.get("nodes", []), title_fragment)

    if isinstance(structure, list):
        for child in structure:
            match = find_structure_node(child, title_fragment)
            if match:
                return match
    return None
def page_range_for_node(node: dict[str, Any] | None) -> str:
    """Return the page span covered by a node and all of its descendants.

    Only nodes whose ``start_index`` and ``end_index`` are both integers
    contribute.

    Returns:
        ``"start-end"``, a single page number when both are equal, or ``""``
        when the node is empty or no page bounds were found.
    """
    if not node:
        return ""

    starts: list[int] = []
    ends: list[int] = []
    pending: list[Any] = [node]
    while pending:
        current = pending.pop()
        if not isinstance(current, dict):
            continue
        first, last = current.get("start_index"), current.get("end_index")
        if isinstance(first, int) and isinstance(last, int):
            starts.append(first)
            ends.append(last)
        pending.extend(current.get("nodes") or [])

    if not starts:
        return ""
    lowest, highest = min(starts), max(ends)
    return str(lowest) if lowest == highest else f"{lowest}-{highest}"
def opening_page_range_for_node(node: dict[str, Any] | None, *, max_pages: int = 2) -> str:
    """Return the first few pages of the span covered by a node.

    The span is computed over the node and all of its descendants, using only
    nodes whose ``start_index`` and ``end_index`` are both integers.

    Args:
        node: Structure node to inspect, or ``None``.
        max_pages: Maximum number of pages to include. Values below one are
            treated as one so the result is never an inverted range.

    Returns:
        ``"start-end"`` limited to ``max_pages`` pages, a single page number
        when only one page is selected, or ``""`` when the node is empty or no
        page bounds were found.
    """
    if not node:
        return ""

    ranges: list[tuple[int, int]] = []

    def collect(item: Any) -> None:
        if not isinstance(item, dict):
            return
        start = item.get("start_index")
        end = item.get("end_index")
        if isinstance(start, int) and isinstance(end, int):
            ranges.append((start, end))
        for child in item.get("nodes") or []:
            collect(child)

    collect(node)
    if not ranges:
        return ""

    start = min(bounds[0] for bounds in ranges)
    end = max(bounds[1] for bounds in ranges)
    # Always preview at least the opening page; max_pages <= 0 previously
    # produced ranges such as "5-4".
    page_budget = max(max_pages, 1)
    preview_end = min(end, start + page_budget - 1)
    return f"{start}-{preview_end}" if start != preview_end else str(start)
def execute_json_command(executor: PIFSCommandExecutor, command: str) -> dict[str, Any]:
    """Run a PIFS command and decode its JSON output, never raising.

    Returns:
        The decoded JSON value on success. If execution or decoding fails, a
        ``{"ok": False, "error": <message>, "data": None}`` dict so the smoke
        run can keep going.
    """
    try:
        raw_output = executor.execute(command)
        decoded = json.loads(raw_output)
    except Exception as exc:
        return {"ok": False, "error": str(exc), "data": None}
    return decoded
def show_capability(
    *,
    label: str,
    command: str,
    result: str,
    raw: str = "",
    verbose: bool = False,
) -> None:
    """Print one smoke-test step: its label, command and summarised result.

    In verbose mode a compact preview of the raw shell output is printed as
    well, provided ``raw`` is non-empty.
    """
    print_step(label, command)
    print(f" result: {result}", flush=True)
    if not (verbose and raw):
        return
    print(" raw:", flush=True)
    print(compact_lines(raw, max_lines=10, max_chars=1600), flush=True)
def show_registered_documents(registered: list[dict[str, Any]], *, verbose: bool = False) -> None:
    """Print a one-line summary per registration record.

    In verbose mode the full records are additionally dumped as indented JSON.
    """
    print(f"\nRegistered {len(registered)} document(s):", flush=True)
    for record in registered:
        file_name = Path(str(record.get("path", ""))).name
        details = " | ".join(
            [
                f"file_ref={record.get('file_ref')}",
                f"status={record.get('status')}",
                f"pageindex_doc_id={record.get('pageindex_doc_id')}",
            ]
        )
        print(f" - {file_name}: {details}", flush=True)
    if not verbose:
        return
    print("\nRaw registration records:", flush=True)
    print(json.dumps(registered, ensure_ascii=False, indent=2), flush=True)
def run_smoke_commands(
    filesystem: PageIndexFileSystem,
    registered: list[dict[str, Any]],
    *,
    verbose: bool = False,
) -> None:
    """Exercise the main PIFS shell commands and print a summary of each.

    Every command runs through a JSON executor to summarise the result; in
    verbose mode it is run a second time through a plain-text executor so the
    raw shell output can be previewed. Commands that need a concrete file
    (``stat``, ``cat``, ``grep``) are skipped when nothing was registered.

    Args:
        filesystem: PIFS instance to query.
        registered: Registration records; the first record's ``file_ref`` is
            used for the per-file commands.
        verbose: Also print a preview of the raw shell output.
    """
    json_executor = PIFSCommandExecutor(filesystem, json_output=True)
    shell_executor = PIFSCommandExecutor(filesystem, json_output=False)

    def raw_output(shell_command: str) -> str:
        # The plain-text run is only needed for the verbose preview.
        return shell_executor.execute(shell_command) if verbose else ""

    command = "tree / --depth 2"
    tree = execute_json_command(json_executor, command)
    folders = (tree.get("data") or {}).get("folders") or []
    documents_folder = next((item for item in folders if item.get("path") == "/documents"), {})
    show_capability(
        label="Folder browse",
        command=command,
        result=f"/documents contains {documents_folder.get('file_count', len(registered))} files",
        raw=raw_output(command),
        verbose=verbose,
    )

    command = "ls /documents"
    listing = execute_json_command(json_executor, command)
    files = (listing.get("data") or {}).get("files") or []
    # A null title must not break the join.
    file_titles = ", ".join(str(item.get("title") or "") for item in files[:3])
    show_capability(
        label="List registered files",
        command=command,
        result=f"{len(files)} files: {file_titles}",
        raw=raw_output(command),
        verbose=verbose,
    )

    command = "stat --schema"
    schema = execute_json_command(json_executor, command)
    fields = sorted(((schema.get("data") or {}).get("fields") or {}).keys())
    show_capability(
        label="Metadata schema",
        command=command,
        result=", ".join(fields),
        raw=raw_output(command),
        verbose=verbose,
    )

    command = "find /documents --where '{\"source_collection\":\"examples/documents\"}' --limit 5"
    found = execute_json_command(json_executor, command)
    # NOTE(review): search-summary and grep read their hits from data["data"],
    # while find is read as a bare list here — confirm against the executor.
    found_files = found.get("data") or []
    show_capability(
        label="Metadata DSL filter",
        command=command,
        result=f"{len(found_files)} documents matched source_collection=examples/documents",
        raw=raw_output(command),
        verbose=verbose,
    )

    command = 'search-summary "Federal Reserve annual report supervision regulation section page range" /documents'
    summary = execute_json_command(json_executor, command)
    summary_hits = ((summary.get("data") or {}).get("data") or [])
    if summary_hits:
        summary_result = f"{len(summary_hits)} summary-vector candidates; top={summary_hits[0].get('external_id')}"
    else:
        summary_result = "summary-vector command is available, but this tiny two-doc demo returned no candidates"
    show_capability(
        label="Semantic summary search",
        command=command,
        result=summary_result,
        raw=raw_output(command),
        verbose=verbose,
    )

    first_ref = registered[0]["file_ref"] if registered else None
    if not first_ref:
        return

    command = f"stat {first_ref}"
    stat = execute_json_command(json_executor, command)
    stat_data = stat.get("data") or {}
    show_capability(
        label="File stat",
        command=command,
        result=(
            f"{stat_data.get('title')} | tree={stat_data.get('pageindex_tree_status')} | "
            f"metadata_status={(stat_data.get('metadata_generation') or {}).get('status')}"
        ),
        raw=raw_output(command),
        verbose=verbose,
    )

    command = f"cat --structure {first_ref}"
    structure_payload = execute_json_command(json_executor, command)
    structure_data = structure_payload.get("data") or {}
    structure = structure_data.get("structure") or []
    supervision_node = find_structure_node(structure, "Supervision and Regulation")
    supervision_range = page_range_for_node(supervision_node)
    if supervision_node:
        structure_result = "found section 'Supervision and Regulation'" + (
            f" with page span {supervision_range}" if supervision_range else ""
        )
    else:
        # Only claim the section was found when the lookup actually matched.
        structure_result = "section 'Supervision and Regulation' not found in the document structure"
    show_capability(
        label="PageIndex document structure",
        command=command,
        result=structure_result,
        raw=raw_output(command),
        verbose=verbose,
    )

    # Fall back to the first two pages when the section could not be located.
    evidence_range = opening_page_range_for_node(supervision_node) or "1-2"
    command = f"cat --page {evidence_range} {first_ref}"
    page = execute_json_command(json_executor, command)
    page_text = str((page.get("data") or {}).get("text") or "")
    show_capability(
        label="Page evidence",
        command=command,
        result=compact_lines(page_text, max_lines=3, max_chars=420),
        raw=raw_output(command),
        verbose=verbose,
    )

    command = 'grep -R "Supervision and Regulation" /documents'
    grep = execute_json_command(json_executor, command)
    grep_hits = ((grep.get("data") or {}).get("data") or [])
    show_capability(
        label="Lexical grep",
        command=command,
        result=f"{len(grep_hits)} real text matches",
        raw=raw_output(command),
        verbose=verbose,
    )
def main() -> None:
    """Run the demo: register documents, smoke-test PIFS, then ask the agent.

    Raises:
        RuntimeError: If ``OPENAI_API_KEY`` is missing or no documents are
            available to register.
        FileNotFoundError: If a document requested with ``--document`` does
            not exist.
    """
    args = parse_args()
    require_openai_environment()

    workspace = args.workspace.expanduser()
    documents_dir = args.documents_dir.expanduser()
    if args.reset and workspace.exists():
        # --reset discards any earlier workspace before it is rebuilt.
        shutil.rmtree(workspace)
    workspace.mkdir(parents=True, exist_ok=True)

    documents = resolve_requested_documents(documents_dir, args.document)
    if args.max_docs > 0:
        documents = documents[: args.max_docs]
    if not documents:
        raise RuntimeError(f"no cached example documents found under {documents_dir}")

    embedding_model = args.embedding_model
    embedding_dimensions = args.embedding_dimensions
    filesystem = PageIndexFileSystem(
        workspace,
        metadata_generator=OpenAIMetadataGenerator(model=args.metadata_model),
        summary_projection_embedding_provider="openai",
        summary_projection_embedding_model=embedding_model,
        summary_projection_embedding_dimensions=embedding_dimensions,
    )
    register_demo_metadata_schema(filesystem)

    print_section("STEP 1/3 Register Documents")
    print(f"Workspace: {workspace}", flush=True)
    print(f"Documents: {len(documents)}", flush=True)
    registered = register_documents(filesystem, documents, documents_dir=documents_dir)
    configure_summary_projection_backend(
        filesystem,
        embedding_model=embedding_model,
        embedding_dimensions=embedding_dimensions,
    )
    show_registered_documents(registered, verbose=args.verbose)

    print_section("STEP 2/3 Explore PIFS Tool Surface")
    run_smoke_commands(filesystem, registered, verbose=args.verbose)
    if args.prepare_only:
        return

    print_section("STEP 3/3 Ask An Agent Using Only PIFS")
    print(f"Question: {args.question}", flush=True)
    agent_options = {
        "model": args.model,
        "root": "/",
        "system_prompt": PIFS_DEMO_AGENT_PROMPT,
        "max_turns": args.max_turns,
        "max_seconds": args.max_seconds,
        "verbose": args.verbose,
        "stream_mode": args.stream_mode,
        "reasoning_effort": args.reasoning_effort,
        "reasoning_summary": args.reasoning_summary,
    }
    answer = run_pifs_agent(filesystem, args.question, **agent_options)
    if answer:
        print("\nFinal answer:", flush=True)
        print(answer, flush=True)
if __name__ == "__main__":
main()

View file

@ -1,4 +1,10 @@
from .page_index import *
from .page_index_md import md_to_tree
from .retrieve import get_document, get_document_structure, get_page_content
from .client import PageIndexClient
try:
from .page_index import *
from .page_index_md import md_to_tree
from .retrieve import get_document, get_document_structure, get_page_content
from .client import PageIndexClient
except ModuleNotFoundError as exc:
if exc.name != "litellm":
raise
from .filesystem import PageIndexFileSystem

View file

@ -0,0 +1,36 @@
from .commands import PIFSCommandExecutor
from .core import PageIndexFileSystem
from .hybrid_projection import HybridProjectionSearchBackend
from .metadata_generation import (
MetadataGenerationError,
MetadataGenerationInput,
MetadataGenerationResult,
MetadataGenerator,
OpenAIMetadataGenerator,
)
from .projection_indexing import SummaryProjectionIndexer
from .semantic_index import (
RebuildableSemanticIndex,
SemanticIndexRecord,
SemanticSearchResult,
SQLiteVecSemanticIndex,
)
from .types import OpenResult, SearchResult
__all__ = [
"OpenResult",
"HybridProjectionSearchBackend",
"MetadataGenerationError",
"MetadataGenerationInput",
"MetadataGenerationResult",
"MetadataGenerator",
"OpenAIMetadataGenerator",
"PIFSCommandExecutor",
"PageIndexFileSystem",
"RebuildableSemanticIndex",
"SearchResult",
"SemanticIndexRecord",
"SemanticSearchResult",
"SummaryProjectionIndexer",
"SQLiteVecSemanticIndex",
]

View file

@ -0,0 +1,514 @@
from __future__ import annotations
import asyncio
import concurrent.futures
import json
import os
import re
import sys
import time
from dataclasses import asdict, is_dataclass
from typing import Any, Mapping, TextIO
from .commands import PIFSCommandError, PIFSCommandExecutor
from .core import PageIndexFileSystem
# Lower-cased values that env_flag_enabled treats as "enabled".
TRUTHY_ENV_VALUES = {"1", "true", "yes", "on"}
# Environment variable checked by pifs_agent_tracing_enabled.
PIFS_AGENT_TRACING_ENV = "PAGEINDEX_PIFS_AGENT_TRACING"
# Environment variable checked by pifs_agent_raw_reasoning_enabled.
PIFS_AGENT_RAW_REASONING_ENV = "PAGEINDEX_PIFS_AGENT_RAW_REASONING"
AGENT_SYSTEM_PROMPT = """
You are a PageIndex FileSystem retrieval agent.
You can inspect the corpus only by calling the bash tool. The bash tool is a
read-only PageIndex virtual shell, not a real operating-system shell.
Follow the task prompt for command policy, retrieval strategy, and answer
format. If the caller needs stricter behavior, pass an explicit system_prompt.
"""
BASH_TOOL_DESCRIPTION = """
Run a command in the PageIndex FileSystem virtual shell. This is not a real
operating-system shell. By default the tool is read-only: use ls, tree, find,
grep, cat, stat, head, tail, sed, and any dynamically available semantic search
commands described in the workspace context. grep -R is lexical evidence search;
semantic search commands return candidate documents and do not guarantee literal
text matches. Errors are returned as text prefixed with ERROR. Do not call
commands that are not listed as available. When evidence is required, inspect it
with cat or grep before answering.
"""
AGENT_TOOL_POLICY = """
Tool policy:
- The bash tool is a PageIndex virtual shell, not an operating-system shell.
- The default agent tool surface is read-only.
- Use only commands listed in the workspace capabilities.
- grep -R performs lexical evidence search.
- Semantic search commands are candidate-discovery tools and do not guarantee literal text matches.
- Tool errors are returned as ERROR text; recover by trying an available command.
- Use cat or grep to gather evidence before making source-backed claims.
"""
# Maps every accepted stream-mode spelling to its canonical value
# ("off", "tools", "model" or "all").
STREAM_MODE_ALIASES = {
"": "off",
"none": "off",
"false": "off",
"0": "off",
"off": "off",
"tool": "tools",
"tools": "tools",
"model": "model",
"output": "model",
"outputs": "model",
"think": "model",
"all": "all",
"debug": "all",
}
# Sorted list of the non-empty alias spellings.
AGENT_STREAM_MODE_CHOICES = sorted(item for item in STREAM_MODE_ALIASES if item)
# Values accepted by normalize_reasoning_effort.
REASONING_EFFORT_CHOICES = ["none", "minimal", "low", "medium", "high", "xhigh"]
# Values accepted by normalize_reasoning_summary ("none" is mapped to None there).
REASONING_SUMMARY_CHOICES = ["none", "auto", "concise", "detailed"]
def should_use_openai_compatible_chat_model(base_url: str | None) -> bool:
    """Tell whether ``base_url`` points somewhere other than the official OpenAI API.

    Returns:
        ``False`` when no base URL is given or when it equals
        ``https://api.openai.com`` or ``https://api.openai.com/v1`` (ignoring
        surrounding whitespace and trailing slashes); ``True`` otherwise.
    """
    official_endpoints = ("https://api.openai.com", "https://api.openai.com/v1")
    if base_url:
        return base_url.strip().rstrip("/") not in official_endpoints
    return False
def env_flag_enabled(name: str, environ: Mapping[str, str] | None = None) -> bool:
    """Tell whether an environment flag is set to a truthy value.

    Args:
        name: Variable name to look up.
        environ: Mapping to read from; defaults to ``os.environ``.

    Returns:
        ``True`` when the stripped, lower-cased value is one of
        ``TRUTHY_ENV_VALUES``; a missing variable counts as disabled.
    """
    if environ is None:
        environ = os.environ
    raw_value = environ.get(name, "")
    return raw_value.strip().lower() in TRUTHY_ENV_VALUES
def pifs_agent_tracing_enabled(environ: Mapping[str, str] | None = None) -> bool:
    """Return ``True`` when the ``PIFS_AGENT_TRACING_ENV`` flag is truthy."""
    tracing_requested = env_flag_enabled(PIFS_AGENT_TRACING_ENV, environ)
    return tracing_requested
def should_disable_pifs_agent_tracing(environ: Mapping[str, str] | None = None) -> bool:
    """Return ``True`` unless tracing was explicitly enabled via the environment."""
    if pifs_agent_tracing_enabled(environ):
        return False
    return True
def pifs_agent_raw_reasoning_enabled(environ: Mapping[str, str] | None = None) -> bool:
    """Return ``True`` when the ``PIFS_AGENT_RAW_REASONING_ENV`` flag is truthy."""
    raw_reasoning_requested = env_flag_enabled(PIFS_AGENT_RAW_REASONING_ENV, environ)
    return raw_reasoning_requested
def normalize_reasoning_effort(reasoning_effort: str | None) -> str | None:
    """Validate and canonicalise a reasoning-effort option.

    Returns:
        The stripped, lower-cased effort, or ``None`` when the input is
        ``None`` or blank.

    Raises:
        ValueError: If the value is not in ``REASONING_EFFORT_CHOICES``.
    """
    if reasoning_effort is None:
        return None
    candidate = reasoning_effort.strip().lower()
    if not candidate:
        return None
    if candidate in REASONING_EFFORT_CHOICES:
        return candidate
    allowed = ", ".join(REASONING_EFFORT_CHOICES)
    raise ValueError(f"Unknown reasoning effort: {reasoning_effort!r}. Allowed: {allowed}")
def normalize_reasoning_summary(reasoning_summary: str | None) -> str | None:
    """Validate and canonicalise a reasoning-summary option.

    Returns:
        The stripped, lower-cased summary mode, or ``None`` when the input is
        ``None``, blank, or the literal ``"none"``.

    Raises:
        ValueError: If the value is not in ``REASONING_SUMMARY_CHOICES``.
    """
    if reasoning_summary is None:
        return None
    candidate = reasoning_summary.strip().lower()
    if not candidate:
        return None
    if candidate not in REASONING_SUMMARY_CHOICES:
        allowed = ", ".join(REASONING_SUMMARY_CHOICES)
        raise ValueError(f"Unknown reasoning summary: {reasoning_summary!r}. Allowed: {allowed}")
    # "none" is a valid choice that means "do not request a summary".
    if candidate == "none":
        return None
    return candidate
def build_agent_model_settings(
    *,
    reasoning_effort: str | None = None,
    reasoning_summary: str | None = None,
) -> Any | None:
    """Build the Agents SDK ``ModelSettings`` for the requested reasoning options.

    Args:
        reasoning_effort: Effort level; validated by ``normalize_reasoning_effort``.
        reasoning_summary: Summary mode; validated by ``normalize_reasoning_summary``.

    Returns:
        ``None`` when neither option resolves to a value, otherwise a
        ``ModelSettings`` carrying a ``Reasoning`` config and low verbosity.

    Raises:
        ValueError: If either option has an unknown value.
    """
    effort = normalize_reasoning_effort(reasoning_effort)
    summary = normalize_reasoning_summary(reasoning_summary)
    if effort is None and summary is None:
        return None

    # An effort other than "none" without a summary mode defaults to "auto".
    if summary is None and effort not in {None, "none"}:
        summary = "auto"

    # Imported lazily so the SDKs are only needed when settings are built.
    from agents import ModelSettings
    from openai.types.shared import Reasoning

    reasoning_options = {
        key: value
        for key, value in (("effort", effort), ("summary", summary))
        if value is not None
    }
    return ModelSettings(reasoning=Reasoning(**reasoning_options), verbosity="low")
def normalize_agent_stream_mode(stream_mode: str | None) -> str:
    """Resolve a stream-mode alias to ``"off"``, ``"tools"``, ``"model"`` or ``"all"``.

    ``None`` and the empty string resolve to ``"off"``; lookup ignores case
    and surrounding whitespace.

    Raises:
        ValueError: If the value is not a key of ``STREAM_MODE_ALIASES``.
    """
    alias = (stream_mode or "off").strip().lower()
    resolved = STREAM_MODE_ALIASES.get(alias)
    if resolved is not None:
        return resolved
    allowed = ", ".join(sorted({"off", "tools", "model", "all"}))
    raise ValueError(f"Unknown PIFS agent stream mode: {stream_mode!r}. Allowed: {allowed}")
def serialize_agent_final_output(value: Any) -> str:
    """Convert an agent's final output into text.

    Args:
        value: The final output returned by the agent run.

    Returns:
        ``""`` for ``None``; strings unchanged; ``model_dump_json()`` for
        objects that provide it; JSON for dataclass instances, dicts and
        lists; ``str(value)`` for everything else, including classes.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, type):
        # Classes pass both hasattr(..., "model_dump_json") and is_dataclass(),
        # but calling those serializers on a class raises TypeError.
        return str(value)
    if hasattr(value, "model_dump_json"):
        return value.model_dump_json()
    if is_dataclass(value):
        return json.dumps(asdict(value), ensure_ascii=False)
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return str(value)
def compact_tool_output_preview(
    output: str,
    *,
    preview_chars: int = 700,
    max_lines: int = 8,
) -> str:
    """Shorten tool output for the terminal.

    The text is sanitised (CR/FF become newlines, other control characters
    become spaces, blank lines are dropped, whitespace runs collapse), then
    limited to ``max_lines`` lines and ``preview_chars`` characters. When
    either limit is exceeded, a header and trailing notes describe what was
    shortened.

    Returns:
        The preview text; small outputs are returned without annotations.
    """
    text = str(output).replace("\r", "\n").replace("\f", "\n")
    text = "".join(
        char if (char in "\n\t" or ord(char) >= 32) else " "
        for char in text
    )

    lines: list[str] = []
    for raw_line in text.splitlines():
        if raw_line.strip():
            lines.append(re.sub(r"[ \t]{2,}", " ", raw_line).strip())

    total_chars = len(text)
    too_long = total_chars > preview_chars
    hidden_lines = max(len(lines) - max_lines, 0)

    body = "\n".join(lines[:max_lines])
    if len(body) > preview_chars:
        body = body[:preview_chars].rstrip() + "..."
    if not (too_long or hidden_lines > 0):
        return body

    parts = [f"[large PIFS result: {total_chars} chars; showing compact preview]", body]
    if hidden_lines > 0:
        parts.append(f"... [{hidden_lines} more lines omitted from preview]")
    if too_long:
        parts.append("... [full result returned to agent; terminal preview shortened]")
    return "\n".join(parts)
def build_agent_initial_context(
    filesystem: PageIndexFileSystem,
    *,
    root: str = "/",
    executor: PIFSCommandExecutor | None = None,
    query_context: str | None = None,
) -> str:
    """Render the workspace overview text shown to the agent up front.

    The text contains the root path, the output of ``ls <root>``, a JSON
    summary of the metadata schema (field count plus at most the first 50
    fields) and the executor's description of its command surfaces. When no
    executor is supplied, a plain-text one is created with ``query_context``.
    """
    if not executor:
        executor = PIFSCommandExecutor(
            filesystem,
            json_output=False,
            query_context=query_context,
        )
    declared_fields = filesystem._metadata_schema().get("fields", {})
    schema_summary = {
        "field_count": len(declared_fields),
        "sample_fields": dict(list(declared_fields.items())[:50]),
    }
    sections = [
        f"Root path: {root}",
        "Top-level listing:",
        executor.execute(f"ls {root}"),
        "Metadata schema summary:",
        json.dumps(schema_summary, ensure_ascii=False),
        "Workspace retrieval capabilities:",
        executor.describe_available_command_surfaces(),
    ]
    return "\n".join(sections)
def build_pifs_agent_instructions(
    filesystem: PageIndexFileSystem,
    *,
    root: str = "/",
    system_prompt: str | None = None,
    executor: PIFSCommandExecutor | None = None,
    query_context: str | None = None,
) -> str:
    """Assemble the full instruction text for the PIFS agent.

    The result is three sections separated by blank lines: the system prompt
    (``system_prompt`` or the module default), the tool policy, and the
    workspace context produced by ``build_agent_initial_context``.
    """
    workspace_context = build_agent_initial_context(
        filesystem,
        root=root,
        executor=executor,
        query_context=query_context,
    )
    prompt_text = (system_prompt or AGENT_SYSTEM_PROMPT).strip()
    policy_text = AGENT_TOOL_POLICY.strip()
    context_section = "Workspace context:\n" + workspace_context
    return "\n\n".join((prompt_text, policy_text, context_section))
class PIFSAgentStreamObserver:
    """Prints and records streaming events of a PIFS agent run.

    Model text deltas are printed when the stream mode is ``model`` or
    ``all``; tool calls and results when it is ``tools`` or ``all`` (or when a
    caller forces them). Printed model text is buffered per kind and flushed
    into ``stream_log`` by :meth:`finish`.
    """

    def __init__(
        self,
        stream_mode: str,
        *,
        stream_log: list[dict[str, Any]] | None = None,
        output: TextIO | None = None,
        include_raw_reasoning: bool | None = None,
    ) -> None:
        self.stream_mode = normalize_agent_stream_mode(stream_mode)
        self.stream_log = stream_log
        self.output = output or sys.stdout
        if include_raw_reasoning is None:
            include_raw_reasoning = pifs_agent_raw_reasoning_enabled()
        self.include_raw_reasoning = include_raw_reasoning
        # Kind of the section currently being printed, or None between sections.
        self._printed_section: str | None = None
        self._buffers: dict[str, list[str]] = {
            kind: [] for kind in ("output", "think", "think_summary", "tool_args")
        }

    @property
    def wants_model_stream(self) -> bool:
        """True when model text should be printed."""
        return self.stream_mode in {"model", "all"}

    @property
    def wants_tool_stream(self) -> bool:
        """True when tool calls/results should be printed."""
        return self.stream_mode in {"tools", "all"}

    @property
    def has_output_text(self) -> bool:
        """True once any final-output text has been buffered."""
        return bool(self._buffers["output"])

    def handle_event(self, event: Any) -> None:
        """Dispatch one stream event by its ``type`` attribute."""
        event_type = getattr(event, "type", None)
        if event_type == "raw_response_event":
            self._handle_raw_response_event(getattr(event, "data", None))
        elif event_type == "run_item_stream_event":
            self._handle_run_item_event(event)

    def finish(self, final_output: Any = None) -> None:
        """Close the current section and flush buffered text to the log.

        If model streaming is on but no output text was streamed, the final
        output is printed as a single chunk first.
        """
        needs_fallback = self.wants_model_stream and not self.has_output_text
        if needs_fallback and final_output:
            self._emit("output", str(final_output), "[llm final output stream]")
        if self._printed_section is not None:
            print(file=self.output, flush=True)
            self._printed_section = None
        if self.stream_log is None:
            return
        for kind, parts in self._buffers.items():
            text = "".join(parts)
            if text:
                self.stream_log.append({"kind": kind, "text": text})

    def _handle_raw_response_event(self, data: Any) -> None:
        """Route a raw response delta to the matching buffer/section."""
        delta = getattr(data, "delta", None)
        event_type = getattr(data, "type", "")
        if not (isinstance(delta, str) and delta):
            return
        if event_type == "response.function_call_arguments.delta":
            # Tool arguments are only buffered, never printed as they stream.
            self._buffers["tool_args"].append(delta)
            return
        if event_type == "response.output_text.delta":
            self._emit("output", delta, "[llm final output stream]")
            return
        if event_type == "response.reasoning_summary_text.delta":
            self._emit("think_summary", delta, "[llm reasoning summary stream]")
            return
        if event_type == "response.reasoning_text.delta" and self.include_raw_reasoning:
            self._emit("think", delta, "[llm reasoning text stream]")

    def _handle_run_item_event(self, event: Any) -> None:
        """Record message/reasoning item creation in the stream log."""
        name = getattr(event, "name", "")
        item = getattr(event, "item", None)
        item_type = getattr(item, "type", "")
        if self.stream_log is None:
            return
        if name in {"message_output_created", "reasoning_item_created"}:
            self.stream_log.append({"kind": "run_item", "name": name, "item_type": item_type})

    def _emit(self, kind: str, text: str, label: str) -> None:
        """Buffer and print a text chunk, opening a new section if needed."""
        enabled = self.wants_tool_stream if kind == "tool_args" else self.wants_model_stream
        if not enabled:
            return
        self._buffers[kind].append(text)
        if self._printed_section != kind:
            self._start_section(kind, label)
        print(text, end="", file=self.output, flush=True)

    def emit_tool_call(self, command: str, *, force: bool = False) -> None:
        """Log a tool call and print it when tool streaming is on or forced."""
        if self.stream_log is not None:
            self.stream_log.append({"kind": "tool_call", "command": command})
        if force or self.wants_tool_stream:
            self._start_section("tool_call", "[llm -> pifs command]")
            print(command, file=self.output, flush=True)

    def emit_tool_result(
        self,
        *,
        ok: bool,
        output: str,
        seconds: float,
        force: bool = False,
        preview_chars: int = 1000,
    ) -> None:
        """Log a tool result and print a compact preview when enabled."""
        should_print = force or self.wants_tool_stream
        if self.stream_log is None and not should_print:
            return
        output_chars = len(output)
        preview = compact_tool_output_preview(output, preview_chars=preview_chars)
        if self.stream_log is not None:
            self.stream_log.append(
                {
                    "kind": "tool_result",
                    "ok": ok,
                    "seconds": round(seconds, 4),
                    "output_chars": output_chars,
                    "preview": preview,
                }
            )
        if not should_print:
            return
        self._start_section("tool_result", "[pifs -> llm result preview]")
        print(
            f"ok={str(ok).lower()} seconds={seconds:.4f} output_chars={len(output)}",
            file=self.output,
            flush=True,
        )
        print(preview, file=self.output, flush=True)

    def _start_section(self, kind: str, label: str) -> None:
        """Terminate the previous section (if any) and print a new header."""
        if self._printed_section is not None:
            print(file=self.output, flush=True)
        print(f"\n{label}", file=self.output, flush=True)
        self._printed_section = kind
def run_pifs_agent(
    filesystem: PageIndexFileSystem,
    question: str,
    *,
    model: str,
    root: str = "/",
    system_prompt: str | None = None,
    max_turns: int = 20,
    max_seconds: float | None = 60,
    verbose: bool = False,
    stream_mode: str = "off",
    reasoning_effort: str | None = None,
    reasoning_summary: str | None = None,
    output_type: type[Any] | None = None,
    tool_log: list[dict[str, Any]] | None = None,
    agent_log: list[dict[str, Any]] | None = None,
) -> str:
    """Run an agent that answers ``question`` through a single PIFS shell tool.

    Args:
        filesystem: workspace the tool commands are executed against.
        question: prompt passed to the agent run.
        model: model name; wrapped in a chat-completions model when
            ``OPENAI_BASE_URL`` selects an OpenAI-compatible endpoint.
        root: path listed in the initial workspace context.
        system_prompt: optional replacement for the default system prompt.
        max_turns: turn limit passed to the runner.
        max_seconds: wall-clock limit; ``None`` or a non-positive value
            disables the timeout.
        verbose: force printing of tool calls/results regardless of
            ``stream_mode``.
        stream_mode: stream mode name or alias.
        reasoning_effort / reasoning_summary: forwarded to the model settings
            builder.
        output_type: optional structured output type for the agent.
        tool_log: if given, receives one record per tool invocation.
        agent_log: if given, receives stream observer records.

    Returns:
        The final output serialized to a string.

    Raises:
        RuntimeError: if the ``agents`` package is not installed.
        TimeoutError: if the run exceeds ``max_seconds``.
    """
    try:
        from agents import Agent, OpenAIChatCompletionsModel, Runner, function_tool, set_tracing_disabled
        from openai import AsyncOpenAI
    except ModuleNotFoundError as exc:
        if exc.name == "agents":
            raise RuntimeError("openai-agents is required to run the PageIndex FileSystem agent") from exc
        raise
    set_tracing_disabled(should_disable_pifs_agent_tracing())
    normalized_stream_mode = normalize_agent_stream_mode(stream_mode)
    executor = PIFSCommandExecutor(
        filesystem,
        json_output=False,
        query_context=extract_agent_question_text(question),
    )
    observer = PIFSAgentStreamObserver(normalized_stream_mode, stream_log=agent_log)
    instructions = build_pifs_agent_instructions(
        filesystem,
        root=root,
        system_prompt=system_prompt,
        executor=executor,
    )

    @function_tool(description_override=BASH_TOOL_DESCRIPTION.strip())
    def bash(command: str) -> str:
        """Run an allowed PageIndex FileSystem virtual shell command."""
        # perf_counter is monotonic, so durations are immune to clock changes.
        started = time.perf_counter()
        ok = True
        observer.emit_tool_call(command, force=verbose)
        try:
            output = executor.execute(command)
        except PIFSCommandError as exc:
            # Command errors are returned to the model as text, not raised.
            ok = False
            output = f"ERROR: {exc}"
        seconds = time.perf_counter() - started
        if tool_log is not None:
            tool_log.append(
                {
                    "command": command,
                    "ok": ok,
                    "seconds": round(seconds, 4),
                    "output_chars": len(output),
                    "preview": output[:500],
                }
            )
        observer.emit_tool_result(ok=ok, output=output, seconds=seconds, force=verbose)
        return output

    model_settings = build_agent_model_settings(
        reasoning_effort=reasoning_effort,
        reasoning_summary=reasoning_summary,
    )
    base_url = os.environ.get("OPENAI_BASE_URL")
    model_config = model
    if should_use_openai_compatible_chat_model(base_url):
        model_config = OpenAIChatCompletionsModel(
            model=model,
            openai_client=AsyncOpenAI(
                api_key=os.environ.get("OPENAI_API_KEY"),
                base_url=base_url,
            ),
        )
    agent_kwargs: dict[str, Any] = {
        "name": "PageIndexFileSystem",
        "instructions": instructions,
        "tools": [bash],
        "model": model_config,
    }
    if model_settings is not None:
        agent_kwargs["model_settings"] = model_settings
    if output_type is not None:
        agent_kwargs["output_type"] = output_type
    agent = Agent(**agent_kwargs)

    async def _run_streamed() -> str:
        streamed_run = Runner.run_streamed(agent, question, max_turns=max_turns)
        final_output = ""
        try:
            async for event in streamed_run.stream_events():
                observer.handle_event(event)
            final_output = serialize_agent_final_output(streamed_run.final_output)
            return final_output
        finally:
            # Also flush the observer when the stream is interrupted.
            if not final_output and streamed_run.final_output:
                final_output = serialize_agent_final_output(streamed_run.final_output)
            observer.finish(final_output)

    async def _run() -> str:
        if max_seconds is None or max_seconds <= 0:
            return await _run_streamed()
        try:
            return await asyncio.wait_for(_run_streamed(), timeout=max_seconds)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(f"MaxSecondsExceeded: exceeded {max_seconds:g}s") from exc

    # Only the loop probe is guarded: a RuntimeError raised by the agent run
    # itself must propagate instead of triggering a second asyncio.run().
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop in this thread: run directly.
        return asyncio.run(_run())
    # A loop is already running here, so drive the agent on a worker thread
    # with its own event loop and block until it completes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, _run()).result()
def extract_agent_question_text(prompt: str) -> str:
    """Pull the question out of a prompt.

    Returns the text following the first ``Question:`` line prefix that has a
    non-empty value; if there is none, returns the whole prompt stripped.
    A falsy prompt yields the empty string.
    """
    text = str(prompt or "")
    for line in text.splitlines():
        if not line.startswith("Question:"):
            continue
        candidate = line.split(":", 1)[1].strip()
        if candidate:
            return candidate
    return text.strip()

View file

@ -0,0 +1,47 @@
from __future__ import annotations
import argparse
import os
import shlex
import sys
from pathlib import Path
from .commands import PIFSCommandError, PIFSCommandExecutor
from .core import PageIndexFileSystem
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point for the PageIndex FileSystem shell.

    Parses ``--workspace`` (default: ``PIFS_WORKSPACE``) and ``--json``, then
    executes the remaining tokens as one filesystem command. ``--json`` is
    also honoured when it appears among the command tokens; ``--`` tokens are
    dropped.

    Returns:
        0 on success, 2 for a ``PIFSCommandError``, 1 for any other error.
        Argument errors exit through ``argparse``.
    """
    raw_args = list(sys.argv[1:] if argv is None else argv)

    parser = argparse.ArgumentParser(description="PageIndex FileSystem CLI")
    parser.add_argument("--workspace", default=os.environ.get("PIFS_WORKSPACE"))
    parser.add_argument("--json", action="store_true", dest="json_output")
    parser.add_argument("command", nargs=argparse.REMAINDER)
    args = parser.parse_args(raw_args)

    tokens = [token for token in args.command if token != "--"]
    wants_json = args.json_output or "--json" in tokens
    tokens = [token for token in tokens if token != "--json"]

    if not args.workspace:
        parser.error("--workspace is required unless PIFS_WORKSPACE is set")
    if not tokens:
        parser.error("a filesystem command is required")

    workspace = Path(args.workspace).expanduser()
    executor = PIFSCommandExecutor(PageIndexFileSystem(workspace), json_output=wants_json)
    try:
        # Re-quote each token so the executor sees the original argument split.
        command_line = " ".join(map(shlex.quote, tokens))
        print(executor.execute(command_line))
    except PIFSCommandError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 2
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
    return 0
# When run as a script, exit the process with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())

File diff suppressed because it is too large Load diff

1771
pageindex/filesystem/core.py Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,662 @@
from __future__ import annotations
import hashlib
import json
import os
import re
import sqlite3
import struct
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from .semantic_index import SQLiteVecSemanticIndex, SemanticIndexError, SemanticSearchResult
# Channel name -> projection index name. The backend opens
# "<index_dir>/<index name>.sqlite" for each entry.
INDEX_BY_CHANNEL = {
    "metadata": "metadata_composite_vector",
    "summary": "summary_only_vector",
    "entity": "entity_vectors",
    "constraint": "constraint_vectors",
    "relation": "relation_vectors",
}
# Channels combined by HybridProjectionSearchBackend.search().
HYBRID_ENTITY_RELATION_CHANNELS = ("metadata", "entity", "constraint", "relation")
# Channels that may be queried individually via search_channel().
SEMANTIC_TOOL_CHANNELS = ("summary", "entity", "relation")
# Per-channel weight applied to each hit's rank score during hybrid
# aggregation; the four weights sum to 1.0.
HYBRID_ENTITY_RELATION_WEIGHTS = {
    "metadata": 0.25,
    "entity": 0.25,
    "relation": 0.30,
    "constraint": 0.20,
}
@dataclass(frozen=True)
class QueryProjection:
    """Entity/relation/constraint view of a query, used to build channel query texts."""

    # Terms used as the "entity" channel query text and for exact-match bonuses.
    entities: list[str]
    # Lines joined with newlines to form the "relation" channel query text.
    relations: list[str]
    # Terms used as the "constraint" channel query text and for exact-match bonuses.
    constraints: list[str]
    # Coarse answer category label; empty when not inferred.
    expected_answer_type: str = ""
@dataclass(frozen=True)
class HybridProjectionCandidate:
    """One ranked document candidate returned by the hybrid search backend."""

    # Search result's external_id, or its file_ref when external_id is empty.
    document_id: str
    # Rank-derived relevance score; higher sorts first.
    score: float
    # One {"channel", "rank", "distance"} dict per channel hit that contributed.
    sources: list[dict[str, Any]]
    # The following four fields are copied from the underlying search result.
    source_type: str
    source_path: str
    title: str
    metadata: dict[str, Any]
    # Short human-readable description of how the candidate was found.
    snippet: str
class HybridProjectionSearchBackend:
    """Hybrid entity/relation/vector retrieval over rebuildable projection indexes.

    The SQLite catalog remains the source of truth. This backend only reads
    external sqlite-vec projection indexes and returns candidate document ids
    for the catalog to resolve and filter.
    """

    def __init__(
        self,
        index_dir: str | Path,
        *,
        embedder: Any,
        embedding_provider: str,
        embedding_model: str,
        embedding_dimensions: int = 256,
        embedding_cache_path: str | Path | None = None,
        per_channel_limit: int = 100,
        fetch_multiplier: int = 100,
    ) -> None:
        """Bind the backend to ``index_dir`` and an embedder.

        Args:
            index_dir: directory holding one ``<index name>.sqlite`` per channel.
            embedder: object with an ``embed(texts)`` method.
            embedding_provider / embedding_model / embedding_dimensions:
                identify the embedding space; also used as the cache key.
            embedding_cache_path: cache database path; defaults to
                ``<index_dir>/embedding_cache.sqlite``.
            per_channel_limit: minimum number of hits fetched per channel in
                hybrid search.
            fetch_multiplier: forwarded to each index's ``search``.
        """
        self.index_dir = Path(index_dir).expanduser()
        self.embedder = embedder
        self.embedding_provider = embedding_provider
        self.embedding_model = embedding_model
        self.embedding_dimensions = embedding_dimensions
        self.cache_model = embedding_cache_model_key(embedding_model, embedding_dimensions)
        self.embedding_cache = EmbeddingCache(
            Path(embedding_cache_path).expanduser()
            if embedding_cache_path is not None
            else self.index_dir / "embedding_cache.sqlite"
        )
        self.per_channel_limit = per_channel_limit
        self.fetch_multiplier = fetch_multiplier
        self.indexes = {
            channel: SQLiteVecSemanticIndex(self.index_dir / f"{index_name}.sqlite")
            for channel, index_name in INDEX_BY_CHANNEL.items()
        }

    @classmethod
    def from_provider(
        cls,
        index_dir: str | Path,
        *,
        embedding_provider: str = "openai",
        embedding_model: str = "text-embedding-3-small",
        embedding_dimensions: int = 256,
        embedding_timeout: float = 60,
        **kwargs: Any,
    ) -> "HybridProjectionSearchBackend":
        """Create a backend whose embedder is built from a provider name.

        Extra keyword arguments are forwarded to the constructor.
        """
        return cls(
            index_dir,
            embedder=make_embedder(
                embedding_provider,
                embedding_model,
                dimensions=embedding_dimensions,
                timeout=embedding_timeout,
            ),
            embedding_provider=embedding_provider,
            embedding_model=embedding_model,
            embedding_dimensions=embedding_dimensions,
            **kwargs,
        )

    def search(
        self,
        query: str,
        *,
        limit: int = 10,
        filters: dict[str, Any] | None = None,
    ) -> list[HybridProjectionCandidate]:
        """Run the hybrid search and return at most ``limit`` candidates.

        Returns an empty list for a non-positive ``limit``, an empty query, or
        when none of the hybrid channels has indexed documents.
        """
        # Guard first: a negative limit would otherwise slice from the end,
        # and a zero limit would pay for embeddings only to return nothing.
        if limit <= 0:
            return []
        query = normalize_text(query)
        if not query:
            return []
        projection = heuristic_query_projection(query)
        channels = tuple(
            channel
            for channel in HYBRID_ENTITY_RELATION_CHANNELS
            if self._channel_document_count(channel) > 0
        )
        if not channels:
            return []
        channel_hits = self._search_channels(
            query=query,
            projection=projection,
            # Over-fetch per channel so aggregation has enough overlap to rank.
            limit=max(limit, self.per_channel_limit),
            filters=filters,
            channels=channels,
        )
        return aggregate_hybrid_entity_relation(channel_hits, projection)[:limit]

    def search_channel(
        self,
        channel: str,
        query: str,
        *,
        limit: int = 10,
        filters: dict[str, Any] | None = None,
    ) -> list[HybridProjectionCandidate]:
        """Search a single semantic channel.

        Raises:
            ValueError: if ``channel`` is not one of ``SEMANTIC_TOOL_CHANNELS``.
        """
        if channel not in SEMANTIC_TOOL_CHANNELS:
            raise ValueError(f"unsupported semantic channel: {channel}")
        if limit <= 0:
            return []
        # Only the requested channel's index needs to be probed.
        if self._channel_document_count(channel) <= 0:
            return []
        query = normalize_text(query)
        if not query:
            return []
        projection = heuristic_query_projection(query)
        vector = self.embedding_cache.embed_texts(
            [query_text_for_channel(channel, query, projection)],
            provider=self.embedding_provider,
            model=self.cache_model,
            embedder=self.embedder,
            batch_size=1,
        )[0]
        results = self.indexes[channel].search(
            vector,
            limit=limit,
            filters=filters,
            fetch_multiplier=self.fetch_multiplier,
        )
        return rank_single_semantic_channel(channel, results)

    def available_channels(self) -> tuple[str, ...]:
        """Return the single-channel tool channels that have indexed documents."""
        return tuple(
            channel
            for channel in SEMANTIC_TOOL_CHANNELS
            if self._channel_document_count(channel) > 0
        )

    def info(self) -> dict[str, Any]:
        """Describe the backend configuration and per-channel index status."""
        return {
            "index_dir": str(self.index_dir),
            "embedding_provider": self.embedding_provider,
            "embedding_model": self.embedding_model,
            "embedding_dimensions": self.embedding_dimensions,
            "strategy": "hybrid_entity_relation_vector",
            "available_channels": list(self.available_channels()),
            "channels": {
                channel: self._safe_channel_info(channel)
                for channel in self.indexes
            },
        }

    def _channel_document_count(self, channel: str) -> int:
        """Return the channel's document count, or 0 when it is unavailable."""
        info = self._safe_channel_info(channel)
        if not info.get("available"):
            return 0
        return int(info.get("document_count") or 0)

    def _safe_channel_info(self, channel: str) -> dict[str, Any]:
        """Return index info for ``channel`` without raising on a bad index.

        A missing file or an I/O, SQLite or index error is reported as an
        unavailable channel with an ``error`` message.
        """
        index = self.indexes[channel]
        if not index.db_path.exists():
            return {
                "db_path": str(index.db_path),
                "available": False,
                "document_count": 0,
                "error": "index file is missing",
            }
        try:
            info = index.info()
        except (OSError, sqlite3.Error, SemanticIndexError) as exc:
            return {
                "db_path": str(index.db_path),
                "available": False,
                "document_count": 0,
                "error": str(exc),
            }
        return {**info, "available": int(info.get("document_count") or 0) > 0}

    def _search_channels(
        self,
        *,
        query: str,
        projection: QueryProjection,
        limit: int,
        filters: dict[str, Any] | None,
        channels: tuple[str, ...],
    ) -> dict[str, list[SemanticSearchResult]]:
        """Embed one query text per channel and search each channel's index."""
        query_texts = {
            channel: query_text_for_channel(channel, query, projection)
            for channel in channels
        }
        vectors = self.embedding_cache.embed_texts(
            [query_texts[channel] for channel in channels],
            provider=self.embedding_provider,
            model=self.cache_model,
            embedder=self.embedder,
            batch_size=1,
        )
        return {
            channel: self.indexes[channel].search(
                vector,
                limit=limit,
                filters=filters,
                fetch_multiplier=self.fetch_multiplier,
            )
            for channel, vector in zip(channels, vectors)
        }
class EmbeddingCache:
    """SQLite-backed cache of embedding vectors keyed by provider, model and text hash."""

    def __init__(self, db_path: Path):
        """Create the cache database (and its parent directory) if needed."""
        self.db_path = db_path
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = self.connect()
        try:
            # "with conn" commits on success and rolls back on error, but it
            # does not close the connection; the finally block does.
            with conn:
                conn.execute(
                    """
                    CREATE TABLE IF NOT EXISTS embedding_cache (
                        provider TEXT NOT NULL,
                        model TEXT NOT NULL,
                        text_hash TEXT NOT NULL,
                        dimension INTEGER NOT NULL,
                        vector_blob BLOB,
                        vector_json TEXT,
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                        PRIMARY KEY(provider, model, text_hash)
                    )
                    """
                )
        finally:
            conn.close()

    def connect(self) -> sqlite3.Connection:
        """Open a new connection with name-addressable rows.

        The caller is responsible for closing the returned connection.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def embed_texts(
        self,
        texts: list[str],
        *,
        provider: str,
        model: str,
        embedder: Any,
        batch_size: int,
    ) -> list[list[float]]:
        """Return one vector per text, embedding only the uncached ones.

        Args:
            texts: texts to embed; the result has the same order and length.
            provider / model: cache key components alongside the text hash.
            embedder: object with an ``embed(texts)`` method.
            batch_size: number of texts per embedder call (values below 1 are
                treated as 1).

        Raises:
            ValueError: if the embedder returns a different number of vectors
                than texts it was given.
        """
        hashes = [SQLiteVecSemanticIndex.text_hash(text) for text in texts]
        cached: dict[str, list[float]] = {}
        conn = self.connect()
        try:
            for text_hash in sorted(set(hashes)):
                row = conn.execute(
                    """
                    SELECT vector_blob, vector_json
                    FROM embedding_cache
                    WHERE provider = ? AND model = ? AND text_hash = ?
                    """,
                    (provider, model, text_hash),
                ).fetchone()
                if row is not None:
                    cached[text_hash] = decode_vector(row["vector_blob"], row["vector_json"])
        finally:
            conn.close()

        missing_positions = [
            index for index, text_hash in enumerate(hashes) if text_hash not in cached
        ]
        step = max(1, batch_size)
        for start in range(0, len(missing_positions), step):
            positions = missing_positions[start : start + step]
            batch_texts = [texts[index] for index in positions]
            vectors = embed_with_retry(embedder, batch_texts)
            # Fail before writing: a short response would otherwise be cached
            # partially and surface later as an opaque KeyError.
            if len(vectors) != len(positions):
                raise ValueError(
                    f"embedder returned {len(vectors)} vectors for {len(positions)} texts"
                )
            # The connection is opened after the embedder call so no database
            # handle is held while waiting on it.
            conn = self.connect()
            try:
                with conn:
                    conn.executemany(
                        """
                        INSERT OR REPLACE INTO embedding_cache(
                            provider, model, text_hash, dimension, vector_blob, vector_json
                        )
                        VALUES (?, ?, ?, ?, ?, '')
                        """,
                        [
                            (
                                provider,
                                model,
                                hashes[index],
                                len(vector),
                                encode_vector(vector),
                            )
                            for index, vector in zip(positions, vectors)
                        ],
                    )
            finally:
                conn.close()
            for index, vector in zip(positions, vectors):
                cached[hashes[index]] = vector
        return [cached[text_hash] for text_hash in hashes]
class OpenAIEmbeddingClient:
    """Embedder backed by the OpenAI embeddings endpoint."""

    def __init__(self, model: str, *, dimensions: int, timeout: float):
        """Create the client from ``OPENAI_API_KEY`` / ``OPENAI_BASE_URL``."""
        from openai import OpenAI

        self.model = model
        self.dimensions = dimensions
        # An empty OPENAI_BASE_URL is passed as None.
        base_url = os.environ.get("OPENAI_BASE_URL") or None
        self.client = OpenAI(
            api_key=os.environ.get("OPENAI_API_KEY"),
            base_url=base_url,
            timeout=timeout,
        )

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed ``texts`` and return vectors ordered by response index.

        ``dimensions`` is only sent when it is positive.
        """
        request: dict[str, Any] = {"model": self.model, "input": texts}
        if self.dimensions > 0:
            request["dimensions"] = self.dimensions
        response = self.client.embeddings.create(**request)
        ordered = sorted(response.data, key=lambda entry: entry.index)
        return [list(entry.embedding) for entry in ordered]
class HashEmbeddingClient:
    """Offline embedder that hashes keyword terms into a fixed-size vector."""

    def __init__(self, dimensions: int = 256):
        self.dimensions = dimensions

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Return one hashed vector per input text."""
        return list(map(self._embed_one, texts))

    def _embed_one(self, text: str) -> list[float]:
        """Signed feature hashing of up to 256 keyword terms, L2-normalized.

        A text with no terms (or whose terms cancel out) yields the zero vector.
        """
        buckets = [0.0] * self.dimensions
        for term in keyword_terms(text)[:256]:
            digest = hashlib.blake2b(term.encode("utf-8"), digest_size=8).digest()
            # First four bytes pick the bucket, the fifth byte picks the sign.
            slot = int.from_bytes(digest[:4], "little") % self.dimensions
            buckets[slot] += -1.0 if digest[4] % 2 else 1.0
        norm = sum(component * component for component in buckets) ** 0.5
        if not norm:
            return buckets
        return [component / norm for component in buckets]
def make_embedder(provider: str, model: str, *, dimensions: int, timeout: float) -> Any:
    """Build the embedder for ``provider`` (``"openai"`` or ``"hash"``).

    The hash embedder falls back to 256 dimensions when ``dimensions`` is not
    positive.

    Raises:
        ValueError: for any other provider name.
    """
    if provider == "hash":
        hash_dimensions = dimensions if dimensions > 0 else 256
        return HashEmbeddingClient(dimensions=hash_dimensions)
    if provider == "openai":
        return OpenAIEmbeddingClient(model, dimensions=dimensions, timeout=timeout)
    raise ValueError(f"unknown embedding provider: {provider}")
def query_text_for_channel(channel: str, query: str, projection: QueryProjection) -> str:
    """Pick the text to embed for ``channel``.

    Metadata and summary channels use the raw query. Entity, constraint and
    relation channels use the matching projection terms, falling back to the
    raw query when the projection has none.

    Raises:
        ValueError: for an unknown channel name.
    """
    if channel in {"metadata", "summary"}:
        return query
    if channel == "relation":
        specialized = "\n".join(projection.relations)
    elif channel == "entity":
        specialized = compact_join(projection.entities, limit=24)
    elif channel == "constraint":
        specialized = compact_join(projection.constraints, limit=24)
    else:
        raise ValueError(f"unknown semantic channel: {channel}")
    return specialized or query
def rank_single_semantic_channel(
    channel: str,
    results: list[SemanticSearchResult],
) -> list[HybridProjectionCandidate]:
    """Turn one channel's hits into candidates, keeping each document once.

    Only the best-ranked hit per document is kept. The score is
    ``1 / (60 + rank)`` with ranks starting at 1.
    """
    ranked: list[HybridProjectionCandidate] = []
    emitted_ids: set[str] = set()
    for position, hit in enumerate(results, start=1):
        document_id = str(hit.external_id or hit.file_ref)
        if document_id not in emitted_ids:
            emitted_ids.add(document_id)
            provenance = {"channel": channel, "rank": position, "distance": hit.distance}
            ranked.append(
                HybridProjectionCandidate(
                    document_id=document_id,
                    score=1 / (60 + position),
                    sources=[provenance],
                    source_type=hit.source_type,
                    source_path=hit.source_path,
                    title=hit.title,
                    metadata=hit.metadata,
                    snippet=f"{channel}_vector rank={position}",
                )
            )
    return ranked
def aggregate_hybrid_entity_relation(
    channel_hits: dict[str, list[SemanticSearchResult]],
    projection: QueryProjection,
) -> list[HybridProjectionCandidate]:
    """Fuse per-channel hits into one ranked candidate list.

    Each document's best rank in a channel contributes
    ``weight * 1 / (60 + rank)``; an exact-match bonus is then added.
    Candidates are ordered by descending score, then best rank, then
    document id.
    """
    merged: dict[str, dict[str, Any]] = {}
    for channel, hits in channel_hits.items():
        channel_weight = HYBRID_ENTITY_RELATION_WEIGHTS[channel]
        counted: set[str] = set()
        for position, hit in enumerate(hits, start=1):
            document_id = str(hit.external_id or hit.file_ref)
            if document_id in counted:
                # Only a document's best rank per channel counts.
                continue
            counted.add(document_id)
            if document_id not in merged:
                # Descriptive fields come from the first hit seen for the document.
                merged[document_id] = {
                    "document_id": document_id,
                    "score": 0.0,
                    "sources": [],
                    "source_type": hit.source_type,
                    "source_path": hit.source_path,
                    "title": hit.title,
                    "metadata": hit.metadata,
                }
            entry = merged[document_id]
            entry["score"] += channel_weight * (1 / (60 + position))
            entry["sources"].append({"channel": channel, "rank": position, "distance": hit.distance})

    candidates = []
    for entry in merged.values():
        entry["score"] += exact_match_bonus(entry, projection)
        candidates.append(
            HybridProjectionCandidate(
                document_id=entry["document_id"],
                score=float(entry["score"]),
                sources=entry["sources"],
                source_type=entry["source_type"],
                source_path=entry["source_path"],
                title=entry["title"],
                metadata=entry["metadata"],
                snippet=hybrid_snippet(entry),
            )
        )

    def ordering(candidate: HybridProjectionCandidate) -> tuple[float, int, str]:
        best_rank = min(source["rank"] for source in candidate.sources)
        return (-candidate.score, best_rank, candidate.document_id)

    candidates.sort(key=ordering)
    return candidates
def exact_match_bonus(item: dict[str, Any], projection: QueryProjection) -> float:
    """Small score bonus for query terms found verbatim in a candidate.

    The first 8 entities and first 6 constraints are matched,
    case-insensitively, against the JSON text of the candidate's title,
    source path and metadata. Terms shorter than 3 characters are ignored.
    Each match adds 0.004, capped at 0.02.
    """
    searchable = json.dumps(
        {
            "title": item.get("title", ""),
            "source_path": item.get("source_path", ""),
            "metadata": item.get("metadata", {}),
        },
        ensure_ascii=False,
    ).lower()
    probe_terms = [*projection.entities[:8], *projection.constraints[:6]]
    needles = (str(term).lower().strip() for term in probe_terms)
    matched = sum(1 for needle in needles if len(needle) >= 3 and needle in searchable)
    return min(0.02, matched * 0.004)
def hybrid_snippet(item: dict[str, Any]) -> str:
    """Describe how a hybrid candidate was found.

    Lists up to four ``channel@rank`` sources and, when the metadata has a
    non-empty ``topic``, appends it.
    """
    source_tags = [
        f"{source['channel']}@{source['rank']}" for source in item.get("sources", [])[:4]
    ]
    channels = ", ".join(source_tags)
    snippet = f"hybrid_entity_relation_vector {channels}"
    metadata = item.get("metadata") or {}
    topic = str(metadata.get("topic") or "").strip()
    if not topic:
        return snippet
    return "; ".join([snippet, f"topic: {topic}"])
def heuristic_query_projection(question: str) -> QueryProjection:
    """Derive a ``QueryProjection`` from a question without calling a model.

    Entities are identifier-like terms followed by the first 16 keywords
    (deduplicated, capped at 16). Constraints are constraint phrases plus
    numeric terms (deduplicated, capped at 12). A single relation line is
    built as ``"<first entity> | <predicate> | <question>"``.
    """
    entity_candidates = [*identifier_terms(question), *keyword_terms(question)[:16]]
    entities = dedupe(entity_candidates)[:16]

    constraint_candidates = [*extract_constraint_terms(question), *numeric_terms(question)]
    constraints = dedupe(constraint_candidates)[:12]

    predicate = infer_query_predicate(question)
    subject = entities[0] if entities else "question"
    relation = f"{subject} | {predicate} | {question}"
    return QueryProjection(
        entities=entities,
        relations=[relation],
        constraints=constraints,
        expected_answer_type=infer_answer_type(question),
    )
def compact_join(values: list[str], *, limit: int) -> str:
    """Join the first ``limit`` values with ``" | "``."""
    selected = values[:limit]
    return " | ".join(selected)
def identifier_terms(text: str) -> list[str]:
    """Extract identifier-like tokens from ``text``.

    Patterns are applied in order and their matches concatenated, so the same
    token may appear more than once.
    """
    patterns = (
        # Ticket-style ids such as ABC-123.
        r"\b[A-Z]{2,12}-\d{2,}\b",
        # key=value / key: value pairs.
        r"\b[A-Za-z_][A-Za-z0-9_]{2,}\b\s*(?:=|:)\s*[A-Za-z0-9_.:/-]+",
        # Compound names joined by -, _ or +.
        r"\b[A-Za-z][A-Za-z0-9_+-]+(?:[-_+][A-Za-z0-9]+)+\b",
        # Tokens starting with two or more capitals.
        r"\b[A-Z]{2,}[A-Za-z0-9_-]*\b",
    )
    return [
        match.strip()
        for pattern in patterns
        for match in re.findall(pattern, text)
    ]
def keyword_terms(text: str) -> list[str]:
    """Return lower-cased, deduplicated keywords from ``text``.

    A keyword starts with a letter, is at least three characters long and is
    not in the stopword list.
    """
    stopwords = frozenset(
        (
            "about",
            "after",
            "also",
            "and",
            "are",
            "for",
            "from",
            "how",
            "into",
            "the",
            "this",
            "that",
            "what",
            "when",
            "where",
            "which",
            "with",
        )
    )
    kept = []
    for raw_term in re.findall(r"[A-Za-z][A-Za-z0-9_+-]{2,}", text):
        lowered = raw_term.lower()
        if lowered not in stopwords:
            kept.append(lowered)
    return dedupe(kept)
def extract_constraint_terms(text: str) -> list[str]:
    """Extract constraint phrases and key/value pairs, deduplicated.

    Matching is case-insensitive. A constraint phrase starts at a modal or
    limit keyword and runs up to 120 characters or the end of the sentence.
    """
    patterns = (
        r"\b(?:must|should|required|requires?|default(?:s)?|limit(?:s)?|maximum|minimum)\b[^.!?\n]{0,120}",
        r"\b[A-Za-z_][A-Za-z0-9_]{2,}\s*(?:=|:)\s*[A-Za-z0-9_.:/-]+",
    )
    found = [
        match.strip()
        for pattern in patterns
        for match in re.findall(pattern, text, flags=re.IGNORECASE)
    ]
    return dedupe(found)
def numeric_terms(text: str) -> list[str]:
    """Extract quantities with a unit, e.g. ``"10 MiB"``, ``"5 seconds"``, ``"50%"``.

    Matching is case-insensitive. Word units must end at a word boundary.
    The percent sign is matched without one: ``%`` is not a word character,
    so requiring ``\\b`` after it would reject "50% of" and "50%.".
    """
    return re.findall(
        r"\b\d+(?:\.\d+)?\s*"
        r"(?:(?:MiB|GiB|MB|GB|ms|sec|seconds|minutes|hours|days|tokens?|req/s|rps)\b|%)",
        text,
        flags=re.IGNORECASE,
    )
def infer_query_predicate(question: str) -> str:
    """Classify a question into a coarse predicate label.

    Rules are checked in order using case-insensitive substring matching; the
    first rule with a matching needle wins. Defaults to ``"asks_about"``.
    """
    lowered = question.lower()
    rules = (
        ("asks_default", ("default", "defaults")),
        ("asks_limit", ("limit", "maximum", "minimum", "size")),
        ("asks_cause", ("caused", "cause", "why")),
        ("asks_owner", ("who", "owner", "assigned")),
        ("asks_deadline", ("when", "deadline", "date")),
        ("asks_status", ("status", "state")),
        ("asks_requirement", ("required", "requirement", "must")),
    )
    matching = (
        predicate
        for predicate, needles in rules
        if any(needle in lowered for needle in needles)
    )
    return next(matching, "asks_about")
def infer_answer_type(question: str) -> str:
    """Guess the expected answer category from a question.

    Checks, in order: quantity markers anywhere, a leading "who", a leading
    "when", then cause markers anywhere. Defaults to ``"fact"``.
    """
    lowered = question.lower()
    if any(marker in lowered for marker in ("how many", "limit", "size")):
        return "number_or_limit"
    if lowered.startswith("who"):
        return "person_or_team"
    if lowered.startswith("when"):
        return "date_or_time"
    if any(marker in lowered for marker in ("why", "caused")):
        return "cause"
    return "fact"
def dedupe(values: Any) -> list[str]:
    """Return whitespace-normalized values without case-insensitive duplicates.

    Each value is converted to ``str``, internal whitespace runs are collapsed
    to a single space and the ends stripped. Empty results are dropped. The
    first spelling of each value is kept, in original order.
    """
    first_seen: dict[str, str] = {}
    for value in values:
        collapsed = re.sub(r"\s+", " ", str(value)).strip()
        if collapsed:
            # setdefault keeps the earliest spelling for a given lowercase key.
            first_seen.setdefault(collapsed.lower(), collapsed)
    return list(first_seen.values())
def normalize_text(text: str) -> str:
    """Collapse whitespace runs to single spaces and strip the ends.

    Falsy input yields the empty string.
    """
    collapsed = re.sub(r"\s+", " ", str(text or ""))
    return collapsed.strip()
def embedding_cache_model_key(model: str, dimensions: int) -> str:
    """Return the cache key for a model: the name plus its positive dimension count."""
    if dimensions > 0:
        return f"{model}:dimensions={dimensions}"
    return model
def embed_with_retry(embedder: Any, texts: list[str], *, max_attempts: int = 8) -> list[list[float]]:
    """Call ``embedder.embed(texts)``, retrying on any exception.

    Waits ``2 ** (attempt - 1)`` seconds (capped at 120) between attempts and
    re-raises the last exception once ``max_attempts`` is reached.
    """
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        try:
            return embedder.embed(texts)
        except Exception:
            if attempt >= max_attempts:
                raise
            backoff = min(120.0, 2.0 ** (attempt - 1))
            time.sleep(backoff)
    raise RuntimeError("unreachable embedding retry state")
def encode_vector(vector: list[float]) -> bytes:
    """Pack a vector as consecutive little-endian 32-bit floats."""
    layout = f"<{len(vector)}f"
    return struct.pack(layout, *vector)
def decode_vector(blob: bytes | None, vector_json: str | None) -> list[float]:
    """Decode a cached vector from its blob, or from its JSON text as a fallback.

    Raises:
        ValueError: if the blob length is not a multiple of 4, or if neither
            source yields a list.
    """
    if blob:
        count, remainder = divmod(len(blob), 4)
        if remainder:
            raise ValueError("invalid cached vector blob length")
        return list(struct.unpack(f"<{count}f", blob))
    if vector_json:
        parsed = json.loads(vector_json)
        if isinstance(parsed, list):
            return [float(component) for component in parsed]
    raise ValueError("cached embedding row does not contain a vector")

View file

@ -0,0 +1,152 @@
from __future__ import annotations
import json
import re
from typing import Any
from .types import MetadataField
class MetadataQueryError(ValueError):
    """Raised by MetadataQueryEngine for invalid schemas, field names or filters."""

    pass
class MetadataQueryEngine:
FIELD_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")
OPERATORS = {"$eq", "$ne", "$in", "$gt", "$gte", "$lt", "$lte", "$contains"}
LOGICAL_OPERATORS = {"$and", "$or"}
MAX_DEPTH = 5
def __init__(self, store: Any):
self.store = store
def register_schema(self, schema: dict[str, Any], source: str = "manual") -> None:
fields = []
raw_fields = schema.get("fields", schema)
if not isinstance(raw_fields, dict):
raise MetadataQueryError("metadata schema must contain a fields object")
for name, declaration in raw_fields.items():
name = str(name)
self.validate_field_name(name)
if isinstance(declaration, str):
field_type = declaration
description = ""
elif isinstance(declaration, dict):
field_type = str(declaration.get("type", ""))
description = str(declaration.get("description", ""))
else:
raise MetadataQueryError(f"Invalid schema declaration for field: {name}")
if field_type not in {"string", "number", "boolean"}:
raise MetadataQueryError(f"Unsupported metadata field type for {name}: {field_type}")
fields.append(
MetadataField(
name=name,
field_type=field_type,
description=description,
source=source,
)
)
if fields:
self.store.upsert_metadata_fields(fields)
def parse_filter(self, value: str | dict[str, Any] | None) -> dict[str, Any] | None:
if value is None or value == "":
return None
if isinstance(value, str):
value = self.parse_dsl(value)
if not isinstance(value, dict):
raise MetadataQueryError("metadata_filter must be a JSON object")
self.validate_filter(value)
return value
def parse_dsl(self, dsl: str) -> dict[str, Any]:
try:
parsed = json.loads(dsl)
except json.JSONDecodeError as exc:
raise MetadataQueryError(
"metadata DSL must be a JSON object, for example "
'\'{"$and":[{"repo":"redwood"},{"year":{"$gte":2024}}]}\''
) from exc
if not isinstance(parsed, dict):
raise MetadataQueryError("metadata DSL must be a JSON object")
return parsed
def validate_filter(self, metadata_filter: dict[str, Any], depth: int = 1) -> None:
if depth > self.MAX_DEPTH:
raise MetadataQueryError(f"metadata_filter nesting depth exceeds {self.MAX_DEPTH}")
if not metadata_filter:
return
for key, condition in metadata_filter.items():
if key in self.LOGICAL_OPERATORS:
self._validate_logical(key, condition, depth)
continue
self.validate_field(key)
self._validate_field_condition(key, condition)
def _validate_logical(self, operator: str, condition: Any, depth: int) -> None:
if not isinstance(condition, list) or not condition:
raise MetadataQueryError(f"{operator} requires a non-empty list")
for item in condition:
if not isinstance(item, dict):
raise MetadataQueryError(f"{operator} items must be metadata filter objects")
self.validate_filter(item, depth + 1)
def _validate_field_condition(self, field: str, condition: Any) -> None:
if not isinstance(condition, dict) or not any(
str(key).startswith("$") for key in condition
):
self._validate_scalar(condition, context=field)
return
if len(condition) != 1:
raise MetadataQueryError(
f"Field {field} condition must contain exactly one metadata operator"
)
operator, expected = next(iter(condition.items()))
if operator not in self.OPERATORS:
raise MetadataQueryError(f"Unsupported metadata operator: {operator}")
if operator == "$in":
if not isinstance(expected, list):
raise MetadataQueryError(f"{field} $in requires a list")
for item in expected:
self._validate_scalar(item, context=f"{field} $in")
return
if operator == "$contains":
self._validate_scalar(expected, context=f"{field} $contains")
return
if operator in {"$gt", "$gte", "$lt", "$lte"}:
self._validate_range_value(expected, context=f"{field} {operator}")
return
self._validate_scalar(expected, context=f"{field} {operator}")
def validate_field(self, field: str) -> None:
self.validate_field_name(field)
if not self.store.metadata_field_exists(field):
raise MetadataQueryError(f"Unknown metadata field: {field}")
def validate_field_name(self, field: str) -> None:
    """Check ``field`` against ``self.FIELD_RE``.

    Raises:
        MetadataQueryError: if the pattern does not match at the start of the name.
    """
    if self.FIELD_RE.match(field) is None:
        raise MetadataQueryError(f"Invalid metadata field: {field}")
def export_schema(self) -> dict[str, Any]:
    """Return the store's metadata fields as ``{"fields": {name: {...}}}``.

    Each entry carries the field's ``type`` and ``description``.
    """
    return {
        "fields": {
            entry.name: {
                "type": entry.field_type,
                "description": entry.description,
            }
            for entry in self.store.list_metadata_fields()
        }
    }
@staticmethod
def _validate_scalar(value: Any, *, context: str) -> None:
if isinstance(value, bool):
return
if isinstance(value, (int, float)):
return
if isinstance(value, str):
return
raise MetadataQueryError(f"{context} must be a string, number, or boolean")
@staticmethod
def _validate_range_value(value: Any, *, context: str) -> None:
if isinstance(value, bool) or not isinstance(value, (int, float, str)):
raise MetadataQueryError(f"{context} must be a string or number")

View file

@ -0,0 +1,139 @@
from __future__ import annotations
import json
import os
from dataclasses import dataclass, field
from typing import Any, Protocol
# Names of the metadata fields that can be generated; these are the same six
# names OpenAIMetadataGenerator._response_format accepts.
GENERATED_METADATA_FIELDS = ("summary", "doc_type", "domain", "topic", "entity", "relation")
class MetadataGenerationError(RuntimeError):
    """Raised when retrieval metadata cannot be generated.

    In this module: a missing OPENAI_API_KEY or an unsupported requested field.
    """

    pass
@dataclass(frozen=True)
class MetadataGenerationInput:
    """Immutable description of one document handed to a metadata generator.

    OpenAIMetadataGenerator forwards only ``title``, ``source_type``,
    ``content_type``, ``metadata`` and a truncated ``text`` to the model; the
    remaining fields are carried for other consumers.
    """

    file_ref: str
    external_id: str | None
    title: str
    source_path: str
    content_type: str
    source_type: str | None
    # Document text; OpenAIMetadataGenerator truncates it to max_text_chars.
    text: str
    metadata: dict[str, Any] = field(default_factory=dict)
    text_artifact_path: str | None = None
@dataclass(frozen=True)
class MetadataGenerationResult:
    """Outcome of one metadata generation call."""

    # Generated values keyed by field name.
    values: dict[str, Any] = field(default_factory=dict)
    # String-to-string mapping; presumably field name -> failure reason (not
    # populated anywhere in this module — confirm against callers).
    failures: dict[str, str] = field(default_factory=dict)
class MetadataGenerator(Protocol):
    """Structural interface for metadata generators.

    ``generate`` receives the document input plus the requested field names
    (keyword-only) and returns either a MetadataGenerationResult or a plain
    dict.
    """

    def generate(
        self,
        request: MetadataGenerationInput,
        *,
        fields: list[str],
    ) -> MetadataGenerationResult | dict[str, Any]:
        ...
class OpenAIMetadataGenerator:
    """Default product generator for retrieval metadata.

    This intentionally lives under pageindex.filesystem instead of benchmark
    paths. It uses registered text today; callers can pass PageIndex-extracted
    text through the same MetadataGenerationInput without changing the API.
    """

    # Every supported field is requested from the model as a plain JSON string.
    # Same six names as the module-level GENERATED_METADATA_FIELDS tuple.
    _STRING_FIELDS = frozenset(
        {"summary", "doc_type", "domain", "topic", "entity", "relation"}
    )

    def __init__(
        self,
        *,
        model: str | None = None,
        base_url: str | None = None,
        max_text_chars: int = 24000,
    ):
        """Configure the generator.

        Args:
            model: Chat model name; falls back to ``PIFS_METADATA_MODEL`` and
                then ``"gpt-5-nano"``.
            base_url: API base URL; falls back to ``OPENAI_BASE_URL``.
            max_text_chars: Number of leading document characters sent to the model.
        """
        self.model = model or os.environ.get("PIFS_METADATA_MODEL", "gpt-5-nano")
        self.base_url = base_url if base_url is not None else os.environ.get("OPENAI_BASE_URL")
        self.max_text_chars = max_text_chars

    def generate(
        self,
        request: MetadataGenerationInput,
        *,
        fields: list[str],
    ) -> MetadataGenerationResult:
        """Ask the model for the requested metadata fields of one document.

        Returns:
            A MetadataGenerationResult whose ``values`` holds the requested
            fields present in the model's JSON answer.

        Raises:
            MetadataGenerationError: if OPENAI_API_KEY is unset, a requested
                field is unsupported, or the model's answer is not a JSON object.
        """
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            raise MetadataGenerationError("OPENAI_API_KEY is required for PIFS metadata generation")
        # Imported lazily so the module can be imported without the openai package.
        from openai import OpenAI

        client = OpenAI(api_key=api_key, base_url=self.base_url or None)
        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Generate grounded retrieval metadata for one document. "
                        "Use only the provided document text and ordinary source metadata. "
                        "The summary must be a retrieval summary, not a title rewrite. "
                        "Do not use filenames, paths, URLs, storage URIs, or outside knowledge. "
                        "Return strict JSON matching the requested fields."
                    ),
                },
                {
                    "role": "user",
                    "content": json.dumps(
                        {
                            "requested_fields": fields,
                            "document": {
                                "title": request.title,
                                "source_type": request.source_type,
                                "content_type": request.content_type,
                                "metadata": request.metadata,
                                "text": request.text[: self.max_text_chars],
                            },
                        },
                        ensure_ascii=False,
                    ),
                },
            ],
            response_format=self._response_format(fields),
        )
        # An empty message body is treated as "no values generated".
        content = response.choices[0].message.content or "{}"
        return MetadataGenerationResult(values=self._parse_values(content, fields))

    @staticmethod
    def _parse_values(content: str, fields: list[str]) -> dict[str, Any]:
        """Decode the model's JSON answer and keep only the requested fields.

        Raises:
            MetadataGenerationError: if ``content`` is not valid JSON or does
                not decode to a JSON object.
        """
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError as exc:
            raise MetadataGenerationError(
                "PIFS metadata generation returned invalid JSON"
            ) from exc
        if not isinstance(parsed, dict):
            raise MetadataGenerationError(
                "PIFS metadata generation must return a JSON object"
            )
        return {name: parsed[name] for name in fields if name in parsed}

    @staticmethod
    def _response_format(fields: list[str]) -> dict[str, Any]:
        """Build the strict ``json_schema`` response format for ``fields``.

        Raises:
            MetadataGenerationError: on the first field that is not supported.
        """
        properties: dict[str, Any] = {}
        for name in fields:
            if name not in OpenAIMetadataGenerator._STRING_FIELDS:
                raise MetadataGenerationError(
                    f"OpenAIMetadataGenerator does not support generated metadata field: {name}"
                )
            properties[name] = {"type": "string"}
        return {
            "type": "json_schema",
            "json_schema": {
                "name": "pifs_metadata_generation",
                "strict": True,
                "schema": {
                    "type": "object",
                    "additionalProperties": False,
                    # Copy so the schema does not alias the caller's list.
                    "required": list(fields),
                    "properties": properties,
                },
            },
        }

View file

@ -0,0 +1,131 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from .hybrid_projection import (
EmbeddingCache,
INDEX_BY_CHANNEL,
embedding_cache_model_key,
make_embedder,
)
from .semantic_index import SQLiteVecSemanticIndex, SemanticIndexRecord
class SummaryProjectionIndexer:
    """Synchronous register-time summary projection indexer."""

    def __init__(
        self,
        index_dir: str | Path,
        *,
        embedder: Any,
        embedding_provider: str,
        embedding_model: str,
        embedding_dimensions: int = 256,
        embedding_cache_path: str | Path | None = None,
    ) -> None:
        """Create the index directory, embedding cache and summary vector index."""
        root = Path(index_dir).expanduser()
        self.index_dir = root
        root.mkdir(parents=True, exist_ok=True)
        self.embedder = embedder
        self.embedding_provider = embedding_provider
        self.embedding_model = embedding_model
        self.embedding_dimensions = embedding_dimensions
        self.cache_model = embedding_cache_model_key(embedding_model, embedding_dimensions)
        if embedding_cache_path is None:
            # Default: keep the cache next to the index files.
            cache_location = root / "embedding_cache.sqlite"
        else:
            cache_location = Path(embedding_cache_path).expanduser()
        self.embedding_cache = EmbeddingCache(cache_location)
        index_file = root / f"{INDEX_BY_CHANNEL['summary']}.sqlite"
        self.index = SQLiteVecSemanticIndex(index_file)
        self._ensure_index()

    @classmethod
    def from_provider(
        cls,
        index_dir: str | Path,
        *,
        embedding_provider: str = "openai",
        embedding_model: str = "text-embedding-3-small",
        embedding_dimensions: int = 256,
        embedding_timeout: float = 60,
        **kwargs: Any,
    ) -> "SummaryProjectionIndexer":
        """Alternate constructor that builds the embedder via ``make_embedder``."""
        provider_embedder = make_embedder(
            embedding_provider,
            embedding_model,
            dimensions=embedding_dimensions,
            timeout=embedding_timeout,
        )
        return cls(
            index_dir,
            embedder=provider_embedder,
            embedding_provider=embedding_provider,
            embedding_model=embedding_model,
            embedding_dimensions=embedding_dimensions,
            **kwargs,
        )

    def upsert_summary(self, record: dict[str, Any]) -> dict[str, Any]:
        """Embed the record's derived summary and upsert it into the index.

        Returns a status dict: ``skipped`` when the record has no non-blank
        ``derived_metadata["summary"]``, otherwise ``ready`` with index details.
        """
        derived = record.get("derived_metadata") or {}
        summary = str(derived.get("summary") or "").strip()
        if not summary:
            return {"status": "skipped", "reason": "missing_summary"}
        vectors = self.embedding_cache.embed_texts(
            [summary],
            provider=self.embedding_provider,
            model=self.cache_model,
            embedder=self.embedder,
            batch_size=1,
        )
        # Derived metadata wins over caller metadata on key collisions.
        merged = dict(record.get("metadata") or {})
        merged.update(record.get("derived_metadata") or {})
        entry = SemanticIndexRecord(
            file_ref=str(record["file_ref"]),
            vector=vectors[0],
            text=summary,
            external_id=record.get("external_id"),
            source_type=str(record.get("source_type") or ""),
            source_path=str(record.get("source_path") or ""),
            title=str(record.get("title") or ""),
            metadata=merged,
        )
        indexed = self.index.upsert_many([entry])
        return {
            "status": "ready",
            "indexed_rows": indexed,
            "index_path": str(self.index.db_path),
            "embedding_provider": self.embedding_provider,
            "embedding_model": self.embedding_model,
            "embedding_dimensions": self.embedding_dimensions,
        }

    def _ensure_index(self) -> None:
        """Reset the index when it is missing, unreadable, or has another dimension."""
        if not self.index.db_path.exists():
            self._reset_index()
            return
        try:
            stale = self.index.dimension() != self.embedding_dimensions
            if stale:
                self._reset_index()
        except Exception:
            # Any failure while inspecting or resetting falls back to a fresh reset.
            self._reset_index()

    def _reset_index(self) -> None:
        """Recreate the index with the configured dimension and metadata."""
        self.index.reset(
            dimension=self.embedding_dimensions,
            metadata=self._index_metadata(),
        )

    def _index_metadata(self) -> dict[str, Any]:
        """Describe the channel and embedding settings stored with the index."""
        return {
            "channel": "summary",
            "embedding_provider": self.embedding_provider,
            "embedding_model": self.embedding_model,
            "embedding_dimensions": self.embedding_dimensions,
        }

View file

@ -0,0 +1,72 @@
from __future__ import annotations
import re
from typing import Any, Iterable
SEMANTIC_FOLDER_ROOT = "/semantic"
SEMANTIC_FOLDER_BASE_FIELDS = {"doc_type", "domain", "topic"}
SEMANTIC_FOLDER_SYSTEM_FIELDS = {"source_type"}
# Field names rejected by is_semantic_folder_forbidden_field (matched through
# their canonical and compact identities, so spelling variants are caught too).
SEMANTIC_FOLDER_FORBIDDEN_FIELDS = {
    "summary",
    "entities",
    "relations",
    "constraints",
    "retrieval_cues",
    "dataset_doc_uuid",
    "path",
    "uri",
    "source_path",
    "storage_uri",
    "title",
    "content_type",
    "created_at",
    "updated_at",
}
def canonical_semantic_folder_field_name(value: Any) -> str:
    """Normalize a field name to casefolded snake_case.

    CamelCase boundaries become underscores, every run of non-alphanumeric
    characters collapses to a single underscore, and leading/trailing
    underscores are removed. Falsy or blank input yields ``""``.
    """
    name = str(value or "").strip()
    if not name:
        return ""
    # Applied in order: split "xWord", then "aB"/"9B", then squash separators.
    rewrites = (
        (r"(.)([A-Z][a-z]+)", r"\1_\2"),
        (r"([a-z0-9])([A-Z])", r"\1_\2"),
        (r"[^A-Za-z0-9]+", "_"),
    )
    for pattern, replacement in rewrites:
        name = re.sub(pattern, replacement, name)
    return name.strip("_").casefold()
def compact_semantic_folder_field_name(value: Any) -> str:
    """Return the canonical field name with every non ``[a-z0-9]`` run removed."""
    canonical = canonical_semantic_folder_field_name(value)
    return re.sub(r"[^a-z0-9]+", "", canonical)
def semantic_folder_field_identity_keys(value: Any) -> frozenset[str]:
    """Return the non-empty canonical and compact spellings of a field name."""
    spellings = (
        canonical_semantic_folder_field_name(value),
        compact_semantic_folder_field_name(value),
    )
    return frozenset(filter(None, spellings))
def semantic_folder_field_identity_set(fields: Iterable[Any]) -> frozenset[str]:
    """Return the union of identity keys for every field in ``fields``."""
    return frozenset(
        identity
        for candidate in fields
        for identity in semantic_folder_field_identity_keys(candidate)
    )
# Precomputed once at import: canonical and compact identities of every
# forbidden field, used for set-intersection checks.
SEMANTIC_FOLDER_FORBIDDEN_FIELD_IDENTITIES = semantic_folder_field_identity_set(
    SEMANTIC_FOLDER_FORBIDDEN_FIELDS
)
def is_semantic_folder_forbidden_field(value: Any) -> bool:
    """Tell whether any identity of ``value`` matches a forbidden field identity."""
    identities = semantic_folder_field_identity_keys(value)
    return not identities.isdisjoint(SEMANTIC_FOLDER_FORBIDDEN_FIELD_IDENTITIES)
def semantic_folder_allowed_extension_fields(fields: Iterable[Any]) -> set[str]:
    """Return canonical names of the given fields that are non-empty and not forbidden."""
    return {
        canonical
        for candidate in fields
        if (canonical := canonical_semantic_folder_field_name(candidate))
        and not is_semantic_folder_forbidden_field(candidate)
    }

View file

@ -0,0 +1,362 @@
from __future__ import annotations
import hashlib
import json
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol
import sqlite_vec
class SemanticIndexError(RuntimeError):
    """Raised for invalid semantic index usage.

    In this module: a non-positive dimension, a vector whose length does not
    match the index dimension, or an index that was never initialized.
    """

    pass
@dataclass(frozen=True)
class SemanticIndexRecord:
    """Immutable input row for SQLiteVecSemanticIndex.upsert_many."""

    # Unique key of the document row; an existing row with the same value is updated.
    file_ref: str
    # Embedding; its length must equal the index dimension.
    vector: list[float]
    # Only the SHA-256 hash and character count of this text are stored.
    text: str
    external_id: str | None = None
    # Also stored as the partition key of the vector table.
    source_type: str = ""
    source_path: str = ""
    title: str = ""
    # Serialized as JSON; None is stored as an empty object.
    metadata: dict[str, Any] | None = None
@dataclass(frozen=True)
class SemanticSearchResult:
    """Immutable hit returned by SQLiteVecSemanticIndex.search."""

    file_ref: str
    # Distance reported by the vector table; results are ordered ascending.
    distance: float
    external_id: str | None
    source_type: str
    source_path: str
    title: str
    text_hash: str
    # Parsed from the stored metadata JSON; empty dict when unparsable.
    metadata: dict[str, Any]
class RebuildableSemanticIndex(Protocol):
    """Structural interface for a semantic index that supports reset, upsert, search and info."""

    def reset(self, *, dimension: int, metadata: dict[str, Any] | None = None) -> None:
        ...

    def upsert_many(self, records: list[SemanticIndexRecord]) -> int:
        ...

    def search(
        self,
        vector: list[float],
        *,
        limit: int = 10,
        filters: dict[str, Any] | None = None,
        fetch_multiplier: int = 20,
    ) -> list[SemanticSearchResult]:
        ...

    def info(self) -> dict[str, Any]:
        ...
class SQLiteVecSemanticIndex:
    """Rebuildable local semantic index backed by sqlite-vec.

    This is intentionally separate from the PIFS catalog tables. The catalog
    remains source of truth; this file is a rebuildable recall index.

    Every operation opens its own connection and closes it before returning.
    """

    # Shared prefix of the KNN query; an optional partition filter and the
    # ORDER BY clause are appended in ``search``.
    _KNN_SELECT = (
        "SELECT d.file_ref, d.external_id, d.source_type, d.source_path, "
        "d.title, d.text_hash, d.metadata_json, v.distance "
        "FROM semantic_index_vec v "
        "JOIN semantic_index_docs d ON d.rowid = v.rowid "
        "WHERE v.embedding MATCH ? AND k = ?"
    )

    def __init__(self, db_path: str | Path):
        """Remember the database path and make sure its parent directory exists."""
        self.db_path = Path(db_path).expanduser()
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

    def reset(self, *, dimension: int, metadata: dict[str, Any] | None = None) -> None:
        """Drop and recreate all index tables for vectors of ``dimension`` floats.

        Raises:
            SemanticIndexError: if ``dimension`` is not positive.
        """
        if dimension <= 0:
            raise SemanticIndexError("semantic index dimension must be positive")
        # "closing(...) as conn, conn": the connection's own context manager
        # commits or rolls back; closing() then releases the handle.
        with self._session() as conn, conn:
            conn.executescript(
                """
                DROP TABLE IF EXISTS semantic_index_vec;
                DROP TABLE IF EXISTS semantic_index_docs;
                DROP TABLE IF EXISTS semantic_index_config;
                CREATE TABLE semantic_index_config (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                );
                CREATE TABLE semantic_index_docs (
                    rowid INTEGER PRIMARY KEY,
                    file_ref TEXT NOT NULL UNIQUE,
                    external_id TEXT,
                    source_type TEXT NOT NULL DEFAULT '',
                    source_path TEXT NOT NULL DEFAULT '',
                    title TEXT NOT NULL DEFAULT '',
                    text_hash TEXT NOT NULL,
                    text_chars INTEGER NOT NULL DEFAULT 0,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                );
                CREATE INDEX idx_semantic_index_docs_file_ref
                    ON semantic_index_docs(file_ref);
                CREATE INDEX idx_semantic_index_docs_external_id
                    ON semantic_index_docs(external_id);
                CREATE INDEX idx_semantic_index_docs_source_type
                    ON semantic_index_docs(source_type);
                """
            )
            # The dimension is a validated positive number, so interpolating it
            # into the DDL is safe; vec0 does not accept it as a bound parameter.
            conn.execute(
                "CREATE VIRTUAL TABLE semantic_index_vec USING "
                f"vec0(source_type TEXT partition key, embedding float[{dimension}])"
            )
            config = {
                "dimension": str(dimension),
                "adapter": "sqlite-vec",
                "adapter_version": sqlite_vec.__version__,
                "metadata": json.dumps(metadata or {}, ensure_ascii=False, sort_keys=True),
            }
            conn.executemany(
                "INSERT INTO semantic_index_config(key, value) VALUES (?, ?)",
                sorted(config.items()),
            )

    def upsert_many(self, records: list[SemanticIndexRecord]) -> int:
        """Insert or replace the given records; return how many were written.

        The whole batch is one transaction: a dimension mismatch rolls back
        every record of the call.

        Raises:
            SemanticIndexError: if a vector length differs from the index dimension.
        """
        if not records:
            return 0
        dimension = self.dimension()
        written = 0
        with self._session() as conn, conn:
            for record in records:
                if len(record.vector) != dimension:
                    raise SemanticIndexError(
                        f"vector dimension mismatch for {record.file_ref}: "
                        f"expected {dimension}, got {len(record.vector)}"
                    )
                rowid = self._upsert_doc(conn, record)
                # Delete-then-insert so a changed source_type moves the vector
                # to the right partition.
                conn.execute("DELETE FROM semantic_index_vec WHERE rowid = ?", (rowid,))
                conn.execute(
                    "INSERT INTO semantic_index_vec(rowid, source_type, embedding) VALUES (?, ?, ?)",
                    (
                        rowid,
                        record.source_type,
                        sqlite_vec.serialize_float32(record.vector),
                    ),
                )
                written += 1
        return written

    def search(
        self,
        vector: list[float],
        *,
        limit: int = 10,
        filters: dict[str, Any] | None = None,
        fetch_multiplier: int = 20,
    ) -> list[SemanticSearchResult]:
        """Return up to ``limit`` nearest documents that satisfy ``filters``.

        ``source_type`` filters select vector-table partitions; all filters
        are then re-checked against the document row and its metadata.
        A non-positive ``limit`` yields an empty list.

        Raises:
            SemanticIndexError: if the query vector length differs from the
                index dimension.
        """
        dimension = self.dimension()
        if len(vector) != dimension:
            raise SemanticIndexError(
                f"query vector dimension mismatch: expected {dimension}, got {len(vector)}"
            )
        if limit <= 0:
            return []
        active_filters = filters or {}
        # Over-fetch because post-filtering may discard candidates.
        fetch_k = min(4096, max(limit, limit * max(fetch_multiplier, 1)))
        # De-duplicate so a repeated source type is not queried (and returned) twice.
        source_types = list(dict.fromkeys(_source_type_filters(active_filters)))
        query_blob = sqlite_vec.serialize_float32(vector)
        with self._session() as conn:
            if source_types:
                partition_sql = (
                    self._KNN_SELECT + " AND v.source_type = ? ORDER BY v.distance"
                )
                rows = []
                for source_type in source_types:
                    rows.extend(
                        conn.execute(
                            partition_sql, (query_blob, fetch_k, source_type)
                        ).fetchall()
                    )
                # Merge the per-partition result lists into one global ranking.
                rows.sort(key=lambda row: float(row["distance"]))
            else:
                rows = conn.execute(
                    self._KNN_SELECT + " ORDER BY v.distance", (query_blob, fetch_k)
                ).fetchall()
        results: list[SemanticSearchResult] = []
        for row in rows:
            metadata = _json_obj(row["metadata_json"])
            if not _matches_filters(row, metadata, active_filters):
                continue
            results.append(
                SemanticSearchResult(
                    file_ref=row["file_ref"],
                    distance=float(row["distance"]),
                    external_id=row["external_id"],
                    source_type=row["source_type"],
                    source_path=row["source_path"],
                    title=row["title"],
                    text_hash=row["text_hash"],
                    metadata=metadata,
                )
            )
            if len(results) >= limit:
                break
        return results

    def info(self) -> dict[str, Any]:
        """Return the stored configuration plus the number of indexed documents."""
        with self._session() as conn:
            config = {
                row["key"]: row["value"]
                for row in conn.execute(
                    "SELECT key, value FROM semantic_index_config ORDER BY key"
                ).fetchall()
            }
            count = conn.execute("SELECT COUNT(*) FROM semantic_index_docs").fetchone()[0]
        parsed_metadata: dict[str, Any]
        try:
            parsed_metadata = json.loads(config.get("metadata", "{}"))
        except json.JSONDecodeError:
            parsed_metadata = {}
        return {
            "db_path": str(self.db_path),
            "adapter": config.get("adapter", "sqlite-vec"),
            "adapter_version": config.get("adapter_version", ""),
            "dimension": int(config.get("dimension", "0") or 0),
            "document_count": count,
            "metadata": parsed_metadata,
        }

    def dimension(self) -> int:
        """Return the vector dimension recorded by ``reset``.

        Raises:
            SemanticIndexError: if no dimension is stored in the config table.
        """
        with self._session() as conn:
            row = conn.execute(
                "SELECT value FROM semantic_index_config WHERE key = 'dimension'"
            ).fetchone()
        if row is None:
            raise SemanticIndexError(
                f"semantic index is not initialized; call reset() first: {self.db_path}"
            )
        return int(row["value"])

    def connect(self) -> sqlite3.Connection:
        """Open a connection with the sqlite-vec extension loaded.

        The caller owns the returned connection and must close it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            conn.row_factory = sqlite3.Row
            conn.enable_load_extension(True)
            sqlite_vec.load(conn)
            conn.enable_load_extension(False)
        except Exception:
            # Do not leak the handle when the extension cannot be loaded.
            conn.close()
            raise
        return conn

    def _session(self):
        """Return a context manager that yields ``connect()`` and closes it on exit.

        sqlite3's own connection context manager only ends the transaction;
        it never closes the connection.
        """
        from contextlib import closing

        return closing(self.connect())

    @staticmethod
    def text_hash(text: str) -> str:
        """Return the hex SHA-256 digest of ``text`` encoded as UTF-8."""
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    @staticmethod
    def _upsert_doc(conn: sqlite3.Connection, record: SemanticIndexRecord) -> int:
        """Insert or update the document row for ``record`` and return its rowid."""
        existing = conn.execute(
            "SELECT rowid FROM semantic_index_docs WHERE file_ref = ?",
            (record.file_ref,),
        ).fetchone()
        metadata_json = json.dumps(record.metadata or {}, ensure_ascii=False, sort_keys=True)
        text_hash = SQLiteVecSemanticIndex.text_hash(record.text)
        if existing is None:
            cursor = conn.execute(
                """
                INSERT INTO semantic_index_docs(
                    file_ref, external_id, source_type, source_path, title,
                    text_hash, text_chars, metadata_json
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    record.file_ref,
                    record.external_id,
                    record.source_type,
                    record.source_path,
                    record.title,
                    text_hash,
                    len(record.text),
                    metadata_json,
                ),
            )
            return int(cursor.lastrowid)
        rowid = int(existing["rowid"])
        conn.execute(
            """
            UPDATE semantic_index_docs
            SET external_id = ?,
                source_type = ?,
                source_path = ?,
                title = ?,
                text_hash = ?,
                text_chars = ?,
                metadata_json = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE rowid = ?
            """,
            (
                record.external_id,
                record.source_type,
                record.source_path,
                record.title,
                text_hash,
                len(record.text),
                metadata_json,
                rowid,
            ),
        )
        return rowid
def _json_obj(text: str | None) -> dict[str, Any]:
    """Parse ``text`` as a JSON object; return ``{}`` for empty, invalid or non-object input."""
    if text:
        try:
            decoded = json.loads(text)
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, dict):
            return decoded
    return {}
def _matches_filters(
    row: sqlite3.Row,
    metadata: dict[str, Any],
    filters: dict[str, Any],
) -> bool:
    """Tell whether a result row satisfies every filter.

    A filter key is looked up among the row's columns first and in
    ``metadata`` otherwise. Values are compared as strings; a list filter
    matches when the actual value equals any of its items.
    """
    for name, wanted in filters.items():
        found = str(row[name] if name in row.keys() else metadata.get(name))
        if isinstance(wanted, list):
            accepted = {str(option) for option in wanted}
        else:
            accepted = {str(wanted)}
        if found not in accepted:
            return False
    return True
def _source_type_filters(filters: dict[str, Any]) -> list[str]:
    """Return the distinct, non-empty ``source_type`` filter values as strings.

    Accepts a single value or a list. Order of first appearance is kept and
    duplicates are dropped, so each vector partition is queried only once.
    """
    value = filters.get("source_type")
    if value is None:
        return []
    candidates = value if isinstance(value, list) else [value]
    # dict.fromkeys de-duplicates while preserving order.
    return [name for name in dict.fromkeys(str(item) for item in candidates) if name]

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,40 @@
from __future__ import annotations
from copy import deepcopy
from typing import Any
def strip_pageindex_text_fields(value: Any) -> Any:
    """Return a copy of a nested list/dict structure without any ``"text"`` keys.

    Lists and dicts are rebuilt recursively; every other value is returned as is.
    """
    if isinstance(value, dict):
        cleaned = {}
        for key, item in value.items():
            if key == "text":
                continue
            cleaned[key] = strip_pageindex_text_fields(item)
        return cleaned
    if isinstance(value, list):
        return list(map(strip_pageindex_text_fields, value))
    return value
def find_pageindex_node(structure: Any, node_id: str) -> dict[str, Any] | None:
    """Depth-first search for the node whose ``node_id`` equals ``node_id`` as a string.

    Dicts are searched through their ``"nodes"`` and then ``"children"``
    entries; lists are searched item by item. Returns a deep copy of the
    first match, or None.
    """
    if isinstance(structure, dict):
        if str(structure.get("node_id", "")) == str(node_id):
            return deepcopy(structure)
        branches = [structure.get("nodes"), structure.get("children")]
    elif isinstance(structure, list):
        branches = structure
    else:
        return None
    for branch in branches:
        match = find_pageindex_node(branch, node_id)
        if match is not None:
            return match
    return None
def first_node_location(node: dict[str, Any]) -> str | None:
    """Return the first usable location of a node as a string.

    Checks ``line_num``, ``physical_index`` and ``start_index`` in that order,
    skipping values that are None or the empty string.
    """
    for key in ("line_num", "physical_index", "start_index"):
        candidate = node.get(key)
        if candidate is None or candidate == "":
            continue
        return str(candidate)
    return None

View file

@ -0,0 +1,87 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass(frozen=True)
class SearchResult:
    """Immutable record for one search hit.

    The first eight fields are required; the rest default to empty values.
    NOTE(review): field semantics are set by the producers, which are not
    visible in this module.
    """

    reference_id: str
    file_ref: str
    external_id: Optional[str]
    title: str
    snippet: str
    folder_path: str
    folder_paths: list[str]
    metadata: dict[str, Any]
    source_path: str = ""
    id: Optional[str] = None
    document_id: Optional[str] = None
    name: str = ""
    description: str = ""
    status: str = ""
    # camelCase names below are kept as declared; presumably they mirror an
    # external API payload — confirm before renaming.
    pageNum: Optional[int] = None
    createdAt: Optional[str] = None
    folderId: Optional[str] = None
    derived_metadata: dict[str, Any] = field(default_factory=dict)
    metadata_generation: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class OpenResult:
    """Immutable record holding a text excerpt together with its line range."""

    reference_id: str
    file_ref: str
    # Line bounds of ``text``; presumably inclusive — confirm against the producer.
    start_line: int
    end_line: int
    text: str
    external_id: Optional[str] = None
    folder_path: str = ""
    source_path: str = ""
@dataclass(frozen=True)
class FolderEntry:
    """Immutable record for one folder."""

    folder_id: str
    # None is allowed; presumably marks a root folder — confirm against the store.
    parent_id: Optional[str]
    name: str
    path: str
    kind: str
@dataclass(frozen=True)
class FileEntry:
    """Immutable record for one registered file.

    All fields up to ``folder_path`` are required; the two generation-related
    dicts default to empty.
    """

    file_ref: str
    external_id: Optional[str]
    storage_uri: str
    source_path: str
    title: str
    descriptor: str
    content_type: str
    source_type: Optional[str]
    fingerprint: str
    text_artifact_path: str
    raw_artifact_path: Optional[str]
    # Tests in this commit observe None here together with a "not_built" tree
    # status for plain-text files, and "built"/"failed" for PDF/Markdown.
    pageindex_doc_id: Optional[str]
    pageindex_tree_status: str
    metadata: dict[str, Any]
    folder_path: str
    derived_metadata: dict[str, Any] = field(default_factory=dict)
    metadata_generation: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class MetadataField:
    """Immutable definition of one metadata field.

    Only ``name`` and ``field_type`` are required.
    """

    name: str
    field_type: str
    description: str = ""
    indexed: bool = True
    faceted: bool = False
    sortable: bool = False
    # Defaults to "manual"; other accepted values are not visible here.
    source: str = "manual"
@dataclass(frozen=True)
class CommandResult:
    """Immutable result of one command: the command string, its data and a text form."""

    command: str
    data: Any
    text: str

6
pifs-cli Executable file
View file

@ -0,0 +1,6 @@
#!/usr/bin/env python3
# Executable wrapper: runs pageindex.filesystem.cli.main and passes its return
# value to SystemExit, so it becomes the process exit status.
from pageindex.filesystem.cli import main

if __name__ == "__main__":
    raise SystemExit(main())

14
pyproject.toml Normal file
View file

@ -0,0 +1,14 @@
[project]
name = "pageindex"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"litellm==1.83.7",
"openai-agents>=0.17.2",
"pymupdf==1.26.4",
"pypdf2==3.0.1",
"pytest>=9.0.3",
"python-dotenv==1.1.0",
"pyyaml==6.0.2",
"sqlite-vec>=0.1.9",
]

View file

@ -0,0 +1,60 @@
import json
from types import SimpleNamespace
class SummaryBackend:
    """Test double for a semantic retrieval backend that exposes only "summary".

    Every ``search_channel`` call is recorded in ``calls`` as
    ``(channel, query, filters)`` and answered with one canned candidate.
    """

    def __init__(self, document_id):
        self.calls = []
        self.document_id = document_id

    def available_channels(self):
        return ("summary",)

    def search_channel(self, channel, query, *, limit=10, filters=None):
        self.calls.append((channel, query, filters))
        candidate = SimpleNamespace(
            document_id=self.document_id,
            snippet=f"summary candidate: {query}",
        )
        return [candidate]
def test_semantic_search_scope_keeps_ordinary_folders_out_of_source_type_filters(tmp_path):
    """Searching inside an ordinary folder must not pass any filter to the backend."""
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

    fs = PageIndexFileSystem(workspace=tmp_path / "workspace")
    registration = {
        "storage_uri": "file:///tmp/report.pdf",
        "source_path": "examples/documents/report.pdf",
        "folder_path": "/documents",
        "external_id": "dsid_report",
        "title": "Annual report",
        "metadata": {"source_type": "examples-documents"},
        "content": "Federal Reserve supervision and regulation annual report.",
    }
    fs.register_file(**registration)
    recorder = SummaryBackend("dsid_report")
    fs.semantic_retrieval_backend = recorder

    shell = PIFSCommandExecutor(fs, json_output=True)
    raw_output = shell.execute('search-summary "Federal Reserve annual report" /documents')
    payload = json.loads(raw_output)

    # Each recorded call is (channel, query, filters).
    _channel, _query, recorded_filters = recorder.calls[0]
    assert recorded_filters == {}
    assert payload["data"]["data"][0]["external_id"] == "dsid_report"
def test_semantic_search_scope_filters_explicit_source_type_facets():
    """Only ``source_type=`` facet folders translate into a source_type filter."""
    from pageindex.filesystem import PageIndexFileSystem

    expectations = [
        ("/source_type=google-drive", {"source_type": "google_drive"}),
        ("/semantic/source_type=google-drive", {"source_type": "google_drive"}),
        ("/documents", {}),
    ]
    for folder_path, expected_filters in expectations:
        scope = {"folder_path": folder_path}
        assert PageIndexFileSystem._semantic_filters_for_scope(scope) == expected_filters

View file

@ -0,0 +1,632 @@
import json
import tempfile
from pathlib import Path
import pytest
def write_pageindex_client_doc(workspace: Path, doc_id: str, doc: dict) -> None:
    """Write a document JSON plus its ``_meta.json`` entry into ``workspace``.

    Creates ``<doc_id>.json`` and adds (or replaces) the ``doc_id`` entry in
    ``_meta.json``. Entries of documents written earlier to the same workspace
    are preserved, so several fixtures can share one workspace.
    """
    workspace.mkdir(parents=True, exist_ok=True)
    (workspace / f"{doc_id}.json").write_text(
        json.dumps(doc, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    entry = {
        "type": doc.get("type", ""),
        "doc_name": doc.get("doc_name", ""),
        "doc_description": doc.get("doc_description", ""),
        "path": doc.get("path", ""),
    }
    if doc.get("type") == "pdf":
        entry["page_count"] = doc.get("page_count")
    elif doc.get("type") == "md":
        entry["line_count"] = doc.get("line_count")

    meta_path = workspace / "_meta.json"
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        meta = {}
    if not isinstance(meta, dict):
        # An unexpected payload is replaced rather than merged into.
        meta = {}
    meta[doc_id] = entry
    meta_path.write_text(
        json.dumps(meta, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
class RecordingMetadataGenerator:
    """Test double for a metadata generator.

    Records each ``generate`` call as ``(request, list(fields))`` and answers
    with the canned ``values`` for the requested fields it knows.
    """

    values = {
        "summary": "Generated retrieval summary.",
        "doc_type": "technical_note",
        "domain": "documentation",
        "topic": "pageindex extraction",
    }

    def __init__(self):
        self.calls = []

    def generate(self, request, *, fields):
        requested = list(fields)
        self.calls.append((request, requested))
        canned = self.values
        return {name: canned[name] for name in fields if name in canned}
def test_pageindex_structure_options_report_failed_register_build(monkeypatch):
    """A failing PageIndexClient.index at register time is reported, not raised.

    After registration, ``cat --structure``, ``cat --node`` and ``cat --page``
    must answer with ``available: false`` and ``stat`` must show a ``failed``
    tree status.
    """
    from pageindex import PageIndexClient
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / "report.md"
        source.write_text("# Report\n\nCached structure is not built yet.", encoding="utf-8")
        filesystem = PageIndexFileSystem(workspace=Path(tmp) / "workspace")

        def fail_index(*args, **kwargs):
            raise RuntimeError("index failed")

        # Patch before register_file so the register-time build fails.
        monkeypatch.setattr(PageIndexClient, "index", fail_index)
        filesystem.register_file(
            storage_uri=source.as_uri(),
            source_path="docs/report.md",
            external_id="dsid_structural_missing",
            title="Structural report",
            content=source.read_text(encoding="utf-8"),
        )
        executor = PIFSCommandExecutor(filesystem, json_output=True)
        structure = json.loads(executor.execute("cat --structure dsid_structural_missing"))
        node = json.loads(executor.execute("cat --node 0001 dsid_structural_missing"))
        pages = json.loads(executor.execute("cat --page 1-2 dsid_structural_missing"))
        stat = json.loads(executor.execute("stat dsid_structural_missing"))
        assert structure["data"]["mode"] == "structure"
        assert structure["data"]["available"] is False
        assert structure["data"]["status"] == "failed"
        assert "PageIndexClient workspace" in structure["data"]["message"]
        assert stat["data"]["pageindex_tree_status"] == "failed"
        assert node["data"]["mode"] == "node"
        assert node["data"]["available"] is False
        assert node["data"]["node_id"] == "0001"
        assert pages["data"]["mode"] == "page"
        assert pages["data"]["available"] is False
        assert pages["data"]["pages"] == "1-2"
        # The shell is read-only: mutating commands must not be exposed.
        assert "cp" not in executor.allowed_commands()
        assert "mkdir" not in executor.allowed_commands()
def test_register_pdf_markdown_uses_pageindex_extracted_text_for_metadata_and_fts(monkeypatch):
    """PDF/Markdown registration must use PageIndex-extracted text, not caller content.

    The extracted text has to reach the metadata generator, the text artifact
    and full-text search, while the caller-supplied ``content`` reaches none
    of them.
    """
    from pageindex import PageIndexClient
    from pageindex.filesystem import PageIndexFileSystem

    def fake_index(self, file_path, mode="auto"):
        # Stand-in for PageIndexClient.index: writes a canned document into
        # the client workspace and registers it in memory.
        suffix = Path(file_path).suffix.lower()
        doc_id = f"doc_{suffix.lstrip('.')}"
        if suffix == ".pdf":
            doc = {
                "id": doc_id,
                "type": "pdf",
                "path": str(Path(file_path).resolve()),
                "doc_name": "report.pdf",
                "doc_description": "",
                "page_count": 2,
                "structure": [{"title": "Report", "node_id": "0001", "nodes": []}],
                "pages": [
                    {"page": 1, "content": "PageIndex PDF extracted alpha text."},
                    {"page": 2, "content": "Second PageIndex PDF extracted beta text."},
                ],
            }
        else:
            doc = {
                "id": doc_id,
                "type": "md",
                "path": str(Path(file_path).resolve()),
                "doc_name": "notes",
                "doc_description": "",
                "line_count": 3,
                "structure": [
                    {
                        "title": "Notes",
                        "node_id": "0001",
                        "line_num": 1,
                        "text": "# Notes\n\nPageIndex Markdown extracted gamma text.",
                        "nodes": [],
                    }
                ],
            }
        write_pageindex_client_doc(self.workspace, doc_id, doc)
        self.documents[doc_id] = doc
        return doc_id

    monkeypatch.setattr(PageIndexClient, "index", fake_index)
    with tempfile.TemporaryDirectory() as tmp:
        source_pdf = Path(tmp) / "report.pdf"
        source_md = Path(tmp) / "notes.md"
        source_pdf.write_bytes(b"%PDF-1.4\n% test fixture\n")
        source_md.write_text("# Notes\n\nCaller markdown content", encoding="utf-8")
        generator = RecordingMetadataGenerator()
        filesystem = PageIndexFileSystem(
            workspace=Path(tmp) / "workspace",
            metadata_generator=generator,
        )
        filesystem.register_file(
            storage_uri=source_pdf.as_uri(),
            source_path="docs/report.pdf",
            external_id="dsid_pdf_extracted",
            title="PDF extracted",
            content="CALLER PDF CONTENT MUST NOT REACH GENERATOR",
        )
        filesystem.register_file(
            storage_uri=source_md.as_uri(),
            source_path="docs/notes.md",
            external_id="dsid_md_extracted",
            title="Markdown extracted",
            content="CALLER MD CONTENT MUST NOT REACH GENERATOR",
        )
        # Generator calls arrive in registration order: PDF first, Markdown second.
        pdf_request = generator.calls[0][0]
        md_request = generator.calls[1][0]
        pdf_stat = filesystem.store.file_info("dsid_pdf_extracted")
        md_stat = filesystem.store.file_info("dsid_md_extracted")
        assert "PageIndex PDF extracted alpha text" in pdf_request.text
        assert "Second PageIndex PDF extracted beta text" in pdf_request.text
        assert "CALLER PDF CONTENT" not in pdf_request.text
        assert "PageIndex Markdown extracted gamma text" in md_request.text
        assert "CALLER MD CONTENT" not in md_request.text
        assert "PageIndex PDF extracted alpha text" in Path(
            pdf_stat["text_artifact_path"]
        ).read_text(encoding="utf-8")
        assert "PageIndex Markdown extracted gamma text" in Path(
            md_stat["text_artifact_path"]
        ).read_text(encoding="utf-8")
        assert [r.external_id for r in filesystem.search("alpha beta", limit=5)] == [
            "dsid_pdf_extracted"
        ]
        assert [r.external_id for r in filesystem.search("gamma", limit=5)] == [
            "dsid_md_extracted"
        ]
        assert filesystem.search("CALLER", limit=5) == []
def test_register_text_metadata_generation_keeps_caller_content_without_pageindex(monkeypatch):
    """Plain-text registration keeps caller content and never calls PageIndexClient.index."""
    from pageindex import PageIndexClient
    from pageindex.filesystem import PageIndexFileSystem

    def fail_index(*args, **kwargs):
        raise AssertionError("PageIndexClient.index should not be called for text files")

    monkeypatch.setattr(PageIndexClient, "index", fail_index)
    with tempfile.TemporaryDirectory() as tmp:
        generator = RecordingMetadataGenerator()
        filesystem = PageIndexFileSystem(
            workspace=Path(tmp) / "workspace",
            metadata_generator=generator,
        )
        filesystem.register_file(
            storage_uri="file:///tmp/readme.txt",
            source_path="docs/readme.txt",
            external_id="dsid_text_generation",
            title="Text generation",
            content="Plain text caller content stays authoritative.",
            content_type="text/plain",
        )
        stat = filesystem.store.file_info("dsid_text_generation")
        assert generator.calls[0][0].text == "Plain text caller content stays authoritative."
        # No PageIndex tree is built for text files.
        assert stat["pageindex_doc_id"] is None
        assert stat["pageindex_tree_status"] == "not_built"
        assert Path(stat["text_artifact_path"]).read_text(
            encoding="utf-8"
        ) == "Plain text caller content stays authoritative."
def test_register_pdf_markdown_cache_miss_invokes_pageindex_client_index(monkeypatch):
    """Without a cached structure, registering PDF/Markdown calls PageIndexClient.index.

    The index call must receive the resolved source path, and the catalog must
    record the returned doc id with a ``built`` tree status.
    """
    from pageindex import PageIndexClient
    from pageindex.filesystem import PageIndexFileSystem

    calls: list[str] = []

    def fake_index(self, file_path, mode="auto"):
        # Record the path, then behave like a successful index build.
        calls.append(str(file_path))
        doc_id = f"doc_{Path(file_path).suffix.lstrip('.')}"
        doc_type = "pdf" if Path(file_path).suffix == ".pdf" else "md"
        doc = {
            "id": doc_id,
            "type": doc_type,
            "path": str(Path(file_path).resolve()),
            "doc_name": Path(file_path).name,
            "doc_description": "",
            "structure": [{"title": Path(file_path).stem, "node_id": "0001", "nodes": []}],
        }
        if doc_type == "pdf":
            doc["page_count"] = 1
            doc["pages"] = [{"page": 1, "content": "Page one text"}]
        else:
            doc["line_count"] = 1
        write_pageindex_client_doc(self.workspace, doc_id, doc)
        self.documents[doc_id] = doc
        return doc_id

    monkeypatch.setattr(PageIndexClient, "index", fake_index)
    with tempfile.TemporaryDirectory() as tmp:
        source_pdf = Path(tmp) / "report.pdf"
        source_md = Path(tmp) / "notes.md"
        source_pdf.write_bytes(b"%PDF-1.4\n% test fixture\n")
        source_md.write_text("# Notes", encoding="utf-8")
        filesystem = PageIndexFileSystem(workspace=Path(tmp) / "workspace")
        # The PDF is registered by plain path and the Markdown by file:// URI;
        # both forms must resolve to the same kind of local path.
        filesystem.register_file(
            storage_uri=str(source_pdf),
            source_path="docs/report.pdf",
            external_id="dsid_pdf_build",
            title="PDF build",
            content="pdf text",
        )
        filesystem.register_file(
            storage_uri=source_md.as_uri(),
            source_path="docs/notes.md",
            external_id="dsid_md_build",
            title="Markdown build",
            content=source_md.read_text(encoding="utf-8"),
        )
        pdf_stat = filesystem.store.file_info("dsid_pdf_build")
        md_stat = filesystem.store.file_info("dsid_md_build")
        assert calls == [str(source_pdf.resolve()), str(source_md.resolve())]
        assert pdf_stat["pageindex_doc_id"] == "doc_pdf"
        assert pdf_stat["pageindex_tree_status"] == "built"
        assert md_stat["pageindex_doc_id"] == "doc_md"
        assert md_stat["pageindex_tree_status"] == "built"
def test_cat_structure_page_reuses_pageindex_client_cache_without_indexing(monkeypatch):
    """A pre-written PageIndexClient document is reused instead of re-indexing.

    The cache entry is written before ``register_file`` runs and
    ``PageIndexClient.index`` is patched to raise, so the test only passes if
    registration and the ``cat --structure`` / ``cat --page`` / ``stat``
    commands never call the indexer.
    """
    from pageindex import PageIndexClient
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

    def fail_index(*args, **kwargs):
        raise AssertionError("PageIndexClient.index should not be called on cache hit")

    with tempfile.TemporaryDirectory() as tmp:
        tmp_root = Path(tmp)
        source = tmp_root / "report.pdf"
        source.write_bytes(b"%PDF-1.4\n% test fixture\n")
        workspace = tmp_root / "workspace"
        filesystem = PageIndexFileSystem(workspace=workspace)

        findings_node = {
            "title": "Findings",
            "node_id": "0002",
            "physical_index": 2,
            "nodes": [],
        }
        intro_node = {
            "title": "Introduction",
            "node_id": "0001",
            "text": "Intro section text",
            "nodes": [findings_node],
        }
        cached_doc = {
            "id": "doc_cached_pdf",
            "type": "pdf",
            "path": str(source.resolve()),
            "doc_name": "report.pdf",
            "doc_description": "",
            "page_count": 2,
            "structure": [intro_node],
            "pages": [
                {"page": 1, "content": "Page one text"},
                {"page": 2, "content": "Page two text"},
            ],
        }
        write_pageindex_client_doc(
            filesystem.pageindex_client_workspace,
            "doc_cached_pdf",
            cached_doc,
        )

        # From here on, any indexing attempt fails the test.
        monkeypatch.setattr(PageIndexClient, "index", fail_index)
        filesystem.register_file(
            storage_uri=source.as_uri(),
            source_path="docs/report.pdf",
            external_id="dsid_structural_cached",
            title="Cached structural report",
            content="text artifact remains available for grep, not cat --all",
        )

        executor = PIFSCommandExecutor(filesystem, json_output=True)
        structure = json.loads(executor.execute("cat --structure dsid_structural_cached"))
        pages = json.loads(executor.execute("cat --page 1-2 dsid_structural_cached"))
        stat = json.loads(executor.execute("stat dsid_structural_cached"))

        structure_data = structure["data"]
        top_node = structure_data["structure"][0]
        assert structure_data["available"] is True
        assert structure_data["pageindex_doc_id"] == "doc_cached_pdf"
        assert top_node["title"] == "Introduction"
        # Node text must not appear in the structure output.
        assert "text" not in top_node
        assert "text" not in top_node["nodes"][0]

        assert pages["data"]["available"] is True
        assert pages["data"]["text"] == "Page one text\n\nPage two text"

        assert stat["data"]["pageindex_doc_id"] == "doc_cached_pdf"
        assert stat["data"]["pageindex_tree_status"] == "built"
def test_cat_node_reads_pageindex_client_structure_without_custom_pifs_artifact():
    """``cat --node`` returns node metadata and text from a cached client document.

    Only a PageIndexClient cache entry is prepared for the Markdown source; the
    command must return the node's title and text, with the text kept out of
    the ``node`` payload itself.
    """
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

    markdown_body = "# Notes\n\nBody"

    with tempfile.TemporaryDirectory() as tmp:
        tmp_root = Path(tmp)
        source = tmp_root / "notes.md"
        source.write_text(markdown_body, encoding="utf-8")
        filesystem = PageIndexFileSystem(workspace=tmp_root / "workspace")

        cached_doc = {
            "id": "doc_cached_md",
            "type": "md",
            "path": str(source.resolve()),
            "doc_name": "notes",
            "doc_description": "",
            "line_count": 3,
            "structure": [
                {
                    "title": "Notes",
                    "node_id": "0001",
                    "line_num": 1,
                    "text": "# Notes\n\nBody",
                    "nodes": [],
                }
            ],
        }
        write_pageindex_client_doc(
            filesystem.pageindex_client_workspace,
            "doc_cached_md",
            cached_doc,
        )
        filesystem.register_file(
            storage_uri=source.as_uri(),
            source_path="docs/notes.md",
            external_id="dsid_md_cached",
            title="Cached markdown notes",
            content=source.read_text(encoding="utf-8"),
        )

        executor = PIFSCommandExecutor(filesystem, json_output=True)
        response = json.loads(executor.execute("cat --node 0001 dsid_md_cached"))
        payload = response["data"]

        assert payload["available"] is True
        assert payload["pageindex_doc_id"] == "doc_cached_md"
        assert payload["node"]["title"] == "Notes"
        assert payload["text"] == "# Notes\n\nBody"
        assert "text" not in payload["node"]
def test_tree_folder_behavior_is_preserved():
    """``tree`` on a folder path reports that folder and its nested folders."""
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp) / "workspace"
        filesystem = PageIndexFileSystem(workspace=workspace)
        filesystem.register_file(
            storage_uri="file:///tmp/report.txt",
            source_path="docs/report.txt",
            folder_path="/docs/reports",
            external_id="dsid_folder_tree",
            title="Folder report",
            content="folder tree behavior remains intact",
        )

        executor = PIFSCommandExecutor(filesystem, json_output=True)
        raw_output = executor.execute("tree /docs --depth 2")
        tree_data = json.loads(raw_output)["data"]

        assert tree_data["path"] == "/docs"
        assert tree_data["folders"][0]["path"] == "/docs/reports"
def test_tree_does_not_read_file_internal_pageindex_structure():
    """``tree`` rejects a file id, while ``cat --structure`` serves the same file's outline."""
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    file_id = "dsid_tree_is_folder_only"
    doc_id = "doc_tree_is_folder_only"

    with tempfile.TemporaryDirectory() as tmp:
        tmp_root = Path(tmp)
        source = tmp_root / "report.pdf"
        source.write_bytes(b"%PDF-1.4\n% test fixture\n")
        filesystem = PageIndexFileSystem(workspace=tmp_root / "workspace")

        cached_doc = {
            "id": "doc_tree_is_folder_only",
            "type": "pdf",
            "path": str(source.resolve()),
            "doc_name": "report.pdf",
            "doc_description": "",
            "page_count": 1,
            "structure": [
                {"title": "Introduction", "node_id": "0001", "nodes": []}
            ],
            "pages": [{"page": 1, "content": "Page one text"}],
        }
        write_pageindex_client_doc(filesystem.pageindex_client_workspace, doc_id, cached_doc)
        filesystem.register_file(
            storage_uri=source.as_uri(),
            source_path="docs/report.pdf",
            external_id=file_id,
            title="Cached structural report",
            content="text artifact remains available",
        )

        executor = PIFSCommandExecutor(filesystem, json_output=True)

        # A file identifier is not a valid target for the folder tree command.
        with pytest.raises(PIFSCommandError):
            executor.execute("tree dsid_tree_is_folder_only")

        outline = json.loads(executor.execute("cat --structure dsid_tree_is_folder_only"))
        assert outline["data"]["structure"][0]["title"] == "Introduction"
def test_cat_all_is_limited_to_text_files():
    """Whole-file reads work for plain text and are refused for PDF, Markdown and JSON.

    Covers ``cat --all``, ``filesystem.open`` and the line-oriented commands
    (``head``, ``tail``, ``sed``) against files registered with different
    source suffixes and content types.
    """
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    registrations = (
        {
            "storage_uri": "file:///tmp/readme.txt",
            "source_path": "docs/readme.txt",
            "external_id": "dsid_text_file",
            "title": "Text readme",
            "content": "plain text body",
        },
        {
            "storage_uri": "file:///tmp/report.pdf",
            "source_path": "docs/report.pdf",
            "external_id": "dsid_pdf_file",
            "title": "PDF report",
            "content": "extracted text should not be served through cat --all",
        },
        {
            "storage_uri": "file:///tmp/notes.md",
            "source_path": "docs/notes.md",
            "external_id": "dsid_md_file",
            "title": "Markdown notes",
            "content": "markdown text should use PageIndex structure reads",
        },
        {
            "storage_uri": "file:///tmp/data.json",
            "source_path": "docs/data.json",
            "external_id": "dsid_json_file",
            "title": "JSON record",
            "content": '{"body":"json"}',
            "content_type": "application/json",
        },
    )

    with tempfile.TemporaryDirectory() as tmp:
        filesystem = PageIndexFileSystem(workspace=Path(tmp) / "workspace")
        for registration in registrations:
            filesystem.register_file(**registration)

        executor = PIFSCommandExecutor(filesystem, json_output=True)

        plain = json.loads(executor.execute("cat --all dsid_text_file"))
        assert plain["data"]["text"] == "plain text body"

        # PDF and Markdown: both the shell command and the direct open() refuse.
        for structured_id in ("dsid_pdf_file", "dsid_md_file"):
            with pytest.raises(PIFSCommandError, match="only supported for txt/text files"):
                executor.execute(f"cat --all {structured_id}")
            with pytest.raises(ValueError, match="not supported for PDF/Markdown"):
                filesystem.open(structured_id)

        # JSON: the shell command refuses, but open() still returns the text.
        with pytest.raises(PIFSCommandError, match="only supported for txt/text files"):
            executor.execute("cat --all dsid_json_file")
        assert filesystem.open("dsid_json_file").text == '{"body":"json"}'

        line_commands = (
            "head dsid_pdf_file",
            "tail dsid_pdf_file",
            "sed -n 1,1p dsid_pdf_file",
            "head dsid_md_file",
            "tail dsid_md_file",
            "sed -n 1,1p dsid_md_file",
        )
        for line_command in line_commands:
            with pytest.raises(PIFSCommandError, match="only supported for txt/text files"):
                executor.execute(line_command)
def test_pageindex_structure_commands_are_limited_to_pdf_and_markdown():
    """Structure-oriented ``cat`` flags are rejected for a plain-text file."""
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    structure_commands = (
        "cat --structure dsid_text_only",
        "cat --page 1 dsid_text_only",
        "cat --node 0001 dsid_text_only",
    )

    with tempfile.TemporaryDirectory() as tmp:
        workspace = Path(tmp) / "workspace"
        filesystem = PageIndexFileSystem(workspace=workspace)
        filesystem.register_file(
            storage_uri="file:///tmp/readme.txt",
            source_path="docs/readme.txt",
            external_id="dsid_text_only",
            title="Text readme",
            content="plain text body",
        )

        executor = PIFSCommandExecutor(filesystem, json_output=True)
        for structure_command in structure_commands:
            with pytest.raises(PIFSCommandError, match="only supported for PDF/Markdown"):
                executor.execute(structure_command)
def test_existing_pageindex_status_allows_legacy_record_without_format_suffix():
    """A suffix-less record with a stored PageIndex pointer is treated as structured.

    The source file has no extension. After the pointer is set to a built
    document by hand, ``cat --structure`` must work and ``cat --all`` must be
    refused.
    """
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem
    from pageindex.filesystem.commands import PIFSCommandError

    with tempfile.TemporaryDirectory() as tmp:
        tmp_root = Path(tmp)
        source = tmp_root / "uploaded"
        source.write_text("# Uploaded\n\nBody", encoding="utf-8")
        filesystem = PageIndexFileSystem(workspace=tmp_root / "workspace")

        file_ref = filesystem.register_file(
            storage_uri=source.as_uri(),
            source_path="uploads/uploaded",
            external_id="dsid_legacy_pageindex",
            title="Legacy PageIndex record",
            content="text/plain is only a weak default here",
        )

        legacy_doc = {
            "id": "doc_legacy_pageindex",
            "type": "md",
            "path": str(source.resolve()),
            "doc_name": "uploaded",
            "doc_description": "",
            "line_count": 3,
            "structure": [
                {"title": "Uploaded", "node_id": "0001", "text": "Body", "nodes": []}
            ],
        }
        write_pageindex_client_doc(
            filesystem.pageindex_client_workspace,
            "doc_legacy_pageindex",
            legacy_doc,
        )
        # Link the record to the cached document directly through the store.
        filesystem.store.update_pageindex_pointer(
            file_ref,
            pageindex_doc_id="doc_legacy_pageindex",
            pageindex_tree_status="built",
        )

        executor = PIFSCommandExecutor(filesystem, json_output=True)
        outline = json.loads(executor.execute("cat --structure dsid_legacy_pageindex"))
        assert outline["data"]["structure"][0]["title"] == "Uploaded"

        with pytest.raises(PIFSCommandError, match="only supported for txt/text files"):
            executor.execute("cat --all dsid_legacy_pageindex")
def test_read_commands_do_not_link_pageindex_cache_when_pointer_is_missing(monkeypatch):
    """Read commands must not adopt a cache entry that appears after a failed index.

    Indexing is forced to fail during registration. A matching client document
    is written afterwards, and the read commands must still report the
    structure as unavailable with a ``failed`` tree status.
    """
    from pageindex import PageIndexClient
    from pageindex.filesystem import PIFSCommandExecutor, PageIndexFileSystem

    def fail_index(*args, **kwargs):
        raise RuntimeError("index failed")

    with tempfile.TemporaryDirectory() as tmp:
        tmp_root = Path(tmp)
        source = tmp_root / "late.md"
        source.write_text("# Late\n\nBody", encoding="utf-8")
        filesystem = PageIndexFileSystem(workspace=tmp_root / "workspace")

        monkeypatch.setattr(PageIndexClient, "index", fail_index)
        filesystem.register_file(
            storage_uri=source.as_uri(),
            source_path="docs/late.md",
            external_id="dsid_late_cache",
            title="Late cache",
            content=source.read_text(encoding="utf-8"),
        )

        # The cache entry is written only after registration has finished.
        late_doc = {
            "id": "doc_late_cache",
            "type": "md",
            "path": str(source.resolve()),
            "doc_name": "late",
            "doc_description": "",
            "line_count": 3,
            "structure": [
                {"title": "Late", "node_id": "0001", "text": "Body", "nodes": []}
            ],
        }
        write_pageindex_client_doc(
            filesystem.pageindex_client_workspace,
            "doc_late_cache",
            late_doc,
        )

        executor = PIFSCommandExecutor(filesystem, json_output=True)
        structure = json.loads(executor.execute("cat --structure dsid_late_cache"))
        stat = json.loads(executor.execute("stat dsid_late_cache"))

        assert structure["data"]["available"] is False
        stat_data = stat["data"]
        assert stat_data["pageindex_doc_id"] is None
        assert stat_data["pageindex_tree_status"] == "failed"

View file

@ -0,0 +1,185 @@
import io
import os
import unittest
from types import SimpleNamespace
from pydantic import BaseModel, ConfigDict
from pageindex.filesystem.agent import (
PIFSAgentStreamObserver,
build_agent_model_settings,
normalize_agent_stream_mode,
normalize_reasoning_effort,
normalize_reasoning_summary,
pifs_agent_raw_reasoning_enabled,
serialize_agent_final_output,
should_disable_pifs_agent_tracing,
should_use_openai_compatible_chat_model,
)
class StructuredAnswer(BaseModel):
    """Sample structured agent output used by the serialization test below."""

    # Undeclared fields are rejected rather than silently accepted.
    model_config = ConfigDict(extra="forbid")

    answer: str
    document_ids: list[str]
class PIFSAgentStreamTest(unittest.TestCase):
    """Tests for the PIFS agent stream observer and its settings helpers."""

    def raw_event(self, event_type, delta):
        """Build a stand-in ``raw_response_event`` carrying one typed delta."""
        payload = SimpleNamespace(type=event_type, delta=delta)
        return SimpleNamespace(type="raw_response_event", data=payload)

    def test_model_stream_prints_output_and_think_deltas(self):
        """Model mode prints both the reasoning summary and the output text."""
        sink = io.StringIO()
        log = []
        observer = PIFSAgentStreamObserver("model", stream_log=log, output=sink)

        deltas = (
            ("response.reasoning_summary_text.delta", "look up folder"),
            ("response.output_text.delta", '{"answer":'),
            ("response.output_text.delta", '"done"}'),
        )
        for event_type, delta in deltas:
            observer.handle_event(self.raw_event(event_type, delta))
        observer.finish()

        printed = sink.getvalue()
        self.assertIn("[llm reasoning summary stream]", printed)
        self.assertIn("look up folder", printed)
        self.assertIn("[llm final output stream]", printed)
        # The output deltas may be split across lines, so compare without newlines.
        self.assertIn('{"answer":"done"}', printed.replace("\n", ""))

        # The expected log holds the joined output text first, then the summary.
        expected_log = [
            {"kind": "output", "text": '{"answer":"done"}'},
            {"kind": "think_summary", "text": "look up folder"},
        ]
        self.assertEqual(log, expected_log)

    def test_tools_mode_does_not_print_model_text(self):
        """Tools mode shows tool traffic and hides model output text."""
        sink = io.StringIO()
        log = []
        observer = PIFSAgentStreamObserver("tools", stream_log=log, output=sink)

        hidden_text = self.raw_event("response.output_text.delta", "hidden from tools mode")
        tool_args = self.raw_event(
            "response.function_call_arguments.delta",
            '{"command":"ls /"}',
        )
        observer.handle_event(hidden_text)
        observer.handle_event(tool_args)
        observer.emit_tool_call("ls /")
        observer.emit_tool_result(ok=True, output='{"ok": true}', seconds=0.001)
        observer.finish()

        printed = sink.getvalue()
        self.assertNotIn("hidden from tools mode", printed)
        self.assertIn("[llm -> pifs command]", printed)
        self.assertIn("ls /", printed)
        self.assertIn("[pifs -> llm result preview]", printed)
        self.assertIn('{"ok": true}', printed)

        first, second, third = log[0], log[1], log[2]
        self.assertEqual(first, {"kind": "tool_call", "command": "ls /"})
        self.assertEqual(second["kind"], "tool_result")
        self.assertEqual(third, {"kind": "tool_args", "text": '{"command":"ls /"}'})

    def test_tool_result_preview_compacts_large_outputs(self):
        """A 50-line tool result is previewed with its tail omitted."""
        sink = io.StringIO()
        observer = PIFSAgentStreamObserver("tools", output=sink)

        long_output = "\n".join(f"line {index}" for index in range(50))
        observer.emit_tool_result(ok=True, output=long_output, seconds=0.001)

        printed = sink.getvalue()
        self.assertIn("[large PIFS result", printed)
        self.assertIn("line 0", printed)
        self.assertIn("more lines omitted from preview", printed)
        self.assertNotIn("line 49", printed)

    def test_raw_reasoning_is_not_logged_by_default_but_summary_is(self):
        """With the debug env flag unset, raw reasoning is dropped and the summary kept."""
        sink = io.StringIO()
        log = []
        env_key = "PAGEINDEX_PIFS_AGENT_RAW_REASONING"

        # Clear the flag for the duration of the run and restore it afterwards.
        saved_value = os.environ.pop(env_key, None)
        try:
            observer = PIFSAgentStreamObserver("model", stream_log=log, output=sink)
            observer.handle_event(
                self.raw_event("response.reasoning_text.delta", "private chain")
            )
            observer.handle_event(
                self.raw_event("response.reasoning_summary_text.delta", "visible summary")
            )
            observer.finish()
        finally:
            if saved_value is not None:
                os.environ[env_key] = saved_value

        printed = sink.getvalue()
        self.assertNotIn("private chain", printed)
        self.assertIn("visible summary", printed)
        self.assertEqual(log, [{"kind": "think_summary", "text": "visible summary"}])

    def test_raw_reasoning_requires_debug_env_flag(self):
        """Raw reasoning is enabled only by the env values this test treats as on."""
        flag = "PAGEINDEX_PIFS_AGENT_RAW_REASONING"

        self.assertFalse(pifs_agent_raw_reasoning_enabled({}))
        self.assertTrue(pifs_agent_raw_reasoning_enabled({flag: "on"}))
        self.assertTrue(pifs_agent_raw_reasoning_enabled({flag: "TRUE"}))
        self.assertFalse(pifs_agent_raw_reasoning_enabled({flag: "0"}))

    def test_stream_mode_aliases(self):
        """Stream mode aliases map to canonical names and unknown names raise."""
        alias_to_mode = (("think", "model"), ("debug", "all"), ("", "off"))
        for alias, mode in alias_to_mode:
            self.assertEqual(normalize_agent_stream_mode(alias), mode)

        with self.assertRaises(ValueError):
            normalize_agent_stream_mode("nope")

    def test_reasoning_settings_enable_effort_and_summary(self):
        """Explicit effort and summary values are carried into the model settings."""
        model_settings = build_agent_model_settings(
            reasoning_effort="medium",
            reasoning_summary="detailed",
        )

        self.assertIsNotNone(model_settings)
        reasoning = model_settings.reasoning
        self.assertEqual(reasoning.effort, "medium")
        self.assertEqual(reasoning.summary, "detailed")
        self.assertEqual(model_settings.verbosity, "low")

    def test_reasoning_effort_defaults_to_visible_summary(self):
        """Setting only the effort gives a summary value of ``auto``."""
        model_settings = build_agent_model_settings(reasoning_effort="low")

        self.assertIsNotNone(model_settings)
        reasoning = model_settings.reasoning
        self.assertEqual(reasoning.effort, "low")
        self.assertEqual(reasoning.summary, "auto")

    def test_reasoning_and_base_url_normalization(self):
        """Checks reasoning-value normalization and base-URL chat-model detection."""
        self.assertEqual(normalize_reasoning_effort("xhigh"), "xhigh")
        self.assertIsNone(normalize_reasoning_summary("none"))

        # No base URL and the official OpenAI URL do not select the compatible chat model.
        for base_url in (None, "https://api.openai.com/v1/"):
            self.assertFalse(should_use_openai_compatible_chat_model(base_url))
        self.assertTrue(should_use_openai_compatible_chat_model("https://example.test/v1"))

        with self.assertRaises(ValueError):
            normalize_reasoning_effort("maximum")

    def test_tracing_is_disabled_by_default_unless_env_enables_it(self):
        """Tracing stays disabled unless the env flag holds an enabling value."""
        flag = "PAGEINDEX_PIFS_AGENT_TRACING"

        self.assertTrue(should_disable_pifs_agent_tracing({}))
        for enabling_value in ("1", "true", "on"):
            self.assertFalse(should_disable_pifs_agent_tracing({flag: enabling_value}))
        self.assertTrue(should_disable_pifs_agent_tracing({flag: "0"}))

    def test_structured_agent_output_serializes_to_json(self):
        """A pydantic final output is serialized as compact JSON."""
        answer = StructuredAnswer(answer="done", document_ids=["dsid_1"])
        serialized = serialize_agent_final_output(answer)

        self.assertEqual(serialized, '{"answer":"done","document_ids":["dsid_1"]}')
if __name__ == "__main__":
    # Running this file directly hands control to unittest's command-line runner.
    unittest.main()

View file

@ -0,0 +1,53 @@
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
from pageindex.filesystem.semantic_index import (
SemanticIndexRecord,
SQLiteVecSemanticIndex,
)
def test_sqlite_vec_semantic_index_round_trip(tmp_path):
    """Records written to the semantic index can be counted, ranked and filtered.

    Two records with different unit vectors are stored. A query vector closer
    to the first must rank it first, and a ``source_type`` filter must narrow
    the results to the matching record.
    """
    uploads_record = SemanticIndexRecord(
        file_ref="file_a",
        external_id="doc_a",
        source_type="github",
        source_path="github/a.json",
        title="Multipart upload limits",
        text="multipart upload limits",
        vector=[1.0, 0.0, 0.0],
        metadata={"topic": "uploads"},
    )
    runtime_record = SemanticIndexRecord(
        file_ref="file_b",
        external_id="doc_b",
        source_type="slack",
        source_path="slack/b.json",
        title="GPU cache issue",
        text="gpu cache issue",
        vector=[0.0, 1.0, 0.0],
        metadata={"topic": "runtime"},
    )

    index = SQLiteVecSemanticIndex(tmp_path / "semantic.sqlite")
    index.reset(dimension=3, metadata={"field_mode": "summary"})
    index.upsert_many([uploads_record, runtime_record])

    assert index.info()["document_count"] == 2

    query_vector = [0.9, 0.1, 0.0]
    ranked = index.search(query_vector, limit=2)
    ranked_ids = [item.external_id for item in ranked]
    assert ranked_ids == ["doc_a", "doc_b"]

    slack_only = index.search(
        [0.9, 0.1, 0.0],
        limit=2,
        filters={"source_type": "slack"},
    )
    slack_ids = [item.external_id for item in slack_only]
    assert slack_ids == ["doc_b"]

1988
uv.lock generated Normal file

File diff suppressed because it is too large Load diff