2026-04-06 22:51:04 +08:00
|
|
|
# pageindex/collection.py
|
|
|
|
|
from __future__ import annotations
|
2026-05-15 11:14:12 +08:00
|
|
|
import os
|
|
|
|
|
import warnings
|
2026-04-06 22:51:04 +08:00
|
|
|
from typing import AsyncIterator
|
|
|
|
|
from .events import QueryEvent
|
|
|
|
|
from .backend.protocol import Backend
|
|
|
|
|
|
|
|
|
|
|
2026-05-15 11:14:12 +08:00
|
|
|
def _multidoc_acked() -> bool:
|
|
|
|
|
return os.getenv("PAGEINDEX_EXPERIMENTAL_MULTIDOC", "").lower() in ("1", "true", "yes")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_MULTIDOC_WARNING = (
|
2026-05-15 17:03:17 +08:00
|
|
|
"Querying the entire collection (no doc_ids) is experimental — a naive "
|
|
|
|
|
"first implementation that lets the agent pick docs from auto-generated "
|
|
|
|
|
"descriptions. Better cross-document retrieval is on the way. Pass "
|
|
|
|
|
"doc_ids=[...] for reliable results, or set "
|
|
|
|
|
"PAGEINDEX_EXPERIMENTAL_MULTIDOC=1 to silence this warning."
|
2026-05-15 11:14:12 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
2026-04-06 22:51:04 +08:00
|
|
|
class QueryStream:
|
|
|
|
|
"""Wraps backend.query_stream() as an async iterable object."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, backend: Backend, collection: str, question: str,
|
|
|
|
|
doc_ids: list[str] | None = None):
|
|
|
|
|
self._backend = backend
|
|
|
|
|
self._collection = collection
|
|
|
|
|
self._question = question
|
|
|
|
|
self._doc_ids = doc_ids
|
|
|
|
|
|
|
|
|
|
async def stream_events(self) -> AsyncIterator[QueryEvent]:
|
|
|
|
|
async for event in self._backend.query_stream(
|
|
|
|
|
self._collection, self._question, self._doc_ids
|
|
|
|
|
):
|
|
|
|
|
yield event
|
|
|
|
|
|
|
|
|
|
def __aiter__(self):
|
|
|
|
|
return self.stream_events()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Collection:
|
|
|
|
|
def __init__(self, name: str, backend: Backend):
|
|
|
|
|
self._name = name
|
|
|
|
|
self._backend = backend
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def name(self) -> str:
|
|
|
|
|
return self._name
|
|
|
|
|
|
|
|
|
|
def add(self, file_path: str) -> str:
|
|
|
|
|
return self._backend.add_document(self._name, file_path)
|
|
|
|
|
|
|
|
|
|
def list_documents(self) -> list[dict]:
|
|
|
|
|
return self._backend.list_documents(self._name)
|
|
|
|
|
|
|
|
|
|
def get_document(self, doc_id: str, include_text: bool = False) -> dict:
|
|
|
|
|
return self._backend.get_document(self._name, doc_id, include_text=include_text)
|
|
|
|
|
|
|
|
|
|
def get_document_structure(self, doc_id: str) -> list:
|
|
|
|
|
return self._backend.get_document_structure(self._name, doc_id)
|
|
|
|
|
|
|
|
|
|
def get_page_content(self, doc_id: str, pages: str) -> list:
|
|
|
|
|
return self._backend.get_page_content(self._name, doc_id, pages)
|
|
|
|
|
|
|
|
|
|
def delete_document(self, doc_id: str) -> None:
|
|
|
|
|
self._backend.delete_document(self._name, doc_id)
|
|
|
|
|
|
2026-05-15 17:03:17 +08:00
|
|
|
def query(self, question: str,
|
|
|
|
|
doc_ids: str | list[str] | None = None,
|
2026-04-06 22:51:04 +08:00
|
|
|
stream: bool = False) -> str | QueryStream:
|
|
|
|
|
"""Query documents in this collection.
|
|
|
|
|
|
|
|
|
|
- stream=False: returns answer string (sync)
|
|
|
|
|
- stream=True: returns async iterable of QueryEvent
|
|
|
|
|
|
2026-05-15 17:03:17 +08:00
|
|
|
``doc_ids`` can be a single doc id (``str``) or a list. ``None`` queries
|
|
|
|
|
the entire collection (experimental).
|
|
|
|
|
|
2026-04-06 22:51:04 +08:00
|
|
|
Usage:
|
2026-05-15 17:03:17 +08:00
|
|
|
answer = col.query("question", doc_ids=doc_id) # single
|
|
|
|
|
answer = col.query("question", doc_ids=[d1, d2]) # multi
|
|
|
|
|
async for event in col.query("question", doc_ids=doc_id, stream=True):
|
2026-04-06 22:51:04 +08:00
|
|
|
...
|
2026-05-15 11:14:12 +08:00
|
|
|
|
|
|
|
|
Passing doc_ids=None queries the entire collection — this is
|
|
|
|
|
experimental; emits a UserWarning unless PAGEINDEX_EXPERIMENTAL_MULTIDOC
|
|
|
|
|
is set.
|
2026-04-06 22:51:04 +08:00
|
|
|
"""
|
2026-05-15 17:03:17 +08:00
|
|
|
if isinstance(doc_ids, str):
|
|
|
|
|
doc_ids = [doc_ids]
|
|
|
|
|
elif doc_ids == []:
|
|
|
|
|
raise ValueError(
|
|
|
|
|
"doc_ids cannot be empty; pass None to query the whole collection"
|
|
|
|
|
)
|
2026-05-15 11:14:12 +08:00
|
|
|
if doc_ids is None and not _multidoc_acked():
|
|
|
|
|
docs = self._backend.list_documents(self._name)
|
|
|
|
|
if not docs:
|
|
|
|
|
raise ValueError(
|
|
|
|
|
f"Cannot query collection '{self._name}': it is empty. "
|
|
|
|
|
"Add documents with col.add(...) first."
|
|
|
|
|
)
|
|
|
|
|
if len(docs) > 1:
|
|
|
|
|
warnings.warn(_MULTIDOC_WARNING, UserWarning, stacklevel=2)
|
2026-04-06 22:51:04 +08:00
|
|
|
if stream:
|
|
|
|
|
return QueryStream(self._backend, self._name, question, doc_ids)
|
|
|
|
|
return self._backend.query(self._name, question, doc_ids)
|