retrieval: instrument hybrid search; note deferred citation markers

This commit is contained in:
CREDO23 2026-06-25 09:00:23 +02:00
parent e12afa7c8f
commit e72b17fbed
2 changed files with 52 additions and 1 deletions

View file

@ -23,6 +23,9 @@ def to_frontend_payload(entry: CitationEntry) -> str | None:
url = locator.get("url") url = locator.get("url")
return url or None return url or None
case _: case _:
# Connector items and chat turns have no client-side renderer yet
# (the frontend resolves only chunk ids and URLs), so they stay
# unmarked until both a registration path and a renderer exist.
return None return None

View file

@ -9,6 +9,7 @@ from __future__ import annotations
import asyncio import asyncio
import contextlib import contextlib
import time
from sqlalchemy import func, select, text from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@ -16,12 +17,15 @@ from sqlalchemy.orm import joinedload
from app.config import config from app.config import config
from app.db import Chunk, Document, DocumentType from app.db import Chunk, Document, DocumentType
from app.observability import metrics, otel
from app.utils.perf import get_perf_logger
from .models import ChunkHit, DocumentHit, SearchScope from .models import ChunkHit, DocumentHit, SearchScope
_RRF_K = 60 _RRF_K = 60
_CANDIDATE_MULTIPLIER = 5 # fused-chunk pool size relative to top_k _CANDIDATE_MULTIPLIER = 5 # fused-chunk pool size relative to top_k
_MAX_PASSAGES_PER_DOC = 12 _MAX_PASSAGES_PER_DOC = 12
_SURFACE = "chunks"
async def search_chunks( async def search_chunks(
@ -33,7 +37,51 @@ async def search_chunks(
top_k: int, top_k: int,
query_embedding: list[float] | None = None, query_embedding: list[float] | None = None,
) -> list[DocumentHit]: ) -> list[DocumentHit]:
"""Top ``top_k`` documents for ``query`` within scope, each with its chunks.""" """Top ``top_k`` documents for ``query`` within scope, each with its chunks.
Instrumented seam: traces the search, records its duration, and logs a
timing line. The fusion logic lives in :func:`_search`.
"""
started = time.perf_counter()
with otel.kb_search_span(
search_space_id=search_space_id,
query_chars=len(query),
extra={"search.surface": _SURFACE, "search.mode": "hybrid"},
) as span:
try:
documents = await _search(
db_session,
search_space_id=search_space_id,
query=query,
scope=scope,
top_k=top_k,
query_embedding=query_embedding,
)
finally:
elapsed_ms = (time.perf_counter() - started) * 1000
metrics.record_kb_search_duration(
elapsed_ms, search_space_id=search_space_id, surface=_SURFACE
)
span.set_attribute("result.count", len(documents))
get_perf_logger().info(
"[chunk_search] hybrid in %.3fs docs=%d space=%d",
elapsed_ms / 1000,
len(documents),
search_space_id,
)
return documents
async def _search(
db_session: AsyncSession,
*,
search_space_id: int,
query: str,
scope: SearchScope,
top_k: int,
query_embedding: list[float] | None,
) -> list[DocumentHit]:
"""Fusion search itself: resolve scope, fuse the two legs, group by document."""
document_types = _resolve_document_types(scope.document_types) document_types = _resolve_document_types(scope.document_types)
if document_types == []: # types requested, none recognized → nothing matches if document_types == []: # types requested, none recognized → nothing matches
return [] return []