diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 3d639affb..da84e7350 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -64,8 +64,6 @@ from app.db import ( ) from app.prompts import TITLE_GENERATION_PROMPT from app.services.auto_model_pin_service import ( - is_recently_healthy, - mark_healthy, mark_runtime_cooldown, resolve_or_get_pinned_llm_config_id, ) @@ -502,54 +500,6 @@ def _is_provider_rate_limited(exc: BaseException) -> bool: ) -_PREFLIGHT_TIMEOUT_SEC: float = 2.5 -_PREFLIGHT_MAX_TOKENS: int = 1 - - -async def _preflight_llm(llm: Any) -> None: - """Issue a minimal completion to confirm the pinned model isn't 429'ing. - - Used before agent build / planner / classifier / title-gen so a known-bad - free OpenRouter deployment is detected and repinned before it cascades - into multiple wasted internal calls. The probe is intentionally cheap: - one token, low timeout, tagged ``surfsense:internal`` so token tracking - and SSE pipelines treat it as overhead rather than user output. - - Raises the original exception when the provider responds with a - rate-limit-shaped error so the caller can drive the cooldown/repin - branch via :func:`_is_provider_rate_limited`. Other transient failures - are swallowed — the caller continues to the normal stream path and the - in-stream recovery loop remains the safety net. - """ - from litellm import acompletion - - model = getattr(llm, "model", None) - if not model or model == "auto": - # Auto-mode router doesn't have a single deployment to ping; the - # router itself handles per-deployment rate-limit accounting. - return - - try: - await acompletion( - model=model, - messages=[{"role": "user", "content": "ping"}], - api_key=getattr(llm, "api_key", None), - api_base=getattr(llm, "api_base", None), - max_tokens=_PREFLIGHT_MAX_TOKENS, - timeout=_PREFLIGHT_TIMEOUT_SEC, - stream=False, - metadata={"tags": ["surfsense:internal", "auto-pin-preflight"]}, - ) - except Exception as exc: - if _is_provider_rate_limited(exc): - raise - logging.getLogger(__name__).debug( - "auto_pin_preflight non_rate_limit_error model=%s err=%s", - model, - exc, - ) - - async def _build_main_agent_for_thread( agent_factory: Any, *, @@ -567,9 +517,9 @@ async def _build_main_agent_for_thread( disabled_tools: list[str] | None = None, mentioned_document_ids: list[int] | None = None, ) -> Any: - """Single (re)build path so the agent factory cannot drift across - initial build, preflight repin, and mid-stream 429 recovery for one - ``thread_id``: a graph swap mid-turn would corrupt checkpointer state.""" + """Single (re)build path so the agent factory cannot drift across the + initial build and mid-stream 429 recovery for one ``thread_id``: a + graph swap mid-turn would corrupt checkpointer state.""" return await agent_factory( llm=llm, search_space_id=search_space_id, @@ -587,29 +537,6 @@ async def _build_main_agent_for_thread( ) -async def _settle_speculative_agent_build(task: asyncio.Task[Any]) -> None: - """Wait for a discarded speculative agent build to release shared state. - - Used by the parallel preflight + agent-build path. The speculative build - closes over the request-scoped ``AsyncSession`` (for the brief connector - discovery / tool-factory window before its CPU work moves into a worker - thread). If preflight reports a 429 we want to fall back to the original - repin → reload → rebuild path, but we MUST NOT touch ``session`` again - until any in-flight session work owned by the speculative build has - fully settled — :class:`sqlalchemy.ext.asyncio.AsyncSession` is not - concurrency-safe and the same hazard cost us a hard ``InvalidRequestError`` - earlier in this PR (see ``connector_service`` parallel-gather revert). - - We simply ``await`` the task and swallow any exception: in this path the - build's outcome is irrelevant — success populates the agent cache (a free - side effect), failure is discarded. The wasted CPU is acceptable since - 429 fallbacks are rare and the original sequential code also paid the - full build cost on the same path. - """ - with contextlib.suppress(BaseException): - await task - - def _classify_stream_exception( exc: Exception, *, @@ -1237,39 +1164,6 @@ async def stream_new_chat( yield streaming_service.format_done() return - # Auto-mode preflight ping. Runs ONLY for thread-pinned auto cfgs - # (negative ids selected via ``resolve_or_get_pinned_llm_config_id``) - # whose health hasn't already been confirmed within the TTL window. - # Detecting a 429 here lets us repin BEFORE the planner/classifier/ - # title-generation LLM calls fan out and each independently hit the - # same upstream rate limit. - # - # PERF: preflight is a network round-trip to the LLM provider (~1-5s) - # and is independent of the agent build (CPU-bound, ~5-7s). They used - # to run sequentially → ``preflight + build`` on cold cache = 11.5s. - # We now kick off preflight as a background task FIRST, then run the - # synchronous setup work and the agent build in parallel. In the - # success path (the common case) total wall time drops to roughly - # ``max(preflight, build)`` — the preflight finishes during the - # agent compile and we just consume its result. In the rare 429 - # path the speculative build is awaited to completion (so its - # session usage is fully released) via - # :func:`_settle_speculative_agent_build`, then discarded, and - # we fall back to the original repin-and-rebuild flow. - preflight_needed = ( - requested_llm_config_id == 0 - and llm_config_id < 0 - and not is_recently_healthy(llm_config_id) - ) - preflight_task: asyncio.Task[None] | None = None - _t_preflight = 0.0 - if preflight_needed: - _t_preflight = time.perf_counter() - preflight_task = asyncio.create_task( - _preflight_llm(llm), - name=f"auto_pin_preflight:{llm_config_id}", - ) - # Create connector service _t0 = time.perf_counter() connector_service = ConnectorService(session, search_space_id=search_space_id) @@ -1303,136 +1197,26 @@ async def stream_new_chat( if use_multi_agent else create_surfsense_deep_agent ) - # Speculative agent build — runs in parallel with the preflight - # task (if any). Built with the *current* ``llm`` / ``agent_config``; - # if preflight reports 429 we will discard this future and rebuild - # against the freshly pinned config below. - agent_build_task = asyncio.create_task( - _build_main_agent_for_thread( - agent_factory, - llm=llm, - search_space_id=search_space_id, - db_session=session, - connector_service=connector_service, - checkpointer=checkpointer, - user_id=user_id, - thread_id=chat_id, - agent_config=agent_config, - firecrawl_api_key=firecrawl_api_key, - thread_visibility=visibility, - filesystem_selection=filesystem_selection, - disabled_tools=disabled_tools, - mentioned_document_ids=mentioned_document_ids, - ), - name="agent_build:stream_new_chat", + # Build the agent inline. Provider 429s surface through the + # in-stream recovery loop below (``_is_provider_rate_limited``), + # which repins the thread to an eligible alternative config and + # rebuilds the agent before the user sees any output. + agent = await _build_main_agent_for_thread( + agent_factory, + llm=llm, + search_space_id=search_space_id, + db_session=session, + connector_service=connector_service, + checkpointer=checkpointer, + user_id=user_id, + thread_id=chat_id, + agent_config=agent_config, + firecrawl_api_key=firecrawl_api_key, + thread_visibility=visibility, + filesystem_selection=filesystem_selection, + disabled_tools=disabled_tools, + mentioned_document_ids=mentioned_document_ids, ) - - agent: Any = None - if preflight_task is not None: - try: - await preflight_task - mark_healthy(llm_config_id) - _perf_log.info( - "[stream_new_chat] auto_pin_preflight ok config_id=%s took=%.3fs (parallel)", - llm_config_id, - time.perf_counter() - _t_preflight, - ) - except Exception as preflight_exc: - # Both branches below need the session: the non-429 path - # may unwind via cleanup that uses ``session``, and the - # 429 path explicitly calls ``resolve_or_get_pinned_llm_config_id`` - # against it. Wait for the speculative build to release its - # session usage before we proceed. - await _settle_speculative_agent_build(agent_build_task) - if not _is_provider_rate_limited(preflight_exc): - raise - # 429: speculative agent is discarded; run the original - # repin → reload → rebuild path against the freshly - # pinned config. - previous_config_id = llm_config_id - mark_runtime_cooldown( - previous_config_id, reason="preflight_rate_limited" - ) - try: - llm_config_id = ( - await resolve_or_get_pinned_llm_config_id( - session, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - selected_llm_config_id=0, - exclude_config_ids={previous_config_id}, - requires_image_input=_requires_image_input, - ) - ).resolved_llm_config_id - except ValueError as pin_error: - yield _emit_stream_error( - message=str(pin_error), - error_kind="server_error", - error_code="SERVER_ERROR", - ) - yield streaming_service.format_done() - return - - llm, agent_config, llm_load_error = await _load_llm_bundle( - llm_config_id - ) - if llm_load_error or not llm: - yield _emit_stream_error( - message=llm_load_error or "Failed to create LLM instance", - error_kind="server_error", - error_code="SERVER_ERROR", - ) - yield streaming_service.format_done() - return - # Trust the freshly-resolved cfg for the remainder of this - # turn rather than recursing into another preflight; the - # in-stream 429 recovery loop is still in place as the - # safety net if even this fallback hits an upstream cap. - mark_healthy(llm_config_id) - _log_chat_stream_error( - flow=flow, - error_kind="rate_limited", - error_code="RATE_LIMITED", - severity="info", - is_expected=True, - request_id=request_id, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - message=( - "Auto-pinned model failed preflight; switched to another " - "eligible model and continuing." - ), - extra={ - "auto_runtime_recover": True, - "preflight": True, - "previous_config_id": previous_config_id, - "fallback_config_id": llm_config_id, - }, - ) - # Rebuild against the new llm/agent_config. Sequential - # here because we no longer have anything to overlap with. - agent = await agent_factory( - llm=llm, - search_space_id=search_space_id, - db_session=session, - connector_service=connector_service, - checkpointer=checkpointer, - user_id=user_id, - thread_id=chat_id, - agent_config=agent_config, - firecrawl_api_key=firecrawl_api_key, - thread_visibility=visibility, - disabled_tools=disabled_tools, - mentioned_document_ids=mentioned_document_ids, - filesystem_selection=filesystem_selection, - ) - - if agent is None: - # Either no preflight was needed, or preflight succeeded — - # in both cases the speculative build is the agent we want. - agent = await agent_build_task _perf_log.info( "[stream_new_chat] Agent created in %.3fs", time.perf_counter() - _t0 ) @@ -2678,25 +2462,6 @@ async def stream_resume_chat( yield streaming_service.format_done() return - # Auto-mode preflight ping (resume path). Mirrors ``stream_new_chat``: - # one cheap probe before the agent is rebuilt so a 429'd pin gets - # repinned without burning planner/classifier/title calls first. - # See ``stream_new_chat`` for the full rationale on the speculative - # parallel build pattern below. - preflight_needed = ( - requested_llm_config_id == 0 - and llm_config_id < 0 - and not is_recently_healthy(llm_config_id) - ) - preflight_task: asyncio.Task[None] | None = None - _t_preflight = 0.0 - if preflight_needed: - _t_preflight = time.perf_counter() - preflight_task = asyncio.create_task( - _preflight_llm(llm), - name=f"auto_pin_preflight_resume:{llm_config_id}", - ) - _t0 = time.perf_counter() connector_service = ConnectorService(session, search_space_id=search_space_id) @@ -2726,115 +2491,25 @@ async def stream_resume_chat( if _app_config.MULTI_AGENT_CHAT_ENABLED else create_surfsense_deep_agent ) - agent_build_task = asyncio.create_task( - _build_main_agent_for_thread( - agent_factory, - llm=llm, - search_space_id=search_space_id, - db_session=session, - connector_service=connector_service, - checkpointer=checkpointer, - user_id=user_id, - thread_id=chat_id, - agent_config=agent_config, - firecrawl_api_key=firecrawl_api_key, - thread_visibility=visibility, - filesystem_selection=filesystem_selection, - disabled_tools=disabled_tools, - ), - name="agent_build:stream_resume", + # Build the agent inline. Provider 429s are handled by the + # in-stream recovery loop, which repins to an eligible + # alternative config and rebuilds the agent before the user sees + # any output. + agent = await _build_main_agent_for_thread( + agent_factory, + llm=llm, + search_space_id=search_space_id, + db_session=session, + connector_service=connector_service, + checkpointer=checkpointer, + user_id=user_id, + thread_id=chat_id, + agent_config=agent_config, + firecrawl_api_key=firecrawl_api_key, + thread_visibility=visibility, + filesystem_selection=filesystem_selection, + disabled_tools=disabled_tools, ) - - agent: Any = None - if preflight_task is not None: - try: - await preflight_task - mark_healthy(llm_config_id) - _perf_log.info( - "[stream_resume] auto_pin_preflight ok config_id=%s took=%.3fs (parallel)", - llm_config_id, - time.perf_counter() - _t_preflight, - ) - except Exception as preflight_exc: - # Same session-safety rationale as ``stream_new_chat``. - await _settle_speculative_agent_build(agent_build_task) - if not _is_provider_rate_limited(preflight_exc): - raise - previous_config_id = llm_config_id - mark_runtime_cooldown( - previous_config_id, reason="preflight_rate_limited" - ) - try: - llm_config_id = ( - await resolve_or_get_pinned_llm_config_id( - session, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - selected_llm_config_id=0, - exclude_config_ids={previous_config_id}, - ) - ).resolved_llm_config_id - except ValueError as pin_error: - yield _emit_stream_error( - message=str(pin_error), - error_kind="server_error", - error_code="SERVER_ERROR", - ) - yield streaming_service.format_done() - return - - llm, agent_config, llm_load_error = await _load_llm_bundle( - llm_config_id - ) - if llm_load_error or not llm: - yield _emit_stream_error( - message=llm_load_error or "Failed to create LLM instance", - error_kind="server_error", - error_code="SERVER_ERROR", - ) - yield streaming_service.format_done() - return - mark_healthy(llm_config_id) - _log_chat_stream_error( - flow="resume", - error_kind="rate_limited", - error_code="RATE_LIMITED", - severity="info", - is_expected=True, - request_id=request_id, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - message=( - "Auto-pinned model failed preflight; switched to another " - "eligible model and continuing." - ), - extra={ - "auto_runtime_recover": True, - "preflight": True, - "previous_config_id": previous_config_id, - "fallback_config_id": llm_config_id, - }, - ) - agent = await _build_main_agent_for_thread( - agent_factory, - llm=llm, - search_space_id=search_space_id, - db_session=session, - connector_service=connector_service, - checkpointer=checkpointer, - user_id=user_id, - thread_id=chat_id, - agent_config=agent_config, - firecrawl_api_key=firecrawl_api_key, - thread_visibility=visibility, - filesystem_selection=filesystem_selection, - disabled_tools=disabled_tools, - ) - - if agent is None: - agent = await agent_build_task _perf_log.info( "[stream_resume] Agent created in %.3fs", time.perf_counter() - _t0 ) diff --git a/surfsense_backend/tests/unit/test_stream_new_chat_contract.py b/surfsense_backend/tests/unit/test_stream_new_chat_contract.py index 208204ca9..19b06201f 100644 --- a/surfsense_backend/tests/unit/test_stream_new_chat_contract.py +++ b/surfsense_backend/tests/unit/test_stream_new_chat_contract.py @@ -209,128 +209,6 @@ def test_stream_exception_classifies_openrouter_429_payload(): assert extra is None -@pytest.mark.asyncio -async def test_preflight_swallows_non_rate_limit_errors_and_re_raises_429(monkeypatch): - """``_preflight_llm`` is best-effort. - - - On rate-limit shaped exceptions (provider 429) it MUST re-raise so the - caller can drive the cooldown/repin branch. - - On any other transient failure it MUST swallow the error so the normal - stream path continues without surfacing preflight noise to the user. - """ - from types import SimpleNamespace - - from app.tasks.chat.stream_new_chat import _preflight_llm - - class _RateLimitedError(Exception): - """Class-name carries 'RateLimit' so _is_provider_rate_limited triggers.""" - - rate_calls: list[dict] = [] - other_calls: list[dict] = [] - - async def _fake_acompletion_429(**kwargs): - rate_calls.append(kwargs) - raise _RateLimitedError("simulated 429") - - async def _fake_acompletion_other(**kwargs): - other_calls.append(kwargs) - raise RuntimeError("some unrelated transient failure") - - fake_llm = SimpleNamespace( - model="openrouter/google/gemma-4-31b-it:free", - api_key="test", - api_base=None, - ) - - import litellm # type: ignore[import-not-found] - - monkeypatch.setattr(litellm, "acompletion", _fake_acompletion_429) - with pytest.raises(_RateLimitedError): - await _preflight_llm(fake_llm) - assert len(rate_calls) == 1 - assert rate_calls[0]["max_tokens"] == 1 - assert rate_calls[0]["stream"] is False - - monkeypatch.setattr(litellm, "acompletion", _fake_acompletion_other) - # MUST NOT raise: non-rate-limit failures are swallowed. - await _preflight_llm(fake_llm) - assert len(other_calls) == 1 - - -@pytest.mark.asyncio -async def test_preflight_skipped_for_auto_router_model(): - """Router-mode ``model='auto'`` has no single deployment to ping; the - LiteLLM router itself owns per-deployment rate-limit accounting, so the - preflight helper must short-circuit instead of issuing a probe.""" - from types import SimpleNamespace - - from app.tasks.chat.stream_new_chat import _preflight_llm - - fake_llm = SimpleNamespace(model="auto", api_key="x", api_base=None) - # Should return without raising or making any LiteLLM call. - await _preflight_llm(fake_llm) - - -@pytest.mark.asyncio -async def test_settle_speculative_agent_build_swallows_exceptions(): - """``_settle_speculative_agent_build`` MUST always return cleanly so the - caller can safely re-touch the request-scoped session afterwards. - - The helper guards the parallel preflight + agent-build path: when the - speculative build is being discarded (429 or non-429 preflight failure) - we await it solely to release any in-flight ``AsyncSession`` usage — - the build's outcome is irrelevant. Any exception (including - ``CancelledError``) leaking out would skip the caller's recovery flow - and re-introduce the very session-concurrency hazard the helper exists - to prevent. - """ - import asyncio - - from app.tasks.chat.stream_new_chat import _settle_speculative_agent_build - - async def _raises() -> None: - raise RuntimeError("speculative build crashed") - - async def _succeeds() -> str: - return "agent" - - async def _slow() -> None: - await asyncio.sleep(0.05) - - for coro in (_raises(), _succeeds(), _slow()): - task = asyncio.create_task(coro) - await _settle_speculative_agent_build(task) - assert task.done() - - -@pytest.mark.asyncio -async def test_settle_speculative_agent_build_handles_already_done_task(): - """Done tasks (success or failure) must still be settled without raising.""" - import asyncio - - from app.tasks.chat.stream_new_chat import _settle_speculative_agent_build - - async def _ok() -> str: - return "ok" - - async def _bad() -> None: - raise ValueError("nope") - - ok_task = asyncio.create_task(_ok()) - bad_task = asyncio.create_task(_bad()) - # Drive both to completion before settling. - await asyncio.sleep(0) - await asyncio.sleep(0) - - await _settle_speculative_agent_build(ok_task) - await _settle_speculative_agent_build(bad_task) - assert ok_task.result() == "ok" - # ``bad_task`` exception was consumed by the settle helper; calling - # ``.exception()`` after the fact must still return the original error - # (the helper observes it but doesn't clear it). - assert isinstance(bad_task.exception(), ValueError) - - def test_stream_exception_classifies_thread_busy(): exc = BusyError(request_id="thread-123") kind, code, severity, is_expected, user_message, extra = _classify_stream_exception(