perf(chat): kill auto-pin preflight + speculative build, rely on reactive 429 recovery

The preflight pattern probed the LLM with a 1-token ping before each
cold turn (when requested_llm_config_id==0, llm_config_id<0, and the
45s healthy TTL had expired) to detect 429s before fanning out into
planner/classifier/title-gen. To absorb its ~1-5s RTT cost we built the
agent speculatively in parallel; on 429 we discarded the build and
repinned.

Three problems with that design:

1. False security. Provider rate limits are token-bucket. A ping
   capped at max_tokens=1 consumes ~5 tokens including its prompt; the
   real request consumes 10-50K. The bucket can therefore hold enough
   budget for the probe (200) but not for the real call, which still
   429s.
2. Pure overhead in the common case. On warm-agent-cache turns there
   is no agent build left to hide the probe behind, so it dominates
   wall time: ~2.5s of pure TTFT tax for the ~99% of users who never
   see a 429.
3. The in-stream recovery loop already does the right thing
   reactively. It catches exceptions matching
   _is_provider_rate_limited, gated on `not _first_event_logged` (i.e.
   only while no output has reached the user), then runs
   mark_runtime_cooldown -> resolve_or_get_pinned_llm_config_id with
   exclude_config_ids={previous} -> rebuild agent -> retry the stream.
   Preflight was never the only safety net; it was a redundant probe
   in front of one.

Changes:
- Delete _preflight_llm, _settle_speculative_agent_build, and the
  _PREFLIGHT_TIMEOUT_SEC / _PREFLIGHT_MAX_TOKENS constants.
- Drop the parallel agent_build_task / preflight_task plumbing in
  both stream_new_chat and stream_resume_chat; build the agent inline
  with await _build_main_agent_for_thread(...).
- Drop the now-unused is_recently_healthy / mark_healthy imports from
  stream_new_chat.py. The helpers stay exported from
  auto_model_pin_service, since the OpenRouter catalogue refresh and a
  few tests still reference clear_healthy.
- Remove the obsolete preflight + settle-speculative tests from
  test_stream_new_chat_contract.py.

Net: -447 LOC. ~2.5s removed from TTFT on every cold preflight-eligible
turn. The in-stream 429 recovery path is unchanged: same
repin/rebuild/retry, but its cost is no longer paid in advance on the
healthy path.
This commit is contained in:
CREDO23 2026-05-20 11:03:08 +02:00
parent 1791241c0c
commit c3db25302b
2 changed files with 40 additions and 487 deletions

View file

@ -64,8 +64,6 @@ from app.db import (
) )
from app.prompts import TITLE_GENERATION_PROMPT from app.prompts import TITLE_GENERATION_PROMPT
from app.services.auto_model_pin_service import ( from app.services.auto_model_pin_service import (
is_recently_healthy,
mark_healthy,
mark_runtime_cooldown, mark_runtime_cooldown,
resolve_or_get_pinned_llm_config_id, resolve_or_get_pinned_llm_config_id,
) )
@ -502,54 +500,6 @@ def _is_provider_rate_limited(exc: BaseException) -> bool:
) )
_PREFLIGHT_TIMEOUT_SEC: float = 2.5
_PREFLIGHT_MAX_TOKENS: int = 1
async def _preflight_llm(llm: Any) -> None:
"""Issue a minimal completion to confirm the pinned model isn't 429'ing.
Used before agent build / planner / classifier / title-gen so a known-bad
free OpenRouter deployment is detected and repinned before it cascades
into multiple wasted internal calls. The probe is intentionally cheap:
one token, low timeout, tagged ``surfsense:internal`` so token tracking
and SSE pipelines treat it as overhead rather than user output.
Raises the original exception when the provider responds with a
rate-limit-shaped error so the caller can drive the cooldown/repin
branch via :func:`_is_provider_rate_limited`. Other transient failures
are swallowed — the caller continues to the normal stream path and the
in-stream recovery loop remains the safety net.
"""
from litellm import acompletion
model = getattr(llm, "model", None)
if not model or model == "auto":
# Auto-mode router doesn't have a single deployment to ping; the
# router itself handles per-deployment rate-limit accounting.
return
try:
await acompletion(
model=model,
messages=[{"role": "user", "content": "ping"}],
api_key=getattr(llm, "api_key", None),
api_base=getattr(llm, "api_base", None),
max_tokens=_PREFLIGHT_MAX_TOKENS,
timeout=_PREFLIGHT_TIMEOUT_SEC,
stream=False,
metadata={"tags": ["surfsense:internal", "auto-pin-preflight"]},
)
except Exception as exc:
if _is_provider_rate_limited(exc):
raise
logging.getLogger(__name__).debug(
"auto_pin_preflight non_rate_limit_error model=%s err=%s",
model,
exc,
)
async def _build_main_agent_for_thread( async def _build_main_agent_for_thread(
agent_factory: Any, agent_factory: Any,
*, *,
@ -567,9 +517,9 @@ async def _build_main_agent_for_thread(
disabled_tools: list[str] | None = None, disabled_tools: list[str] | None = None,
mentioned_document_ids: list[int] | None = None, mentioned_document_ids: list[int] | None = None,
) -> Any: ) -> Any:
"""Single (re)build path so the agent factory cannot drift across """Single (re)build path so the agent factory cannot drift across the
initial build, preflight repin, and mid-stream 429 recovery for one initial build and mid-stream 429 recovery for one ``thread_id``: a
``thread_id``: a graph swap mid-turn would corrupt checkpointer state.""" graph swap mid-turn would corrupt checkpointer state."""
return await agent_factory( return await agent_factory(
llm=llm, llm=llm,
search_space_id=search_space_id, search_space_id=search_space_id,
@ -587,29 +537,6 @@ async def _build_main_agent_for_thread(
) )
async def _settle_speculative_agent_build(task: asyncio.Task[Any]) -> None:
"""Wait for a discarded speculative agent build to release shared state.
Used by the parallel preflight + agent-build path. The speculative build
closes over the request-scoped ``AsyncSession`` (for the brief connector
discovery / tool-factory window before its CPU work moves into a worker
thread). If preflight reports a 429 we want to fall back to the original
repin → reload → rebuild path, but we MUST NOT touch ``session`` again
until any in-flight session work owned by the speculative build has
fully settled — :class:`sqlalchemy.ext.asyncio.AsyncSession` is not
concurrency-safe and the same hazard cost us a hard ``InvalidRequestError``
earlier in this PR (see ``connector_service`` parallel-gather revert).
We simply ``await`` the task and swallow any exception: in this path the
build's outcome is irrelevant — success populates the agent cache (a free
side effect), failure is discarded. The wasted CPU is acceptable since
429 fallbacks are rare and the original sequential code also paid the
full build cost on the same path.
"""
with contextlib.suppress(BaseException):
await task
def _classify_stream_exception( def _classify_stream_exception(
exc: Exception, exc: Exception,
*, *,
@ -1237,39 +1164,6 @@ async def stream_new_chat(
yield streaming_service.format_done() yield streaming_service.format_done()
return return
# Auto-mode preflight ping. Runs ONLY for thread-pinned auto cfgs
# (negative ids selected via ``resolve_or_get_pinned_llm_config_id``)
# whose health hasn't already been confirmed within the TTL window.
# Detecting a 429 here lets us repin BEFORE the planner/classifier/
# title-generation LLM calls fan out and each independently hit the
# same upstream rate limit.
#
# PERF: preflight is a network round-trip to the LLM provider (~1-5s)
# and is independent of the agent build (CPU-bound, ~5-7s). They used
# to run sequentially → ``preflight + build`` on cold cache = 11.5s.
# We now kick off preflight as a background task FIRST, then run the
# synchronous setup work and the agent build in parallel. In the
# success path (the common case) total wall time drops to roughly
# ``max(preflight, build)`` — the preflight finishes during the
# agent compile and we just consume its result. In the rare 429
# path the speculative build is awaited to completion (so its
# session usage is fully released) via
# :func:`_settle_speculative_agent_build`, then discarded, and
# we fall back to the original repin-and-rebuild flow.
preflight_needed = (
requested_llm_config_id == 0
and llm_config_id < 0
and not is_recently_healthy(llm_config_id)
)
preflight_task: asyncio.Task[None] | None = None
_t_preflight = 0.0
if preflight_needed:
_t_preflight = time.perf_counter()
preflight_task = asyncio.create_task(
_preflight_llm(llm),
name=f"auto_pin_preflight:{llm_config_id}",
)
# Create connector service # Create connector service
_t0 = time.perf_counter() _t0 = time.perf_counter()
connector_service = ConnectorService(session, search_space_id=search_space_id) connector_service = ConnectorService(session, search_space_id=search_space_id)
@ -1303,136 +1197,26 @@ async def stream_new_chat(
if use_multi_agent if use_multi_agent
else create_surfsense_deep_agent else create_surfsense_deep_agent
) )
# Speculative agent build — runs in parallel with the preflight # Build the agent inline. Provider 429s surface through the
# task (if any). Built with the *current* ``llm`` / ``agent_config``; # in-stream recovery loop below (``_is_provider_rate_limited``),
# if preflight reports 429 we will discard this future and rebuild # which repins the thread to an eligible alternative config and
# against the freshly pinned config below. # rebuilds the agent before the user sees any output.
agent_build_task = asyncio.create_task( agent = await _build_main_agent_for_thread(
_build_main_agent_for_thread( agent_factory,
agent_factory, llm=llm,
llm=llm, search_space_id=search_space_id,
search_space_id=search_space_id, db_session=session,
db_session=session, connector_service=connector_service,
connector_service=connector_service, checkpointer=checkpointer,
checkpointer=checkpointer, user_id=user_id,
user_id=user_id, thread_id=chat_id,
thread_id=chat_id, agent_config=agent_config,
agent_config=agent_config, firecrawl_api_key=firecrawl_api_key,
firecrawl_api_key=firecrawl_api_key, thread_visibility=visibility,
thread_visibility=visibility, filesystem_selection=filesystem_selection,
filesystem_selection=filesystem_selection, disabled_tools=disabled_tools,
disabled_tools=disabled_tools, mentioned_document_ids=mentioned_document_ids,
mentioned_document_ids=mentioned_document_ids,
),
name="agent_build:stream_new_chat",
) )
agent: Any = None
if preflight_task is not None:
try:
await preflight_task
mark_healthy(llm_config_id)
_perf_log.info(
"[stream_new_chat] auto_pin_preflight ok config_id=%s took=%.3fs (parallel)",
llm_config_id,
time.perf_counter() - _t_preflight,
)
except Exception as preflight_exc:
# Both branches below need the session: the non-429 path
# may unwind via cleanup that uses ``session``, and the
# 429 path explicitly calls ``resolve_or_get_pinned_llm_config_id``
# against it. Wait for the speculative build to release its
# session usage before we proceed.
await _settle_speculative_agent_build(agent_build_task)
if not _is_provider_rate_limited(preflight_exc):
raise
# 429: speculative agent is discarded; run the original
# repin → reload → rebuild path against the freshly
# pinned config.
previous_config_id = llm_config_id
mark_runtime_cooldown(
previous_config_id, reason="preflight_rate_limited"
)
try:
llm_config_id = (
await resolve_or_get_pinned_llm_config_id(
session,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
selected_llm_config_id=0,
exclude_config_ids={previous_config_id},
requires_image_input=_requires_image_input,
)
).resolved_llm_config_id
except ValueError as pin_error:
yield _emit_stream_error(
message=str(pin_error),
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
llm, agent_config, llm_load_error = await _load_llm_bundle(
llm_config_id
)
if llm_load_error or not llm:
yield _emit_stream_error(
message=llm_load_error or "Failed to create LLM instance",
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
# Trust the freshly-resolved cfg for the remainder of this
# turn rather than recursing into another preflight; the
# in-stream 429 recovery loop is still in place as the
# safety net if even this fallback hits an upstream cap.
mark_healthy(llm_config_id)
_log_chat_stream_error(
flow=flow,
error_kind="rate_limited",
error_code="RATE_LIMITED",
severity="info",
is_expected=True,
request_id=request_id,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
message=(
"Auto-pinned model failed preflight; switched to another "
"eligible model and continuing."
),
extra={
"auto_runtime_recover": True,
"preflight": True,
"previous_config_id": previous_config_id,
"fallback_config_id": llm_config_id,
},
)
# Rebuild against the new llm/agent_config. Sequential
# here because we no longer have anything to overlap with.
agent = await agent_factory(
llm=llm,
search_space_id=search_space_id,
db_session=session,
connector_service=connector_service,
checkpointer=checkpointer,
user_id=user_id,
thread_id=chat_id,
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility,
disabled_tools=disabled_tools,
mentioned_document_ids=mentioned_document_ids,
filesystem_selection=filesystem_selection,
)
if agent is None:
# Either no preflight was needed, or preflight succeeded —
# in both cases the speculative build is the agent we want.
agent = await agent_build_task
_perf_log.info( _perf_log.info(
"[stream_new_chat] Agent created in %.3fs", time.perf_counter() - _t0 "[stream_new_chat] Agent created in %.3fs", time.perf_counter() - _t0
) )
@ -2678,25 +2462,6 @@ async def stream_resume_chat(
yield streaming_service.format_done() yield streaming_service.format_done()
return return
# Auto-mode preflight ping (resume path). Mirrors ``stream_new_chat``:
# one cheap probe before the agent is rebuilt so a 429'd pin gets
# repinned without burning planner/classifier/title calls first.
# See ``stream_new_chat`` for the full rationale on the speculative
# parallel build pattern below.
preflight_needed = (
requested_llm_config_id == 0
and llm_config_id < 0
and not is_recently_healthy(llm_config_id)
)
preflight_task: asyncio.Task[None] | None = None
_t_preflight = 0.0
if preflight_needed:
_t_preflight = time.perf_counter()
preflight_task = asyncio.create_task(
_preflight_llm(llm),
name=f"auto_pin_preflight_resume:{llm_config_id}",
)
_t0 = time.perf_counter() _t0 = time.perf_counter()
connector_service = ConnectorService(session, search_space_id=search_space_id) connector_service = ConnectorService(session, search_space_id=search_space_id)
@ -2726,115 +2491,25 @@ async def stream_resume_chat(
if _app_config.MULTI_AGENT_CHAT_ENABLED if _app_config.MULTI_AGENT_CHAT_ENABLED
else create_surfsense_deep_agent else create_surfsense_deep_agent
) )
agent_build_task = asyncio.create_task( # Build the agent inline. Provider 429s are handled by the
_build_main_agent_for_thread( # in-stream recovery loop, which repins to an eligible
agent_factory, # alternative config and rebuilds the agent before the user sees
llm=llm, # any output.
search_space_id=search_space_id, agent = await _build_main_agent_for_thread(
db_session=session, agent_factory,
connector_service=connector_service, llm=llm,
checkpointer=checkpointer, search_space_id=search_space_id,
user_id=user_id, db_session=session,
thread_id=chat_id, connector_service=connector_service,
agent_config=agent_config, checkpointer=checkpointer,
firecrawl_api_key=firecrawl_api_key, user_id=user_id,
thread_visibility=visibility, thread_id=chat_id,
filesystem_selection=filesystem_selection, agent_config=agent_config,
disabled_tools=disabled_tools, firecrawl_api_key=firecrawl_api_key,
), thread_visibility=visibility,
name="agent_build:stream_resume", filesystem_selection=filesystem_selection,
disabled_tools=disabled_tools,
) )
agent: Any = None
if preflight_task is not None:
try:
await preflight_task
mark_healthy(llm_config_id)
_perf_log.info(
"[stream_resume] auto_pin_preflight ok config_id=%s took=%.3fs (parallel)",
llm_config_id,
time.perf_counter() - _t_preflight,
)
except Exception as preflight_exc:
# Same session-safety rationale as ``stream_new_chat``.
await _settle_speculative_agent_build(agent_build_task)
if not _is_provider_rate_limited(preflight_exc):
raise
previous_config_id = llm_config_id
mark_runtime_cooldown(
previous_config_id, reason="preflight_rate_limited"
)
try:
llm_config_id = (
await resolve_or_get_pinned_llm_config_id(
session,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
selected_llm_config_id=0,
exclude_config_ids={previous_config_id},
)
).resolved_llm_config_id
except ValueError as pin_error:
yield _emit_stream_error(
message=str(pin_error),
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
llm, agent_config, llm_load_error = await _load_llm_bundle(
llm_config_id
)
if llm_load_error or not llm:
yield _emit_stream_error(
message=llm_load_error or "Failed to create LLM instance",
error_kind="server_error",
error_code="SERVER_ERROR",
)
yield streaming_service.format_done()
return
mark_healthy(llm_config_id)
_log_chat_stream_error(
flow="resume",
error_kind="rate_limited",
error_code="RATE_LIMITED",
severity="info",
is_expected=True,
request_id=request_id,
thread_id=chat_id,
search_space_id=search_space_id,
user_id=user_id,
message=(
"Auto-pinned model failed preflight; switched to another "
"eligible model and continuing."
),
extra={
"auto_runtime_recover": True,
"preflight": True,
"previous_config_id": previous_config_id,
"fallback_config_id": llm_config_id,
},
)
agent = await _build_main_agent_for_thread(
agent_factory,
llm=llm,
search_space_id=search_space_id,
db_session=session,
connector_service=connector_service,
checkpointer=checkpointer,
user_id=user_id,
thread_id=chat_id,
agent_config=agent_config,
firecrawl_api_key=firecrawl_api_key,
thread_visibility=visibility,
filesystem_selection=filesystem_selection,
disabled_tools=disabled_tools,
)
if agent is None:
agent = await agent_build_task
_perf_log.info( _perf_log.info(
"[stream_resume] Agent created in %.3fs", time.perf_counter() - _t0 "[stream_resume] Agent created in %.3fs", time.perf_counter() - _t0
) )

View file

@ -209,128 +209,6 @@ def test_stream_exception_classifies_openrouter_429_payload():
assert extra is None assert extra is None
@pytest.mark.asyncio
async def test_preflight_swallows_non_rate_limit_errors_and_re_raises_429(monkeypatch):
"""``_preflight_llm`` is best-effort.
- On rate-limit shaped exceptions (provider 429) it MUST re-raise so the
caller can drive the cooldown/repin branch.
- On any other transient failure it MUST swallow the error so the normal
stream path continues without surfacing preflight noise to the user.
"""
from types import SimpleNamespace
from app.tasks.chat.stream_new_chat import _preflight_llm
class _RateLimitedError(Exception):
"""Class-name carries 'RateLimit' so _is_provider_rate_limited triggers."""
rate_calls: list[dict] = []
other_calls: list[dict] = []
async def _fake_acompletion_429(**kwargs):
rate_calls.append(kwargs)
raise _RateLimitedError("simulated 429")
async def _fake_acompletion_other(**kwargs):
other_calls.append(kwargs)
raise RuntimeError("some unrelated transient failure")
fake_llm = SimpleNamespace(
model="openrouter/google/gemma-4-31b-it:free",
api_key="test",
api_base=None,
)
import litellm # type: ignore[import-not-found]
monkeypatch.setattr(litellm, "acompletion", _fake_acompletion_429)
with pytest.raises(_RateLimitedError):
await _preflight_llm(fake_llm)
assert len(rate_calls) == 1
assert rate_calls[0]["max_tokens"] == 1
assert rate_calls[0]["stream"] is False
monkeypatch.setattr(litellm, "acompletion", _fake_acompletion_other)
# MUST NOT raise: non-rate-limit failures are swallowed.
await _preflight_llm(fake_llm)
assert len(other_calls) == 1
@pytest.mark.asyncio
async def test_preflight_skipped_for_auto_router_model():
"""Router-mode ``model='auto'`` has no single deployment to ping; the
LiteLLM router itself owns per-deployment rate-limit accounting, so the
preflight helper must short-circuit instead of issuing a probe."""
from types import SimpleNamespace
from app.tasks.chat.stream_new_chat import _preflight_llm
fake_llm = SimpleNamespace(model="auto", api_key="x", api_base=None)
# Should return without raising or making any LiteLLM call.
await _preflight_llm(fake_llm)
@pytest.mark.asyncio
async def test_settle_speculative_agent_build_swallows_exceptions():
"""``_settle_speculative_agent_build`` MUST always return cleanly so the
caller can safely re-touch the request-scoped session afterwards.
The helper guards the parallel preflight + agent-build path: when the
speculative build is being discarded (429 or non-429 preflight failure)
we await it solely to release any in-flight ``AsyncSession`` usage —
the build's outcome is irrelevant. Any exception (including
``CancelledError``) leaking out would skip the caller's recovery flow
and re-introduce the very session-concurrency hazard the helper exists
to prevent.
"""
import asyncio
from app.tasks.chat.stream_new_chat import _settle_speculative_agent_build
async def _raises() -> None:
raise RuntimeError("speculative build crashed")
async def _succeeds() -> str:
return "agent"
async def _slow() -> None:
await asyncio.sleep(0.05)
for coro in (_raises(), _succeeds(), _slow()):
task = asyncio.create_task(coro)
await _settle_speculative_agent_build(task)
assert task.done()
@pytest.mark.asyncio
async def test_settle_speculative_agent_build_handles_already_done_task():
"""Done tasks (success or failure) must still be settled without raising."""
import asyncio
from app.tasks.chat.stream_new_chat import _settle_speculative_agent_build
async def _ok() -> str:
return "ok"
async def _bad() -> None:
raise ValueError("nope")
ok_task = asyncio.create_task(_ok())
bad_task = asyncio.create_task(_bad())
# Drive both to completion before settling.
await asyncio.sleep(0)
await asyncio.sleep(0)
await _settle_speculative_agent_build(ok_task)
await _settle_speculative_agent_build(bad_task)
assert ok_task.result() == "ok"
# ``bad_task`` exception was consumed by the settle helper; calling
# ``.exception()`` after the fact must still return the original error
# (the helper observes it but doesn't clear it).
assert isinstance(bad_task.exception(), ValueError)
def test_stream_exception_classifies_thread_busy(): def test_stream_exception_classifies_thread_busy():
exc = BusyError(request_id="thread-123") exc = BusyError(request_id="thread-123")
kind, code, severity, is_expected, user_message, extra = _classify_stream_exception( kind, code, severity, is_expected, user_message, extra = _classify_stream_exception(