feat(evals): publish multimodal_doc parser_compare benchmark + n=171 report

Adds the full parser_compare experiment for the multimodal_doc suite:
six arms compared on 30 PDFs / 171 questions from MMLongBench-Doc with
anthropic/claude-sonnet-4.5 across the board.

Source code:
- core/parsers/{azure_di,llamacloud,pdf_pages}.py: direct parser SDK
  callers (Azure Document Intelligence prebuilt-read/layout, LlamaParse
  parse_page_with_llm/parse_page_with_agent) used by the LC arms,
  bypassing the SurfSense backend so each (basic/premium) extraction
  is a clean A/B independent of backend ETL routing.
- suites/multimodal_doc/parser_compare/{ingest,runner,prompt}.py:
  six-arm benchmark (native_pdf, azure_basic_lc, azure_premium_lc,
  llamacloud_basic_lc, llamacloud_premium_lc, surfsense_agentic) with
  byte-identical prompts per question, deterministic grader, Wilson
  CIs, and the per-page preprocessing tariff cost overlay.

Reproducibility:
- pyproject.toml + uv.lock pin pypdf, azure-ai-documentintelligence,
  llama-cloud-services as new deps.
- .env.example documents the AZURE_DI_* and LLAMA_CLOUD_API_KEY env
  vars now required for parser_compare.
- 12 analysis scripts under scripts/: retry pass with exponential
  backoff, post-retry accuracy merge, McNemar / latency / per-PDF
  stats, context-overflow hypothesis test, etc. Each produces one
  number cited by the blog report.

Citation surface:
- reports/blog/multimodal_doc_parser_compare_n171_report.md: 1219-line
  technical writeup (16 sections) covering headline accuracy, per-format
  accuracy, McNemar pairwise significance, latency / token / per-PDF
  distributions, error analysis, retry experiment, post-retry final
  accuracy, cost amortization model with closed-form derivation, threats
  to validity, and reproducibility appendix.
- data/multimodal_doc/runs/2026-05-14T00-53-19Z/parser_compare/{raw,
  raw_retries,raw_post_retry}.jsonl + run_artifact.json + retry summary
  whitelisted via data/.gitignore as the verifiable numbers source.

Gitignore:
- ignore logs_*.txt + retry_run.log; structured artifacts cover the
  citation surface, debug logs are noise.
- data/.gitignore default-ignores everything, whitelists the n=171 run
  artifacts only (parser manifest left ignored to avoid leaking local
  Windows usernames in absolute paths; manifest is fully regenerable
  via 'ingest multimodal_doc parser_compare').
- reports/.gitignore now whitelists hand-curated reports/blog/.

Also retires the abandoned CRAG Task 3 implementation (download script,
streaming Task 3 ingest, CragTask3Benchmark + tests) and trims the
runner / ingest module APIs to match.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-05-14 19:54:41 -07:00
parent 3737118050
commit 9bcd50164d
40 changed files with 9303 additions and 993 deletions

View file

@ -0,0 +1,35 @@
"""Direct parser invocations for the parser_compare benchmark.
The SurfSense backend exposes a single ``ETL_SERVICE`` env var that
picks one parser globally; per-ingestion overrides are not on the
public API. To drive the four (Azure DI x basic/premium, LlamaCloud x
basic/premium) extractions we need for ``multimodal_doc/parser_compare``
we therefore call the Azure DI and LlamaCloud SDKs directly from the
eval harness, mirroring the production code path in
``surfsense_backend/app/etl_pipeline/parsers/``.
Two design rules:
* No backend imports — the eval harness cannot pull in the FastAPI
app's config layer (it would require the full backend ``.env`` plus a
reachable Postgres). We re-read keys from our own environment instead.
* Same wire shape as the backend's parsers (Azure ``prebuilt-read`` /
``prebuilt-layout`` selected by ``processing_mode``; LlamaCloud
``parse_page_with_llm`` / ``parse_page_with_agent`` selected by
``processing_mode``) so any quality conclusions transfer back to
production behaviour.
"""
from __future__ import annotations
from .azure_di import AzureDIError, parse_with_azure_di
from .llamacloud import LlamaCloudError, parse_with_llamacloud
from .pdf_pages import count_pdf_pages
__all__ = [
"AzureDIError",
"LlamaCloudError",
"count_pdf_pages",
"parse_with_azure_di",
"parse_with_llamacloud",
]

View file

@ -0,0 +1,144 @@
"""Azure Document Intelligence parser — eval-side mirror of the backend.
Calls ``DocumentIntelligenceClient.begin_analyze_document`` with one
of two ``model_id`` slugs depending on ``processing_mode``:
* ``basic`` -> ``prebuilt-read`` (text OCR only, cheaper, faster)
* ``premium`` -> ``prebuilt-layout`` (text + tables + structure;
produces real markdown headings,
pipe-tables, etc.)
These are the same model selections the production
``surfsense_backend/app/etl_pipeline/parsers/azure_doc_intelligence.py``
makes per ``processing_mode``. Output format is forced to Markdown
(``DocumentContentFormat.MARKDOWN``) so the long-context arm can stuff
it into a prompt verbatim.
Retry policy is intentionally light here (the eval harness re-runs
the whole batch on top-level failure); we do one synchronous attempt
plus exponential backoff on transient transport errors.
"""
from __future__ import annotations
import asyncio
import logging
import os
import random
logger = logging.getLogger(__name__)
# Maps the harness-level ``processing_mode`` to the Azure DI ``model_id``
# passed to ``begin_analyze_document``.
_AZURE_MODEL_BY_MODE = {
"basic": "prebuilt-read",
"premium": "prebuilt-layout",
}
# Total number of attempts made by ``parse_with_azure_di`` (not "retries
# after the first try"): the loop runs ``range(1, _MAX_RETRIES + 1)``.
_MAX_RETRIES = 4
# Backoff before the next attempt is ``_BASE_DELAY * 2 ** (attempt - 1)``
# seconds, capped at ``_MAX_DELAY``, with +/-25% random jitter applied.
_BASE_DELAY = 5.0
_MAX_DELAY = 60.0
class AzureDIError(RuntimeError):
    """Signal that an Azure Document Intelligence parse did not succeed.

    ``parse_with_azure_di`` raises this when the service returns empty
    content, rejects the request with a non-auth 4xx status, or keeps
    failing until the retry budget is spent.
    """
async def parse_with_azure_di(
    file_path: str | os.PathLike,
    *,
    processing_mode: str = "basic",
    endpoint: str | None = None,
    api_key: str | None = None,
) -> str:
    """Run Azure DI on ``file_path`` and return the markdown content.

    Args:
        file_path: Local document to upload for analysis.
        processing_mode: ``"basic"`` selects ``prebuilt-read`` and
            ``"premium"`` selects ``prebuilt-layout``. Any other value
            falls back to ``prebuilt-read``.
        endpoint: Azure DI endpoint; defaults to the ``AZURE_DI_ENDPOINT``
            env var (set in ``surfsense_evals/.env``).
        api_key: Azure DI key; defaults to the ``AZURE_DI_KEY`` env var.

    Returns:
        The stripped markdown content of the analysis result.

    Raises:
        ValueError: If the endpoint or key is missing.
        AzureDIError: If the service returns empty content, rejects the
            request with a non-retryable 4xx status, or still fails after
            ``_MAX_RETRIES`` attempts.
        azure.core.exceptions.ClientAuthenticationError: Propagated
            unchanged, since retrying with the same credentials is
            pointless.
        OSError: If ``file_path`` cannot be stat'ed or opened.
    """
    endpoint = endpoint or os.environ.get("AZURE_DI_ENDPOINT")
    api_key = api_key or os.environ.get("AZURE_DI_KEY")
    if not endpoint or not api_key:
        raise ValueError(
            "AZURE_DI_ENDPOINT and AZURE_DI_KEY must be set "
            "(see surfsense_evals/.env)."
        )

    model_id = _AZURE_MODEL_BY_MODE.get(processing_mode, "prebuilt-read")

    # Lazy imports — surfsense_evals shouldn't pay the azure-sdk
    # import cost on every CLI invocation that doesn't touch
    # parser_compare.
    from azure.ai.documentintelligence.aio import DocumentIntelligenceClient
    from azure.ai.documentintelligence.models import DocumentContentFormat
    from azure.core.credentials import AzureKeyCredential
    from azure.core.exceptions import (
        ClientAuthenticationError,
        HttpResponseError,
        ServiceRequestError,
        ServiceResponseError,
    )

    # 4xx statuses that describe a transient condition (request timeout,
    # throttling) rather than a malformed request; these go through the
    # same backoff loop as 5xx and transport errors.
    retryable_client_statuses = frozenset({408, 429})

    file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
    logger.info(
        "Azure DI parsing %s (mode=%s, model=%s, size=%.1fMB)",
        file_path, processing_mode, model_id, file_size_mb,
    )

    last_exc: Exception | None = None
    for attempt in range(1, _MAX_RETRIES + 1):
        try:
            client = DocumentIntelligenceClient(
                endpoint=endpoint,
                credential=AzureKeyCredential(api_key),
            )
            async with client:
                with open(file_path, "rb") as fh:
                    poller = await client.begin_analyze_document(
                        model_id,
                        body=fh,
                        output_content_format=DocumentContentFormat.MARKDOWN,
                    )
                    result = await poller.result()
            content = (result.content or "").strip()
            if not content:
                # AzureDIError is not matched by any handler below, so an
                # empty result fails fast instead of being retried.
                raise AzureDIError(
                    f"Azure DI returned empty content for {file_path}"
                )
            logger.info(
                "Azure DI OK: %s (%s) -> %d chars",
                file_path, model_id, len(content),
            )
            return content
        except ClientAuthenticationError:
            # Must precede HttpResponseError (its base class).
            raise
        except HttpResponseError as exc:
            status = exc.status_code
            # 4xx that's neither auth nor transient: don't retry, the
            # request itself is broken.
            if (
                status
                and 400 <= status < 500
                and status not in retryable_client_statuses
            ):
                raise AzureDIError(
                    f"Azure DI {status} on {file_path}: {exc}"
                ) from exc
            last_exc = exc
        except (ServiceRequestError, ServiceResponseError) as exc:
            last_exc = exc

        if attempt < _MAX_RETRIES:
            # Exponential backoff capped at _MAX_DELAY, with +/-25% jitter
            # so concurrent extractions don't retry in lockstep.
            delay = min(_BASE_DELAY * (2 ** (attempt - 1)), _MAX_DELAY)
            jitter = delay * 0.25 * (2 * random.random() - 1)
            sleep_for = delay + jitter
            logger.warning(
                "Azure DI attempt %d/%d failed (%s); retrying in %.1fs",
                attempt, _MAX_RETRIES, type(last_exc).__name__, sleep_for,
            )
            await asyncio.sleep(sleep_for)

    raise AzureDIError(
        f"Azure DI failed after {_MAX_RETRIES} attempts on {file_path}"
    ) from last_exc
__all__ = ["AzureDIError", "parse_with_azure_di"]

View file

@ -0,0 +1,168 @@
"""LlamaParse (LlamaCloud) parser — eval-side mirror of the backend.
Calls ``LlamaParse.aparse`` with one of two ``parse_mode`` slugs
depending on ``processing_mode``:
* ``basic`` -> ``parse_page_with_llm`` (cheap, single-LLM-call/page)
* ``premium`` -> ``parse_page_with_agent`` (multi-step agent per page;
handles tables / figures
substantially better)
These are the exact mappings from production
``surfsense_backend/app/etl_pipeline/parsers/llamacloud.py``. We keep
``num_workers=1`` and language=``"en"`` to match production.
The result is materialised via ``get_markdown_documents(split_by_page=False)``
which concatenates every page into a single markdown string, exactly
the shape we need for long-context stuffing.
"""
from __future__ import annotations
import asyncio
import logging
import os
import random
import httpx
logger = logging.getLogger(__name__)
# Maps the harness-level ``processing_mode`` to the LlamaParse
# ``parse_mode`` slug handed to the ``LlamaParse`` constructor.
_LLAMA_PARSE_MODE_MAP = {
"basic": "parse_page_with_llm",
"premium": "parse_page_with_agent",
}
# Total number of attempts made by ``parse_with_llamacloud``: the loop
# runs ``range(1, _MAX_RETRIES + 1)``.
_MAX_RETRIES = 3
# Backoff before the next attempt is ``_BASE_DELAY * 2 ** (attempt - 1)``
# seconds, capped at ``_MAX_DELAY``, with +/-25% random jitter applied.
_BASE_DELAY = 10.0
_MAX_DELAY = 90.0
class LlamaCloudError(RuntimeError):
    """Signal that a LlamaCloud (LlamaParse) extraction did not succeed.

    Raised by ``parse_with_llamacloud`` when the parse yields no content
    or when every retry attempt has failed.
    """
def _extract_markdown(result) -> str:
"""Pull markdown out of whatever object LlamaParse.aparse returns.
Mirrors backend's tolerant extraction: the SDK has gone through
several response shapes; we accept all of them so a minor SDK bump
doesn't silently zero the eval.
"""
if hasattr(result, "get_markdown_documents"):
docs = result.get_markdown_documents(split_by_page=False)
if docs and hasattr(docs[0], "text"):
return docs[0].text
if hasattr(result, "pages") and result.pages:
return "\n\n".join(p.md for p in result.pages if hasattr(p, "md") and p.md)
if isinstance(result, list):
if result and hasattr(result[0], "text"):
return result[0].text
return "\n\n".join(
doc.page_content if hasattr(doc, "page_content") else str(doc)
for doc in result
)
return str(result)
async def parse_with_llamacloud(
    file_path: str | os.PathLike,
    *,
    processing_mode: str = "basic",
    estimated_pages: int = 50,
    api_key: str | None = None,
) -> str:
    """Run LlamaParse on ``file_path`` and return the markdown content.

    Args:
        file_path: Local document to parse.
        processing_mode: ``"basic"`` selects ``parse_page_with_llm`` and
            ``"premium"`` selects ``parse_page_with_agent``. Any other
            value falls back to ``parse_page_with_llm``.
        estimated_pages: Page-count estimate used only to size the job
            timeout (60s baseline + 30s per page, minimum 180s).
        api_key: LlamaCloud key; defaults to the ``LLAMA_CLOUD_API_KEY``
            env var (set in ``surfsense_evals/.env``).

    Returns:
        The stripped markdown extracted from the parse result.

    Raises:
        ValueError: If the API key is missing.
        LlamaCloudError: Immediately if the parse succeeds but yields
            empty content, or after ``_MAX_RETRIES`` failed attempts.
        OSError: If ``file_path`` cannot be stat'ed.
    """
    api_key = api_key or os.environ.get("LLAMA_CLOUD_API_KEY")
    if not api_key:
        raise ValueError(
            "LLAMA_CLOUD_API_KEY must be set (see surfsense_evals/.env)."
        )

    parse_mode = _LLAMA_PARSE_MODE_MAP.get(processing_mode, "parse_page_with_llm")

    # Lazy import: llama-cloud pulls llama-index-core (~50 MB) on first
    # touch; defer until the parser actually runs.
    from llama_cloud_services import LlamaParse
    from llama_cloud_services.parse.base import JobFailedException
    from llama_cloud_services.parse.utils import ResultType

    file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
    # Match backend's per-page timeout heuristic so big PDFs don't drop
    # mid-job: 60s baseline + 30s/page (premium agent runs longer than
    # basic; both fit comfortably here).
    job_timeout = max(180.0, 60.0 + 30.0 * estimated_pages)
    upload_timeout = max(120.0, 30.0 * file_size_mb)

    logger.info(
        "LlamaCloud parsing %s (mode=%s, parse_mode=%s, %.1fMB, "
        "job_timeout=%.0fs)",
        file_path, processing_mode, parse_mode, file_size_mb, job_timeout,
    )

    custom_timeout = httpx.Timeout(
        connect=120.0, read=upload_timeout, write=upload_timeout, pool=120.0,
    )

    last_exc: Exception | None = None
    for attempt in range(1, _MAX_RETRIES + 1):
        try:
            async with httpx.AsyncClient(timeout=custom_timeout) as client:
                parser = LlamaParse(
                    api_key=api_key,
                    num_workers=1,
                    verbose=False,
                    language="en",
                    result_type=ResultType.MD,
                    parse_mode=parse_mode,
                    ignore_errors=False,
                    max_timeout=int(max(2000.0, job_timeout + upload_timeout)),
                    job_timeout_in_seconds=job_timeout,
                    job_timeout_extra_time_per_page_in_seconds=60,
                    custom_client=client,
                )
                result = await parser.aparse(str(file_path))
            content = _extract_markdown(result).strip()
            if not content:
                raise LlamaCloudError(
                    f"LlamaCloud returned empty content for {file_path}"
                )
            logger.info(
                "LlamaCloud OK: %s (%s) -> %d chars",
                file_path, parse_mode, len(content),
            )
            return content
        except LlamaCloudError:
            # LlamaCloudError subclasses RuntimeError, so without this
            # handler the broad clause below would swallow our own
            # "empty content" error, re-run a paid parse, and replace the
            # specific message with a generic one. Fail fast instead,
            # matching the Azure DI parser.
            raise
        except (
            httpx.HTTPError,
            JobFailedException,
            RuntimeError,
        ) as exc:
            last_exc = exc

        if attempt < _MAX_RETRIES:
            # Exponential backoff capped at _MAX_DELAY, with +/-25% jitter.
            delay = min(_BASE_DELAY * (2 ** (attempt - 1)), _MAX_DELAY)
            jitter = delay * 0.25 * (2 * random.random() - 1)
            sleep_for = delay + jitter
            logger.warning(
                "LlamaCloud attempt %d/%d failed (%s); retrying in %.1fs",
                attempt, _MAX_RETRIES, type(last_exc).__name__, sleep_for,
            )
            await asyncio.sleep(sleep_for)

    raise LlamaCloudError(
        f"LlamaCloud failed after {_MAX_RETRIES} attempts on {file_path}"
    ) from last_exc
__all__ = ["LlamaCloudError", "parse_with_llamacloud"]

View file

@ -0,0 +1,35 @@
"""Tiny pypdf wrapper for "how many pages does this PDF have?".
Used by ``parser_compare`` to:
* Decide LlamaCloud's per-page job timeout.
* Compute the SurfSense preprocessing dollar cost
(``$1 / 1k pages`` for basic, ``$10 / 1k pages`` for premium) so the
report can show "ingest + LLM" total cost per arm.
Returns ``0`` (and logs) on parse failure rather than raising — costs
shown as ``?`` are always better than a benchmark that crashes mid-run.
"""
from __future__ import annotations
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
def count_pdf_pages(path: Path) -> int:
    """Count the pages of the PDF at ``path`` using pypdf.

    Any failure (pypdf missing, unreadable or malformed file) is logged
    as a warning and reported as ``0`` instead of raising, so callers can
    treat the page count as best-effort.
    """
    try:
        # Imported lazily so pypdf is only loaded when a count is needed.
        from pypdf import PdfReader

        page_total = len(PdfReader(str(path)).pages)
    except Exception as exc:  # noqa: BLE001
        logger.warning("Failed to count pages for %s: %s", path, exc)
        return 0
    return page_total
__all__ = ["count_pdf_pages"]

View file

@ -0,0 +1,46 @@
"""parser_compare — six-way head-to-head on long multimodal PDFs.
Same 5 mmlongbench PDFs that ``mmlongbench`` already ingested
(``search_space_id=55``), one question per PDF for the smoke run.
The point of this benchmark is to disentangle TWO orthogonal
dimensions of "how good is our multimodal pipeline?":
1. **Parser quality** — Azure DI prebuilt-read vs prebuilt-layout vs
LlamaParse parse_page_with_llm vs parse_page_with_agent. We run
each parser directly (bypassing ``/documents/fileupload`` because
the backend's parser routing is global, not per-call) and stuff the
resulting markdown into a long-context prompt.
2. **Context-management strategy** — full-context stuffing (no chunk
selection, the model sees everything) vs SurfSense's agentic
retrieval over chunks of the same documents.
Six arms, all answered by ``anthropic/claude-sonnet-4.5``:
* ``native_pdf`` PDF attached natively via OpenRouter
(gold-standard reference).
* ``azure_basic_lc`` Azure DI ``prebuilt-read`` markdown stuffed
into the prompt.
* ``azure_premium_lc`` Azure DI ``prebuilt-layout`` markdown stuffed.
* ``llamacloud_basic_lc`` LlamaParse ``parse_page_with_llm`` markdown stuffed.
* ``llamacloud_premium_lc`` LlamaParse ``parse_page_with_agent`` markdown stuffed.
* ``surfsense_agentic`` SurfSense ``/api/v1/new_chat`` with
``mentioned_document_ids`` scoped to the
one source PDF, retrieving chunks from
the existing search_space=55 ingestion
(vision_llm=on, processing_mode=premium,
ETL_SERVICE=LLAMACLOUD with Azure DI
fallback — effectively azure_premium).
The report includes preprocessing cost ($1 / 1k pages basic, $10 / 1k
pages premium) on top of the OpenRouter LLM cost so each arm's true
total-cost-per-question is directly comparable.
"""
from __future__ import annotations
from ....core import registry as _registry
from .runner import ParserCompareBenchmark
_registry.register(ParserCompareBenchmark())

View file

@ -0,0 +1,356 @@
"""parser_compare ingestion: pre-extract markdown 4 ways per PDF.
For each PDF in scope, we run all four (parser × mode) combinations
in parallel and persist the resulting markdown alongside the PDF:
data/multimodal_doc/parser_compare/extractions/
<doc_id>.azure_basic.md
<doc_id>.azure_premium.md
<doc_id>.llamacloud_basic.md
<doc_id>.llamacloud_premium.md
A manifest at ``maps/parser_compare_doc_map.jsonl`` records, per PDF:
* ``doc_id`` filename of the source PDF.
* ``pdf_path`` local cached PDF path.
* ``document_id`` SurfSense document id (carried over from
mmlongbench's existing ingestion so the
SurfSense agentic arm can scope retrieval).
* ``pages`` page count via pypdf (drives preprocessing cost).
* ``extractions`` map of ``arm_name -> {markdown_path, chars,
elapsed_s, status, error}``.
The runner reads this manifest, loads the markdown for each long-context
arm, and uses ``document_id`` for the SurfSense arm.
Source PDFs come from the existing mmlongbench ingestion — no new
download or upload happens here. The point of this benchmark is
parser quality on the same physical PDFs SurfSense already has, so
re-using mmlongbench's PDF cache is correct.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from ....core.config import set_suite_state
from ....core.parsers import (
AzureDIError,
LlamaCloudError,
count_pdf_pages,
parse_with_azure_di,
parse_with_llamacloud,
)
from ....core.registry import RunContext
logger = logging.getLogger(__name__)
# Order matters for the manifest only (deterministic JSONL diffs);
# the runner doesn't rely on it.
PARSER_ARMS: tuple[tuple[str, str, str], ...] = (
("azure_basic_lc", "azure", "basic"),
("azure_premium_lc", "azure", "premium"),
("llamacloud_basic_lc", "llamacloud", "basic"),
("llamacloud_premium_lc", "llamacloud", "premium"),
)
@dataclass
class ExtractionResult:
    """Outcome of one (parser, mode) extraction of a single PDF.

    Field defaults describe a successful run with nothing recorded yet;
    failed runs set ``status="failed"`` and fill in ``error``.
    """

    arm: str
    parser: str
    mode: str
    markdown_path: Path | None = None
    chars: int = 0
    elapsed_s: float = 0.0
    status: str = "ok"  # "ok" | "failed"
    error: str | None = None

    def to_jsonl(self) -> dict[str, Any]:
        """Return a JSON-serialisable dict for the manifest.

        The path is stringified (``None`` when absent) and ``elapsed_s``
        is rounded to two decimals.
        """
        path_text = None
        if self.markdown_path:
            path_text = str(self.markdown_path)

        payload: dict[str, Any] = {}
        payload["arm"] = self.arm
        payload["parser"] = self.parser
        payload["mode"] = self.mode
        payload["markdown_path"] = path_text
        payload["chars"] = self.chars
        payload["elapsed_s"] = round(self.elapsed_s, 2)
        payload["status"] = self.status
        payload["error"] = self.error
        return payload
@dataclass
class PdfManifestRow:
    """One manifest line: a source PDF plus its per-arm extraction results."""

    doc_id: str
    pdf_path: Path
    document_id: int | None
    pages: int
    # Keyed by arm name; each instance gets its own dict.
    extractions: dict[str, ExtractionResult] = field(default_factory=dict)

    def to_jsonl(self) -> dict[str, Any]:
        """Return a JSON-serialisable dict, serialising nested extractions too."""
        payload: dict[str, Any] = dict(
            doc_id=self.doc_id,
            pdf_path=str(self.pdf_path),
            document_id=self.document_id,
            pages=self.pages,
        )
        nested: dict[str, Any] = {}
        for arm_name, extraction in self.extractions.items():
            nested[arm_name] = extraction.to_jsonl()
        payload["extractions"] = nested
        return payload
# ---------------------------------------------------------------------------
# Single-PDF extraction
# ---------------------------------------------------------------------------
async def _run_one_extraction(
pdf_path: Path,
*,
parser: str,
mode: str,
out_path: Path,
estimated_pages: int,
) -> tuple[str, float]:
"""Invoke the requested parser, persist markdown, return (markdown, elapsed_s)."""
started = time.monotonic()
if parser == "azure":
markdown = await parse_with_azure_di(pdf_path, processing_mode=mode)
elif parser == "llamacloud":
markdown = await parse_with_llamacloud(
pdf_path, processing_mode=mode, estimated_pages=estimated_pages,
)
else:
raise ValueError(f"Unknown parser {parser!r}")
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(markdown, encoding="utf-8")
return markdown, time.monotonic() - started
async def _extract_one_pdf(
    pdf_path: Path,
    *,
    extractions_dir: Path,
    force_reextract: bool,
) -> dict[str, ExtractionResult]:
    """Run all four parser combos for ``pdf_path``, returning per-arm results.

    Re-uses any cached ``.md`` already on disk unless ``force_reextract``.
    Arms without a cache hit are extracted concurrently; a failing arm is
    recorded as ``status="failed"`` and does not affect the others.

    The returned dict is always keyed in ``PARSER_ARMS`` order, whichever
    arms were cached, so the manifest written from it diffs
    deterministically between runs.
    """
    # Fall back to 50 pages when pypdf can't read the file; the estimate
    # is only handed on to the extraction call.
    estimated_pages = count_pdf_pages(pdf_path) or 50

    by_arm: dict[str, ExtractionResult] = {}
    pending: list[tuple[str, str, str, Path]] = []
    for arm_name, parser, mode in PARSER_ARMS:
        out_path = extractions_dir / f"{pdf_path.stem}.{arm_name}.md"
        if out_path.exists() and not force_reextract:
            cached = out_path.read_text(encoding="utf-8")
            by_arm[arm_name] = ExtractionResult(
                arm=arm_name,
                parser=parser,
                mode=mode,
                markdown_path=out_path,
                chars=len(cached),
                elapsed_s=0.0,
                status="ok",
                error="(cached)",
            )
            logger.info(
                "Cached extraction reused: %s (%d chars)", out_path.name, len(cached),
            )
        else:
            pending.append((arm_name, parser, mode, out_path))

    # Only the arms that need work are scheduled; gather returns results
    # in the order the awaitables were passed, so they line up with
    # ``pending``.
    results: list[Any] = []
    if pending:
        results = await asyncio.gather(
            *(
                _run_one_extraction(
                    pdf_path,
                    parser=parser, mode=mode,
                    out_path=out_path,
                    estimated_pages=estimated_pages,
                )
                for _arm, parser, mode, out_path in pending
            ),
            return_exceptions=True,
        )

    for (arm_name, parser, mode, out_path), result in zip(pending, results, strict=True):
        if isinstance(result, Exception):
            err_msg = f"{type(result).__name__}: {result}"
            logger.warning(
                "Extraction FAILED for %s [%s/%s]: %s",
                pdf_path.name, parser, mode, err_msg,
            )
            by_arm[arm_name] = ExtractionResult(
                arm=arm_name, parser=parser, mode=mode,
                status="failed", error=err_msg,
            )
        else:
            markdown, elapsed = result
            by_arm[arm_name] = ExtractionResult(
                arm=arm_name, parser=parser, mode=mode,
                markdown_path=out_path,
                chars=len(markdown),
                elapsed_s=elapsed,
                status="ok",
            )

    # Re-key in PARSER_ARMS order so cached and fresh entries interleave
    # identically regardless of which ones hit the cache.
    return {arm_name: by_arm[arm_name] for arm_name, _parser, _mode in PARSER_ARMS}
async def _noop() -> tuple[str, float]:
"""Placeholder so cached entries align with parallel gather indexing."""
return ("", 0.0)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def _read_existing_mmlongbench_map(map_path: Path) -> list[dict[str, Any]]:
"""Read the mmlongbench doc map (skipping its ``__settings__`` header)."""
if not map_path.exists():
raise RuntimeError(
f"mmlongbench doc map not found at {map_path}. Run "
"`python -m surfsense_evals ingest multimodal_doc mmlongbench` first."
)
rows: list[dict[str, Any]] = []
with map_path.open("r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
row = json.loads(line)
if "__settings__" in row:
continue
rows.append(row)
return rows
async def run_ingest(
    ctx: RunContext,
    *,
    docs_filter: list[str] | None = None,
    max_docs: int | None = None,
    force_reextract: bool = False,
    pdf_concurrency: int = 2,
) -> None:
    """Pre-extract all four parser markdowns for each PDF in scope.

    Source PDFs and their SurfSense ``document_id`` values are read from
    the manifest registered under ``"mmlongbench"`` in the suite state;
    nothing is downloaded or uploaded here. Results are written to
    ``parser_compare_doc_map.jsonl`` in ``ctx.maps_dir()`` and that path
    is registered under ``"parser_compare"`` in the suite state.

    Parameters
    ----------
    docs_filter : list[str] | None
        Specific ``doc_id`` values to extract (default: every PDF in
        mmlongbench's existing manifest).
    max_docs : int | None
        Cap on the number of PDFs to process. ``None`` or a non-positive
        value means no cap.
    force_reextract : bool
        Re-call parsers even if a cached ``.md`` already exists. Off by
        default — extractions are deterministic and parser calls cost
        real money.
    pdf_concurrency : int
        How many PDFs to extract in parallel (values below 1 are treated
        as 1). Each PDF triggers up to four parser HTTP calls, so total
        in-flight is at most ``4 * pdf_concurrency``.

    Raises
    ------
    RuntimeError
        If the mmlongbench ingestion map is missing from the suite state
        or from disk, or if the filters leave no PDFs in scope.
    """
    source_map = ctx.suite_state.ingestion_maps.get("mmlongbench")
    if not source_map:
        raise RuntimeError(
            "Suite state has no mmlongbench ingestion map. Run "
            "`python -m surfsense_evals ingest multimodal_doc mmlongbench` first "
            "so parser_compare can re-use those PDFs."
        )

    # Narrow the source rows: optional doc_id allow-list, then optional cap.
    selected = _read_existing_mmlongbench_map(Path(source_map))
    if docs_filter:
        allowed = set(docs_filter)
        selected = [entry for entry in selected if entry["doc_id"] in allowed]
    if max_docs is not None and max_docs > 0:
        selected = selected[:max_docs]
    if not selected:
        raise RuntimeError(
            "No PDFs in scope for parser_compare. Check --docs / --max-docs."
        )

    extractions_dir = ctx.benchmark_data_dir() / "extractions"
    extractions_dir.mkdir(parents=True, exist_ok=True)

    # Bounds how many PDFs are being extracted at any one time.
    gate = asyncio.Semaphore(max(1, pdf_concurrency))

    async def _process(row: dict[str, Any]) -> PdfManifestRow:
        source_pdf = Path(row["pdf_path"])
        async with gate:
            per_arm = await _extract_one_pdf(
                source_pdf,
                extractions_dir=extractions_dir,
                force_reextract=force_reextract,
            )
        return PdfManifestRow(
            doc_id=str(row["doc_id"]),
            pdf_path=source_pdf,
            document_id=row.get("document_id"),
            pages=count_pdf_pages(source_pdf),
            extractions=per_arm,
        )

    logger.info(
        "parser_compare: extracting %d PDFs x 4 parsers (concurrency=%d)",
        len(selected), pdf_concurrency,
    )
    manifest_rows = await asyncio.gather(*(_process(entry) for entry in selected))

    # Persist the manifest, one JSON object per line.
    map_path = ctx.maps_dir() / "parser_compare_doc_map.jsonl"
    with map_path.open("w", encoding="utf-8") as fh:
        fh.writelines(
            json.dumps(manifest_row.to_jsonl()) + "\n"
            for manifest_row in manifest_rows
        )
    logger.info("parser_compare manifest -> %s", map_path)

    # Register the manifest in the suite state so the runner can find it
    # via ctx.suite_state.ingestion_maps.
    state = ctx.suite_state
    state.ingestion_maps["parser_compare"] = str(map_path)
    set_suite_state(ctx.config, ctx.suite, state)

    # Summary: total extraction entries and how many did not end "ok".
    statuses = [
        extraction.status
        for manifest_row in manifest_rows
        for extraction in manifest_row.extractions.values()
    ]
    failures = sum(1 for status in statuses if status != "ok")
    logger.info(
        "parser_compare ingest done: %d PDFs, %d extractions, %d failures",
        len(manifest_rows), len(statuses), failures,
    )
__all__ = [
"ExtractionResult",
"PARSER_ARMS",
"PdfManifestRow",
"run_ingest",
]

View file

@ -0,0 +1,120 @@
"""Prompt templates for the three input modalities in parser_compare.
We deliberately reuse the *same* core question framing as
``mmlongbench/prompt.py`` so byte-identical questions reach all six
arms; only the document delivery channel changes.
Three templates:
* ``build_native_pdf_prompt`` bare question + format hint.
The PDF rides as a separate file
part (``NativePdfArm`` handles it).
* ``build_long_context_prompt`` question + format hint + the
parser-extracted markdown wrapped
in fenced ``<document>`` tags so
the model can clearly delimit
"context" from "instruction".
* ``build_surfsense_prompt`` bare question + format hint
(chunks come from RAG retrieval,
not from the prompt).
The ``<document>`` tag is doc-aware: even though parser_compare runs
one PDF per question today, we keep the wrapper plural so this is
trivial to extend to multi-doc later.
"""
from __future__ import annotations
# ---------------------------------------------------------------------------
# Per-format hint blocks (same lookup as mmlongbench/prompt.py)
# ---------------------------------------------------------------------------
_FORMAT_HINTS: dict[str, str] = {
"str": (
"Respond with the answer as a short phrase, no full sentence. "
"Format your final line as `Answer: <text>`."
),
"int": (
"Respond with a single integer only. "
"Format your final line as `Answer: <integer>`."
),
"float": (
"Respond with a single decimal number only (no units). "
"Format your final line as `Answer: <number>`."
),
"list": (
"Respond with a comma-separated list of items, no extra text. "
"Format your final line as `Answer: item1, item2, item3`."
),
"none": (
"If the answer cannot be determined from the document, say so explicitly. "
"Format your final line as `Answer: Not answerable`."
),
}
def _format_hint(answer_format: str) -> str:
    """Look up the hint block for ``answer_format``.

    The format name is matched case-insensitively after trimming
    whitespace; empty or unrecognised formats use the ``"str"`` hint.
    """
    key = (answer_format or "str").strip().lower()
    if key in _FORMAT_HINTS:
        return _FORMAT_HINTS[key]
    return _FORMAT_HINTS["str"]
_BASE_INSTRUCTION = (
"You are a document-understanding assistant. Use ONLY the provided "
"document to answer the question. The document may contain text, "
"tables, charts, figures, and images. If the answer is in a chart "
"or image, read it carefully. Do not use external knowledge."
)
def build_native_pdf_prompt(question: str, *, answer_format: str) -> str:
    """Build the ``NativePdfArm`` prompt: instruction, question, format hint.

    The document is not part of this text; the PDF is attached
    separately as a file part.
    """
    sections = (
        _BASE_INSTRUCTION,
        f"Question: {question.strip()}",
        _format_hint(answer_format),
    )
    # Sections are separated by a blank line; the prompt ends with one newline.
    return "\n\n".join(sections) + "\n"
def build_surfsense_prompt(question: str, *, answer_format: str) -> str:
    """Build the ``SurfSenseArm`` prompt: instruction, question, format hint.

    No document text is inlined; the prompt carries only the
    user-visible question and the answer-format hint, on the basis that
    SurfSense's agent supplies retrieved chunks itself.
    """
    cleaned_question = question.strip()
    hint = _format_hint(answer_format)
    return "".join(
        (
            _BASE_INSTRUCTION,
            "\n\n",
            "Question: ",
            cleaned_question,
            "\n\n",
            hint,
            "\n",
        )
    )
def build_long_context_prompt(
    question: str,
    *,
    answer_format: str,
    document_markdown: str,
    document_label: str,
) -> str:
    """Build the prompt for the four long-context arms.

    The stripped ``document_markdown`` is inlined between a
    ``<document name="...">`` opening tag and its closing tag, placed
    after the base instruction and before the question and format hint.
    ``document_label`` (e.g. the PDF filename) becomes the tag's ``name``
    attribute and is inserted verbatim, without escaping.
    """
    document_block = "\n".join(
        (
            f'<document name="{document_label}">',
            document_markdown.strip(),
            "</document>",
        )
    )
    sections = (
        _BASE_INSTRUCTION,
        document_block,
        f"Question: {question.strip()}",
        _format_hint(answer_format),
    )
    return "\n\n".join(sections) + "\n"
__all__ = [
"build_long_context_prompt",
"build_native_pdf_prompt",
"build_surfsense_prompt",
]

View file

@ -0,0 +1,797 @@
"""parser_compare runner — six-arm head-to-head on n shared questions.
For each (PDF, question) pair we issue six LLM calls (all sonnet 4.5):
* ``native_pdf`` PDF attached natively.
* ``azure_basic_lc`` Azure prebuilt-read markdown stuffed.
* ``azure_premium_lc`` Azure prebuilt-layout markdown stuffed.
* ``llamacloud_basic_lc`` LlamaParse parse_page_with_llm markdown stuffed.
* ``llamacloud_premium_lc`` LlamaParse parse_page_with_agent markdown stuffed.
* ``surfsense_agentic`` SurfSense /api/v1/new_chat retrieval over chunks.
Cost reporting:
* ``llm_cost_per_q`` mean OpenRouter ``usage.cost`` reported by
the chat-completions API. Zero for the
SurfSense agentic arm because the SSE
stream doesn't surface per-call cost yet
(a known gap; we annotate it in the
report rather than estimating).
* ``preprocess_cost_total`` pages * $/1k according to the user's
tariff:
* basic : $1 / 1k pages
* premium : $10 / 1k pages
* native_pdf : $0 (no preprocessing)
* surfsense_agentic : $10 / 1k pages
(existing mmlongbench ingest used
processing_mode=premium with Azure DI).
* ``preprocess_cost_per_q`` preprocess_cost_total / n_questions.
* ``total_cost_per_q`` llm_cost_per_q + preprocess_cost_per_q.
The grader is reused from ``mmlongbench/grader.py`` (deterministic,
format-aware) so the metric is directly comparable to the existing
mmlongbench runs.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import os
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from ....core.arms import (
ArmRequest,
ArmResult,
BareLlmArm,
NativePdfArm,
SurfSenseArm,
)
from ....core.config import utc_iso_timestamp
from ....core.metrics.mc_accuracy import accuracy_with_wilson_ci
from ....core.parse.freeform_answer import extract_freeform_answer
from ....core.providers.openrouter_chat import OpenRouterChatProvider
from ....core.providers.openrouter_pdf import OpenRouterPdfProvider, PdfEngine
from ....core.registry import ReportSection, RunArtifact, RunContext
from ..mmlongbench.grader import GradeResult, grade
from .ingest import PARSER_ARMS
from .prompt import (
build_long_context_prompt,
build_native_pdf_prompt,
build_surfsense_prompt,
)
logger = logging.getLogger(__name__)
# Cost tariff (per the user's spec: $1 / 1k pages basic, $10 / 1k pages premium).
# Held as dollars-per-page so per-PDF math is a pure multiply.
PREPROCESS_USD_PER_PAGE = {
    "basic": 1.0 / 1000.0,
    "premium": 10.0 / 1000.0,
}
# Every arm this benchmark knows about. ``run`` filters this tuple with
# ``--skip-arms``, so its order is also the order arms are scheduled,
# graded and written out.
ARM_NAMES = (
    "native_pdf",
    "azure_basic_lc",
    "azure_premium_lc",
    "llamacloud_basic_lc",
    "llamacloud_premium_lc",
    "surfsense_agentic",
)
# What ingest mode each LC arm corresponds to (drives preprocess cost).
# Values are keys of PREPROCESS_USD_PER_PAGE.
_LC_ARM_MODE: dict[str, str] = {
    "azure_basic_lc": "basic",
    "azure_premium_lc": "premium",
    "llamacloud_basic_lc": "basic",
    "llamacloud_premium_lc": "premium",
}
# The SurfSense agentic arm is fed by the existing mmlongbench
# ingestion. That ingestion was performed with vision_llm=on and
# processing_mode=premium, and the backend's ETL routes premium-mode
# PDFs through Azure DI prebuilt-layout when AZURE_DI_KEY is set. So
# the preprocessing cost is the premium tariff.
SURFSENSE_INGEST_MODE = "premium"
# ---------------------------------------------------------------------------
# Question + PDF row shapes
# ---------------------------------------------------------------------------
@dataclass
class PCQuestion:
    """One benchmark question joined with the doc-map row of its PDF."""

    # "<doc_id>::Q<idx>", where idx is the zero-padded position of the
    # question among that PDF's rows in questions.jsonl.
    qid: str
    # Identifier of the source PDF; key into the doc map.
    doc_id: str
    # Question text, stripped of surrounding whitespace.
    question: str
    # Reference answer handed to the grader.
    gold_answer: str
    # Lower-cased answer format (e.g. "none" marks unanswerable probes).
    answer_format: str
    # Location of the PDF, attached by the native_pdf arm.
    pdf_path: Path
    # Doc-map ``document_id``; sent as a mention by the SurfSense arm when set.
    document_id: int | None
    # Page count from the doc map (0 when absent); drives preprocessing cost.
    pages: int
    extractions: dict[str, Path]  # arm_name -> markdown path (only successes)
def _read_doc_map(map_path: Path) -> dict[str, dict[str, Any]]:
out: dict[str, dict[str, Any]] = {}
with map_path.open("r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
row = json.loads(line)
out[str(row["doc_id"])] = row
return out
def _select_questions(
    questions_jsonl: Path,
    doc_map: dict[str, dict[str, Any]],
    *,
    docs_filter: list[str] | None,
    sample_per_doc: int,
    skip_unanswerable: bool,
    skip_format: list[str] | None,
) -> list[PCQuestion]:
    """Pick the first ``sample_per_doc`` eligible questions per PDF in scope.

    With ``sample_per_doc=1`` this yields one question per PDF (e.g. 5 PDFs
    -> 5 questions). A row is in scope when it has a ``doc_id`` that is
    present in ``doc_map`` and, if ``docs_filter`` is non-empty, listed
    there. In-scope rows are then dropped when they are unanswerable
    (``answer_format == "none"`` and ``skip_unanswerable``), when their
    format is in ``skip_format`` (case-insensitive), or when the per-PDF
    quota is already full.

    The ``Q<idx>`` part of each ``qid`` is the row's position among *all*
    in-scope rows of that PDF, counted before the skip filters, so a qid
    stays stable when the filters change.

    Returns the selected questions sorted by ``(doc_id, qid)``.
    """
    # Build lookup sets once; membership is tested for every JSONL row.
    docs_in_scope = set(docs_filter) if docs_filter else None
    skip_format_set = {f.lower() for f in (skip_format or [])}

    out: list[PCQuestion] = []
    per_doc_taken: dict[str, int] = {}
    per_doc_idx: dict[str, int] = {}
    with questions_jsonl.open("r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            row = json.loads(line)
            doc_id = str(row.get("doc_id") or "")
            if not doc_id:
                continue
            if docs_in_scope is not None and doc_id not in docs_in_scope:
                continue
            map_row = doc_map.get(doc_id)
            if map_row is None:
                continue
            answer_format = str(row.get("answer_format") or "").strip().lower()
            # Advance the per-PDF index before filtering so qids do not
            # shift when skip_unanswerable / skip_format change.
            idx = per_doc_idx.get(doc_id, 0)
            per_doc_idx[doc_id] = idx + 1
            if skip_unanswerable and answer_format == "none":
                continue
            if answer_format in skip_format_set:
                continue
            if per_doc_taken.get(doc_id, 0) >= sample_per_doc:
                continue
            # Keep only extractions that succeeded and recorded a path.
            extractions: dict[str, Path] = {
                arm_name: Path(ext_blob["markdown_path"])
                for arm_name, ext_blob in (map_row.get("extractions") or {}).items()
                if ext_blob.get("status") == "ok" and ext_blob.get("markdown_path")
            }
            out.append(PCQuestion(
                qid=f"{doc_id}::Q{idx:03d}",
                doc_id=doc_id,
                question=str(row.get("question") or "").strip(),
                gold_answer=str(row.get("answer") or "").strip(),
                answer_format=answer_format,
                pdf_path=Path(map_row["pdf_path"]),
                document_id=map_row.get("document_id"),
                # `or 0` also covers an explicit null page count in the map.
                pages=int(map_row.get("pages") or 0),
                extractions=extractions,
            ))
            per_doc_taken[doc_id] = per_doc_taken.get(doc_id, 0) + 1
    out.sort(key=lambda q: (q.doc_id, q.qid))
    return out
# ---------------------------------------------------------------------------
# Bounded concurrency helper
# ---------------------------------------------------------------------------
async def _gather_with_limit(coros: Iterable, *, concurrency: int) -> list[Any]:
sem = asyncio.Semaphore(max(1, concurrency))
async def _wrap(coro):
async with sem:
return await coro
return await asyncio.gather(*(_wrap(c) for c in coros))
# ---------------------------------------------------------------------------
# Benchmark
# ---------------------------------------------------------------------------
# Human-readable summary of the benchmark; exposed as
# ``ParserCompareBenchmark.description``.
_DESCRIPTION = (
    "parser_compare — 6-arm head-to-head on shared MMLongBench-Doc PDFs: "
    "native PDF + (Azure DI / LlamaCloud) x (basic / premium) long-context "
    "stuffing + SurfSense agentic retrieval. Reports preprocessing dollars "
    "($1 / 1k pages basic, $10 / 1k pages premium) on top of LLM cost."
)
class ParserCompareBenchmark:
    """6-arm parser + agentic-vs-non-agentic head-to-head.

    Lifecycle: ``add_run_args`` registers the CLI flags, ``ingest``
    delegates to ``.ingest.run_ingest``, ``run`` answers and grades every
    selected question on every active arm, and ``report_section`` renders
    the latest run as markdown.
    """

    suite: str = "multimodal_doc"
    name: str = "parser_compare"
    headline: bool = True
    description: str = _DESCRIPTION

    @staticmethod
    def _split_csv(raw: str) -> list[str]:
        """Split a comma-separated CLI value into trimmed, non-empty items."""
        return [item.strip() for item in raw.split(",") if item.strip()]

    # ------------------------------------------------------------------
    # CLI flags
    # ------------------------------------------------------------------
    def add_run_args(self, parser: argparse.ArgumentParser) -> None:
        """Register the run flags and the ingest-only flags on ``parser``."""
        parser.add_argument(
            "--docs", default=None,
            help="Comma-separated doc_ids to include (default: all in manifest).",
        )
        parser.add_argument(
            "--sample-per-doc", type=int, default=1,
            help="Take the first N answerable questions per PDF (default 1).",
        )
        parser.add_argument(
            "--skip-unanswerable", dest="skip_unanswerable",
            action="store_true", default=True,
            help="Drop 'None' format probes (default true; we want signal not "
            "hallucination probes for n=5).",
        )
        parser.add_argument(
            "--include-unanswerable", dest="skip_unanswerable",
            action="store_false",
            help="Override --skip-unanswerable; include unanswerable probes too.",
        )
        parser.add_argument(
            "--skip-format", default=None,
            help="Comma-separated answer_format values to skip (e.g. 'none,float').",
        )
        parser.add_argument(
            "--concurrency", type=int, default=2,
            help="Parallel question workers per arm (default 2).",
        )
        parser.add_argument(
            "--no-mentions", dest="no_mentions", action="store_true",
            help="SurfSense arm: skip mentioned_document_ids (full-corpus retrieval).",
        )
        parser.add_argument(
            "--pdf-engine", default="native",
            choices=[e.value for e in PdfEngine],
            help="OpenRouter file-parser engine for native_pdf arm.",
        )
        parser.add_argument(
            "--max-output-tokens", type=int, default=512,
            help="Cap on completion length for every arm.",
        )
        parser.add_argument(
            "--llm-model", default="anthropic/claude-sonnet-4.5",
            help="OpenRouter slug used by the 5 OpenRouter-driven arms. "
            "SurfSense arm uses whatever provider_model is pinned on the suite.",
        )
        parser.add_argument(
            "--skip-arms", default=None,
            help="Comma-separated arm names to skip (e.g. 'llamacloud_premium_lc').",
        )
        # Ingest-only flags (forwarded by the CLI to ingest.run_ingest).
        parser.add_argument(
            "--max-docs", type=int, default=None,
            help="(ingest only) cap number of unique PDFs to process.",
        )
        parser.add_argument(
            "--force-reextract", action="store_true",
            help="(ingest only) re-call parsers even if cached .md exists.",
        )
        parser.add_argument(
            "--pdf-concurrency", type=int, default=2,
            help="(ingest only) parallel PDFs (each fans out to 4 parsers).",
        )

    # ------------------------------------------------------------------
    # Lifecycle: ingest delegates to .ingest.run_ingest
    # ------------------------------------------------------------------
    async def ingest(self, ctx: RunContext, **opts: Any) -> None:
        """Forward the ingest-relevant options to ``run_ingest``."""
        from .ingest import run_ingest

        docs_raw: str | None = opts.get("docs")
        docs_filter = self._split_csv(docs_raw) if docs_raw else None
        await run_ingest(
            ctx,
            docs_filter=docs_filter,
            max_docs=opts.get("max_docs"),
            force_reextract=bool(opts.get("force_reextract", False)),
            pdf_concurrency=int(opts.get("pdf_concurrency") or 2),
        )

    # ------------------------------------------------------------------
    # Run
    # ------------------------------------------------------------------
    async def run(self, ctx: RunContext, **opts: Any) -> RunArtifact:
        """Answer, grade and persist every selected question on every active arm.

        Writes ``raw.jsonl`` (one line per question x arm) and
        ``run_artifact.json`` into the run directory and returns the
        matching ``RunArtifact``.

        Raises:
            RuntimeError: if every arm is skipped, the mmlongbench
                questions file or the parser_compare doc map is missing,
                no question matches the filters, or ``OPENROUTER_API_KEY``
                is not set.
        """
        docs_raw: str | None = opts.get("docs")
        docs_filter = self._split_csv(docs_raw) if docs_raw else None
        sample_per_doc = int(opts.get("sample_per_doc") or 1)
        skip_unanswerable = bool(opts.get("skip_unanswerable", True))
        skip_format_raw: str | None = opts.get("skip_format")
        skip_format = self._split_csv(skip_format_raw) if skip_format_raw else None
        concurrency = int(opts.get("concurrency") or 2)
        no_mentions = bool(opts.get("no_mentions"))
        pdf_engine_name = opts.get("pdf_engine") or "native"
        max_output_tokens = int(opts.get("max_output_tokens") or 512)
        llm_model = str(opts.get("llm_model") or "anthropic/claude-sonnet-4.5")
        skip_arms_raw: str | None = opts.get("skip_arms")
        skip_arms = set(self._split_csv(skip_arms_raw)) if skip_arms_raw else set()
        active_arms = [a for a in ARM_NAMES if a not in skip_arms]
        if not active_arms:
            raise RuntimeError("All arms skipped; nothing to run.")

        bench_dir = ctx.benchmark_data_dir()
        # parser_compare reuses mmlongbench's questions.jsonl (already
        # downloaded by `ingest multimodal_doc mmlongbench`).
        questions_jsonl = bench_dir.parent / "mmlongbench" / "questions.jsonl"
        map_path = ctx.maps_dir() / "parser_compare_doc_map.jsonl"
        if not questions_jsonl.exists():
            raise RuntimeError(
                "Missing mmlongbench questions at "
                f"{questions_jsonl}. Run "
                "`python -m surfsense_evals ingest multimodal_doc mmlongbench` first."
            )
        if not map_path.exists():
            raise RuntimeError(
                "parser_compare doc map missing. Run "
                "`python -m surfsense_evals ingest multimodal_doc parser_compare` first."
            )

        doc_map = _read_doc_map(map_path)
        questions = _select_questions(
            questions_jsonl, doc_map,
            docs_filter=docs_filter,
            sample_per_doc=sample_per_doc,
            skip_unanswerable=skip_unanswerable,
            skip_format=skip_format,
        )
        if not questions:
            raise RuntimeError(
                "No questions matched filters; broaden --docs / --skip-format."
            )
        logger.info(
            "parser_compare: scheduled %d questions across %d arms (%s)",
            len(questions), len(active_arms), ",".join(active_arms),
        )

        api_key = os.environ.get("OPENROUTER_API_KEY")
        if not api_key:
            raise RuntimeError("OPENROUTER_API_KEY env var is required.")

        # Build arms
        arms: dict[str, Any] = {}
        if "native_pdf" in active_arms:
            native_provider = OpenRouterPdfProvider(
                api_key=api_key, base_url=ctx.config.openrouter_base_url,
                model=llm_model, engine=PdfEngine(pdf_engine_name),
            )
            arms["native_pdf"] = NativePdfArm(
                provider=native_provider, max_output_tokens=max_output_tokens,
            )
        for arm_name, _, _ in PARSER_ARMS:
            if arm_name in active_arms:
                lc_provider = OpenRouterChatProvider(
                    api_key=api_key, base_url=ctx.config.openrouter_base_url,
                    model=llm_model,
                )
                arms[arm_name] = BareLlmArm(
                    provider=lc_provider,
                    max_output_tokens=max_output_tokens,
                    name=arm_name,
                )
        if "surfsense_agentic" in active_arms:
            surf = SurfSenseArm(
                client=ctx.new_chat_client(),
                search_space_id=ctx.search_space_id,
                ephemeral_threads=True,
            )
            # Override the default "surfsense" name so the metrics
            # bucket lines up with the rest of parser_compare's arms.
            surf.name = "surfsense_agentic"
            arms["surfsense_agentic"] = surf

        run_timestamp = utc_iso_timestamp()
        run_dir = ctx.runs_dir(run_timestamp=run_timestamp)
        raw_path = run_dir / "raw.jsonl"

        # ---- per-arm answer coroutine helpers ----
        def _native_req(q: PCQuestion) -> ArmRequest:
            return ArmRequest(
                question_id=q.qid,
                prompt=build_native_pdf_prompt(q.question, answer_format=q.answer_format),
                pdf_paths=[q.pdf_path],
                options={"max_tokens": max_output_tokens},
            )

        def _lc_req(q: PCQuestion, arm_name: str) -> ArmRequest:
            md_path = q.extractions.get(arm_name)
            if md_path is None or not md_path.exists():
                raise FileNotFoundError(
                    f"Missing extraction for {arm_name} on {q.doc_id}"
                )
            markdown = md_path.read_text(encoding="utf-8")
            return ArmRequest(
                question_id=q.qid,
                prompt=build_long_context_prompt(
                    q.question,
                    answer_format=q.answer_format,
                    document_markdown=markdown,
                    document_label=q.doc_id,
                ),
            )

        def _surf_req(q: PCQuestion) -> ArmRequest:
            mentions: list[int] | None = None
            if not no_mentions and q.document_id is not None:
                mentions = [int(q.document_id)]
            return ArmRequest(
                question_id=q.qid,
                prompt=build_surfsense_prompt(q.question, answer_format=q.answer_format),
                mentioned_document_ids=mentions,
            )

        async def _answer_one(arm_name: str, q: PCQuestion) -> ArmResult:
            arm = arms[arm_name]
            try:
                if arm_name == "native_pdf":
                    return await arm.answer(_native_req(q))
                if arm_name == "surfsense_agentic":
                    return await arm.answer(_surf_req(q))
                return await arm.answer(_lc_req(q, arm_name))
            except FileNotFoundError as exc:
                # A missing extraction becomes an empty, errored result so
                # the question is still graded (as incorrect) for this arm.
                return ArmResult(
                    arm=arm_name,
                    question_id=q.qid,
                    raw_text="",
                    error=f"FileNotFoundError: {exc}",
                )

        # Run all arms in parallel (each arm bounded by `concurrency`).
        per_arm_tasks: dict[str, list] = {
            arm_name: [_answer_one(arm_name, q) for q in questions]
            for arm_name in active_arms
        }
        per_arm_results: dict[str, list[ArmResult]] = {}
        gathered = await asyncio.gather(*[
            _gather_with_limit(per_arm_tasks[arm_name], concurrency=concurrency)
            for arm_name in active_arms
        ])
        for arm_name, results in zip(active_arms, gathered, strict=True):
            per_arm_results[arm_name] = results

        # Grade
        per_arm_grades: dict[str, list[GradeResult]] = {}
        for arm_name in active_arms:
            per_arm_grades[arm_name] = [
                grade(
                    pred=extract_freeform_answer(r.raw_text or ""),
                    gold=q.gold_answer,
                    answer_format=q.answer_format,
                )
                for q, r in zip(questions, per_arm_results[arm_name], strict=True)
            ]

        # Persist raw.jsonl
        with raw_path.open("w", encoding="utf-8") as fh:
            for i, q in enumerate(questions):
                base = {
                    "qid": q.qid,
                    "doc_id": q.doc_id,
                    "answer_format": q.answer_format,
                    "gold": q.gold_answer,
                    "pages": q.pages,
                    "document_id": q.document_id,
                }
                for arm_name in active_arms:
                    res = per_arm_results[arm_name][i]
                    g = per_arm_grades[arm_name][i]
                    fh.write(json.dumps({
                        **base,
                        **res.to_jsonl(),
                        "graded": {
                            "correct": g.correct,
                            "f1": g.f1,
                            "method": g.method,
                            "normalised_pred": g.normalised_pred,
                            "normalised_gold": g.normalised_gold,
                        },
                    }) + "\n")

        # Aggregate per-arm metrics + cost
        metrics = _compute_metrics(
            questions, per_arm_results, per_arm_grades, active_arms,
        )
        artifact = RunArtifact(
            suite=self.suite,
            benchmark=self.name,
            run_timestamp=run_timestamp,
            raw_path=raw_path,
            metrics=metrics,
            extra={
                "n_questions": len(questions),
                "n_pdfs": len({q.doc_id for q in questions}),
                "active_arms": list(active_arms),
                "concurrency": concurrency,
                "no_mentions": no_mentions,
                "pdf_engine": pdf_engine_name,
                "llm_model": llm_model,
                "scenario": ctx.scenario,
                "provider_model": ctx.provider_model,
                "vision_provider_model": ctx.vision_provider_model,
                "agent_llm_id": ctx.agent_llm_id,
                "preprocess_tariff": {
                    "basic_per_1k_pages": 1.0,
                    "premium_per_1k_pages": 10.0,
                },
            },
        )
        manifest_path = run_dir / "run_artifact.json"
        manifest_path.write_text(
            json.dumps({
                "suite": self.suite,
                "benchmark": self.name,
                "raw_path": "raw.jsonl",
                "metrics": metrics,
                "extra": artifact.extra,
            }, indent=2, sort_keys=True) + "\n",
            encoding="utf-8",
        )
        return artifact

    # ------------------------------------------------------------------
    # Report
    # ------------------------------------------------------------------
    def report_section(self, artifacts: list[RunArtifact]) -> ReportSection:
        """Render the most recent run (by ``run_timestamp``) as a report section.

        Returns a placeholder section when ``artifacts`` is empty.
        """
        if not artifacts:
            return ReportSection(
                title="Parser × agent-vs-stuffing comparison",
                headline=True,
                body_md="(no run artifacts found)",
                body_json={},
            )
        latest = max(artifacts, key=lambda a: a.run_timestamp)
        m = latest.metrics
        extra = latest.extra
        per_arm = m.get("per_arm", {})
        active_arms = list(extra.get("active_arms", per_arm.keys()))
        n_q = extra.get("n_questions", "?")
        n_pdfs = extra.get("n_pdfs", "?")

        body: list[str] = []
        body.append(
            f"- Sample size: **{n_q} questions across {n_pdfs} PDFs** "
            f"(LLM: `{extra.get('llm_model', '?')}`, "
            f"engine: `{extra.get('pdf_engine', 'native')}`)."
        )
        body.append(
            "- Preprocess tariff: basic = $1 / 1k pages, "
            "premium = $10 / 1k pages."
        )
        body.append("")
        body.append("### Per-arm summary")
        body.append("")
        body.append(
            "| Arm | Accuracy | F1 mean | LLM $/Q | Preprocess $ total | Total $/Q | Latency p50 |"
        )
        body.append("|---|---:|---:|---:|---:|---:|---:|")
        for arm_name in active_arms:
            row = per_arm.get(arm_name)
            if not row:
                body.append(f"| `{arm_name}` | (no data) | | | | | |")
                continue
            body.append(
                f"| `{arm_name}` "
                f"| {row['accuracy']*100:.1f}% "
                f"({row['n_correct']}/{row['n']}) "
                f"| {row['f1_mean']*100:.1f}% "
                f"| ${row['llm_cost_per_q']:.4f} "
                f"| ${row['preprocess_cost_total']:.4f} "
                f"| ${row['total_cost_per_q']:.4f} "
                f"| {row['latency_ms_median']/1000:.1f}s |"
            )
        body.append("")

        # Notes / caveats
        body.append("### Notes")
        body.append("")
        body.append(
            "- `surfsense_agentic` LLM cost shows as $0.0000 because the "
            "`/api/v1/new_chat` SSE stream does not surface per-call token "
            "or cost yet (a known instrumentation gap). Preprocessing cost "
            "is the premium tariff because the underlying mmlongbench "
            "ingestion was performed with `processing_mode=premium` + "
            "`vision_llm=on` + Azure DI."
        )
        body.append(
            "- Long-context arms include the **same PDF text** for every "
            "question against that PDF, so the OpenRouter input cost is "
            "dominated by markdown size; preprocessing cost is paid once "
            "across all questions sharing a PDF."
        )
        body.append(
            "- Preprocessing $ total is computed as "
            "`pages_processed_per_arm × tariff`, summed across the unique "
            "PDFs in scope. With one question per PDF (n=5), preprocess $ "
            "= preprocess $ / Q."
        )
        if extra.get("scenario"):
            body.append(
                f"- Scenario: `{extra.get('scenario')}` "
                f"(suite-pinned `provider_model`: "
                f"`{extra.get('provider_model', '?')}`)."
            )

        # Per-PDF breakdown if useful
        per_pdf = m.get("per_pdf", {})
        if per_pdf:
            body.append("")
            body.append("### Per-PDF correctness")
            body.append("")
            header = "| Doc | Pages | " + " | ".join(f"`{a}`" for a in active_arms) + " |"
            sep = "|---|---:|" + "|".join(":---:" for _ in active_arms) + "|"
            body.append(header)
            body.append(sep)
            for doc_id, info in sorted(per_pdf.items()):
                row_cells = []
                for arm_name in active_arms:
                    g = info.get("arms", {}).get(arm_name, {})
                    if not g:
                        row_cells.append("?")
                    else:
                        # Distinct markers: an empty cell for both outcomes
                        # would make correct and incorrect indistinguishable.
                        row_cells.append("✓" if g.get("correct") else "✗")
                body.append(
                    f"| `{doc_id}` | {info.get('pages', '?')} | "
                    + " | ".join(row_cells) + " |"
                )
        return ReportSection(
            title="Parser × agent-vs-stuffing — long PDFs (sonnet 4.5)",
            headline=True,
            body_md="\n".join(body),
            body_json=m,
        )
# ---------------------------------------------------------------------------
# Metrics
# ---------------------------------------------------------------------------
def _compute_metrics(
    questions: list[PCQuestion],
    per_arm_results: dict[str, list[ArmResult]],
    per_arm_grades: dict[str, list[GradeResult]],
    active_arms: Iterable[str],
) -> dict[str, Any]:
    """Aggregate per-arm metrics + the user's preprocessing cost overlay.

    ``per_arm_results`` and ``per_arm_grades`` must hold, for every active
    arm, one entry per question in the same order as ``questions``.

    Returns a dict with ``per_arm`` (accuracy, F1, cost, latency and token
    statistics per arm), ``per_pdf`` (page count plus one grade per arm),
    ``n_questions``, ``n_unique_pdfs`` and ``total_pages_in_scope``.
    """
    # `active_arms` is iterated once per arm and again for every question;
    # materialise it so a one-shot iterator is not silently exhausted.
    arm_names = list(active_arms)

    # Sum unique PDF pages — preprocessing pays per unique PDF, not per question.
    pdf_pages: dict[str, int] = {}
    for q in questions:
        pdf_pages.setdefault(q.doc_id, q.pages)

    per_arm: dict[str, dict[str, Any]] = {}
    for arm_name in arm_names:
        results = per_arm_results[arm_name]
        grades = per_arm_grades[arm_name]
        n = len(grades)
        n_correct = sum(1 for g in grades if g.correct)
        f1_sum = sum(g.f1 for g in grades)
        acc_with_ci = accuracy_with_wilson_ci(n_correct, n)

        # LLM cost: sum of per-call cost_micros across questions, then average.
        cost_micros_total = sum(int(r.cost_micros or 0) for r in results)
        llm_cost_per_q = (cost_micros_total / 1_000_000.0) / n if n else 0.0

        # Preprocessing cost depends on which mode this arm corresponds to.
        if arm_name == "native_pdf":
            preprocess_per_page = 0.0
            preprocess_label = "n/a (PDF attached natively)"
        elif arm_name in _LC_ARM_MODE:
            mode = _LC_ARM_MODE[arm_name]
            preprocess_per_page = PREPROCESS_USD_PER_PAGE[mode]
            preprocess_label = f"{mode} tier ($/{mode}/page = ${preprocess_per_page:.4f})"
        elif arm_name == "surfsense_agentic":
            preprocess_per_page = PREPROCESS_USD_PER_PAGE[SURFSENSE_INGEST_MODE]
            preprocess_label = (
                f"{SURFSENSE_INGEST_MODE} tier (ingested by SurfSense at "
                f"processing_mode=premium + vision_llm=on)"
            )
        else:
            preprocess_per_page = 0.0
            preprocess_label = "unknown"
        preprocess_cost_total = sum(
            pages * preprocess_per_page for pages in pdf_pages.values()
        )
        preprocess_cost_per_q = preprocess_cost_total / n if n else 0.0
        total_cost_per_q = llm_cost_per_q + preprocess_cost_per_q

        latencies = sorted(int(r.latency_ms or 0) for r in results)
        # Element at index len // 2, i.e. the upper median for even counts.
        latency_median = latencies[len(latencies) // 2] if latencies else 0
        # Below 20 samples the "p95" is simply the slowest call.
        latency_p95 = latencies[int(len(latencies) * 0.95)] if len(latencies) >= 20 else (
            latencies[-1] if latencies else 0
        )
        in_tokens = [int(r.input_tokens or 0) for r in results]
        out_tokens = [int(r.output_tokens or 0) for r in results]

        per_arm[arm_name] = {
            **acc_with_ci.to_dict(),
            "n": n,
            "n_correct": n_correct,
            "f1_mean": f1_sum / n if n else 0.0,
            "llm_cost_per_q": llm_cost_per_q,
            "preprocess_per_page_usd": preprocess_per_page,
            "preprocess_cost_total": preprocess_cost_total,
            "preprocess_cost_per_q": preprocess_cost_per_q,
            "total_cost_per_q": total_cost_per_q,
            "preprocess_label": preprocess_label,
            "latency_ms_median": latency_median,
            "latency_ms_p95": latency_p95,
            "input_tokens_mean": (sum(in_tokens) / len(in_tokens)) if in_tokens else 0.0,
            "output_tokens_mean": (sum(out_tokens) / len(out_tokens)) if out_tokens else 0.0,
        }

    # Per-PDF breakdown (correct / not for each arm)
    per_pdf: dict[str, dict[str, Any]] = {}
    for i, q in enumerate(questions):
        slot = per_pdf.setdefault(q.doc_id, {
            "pages": q.pages,
            "arms": {},
        })
        for arm_name in arm_names:
            # setdefault keeps the grade of the FIRST question seen for
            # each (PDF, arm); later questions on that PDF do not overwrite it.
            slot["arms"].setdefault(arm_name, {
                "correct": per_arm_grades[arm_name][i].correct,
                "f1": per_arm_grades[arm_name][i].f1,
            })

    return {
        "per_arm": per_arm,
        "per_pdf": per_pdf,
        "n_questions": len(questions),
        "n_unique_pdfs": len(pdf_pages),
        "total_pages_in_scope": sum(pdf_pages.values()),
    }
__all__ = ["ParserCompareBenchmark", "PCQuestion"]

View file

@ -1,22 +1,10 @@
"""CRAG — Comprehensive RAG Benchmark (Yang et al., Meta, KDD Cup 2024).
Source: https://github.com/facebookresearch/CRAG (Tasks 1, 2, and 3)
Source: https://github.com/facebookresearch/CRAG (Tasks 1 & 2)
Paper: https://arxiv.org/abs/2406.04744
This package registers two siblings:
* ``crag`` Tasks 1 & 2: 5 candidate pages per question.
* ``crag_t3`` Task 3: 50 candidate pages per question. The
long-context arm is capped to the top-5 (the realistic "naive
RAG = pick top-K results" baseline); SurfSense retrieves over
all 50, where its rerank becomes the entire contribution.
Both share the grader, prompt, runner, and report code; only the
ingest path differs (single bz2 vs 4-part tar.bz2 streamed).
CRAG ships ~2,706 factual QA pairs, each paired with **5 full HTML
pages** retrieved as the top-5 of a real web search at ``query_time``
(50 in Task 3).
pages** retrieved as the top-5 of a real web search at ``query_time``.
The benchmark spans 5 domains (finance, music, movie, sports, open)
and 8 question types (simple, comparison, aggregation, set, multi-hop,
post-processing, false_premise, simple_w_condition) heads/torsos/
@ -51,7 +39,6 @@ relative to refusals.
from __future__ import annotations
from ....core import registry as _registry
from .runner import CragBenchmark, CragTask3Benchmark
from .runner import CragBenchmark
_registry.register(CragBenchmark())
_registry.register(CragTask3Benchmark())

View file

@ -1,263 +0,0 @@
"""CRAG Task 3 dataset loader — 4-part tar.bz2 → streaming JSONL.
Task 3 ships ~7 GB of compressed data split into 4 parts on GitHub:
crag_task_3_dev_v4.tar.bz2.part1 (2 GB)
crag_task_3_dev_v4.tar.bz2.part2 (2 GB)
crag_task_3_dev_v4.tar.bz2.part3 (2 GB)
crag_task_3_dev_v4.tar.bz2.part4 (1.3 GB)
Concatenated, they form a tar archive containing a single JSONL file.
Decompressed, that JSONL is on the order of 30-50 GB because each row
embeds 50 full HTML pages (vs 5 in Tasks 1 & 2).
Materialising the JSONL would blow the disk budget (we have ~50 GB
free at the time of writing), so we stream the whole thing instead:
1. Download parts (idempotent; ``scripts/download_crag_task3.py``).
2. Concat them into a virtual file via ``_MultiPartReader``.
3. Wrap in ``bz2.BZ2File`` for on-the-fly decompression.
4. Wrap in ``tarfile.open(fileobj=..., mode="r|")`` for streaming
tar member iteration.
5. For the JSONL member inside, ``tar.extractfile()`` returns a
binary file-like; we iterate lines and yield parsed dicts.
The caller can ``break`` out as soon as they have enough samples —
nothing past the consumed point is decompressed.
Schema is identical to Tasks 1 & 2 (see ``dataset.py``); only
``search_results`` is bigger (50 entries instead of 5).
"""
from __future__ import annotations
import bz2
import json
import logging
import tarfile
from collections.abc import Iterator
from pathlib import Path
from typing import IO
from .dataset import (
CragPage,
CragQuestion,
_parse_alt_answers,
_parse_pages,
)
logger = logging.getLogger(__name__)
# Download URLs for parts 1-4 of the Task 3 archive, in part order.
CRAG_TASK_3_PART_URLS: tuple[str, ...] = tuple(
    "https://github.com/facebookresearch/CRAG/raw/refs/heads/main/data/"
    f"crag_task_3_dev_v4.tar.bz2.part{i}"
    for i in (1, 2, 3, 4)
)
# File names of the same four parts; joined onto the parts directory and
# read back-to-back, in this order, as one bz2 stream.
CRAG_TASK_3_PART_NAMES: tuple[str, ...] = tuple(
    f"crag_task_3_dev_v4.tar.bz2.part{i}" for i in (1, 2, 3, 4)
)
# ---------------------------------------------------------------------------
# Multi-part virtual file (concatenates N files transparently)
# ---------------------------------------------------------------------------
class _MultiPartReader:
"""Read N files end-to-end as if they were one big file.
Implements just enough of the file protocol for ``bz2.BZ2File``
to consume it: ``read(n)``, ``readable()``, ``close()``.
Doesn't implement ``seek`` — the bz2 + tarfile streaming path
is forward-only, which is what we want here.
"""
def __init__(self, paths: list[Path]) -> None:
if not paths:
raise ValueError("_MultiPartReader needs at least one path")
for p in paths:
if not p.exists():
raise FileNotFoundError(p)
self._paths = list(paths)
self._idx = 0
self._fh: IO[bytes] | None = self._paths[0].open("rb")
self._closed = False
def read(self, n: int = -1) -> bytes:
if self._closed:
raise ValueError("read of closed _MultiPartReader")
if n is None or n < 0:
chunks: list[bytes] = []
while self._fh is not None:
chunks.append(self._fh.read())
self._advance()
return b"".join(chunks)
out: list[bytes] = []
remaining = n
while remaining > 0 and self._fh is not None:
chunk = self._fh.read(remaining)
if not chunk:
self._advance()
continue
out.append(chunk)
remaining -= len(chunk)
return b"".join(out)
def _advance(self) -> None:
if self._fh is not None:
self._fh.close()
self._fh = None
self._idx += 1
if self._idx < len(self._paths):
self._fh = self._paths[self._idx].open("rb")
def readable(self) -> bool:
return not self._closed
def close(self) -> None:
if self._fh is not None:
self._fh.close()
self._fh = None
self._closed = True
def __enter__(self) -> _MultiPartReader:
return self
def __exit__(self, exc_type, exc, tb) -> None: # type: ignore[no-untyped-def]
self.close()
# ---------------------------------------------------------------------------
# Stream the JSONL inside the tar.bz2
# ---------------------------------------------------------------------------
def _is_jsonl_member(name: str) -> bool:
return name.endswith(".jsonl") or name.endswith(".jsonl.txt")
def iter_questions_task3(
    parts_dir: Path,
    *,
    max_questions: int | None = None,
) -> list[CragQuestion]:
    """Stream-parse Task 3 rows into ``CragQuestion`` objects.

    The Task 3 archive ships its 2,706 questions sharded across
    multiple JSONL files inside the tar (e.g.
    ``crag_task_3_dev_v4_0.jsonl``, ``..._1.jsonl``, ...). We iterate
    members in-stream, parse every JSONL one we encounter, and stop
    as soon as ``max_questions`` is reached — at which point we
    don't decompress any further members.

    For a typical n=50 sample at ~3 MB per row we touch ~150 MB of
    decompressed JSONL — almost always inside the first shard.

    Rows that are malformed JSON, or that lack a ``query`` or ``answer``,
    are skipped but still consume a ``raw_idx``, so ``qid`` / ``raw_index``
    reflect the row's position in the stream.

    Raises:
        ValueError / FileNotFoundError: from ``_MultiPartReader`` when a
            part file is missing.
        RuntimeError: when the whole archive was read without finding a
            single JSONL member.
    """
    parts = [parts_dir / name for name in CRAG_TASK_3_PART_NAMES]
    multi = _MultiPartReader(parts)
    bz: bz2.BZ2File | None = None
    tar: tarfile.TarFile | None = None
    out: list[CragQuestion] = []
    raw_idx = 0
    found_jsonl = False
    try:
        # Opened inside the try so that a failure while opening the bz2 or
        # tar layer (e.g. a corrupt first header) still closes `multi`.
        bz = bz2.BZ2File(multi, mode="rb")
        tar = tarfile.open(fileobj=bz, mode="r|")
        for member in tar:
            if not member.isfile() or not _is_jsonl_member(member.name):
                continue
            found_jsonl = True
            logger.info(
                "CRAG Task 3: streaming JSONL shard %s (size: %d bytes)",
                member.name, member.size,
            )
            fh = tar.extractfile(member)
            if fh is None:
                logger.warning("tar.extractfile returned None for %s; skipping", member.name)
                continue
            try:
                for raw_line in fh:
                    line = raw_line.decode("utf-8", errors="replace").strip()
                    if not line:
                        continue
                    try:
                        row = json.loads(line)
                    except json.JSONDecodeError as exc:
                        logger.warning(
                            "Skipping malformed CRAG Task 3 row %d in %s: %s",
                            raw_idx, member.name, exc,
                        )
                        raw_idx += 1
                        continue
                    query = str(row.get("query") or "").strip()
                    answer = str(row.get("answer") or "").strip()
                    if not query or not answer:
                        raw_idx += 1
                        continue
                    out.append(CragQuestion(
                        qid=f"T3_{raw_idx:05d}",
                        interaction_id=str(row.get("interaction_id") or "").strip(),
                        query_time=str(row.get("query_time") or "").strip(),
                        query=query,
                        gold_answer=answer,
                        alt_answers=_parse_alt_answers(row.get("alt_ans")),
                        domain=str(row.get("domain") or "").strip().lower(),
                        question_type=str(row.get("question_type") or "").strip().lower(),
                        static_or_dynamic=str(row.get("static_or_dynamic") or "").strip().lower(),
                        popularity=str(row.get("popularity") or "").strip().lower(),
                        split=int(row.get("split") or 0),
                        raw_index=raw_idx,
                        pages=_parse_pages(row.get("search_results")),
                    ))
                    raw_idx += 1
                    if max_questions is not None and len(out) >= max_questions:
                        # Early exit; the finally blocks release every handle.
                        return out
            finally:
                try:
                    fh.close()
                except Exception:  # noqa: BLE001 - best-effort cleanup
                    logger.debug(
                        "Ignoring error while closing tar member %s",
                        member.name, exc_info=True,
                    )
        if not found_jsonl:
            raise RuntimeError(
                "No JSONL member found inside Task 3 tar.bz2 archive; "
                "schema may have changed upstream."
            )
    finally:
        # Close outermost layer first. Cleanup stays best-effort, but
        # failures are logged instead of vanishing.
        for handle in (tar, bz, multi):
            if handle is None:
                continue
            try:
                handle.close()
            except Exception:  # noqa: BLE001 - best-effort cleanup
                logger.debug(
                    "Ignoring error while closing %s",
                    type(handle).__name__, exc_info=True,
                )
    return out
def parts_present(parts_dir: Path) -> bool:
    """Report whether every one of the 4 parts is on disk with a non-zero size."""
    candidates = (parts_dir / name for name in CRAG_TASK_3_PART_NAMES)
    # all() stops at the first missing or empty part.
    return all(part.exists() and part.stat().st_size != 0 for part in candidates)
# ---------------------------------------------------------------------------
# Re-exports for convenience
# ---------------------------------------------------------------------------
__all__ = [
"CRAG_TASK_3_PART_NAMES",
"CRAG_TASK_3_PART_URLS",
"CragPage",
"CragQuestion",
"iter_questions_task3",
"parts_present",
]

View file

@ -436,12 +436,4 @@ async def _retry_upload_idempotent( # noqa: D401 - hidden helper
return {}
__all__ = [
"_IngestStats",
"_materialise_pages",
"_page_filename",
"_resolve_question_doc_ids",
"_upload_pages",
"read_page_markdown",
"run_ingest",
]
__all__ = ["read_page_markdown", "run_ingest"]

View file

@ -1,191 +0,0 @@
"""CRAG Task 3 ingestion: 4-part download → streaming JSONL → upload.
Same flow as ``ingest.run_ingest`` for Tasks 1 & 2 (extract HTML →
upload markdown → resolve doc_ids → write doc map), but:
* Source: 4 .tar.bz2 parts streamed via ``dataset_task3``.
* Page count: 50 per question instead of 5 — the whole point of
Task 3 (the long-context arm now structurally has to choose what
to keep, while SurfSense's retrieval becomes mandatory).
* Stratified sampling re-uses the Task 1 helper since the question
schema is identical.
Doc map lands at ``<suite_data>/maps/crag_t3_doc_map.jsonl`` with the
same row shape as Task 1's map (so the runner only needs to know
which file to load; everything else is shared).
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from ....core.config import set_suite_state
from ....core.ingest_settings import IngestSettings, settings_header_line
from ....core.registry import RunContext
from .dataset import stratified_sample, write_questions_jsonl
from .dataset_task3 import (
CRAG_TASK_3_PART_NAMES,
iter_questions_task3,
parts_present,
)
from .ingest import (
_IngestStats,
_materialise_pages,
_resolve_question_doc_ids,
_upload_pages,
)
logger = logging.getLogger(__name__)
# Remediation hint appended to the error raised when archive parts are missing.
_INSTRUCTIONS_TO_DOWNLOAD = (
    "Run `python scripts/download_crag_task3.py` first to fetch the "
    "4 tar.bz2 parts (~7 GB total) into "
    "data/research/crag_t3/.raw_cache/. The downloader is idempotent "
    "and parallel."
)
async def run_ingest_task3(
    ctx: RunContext,
    *,
    n_questions: int | None = None,
    upload_batch_size: int = 16,
    skip_upload: bool = False,
    overwrite_extract: bool = False,
    settings: IngestSettings | None = None,
    sample_seed: int = 17,
    parse_cap: int | None = None,
) -> None:
    """Ingest CRAG Task 3 (50 pages per question) into the research suite.

    Pipeline: verify the archive parts are cached, stream-parse questions,
    optionally take a stratified sample, write ``questions.jsonl``,
    materialise page markdown, optionally upload the pages, write the doc
    map, and record the map path in the suite state.

    Parameters
    ----------
    ctx
        Run context supplying the benchmark data dir, the maps dir and
        the suite state that gets updated.
    n_questions
        Cap on the post-stratified-sample question count. ``None`` (or a
        non-positive value) keeps every parsed row. For real runs aim
        for 50 (~2,500 pages); n=200 (10k pages) is doable but slow.
    upload_batch_size
        Markdown files per ``/documents/fileupload`` call.
    skip_upload
        Extract markdown locally, don't push to SurfSense.
    overwrite_extract
        Re-run extraction even when a cached markdown is present.
    settings
        Per-upload knobs override. When falsy, a text-only "basic"
        configuration without summarisation is used.
    sample_seed
        RNG seed for stratified sampling (deterministic).
    parse_cap
        Hard cap on how many rows are *parsed* from the streaming
        archive before stratified sampling. When unset (``None`` or 0)
        it falls back to ``max(400, 6 * n_questions)`` if ``n_questions``
        is truthy; when both are unset, no cap is applied and the whole
        archive is streamed. Lowering this is the knob that bounds
        streaming cost, because parsing can stop early without
        decompressing the rest of the archive.

    Raises
    ------
    RuntimeError
        If the archive parts are not all present in the raw cache, or
        if streaming yields zero rows.
    """
    if not settings:
        # Default ingest profile: no vision LLM, basic ETL, no summaries.
        settings = IngestSettings(
            use_vision_llm=False,
            processing_mode="basic",
            should_summarize=False,
        )

    bench_dir = ctx.benchmark_data_dir()
    pages_dir = bench_dir / "pages"
    raw_cache = bench_dir / ".raw_cache"
    # Create the cache dir up front so the error below names a real path.
    raw_cache.mkdir(parents=True, exist_ok=True)

    if not parts_present(raw_cache):
        missing = [
            part_name
            for part_name in CRAG_TASK_3_PART_NAMES
            if not (raw_cache / part_name).exists()
        ]
        raise RuntimeError(
            f"CRAG Task 3 parts missing from {raw_cache}: {missing}. "
            f"{_INSTRUCTIONS_TO_DOWNLOAD}"
        )

    # 1. Stream-parse (capped). A sampled run does not need every row,
    #    just enough that the stratified sampler can balance buckets.
    if not parse_cap:
        parse_cap = max(400, 6 * n_questions) if n_questions else None
    logger.info(
        "CRAG Task 3: streaming JSONL (parse_cap=%s) ...",
        parse_cap or "no-cap",
    )
    all_questions = iter_questions_task3(raw_cache, max_questions=parse_cap)
    logger.info("CRAG Task 3: parsed %d rows", len(all_questions))
    if not all_questions:
        raise RuntimeError("CRAG Task 3 streaming returned 0 rows; check archive integrity.")

    # 2. Optional stratified down-sample.
    if n_questions is None or n_questions <= 0:
        questions = all_questions
    else:
        questions = stratified_sample(all_questions, n=n_questions, seed=sample_seed)
        bucket_keys = {(q.domain, q.question_type) for q in questions}
        logger.info(
            "CRAG Task 3: stratified sample of %d questions across %d (domain, qtype) buckets",
            len(questions),
            len(bucket_keys),
        )

    # 3. Persist the selected questions, then materialise their pages.
    questions_jsonl = bench_dir / "questions.jsonl"
    write_questions_jsonl(questions, questions_jsonl)

    n_pages_total = sum(len(q.pages) for q in questions)
    logger.info(
        "CRAG Task 3: extracting up to %d pages across %d questions ...",
        n_pages_total, len(questions),
    )
    qid_to_files, _file_to_url = _materialise_pages(
        questions, pages_dir=pages_dir, overwrite=overwrite_extract,
    )
    n_pages_extracted = sum(len(files) for files in qid_to_files.values())

    # 4. Upload the de-duplicated page files unless told to skip.
    name_to_id: dict[str, int] = {}
    if not skip_upload:
        unique_names: set = set()
        for files in qid_to_files.values():
            unique_names.update(files)
        all_filenames = sorted(unique_names)
        logger.info("CRAG Task 3: uploading %d unique pages ...", len(all_filenames))
        name_to_id = await _upload_pages(
            ctx,
            pages_dir=pages_dir,
            filenames=all_filenames,
            batch_size=upload_batch_size,
            settings=settings,
        )
    else:
        logger.info("CRAG Task 3: --skip-upload; skipping SurfSense ingestion")

    # 5. Write the doc map: settings header line first, then one JSON row each.
    doc_rows = _resolve_question_doc_ids(questions, qid_to_files, name_to_id)
    map_path = ctx.maps_dir() / "crag_t3_doc_map.jsonl"
    with map_path.open("w", encoding="utf-8") as fh:
        fh.write(settings_header_line(settings) + "\n")
        fh.writelines(json.dumps(row) + "\n" for row in doc_rows)
    logger.info("Wrote CRAG Task 3 doc map to %s (%d rows)", map_path, len(doc_rows))

    # 6. Register the map in the suite state.
    new_state = ctx.suite_state
    new_state.ingestion_maps["crag_t3"] = str(map_path)
    set_suite_state(ctx.config, ctx.suite, new_state)

    stats = _IngestStats(
        n_questions=len(questions),
        n_pages_total=n_pages_total,
        n_pages_extracted=n_pages_extracted,
        n_pages_empty=n_pages_total - n_pages_extracted,
        n_uploaded=len(name_to_id),
        n_existing=0,
        bench_dir=bench_dir,
        map_path=map_path,
    )
    logger.info("CRAG Task 3 ingest done: %s", stats)
__all__ = ["run_ingest_task3"]

View file

@ -189,18 +189,6 @@ class CragBenchmark:
headline: bool = True
description: str = _DESCRIPTION
# Subclasses (e.g. Task 3) override these without re-implementing run().
doc_map_filename: str = "crag_doc_map.jsonl"
# 0 = use ALL pages in the long-context arm. Task 3 defaults to 5
# so the long-context arm models the realistic "stuff the top-5
# search results into the prompt" baseline rather than blowing
# past the 128k-token context window with all 50 pages.
default_long_context_top_n: int = 0
pages_per_question_label: str = "5 pages"
ingest_hint: str = (
"`python -m surfsense_evals ingest research crag --n-questions 200`"
)
def add_run_args(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--n", dest="sample_n", type=int, default=None,
@ -230,15 +218,6 @@ class CragBenchmark:
"--per-page-char-cap", dest="per_page_char_cap", type=int, default=12_000,
help="Long-context arm: max chars per page before truncation (default 12k).",
)
parser.add_argument(
"--long-context-top-n-pages", dest="long_context_top_n_pages",
type=int, default=self.default_long_context_top_n,
help=(
"Long-context arm: keep only the first N pages from the "
"question's candidate list (0 = use all). Task 3 defaults "
"to 5 (the realistic 'naive RAG' top-K baseline)."
),
)
parser.add_argument(
"--skip-bare", dest="skip_bare", action="store_true",
help="Skip the bare-LLM arm (saves cost on re-runs).",
@ -317,11 +296,6 @@ class CragBenchmark:
concurrency = int(opts.get("concurrency") or 4)
max_output_tokens = int(opts.get("max_output_tokens") or 512)
per_page_char_cap = int(opts.get("per_page_char_cap") or 12_000)
long_context_top_n_pages = int(
opts.get("long_context_top_n_pages")
if opts.get("long_context_top_n_pages") is not None
else self.default_long_context_top_n
)
skip_bare = bool(opts.get("skip_bare"))
skip_long_context = bool(opts.get("skip_long_context"))
skip_surfsense = bool(opts.get("skip_surfsense"))
@ -331,11 +305,11 @@ class CragBenchmark:
judge_concurrency = int(opts.get("judge_concurrency") or 4)
bench_dir = ctx.benchmark_data_dir()
map_path = ctx.maps_dir() / self.doc_map_filename
map_path = ctx.maps_dir() / "crag_doc_map.jsonl"
if not map_path.exists():
raise RuntimeError(
f"{self.name} not ingested for this suite. Run "
f"{self.ingest_hint} first."
"CRAG not ingested for this suite. Run "
"`python -m surfsense_evals ingest research crag --n-questions 200` first."
)
rows, ingest_settings = _load_doc_map(map_path)
@ -407,13 +381,7 @@ class CragBenchmark:
async def _long_context_one(q: CragRunnerQuestion) -> ArmResult:
assert long_context_arm is not None
return await long_context_arm.answer(
_make_long_context_request(
q,
bench_dir,
max_output_tokens,
per_page_char_cap,
top_n_pages=long_context_top_n_pages,
)
_make_long_context_request(q, bench_dir, max_output_tokens, per_page_char_cap)
)
async def _surf_one(q: CragRunnerQuestion) -> ArmResult:
@ -503,8 +471,6 @@ class CragBenchmark:
"agent_llm_id": ctx.agent_llm_id,
"ingest_settings": ingest_settings,
"per_page_char_cap": per_page_char_cap,
"long_context_top_n_pages": long_context_top_n_pages,
"pages_per_question_label": self.pages_per_question_label,
"max_output_tokens": max_output_tokens,
"arms_active": {
"bare_llm": bare_arm is not None,
@ -565,29 +531,18 @@ class CragBenchmark:
if not active.get("long_context", True):
body_lines.append("- Long-context arm: SKIPPED.")
else:
top_n = int(extra.get("long_context_top_n_pages") or 0)
page_phrase = (
f"top-{top_n} of {extra.get('pages_per_question_label') or 'pages'}"
if top_n > 0
else f"all of {extra.get('pages_per_question_label') or 'pages'}"
)
body_lines.append(
f"- Long-context arm (`{extra.get('native_arm_model') or '?'}`, "
f"{page_phrase} stuffed into prompt; per-page cap "
f"all 5 pages stuffed into prompt; per-page cap "
f"{extra.get('per_page_char_cap', 12_000):,} chars):"
)
body_lines.append(_arm_summary_lines(lc, indent=" "))
if not active.get("surfsense", True):
body_lines.append("- SurfSense arm: SKIPPED.")
else:
scope_phrase = (
"whole SearchSpace"
if extra.get("no_mention_scope")
else f"per-question {extra.get('pages_per_question_label') or 'pages'}"
)
body_lines.append(
f"- SurfSense arm (`{extra.get('provider_model', '?')}`, retrieval over "
f"{scope_phrase}):"
f"{'whole SearchSpace' if extra.get('no_mention_scope') else 'per-question 5 pages'}):"
)
body_lines.append(_arm_summary_lines(surf, indent=" "))
@ -673,17 +628,9 @@ def _make_long_context_request(
bench_dir: Path,
max_tokens: int,
per_page_char_cap: int,
*,
top_n_pages: int = 0,
) -> ArmRequest:
# The CRAG search_results list is already ranked top-K from the
# original web search at query_time; slicing the prefix is the
# honest "naive RAG: take the top-K results" baseline.
page_iter = q.page_filenames
if top_n_pages and top_n_pages > 0:
page_iter = page_iter[:top_n_pages]
contexts: list[tuple[str, str]] = []
for fn in page_iter:
for fn in q.page_filenames:
text = read_page_markdown(bench_dir, fn) or ""
if not text.strip():
continue
@ -993,61 +940,4 @@ def _fmt(value: Any, ndigits: int) -> str:
return "?"
# Human-readable benchmark summary; assigned to
# ``CragTask3Benchmark.description``.
_TASK3_DESCRIPTION = (
"CRAG Task 3 (Meta KDD Cup 2024) — same 3 arms but the corpus per "
"question now has **50 candidate web pages** (vs 5 in Tasks 1 & 2). "
"The long-context arm uses only the top-5 (the realistic naive-RAG "
"baseline); SurfSense retrieves over all 50, where its rerank "
"becomes the actual contribution."
)
class CragTask3Benchmark(CragBenchmark):
    """3-arm CRAG runner over Task 3 (50 pages per question).

    Reuses the entire Task 1/2 runtime (grader, prompt, metrics,
    reporting) — the only deltas are: the doc map filename, the
    long-context arm's default page cap (5 instead of all 50), and
    the ingest entrypoint (4-part archive instead of single bz2).
    """

    name: str = "crag_t3"
    description: str = _TASK3_DESCRIPTION
    doc_map_filename: str = "crag_t3_doc_map.jsonl"
    default_long_context_top_n: int = 5
    pages_per_question_label: str = "50 pages"
    ingest_hint: str = (
        "`python -m surfsense_evals ingest research crag_t3 --n-questions 50` "
        "(after `python scripts/download_crag_task3.py`)"
    )

    async def ingest(self, ctx: RunContext, **opts: Any) -> None:
        """Run the Task 3 ingest with options taken from ``opts``.

        Recognised keys: ``n_questions``, ``upload_batch_size``,
        ``skip_upload``, ``overwrite_extract``, ``sample_seed`` and
        ``parse_cap``; ``opts`` is also merged over the default ingest
        settings. Missing keys fall back to the defaults shown below.
        """
        # Local import: keep dataset_task3's lazy-streaming module out
        # of the import graph until someone actually wants Task 3.
        from .ingest_task3 import run_ingest_task3

        settings = IngestSettings.merge(_DEFAULT_INGEST_SETTINGS, opts)

        # Only an absent seed falls back to 17. An ``or`` fallback would
        # silently replace the perfectly valid seed 0.
        raw_seed = opts.get("sample_seed")
        sample_seed = 17 if raw_seed is None else int(raw_seed)

        await run_ingest_task3(
            ctx,
            n_questions=opts.get("n_questions"),
            # A batch size of 0 is meaningless, so ``or`` is fine here.
            upload_batch_size=int(opts.get("upload_batch_size") or 16),
            skip_upload=bool(opts.get("skip_upload", False)),
            overwrite_extract=bool(opts.get("overwrite_extract", False)),
            settings=settings,
            sample_seed=sample_seed,
            parse_cap=opts.get("parse_cap"),
        )

    def add_run_args(self, parser: argparse.ArgumentParser) -> None:
        """Register the inherited run arguments plus ``--parse-cap``."""
        super().add_run_args(parser)
        parser.add_argument(
            "--parse-cap", dest="parse_cap", type=int, default=None,
            help=(
                "(ingest only) Hard cap on rows parsed from the streaming "
                "Task 3 archive before stratified sampling. Default: "
                "max(400, 6 * n_questions) when --n-questions is set, "
                "otherwise no cap. Lower = less decompression."
            ),
        )
__all__ = ["CragBenchmark", "CragRunnerQuestion", "CragTask3Benchmark"]
__all__ = ["CragBenchmark", "CragRunnerQuestion"]