PageIndex/examples/agentic_vectorless_rag_demo.py

175 lines
7.2 KiB
Python
Raw Normal View History

"""
PageIndex x OpenAI Agents Demo
Demonstrates how to use PageIndexClient with the OpenAI Agents SDK
to build a document QA agent with 3 tools:
- get_document()
- get_document_structure()
- get_page_content()
Requirements:
pip install openai-agents
Steps:
1 Index PDF and inspect tree structure
2 Inspect document metadata
3 Ask a question (agent auto-calls tools)
4 Reload from workspace and verify persistence
"""
import os
import sys
import json
import asyncio
import concurrent.futures
import requests
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from agents import Agent, ItemHelpers, Runner, function_tool
from agents.stream_events import RawResponsesStreamEvent, RunItemStreamEvent
from openai.types.responses import ResponseTextDeltaEvent, ResponseReasoningSummaryTextDeltaEvent # noqa: F401
from pageindex import PageIndexClient
import pageindex.utils as utils
_EXAMPLES_DIR = os.path.dirname(os.path.abspath(__file__))
PDF_URL = "https://arxiv.org/pdf/2603.15031"
PDF_PATH = os.path.join(_EXAMPLES_DIR, "documents", "attention-residuals.pdf")
WORKSPACE = os.path.join(_EXAMPLES_DIR, "workspace")
AGENT_SYSTEM_PROMPT = """
You are PageIndex, a document QA assistant.
TOOL USE:
- Call get_document() first to confirm status and page/line count.
- Call get_document_structure() to find relevant page ranges (use node summaries and start_index/end_index).
- Call get_page_content(pages="5-7") with tight ranges. Never fetch the whole doc.
- When calling tool call, output one short sentence explaining reason.
ANSWERING: Answer based only on tool output. Be concise.
"""
def query_agent(
client: PageIndexClient,
doc_id: str,
prompt: str,
verbose: bool = False,
) -> str:
"""Run a document QA agent using the OpenAI Agents SDK.
Streams text output token-by-token and returns the full answer string.
Tool calls are always printed; verbose=True also prints arguments and output previews.
"""
@function_tool
def get_document() -> str:
"""Get document metadata: status, page count, name, and description."""
return client.get_document(doc_id)
@function_tool
def get_document_structure() -> str:
"""Get the document's full tree structure (without text) to find relevant sections."""
return client.get_document_structure(doc_id)
@function_tool
def get_page_content(pages: str) -> str:
"""
Get the text content of specific pages or line numbers.
Use tight ranges: e.g. '5-7' for pages 5 to 7, '3,8' for pages 3 and 8, '12' for page 12.
For Markdown documents, use line numbers from the structure's line_num field.
"""
return client.get_page_content(doc_id, pages)
agent = Agent(
name="PageIndex",
instructions=AGENT_SYSTEM_PROMPT,
tools=[get_document, get_document_structure, get_page_content],
model=client.retrieve_model,
)
async def _run():
collected = []
streamed_this_turn = False
streamed_run = Runner.run_streamed(agent, prompt)
async for event in streamed_run.stream_events():
if isinstance(event, RawResponsesStreamEvent):
if isinstance(event.data, ResponseReasoningSummaryTextDeltaEvent):
print(event.data.delta, end="", flush=True)
elif isinstance(event.data, ResponseTextDeltaEvent):
delta = event.data.delta
print(delta, end="", flush=True)
collected.append(delta)
streamed_this_turn = True
elif isinstance(event, RunItemStreamEvent):
item = event.item
if item.type == "message_output_item":
if not streamed_this_turn:
text = ItemHelpers.text_message_output(item)
if text:
print(f"{text}")
streamed_this_turn = False
collected.clear()
elif item.type == "tool_call_item":
if streamed_this_turn:
print() # end streaming line before tool call
raw = item.raw_item
args = getattr(raw, "arguments", "{}")
args_str = f"({args})" if verbose else ""
print(f"[tool call]: {raw.name}{args_str}")
elif item.type == "tool_call_output_item" and verbose:
output = str(item.output)
preview = output[:200] + "..." if len(output) > 200 else output
print(f"[tool output]: {preview}\n")
return "".join(collected)
try:
asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, _run()).result()
except RuntimeError:
return asyncio.run(_run())
# ── Download PDF if needed ─────────────────────────────────────────────────────
if not os.path.exists(PDF_PATH):
print(f"Downloading {PDF_URL} ...")
os.makedirs(os.path.dirname(PDF_PATH), exist_ok=True)
with requests.get(PDF_URL, stream=True, timeout=30) as r:
r.raise_for_status()
with open(PDF_PATH, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print("Download complete.\n")
# ── Setup ──────────────────────────────────────────────────────────────────────
client = PageIndexClient(workspace=WORKSPACE)
# ── Step 1: Index + Tree ───────────────────────────────────────────────────────
print("=" * 60)
print("Step 1: Indexing PDF and inspecting tree structure")
print("=" * 60)
doc_id = next((did for did, doc in client.documents.items()
if doc.get('doc_name') == os.path.basename(PDF_PATH)), None)
if doc_id:
print(f"\nLoaded cached doc_id: {doc_id}")
else:
doc_id = client.index(PDF_PATH)
print(f"\nIndexed. doc_id: {doc_id}")
print("\nTree Structure (top-level sections):")
structure = json.loads(client.get_document_structure(doc_id))
utils.print_tree(structure)
# ── Step 2: Document Metadata ──────────────────────────────────────────────────
print("\n" + "=" * 60)
print("Step 2: Document Metadata (get_document)")
print("=" * 60)
print(client.get_document(doc_id))
# ── Step 3: Agent Query ────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print("Step 3: Agent Query (auto tool-use)")
print("=" * 60)
question = "Explain Attention Residuals in simple language."
print(f"\nQuestion: '{question}'\n")
query_agent(client, doc_id, question, verbose=True)