diff --git a/surfsense_backend/app/config/__init__.py b/surfsense_backend/app/config/__init__.py index 543524456..f6f0c7f62 100644 --- a/surfsense_backend/app/config/__init__.py +++ b/surfsense_backend/app/config/__init__.py @@ -490,12 +490,6 @@ class Config: ENABLE_DESKTOP_LOCAL_FILESYSTEM = ( os.getenv("ENABLE_DESKTOP_LOCAL_FILESYSTEM", "FALSE").upper() == "TRUE" ) - # Streaming entrypoint switch. Keep this at the route layer so orchestrator - # code stays free of legacy fallback branching. - ENABLE_CHAT_STREAM_ORCHESTRATOR = ( - os.getenv("SURFSENSE_ENABLE_CHAT_STREAM_ORCHESTRATOR", "TRUE").upper() - == "TRUE" - ) @classmethod def is_self_hosted(cls) -> bool: diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index e54497f93..743b5b849 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -72,13 +72,8 @@ from app.schemas.new_chat import ( TurnStatusResponse, ) from app.tasks.chat.stream_new_chat import ( - stream_new_chat as legacy_stream_new_chat, - stream_resume_chat as legacy_stream_resume_chat, -) -from app.tasks.chat.streaming.orchestration.orchestrator import ( - stream_chat, - stream_regenerate, - stream_resume, + stream_new_chat, + stream_resume_chat, ) from app.users import current_active_user from app.utils.perf import get_perf_logger @@ -98,10 +93,6 @@ TURN_CANCELLING_MAX_DELAY_MS = 1500 router = APIRouter() -def _use_streaming_orchestrator() -> bool: - return config.ENABLE_CHAT_STREAM_ORCHESTRATOR - - def _resolve_filesystem_selection( *, mode: str, @@ -1782,11 +1773,7 @@ async def handle_new_chat( ) return StreamingResponse( - ( - stream_chat - if _use_streaming_orchestrator() - else legacy_stream_new_chat - )( + stream_new_chat( user_query=request.user_query, search_space_id=request.search_space_id, chat_id=request.chat_id, @@ -2271,12 +2258,7 @@ async def regenerate_response( else None ) try: - regenerate_fn = ( - stream_regenerate - if _use_streaming_orchestrator() - else legacy_stream_new_chat - ) - async for chunk in regenerate_fn( + async for chunk in stream_new_chat( user_query=str(user_query_to_use), search_space_id=request.search_space_id, chat_id=thread_id, @@ -2408,11 +2390,7 @@ async def resume_chat( await session.close() return StreamingResponse( - ( - stream_resume - if _use_streaming_orchestrator() - else legacy_stream_resume_chat - )( + stream_resume_chat( chat_id=thread_id, search_space_id=request.search_space_id, decisions=decisions, diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index 95d183433..fe8dab076 100644 --- a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -380,7 +380,7 @@ class ResumeRequest(BaseModel): "/regenerate. Resume reuses the original interrupted user " "turn so the server does not write a new user message. " "Currently unused but accepted to keep request bodies " - "uniform across the three streaming entrypoints." + "uniform across new-message, regenerate, and resume stream routes." ), ) diff --git a/surfsense_backend/app/services/streaming/__init__.py b/surfsense_backend/app/services/streaming/__init__.py index 287d48a7a..3ec9b9cf1 100644 --- a/surfsense_backend/app/services/streaming/__init__.py +++ b/surfsense_backend/app/services/streaming/__init__.py @@ -4,7 +4,7 @@ Layout: * ``envelope/`` - SSE wire framing + ID generators * ``emitter/`` - identity of the agent that emitted an event + runtime registry * ``events/`` - one module per SSE event family -* ``service.py`` - composition root used by the orchestrator +* ``service.py`` - composition root used when emitting chat SSE * ``interrupt_correlation.py`` - id-aware lookup over LangGraph state Naming on the wire: @@ -13,8 +13,8 @@ Naming on the wire: * Every SurfSense-added field uses ``snake_case``, including the top-level ``emitted_by`` envelope and all inner ``data`` payloads. -Production keeps using ``app.services.new_streaming_service`` and -``app.tasks.chat.stream_new_chat`` until the cutover phase. +Production chat uses ``app.services.new_streaming_service`` from +``app.tasks.chat.stream_new_chat`` and related routes. """ from __future__ import annotations diff --git a/surfsense_backend/app/services/streaming/events/error.py b/surfsense_backend/app/services/streaming/events/error.py index cd190d1f4..a1e8e01ca 100644 --- a/surfsense_backend/app/services/streaming/events/error.py +++ b/surfsense_backend/app/services/streaming/events/error.py @@ -1,4 +1,4 @@ -"""Single terminal error path the orchestrator must route through.""" +"""Single terminal error path chat streaming must route through.""" from __future__ import annotations diff --git a/surfsense_backend/app/tasks/chat/streaming/__init__.py b/surfsense_backend/app/tasks/chat/streaming/__init__.py index bb06cc021..70c99342a 100644 --- a/surfsense_backend/app/tasks/chat/streaming/__init__.py +++ b/surfsense_backend/app/tasks/chat/streaming/__init__.py @@ -1,3 +1,3 @@ -"""Chat streaming orchestrator and event relay.""" +"""Chat streaming helpers (e.g. LangGraph → SSE relay under ``graph_stream``).""" from __future__ import annotations diff --git a/surfsense_backend/app/tasks/chat/streaming/agent_setup.py b/surfsense_backend/app/tasks/chat/streaming/agent_setup.py deleted file mode 100644 index f67c6ad65..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/agent_setup.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Agent setup helpers for orchestrated chat streaming.""" - -from __future__ import annotations - -import contextlib -import logging -from collections.abc import Callable -from typing import Any - -_PREFLIGHT_TIMEOUT_SEC: float = 2.5 -_PREFLIGHT_MAX_TOKENS: int = 1 - - -async def preflight_llm( - llm: Any, - *, - is_provider_rate_limited: Callable[[BaseException], bool], -) -> None: - """Issue a minimal completion probe to catch immediate provider 429s.""" - from litellm import acompletion - - model = getattr(llm, "model", None) - if not model or model == "auto": - return - - try: - await acompletion( - model=model, - messages=[{"role": "user", "content": "ping"}], - api_key=getattr(llm, "api_key", None), - api_base=getattr(llm, "api_base", None), - max_tokens=_PREFLIGHT_MAX_TOKENS, - timeout=_PREFLIGHT_TIMEOUT_SEC, - stream=False, - metadata={"tags": ["surfsense:internal", "auto-pin-preflight"]}, - ) - except Exception as exc: - if is_provider_rate_limited(exc): - raise - logging.getLogger(__name__).debug( - "auto_pin_preflight non_rate_limit_error model=%s err=%s", - model, - exc, - ) - - -async def build_main_agent_for_thread( - agent_factory: Any, - *, - llm: Any, - search_space_id: int, - db_session: Any, - connector_service: Any, - checkpointer: Any, - user_id: str | None, - thread_id: int | None, - agent_config: Any, - firecrawl_api_key: str | None, - thread_visibility: Any, - filesystem_selection: Any, - disabled_tools: list[str] | None = None, - mentioned_document_ids: list[int] | None = None, -) -> Any: - """Run one canonical agent-build call for a single thread.""" - return await agent_factory( - llm=llm, - search_space_id=search_space_id, - db_session=db_session, - connector_service=connector_service, - checkpointer=checkpointer, - user_id=user_id, - thread_id=thread_id, - agent_config=agent_config, - firecrawl_api_key=firecrawl_api_key, - thread_visibility=thread_visibility, - filesystem_selection=filesystem_selection, - disabled_tools=disabled_tools, - mentioned_document_ids=mentioned_document_ids, - ) - - -async def settle_speculative_agent_build(task: Any) -> None: - """Wait for a discarded speculative build and swallow its outcome.""" - with contextlib.suppress(BaseException): - await task - - -__all__ = [ - "build_main_agent_for_thread", - "preflight_llm", - "settle_speculative_agent_build", -] diff --git a/surfsense_backend/app/tasks/chat/streaming/graph_stream/__init__.py b/surfsense_backend/app/tasks/chat/streaming/graph_stream/__init__.py new file mode 100644 index 000000000..e3bf0426c --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/graph_stream/__init__.py @@ -0,0 +1,21 @@ +"""LangGraph ``astream_events`` → SSE (``stream_output`` + ``StreamingResult``). + +Imports are lazy to avoid a circular import with ``relay.event_relay``. +""" + +from __future__ import annotations + +__all__ = ["StreamingResult", "stream_output"] + + +def __getattr__(name: str): + if name == "stream_output": + from app.tasks.chat.streaming.graph_stream.event_stream import stream_output + + return stream_output + if name == "StreamingResult": + from app.tasks.chat.streaming.graph_stream.result import StreamingResult + + return StreamingResult + msg = f"module {__name__!r} has no attribute {name!r}" + raise AttributeError(msg) diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py b/surfsense_backend/app/tasks/chat/streaming/graph_stream/event_stream.py similarity index 92% rename from surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py rename to surfsense_backend/app/tasks/chat/streaming/graph_stream/event_stream.py index fc8c13027..9142dd914 100644 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/event_stream.py +++ b/surfsense_backend/app/tasks/chat/streaming/graph_stream/event_stream.py @@ -1,4 +1,4 @@ -"""Run LangGraph event streams through the EventRelay.""" +"""Run LangGraph event streams through ``EventRelay``.""" from __future__ import annotations @@ -6,7 +6,7 @@ from collections.abc import AsyncIterator from typing import Any from app.agents.new_chat.feature_flags import get_flags -from app.tasks.chat.streaming.orchestration.output import StreamingResult +from app.tasks.chat.streaming.graph_stream.result import StreamingResult from app.tasks.chat.streaming.relay.event_relay import EventRelay from app.tasks.chat.streaming.relay.state import AgentEventRelayState diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/output.py b/surfsense_backend/app/tasks/chat/streaming/graph_stream/result.py similarity index 91% rename from surfsense_backend/app/tasks/chat/streaming/orchestration/output.py rename to surfsense_backend/app/tasks/chat/streaming/graph_stream/result.py index 60f8ee6ee..40404e9d0 100644 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/output.py +++ b/surfsense_backend/app/tasks/chat/streaming/graph_stream/result.py @@ -1,4 +1,4 @@ -"""Output facts collected while streaming one orchestrated agent turn.""" +"""Mutable facts collected while relaying one agent stream (``stream_output``).""" from __future__ import annotations @@ -26,4 +26,3 @@ class StreamingResult: commit_gate_reason: str = "" assistant_message_id: int | None = None content_builder: Any | None = field(default=None, repr=False) - diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py deleted file mode 100644 index b1a201fd3..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Composable orchestration pieces for chat streaming.""" - -from app.tasks.chat.streaming.orchestration.event_stream import stream_output -from app.tasks.chat.streaming.orchestration.input import StreamingContext -from app.tasks.chat.streaming.orchestration.output import StreamingResult - -__all__ = [ - "StreamingContext", - "StreamingResult", - "stream_output", -] diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/input.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/input.py deleted file mode 100644 index 45a33d435..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/input.py +++ /dev/null @@ -1,23 +0,0 @@ -"""Inputs for orchestrator-owned streaming execution.""" - -from __future__ import annotations - -from dataclasses import dataclass -from typing import Any - - -@dataclass(frozen=True) -class StreamingContext: - """Container for dependencies required by ``stream_output``.""" - - agent: Any - config: dict[str, Any] - input_data: Any - streaming_service: Any - step_prefix: str = "thinking" - initial_step_id: str | None = None - initial_step_title: str = "" - initial_step_items: list[str] | None = None - content_builder: Any | None = None - runtime_context: Any = None - diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py deleted file mode 100644 index 80cae77a2..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py +++ /dev/null @@ -1,261 +0,0 @@ -"""Top-level chat streaming entrypoints. -""" - -from __future__ import annotations - -from collections.abc import AsyncGenerator -from typing import Any, Literal - -from app.agents.new_chat.filesystem_selection import FilesystemSelection -from app.db import ChatVisibility -from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat -from app.tasks.chat.streaming.orchestration.streaming_context import ( - build_chat_streaming_context, - build_regenerate_streaming_context, - build_resume_streaming_context, -) -from app.tasks.chat.streaming.orchestration.event_stream import stream_output -from app.tasks.chat.streaming.orchestration.input import StreamingContext -from app.tasks.chat.streaming.orchestration.output import StreamingResult - - -def _build_streaming_result( - *, - chat_id: int, - request_id: str | None, - filesystem_selection: FilesystemSelection | None, - suffix: str, -) -> StreamingResult: - return StreamingResult( - request_id=request_id, - turn_id=f"{chat_id}:{suffix}", - filesystem_mode=(filesystem_selection.mode.value if filesystem_selection else "cloud"), - client_platform=( - filesystem_selection.client_platform.value if filesystem_selection else "web" - ), - ) - - -async def _stream_output_with_streaming_context( - *, - streaming_context: StreamingContext, - result: StreamingResult, -) -> AsyncGenerator[str, None]: - async for frame in stream_output( - agent=streaming_context.agent, - config=streaming_context.config, - input_data=streaming_context.input_data, - streaming_service=streaming_context.streaming_service, - result=result, - step_prefix=streaming_context.step_prefix, - initial_step_id=streaming_context.initial_step_id, - initial_step_title=streaming_context.initial_step_title, - initial_step_items=streaming_context.initial_step_items, - content_builder=streaming_context.content_builder, - runtime_context=streaming_context.runtime_context, - ): - yield frame - - -async def stream_chat( - *, - user_query: str, - search_space_id: int, - chat_id: int, - user_id: str | None = None, - llm_config_id: int = -1, - mentioned_document_ids: list[int] | None = None, - mentioned_surfsense_doc_ids: list[int] | None = None, - mentioned_documents: list[dict[str, Any]] | None = None, - checkpoint_id: str | None = None, - needs_history_bootstrap: bool = False, - thread_visibility: ChatVisibility | None = None, - current_user_display_name: str | None = None, - disabled_tools: list[str] | None = None, - filesystem_selection: FilesystemSelection | None = None, - request_id: str | None = None, - user_image_data_urls: list[str] | None = None, - streaming_context: StreamingContext | None = None, -) -> AsyncGenerator[str, None]: - """Stream a new chat turn through the current production pipeline.""" - if streaming_context is None: - streaming_context = await build_chat_streaming_context( - user_query=user_query, - search_space_id=search_space_id, - chat_id=chat_id, - user_id=user_id, - llm_config_id=llm_config_id, - mentioned_document_ids=mentioned_document_ids, - mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, - checkpoint_id=checkpoint_id, - needs_history_bootstrap=needs_history_bootstrap, - thread_visibility=thread_visibility, - current_user_display_name=current_user_display_name, - disabled_tools=disabled_tools, - filesystem_selection=filesystem_selection, - request_id=request_id, - user_image_data_urls=user_image_data_urls, - ) - if streaming_context is not None: - result = _build_streaming_result( - chat_id=chat_id, - request_id=request_id, - filesystem_selection=filesystem_selection, - suffix="orchestrator", - ) - async for frame in _stream_output_with_streaming_context( - streaming_context=streaming_context, - result=result, - ): - yield frame - return - - async for chunk in stream_new_chat( - user_query=user_query, - search_space_id=search_space_id, - chat_id=chat_id, - user_id=user_id, - llm_config_id=llm_config_id, - mentioned_document_ids=mentioned_document_ids, - mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, - mentioned_documents=mentioned_documents, - checkpoint_id=checkpoint_id, - needs_history_bootstrap=needs_history_bootstrap, - thread_visibility=thread_visibility, - current_user_display_name=current_user_display_name, - disabled_tools=disabled_tools, - filesystem_selection=filesystem_selection, - request_id=request_id, - user_image_data_urls=user_image_data_urls, - ): - yield chunk - - -async def stream_resume( - *, - chat_id: int, - search_space_id: int, - decisions: list[dict], - user_id: str | None = None, - llm_config_id: int = -1, - thread_visibility: ChatVisibility | None = None, - filesystem_selection: FilesystemSelection | None = None, - request_id: str | None = None, - disabled_tools: list[str] | None = None, - streaming_context: StreamingContext | None = None, -) -> AsyncGenerator[str, None]: - """Resume an interrupted chat turn through the current production pipeline.""" - if streaming_context is None: - streaming_context = await build_resume_streaming_context( - chat_id=chat_id, - search_space_id=search_space_id, - decisions=decisions, - user_id=user_id, - llm_config_id=llm_config_id, - thread_visibility=thread_visibility, - filesystem_selection=filesystem_selection, - request_id=request_id, - disabled_tools=disabled_tools, - ) - if streaming_context is not None: - result = _build_streaming_result( - chat_id=chat_id, - request_id=request_id, - filesystem_selection=filesystem_selection, - suffix="orchestrator-resume", - ) - async for frame in _stream_output_with_streaming_context( - streaming_context=streaming_context, - result=result, - ): - yield frame - return - - async for chunk in stream_resume_chat( - chat_id=chat_id, - search_space_id=search_space_id, - decisions=decisions, - user_id=user_id, - llm_config_id=llm_config_id, - thread_visibility=thread_visibility, - filesystem_selection=filesystem_selection, - request_id=request_id, - disabled_tools=disabled_tools, - ): - yield chunk - - -async def stream_regenerate( - *, - user_query: str, - search_space_id: int, - chat_id: int, - user_id: str | None = None, - llm_config_id: int = -1, - mentioned_document_ids: list[int] | None = None, - mentioned_surfsense_doc_ids: list[int] | None = None, - mentioned_documents: list[dict[str, Any]] | None = None, - checkpoint_id: str | None = None, - needs_history_bootstrap: bool = False, - thread_visibility: ChatVisibility | None = None, - current_user_display_name: str | None = None, - disabled_tools: list[str] | None = None, - filesystem_selection: FilesystemSelection | None = None, - request_id: str | None = None, - user_image_data_urls: list[str] | None = None, - flow: Literal["new", "regenerate"] = "regenerate", - streaming_context: StreamingContext | None = None, -) -> AsyncGenerator[str, None]: - """Regenerate an assistant turn through the current production pipeline.""" - if streaming_context is None: - streaming_context = await build_regenerate_streaming_context( - user_query=user_query, - search_space_id=search_space_id, - chat_id=chat_id, - user_id=user_id, - llm_config_id=llm_config_id, - mentioned_document_ids=mentioned_document_ids, - mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, - checkpoint_id=checkpoint_id, - needs_history_bootstrap=needs_history_bootstrap, - thread_visibility=thread_visibility, - current_user_display_name=current_user_display_name, - disabled_tools=disabled_tools, - filesystem_selection=filesystem_selection, - request_id=request_id, - user_image_data_urls=user_image_data_urls, - ) - if streaming_context is not None: - result = _build_streaming_result( - chat_id=chat_id, - request_id=request_id, - filesystem_selection=filesystem_selection, - suffix="orchestrator-regenerate", - ) - async for frame in _stream_output_with_streaming_context( - streaming_context=streaming_context, - result=result, - ): - yield frame - return - - async for chunk in stream_new_chat( - user_query=user_query, - search_space_id=search_space_id, - chat_id=chat_id, - user_id=user_id, - llm_config_id=llm_config_id, - mentioned_document_ids=mentioned_document_ids, - mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, - mentioned_documents=mentioned_documents, - checkpoint_id=checkpoint_id, - needs_history_bootstrap=needs_history_bootstrap, - thread_visibility=thread_visibility, - current_user_display_name=current_user_display_name, - disabled_tools=disabled_tools, - filesystem_selection=filesystem_selection, - request_id=request_id, - user_image_data_urls=user_image_data_urls, - flow=flow, - ): - yield chunk diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/__init__.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/__init__.py deleted file mode 100644 index 1bd3e103d..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -"""Streaming context builders per orchestrator entrypoint.""" - -from app.tasks.chat.streaming.orchestration.streaming_context.chat import ( - build_chat_streaming_context, -) -from app.tasks.chat.streaming.orchestration.streaming_context.regenerate import ( - build_regenerate_streaming_context, -) -from app.tasks.chat.streaming.orchestration.streaming_context.resume import ( - build_resume_streaming_context, -) - -__all__ = [ - "build_chat_streaming_context", - "build_regenerate_streaming_context", - "build_resume_streaming_context", -] - diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/chat.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/chat.py deleted file mode 100644 index eb459ae5c..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/chat.py +++ /dev/null @@ -1,258 +0,0 @@ -"""Build ``StreamingContext`` for chat streaming.""" - -from __future__ import annotations - -import logging -import time -from typing import Any - -from langchain_core.messages import HumanMessage -from sqlalchemy.future import select -from sqlalchemy.orm import selectinload - -from app.agents.multi_agent_chat import create_multi_agent_chat_deep_agent -from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent -from app.agents.new_chat.checkpointer import get_checkpointer -from app.agents.new_chat.context import SurfSenseContextSchema -from app.agents.new_chat.filesystem_selection import FilesystemSelection -from app.agents.new_chat.llm_config import ( - AgentConfig, - create_chat_litellm_from_agent_config, - create_chat_litellm_from_config, - load_agent_config, - load_global_llm_config_by_id, -) -from app.db import ( - ChatVisibility, - NewChatThread, - Report, - SearchSourceConnectorType, - SurfsenseDocsDocument, - async_session_maker, -) -from app.services.auto_model_pin_service import resolve_or_get_pinned_llm_config_id -from app.services.connector_service import ConnectorService -from app.services.new_streaming_service import VercelStreamingService -from app.tasks.chat.stream_new_chat import format_mentioned_surfsense_docs_as_context -from app.tasks.chat.streaming.agent_setup import build_main_agent_for_thread -from app.tasks.chat.streaming.orchestration.input import StreamingContext -from app.utils.content_utils import bootstrap_history_from_db -from app.utils.user_message_multimodal import build_human_message_content - -logger = logging.getLogger(__name__) - - -async def build_chat_streaming_context( - *, - user_query: str, - search_space_id: int, - chat_id: int, - user_id: str | None = None, - llm_config_id: int = -1, - mentioned_document_ids: list[int] | None = None, - mentioned_surfsense_doc_ids: list[int] | None = None, - checkpoint_id: str | None = None, - needs_history_bootstrap: bool = False, - thread_visibility: ChatVisibility | None = None, - current_user_display_name: str | None = None, - disabled_tools: list[str] | None = None, - filesystem_selection: FilesystemSelection | None = None, - request_id: str | None = None, - user_image_data_urls: list[str] | None = None, -) -> StreamingContext | None: - """Build context for ``stream_output`` from route-level chat inputs.""" - session = async_session_maker() - try: - requested_llm_config_id = llm_config_id - llm_config_id = ( - await resolve_or_get_pinned_llm_config_id( - session, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - selected_llm_config_id=llm_config_id, - requires_image_input=bool(user_image_data_urls), - ) - ).resolved_llm_config_id - - llm: Any - agent_config: AgentConfig | None - if llm_config_id >= 0: - agent_config = await load_agent_config( - session=session, - config_id=llm_config_id, - search_space_id=search_space_id, - ) - if not agent_config: - logger.warning("streaming context build failed: missing config %s", llm_config_id) - return None - llm = create_chat_litellm_from_agent_config(agent_config) - else: - loaded_llm_config = load_global_llm_config_by_id(llm_config_id) - if not loaded_llm_config: - logger.warning( - "streaming context build failed: missing global config %s", - llm_config_id, - ) - return None - llm = create_chat_litellm_from_config(loaded_llm_config) - agent_config = AgentConfig.from_yaml_config(loaded_llm_config) - - connector_service = ConnectorService(session, search_space_id=search_space_id) - firecrawl_api_key = None - webcrawler_connector = await connector_service.get_connector_by_type( - SearchSourceConnectorType.WEBCRAWLER_CONNECTOR, - search_space_id, - ) - if webcrawler_connector and webcrawler_connector.config: - firecrawl_api_key = webcrawler_connector.config.get("FIRECRAWL_API_KEY") - - checkpointer = await get_checkpointer() - visibility = thread_visibility or ChatVisibility.PRIVATE - - from app.config import config as app_config - - agent_factory = ( - create_multi_agent_chat_deep_agent - if bool(app_config.MULTI_AGENT_CHAT_ENABLED) - else create_surfsense_deep_agent - ) - agent = await build_main_agent_for_thread( - agent_factory, - llm=llm, - search_space_id=search_space_id, - db_session=session, - connector_service=connector_service, - checkpointer=checkpointer, - user_id=user_id, - thread_id=chat_id, - agent_config=agent_config, - firecrawl_api_key=firecrawl_api_key, - thread_visibility=visibility, - filesystem_selection=filesystem_selection, - disabled_tools=disabled_tools, - mentioned_document_ids=mentioned_document_ids, - ) - - langchain_messages = [] - if needs_history_bootstrap: - langchain_messages = await bootstrap_history_from_db( - session, - chat_id, - thread_visibility=visibility, - ) - thread_result = await session.execute( - select(NewChatThread).filter(NewChatThread.id == chat_id) - ) - thread = thread_result.scalars().first() - if thread: - thread.needs_history_bootstrap = False - await session.commit() - - mentioned_surfsense_docs: list[SurfsenseDocsDocument] = [] - if mentioned_surfsense_doc_ids: - result = await session.execute( - select(SurfsenseDocsDocument) - .options(selectinload(SurfsenseDocsDocument.chunks)) - .filter(SurfsenseDocsDocument.id.in_(mentioned_surfsense_doc_ids)) - ) - mentioned_surfsense_docs = list(result.scalars().all()) - - recent_reports_result = await session.execute( - select(Report) - .filter(Report.thread_id == chat_id, Report.content.isnot(None)) - .order_by(Report.id.desc()) - .limit(3) - ) - recent_reports = list(recent_reports_result.scalars().all()) - - final_query = user_query - context_parts = [] - if mentioned_surfsense_docs: - context_parts.append( - format_mentioned_surfsense_docs_as_context(mentioned_surfsense_docs) - ) - if recent_reports: - report_lines = [ - f' - report_id={r.id}, title="{r.title}", style="{r.report_style or "detailed"}"' - for r in recent_reports - ] - reports_listing = "\n".join(report_lines) - context_parts.append( - "\n" - "Previously generated reports in this conversation:\n" - f"{reports_listing}\n\n" - "If the user wants to MODIFY, REVISE, UPDATE, or ADD to one of these reports, " - "set parent_report_id to the relevant report_id above.\n" - "If the user wants a completely NEW report on a different topic, " - "leave parent_report_id unset.\n" - "" - ) - if context_parts: - joined_context = "\n\n".join(context_parts) - final_query = f"{joined_context}\n\n{user_query}" - if visibility == ChatVisibility.SEARCH_SPACE and current_user_display_name: - final_query = f"**[{current_user_display_name}]:** {final_query}" - - human_content = build_human_message_content( - final_query, - list(user_image_data_urls or ()), - ) - langchain_messages.append(HumanMessage(content=human_content)) - - turn_id = f"{chat_id}:{int(time.time() * 1000)}" - input_state = { - "messages": langchain_messages, - "search_space_id": search_space_id, - "request_id": request_id or "unknown", - "turn_id": turn_id, - } - configurable = { - "thread_id": str(chat_id), - "request_id": request_id or "unknown", - "turn_id": turn_id, - } - if checkpoint_id: - configurable["checkpoint_id"] = checkpoint_id - config = {"configurable": configurable, "recursion_limit": 10_000} - - initial_title = ( - "Analyzing referenced content" - if mentioned_surfsense_docs - else "Understanding your request" - ) - action_verb = "Analyzing" if mentioned_surfsense_docs else "Processing" - query_excerpt = user_query[:80] + ("..." if len(user_query) > 80 else "") - query_part = query_excerpt if query_excerpt.strip() else "(message)" - initial_items = [f"{action_verb}: {query_part}"] - - runtime_context = SurfSenseContextSchema( - search_space_id=search_space_id, - mentioned_document_ids=list(mentioned_document_ids or []), - request_id=request_id, - turn_id=turn_id, - ) - - await session.commit() - return StreamingContext( - agent=agent, - config=config, - input_data=input_state, - streaming_service=VercelStreamingService(), - step_prefix="thinking", - initial_step_id="thinking-1", - initial_step_title=initial_title, - initial_step_items=initial_items, - content_builder=None, - runtime_context=runtime_context, - ) - except Exception: - logger.exception( - "Failed to build chat streaming context (llm_config_id=%s requested=%s)", - llm_config_id, - requested_llm_config_id, - ) - return None - finally: - await session.close() - diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/regenerate.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/regenerate.py deleted file mode 100644 index 02e871a2c..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/regenerate.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Build ``StreamingContext`` for regenerate streaming.""" - -from __future__ import annotations - -from app.agents.new_chat.filesystem_selection import FilesystemSelection -from app.db import ChatVisibility -from app.tasks.chat.streaming.orchestration.input import StreamingContext -from app.tasks.chat.streaming.orchestration.streaming_context.chat import ( - build_chat_streaming_context, -) - - -async def build_regenerate_streaming_context( - *, - user_query: str, - search_space_id: int, - chat_id: int, - user_id: str | None = None, - llm_config_id: int = -1, - mentioned_document_ids: list[int] | None = None, - mentioned_surfsense_doc_ids: list[int] | None = None, - checkpoint_id: str | None = None, - needs_history_bootstrap: bool = False, - thread_visibility: ChatVisibility | None = None, - current_user_display_name: str | None = None, - disabled_tools: list[str] | None = None, - filesystem_selection: FilesystemSelection | None = None, - request_id: str | None = None, - user_image_data_urls: list[str] | None = None, -) -> StreamingContext | None: - """Build context for ``stream_regenerate`` execution.""" - return await build_chat_streaming_context( - user_query=user_query, - search_space_id=search_space_id, - chat_id=chat_id, - user_id=user_id, - llm_config_id=llm_config_id, - mentioned_document_ids=mentioned_document_ids, - mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, - checkpoint_id=checkpoint_id, - needs_history_bootstrap=needs_history_bootstrap, - thread_visibility=thread_visibility, - current_user_display_name=current_user_display_name, - disabled_tools=disabled_tools, - filesystem_selection=filesystem_selection, - request_id=request_id, - user_image_data_urls=user_image_data_urls, - ) - diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/resume.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/resume.py deleted file mode 100644 index 6d0caea4d..000000000 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/resume.py +++ /dev/null @@ -1,154 +0,0 @@ -"""Build ``StreamingContext`` for resume streaming.""" - -from __future__ import annotations - -import logging -import time -from typing import Any - -from langgraph.types import Command - -from app.agents.multi_agent_chat import create_multi_agent_chat_deep_agent -from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent -from app.agents.new_chat.checkpointer import get_checkpointer -from app.agents.new_chat.context import SurfSenseContextSchema -from app.agents.new_chat.filesystem_selection import FilesystemSelection -from app.agents.new_chat.llm_config import ( - AgentConfig, - create_chat_litellm_from_agent_config, - create_chat_litellm_from_config, - load_agent_config, - load_global_llm_config_by_id, -) -from app.db import ChatVisibility, SearchSourceConnectorType, async_session_maker -from app.services.auto_model_pin_service import resolve_or_get_pinned_llm_config_id -from app.services.connector_service import ConnectorService -from app.services.new_streaming_service import VercelStreamingService -from app.tasks.chat.streaming.agent_setup import build_main_agent_for_thread -from app.tasks.chat.streaming.orchestration.input import StreamingContext - -logger = logging.getLogger(__name__) - - -async def build_resume_streaming_context( - *, - chat_id: int, - search_space_id: int, - decisions: list[dict], - user_id: str | None = None, - llm_config_id: int = -1, - thread_visibility: ChatVisibility | None = None, - filesystem_selection: FilesystemSelection | None = None, - request_id: str | None = None, - disabled_tools: list[str] | None = None, -) -> StreamingContext | None: - """Build context for ``stream_resume`` execution.""" - session = async_session_maker() - try: - llm_config_id = ( - await resolve_or_get_pinned_llm_config_id( - session, - thread_id=chat_id, - search_space_id=search_space_id, - user_id=user_id, - selected_llm_config_id=llm_config_id, - ) - ).resolved_llm_config_id - - llm: Any - agent_config: AgentConfig | None - if llm_config_id >= 0: - agent_config = await load_agent_config( - session=session, - config_id=llm_config_id, - search_space_id=search_space_id, - ) - if not agent_config: - logger.warning("resume context build failed: missing config %s", llm_config_id) - return None - llm = create_chat_litellm_from_agent_config(agent_config) - else: - loaded_llm_config = load_global_llm_config_by_id(llm_config_id) - if not loaded_llm_config: - logger.warning( - "resume context build failed: missing global config %s", - llm_config_id, - ) - return None - llm = create_chat_litellm_from_config(loaded_llm_config) - agent_config = AgentConfig.from_yaml_config(loaded_llm_config) - - connector_service = ConnectorService(session, search_space_id=search_space_id) - firecrawl_api_key = None - webcrawler_connector = await connector_service.get_connector_by_type( - SearchSourceConnectorType.WEBCRAWLER_CONNECTOR, - search_space_id, - ) - if webcrawler_connector and webcrawler_connector.config: - firecrawl_api_key = webcrawler_connector.config.get("FIRECRAWL_API_KEY") - - checkpointer = await get_checkpointer() - visibility = thread_visibility or ChatVisibility.PRIVATE - - from app.config import config as app_config - - agent_factory = ( - create_multi_agent_chat_deep_agent - if bool(app_config.MULTI_AGENT_CHAT_ENABLED) - else create_surfsense_deep_agent - ) - agent = await build_main_agent_for_thread( - agent_factory, - llm=llm, - search_space_id=search_space_id, - db_session=session, - connector_service=connector_service, - checkpointer=checkpointer, - user_id=user_id, - thread_id=chat_id, - agent_config=agent_config, - firecrawl_api_key=firecrawl_api_key, - thread_visibility=visibility, - filesystem_selection=filesystem_selection, - disabled_tools=disabled_tools, - ) - - turn_id = f"{chat_id}:{int(time.time() * 1000)}" - config = { - "configurable": { - "thread_id": str(chat_id), - "request_id": request_id or "unknown", - "turn_id": turn_id, - "surfsense_resume_value": {"decisions": decisions}, - }, - "recursion_limit": 10_000, - } - - runtime_context = SurfSenseContextSchema( - search_space_id=search_space_id, - request_id=request_id, - turn_id=turn_id, - ) - - await session.commit() - return StreamingContext( - agent=agent, - config=config, - input_data=Command(resume={"decisions": decisions}), - streaming_service=VercelStreamingService(), - step_prefix="thinking-resume", - initial_step_id=None, - initial_step_title="", - initial_step_items=None, - content_builder=None, - runtime_context=runtime_context, - ) - except Exception: - logger.exception( - "Failed to build resume streaming context (llm_config_id=%s)", - llm_config_id, - ) - return None - finally: - await session.close() - diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py b/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py index 351e878a8..18eda9a6d 100644 --- a/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py +++ b/surfsense_backend/app/tasks/chat/streaming/relay/__init__.py @@ -1,7 +1,23 @@ -"""Relay state: thinking steps, tool bookkeeping, and stream helpers.""" +"""Relay: thinking steps, tool bookkeeping, and ``EventRelay``. + +Package imports are lazy so ``relay.thinking_step_sse`` (and siblings) can load +without pulling in ``event_relay`` (which imports handler modules that may +import those siblings). +""" from __future__ import annotations -from app.tasks.chat.streaming.relay.event_relay import EventRelay, EventRelayConfig - __all__ = ["EventRelay", "EventRelayConfig"] + + +def __getattr__(name: str): + if name == "EventRelay": + from app.tasks.chat.streaming.relay.event_relay import EventRelay + + return EventRelay + if name == "EventRelayConfig": + from app.tasks.chat.streaming.relay.event_relay import EventRelayConfig + + return EventRelayConfig + msg = f"module {__name__!r} has no attribute {name!r}" + raise AttributeError(msg) diff --git a/surfsense_backend/app/tasks/chat/streaming/relay/event_relay.py b/surfsense_backend/app/tasks/chat/streaming/relay/event_relay.py index c8aebd99c..872998926 100644 --- a/surfsense_backend/app/tasks/chat/streaming/relay/event_relay.py +++ b/surfsense_backend/app/tasks/chat/streaming/relay/event_relay.py @@ -7,6 +7,7 @@ from dataclasses import dataclass, field from typing import Any from app.services.streaming.emitter import EmitterRegistry +from app.tasks.chat.streaming.graph_stream.result import StreamingResult from app.tasks.chat.streaming.handlers.chain_end import iter_chain_end_frames from app.tasks.chat.streaming.handlers.chat_model_stream import ( iter_chat_model_stream_frames, @@ -16,7 +17,6 @@ from app.tasks.chat.streaming.handlers.custom_event_dispatch import ( ) from app.tasks.chat.streaming.handlers.tool_end import iter_tool_end_frames from app.tasks.chat.streaming.handlers.tool_start import iter_tool_start_frames -from app.tasks.chat.streaming.orchestration.output import StreamingResult from app.tasks.chat.streaming.relay.state import AgentEventRelayState from app.tasks.chat.streaming.relay.thinking_step_completion import ( complete_active_thinking_step, diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_agent_setup.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_agent_setup.py deleted file mode 100644 index e1f7dd027..000000000 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_agent_setup.py +++ /dev/null @@ -1,120 +0,0 @@ -"""Behavior tests for streaming agent setup helpers.""" - -from __future__ import annotations - -import sys -import types -from typing import Any - -import pytest - -from app.tasks.chat.streaming import agent_setup - -pytestmark = pytest.mark.unit - - -async def test_preflight_llm_calls_litellm_when_model_present( - monkeypatch: pytest.MonkeyPatch, -) -> None: - calls: dict[str, Any] = {} - - async def _fake_acompletion(**kwargs: Any): - calls.update(kwargs) - return {"ok": True} - - monkeypatch.setitem( - sys.modules, - "litellm", - types.SimpleNamespace(acompletion=_fake_acompletion), - ) - - llm = types.SimpleNamespace(model="openai/test", api_key="k", api_base="b") - await agent_setup.preflight_llm(llm, is_provider_rate_limited=lambda _: False) - - assert calls["model"] == "openai/test" - assert calls["max_tokens"] == 1 - assert calls["timeout"] == 2.5 - assert calls["stream"] is False - - -async def test_preflight_llm_rethrows_rate_limited(monkeypatch: pytest.MonkeyPatch) -> None: - class _RateLimitedError(Exception): - pass - - async def _fake_acompletion(**kwargs: Any): - del kwargs - raise _RateLimitedError("rl") - - monkeypatch.setitem( - sys.modules, - "litellm", - types.SimpleNamespace(acompletion=_fake_acompletion), - ) - - with pytest.raises(_RateLimitedError): - await agent_setup.preflight_llm( - types.SimpleNamespace(model="openai/test"), - is_provider_rate_limited=lambda exc: isinstance(exc, _RateLimitedError), - ) - - -async def test_preflight_llm_skips_probe_for_auto_model( - monkeypatch: pytest.MonkeyPatch, -) -> None: - called = {"count": 0} - - async def _fake_acompletion(**kwargs: Any): - del kwargs - called["count"] += 1 - return {"ok": True} - - monkeypatch.setitem( - sys.modules, - "litellm", - types.SimpleNamespace(acompletion=_fake_acompletion), - ) - - await agent_setup.preflight_llm( - types.SimpleNamespace(model="auto"), - is_provider_rate_limited=lambda _: False, - ) - assert called["count"] == 0 - - -async def test_build_main_agent_for_thread_forwards_arguments() -> None: - seen: dict[str, Any] = {} - - async def _factory(**kwargs: Any): - seen.update(kwargs) - return "agent" - - out = await agent_setup.build_main_agent_for_thread( - _factory, - llm="llm", - search_space_id=1, - db_session="db", - connector_service="connector", - checkpointer="cp", - user_id="u", - thread_id=10, - agent_config="cfg", - firecrawl_api_key="key", - thread_visibility="vis", - filesystem_selection="fs", - disabled_tools=["a"], - mentioned_document_ids=[5], - ) - assert out == "agent" - assert seen["thread_id"] == 10 - assert seen["mentioned_document_ids"] == [5] - - -async def test_settle_speculative_agent_build_swallows_exceptions() -> None: - async def _boom() -> None: - raise RuntimeError("ignore") - - import asyncio - - task = asyncio.create_task(_boom()) - await agent_setup.settle_speculative_agent_build(task) - assert task.done() diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py deleted file mode 100644 index 46c61b498..000000000 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Behavior tests for orchestrator ``stream_chat`` public API.""" - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any - -import pytest - -from app.tasks.chat.streaming.orchestration import StreamingContext -from app.tasks.chat.streaming.orchestration import orchestrator -from app.tasks.chat.streaming.orchestration.orchestrator import ( - stream_chat, - stream_regenerate, - stream_resume, -) - -pytestmark = pytest.mark.unit - - -@dataclass -class _Chunk: - content: Any = "" - additional_kwargs: dict[str, Any] = field(default_factory=dict) - tool_call_chunks: list[dict[str, Any]] = field(default_factory=list) - - -class _StreamingService: - def __init__(self) -> None: - self._text_idx = 0 - - def generate_text_id(self) -> str: - self._text_idx += 1 - return f"text-{self._text_idx}" - - def format_text_start(self, text_id: str) -> str: - return f"text_start:{text_id}" - - def format_text_delta(self, text_id: str, text: str) -> str: - return f"text_delta:{text_id}:{text}" - - def format_text_end(self, text_id: str) -> str: - return f"text_end:{text_id}" - - -class _Agent: - def __init__(self, events: list[dict[str, Any]]) -> None: - self.events = list(events) - self.calls: list[tuple[Any, dict[str, Any]]] = [] - - async def astream_events(self, input_data: Any, **kwargs: Any): - self.calls.append((input_data, kwargs)) - for event in self.events: - yield event - - -async def _collect(stream: Any) -> list[str]: - out: list[str] = [] - async for x in stream: - out.append(x) - return out - - -async def test_stream_chat_uses_streaming_context_path() -> None: - service = _StreamingService() - agent = _Agent( - [ - {"event": "on_chat_model_stream", "data": {"chunk": _Chunk(content="hello")}}, - {"event": "on_chat_model_stream", "data": {"chunk": _Chunk(content="!")}}, - ] - ) - frames = await _collect( - stream_chat( - user_query="ignored-here", - search_space_id=1, - chat_id=77, - streaming_context=StreamingContext( - agent=agent, - config={"configurable": {"thread_id": "thread-1"}}, - input_data={"messages": []}, - streaming_service=service, - ), - ) - ) - - assert frames == [ - "text_start:text-1", - "text_delta:text-1:hello", - "text_delta:text-1:!", - "text_end:text-1", - ] - - -async def test_stream_resume_uses_streaming_context_path() -> None: - service = _StreamingService() - agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("r")}}]) - - frames = await _collect( - stream_resume( - chat_id=9, - search_space_id=1, - decisions=[], - streaming_context=StreamingContext( - agent=agent, - config={"configurable": {"thread_id": "thread-r"}}, - input_data={"messages": []}, - streaming_service=service, - ), - ) - ) - - assert frames == [ - "text_start:text-1", - "text_delta:text-1:r", - "text_end:text-1", - ] - - -async def test_stream_regenerate_uses_streaming_context_path() -> None: - service = _StreamingService() - agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("g")}}]) - - frames = await _collect( - stream_regenerate( - user_query="q", - search_space_id=1, - chat_id=2, - streaming_context=StreamingContext( - agent=agent, - config={"configurable": {"thread_id": "thread-g"}}, - input_data={"messages": []}, - streaming_service=service, - ), - ) - ) - - assert frames == [ - "text_start:text-1", - "text_delta:text-1:g", - "text_end:text-1", - ] - - -async def test_stream_chat_builds_streaming_context_when_not_provided() -> None: - service = _StreamingService() - agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("b")}}]) - - async def _fake_builder(**kwargs: Any) -> StreamingContext: - del kwargs - return StreamingContext( - agent=agent, - config={"configurable": {"thread_id": "thread-b"}}, - input_data={"messages": []}, - streaming_service=service, - ) - - old = orchestrator.build_chat_streaming_context - orchestrator.build_chat_streaming_context = _fake_builder - try: - frames = await _collect( - stream_chat( - user_query="q", - search_space_id=1, - chat_id=3, - ) - ) - finally: - orchestrator.build_chat_streaming_context = old - - assert frames == [ - "text_start:text-1", - "text_delta:text-1:b", - "text_end:text-1", - ] - - -async def test_stream_resume_builds_streaming_context_when_not_provided() -> None: - service = _StreamingService() - agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("u")}}]) - - async def _fake_builder(**kwargs: Any) -> StreamingContext: - del kwargs - return StreamingContext( - agent=agent, - config={"configurable": {"thread_id": "thread-u"}}, - input_data={"messages": []}, - streaming_service=service, - ) - - old = orchestrator.build_resume_streaming_context - orchestrator.build_resume_streaming_context = _fake_builder - try: - frames = await _collect( - stream_resume( - chat_id=9, - search_space_id=1, - decisions=[], - ) - ) - finally: - orchestrator.build_resume_streaming_context = old - - assert frames == [ - "text_start:text-1", - "text_delta:text-1:u", - "text_end:text-1", - ] - - -async def test_stream_regenerate_builds_streaming_context_when_not_provided() -> None: - service = _StreamingService() - agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("x")}}]) - - async def _fake_builder(**kwargs: Any) -> StreamingContext: - del kwargs - return StreamingContext( - agent=agent, - config={"configurable": {"thread_id": "thread-x"}}, - input_data={"messages": []}, - streaming_service=service, - ) - - old = orchestrator.build_regenerate_streaming_context - orchestrator.build_regenerate_streaming_context = _fake_builder - try: - frames = await _collect( - stream_regenerate( - user_query="q", - search_space_id=1, - chat_id=2, - ) - ) - finally: - orchestrator.build_regenerate_streaming_context = old - - assert frames == [ - "text_start:text-1", - "text_delta:text-1:x", - "text_end:text-1", - ] diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_1_parity.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_1_parity.py index 9207f37d1..023c8b999 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_1_parity.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stage_1_parity.py @@ -1,7 +1,7 @@ """Pin Stage 1 extractions as faithful copies of the old helpers. -The new orchestrator under ``app.tasks.chat.streaming`` is built in -parallel with the production module ``app.tasks.chat.stream_new_chat``. +Extractions under ``app.tasks.chat.streaming`` are compared to +``app.tasks.chat.stream_new_chat`` helpers. For each Stage 1 extraction we assert the new function returns the same output as the old one for a representative input set. The moment the two diverge - intentionally or otherwise - this file fails loudly so diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stream_output.py similarity index 93% rename from surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py rename to surfsense_backend/tests/unit/tasks/chat/streaming/test_stream_output.py index b17d82293..9fb876dd7 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestration_event_stream.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_stream_output.py @@ -1,4 +1,4 @@ -"""Behavior tests for orchestration event-stream execution.""" +"""Tests for ``stream_output`` (LangGraph events → SSE).""" from __future__ import annotations @@ -7,8 +7,8 @@ from typing import Any import pytest -from app.tasks.chat.streaming.orchestration import stream_output -from app.tasks.chat.streaming.orchestration.output import StreamingResult +from app.tasks.chat.streaming.graph_stream import stream_output +from app.tasks.chat.streaming.graph_stream.result import StreamingResult pytestmark = pytest.mark.unit @@ -88,6 +88,7 @@ async def test_stream_output_emits_text_lifecycle_and_updates_result() -> None: async def test_stream_output_passes_runtime_context_to_agent() -> None: service = _StreamingService() + class _ContextAwareAgent: async def astream_events(self, input_data: Any, **kwargs: Any): del input_data