From 52895e37e9ec86aba93cf4dd3fcf734da2b2e162 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 7 May 2026 17:57:27 +0200 Subject: [PATCH] build streaming contexts for chat resume and regenerate paths --- .../streaming/orchestration/orchestrator.py | 55 +++- .../streaming_context/__init__.py | 18 ++ .../orchestration/streaming_context/chat.py | 258 ++++++++++++++++++ .../streaming_context/regenerate.py | 49 ++++ .../orchestration/streaming_context/resume.py | 154 +++++++++++ .../test_orchestrator_stream_chat.py | 100 +++++++ 6 files changed, 633 insertions(+), 1 deletion(-) create mode 100644 surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/__init__.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/chat.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/regenerate.py create mode 100644 surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/resume.py diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py index b40083f42..80cae77a2 100644 --- a/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/orchestrator.py @@ -9,6 +9,11 @@ from typing import Any, Literal from app.agents.new_chat.filesystem_selection import FilesystemSelection from app.db import ChatVisibility from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat +from app.tasks.chat.streaming.orchestration.streaming_context import ( + build_chat_streaming_context, + build_regenerate_streaming_context, + build_resume_streaming_context, +) from app.tasks.chat.streaming.orchestration.event_stream import stream_output from app.tasks.chat.streaming.orchestration.input import StreamingContext from app.tasks.chat.streaming.orchestration.output import StreamingResult @@ -38,7 +43,7 @@ async def _stream_output_with_streaming_context( ) -> AsyncGenerator[str, None]: async for frame in stream_output( agent=streaming_context.agent, - config=streaming_context.config, + config=streaming_context.config, input_data=streaming_context.input_data, streaming_service=streaming_context.streaming_service, result=result, @@ -73,6 +78,24 @@ async def stream_chat( streaming_context: StreamingContext | None = None, ) -> AsyncGenerator[str, None]: """Stream a new chat turn through the current production pipeline.""" + if streaming_context is None: + streaming_context = await build_chat_streaming_context( + user_query=user_query, + search_space_id=search_space_id, + chat_id=chat_id, + user_id=user_id, + llm_config_id=llm_config_id, + mentioned_document_ids=mentioned_document_ids, + mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, + checkpoint_id=checkpoint_id, + needs_history_bootstrap=needs_history_bootstrap, + thread_visibility=thread_visibility, + current_user_display_name=current_user_display_name, + disabled_tools=disabled_tools, + filesystem_selection=filesystem_selection, + request_id=request_id, + user_image_data_urls=user_image_data_urls, + ) if streaming_context is not None: result = _build_streaming_result( chat_id=chat_id, @@ -122,6 +145,18 @@ async def stream_resume( streaming_context: StreamingContext | None = None, ) -> AsyncGenerator[str, None]: """Resume an interrupted chat turn through the current production pipeline.""" + if streaming_context is None: + streaming_context = await build_resume_streaming_context( + chat_id=chat_id, + search_space_id=search_space_id, + decisions=decisions, + user_id=user_id, + llm_config_id=llm_config_id, + thread_visibility=thread_visibility, + filesystem_selection=filesystem_selection, + request_id=request_id, + disabled_tools=disabled_tools, + ) if streaming_context is not None: result = _build_streaming_result( chat_id=chat_id, @@ -172,6 +207,24 @@ async def stream_regenerate( streaming_context: StreamingContext | None = None, ) -> AsyncGenerator[str, None]: """Regenerate an assistant turn through the current production pipeline.""" + if streaming_context is None: + streaming_context = await build_regenerate_streaming_context( + user_query=user_query, + search_space_id=search_space_id, + chat_id=chat_id, + user_id=user_id, + llm_config_id=llm_config_id, + mentioned_document_ids=mentioned_document_ids, + mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, + checkpoint_id=checkpoint_id, + needs_history_bootstrap=needs_history_bootstrap, + thread_visibility=thread_visibility, + current_user_display_name=current_user_display_name, + disabled_tools=disabled_tools, + filesystem_selection=filesystem_selection, + request_id=request_id, + user_image_data_urls=user_image_data_urls, + ) if streaming_context is not None: result = _build_streaming_result( chat_id=chat_id, diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/__init__.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/__init__.py new file mode 100644 index 000000000..1bd3e103d --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/__init__.py @@ -0,0 +1,18 @@ +"""Streaming context builders per orchestrator entrypoint.""" + +from app.tasks.chat.streaming.orchestration.streaming_context.chat import ( + build_chat_streaming_context, +) +from app.tasks.chat.streaming.orchestration.streaming_context.regenerate import ( + build_regenerate_streaming_context, +) +from app.tasks.chat.streaming.orchestration.streaming_context.resume import ( + build_resume_streaming_context, +) + +__all__ = [ + "build_chat_streaming_context", + "build_regenerate_streaming_context", + "build_resume_streaming_context", +] + diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/chat.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/chat.py new file mode 100644 index 000000000..eb459ae5c --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/chat.py @@ -0,0 +1,258 @@ +"""Build ``StreamingContext`` for chat streaming.""" + +from __future__ import annotations + +import logging +import time +from typing import Any + +from langchain_core.messages import HumanMessage +from sqlalchemy.future import select +from sqlalchemy.orm import selectinload + +from app.agents.multi_agent_chat import create_multi_agent_chat_deep_agent +from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent +from app.agents.new_chat.checkpointer import get_checkpointer +from app.agents.new_chat.context import SurfSenseContextSchema +from app.agents.new_chat.filesystem_selection import FilesystemSelection +from app.agents.new_chat.llm_config import ( + AgentConfig, + create_chat_litellm_from_agent_config, + create_chat_litellm_from_config, + load_agent_config, + load_global_llm_config_by_id, +) +from app.db import ( + ChatVisibility, + NewChatThread, + Report, + SearchSourceConnectorType, + SurfsenseDocsDocument, + async_session_maker, +) +from app.services.auto_model_pin_service import resolve_or_get_pinned_llm_config_id +from app.services.connector_service import ConnectorService +from app.services.new_streaming_service import VercelStreamingService +from app.tasks.chat.stream_new_chat import format_mentioned_surfsense_docs_as_context +from app.tasks.chat.streaming.agent_setup import build_main_agent_for_thread +from app.tasks.chat.streaming.orchestration.input import StreamingContext +from app.utils.content_utils import bootstrap_history_from_db +from app.utils.user_message_multimodal import build_human_message_content + +logger = logging.getLogger(__name__) + + +async def build_chat_streaming_context( + *, + user_query: str, + search_space_id: int, + chat_id: int, + user_id: str | None = None, + llm_config_id: int = -1, + mentioned_document_ids: list[int] | None = None, + mentioned_surfsense_doc_ids: list[int] | None = None, + checkpoint_id: str | None = None, + needs_history_bootstrap: bool = False, + thread_visibility: ChatVisibility | None = None, + current_user_display_name: str | None = None, + disabled_tools: list[str] | None = None, + filesystem_selection: FilesystemSelection | None = None, + request_id: str | None = None, + user_image_data_urls: list[str] | None = None, +) -> StreamingContext | None: + """Build context for ``stream_output`` from route-level chat inputs.""" + session = async_session_maker() + try: + requested_llm_config_id = llm_config_id + llm_config_id = ( + await resolve_or_get_pinned_llm_config_id( + session, + thread_id=chat_id, + search_space_id=search_space_id, + user_id=user_id, + selected_llm_config_id=llm_config_id, + requires_image_input=bool(user_image_data_urls), + ) + ).resolved_llm_config_id + + llm: Any + agent_config: AgentConfig | None + if llm_config_id >= 0: + agent_config = await load_agent_config( + session=session, + config_id=llm_config_id, + search_space_id=search_space_id, + ) + if not agent_config: + logger.warning("streaming context build failed: missing config %s", llm_config_id) + return None + llm = create_chat_litellm_from_agent_config(agent_config) + else: + loaded_llm_config = load_global_llm_config_by_id(llm_config_id) + if not loaded_llm_config: + logger.warning( + "streaming context build failed: missing global config %s", + llm_config_id, + ) + return None + llm = create_chat_litellm_from_config(loaded_llm_config) + agent_config = AgentConfig.from_yaml_config(loaded_llm_config) + + connector_service = ConnectorService(session, search_space_id=search_space_id) + firecrawl_api_key = None + webcrawler_connector = await connector_service.get_connector_by_type( + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR, + search_space_id, + ) + if webcrawler_connector and webcrawler_connector.config: + firecrawl_api_key = webcrawler_connector.config.get("FIRECRAWL_API_KEY") + + checkpointer = await get_checkpointer() + visibility = thread_visibility or ChatVisibility.PRIVATE + + from app.config import config as app_config + + agent_factory = ( + create_multi_agent_chat_deep_agent + if bool(app_config.MULTI_AGENT_CHAT_ENABLED) + else create_surfsense_deep_agent + ) + agent = await build_main_agent_for_thread( + agent_factory, + llm=llm, + search_space_id=search_space_id, + db_session=session, + connector_service=connector_service, + checkpointer=checkpointer, + user_id=user_id, + thread_id=chat_id, + agent_config=agent_config, + firecrawl_api_key=firecrawl_api_key, + thread_visibility=visibility, + filesystem_selection=filesystem_selection, + disabled_tools=disabled_tools, + mentioned_document_ids=mentioned_document_ids, + ) + + langchain_messages = [] + if needs_history_bootstrap: + langchain_messages = await bootstrap_history_from_db( + session, + chat_id, + thread_visibility=visibility, + ) + thread_result = await session.execute( + select(NewChatThread).filter(NewChatThread.id == chat_id) + ) + thread = thread_result.scalars().first() + if thread: + thread.needs_history_bootstrap = False + await session.commit() + + mentioned_surfsense_docs: list[SurfsenseDocsDocument] = [] + if mentioned_surfsense_doc_ids: + result = await session.execute( + select(SurfsenseDocsDocument) + .options(selectinload(SurfsenseDocsDocument.chunks)) + .filter(SurfsenseDocsDocument.id.in_(mentioned_surfsense_doc_ids)) + ) + mentioned_surfsense_docs = list(result.scalars().all()) + + recent_reports_result = await session.execute( + select(Report) + .filter(Report.thread_id == chat_id, Report.content.isnot(None)) + .order_by(Report.id.desc()) + .limit(3) + ) + recent_reports = list(recent_reports_result.scalars().all()) + + final_query = user_query + context_parts = [] + if mentioned_surfsense_docs: + context_parts.append( + format_mentioned_surfsense_docs_as_context(mentioned_surfsense_docs) + ) + if recent_reports: + report_lines = [ + f' - report_id={r.id}, title="{r.title}", style="{r.report_style or "detailed"}"' + for r in recent_reports + ] + reports_listing = "\n".join(report_lines) + context_parts.append( + "\n" + "Previously generated reports in this conversation:\n" + f"{reports_listing}\n\n" + "If the user wants to MODIFY, REVISE, UPDATE, or ADD to one of these reports, " + "set parent_report_id to the relevant report_id above.\n" + "If the user wants a completely NEW report on a different topic, " + "leave parent_report_id unset.\n" + "" + ) + if context_parts: + joined_context = "\n\n".join(context_parts) + final_query = f"{joined_context}\n\n{user_query}" + if visibility == ChatVisibility.SEARCH_SPACE and current_user_display_name: + final_query = f"**[{current_user_display_name}]:** {final_query}" + + human_content = build_human_message_content( + final_query, + list(user_image_data_urls or ()), + ) + langchain_messages.append(HumanMessage(content=human_content)) + + turn_id = f"{chat_id}:{int(time.time() * 1000)}" + input_state = { + "messages": langchain_messages, + "search_space_id": search_space_id, + "request_id": request_id or "unknown", + "turn_id": turn_id, + } + configurable = { + "thread_id": str(chat_id), + "request_id": request_id or "unknown", + "turn_id": turn_id, + } + if checkpoint_id: + configurable["checkpoint_id"] = checkpoint_id + config = {"configurable": configurable, "recursion_limit": 10_000} + + initial_title = ( + "Analyzing referenced content" + if mentioned_surfsense_docs + else "Understanding your request" + ) + action_verb = "Analyzing" if mentioned_surfsense_docs else "Processing" + query_excerpt = user_query[:80] + ("..." if len(user_query) > 80 else "") + query_part = query_excerpt if query_excerpt.strip() else "(message)" + initial_items = [f"{action_verb}: {query_part}"] + + runtime_context = SurfSenseContextSchema( + search_space_id=search_space_id, + mentioned_document_ids=list(mentioned_document_ids or []), + request_id=request_id, + turn_id=turn_id, + ) + + await session.commit() + return StreamingContext( + agent=agent, + config=config, + input_data=input_state, + streaming_service=VercelStreamingService(), + step_prefix="thinking", + initial_step_id="thinking-1", + initial_step_title=initial_title, + initial_step_items=initial_items, + content_builder=None, + runtime_context=runtime_context, + ) + except Exception: + logger.exception( + "Failed to build chat streaming context (llm_config_id=%s requested=%s)", + llm_config_id, + requested_llm_config_id, + ) + return None + finally: + await session.close() + diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/regenerate.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/regenerate.py new file mode 100644 index 000000000..02e871a2c --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/regenerate.py @@ -0,0 +1,49 @@ +"""Build ``StreamingContext`` for regenerate streaming.""" + +from __future__ import annotations + +from app.agents.new_chat.filesystem_selection import FilesystemSelection +from app.db import ChatVisibility +from app.tasks.chat.streaming.orchestration.input import StreamingContext +from app.tasks.chat.streaming.orchestration.streaming_context.chat import ( + build_chat_streaming_context, +) + + +async def build_regenerate_streaming_context( + *, + user_query: str, + search_space_id: int, + chat_id: int, + user_id: str | None = None, + llm_config_id: int = -1, + mentioned_document_ids: list[int] | None = None, + mentioned_surfsense_doc_ids: list[int] | None = None, + checkpoint_id: str | None = None, + needs_history_bootstrap: bool = False, + thread_visibility: ChatVisibility | None = None, + current_user_display_name: str | None = None, + disabled_tools: list[str] | None = None, + filesystem_selection: FilesystemSelection | None = None, + request_id: str | None = None, + user_image_data_urls: list[str] | None = None, +) -> StreamingContext | None: + """Build context for ``stream_regenerate`` execution.""" + return await build_chat_streaming_context( + user_query=user_query, + search_space_id=search_space_id, + chat_id=chat_id, + user_id=user_id, + llm_config_id=llm_config_id, + mentioned_document_ids=mentioned_document_ids, + mentioned_surfsense_doc_ids=mentioned_surfsense_doc_ids, + checkpoint_id=checkpoint_id, + needs_history_bootstrap=needs_history_bootstrap, + thread_visibility=thread_visibility, + current_user_display_name=current_user_display_name, + disabled_tools=disabled_tools, + filesystem_selection=filesystem_selection, + request_id=request_id, + user_image_data_urls=user_image_data_urls, + ) + diff --git a/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/resume.py b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/resume.py new file mode 100644 index 000000000..6d0caea4d --- /dev/null +++ b/surfsense_backend/app/tasks/chat/streaming/orchestration/streaming_context/resume.py @@ -0,0 +1,154 @@ +"""Build ``StreamingContext`` for resume streaming.""" + +from __future__ import annotations + +import logging +import time +from typing import Any + +from langgraph.types import Command + +from app.agents.multi_agent_chat import create_multi_agent_chat_deep_agent +from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent +from app.agents.new_chat.checkpointer import get_checkpointer +from app.agents.new_chat.context import SurfSenseContextSchema +from app.agents.new_chat.filesystem_selection import FilesystemSelection +from app.agents.new_chat.llm_config import ( + AgentConfig, + create_chat_litellm_from_agent_config, + create_chat_litellm_from_config, + load_agent_config, + load_global_llm_config_by_id, +) +from app.db import ChatVisibility, SearchSourceConnectorType, async_session_maker +from app.services.auto_model_pin_service import resolve_or_get_pinned_llm_config_id +from app.services.connector_service import ConnectorService +from app.services.new_streaming_service import VercelStreamingService +from app.tasks.chat.streaming.agent_setup import build_main_agent_for_thread +from app.tasks.chat.streaming.orchestration.input import StreamingContext + +logger = logging.getLogger(__name__) + + +async def build_resume_streaming_context( + *, + chat_id: int, + search_space_id: int, + decisions: list[dict], + user_id: str | None = None, + llm_config_id: int = -1, + thread_visibility: ChatVisibility | None = None, + filesystem_selection: FilesystemSelection | None = None, + request_id: str | None = None, + disabled_tools: list[str] | None = None, +) -> StreamingContext | None: + """Build context for ``stream_resume`` execution.""" + session = async_session_maker() + try: + llm_config_id = ( + await resolve_or_get_pinned_llm_config_id( + session, + thread_id=chat_id, + search_space_id=search_space_id, + user_id=user_id, + selected_llm_config_id=llm_config_id, + ) + ).resolved_llm_config_id + + llm: Any + agent_config: AgentConfig | None + if llm_config_id >= 0: + agent_config = await load_agent_config( + session=session, + config_id=llm_config_id, + search_space_id=search_space_id, + ) + if not agent_config: + logger.warning("resume context build failed: missing config %s", llm_config_id) + return None + llm = create_chat_litellm_from_agent_config(agent_config) + else: + loaded_llm_config = load_global_llm_config_by_id(llm_config_id) + if not loaded_llm_config: + logger.warning( + "resume context build failed: missing global config %s", + llm_config_id, + ) + return None + llm = create_chat_litellm_from_config(loaded_llm_config) + agent_config = AgentConfig.from_yaml_config(loaded_llm_config) + + connector_service = ConnectorService(session, search_space_id=search_space_id) + firecrawl_api_key = None + webcrawler_connector = await connector_service.get_connector_by_type( + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR, + search_space_id, + ) + if webcrawler_connector and webcrawler_connector.config: + firecrawl_api_key = webcrawler_connector.config.get("FIRECRAWL_API_KEY") + + checkpointer = await get_checkpointer() + visibility = thread_visibility or ChatVisibility.PRIVATE + + from app.config import config as app_config + + agent_factory = ( + create_multi_agent_chat_deep_agent + if bool(app_config.MULTI_AGENT_CHAT_ENABLED) + else create_surfsense_deep_agent + ) + agent = await build_main_agent_for_thread( + agent_factory, + llm=llm, + search_space_id=search_space_id, + db_session=session, + connector_service=connector_service, + checkpointer=checkpointer, + user_id=user_id, + thread_id=chat_id, + agent_config=agent_config, + firecrawl_api_key=firecrawl_api_key, + thread_visibility=visibility, + filesystem_selection=filesystem_selection, + disabled_tools=disabled_tools, + ) + + turn_id = f"{chat_id}:{int(time.time() * 1000)}" + config = { + "configurable": { + "thread_id": str(chat_id), + "request_id": request_id or "unknown", + "turn_id": turn_id, + "surfsense_resume_value": {"decisions": decisions}, + }, + "recursion_limit": 10_000, + } + + runtime_context = SurfSenseContextSchema( + search_space_id=search_space_id, + request_id=request_id, + turn_id=turn_id, + ) + + await session.commit() + return StreamingContext( + agent=agent, + config=config, + input_data=Command(resume={"decisions": decisions}), + streaming_service=VercelStreamingService(), + step_prefix="thinking-resume", + initial_step_id=None, + initial_step_title="", + initial_step_items=None, + content_builder=None, + runtime_context=runtime_context, + ) + except Exception: + logger.exception( + "Failed to build resume streaming context (llm_config_id=%s)", + llm_config_id, + ) + return None + finally: + await session.close() + diff --git a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py index b84193cb7..46c61b498 100644 --- a/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py +++ b/surfsense_backend/tests/unit/tasks/chat/streaming/test_orchestrator_stream_chat.py @@ -8,6 +8,7 @@ from typing import Any import pytest from app.tasks.chat.streaming.orchestration import StreamingContext +from app.tasks.chat.streaming.orchestration import orchestrator from app.tasks.chat.streaming.orchestration.orchestrator import ( stream_chat, stream_regenerate, @@ -138,3 +139,102 @@ async def test_stream_regenerate_uses_streaming_context_path() -> None: "text_delta:text-1:g", "text_end:text-1", ] + + +async def test_stream_chat_builds_streaming_context_when_not_provided() -> None: + service = _StreamingService() + agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("b")}}]) + + async def _fake_builder(**kwargs: Any) -> StreamingContext: + del kwargs + return StreamingContext( + agent=agent, + config={"configurable": {"thread_id": "thread-b"}}, + input_data={"messages": []}, + streaming_service=service, + ) + + old = orchestrator.build_chat_streaming_context + orchestrator.build_chat_streaming_context = _fake_builder + try: + frames = await _collect( + stream_chat( + user_query="q", + search_space_id=1, + chat_id=3, + ) + ) + finally: + orchestrator.build_chat_streaming_context = old + + assert frames == [ + "text_start:text-1", + "text_delta:text-1:b", + "text_end:text-1", + ] + + +async def test_stream_resume_builds_streaming_context_when_not_provided() -> None: + service = _StreamingService() + agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("u")}}]) + + async def _fake_builder(**kwargs: Any) -> StreamingContext: + del kwargs + return StreamingContext( + agent=agent, + config={"configurable": {"thread_id": "thread-u"}}, + input_data={"messages": []}, + streaming_service=service, + ) + + old = orchestrator.build_resume_streaming_context + orchestrator.build_resume_streaming_context = _fake_builder + try: + frames = await _collect( + stream_resume( + chat_id=9, + search_space_id=1, + decisions=[], + ) + ) + finally: + orchestrator.build_resume_streaming_context = old + + assert frames == [ + "text_start:text-1", + "text_delta:text-1:u", + "text_end:text-1", + ] + + +async def test_stream_regenerate_builds_streaming_context_when_not_provided() -> None: + service = _StreamingService() + agent = _Agent([{"event": "on_chat_model_stream", "data": {"chunk": _Chunk("x")}}]) + + async def _fake_builder(**kwargs: Any) -> StreamingContext: + del kwargs + return StreamingContext( + agent=agent, + config={"configurable": {"thread_id": "thread-x"}}, + input_data={"messages": []}, + streaming_service=service, + ) + + old = orchestrator.build_regenerate_streaming_context + orchestrator.build_regenerate_streaming_context = _fake_builder + try: + frames = await _collect( + stream_regenerate( + user_query="q", + search_space_id=1, + chat_id=2, + ) + ) + finally: + orchestrator.build_regenerate_streaming_context = old + + assert frames == [ + "text_start:text-1", + "text_delta:text-1:x", + "text_end:text-1", + ]