feat(middleware): enhance performance logging in chat agents

- Integrated performance logging in `OtelSpanMiddleware` to track model call durations even when OTel is disabled.
- Added detailed performance metrics in `KnowledgePriorityMiddleware` for database operations and embedding processes, improving visibility into query performance.
- Utilized `get_perf_logger` for consistent logging across middleware components.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-06-09 00:28:53 -07:00
parent 640ef5f15d
commit 0a012dbc79
2 changed files with 71 additions and 2 deletions

View file

@@ -24,6 +24,7 @@ from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import AIMessage, ToolMessage
from app.observability import metrics as ot_metrics, otel as ot
from app.utils.perf import get_perf_logger
if TYPE_CHECKING: # pragma: no cover — type-only
from langchain.agents.middleware.types import (
@@ -34,6 +35,7 @@ if TYPE_CHECKING: # pragma: no cover — type-only
from langgraph.types import Command
logger = logging.getLogger(__name__)
_perf_log = get_perf_logger()
class OtelSpanMiddleware(AgentMiddleware):
@@ -60,7 +62,23 @@ class OtelSpanMiddleware(AgentMiddleware):
handler: Callable[[ModelRequest], Awaitable[ModelResponse | AIMessage | Any]],
) -> ModelResponse | AIMessage | Any:
if not ot.is_enabled():
return await handler(request)
# Always emit a [PERF] line for the model step even when OTel is
# disabled. This isolates provider/model latency from the agent's
# pre-flight (before_agent KB-priority/memory/tree) work, which is
# the usual culprit when the multi-agent path feels slow to start.
# ``perf_counter`` at entry doubles as the "before_agent finished /
# model call started" marker on the first step of a turn.
model_id, _provider = _resolve_model_attrs(request)
_t0 = time.perf_counter()
_perf_log.info("[model_call] start model=%s", model_id)
try:
return await handler(request)
finally:
_perf_log.info(
"[model_call] done model=%s elapsed=%.3fs",
model_id,
time.perf_counter() - _t0,
)
model_id, provider = _resolve_model_attrs(request)
t0 = time.perf_counter()

View file

@@ -27,6 +27,7 @@ import asyncio
import json
import logging
import re
import time
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import Any
@@ -346,6 +347,7 @@ async def browse_recent_documents(
from app.db import DocumentType
_t0 = time.perf_counter()
async with shielded_async_session() as session:
base_conditions = [
Document.search_space_id == search_space_id,
@@ -445,6 +447,12 @@ async def browse_recent_documents(
),
}
)
_perf_log.info(
"[kb_priority.recent] db=%.3fs docs=%d space=%d",
time.perf_counter() - _t0,
len(results),
search_space_id,
)
return results
@@ -462,10 +470,18 @@ async def search_knowledge_base(
if not query:
return []
# ``embed_texts`` serializes behind a global embedding lock and, for API
# models, makes a network round-trip — so this can stall while another
# turn is embedding. Timed separately from the DB search to tell the two
# apart when debugging slow time-to-first-token.
_t_embed = time.perf_counter()
[embedding] = await asyncio.to_thread(embed_texts, [query])
_embed_elapsed = time.perf_counter() - _t_embed
doc_types = _resolve_search_types(available_connectors, available_document_types)
retriever_top_k = min(top_k * 3, 30)
_t_search = time.perf_counter()
async with shielded_async_session() as session:
retriever = ChucksHybridSearchRetriever(session)
results = await retriever.hybrid_search(
@@ -477,7 +493,16 @@ async def search_knowledge_base(
end_date=end_date,
query_embedding=embedding.tolist(),
)
_search_elapsed = time.perf_counter() - _t_search
_perf_log.info(
"[kb_priority.search] embed=%.3fs hybrid_search=%.3fs results=%d space=%d query=%r",
_embed_elapsed,
_search_elapsed,
len(results),
search_space_id,
query[:80],
)
return results[:top_k]
@@ -490,6 +515,7 @@ async def fetch_mentioned_documents(
if not document_ids:
return []
_t0 = time.perf_counter()
async with shielded_async_session() as session:
doc_result = await session.execute(
select(Document).where(
@@ -546,6 +572,12 @@ async def fetch_mentioned_documents(
"_user_mentioned": True,
}
)
_perf_log.info(
"[kb_priority.mentioned] db=%.3fs requested=%d resolved=%d",
time.perf_counter() - _t0,
len(document_ids),
len(results),
)
return results
@@ -839,6 +871,7 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
search_space_id=self.search_space_id,
)
_t_search_phase = time.perf_counter()
if is_recency:
doc_types = _resolve_search_types(
self.available_connectors, self.available_document_types
@@ -860,6 +893,7 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
start_date=start_date,
end_date=end_date,
)
_search_phase_elapsed = time.perf_counter() - _t_search_phase
seen_doc_ids: set[int] = set()
merged: list[dict[str, Any]] = []
@@ -874,15 +908,26 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
continue
merged.append(doc)
_t_materialize = time.perf_counter()
priority, matched_chunk_ids = await self._materialize_priority(merged)
if folder_mention_ids:
folder_entries = await self._materialize_folder_priority(folder_mention_ids)
priority = folder_entries + priority
_materialize_elapsed = time.perf_counter() - _t_materialize
# ``recency=...`` reflects which retrieval path ran (recency browse vs
# hybrid search). The planner phase is logged separately by
# ``_plan_search_inputs``; here ``search_phase`` and ``materialize``
# break down the remaining DB-bound work so a slow turn can be
# attributed to planner / search / materialize at a glance.
_perf_log.info(
"[kb_priority] completed in %.3fs query=%r priority=%d mentioned=%d folders=%d",
"[kb_priority] completed in %.3fs (search_phase=%.3fs materialize=%.3fs "
"recency=%s) query=%r priority=%d mentioned=%d folders=%d",
asyncio.get_event_loop().time() - t0,
_search_phase_elapsed,
_materialize_elapsed,
is_recency,
user_text[:80],
len(priority),
len(mentioned_results),
@@ -958,6 +1003,7 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
if not merged:
return priority, matched_chunk_ids
_t0 = time.perf_counter()
async with shielded_async_session() as session:
index: PathIndex = await build_path_index(session, self.search_space_id)
doc_ids = [
@@ -1006,6 +1052,11 @@ class KnowledgePriorityMiddleware(AgentMiddleware): # type: ignore[type-arg]
matched_chunk_ids[doc_id] = [
int(cid) for cid in chunk_ids if isinstance(cid, int | str)
]
_perf_log.info(
"[kb_priority.materialize] db=%.3fs docs=%d",
time.perf_counter() - _t0,
len(merged),
)
return priority, matched_chunk_ids