perf(mcp): add per-call, discovery, and oauth-refresh timing logs

This commit is contained in:
CREDO23 2026-05-19 21:29:56 +02:00
parent d1d44dc4c5
commit 9bfba34e8e

View file

@ -38,6 +38,9 @@ from app.agents.new_chat.tools.hitl import request_approval
from app.agents.new_chat.tools.mcp_client import MCPClient
from app.db import SearchSourceConnector
from app.services.mcp_oauth.registry import MCP_SERVICES, get_service_by_connector_type
from app.utils.perf import get_perf_logger
_perf_log = get_perf_logger()
logger = logging.getLogger(__name__)
@ -293,15 +296,21 @@ async def _create_mcp_tool_from_definition_http(
timeout: float = 60.0,
) -> str:
"""Execute a single MCP HTTP call with the given headers."""
call_start = time.perf_counter()
async with (
streamablehttp_client(url, headers=call_headers) as (read, write, _),
ClientSession(read, write) as session,
):
init_start = time.perf_counter()
await session.initialize()
init_elapsed = time.perf_counter() - init_start
tool_start = time.perf_counter()
response = await asyncio.wait_for(
session.call_tool(original_tool_name, arguments=call_kwargs),
timeout=timeout,
)
tool_elapsed = time.perf_counter() - tool_start
result = []
for content in response.content:
@ -312,7 +321,18 @@ async def _create_mcp_tool_from_definition_http(
else:
result.append(str(content))
return "\n".join(result) if result else ""
payload = "\n".join(result) if result else ""
_perf_log.info(
"[mcp_http_call] connector=%s tool=%s init=%.3fs call=%.3fs total=%.3fs out_chars=%d",
connector_id,
original_tool_name,
init_elapsed,
tool_elapsed,
time.perf_counter() - call_start,
len(payload),
)
return payload
async def mcp_http_tool_call(**kwargs) -> str:
"""Execute the MCP tool call via HTTP transport."""
@ -792,14 +812,25 @@ async def _maybe_refresh_mcp_oauth_token(
except (ValueError, TypeError):
return server_config
refresh_start = time.perf_counter()
try:
new_access = await _refresh_connector_token(session, connector)
if not new_access:
_perf_log.info(
"[mcp_oauth_refresh] connector=%s elapsed=%.3fs outcome=no_token",
connector.id,
time.perf_counter() - refresh_start,
)
return server_config
logger.info(
"Proactively refreshed MCP OAuth token for connector %s", connector.id
)
_perf_log.info(
"[mcp_oauth_refresh] connector=%s elapsed=%.3fs outcome=refreshed",
connector.id,
time.perf_counter() - refresh_start,
)
refreshed_config = dict(server_config)
refreshed_config["headers"] = {
@ -809,6 +840,11 @@ async def _maybe_refresh_mcp_oauth_token(
return refreshed_config
except Exception:
_perf_log.info(
"[mcp_oauth_refresh] connector=%s elapsed=%.3fs outcome=failed",
connector.id,
time.perf_counter() - refresh_start,
)
logger.warning(
"Failed to refresh MCP OAuth token for connector %s",
connector.id,
@ -1074,9 +1110,11 @@ async def load_mcp_tools(
)
async def _discover_one(task: dict[str, Any]) -> list[StructuredTool]:
discover_start = time.perf_counter()
transport = task["transport"]
try:
if task["transport"] in ("streamable-http", "http", "sse"):
return await asyncio.wait_for(
if transport in ("streamable-http", "http", "sse"):
result = await asyncio.wait_for(
_load_http_mcp_tools(
task["connector_id"],
task["connector_name"],
@ -1091,7 +1129,7 @@ async def load_mcp_tools(
timeout=_MCP_DISCOVERY_TIMEOUT_SECONDS,
)
else:
return await asyncio.wait_for(
result = await asyncio.wait_for(
_load_stdio_mcp_tools(
task["connector_id"],
task["connector_name"],
@ -1101,7 +1139,23 @@ async def load_mcp_tools(
),
timeout=_MCP_DISCOVERY_TIMEOUT_SECONDS,
)
_perf_log.info(
"[mcp_discover] connector=%s name=%r transport=%s tools=%d elapsed=%.3fs",
task["connector_id"],
task["connector_name"],
transport,
len(result),
time.perf_counter() - discover_start,
)
return result
except TimeoutError:
_perf_log.info(
"[mcp_discover] connector=%s name=%r transport=%s elapsed=%.3fs outcome=timeout",
task["connector_id"],
task["connector_name"],
transport,
time.perf_counter() - discover_start,
)
logger.error(
"MCP connector %d timed out after %ds during discovery",
task["connector_id"],
@ -1109,6 +1163,13 @@ async def load_mcp_tools(
)
return []
except Exception as e:
_perf_log.info(
"[mcp_discover] connector=%s name=%r transport=%s elapsed=%.3fs outcome=error",
task["connector_id"],
task["connector_name"],
transport,
time.perf_counter() - discover_start,
)
logger.exception(
"Failed to load tools from MCP connector %d: %s",
task["connector_id"],
@ -1116,7 +1177,14 @@ async def load_mcp_tools(
)
return []
gather_start = time.perf_counter()
results = await asyncio.gather(*[_discover_one(t) for t in discovery_tasks])
_perf_log.info(
"[mcp_discover] gather_wall=%.3fs connectors=%d total_tools=%d",
time.perf_counter() - gather_start,
len(discovery_tasks),
sum(len(r) for r in results),
)
tools: list[StructuredTool] = [tool for sublist in results for tool in sublist]
_mcp_tools_cache[cache_key] = (now, tools)