From 9fa923051cca62be9f5101375c0dc953a592fc6c Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 10 Feb 2026 15:23:29 +0200 Subject: [PATCH 01/42] add stub create_notion_page tool --- .../new_chat/tools/create_notion_page.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 surfsense_backend/app/agents/new_chat/tools/create_notion_page.py diff --git a/surfsense_backend/app/agents/new_chat/tools/create_notion_page.py b/surfsense_backend/app/agents/new_chat/tools/create_notion_page.py new file mode 100644 index 000000000..bc3d2b81f --- /dev/null +++ b/surfsense_backend/app/agents/new_chat/tools/create_notion_page.py @@ -0,0 +1,29 @@ +from typing import Any + +from langchain_core.tools import tool + + +def create_create_notion_page_tool(): + @tool + async def create_notion_page( + title: str, + content: str, + ) -> dict[str, Any]: + """Create a new page in Notion with the given title and content. + + Use this tool when the user asks you to create, save, or publish + something to Notion. The page will be created in the user's + configured Notion workspace. + + Args: + title: The title of the Notion page. + content: The markdown content for the page body. + """ + return { + "status": "success", + "page_id": "stub-page-id-12345", + "title": title, + "url": "https://www.notion.so/stub-page-12345", + } + + return create_notion_page From c9542c86035287924f50f1e08e9cff0397e0f376 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 10 Feb 2026 15:27:47 +0200 Subject: [PATCH 02/42] register create_notion_page tool with interrupt_on --- .../app/agents/new_chat/chat_deepagent.py | 1 + .../app/agents/new_chat/tools/registry.py | 14 +++++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py index d58a0fadb..525b18e7c 100644 --- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py +++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py @@ -273,6 +273,7 @@ async def create_surfsense_deep_agent( system_prompt=system_prompt, context_schema=SurfSenseContextSchema, checkpointer=checkpointer, + interrupt_on={"create_notion_page": True}, ) return agent diff --git a/surfsense_backend/app/agents/new_chat/tools/registry.py b/surfsense_backend/app/agents/new_chat/tools/registry.py index 8092a6104..3716a19b6 100644 --- a/surfsense_backend/app/agents/new_chat/tools/registry.py +++ b/surfsense_backend/app/agents/new_chat/tools/registry.py @@ -45,6 +45,7 @@ from langchain_core.tools import BaseTool from app.db import ChatVisibility +from .create_notion_page import create_create_notion_page_tool from .display_image import create_display_image_tool from .generate_image import create_generate_image_tool from .knowledge_base import create_search_knowledge_base_tool @@ -202,13 +203,12 @@ BUILTIN_TOOLS: list[ToolDefinition] = [ # ========================================================================= # ADD YOUR CUSTOM TOOLS BELOW # ========================================================================= - # Example: - # ToolDefinition( - # name="my_custom_tool", - # description="What my tool does", - # factory=lambda deps: create_my_custom_tool(...), - # requires=["search_space_id"], - # ), + ToolDefinition( + name="create_notion_page", + description="Create a new page in the user's Notion workspace", + factory=lambda deps: create_create_notion_page_tool(), + requires=[], + ), ] From 9751918e41468cd30d4123cc32dcdea2f49346f8 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 10 Feb 2026 15:35:42 +0200 Subject: [PATCH 03/42] add interrupt detection, SSE event, and resume schemas --- surfsense_backend/app/schemas/new_chat.py | 12 +++++++++++- .../app/services/new_streaming_service.py | 12 ++++++++++++ surfsense_backend/app/tasks/chat/stream_new_chat.py | 11 +++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/surfsense_backend/app/schemas/new_chat.py b/surfsense_backend/app/schemas/new_chat.py index aa95e49e6..a7200903e 100644 --- a/surfsense_backend/app/schemas/new_chat.py +++ b/surfsense_backend/app/schemas/new_chat.py @@ -7,7 +7,7 @@ These schemas follow the assistant-ui ThreadHistoryAdapter pattern: """ from datetime import datetime -from typing import Any +from typing import Any, Literal from uuid import UUID from pydantic import BaseModel, ConfigDict, Field @@ -193,6 +193,16 @@ class RegenerateRequest(BaseModel): mentioned_surfsense_doc_ids: list[int] | None = None +class ResumeDecision(BaseModel): + type: Literal["approve", "edit", "reject"] + edited_action: dict[str, Any] | None = None + + +class ResumeRequest(BaseModel): + search_space_id: int + decisions: list[ResumeDecision] + + # ============================================================================= # Public Chat Snapshot Schemas # ============================================================================= diff --git a/surfsense_backend/app/services/new_streaming_service.py b/surfsense_backend/app/services/new_streaming_service.py index 57fbc9663..aa14dec7d 100644 --- a/surfsense_backend/app/services/new_streaming_service.py +++ b/surfsense_backend/app/services/new_streaming_service.py @@ -504,6 +504,18 @@ class VercelStreamingService: }, ) + def format_interrupt_request(self, interrupt_value: dict[str, Any]) -> str: + """Format an interrupt request for human-in-the-loop approval. + + Args: + interrupt_value: The interrupt payload from HumanInTheLoopMiddleware + containing action_requests and review_configs. + + Returns: + str: SSE formatted interrupt request data part + """ + return self.format_data("interrupt-request", interrupt_value) + # ========================================================================= # Error Part # ========================================================================= diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 31e67c7ff..801d140be 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -1175,6 +1175,17 @@ async def stream_new_chat( if completion_event: yield completion_event + # Check if the graph was interrupted (human-in-the-loop) + state = await agent.aget_state(config) + is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) + if is_interrupted: + interrupt_value = state.tasks[0].interrupts[0].value + yield streaming_service.format_interrupt_request(interrupt_value) + yield streaming_service.format_finish_step() + yield streaming_service.format_finish() + yield streaming_service.format_done() + return + # Generate LLM title for new chats after first response # Check if this is the first assistant response by counting existing assistant messages from sqlalchemy import func From 39ee4742d2cb91f2fbd7aca24b5c9ffd52287621 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 10 Feb 2026 15:43:52 +0200 Subject: [PATCH 04/42] add stream_resume_chat and POST /threads/{id}/resume endpoint --- .../app/routes/new_chat_routes.py | 78 ++- .../app/tasks/chat/stream_new_chat.py | 620 ++++++++++++++++++ 2 files changed, 697 insertions(+), 1 deletion(-) diff --git a/surfsense_backend/app/routes/new_chat_routes.py b/surfsense_backend/app/routes/new_chat_routes.py index ecb1c2a6f..53e6c8e09 100644 --- a/surfsense_backend/app/routes/new_chat_routes.py +++ b/surfsense_backend/app/routes/new_chat_routes.py @@ -43,11 +43,12 @@ from app.schemas.new_chat import ( PublicChatSnapshotCreateResponse, PublicChatSnapshotListResponse, RegenerateRequest, + ResumeRequest, ThreadHistoryLoadResponse, ThreadListItem, ThreadListResponse, ) -from app.tasks.chat.stream_new_chat import stream_new_chat +from app.tasks.chat.stream_new_chat import stream_new_chat, stream_resume_chat from app.users import current_active_user from app.utils.rbac import check_permission @@ -1326,3 +1327,78 @@ async def regenerate_response( status_code=500, detail=f"An unexpected error occurred during regeneration: {e!s}", ) from None + + +# ============================================================================= +# Resume Interrupted Chat Endpoint +# ============================================================================= + + +@router.post("/threads/{thread_id}/resume") +async def resume_chat( + thread_id: int, + request: ResumeRequest, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + try: + result = await session.execute( + select(NewChatThread).filter(NewChatThread.id == thread_id) + ) + thread = result.scalars().first() + + if not thread: + raise HTTPException(status_code=404, detail="Thread not found") + + await check_permission( + session, + user, + thread.search_space_id, + Permission.CHATS_CREATE.value, + "You don't have permission to chat in this search space", + ) + + await check_thread_access(session, thread, user) + + search_space_result = await session.execute( + select(SearchSpace).filter(SearchSpace.id == request.search_space_id) + ) + search_space = search_space_result.scalars().first() + + if not search_space: + raise HTTPException(status_code=404, detail="Search space not found") + + llm_config_id = ( + search_space.agent_llm_id if search_space.agent_llm_id is not None else -1 + ) + + decisions = [d.model_dump() for d in request.decisions] + + return StreamingResponse( + stream_resume_chat( + chat_id=thread_id, + search_space_id=request.search_space_id, + decisions=decisions, + session=session, + user_id=str(user.id), + llm_config_id=llm_config_id, + thread_visibility=thread.visibility, + ), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + except HTTPException: + raise + except Exception as e: + import traceback + + traceback.print_exc() + raise HTTPException( + status_code=500, + detail=f"An unexpected error occurred during resume: {e!s}", + ) from None diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 801d140be..852793230 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -1268,3 +1268,623 @@ async def stream_new_chat( finally: # Clear AI responding state for live collaboration await clear_ai_responding(session, chat_id) + + +async def stream_resume_chat( + chat_id: int, + search_space_id: int, + decisions: list[dict], + session: AsyncSession, + user_id: str | None = None, + llm_config_id: int = -1, + thread_visibility: ChatVisibility | None = None, +) -> AsyncGenerator[str, None]: + streaming_service = VercelStreamingService() + current_text_id: str | None = None + + try: + if user_id: + await set_ai_responding(session, chat_id, UUID(user_id)) + + agent_config: AgentConfig | None = None + if llm_config_id >= 0: + agent_config = await load_agent_config( + session=session, + config_id=llm_config_id, + search_space_id=search_space_id, + ) + if not agent_config: + yield streaming_service.format_error( + f"Failed to load NewLLMConfig with id {llm_config_id}" + ) + yield streaming_service.format_done() + return + llm = create_chat_litellm_from_agent_config(agent_config) + else: + llm_config = load_llm_config_from_yaml(llm_config_id=llm_config_id) + if not llm_config: + yield streaming_service.format_error( + f"Failed to load LLM config with id {llm_config_id}" + ) + yield streaming_service.format_done() + return + llm = create_chat_litellm_from_config(llm_config) + agent_config = AgentConfig.from_yaml_config(llm_config) + + if not llm: + yield streaming_service.format_error("Failed to create LLM instance") + yield streaming_service.format_done() + return + + connector_service = ConnectorService(session, search_space_id=search_space_id) + + from app.db import SearchSourceConnectorType + + firecrawl_api_key = None + webcrawler_connector = await connector_service.get_connector_by_type( + SearchSourceConnectorType.WEBCRAWLER_CONNECTOR, search_space_id + ) + if webcrawler_connector and webcrawler_connector.config: + firecrawl_api_key = webcrawler_connector.config.get("FIRECRAWL_API_KEY") + + checkpointer = await get_checkpointer() + visibility = thread_visibility or ChatVisibility.PRIVATE + + agent = await create_surfsense_deep_agent( + llm=llm, + search_space_id=search_space_id, + db_session=session, + connector_service=connector_service, + checkpointer=checkpointer, + user_id=user_id, + thread_id=chat_id, + agent_config=agent_config, + firecrawl_api_key=firecrawl_api_key, + thread_visibility=visibility, + ) + + from langgraph.types import Command + + config = { + "configurable": {"thread_id": str(chat_id)}, + "recursion_limit": 80, + } + + yield streaming_service.format_message_start() + yield streaming_service.format_start_step() + + accumulated_text = "" + thinking_step_counter = 0 + tool_step_ids: dict[str, str] = {} + completed_step_ids: set[str] = set() + last_active_step_id: str | None = None + last_active_step_title = "" + last_active_step_items: list[str] = [] + just_finished_tool = False + + def next_thinking_step_id() -> str: + nonlocal thinking_step_counter + thinking_step_counter += 1 + return f"thinking-resume-{thinking_step_counter}" + + def complete_current_step() -> str | None: + nonlocal last_active_step_id + if last_active_step_id and last_active_step_id not in completed_step_ids: + completed_step_ids.add(last_active_step_id) + event = streaming_service.format_thinking_step( + step_id=last_active_step_id, + title=last_active_step_title, + status="completed", + items=last_active_step_items, + ) + last_active_step_id = None + return event + return None + + async for event in agent.astream_events( + Command(resume={"decisions": decisions}), config=config, version="v2" + ): + event_type = event.get("event", "") + + if event_type == "on_chat_model_stream": + chunk = event.get("data", {}).get("chunk") + if chunk and hasattr(chunk, "content"): + content = chunk.content + if content and isinstance(content, str): + if current_text_id is None: + completion_event = complete_current_step() + if completion_event: + yield completion_event + if just_finished_tool: + last_active_step_id = None + last_active_step_title = "" + last_active_step_items = [] + just_finished_tool = False + current_text_id = streaming_service.generate_text_id() + yield streaming_service.format_text_start(current_text_id) + yield streaming_service.format_text_delta( + current_text_id, content + ) + accumulated_text += content + + elif event_type == "on_tool_start": + tool_name = event.get("name", "unknown_tool") + run_id = event.get("run_id", "") + tool_input = event.get("data", {}).get("input", {}) + + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + current_text_id = None + + if last_active_step_title != "Synthesizing response": + completion_event = complete_current_step() + if completion_event: + yield completion_event + + just_finished_tool = False + tool_step_id = next_thinking_step_id() + tool_step_ids[run_id] = tool_step_id + last_active_step_id = tool_step_id + + if tool_name == "search_knowledge_base": + query = ( + tool_input.get("query", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Searching knowledge base" + last_active_step_items = [ + f"Query: {query[:100]}{'...' if len(query) > 100 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Searching knowledge base", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "link_preview": + url = ( + tool_input.get("url", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Fetching link preview" + last_active_step_items = [ + f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Fetching link preview", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "display_image": + src = ( + tool_input.get("src", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + title = ( + tool_input.get("title", "") + if isinstance(tool_input, dict) + else "" + ) + last_active_step_title = "Analyzing the image" + last_active_step_items = [ + f"Analyzing: {title[:50] if title else src[:50]}{'...' if len(title or src) > 50 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Analyzing the image", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "scrape_webpage": + url = ( + tool_input.get("url", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Scraping webpage" + last_active_step_items = [ + f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Scraping webpage", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "generate_podcast": + podcast_title = ( + tool_input.get("podcast_title", "SurfSense Podcast") + if isinstance(tool_input, dict) + else "SurfSense Podcast" + ) + content_len = len( + tool_input.get("source_content", "") + if isinstance(tool_input, dict) + else "" + ) + last_active_step_title = "Generating podcast" + last_active_step_items = [ + f"Title: {podcast_title}", + f"Content: {content_len:,} characters", + "Preparing audio generation...", + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Generating podcast", + status="in_progress", + items=last_active_step_items, + ) + else: + last_active_step_title = f"Using {tool_name.replace('_', ' ')}" + last_active_step_items = [] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title=last_active_step_title, + status="in_progress", + ) + + tool_call_id = ( + f"call_{run_id[:32]}" + if run_id + else streaming_service.generate_tool_call_id() + ) + yield streaming_service.format_tool_input_start(tool_call_id, tool_name) + yield streaming_service.format_tool_input_available( + tool_call_id, + tool_name, + tool_input + if isinstance(tool_input, dict) + else {"input": tool_input}, + ) + + elif event_type == "on_tool_end": + run_id = event.get("run_id", "") + tool_name = event.get("name", "unknown_tool") + raw_output = event.get("data", {}).get("output", "") + + if hasattr(raw_output, "content"): + content = raw_output.content + if isinstance(content, str): + try: + tool_output = json.loads(content) + except (json.JSONDecodeError, TypeError): + tool_output = {"result": content} + elif isinstance(content, dict): + tool_output = content + else: + tool_output = {"result": str(content)} + elif isinstance(raw_output, dict): + tool_output = raw_output + else: + tool_output = { + "result": str(raw_output) if raw_output else "completed" + } + + tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" + original_step_id = tool_step_ids.get( + run_id, f"thinking-unknown-{run_id[:8]}" + ) + completed_step_ids.add(original_step_id) + + if tool_name == "search_knowledge_base": + result_info = "Search completed" + if isinstance(tool_output, dict): + result_len = tool_output.get("result_length", 0) + if result_len > 0: + result_info = ( + f"Found relevant information ({result_len} chars)" + ) + completed_items = [*last_active_step_items, result_info] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Searching knowledge base", + status="completed", + items=completed_items, + ) + elif tool_name == "link_preview": + if isinstance(tool_output, dict): + title = tool_output.get("title", "Link") + domain = tool_output.get("domain", "") + has_error = "error" in tool_output + if has_error: + completed_items = [ + *last_active_step_items, + f"Error: {tool_output.get('error', 'Failed to fetch')}", + ] + else: + completed_items = [ + *last_active_step_items, + f"Title: {title[:60]}{'...' if len(title) > 60 else ''}", + f"Domain: {domain}" if domain else "Preview loaded", + ] + else: + completed_items = [*last_active_step_items, "Preview loaded"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Fetching link preview", + status="completed", + items=completed_items, + ) + elif tool_name == "display_image": + if isinstance(tool_output, dict): + title = tool_output.get("title", "") + alt = tool_output.get("alt", "Image") + display_name = title or alt + completed_items = [ + *last_active_step_items, + f"Analyzed: {display_name[:50]}{'...' if len(display_name) > 50 else ''}", + ] + else: + completed_items = [*last_active_step_items, "Image analyzed"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Analyzing the image", + status="completed", + items=completed_items, + ) + elif tool_name == "scrape_webpage": + if isinstance(tool_output, dict): + title = tool_output.get("title", "Webpage") + word_count = tool_output.get("word_count", 0) + has_error = "error" in tool_output + if has_error: + completed_items = [ + *last_active_step_items, + f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", + ] + else: + completed_items = [ + *last_active_step_items, + f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", + f"Extracted: {word_count:,} words", + ] + else: + completed_items = [*last_active_step_items, "Content extracted"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Scraping webpage", + status="completed", + items=completed_items, + ) + elif tool_name == "generate_podcast": + podcast_status = ( + tool_output.get("status", "unknown") + if isinstance(tool_output, dict) + else "unknown" + ) + podcast_title = ( + tool_output.get("title", "Podcast") + if isinstance(tool_output, dict) + else "Podcast" + ) + if podcast_status == "processing": + completed_items = [ + f"Title: {podcast_title}", + "Audio generation started", + "Processing in background...", + ] + elif podcast_status == "already_generating": + completed_items = [ + f"Title: {podcast_title}", + "Podcast already in progress", + "Please wait for it to complete", + ] + elif podcast_status == "error": + error_msg = ( + tool_output.get("error", "Unknown error") + if isinstance(tool_output, dict) + else "Unknown error" + ) + completed_items = [ + f"Title: {podcast_title}", + f"Error: {error_msg[:50]}", + ] + else: + completed_items = last_active_step_items + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Generating podcast", + status="completed", + items=completed_items, + ) + elif tool_name == "ls": + if isinstance(tool_output, dict): + result = tool_output.get("result", "") + elif isinstance(tool_output, str): + result = tool_output + else: + result = str(tool_output) if tool_output else "" + file_names = [] + if result: + for line in result.strip().split("\n"): + line = line.strip() + if line: + name = line.rstrip("/").split("/")[-1] + if name and len(name) <= 40: + file_names.append(name) + elif name: + file_names.append(name[:37] + "...") + if file_names: + if len(file_names) <= 5: + completed_items = [f"[{name}]" for name in file_names] + else: + completed_items = [f"[{name}]" for name in file_names[:4]] + completed_items.append(f"(+{len(file_names) - 4} more)") + else: + completed_items = ["No files found"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Exploring files", + status="completed", + items=completed_items, + ) + else: + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title=f"Using {tool_name.replace('_', ' ')}", + status="completed", + items=last_active_step_items, + ) + + just_finished_tool = True + last_active_step_id = None + last_active_step_title = "" + last_active_step_items = [] + + if tool_name == "generate_podcast": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if ( + isinstance(tool_output, dict) + and tool_output.get("status") == "success" + ): + yield streaming_service.format_terminal_info( + f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Unknown error") + if isinstance(tool_output, dict) + else "Unknown error" + ) + yield streaming_service.format_terminal_info( + f"Podcast generation failed: {error_msg}", + "error", + ) + elif tool_name == "link_preview": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if isinstance(tool_output, dict) and "error" not in tool_output: + title = tool_output.get("title", "Link") + yield streaming_service.format_terminal_info( + f"Link preview loaded: {title[:50]}{'...' if len(title) > 50 else ''}", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Failed to fetch") + if isinstance(tool_output, dict) + else "Failed to fetch" + ) + yield streaming_service.format_terminal_info( + f"Link preview failed: {error_msg}", + "error", + ) + elif tool_name == "display_image": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if isinstance(tool_output, dict): + title = tool_output.get("title") or tool_output.get( + "alt", "Image" + ) + yield streaming_service.format_terminal_info( + f"Image analyzed: {title[:40]}{'...' if len(title) > 40 else ''}", + "success", + ) + elif tool_name == "scrape_webpage": + if isinstance(tool_output, dict): + display_output = { + k: v for k, v in tool_output.items() if k != "content" + } + if "content" in tool_output: + content = tool_output.get("content", "") + display_output["content_preview"] = ( + content[:500] + "..." if len(content) > 500 else content + ) + yield streaming_service.format_tool_output_available( + tool_call_id, + display_output, + ) + else: + yield streaming_service.format_tool_output_available( + tool_call_id, + {"result": tool_output}, + ) + if isinstance(tool_output, dict) and "error" not in tool_output: + title = tool_output.get("title", "Webpage") + word_count = tool_output.get("word_count", 0) + yield streaming_service.format_terminal_info( + f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Failed to scrape") + if isinstance(tool_output, dict) + else "Failed to scrape" + ) + yield streaming_service.format_terminal_info( + f"Scrape failed: {error_msg}", + "error", + ) + elif tool_name == "search_knowledge_base": + yield streaming_service.format_tool_output_available( + tool_call_id, + {"status": "completed", "result_length": len(str(tool_output))}, + ) + yield streaming_service.format_terminal_info( + "Knowledge base search completed", "success" + ) + else: + yield streaming_service.format_tool_output_available( + tool_call_id, + {"status": "completed", "result_length": len(str(tool_output))}, + ) + yield streaming_service.format_terminal_info( + f"Tool {tool_name} completed", "success" + ) + + elif event_type in ("on_chain_end", "on_agent_end"): + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + current_text_id = None + + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + + completion_event = complete_current_step() + if completion_event: + yield completion_event + + state = await agent.aget_state(config) + is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) + if is_interrupted: + interrupt_value = state.tasks[0].interrupts[0].value + yield streaming_service.format_interrupt_request(interrupt_value) + + yield streaming_service.format_finish_step() + yield streaming_service.format_finish() + yield streaming_service.format_done() + + except Exception as e: + import traceback + + error_message = f"Error during resume: {e!s}" + print(f"[stream_resume_chat] {error_message}") + print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}") + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + yield streaming_service.format_error(error_message) + yield streaming_service.format_finish_step() + yield streaming_service.format_finish() + yield streaming_service.format_done() + + finally: + await clear_ai_responding(session, chat_id) From 2343fecf977c6e0d26f7bc8074b023c79b7be16c Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 10 Feb 2026 18:05:01 +0200 Subject: [PATCH 05/42] add frontend HITL: approval UI, SSE handler, and resume flow - Create CreateNotionPageToolUI component with approval card (approve/reject) - Handle data-interrupt-request SSE event in page.tsx - Add handleResume callback to POST decisions and stream continuation - Skip message persistence for interrupted messages (handleResume persists) - Stamp __decided__ on tool results for immutable post-decision state - Sync pendingInterrupt ID after message persistence rename --- .../new-chat/[[...chat_id]]/page.tsx | 443 +++++++++++++++++- .../components/tool-ui/create-notion-page.tsx | 185 ++++++++ surfsense_web/components/tool-ui/index.ts | 1 + 3 files changed, 623 insertions(+), 6 deletions(-) create mode 100644 surfsense_web/components/tool-ui/create-notion-page.tsx diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index bc5aca91e..2db00a03d 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -32,6 +32,7 @@ import { membersAtom } from "@/atoms/members/members-query.atoms"; import { currentUserAtom } from "@/atoms/user/user-query.atoms"; import { Thread } from "@/components/assistant-ui/thread"; import { ChatHeader } from "@/components/new-chat/chat-header"; +import { CreateNotionPageToolUI } from "@/components/tool-ui/create-notion-page"; import type { ThinkingStep } from "@/components/tool-ui/deepagent-thinking"; import { DisplayImageToolUI } from "@/components/tool-ui/display-image"; import { GeneratePodcastToolUI } from "@/components/tool-ui/generate-podcast"; @@ -120,6 +121,7 @@ const TOOLS_WITH_UI = new Set([ "link_preview", "display_image", "scrape_webpage", + "create_notion_page", // "write_todos", // Disabled for now ]); @@ -147,6 +149,11 @@ export default function NewChatPage() { new Map() ); const abortControllerRef = useRef(null); + const [pendingInterrupt, setPendingInterrupt] = useState<{ + threadId: number; + assistantMsgId: string; + interruptData: Record; + } | null>(null); // Get mentioned document IDs from the composer const mentionedDocumentIds = useAtomValue(mentionedDocumentIdsAtom); @@ -545,6 +552,7 @@ export default function NewChatPage() { result?: unknown; }; const contentParts: ContentPart[] = []; + let wasInterrupted = false; // Track the current text segment index (for appending text deltas) let currentTextPartIndex = -1; @@ -816,27 +824,69 @@ export default function NewChatPage() { String(titleData.threadId), ], }); + } + break; + } + + case "data-interrupt-request": { + wasInterrupted = true; + const interruptData = parsed.data as Record; + const actionRequests = (interruptData.action_requests ?? []) as Array<{ + name: string; + args: Record; + }>; + for (const action of actionRequests) { + const existingIdx = Array.from(toolCallIndices.entries()).find( + ([, idx]) => { + const part = contentParts[idx]; + return part?.type === "tool-call" && part.toolName === action.name; + } + ); + if (existingIdx) { + updateToolCall(existingIdx[0], { + result: { __interrupt__: true, ...interruptData }, + }); + } else { + const tcId = `interrupt-${action.name}`; + addToolCall(tcId, action.name, action.args); + updateToolCall(tcId, { + result: { __interrupt__: true, ...interruptData }, + }); + } + } + setMessages((prev) => + prev.map((m) => + m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + ) + ); + if (currentThreadId) { + setPendingInterrupt({ + threadId: currentThreadId, + assistantMsgId, + interruptData, + }); } break; } case "error": throw new Error(parsed.errorText || "Server error"); - } - } catch (e) { - if (e instanceof SyntaxError) continue; - throw e; } + } catch (e) { + if (e instanceof SyntaxError) continue; + throw e; } } } - } finally { + } + } finally { reader.releaseLock(); } // Persist assistant message (with thinking steps for restoration on refresh) + // Skip persistence for interrupted messages -- handleResume will persist the final version const finalContent = buildContentForPersistence(); - if (contentParts.length > 0) { + if (contentParts.length > 0 && !wasInterrupted) { try { const savedMessage = await appendMessage(currentThreadId, { role: "assistant", @@ -849,6 +899,13 @@ export default function NewChatPage() { prev.map((m) => (m.id === assistantMsgId ? { ...m, id: newMsgId } : m)) ); + // Update pending interrupt with the new persisted message ID + setPendingInterrupt((prev) => + prev && prev.assistantMsgId === assistantMsgId + ? { ...prev, assistantMsgId: newMsgId } + : prev + ); + // Also update thinking steps map with new ID setMessageThinkingSteps((prev) => { const steps = prev.get(assistantMsgId); @@ -941,6 +998,379 @@ export default function NewChatPage() { ] ); + const handleResume = useCallback( + async (decisions: Array<{ type: string; message?: string }>) => { + if (!pendingInterrupt) return; + const { threadId: resumeThreadId, assistantMsgId } = pendingInterrupt; + setPendingInterrupt(null); + setIsRunning(true); + + const token = getBearerToken(); + if (!token) { + toast.error("Not authenticated. Please log in again."); + setIsRunning(false); + return; + } + + const controller = new AbortController(); + abortControllerRef.current = controller; + + const currentThinkingSteps = new Map( + (messageThinkingSteps.get(assistantMsgId) ?? []).map((s) => [s.id, s]) + ); + + type ContentPart = + | { type: "text"; text: string } + | { + type: "tool-call"; + toolCallId: string; + toolName: string; + args: Record; + result?: unknown; + }; + const contentParts: ContentPart[] = []; + let currentTextPartIndex = -1; + const toolCallIndices = new Map(); + + const existingMsg = messages.find((m) => m.id === assistantMsgId); + if (existingMsg && Array.isArray(existingMsg.content)) { + for (const part of existingMsg.content) { + if (typeof part === "object" && part !== null) { + const p = part as Record; + if (p.type === "text") { + contentParts.push({ type: "text", text: String(p.text ?? "") }); + currentTextPartIndex = contentParts.length - 1; + } else if (p.type === "tool-call") { + toolCallIndices.set(String(p.toolCallId), contentParts.length); + contentParts.push({ + type: "tool-call", + toolCallId: String(p.toolCallId), + toolName: String(p.toolName), + args: (p.args as Record) ?? {}, + result: p.result as unknown, + }); + currentTextPartIndex = -1; + } + } + } + } + + const decisionType = decisions[0]?.type as "approve" | "reject" | undefined; + if (decisionType) { + for (const part of contentParts) { + if ( + part.type === "tool-call" && + typeof part.result === "object" && + part.result !== null && + "__interrupt__" in (part.result as Record) + ) { + part.result = { + ...(part.result as Record), + __decided__: decisionType, + }; + } + } + } + + const appendText = (delta: string) => { + if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") { + (contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta; + } else { + contentParts.push({ type: "text", text: delta }); + currentTextPartIndex = contentParts.length - 1; + } + }; + + const addToolCall = (toolCallId: string, toolName: string, args: Record) => { + if (TOOLS_WITH_UI.has(toolName)) { + contentParts.push({ + type: "tool-call", + toolCallId, + toolName, + args, + }); + toolCallIndices.set(toolCallId, contentParts.length - 1); + currentTextPartIndex = -1; + } + }; + + const updateToolCall = ( + toolCallId: string, + update: { args?: Record; result?: unknown } + ) => { + const index = toolCallIndices.get(toolCallId); + if (index !== undefined && contentParts[index]?.type === "tool-call") { + const tc = contentParts[index] as ContentPart & { type: "tool-call" }; + if (update.args) tc.args = update.args; + if (update.result !== undefined) tc.result = update.result; + } + }; + + const buildContentForUI = (): ThreadMessageLike["content"] => { + const filtered = contentParts.filter((part) => { + if (part.type === "text") return part.text.length > 0; + if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName); + return false; + }); + return filtered.length > 0 + ? (filtered as ThreadMessageLike["content"]) + : [{ type: "text", text: "" }]; + }; + + const buildContentForPersistence = (): unknown[] => { + const parts: unknown[] = []; + if (currentThinkingSteps.size > 0) { + parts.push({ + type: "thinking-steps", + steps: Array.from(currentThinkingSteps.values()), + }); + } + for (const part of contentParts) { + if (part.type === "text" && part.text.length > 0) { + parts.push(part); + } else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) { + parts.push(part); + } + } + return parts.length > 0 ? parts : [{ type: "text", text: "" }]; + }; + + try { + const backendUrl = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000"; + const response = await fetch(`${backendUrl}/api/v1/threads/${resumeThreadId}/resume`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ + search_space_id: searchSpaceId, + decisions, + }), + signal: controller.signal, + }); + + if (!response.ok) { + throw new Error(`Backend error: ${response.status}`); + } + + if (!response.body) { + throw new Error("No response body"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const events = buffer.split(/\r?\n\r?\n/); + buffer = events.pop() || ""; + + for (const event of events) { + const lines = event.split(/\r?\n/); + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + const data = line.slice(6).trim(); + if (!data || data === "[DONE]") continue; + + try { + const parsed = JSON.parse(data); + + switch (parsed.type) { + case "text-delta": + appendText(parsed.delta); + setMessages((prev) => + prev.map((m) => + m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + ) + ); + break; + + case "tool-input-start": + addToolCall(parsed.toolCallId, parsed.toolName, {}); + setMessages((prev) => + prev.map((m) => + m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + ) + ); + break; + + case "tool-input-available": + if (toolCallIndices.has(parsed.toolCallId)) { + updateToolCall(parsed.toolCallId, { + args: parsed.input || {}, + }); + } else { + addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {}); + } + setMessages((prev) => + prev.map((m) => + m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + ) + ); + break; + + case "tool-output-available": + updateToolCall(parsed.toolCallId, { + result: parsed.output, + }); + setMessages((prev) => + prev.map((m) => + m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + ) + ); + break; + + case "data-thinking-step": { + const stepData = parsed.data as ThinkingStepData; + if (stepData?.id) { + currentThinkingSteps.set(stepData.id, stepData); + setMessageThinkingSteps((prev) => { + const newMap = new Map(prev); + newMap.set(assistantMsgId, Array.from(currentThinkingSteps.values())); + return newMap; + }); + } + break; + } + + case "data-interrupt-request": { + const interruptData = parsed.data as Record; + const actionRequests = (interruptData.action_requests ?? []) as Array<{ + name: string; + args: Record; + }>; + for (const action of actionRequests) { + const existingIdx = Array.from(toolCallIndices.entries()).find( + ([, idx]) => { + const part = contentParts[idx]; + return part?.type === "tool-call" && part.toolName === action.name; + } + ); + if (existingIdx) { + updateToolCall(existingIdx[0], { + result: { + __interrupt__: true, + ...interruptData, + }, + }); + } else { + const tcId = `interrupt-${action.name}`; + addToolCall(tcId, action.name, action.args); + updateToolCall(tcId, { + result: { + __interrupt__: true, + ...interruptData, + }, + }); + } + } + setMessages((prev) => + prev.map((m) => + m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + ) + ); + setPendingInterrupt({ + threadId: resumeThreadId, + assistantMsgId, + interruptData, + }); + break; + } + + case "error": + throw new Error(parsed.errorText || "Server error"); + } + } catch (e) { + if (e instanceof SyntaxError) continue; + throw e; + } + } + } + } + } finally { + reader.releaseLock(); + } + + const finalContent = buildContentForPersistence(); + if (contentParts.length > 0) { + try { + const savedMessage = await appendMessage(resumeThreadId, { + role: "assistant", + content: finalContent, + }); + const newMsgId = `msg-${savedMessage.id}`; + setMessages((prev) => + prev.map((m) => (m.id === assistantMsgId ? { ...m, id: newMsgId } : m)) + ); + setMessageThinkingSteps((prev) => { + const steps = prev.get(assistantMsgId); + if (steps) { + const newMap = new Map(prev); + newMap.delete(assistantMsgId); + newMap.set(newMsgId, steps); + return newMap; + } + return prev; + }); + } catch (err) { + console.error("Failed to persist resumed assistant message:", err); + } + } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + return; + } + console.error("[NewChatPage] Resume error:", error); + toast.error("Failed to resume. Please try again."); + } finally { + setIsRunning(false); + abortControllerRef.current = null; + } + }, + [pendingInterrupt, messages, searchSpaceId, messageThinkingSteps] + ); + + useEffect(() => { + const handler = (e: Event) => { + const detail = (e as CustomEvent).detail as { + decisions: Array<{ type: string; message?: string }>; + }; + if (detail?.decisions && pendingInterrupt) { + const decisionType = detail.decisions[0]?.type as "approve" | "reject"; + setMessages((prev) => + prev.map((m) => { + if (m.id !== pendingInterrupt.assistantMsgId) return m; + const parts = m.content as unknown as Array>; + const newContent = parts.map((part) => { + if ( + part.type === "tool-call" && + typeof part.result === "object" && + part.result !== null && + "__interrupt__" in part.result + ) { + return { + ...part, + result: { ...(part.result as Record), __decided__: decisionType }, + }; + } + return part; + }); + return { ...m, content: newContent as unknown as ThreadMessageLike["content"] }; + }) + ); + handleResume(detail.decisions); + } + }; + window.addEventListener("hitl-decision", handler); + return () => window.removeEventListener("hitl-decision", handler); + }, [handleResume, pendingInterrupt]); + // Convert message (pass through since already in correct format) const convertMessage = useCallback( (message: ThreadMessageLike): ThreadMessageLike => message, @@ -1432,6 +1862,7 @@ export default function NewChatPage() { + {/* Disabled for now */}
; + description?: string; + }>; + review_configs: Array<{ + action_name: string; + allowed_decisions: Array<"approve" | "edit" | "reject">; + }>; +} + +interface SuccessResult { + status: string; + page_id: string; + title: string; + url: string; +} + +type CreateNotionPageResult = InterruptResult | SuccessResult; + +function isInterruptResult(result: unknown): result is InterruptResult { + return ( + typeof result === "object" && + result !== null && + "__interrupt__" in result && + (result as InterruptResult).__interrupt__ === true + ); +} + +function ApprovalCard({ + args, + interruptData, + onDecision, +}: { + args: Record; + interruptData: InterruptResult; + onDecision: (decision: { type: "approve" | "reject"; message?: string }) => void; +}) { + const [decided, setDecided] = useState<"approve" | "reject" | null>( + interruptData.__decided__ ?? null + ); + const reviewConfig = interruptData.review_configs[0]; + const allowedDecisions = reviewConfig?.allowed_decisions ?? ["approve", "reject"]; + + return ( +
+
+
+ +
+
+

Create Notion Page

+

+ Requires your approval to proceed +

+
+
+ +
+ {args.title != null && ( +
+

Title

+

{String(args.title)}

+
+ )} + {args.content != null && ( +
+

Content

+

{String(args.content)}

+
+ )} +
+ +
+ {decided ? ( +

+ {decided === "approve" ? ( + <> + + Approved + + ) : ( + <> + + Rejected + + )} +

+ ) : ( + <> + {allowedDecisions.includes("approve") && ( + + )} + {allowedDecisions.includes("reject") && ( + + )} + + )} +
+
+ ); +} + +function SuccessCard({ result }: { result: SuccessResult }) { + return ( +
+
+
+ +
+
+

{result.title}

+

Notion page created

+
+
+
+ ); +} + +export const CreateNotionPageToolUI = makeAssistantToolUI< + { title: string; content: string }, + CreateNotionPageResult +>({ + toolName: "create_notion_page", + render: function CreateNotionPageUI({ args, result, status }) { + if (status.type === "running") { + return ( +
+ +

Preparing Notion page...

+
+ ); + } + + if (!result) { + return null; + } + + if (isInterruptResult(result)) { + return ( + { + const event = new CustomEvent("hitl-decision", { + detail: { decisions: [decision] }, + }); + window.dispatchEvent(event); + }} + /> + ); + } + + return ; + }, +}); diff --git a/surfsense_web/components/tool-ui/index.ts b/surfsense_web/components/tool-ui/index.ts index 5b4ea0a34..56e5c975b 100644 --- a/surfsense_web/components/tool-ui/index.ts +++ b/surfsense_web/components/tool-ui/index.ts @@ -16,6 +16,7 @@ export { type SerializableArticle, } from "./article"; export { Audio } from "./audio"; +export { CreateNotionPageToolUI } from "./create-notion-page"; export { type DeepAgentThinkingArgs, type DeepAgentThinkingResult, From 8f81c9859dcebbd2b1f7d8da06bb06df0043e8d5 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 10 Feb 2026 19:47:10 +0200 Subject: [PATCH 06/42] deduplicate backend streaming loop into shared _stream_agent_events helper --- .../app/tasks/chat/stream_new_chat.py | 1852 ++++++----------- 1 file changed, 582 insertions(+), 1270 deletions(-) diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 852793230..2cd05e80c 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -11,6 +11,8 @@ Supports loading LLM configurations from: import json from collections.abc import AsyncGenerator +from dataclasses import dataclass +from typing import Any from uuid import UUID from langchain_core.messages import HumanMessage @@ -178,6 +180,550 @@ def extract_todos_from_deepagents(command_output) -> dict: return {"todos": todos_data} +@dataclass +class StreamResult: + accumulated_text: str = "" + is_interrupted: bool = False + interrupt_value: dict[str, Any] | None = None + + +async def _stream_agent_events( + agent: Any, + config: dict[str, Any], + input_data: Any, + streaming_service: VercelStreamingService, + result: StreamResult, + step_prefix: str = "thinking", + initial_step_id: str | None = None, + initial_step_title: str = "", + initial_step_items: list[str] | None = None, +) -> AsyncGenerator[str, None]: + """Shared async generator that streams and formats astream_events from the agent. + + Yields SSE-formatted strings. After exhausting, inspect the ``result`` + object for accumulated_text and interrupt state. + + Args: + agent: The compiled LangGraph agent. + config: LangGraph config dict (must include configurable.thread_id). + input_data: The input to pass to agent.astream_events (dict or Command). + streaming_service: VercelStreamingService instance for formatting events. + result: Mutable StreamResult populated with accumulated_text / interrupt info. + step_prefix: Prefix for thinking step IDs (e.g. "thinking" or "thinking-resume"). + initial_step_id: If set, the helper inherits an already-active thinking step. + initial_step_title: Title of the inherited thinking step. + initial_step_items: Items of the inherited thinking step. + + Yields: + SSE-formatted strings for each event. + """ + accumulated_text = "" + current_text_id: str | None = None + thinking_step_counter = 1 if initial_step_id else 0 + tool_step_ids: dict[str, str] = {} + completed_step_ids: set[str] = set() + last_active_step_id: str | None = initial_step_id + last_active_step_title: str = initial_step_title + last_active_step_items: list[str] = initial_step_items or [] + just_finished_tool: bool = False + + def next_thinking_step_id() -> str: + nonlocal thinking_step_counter + thinking_step_counter += 1 + return f"{step_prefix}-{thinking_step_counter}" + + def complete_current_step() -> str | None: + nonlocal last_active_step_id + if last_active_step_id and last_active_step_id not in completed_step_ids: + completed_step_ids.add(last_active_step_id) + event = streaming_service.format_thinking_step( + step_id=last_active_step_id, + title=last_active_step_title, + status="completed", + items=last_active_step_items if last_active_step_items else None, + ) + last_active_step_id = None + return event + return None + + async for event in agent.astream_events(input_data, config=config, version="v2"): + event_type = event.get("event", "") + + if event_type == "on_chat_model_stream": + chunk = event.get("data", {}).get("chunk") + if chunk and hasattr(chunk, "content"): + content = chunk.content + if content and isinstance(content, str): + if current_text_id is None: + completion_event = complete_current_step() + if completion_event: + yield completion_event + if just_finished_tool: + last_active_step_id = None + last_active_step_title = "" + last_active_step_items = [] + just_finished_tool = False + current_text_id = streaming_service.generate_text_id() + yield streaming_service.format_text_start(current_text_id) + yield streaming_service.format_text_delta(current_text_id, content) + accumulated_text += content + + elif event_type == "on_tool_start": + tool_name = event.get("name", "unknown_tool") + run_id = event.get("run_id", "") + tool_input = event.get("data", {}).get("input", {}) + + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + current_text_id = None + + if last_active_step_title != "Synthesizing response": + completion_event = complete_current_step() + if completion_event: + yield completion_event + + just_finished_tool = False + tool_step_id = next_thinking_step_id() + tool_step_ids[run_id] = tool_step_id + last_active_step_id = tool_step_id + + if tool_name == "search_knowledge_base": + query = ( + tool_input.get("query", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Searching knowledge base" + last_active_step_items = [ + f"Query: {query[:100]}{'...' if len(query) > 100 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Searching knowledge base", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "link_preview": + url = ( + tool_input.get("url", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Fetching link preview" + last_active_step_items = [ + f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Fetching link preview", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "display_image": + src = ( + tool_input.get("src", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + title = ( + tool_input.get("title", "") if isinstance(tool_input, dict) else "" + ) + last_active_step_title = "Analyzing the image" + last_active_step_items = [ + f"Analyzing: {title[:50] if title else src[:50]}{'...' if len(title or src) > 50 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Analyzing the image", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "scrape_webpage": + url = ( + tool_input.get("url", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Scraping webpage" + last_active_step_items = [ + f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Scraping webpage", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "generate_podcast": + podcast_title = ( + tool_input.get("podcast_title", "SurfSense Podcast") + if isinstance(tool_input, dict) + else "SurfSense Podcast" + ) + content_len = len( + tool_input.get("source_content", "") + if isinstance(tool_input, dict) + else "" + ) + last_active_step_title = "Generating podcast" + last_active_step_items = [ + f"Title: {podcast_title}", + f"Content: {content_len:,} characters", + "Preparing audio generation...", + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Generating podcast", + status="in_progress", + items=last_active_step_items, + ) + else: + last_active_step_title = f"Using {tool_name.replace('_', ' ')}" + last_active_step_items = [] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title=last_active_step_title, + status="in_progress", + ) + + tool_call_id = ( + f"call_{run_id[:32]}" + if run_id + else streaming_service.generate_tool_call_id() + ) + yield streaming_service.format_tool_input_start(tool_call_id, tool_name) + yield streaming_service.format_tool_input_available( + tool_call_id, + tool_name, + tool_input if isinstance(tool_input, dict) else {"input": tool_input}, + ) + + elif event_type == "on_tool_end": + run_id = event.get("run_id", "") + tool_name = event.get("name", "unknown_tool") + raw_output = event.get("data", {}).get("output", "") + + if hasattr(raw_output, "content"): + content = raw_output.content + if isinstance(content, str): + try: + tool_output = json.loads(content) + except (json.JSONDecodeError, TypeError): + tool_output = {"result": content} + elif isinstance(content, dict): + tool_output = content + else: + tool_output = {"result": str(content)} + elif isinstance(raw_output, dict): + tool_output = raw_output + else: + tool_output = {"result": str(raw_output) if raw_output else "completed"} + + tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" + original_step_id = tool_step_ids.get( + run_id, f"{step_prefix}-unknown-{run_id[:8]}" + ) + completed_step_ids.add(original_step_id) + + if tool_name == "search_knowledge_base": + result_info = "Search completed" + if isinstance(tool_output, dict): + result_len = tool_output.get("result_length", 0) + if result_len > 0: + result_info = f"Found relevant information ({result_len} chars)" + completed_items = [*last_active_step_items, result_info] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Searching knowledge base", + status="completed", + items=completed_items, + ) + elif tool_name == "link_preview": + if isinstance(tool_output, dict): + title = tool_output.get("title", "Link") + domain = tool_output.get("domain", "") + has_error = "error" in tool_output + if has_error: + completed_items = [ + *last_active_step_items, + f"Error: {tool_output.get('error', 'Failed to fetch')}", + ] + else: + completed_items = [ + *last_active_step_items, + f"Title: {title[:60]}{'...' if len(title) > 60 else ''}", + f"Domain: {domain}" if domain else "Preview loaded", + ] + else: + completed_items = [*last_active_step_items, "Preview loaded"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Fetching link preview", + status="completed", + items=completed_items, + ) + elif tool_name == "display_image": + if isinstance(tool_output, dict): + title = tool_output.get("title", "") + alt = tool_output.get("alt", "Image") + display_name = title or alt + completed_items = [ + *last_active_step_items, + f"Analyzed: {display_name[:50]}{'...' if len(display_name) > 50 else ''}", + ] + else: + completed_items = [*last_active_step_items, "Image analyzed"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Analyzing the image", + status="completed", + items=completed_items, + ) + elif tool_name == "scrape_webpage": + if isinstance(tool_output, dict): + title = tool_output.get("title", "Webpage") + word_count = tool_output.get("word_count", 0) + has_error = "error" in tool_output + if has_error: + completed_items = [ + *last_active_step_items, + f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", + ] + else: + completed_items = [ + *last_active_step_items, + f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", + f"Extracted: {word_count:,} words", + ] + else: + completed_items = [*last_active_step_items, "Content extracted"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Scraping webpage", + status="completed", + items=completed_items, + ) + elif tool_name == "generate_podcast": + podcast_status = ( + tool_output.get("status", "unknown") + if isinstance(tool_output, dict) + else "unknown" + ) + podcast_title = ( + tool_output.get("title", "Podcast") + if isinstance(tool_output, dict) + else "Podcast" + ) + if podcast_status == "processing": + completed_items = [ + f"Title: {podcast_title}", + "Audio generation started", + "Processing in background...", + ] + elif podcast_status == "already_generating": + completed_items = [ + f"Title: {podcast_title}", + "Podcast already in progress", + "Please wait for it to complete", + ] + elif podcast_status == "error": + error_msg = ( + tool_output.get("error", "Unknown error") + if isinstance(tool_output, dict) + else "Unknown error" + ) + completed_items = [ + f"Title: {podcast_title}", + f"Error: {error_msg[:50]}", + ] + else: + completed_items = last_active_step_items + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Generating podcast", + status="completed", + items=completed_items, + ) + elif tool_name == "ls": + if isinstance(tool_output, dict): + ls_output = tool_output.get("result", "") + elif isinstance(tool_output, str): + ls_output = tool_output + else: + ls_output = str(tool_output) if tool_output else "" + file_names: list[str] = [] + if ls_output: + for line in ls_output.strip().split("\n"): + line = line.strip() + if line: + name = line.rstrip("/").split("/")[-1] + if name and len(name) <= 40: + file_names.append(name) + elif name: + file_names.append(name[:37] + "...") + if file_names: + if len(file_names) <= 5: + completed_items = [f"[{name}]" for name in file_names] + else: + completed_items = [f"[{name}]" for name in file_names[:4]] + completed_items.append(f"(+{len(file_names) - 4} more)") + else: + completed_items = ["No files found"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Exploring files", + status="completed", + items=completed_items, + ) + else: + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title=f"Using {tool_name.replace('_', ' ')}", + status="completed", + items=last_active_step_items, + ) + + just_finished_tool = True + last_active_step_id = None + last_active_step_title = "" + last_active_step_items = [] + + if tool_name == "generate_podcast": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if ( + isinstance(tool_output, dict) + and tool_output.get("status") == "success" + ): + yield streaming_service.format_terminal_info( + f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Unknown error") + if isinstance(tool_output, dict) + else "Unknown error" + ) + yield streaming_service.format_terminal_info( + f"Podcast generation failed: {error_msg}", + "error", + ) + elif tool_name == "link_preview": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if isinstance(tool_output, dict) and "error" not in tool_output: + title = tool_output.get("title", "Link") + yield streaming_service.format_terminal_info( + f"Link preview loaded: {title[:50]}{'...' if len(title) > 50 else ''}", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Failed to fetch") + if isinstance(tool_output, dict) + else "Failed to fetch" + ) + yield streaming_service.format_terminal_info( + f"Link preview failed: {error_msg}", + "error", + ) + elif tool_name == "display_image": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if isinstance(tool_output, dict): + title = tool_output.get("title") or tool_output.get("alt", "Image") + yield streaming_service.format_terminal_info( + f"Image analyzed: {title[:40]}{'...' if len(title) > 40 else ''}", + "success", + ) + elif tool_name == "scrape_webpage": + if isinstance(tool_output, dict): + display_output = { + k: v for k, v in tool_output.items() if k != "content" + } + if "content" in tool_output: + content = tool_output.get("content", "") + display_output["content_preview"] = ( + content[:500] + "..." if len(content) > 500 else content + ) + yield streaming_service.format_tool_output_available( + tool_call_id, + display_output, + ) + else: + yield streaming_service.format_tool_output_available( + tool_call_id, + {"result": tool_output}, + ) + if isinstance(tool_output, dict) and "error" not in tool_output: + title = tool_output.get("title", "Webpage") + word_count = tool_output.get("word_count", 0) + yield streaming_service.format_terminal_info( + f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Failed to scrape") + if isinstance(tool_output, dict) + else "Failed to scrape" + ) + yield streaming_service.format_terminal_info( + f"Scrape failed: {error_msg}", + "error", + ) + elif tool_name == "search_knowledge_base": + yield streaming_service.format_tool_output_available( + tool_call_id, + {"status": "completed", "result_length": len(str(tool_output))}, + ) + yield streaming_service.format_terminal_info( + "Knowledge base search completed", "success" + ) + else: + yield streaming_service.format_tool_output_available( + tool_call_id, + {"status": "completed", "result_length": len(str(tool_output))}, + ) + yield streaming_service.format_terminal_info( + f"Tool {tool_name} completed", "success" + ) + + elif event_type in ("on_chain_end", "on_agent_end"): + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + current_text_id = None + + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + + completion_event = complete_current_step() + if completion_event: + yield completion_event + + result.accumulated_text = accumulated_text + + state = await agent.aget_state(config) + is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) + if is_interrupted: + result.is_interrupted = True + result.interrupt_value = state.tasks[0].interrupts[0].value + yield streaming_service.format_interrupt_request(result.interrupt_value) + + async def stream_new_chat( user_query: str, search_space_id: int, @@ -215,9 +761,6 @@ async def stream_new_chat( """ streaming_service = VercelStreamingService() - # Track the current text block for streaming (defined early for exception handling) - current_text_id: str | None = None - try: # Mark AI as responding to this user for live collaboration if user_id: @@ -394,63 +937,18 @@ async def stream_new_chat( yield streaming_service.format_message_start() yield streaming_service.format_start_step() - # Reset text tracking for this stream - accumulated_text = "" - - # Track thinking steps for chain-of-thought display - thinking_step_counter = 0 - # Map run_id -> step_id for tool calls so we can update them on completion - tool_step_ids: dict[str, str] = {} - # Track the last active step so we can mark it complete at the end - last_active_step_id: str | None = None - last_active_step_title: str = "" - last_active_step_items: list[str] = [] - # Track which steps have been completed to avoid duplicate completions - completed_step_ids: set[str] = set() - # Track if we just finished a tool (text flows silently after tools) - just_finished_tool: bool = False - # Track write_todos calls to show "Creating plan" vs "Updating plan" - # Disabled for now - # write_todos_call_count: int = 0 - - def next_thinking_step_id() -> str: - nonlocal thinking_step_counter - thinking_step_counter += 1 - return f"thinking-{thinking_step_counter}" - - def complete_current_step() -> str | None: - """Complete the current active step and return the completion event, if any.""" - nonlocal last_active_step_id, last_active_step_title, last_active_step_items - if last_active_step_id and last_active_step_id not in completed_step_ids: - completed_step_ids.add(last_active_step_id) - return streaming_service.format_thinking_step( - step_id=last_active_step_id, - title=last_active_step_title, - status="completed", - items=last_active_step_items if last_active_step_items else None, - ) - return None - # Initial thinking step - analyzing the request - analyze_step_id = next_thinking_step_id() - last_active_step_id = analyze_step_id - - # Determine step title and action verb based on context if mentioned_documents or mentioned_surfsense_docs: - last_active_step_title = "Analyzing referenced content" + initial_title = "Analyzing referenced content" action_verb = "Analyzing" else: - last_active_step_title = "Understanding your request" + initial_title = "Understanding your request" action_verb = "Processing" - # Build the message with inline context about referenced documents processing_parts = [] - - # Add the user query query_text = user_query[:80] + ("..." if len(user_query) > 80 else "") processing_parts.append(query_text) - # Add mentioned document names inline if mentioned_documents: doc_names = [] for doc in mentioned_documents: @@ -463,7 +961,6 @@ async def stream_new_chat( else: processing_parts.append(f"[{len(doc_names)} documents]") - # Add mentioned SurfSense docs inline if mentioned_surfsense_docs: doc_names = [] for doc in mentioned_surfsense_docs: @@ -476,716 +973,38 @@ async def stream_new_chat( else: processing_parts.append(f"[{len(doc_names)} docs]") - last_active_step_items = [f"{action_verb}: {' '.join(processing_parts)}"] + initial_items = [f"{action_verb}: {' '.join(processing_parts)}"] + initial_step_id = "thinking-1" yield streaming_service.format_thinking_step( - step_id=analyze_step_id, - title=last_active_step_title, + step_id=initial_step_id, + title=initial_title, status="in_progress", - items=last_active_step_items, + items=initial_items, ) - # Stream the agent response with thread config for memory - async for event in agent.astream_events( - input_state, config=config, version="v2" + stream_result = StreamResult() + async for sse in _stream_agent_events( + agent=agent, + config=config, + input_data=input_state, + streaming_service=streaming_service, + result=stream_result, + step_prefix="thinking", + initial_step_id=initial_step_id, + initial_step_title=initial_title, + initial_step_items=initial_items, ): - event_type = event.get("event", "") + yield sse - # Handle chat model stream events (text streaming) - if event_type == "on_chat_model_stream": - chunk = event.get("data", {}).get("chunk") - if chunk and hasattr(chunk, "content"): - content = chunk.content - if content and isinstance(content, str): - # Start a new text block if needed - if current_text_id is None: - # Complete any previous step - completion_event = complete_current_step() - if completion_event: - yield completion_event - - if just_finished_tool: - # Clear the active step tracking - text flows without a dedicated step - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - just_finished_tool = False - - current_text_id = streaming_service.generate_text_id() - yield streaming_service.format_text_start(current_text_id) - - # Stream the text delta - yield streaming_service.format_text_delta( - current_text_id, content - ) - accumulated_text += content - - # Handle tool calls - elif event_type == "on_tool_start": - tool_name = event.get("name", "unknown_tool") - run_id = event.get("run_id", "") - tool_input = event.get("data", {}).get("input", {}) - - # End current text block if any - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - # Complete any previous step EXCEPT "Synthesizing response" - # (we want to reuse the Synthesizing step after tools complete) - if last_active_step_title != "Synthesizing response": - completion_event = complete_current_step() - if completion_event: - yield completion_event - - # Reset the just_finished_tool flag since we're starting a new tool - just_finished_tool = False - - # Create thinking step for the tool call and store it for later update - tool_step_id = next_thinking_step_id() - tool_step_ids[run_id] = tool_step_id - last_active_step_id = tool_step_id - if tool_name == "search_knowledge_base": - query = ( - tool_input.get("query", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Searching knowledge base" - last_active_step_items = [ - f"Query: {query[:100]}{'...' if len(query) > 100 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Searching knowledge base", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "link_preview": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Fetching link preview" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Fetching link preview", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "display_image": - src = ( - tool_input.get("src", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - title = ( - tool_input.get("title", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Analyzing the image" - last_active_step_items = [ - f"Analyzing: {title[:50] if title else src[:50]}{'...' if len(title or src) > 50 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Analyzing the image", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "scrape_webpage": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Scraping webpage" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Scraping webpage", - status="in_progress", - items=last_active_step_items, - ) - # elif tool_name == "write_todos": # Disabled for now - # # Track write_todos calls for better messaging - # write_todos_call_count += 1 - # todos = ( - # tool_input.get("todos", []) - # if isinstance(tool_input, dict) - # else [] - # ) - # todo_count = len(todos) if isinstance(todos, list) else 0 - - # if write_todos_call_count == 1: - # # First call - creating the plan - # last_active_step_title = "Creating plan" - # last_active_step_items = [f"Defining {todo_count} tasks..."] - # else: - # # Subsequent calls - updating the plan - # # Try to provide context about what's being updated - # in_progress_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "in_progress" - # ) - # if isinstance(todos, list) - # else 0 - # ) - # completed_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "completed" - # ) - # if isinstance(todos, list) - # else 0 - # ) - - # last_active_step_title = "Updating progress" - # last_active_step_items = ( - # [ - # f"Progress: {completed_count}/{todo_count} completed", - # f"In progress: {in_progress_count} tasks", - # ] - # if completed_count > 0 - # else [f"Working on {todo_count} tasks"] - # ) - - # yield streaming_service.format_thinking_step( - # step_id=tool_step_id, - # title=last_active_step_title, - # status="in_progress", - # items=last_active_step_items, - # ) - elif tool_name == "generate_podcast": - podcast_title = ( - tool_input.get("podcast_title", "SurfSense Podcast") - if isinstance(tool_input, dict) - else "SurfSense Podcast" - ) - # Get content length for context - content_len = len( - tool_input.get("source_content", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Generating podcast" - last_active_step_items = [ - f"Title: {podcast_title}", - f"Content: {content_len:,} characters", - "Preparing audio generation...", - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Generating podcast", - status="in_progress", - items=last_active_step_items, - ) - # elif tool_name == "ls": - # last_active_step_title = "Exploring files" - # last_active_step_items = [] - # yield streaming_service.format_thinking_step( - # step_id=tool_step_id, - # title="Exploring files", - # status="in_progress", - # items=None, - # ) - else: - last_active_step_title = f"Using {tool_name.replace('_', ' ')}" - last_active_step_items = [] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title=last_active_step_title, - status="in_progress", - ) - - # Stream tool info - tool_call_id = ( - f"call_{run_id[:32]}" - if run_id - else streaming_service.generate_tool_call_id() - ) - yield streaming_service.format_tool_input_start(tool_call_id, tool_name) - yield streaming_service.format_tool_input_available( - tool_call_id, - tool_name, - tool_input - if isinstance(tool_input, dict) - else {"input": tool_input}, - ) - - elif event_type == "on_tool_end": - run_id = event.get("run_id", "") - tool_name = event.get("name", "unknown_tool") - raw_output = event.get("data", {}).get("output", "") - - # Handle deepagents' write_todos Command object specially - # Disabled for now - # if tool_name == "write_todos" and hasattr(raw_output, "update"): - # # deepagents returns a Command object - extract todos directly - # tool_output = extract_todos_from_deepagents(raw_output) - # elif hasattr(raw_output, "content"): - if hasattr(raw_output, "content"): - # It's a ToolMessage object - extract the content - content = raw_output.content - # If content is a string that looks like JSON, try to parse it - if isinstance(content, str): - try: - tool_output = json.loads(content) - except (json.JSONDecodeError, TypeError): - tool_output = {"result": content} - elif isinstance(content, dict): - tool_output = content - else: - tool_output = {"result": str(content)} - elif isinstance(raw_output, dict): - tool_output = raw_output - else: - tool_output = { - "result": str(raw_output) if raw_output else "completed" - } - - tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" - - # Get the original tool step ID to update it (not create a new one) - original_step_id = tool_step_ids.get( - run_id, f"thinking-unknown-{run_id[:8]}" - ) - - # Mark the tool thinking step as completed using the SAME step ID - # Also add to completed set so we don't try to complete it again - completed_step_ids.add(original_step_id) - if tool_name == "search_knowledge_base": - # Get result count if available - result_info = "Search completed" - if isinstance(tool_output, dict): - result_len = tool_output.get("result_length", 0) - if result_len > 0: - result_info = ( - f"Found relevant information ({result_len} chars)" - ) - # Include original query in completed items - completed_items = [*last_active_step_items, result_info] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Searching knowledge base", - status="completed", - items=completed_items, - ) - elif tool_name == "link_preview": - # Build completion items based on link preview result - if isinstance(tool_output, dict): - title = tool_output.get("title", "Link") - domain = tool_output.get("domain", "") - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to fetch')}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:60]}{'...' if len(title) > 60 else ''}", - f"Domain: {domain}" if domain else "Preview loaded", - ] - else: - completed_items = [*last_active_step_items, "Preview loaded"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Fetching link preview", - status="completed", - items=completed_items, - ) - elif tool_name == "display_image": - # Build completion items for image analysis - if isinstance(tool_output, dict): - title = tool_output.get("title", "") - alt = tool_output.get("alt", "Image") - display_name = title or alt - completed_items = [ - *last_active_step_items, - f"Analyzed: {display_name[:50]}{'...' if len(display_name) > 50 else ''}", - ] - else: - completed_items = [*last_active_step_items, "Image analyzed"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Analyzing the image", - status="completed", - items=completed_items, - ) - elif tool_name == "scrape_webpage": - # Build completion items for webpage scraping - if isinstance(tool_output, dict): - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", - f"Extracted: {word_count:,} words", - ] - else: - completed_items = [*last_active_step_items, "Content extracted"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Scraping webpage", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_podcast": - # Build detailed completion items based on podcast status - podcast_status = ( - tool_output.get("status", "unknown") - if isinstance(tool_output, dict) - else "unknown" - ) - podcast_title = ( - tool_output.get("title", "Podcast") - if isinstance(tool_output, dict) - else "Podcast" - ) - - if podcast_status == "processing": - completed_items = [ - f"Title: {podcast_title}", - "Audio generation started", - "Processing in background...", - ] - elif podcast_status == "already_generating": - completed_items = [ - f"Title: {podcast_title}", - "Podcast already in progress", - "Please wait for it to complete", - ] - elif podcast_status == "error": - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - completed_items = [ - f"Title: {podcast_title}", - f"Error: {error_msg[:50]}", - ] - else: - completed_items = last_active_step_items - - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Generating podcast", - status="completed", - items=completed_items, - ) - # elif tool_name == "write_todos": # Disabled for now - # # Build completion items for planning/updating - # if isinstance(tool_output, dict): - # todos = tool_output.get("todos", []) - # todo_count = len(todos) if isinstance(todos, list) else 0 - # completed_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "completed" - # ) - # if isinstance(todos, list) - # else 0 - # ) - # in_progress_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "in_progress" - # ) - # if isinstance(todos, list) - # else 0 - # ) - - # # Use context-aware completion message - # if last_active_step_title == "Creating plan": - # completed_items = [f"Created {todo_count} tasks"] - # else: - # # Updating progress - show stats - # completed_items = [ - # f"Progress: {completed_count}/{todo_count} completed", - # ] - # if in_progress_count > 0: - # # Find the currently in-progress task name - # in_progress_task = next( - # ( - # t.get("content", "")[:40] - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "in_progress" - # ), - # None, - # ) - # if in_progress_task: - # completed_items.append( - # f"Current: {in_progress_task}..." - # ) - # else: - # completed_items = ["Plan updated"] - # yield streaming_service.format_thinking_step( - # step_id=original_step_id, - # title=last_active_step_title, - # status="completed", - # items=completed_items, - # ) - elif tool_name == "ls": - # Build completion items showing file names found - if isinstance(tool_output, dict): - result = tool_output.get("result", "") - elif isinstance(tool_output, str): - result = tool_output - else: - result = str(tool_output) if tool_output else "" - - # Parse file paths and extract just the file names - file_names = [] - if result: - # The ls tool returns paths, extract just the file/folder names - for line in result.strip().split("\n"): - line = line.strip() - if line: - # Get just the filename from the path - name = line.rstrip("/").split("/")[-1] - if name and len(name) <= 40: - file_names.append(name) - elif name: - file_names.append(name[:37] + "...") - - # Build display items - wrap file names in brackets for icon rendering - if file_names: - if len(file_names) <= 5: - # Wrap each file name in brackets for styled tile rendering - completed_items = [f"[{name}]" for name in file_names] - else: - # Show first few with brackets and count - completed_items = [f"[{name}]" for name in file_names[:4]] - completed_items.append(f"(+{len(file_names) - 4} more)") - else: - completed_items = ["No files found"] - - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Exploring files", - status="completed", - items=completed_items, - ) - else: - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title=f"Using {tool_name.replace('_', ' ')}", - status="completed", - items=last_active_step_items, - ) - - # Mark that we just finished a tool - "Synthesizing response" will be created - # when text actually starts flowing (not immediately) - just_finished_tool = True - # Clear the active step since the tool is done - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - - # Handle different tool outputs - if tool_name == "generate_podcast": - # Stream the full podcast result so frontend can render the audio player - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - # Send appropriate terminal message based on status - if ( - isinstance(tool_output, dict) - and tool_output.get("status") == "success" - ): - yield streaming_service.format_terminal_info( - f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - yield streaming_service.format_terminal_info( - f"Podcast generation failed: {error_msg}", - "error", - ) - elif tool_name == "link_preview": - # Stream the full link preview result so frontend can render the MediaCard - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - # Send appropriate terminal message - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Link") - yield streaming_service.format_terminal_info( - f"Link preview loaded: {title[:50]}{'...' if len(title) > 50 else ''}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to fetch") - if isinstance(tool_output, dict) - else "Failed to fetch" - ) - yield streaming_service.format_terminal_info( - f"Link preview failed: {error_msg}", - "error", - ) - elif tool_name == "display_image": - # Stream the full image result so frontend can render the Image component - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - # Send terminal message - if isinstance(tool_output, dict): - title = tool_output.get("title") or tool_output.get( - "alt", "Image" - ) - yield streaming_service.format_terminal_info( - f"Image analyzed: {title[:40]}{'...' if len(title) > 40 else ''}", - "success", - ) - elif tool_name == "scrape_webpage": - # Stream the scrape result so frontend can render the Article component - # Note: We send metadata for display, but content goes to LLM for processing - if isinstance(tool_output, dict): - # Create a display-friendly output (without full content for the card) - display_output = { - k: v for k, v in tool_output.items() if k != "content" - } - # But keep a truncated content preview - if "content" in tool_output: - content = tool_output.get("content", "") - display_output["content_preview"] = ( - content[:500] + "..." if len(content) > 500 else content - ) - yield streaming_service.format_tool_output_available( - tool_call_id, - display_output, - ) - else: - yield streaming_service.format_tool_output_available( - tool_call_id, - {"result": tool_output}, - ) - # Send terminal message - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - yield streaming_service.format_terminal_info( - f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to scrape") - if isinstance(tool_output, dict) - else "Failed to scrape" - ) - yield streaming_service.format_terminal_info( - f"Scrape failed: {error_msg}", - "error", - ) - elif tool_name == "search_knowledge_base": - # Don't stream the full output for search (can be very large), just acknowledge - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - "Knowledge base search completed", "success" - ) - # elif tool_name == "write_todos": # Disabled for now - # # Stream the full write_todos result so frontend can render the Plan component - # yield streaming_service.format_tool_output_available( - # tool_call_id, - # tool_output - # if isinstance(tool_output, dict) - # else {"result": tool_output}, - # ) - # # Send terminal message with plan info - # if isinstance(tool_output, dict): - # todos = tool_output.get("todos", []) - # todo_count = len(todos) if isinstance(todos, list) else 0 - # yield streaming_service.format_terminal_info( - # f"Plan created ({todo_count} tasks)", - # "success", - # ) - # else: - # yield streaming_service.format_terminal_info( - # "Plan created", - # "success", - # ) - else: - # Default handling for other tools - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - f"Tool {tool_name} completed", "success" - ) - - # Handle chain/agent end to close any open text blocks - elif event_type in ("on_chain_end", "on_agent_end"): - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - # Ensure text block is closed - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - - # Mark the last active thinking step as completed using the same title - completion_event = complete_current_step() - if completion_event: - yield completion_event - - # Check if the graph was interrupted (human-in-the-loop) - state = await agent.aget_state(config) - is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) - if is_interrupted: - interrupt_value = state.tasks[0].interrupts[0].value - yield streaming_service.format_interrupt_request(interrupt_value) + if stream_result.is_interrupted: yield streaming_service.format_finish_step() yield streaming_service.format_finish() yield streaming_service.format_done() return + accumulated_text = stream_result.accumulated_text + # Generate LLM title for new chats after first response # Check if this is the first assistant response by counting existing assistant messages from sqlalchemy import func @@ -1256,10 +1075,6 @@ async def stream_new_chat( print(f"[stream_new_chat] Exception type: {type(e).__name__}") print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}") - # Close any open text block - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -1280,7 +1095,6 @@ async def stream_resume_chat( thread_visibility: ChatVisibility | None = None, ) -> AsyncGenerator[str, None]: streaming_service = VercelStreamingService() - current_text_id: str | None = None try: if user_id: @@ -1353,521 +1167,21 @@ async def stream_resume_chat( yield streaming_service.format_message_start() yield streaming_service.format_start_step() - accumulated_text = "" - thinking_step_counter = 0 - tool_step_ids: dict[str, str] = {} - completed_step_ids: set[str] = set() - last_active_step_id: str | None = None - last_active_step_title = "" - last_active_step_items: list[str] = [] - just_finished_tool = False - - def next_thinking_step_id() -> str: - nonlocal thinking_step_counter - thinking_step_counter += 1 - return f"thinking-resume-{thinking_step_counter}" - - def complete_current_step() -> str | None: - nonlocal last_active_step_id - if last_active_step_id and last_active_step_id not in completed_step_ids: - completed_step_ids.add(last_active_step_id) - event = streaming_service.format_thinking_step( - step_id=last_active_step_id, - title=last_active_step_title, - status="completed", - items=last_active_step_items, - ) - last_active_step_id = None - return event - return None - - async for event in agent.astream_events( - Command(resume={"decisions": decisions}), config=config, version="v2" + stream_result = StreamResult() + async for sse in _stream_agent_events( + agent=agent, + config=config, + input_data=Command(resume={"decisions": decisions}), + streaming_service=streaming_service, + result=stream_result, + step_prefix="thinking-resume", ): - event_type = event.get("event", "") - - if event_type == "on_chat_model_stream": - chunk = event.get("data", {}).get("chunk") - if chunk and hasattr(chunk, "content"): - content = chunk.content - if content and isinstance(content, str): - if current_text_id is None: - completion_event = complete_current_step() - if completion_event: - yield completion_event - if just_finished_tool: - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - just_finished_tool = False - current_text_id = streaming_service.generate_text_id() - yield streaming_service.format_text_start(current_text_id) - yield streaming_service.format_text_delta( - current_text_id, content - ) - accumulated_text += content - - elif event_type == "on_tool_start": - tool_name = event.get("name", "unknown_tool") - run_id = event.get("run_id", "") - tool_input = event.get("data", {}).get("input", {}) - - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - if last_active_step_title != "Synthesizing response": - completion_event = complete_current_step() - if completion_event: - yield completion_event - - just_finished_tool = False - tool_step_id = next_thinking_step_id() - tool_step_ids[run_id] = tool_step_id - last_active_step_id = tool_step_id - - if tool_name == "search_knowledge_base": - query = ( - tool_input.get("query", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Searching knowledge base" - last_active_step_items = [ - f"Query: {query[:100]}{'...' if len(query) > 100 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Searching knowledge base", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "link_preview": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Fetching link preview" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Fetching link preview", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "display_image": - src = ( - tool_input.get("src", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - title = ( - tool_input.get("title", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Analyzing the image" - last_active_step_items = [ - f"Analyzing: {title[:50] if title else src[:50]}{'...' if len(title or src) > 50 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Analyzing the image", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "scrape_webpage": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Scraping webpage" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Scraping webpage", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "generate_podcast": - podcast_title = ( - tool_input.get("podcast_title", "SurfSense Podcast") - if isinstance(tool_input, dict) - else "SurfSense Podcast" - ) - content_len = len( - tool_input.get("source_content", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Generating podcast" - last_active_step_items = [ - f"Title: {podcast_title}", - f"Content: {content_len:,} characters", - "Preparing audio generation...", - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Generating podcast", - status="in_progress", - items=last_active_step_items, - ) - else: - last_active_step_title = f"Using {tool_name.replace('_', ' ')}" - last_active_step_items = [] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title=last_active_step_title, - status="in_progress", - ) - - tool_call_id = ( - f"call_{run_id[:32]}" - if run_id - else streaming_service.generate_tool_call_id() - ) - yield streaming_service.format_tool_input_start(tool_call_id, tool_name) - yield streaming_service.format_tool_input_available( - tool_call_id, - tool_name, - tool_input - if isinstance(tool_input, dict) - else {"input": tool_input}, - ) - - elif event_type == "on_tool_end": - run_id = event.get("run_id", "") - tool_name = event.get("name", "unknown_tool") - raw_output = event.get("data", {}).get("output", "") - - if hasattr(raw_output, "content"): - content = raw_output.content - if isinstance(content, str): - try: - tool_output = json.loads(content) - except (json.JSONDecodeError, TypeError): - tool_output = {"result": content} - elif isinstance(content, dict): - tool_output = content - else: - tool_output = {"result": str(content)} - elif isinstance(raw_output, dict): - tool_output = raw_output - else: - tool_output = { - "result": str(raw_output) if raw_output else "completed" - } - - tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" - original_step_id = tool_step_ids.get( - run_id, f"thinking-unknown-{run_id[:8]}" - ) - completed_step_ids.add(original_step_id) - - if tool_name == "search_knowledge_base": - result_info = "Search completed" - if isinstance(tool_output, dict): - result_len = tool_output.get("result_length", 0) - if result_len > 0: - result_info = ( - f"Found relevant information ({result_len} chars)" - ) - completed_items = [*last_active_step_items, result_info] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Searching knowledge base", - status="completed", - items=completed_items, - ) - elif tool_name == "link_preview": - if isinstance(tool_output, dict): - title = tool_output.get("title", "Link") - domain = tool_output.get("domain", "") - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to fetch')}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:60]}{'...' if len(title) > 60 else ''}", - f"Domain: {domain}" if domain else "Preview loaded", - ] - else: - completed_items = [*last_active_step_items, "Preview loaded"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Fetching link preview", - status="completed", - items=completed_items, - ) - elif tool_name == "display_image": - if isinstance(tool_output, dict): - title = tool_output.get("title", "") - alt = tool_output.get("alt", "Image") - display_name = title or alt - completed_items = [ - *last_active_step_items, - f"Analyzed: {display_name[:50]}{'...' if len(display_name) > 50 else ''}", - ] - else: - completed_items = [*last_active_step_items, "Image analyzed"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Analyzing the image", - status="completed", - items=completed_items, - ) - elif tool_name == "scrape_webpage": - if isinstance(tool_output, dict): - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", - f"Extracted: {word_count:,} words", - ] - else: - completed_items = [*last_active_step_items, "Content extracted"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Scraping webpage", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_podcast": - podcast_status = ( - tool_output.get("status", "unknown") - if isinstance(tool_output, dict) - else "unknown" - ) - podcast_title = ( - tool_output.get("title", "Podcast") - if isinstance(tool_output, dict) - else "Podcast" - ) - if podcast_status == "processing": - completed_items = [ - f"Title: {podcast_title}", - "Audio generation started", - "Processing in background...", - ] - elif podcast_status == "already_generating": - completed_items = [ - f"Title: {podcast_title}", - "Podcast already in progress", - "Please wait for it to complete", - ] - elif podcast_status == "error": - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - completed_items = [ - f"Title: {podcast_title}", - f"Error: {error_msg[:50]}", - ] - else: - completed_items = last_active_step_items - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Generating podcast", - status="completed", - items=completed_items, - ) - elif tool_name == "ls": - if isinstance(tool_output, dict): - result = tool_output.get("result", "") - elif isinstance(tool_output, str): - result = tool_output - else: - result = str(tool_output) if tool_output else "" - file_names = [] - if result: - for line in result.strip().split("\n"): - line = line.strip() - if line: - name = line.rstrip("/").split("/")[-1] - if name and len(name) <= 40: - file_names.append(name) - elif name: - file_names.append(name[:37] + "...") - if file_names: - if len(file_names) <= 5: - completed_items = [f"[{name}]" for name in file_names] - else: - completed_items = [f"[{name}]" for name in file_names[:4]] - completed_items.append(f"(+{len(file_names) - 4} more)") - else: - completed_items = ["No files found"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Exploring files", - status="completed", - items=completed_items, - ) - else: - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title=f"Using {tool_name.replace('_', ' ')}", - status="completed", - items=last_active_step_items, - ) - - just_finished_tool = True - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - - if tool_name == "generate_podcast": - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if ( - isinstance(tool_output, dict) - and tool_output.get("status") == "success" - ): - yield streaming_service.format_terminal_info( - f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - yield streaming_service.format_terminal_info( - f"Podcast generation failed: {error_msg}", - "error", - ) - elif tool_name == "link_preview": - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Link") - yield streaming_service.format_terminal_info( - f"Link preview loaded: {title[:50]}{'...' if len(title) > 50 else ''}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to fetch") - if isinstance(tool_output, dict) - else "Failed to fetch" - ) - yield streaming_service.format_terminal_info( - f"Link preview failed: {error_msg}", - "error", - ) - elif tool_name == "display_image": - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if isinstance(tool_output, dict): - title = tool_output.get("title") or tool_output.get( - "alt", "Image" - ) - yield streaming_service.format_terminal_info( - f"Image analyzed: {title[:40]}{'...' if len(title) > 40 else ''}", - "success", - ) - elif tool_name == "scrape_webpage": - if isinstance(tool_output, dict): - display_output = { - k: v for k, v in tool_output.items() if k != "content" - } - if "content" in tool_output: - content = tool_output.get("content", "") - display_output["content_preview"] = ( - content[:500] + "..." if len(content) > 500 else content - ) - yield streaming_service.format_tool_output_available( - tool_call_id, - display_output, - ) - else: - yield streaming_service.format_tool_output_available( - tool_call_id, - {"result": tool_output}, - ) - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - yield streaming_service.format_terminal_info( - f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to scrape") - if isinstance(tool_output, dict) - else "Failed to scrape" - ) - yield streaming_service.format_terminal_info( - f"Scrape failed: {error_msg}", - "error", - ) - elif tool_name == "search_knowledge_base": - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - "Knowledge base search completed", "success" - ) - else: - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - f"Tool {tool_name} completed", "success" - ) - - elif event_type in ("on_chain_end", "on_agent_end"): - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - - completion_event = complete_current_step() - if completion_event: - yield completion_event - - state = await agent.aget_state(config) - is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) - if is_interrupted: - interrupt_value = state.tasks[0].interrupts[0].value - yield streaming_service.format_interrupt_request(interrupt_value) + yield sse + if stream_result.is_interrupted: + yield streaming_service.format_finish_step() + yield streaming_service.format_finish() + yield streaming_service.format_done() + return yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -1879,8 +1193,6 @@ async def stream_resume_chat( error_message = f"Error during resume: {e!s}" print(f"[stream_resume_chat] {error_message}") print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}") - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() yield streaming_service.format_finish() From 5d1c386105a329404eaa8a36c2637c6cd345de3c Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 11 Feb 2026 13:50:46 +0200 Subject: [PATCH 07/42] extract shared streaming helpers from page.tsx into streaming-state.ts Move duplicated types (ThinkingStepData, ContentPart), content-part helpers (appendText, addToolCall, updateToolCall, buildContentForUI, buildContentForPersistence), and SSE read loop (readSSEStream) into a shared module. Removes ~395 lines of tripled code from page.tsx. --- .../new-chat/[[...chat_id]]/page.tsx | 446 ++---------------- surfsense_web/lib/chat/streaming-state.ts | 184 ++++++++ 2 files changed, 235 insertions(+), 395 deletions(-) create mode 100644 surfsense_web/lib/chat/streaming-state.ts diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 2db00a03d..448148229 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -12,6 +12,7 @@ import { useParams, useSearchParams } from "next/navigation"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import { z } from "zod"; +import { addToolCall, appendText, buildContentForPersistence, buildContentForUI, type ContentPart, type ContentPartsState, readSSEStream, type ThinkingStepData, updateToolCall } from "@/lib/chat/streaming-state"; import { clearTargetCommentIdAtom, currentThreadAtom, @@ -125,15 +126,6 @@ const TOOLS_WITH_UI = new Set([ // "write_todos", // Disabled for now ]); -/** - * Type for thinking step data from the backend - */ -interface ThinkingStepData { - id: string; - title: string; - status: "pending" | "in_progress" | "completed"; - items: string[]; -} export default function NewChatPage() { const params = useParams(); @@ -540,102 +532,11 @@ export default function NewChatPage() { const assistantMsgId = `msg-assistant-${Date.now()}`; const currentThinkingSteps = new Map(); - // Ordered content parts to preserve inline tool call positions - // Each part is either a text segment or a tool call - type ContentPart = - | { type: "text"; text: string } - | { - type: "tool-call"; - toolCallId: string; - toolName: string; - args: Record; - result?: unknown; - }; - const contentParts: ContentPart[] = []; + const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() }; + const { contentParts, toolCallIndices } = contentPartsState; let wasInterrupted = false; - // Track the current text segment index (for appending text deltas) - let currentTextPartIndex = -1; - // Map to track tool call indices for updating results - const toolCallIndices = new Map(); - - // Helper to get or create the current text part for appending text - const appendText = (delta: string) => { - if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") { - // Append to existing text part - (contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta; - } else { - // Create new text part - contentParts.push({ type: "text", text: delta }); - currentTextPartIndex = contentParts.length - 1; - } - }; - - // Helper to add a tool call (this "breaks" the current text segment) - const addToolCall = (toolCallId: string, toolName: string, args: Record) => { - if (TOOLS_WITH_UI.has(toolName)) { - contentParts.push({ - type: "tool-call", - toolCallId, - toolName, - args, - }); - toolCallIndices.set(toolCallId, contentParts.length - 1); - // Reset text part index so next text creates a new segment - currentTextPartIndex = -1; - } - }; - - // Helper to update a tool call's args or result - const updateToolCall = ( - toolCallId: string, - update: { args?: Record; result?: unknown } - ) => { - const index = toolCallIndices.get(toolCallId); - if (index !== undefined && contentParts[index]?.type === "tool-call") { - const tc = contentParts[index] as ContentPart & { type: "tool-call" }; - if (update.args) tc.args = update.args; - if (update.result !== undefined) tc.result = update.result; - } - }; - - // Helper to build content for UI (without thinking-steps to avoid assistant-ui errors) - const buildContentForUI = (): ThreadMessageLike["content"] => { - // Filter to only include text parts with content and tool-calls with UI - const filtered = contentParts.filter((part) => { - if (part.type === "text") return part.text.length > 0; - if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName); - return false; - }); - return filtered.length > 0 - ? (filtered as ThreadMessageLike["content"]) - : [{ type: "text", text: "" }]; - }; - - // Helper to build content for persistence (includes thinking-steps for restoration) - const buildContentForPersistence = (): unknown[] => { - const parts: unknown[] = []; - - // Include thinking steps for persistence - if (currentThinkingSteps.size > 0) { - parts.push({ - type: "thinking-steps", - steps: Array.from(currentThinkingSteps.values()), - }); - } - - // Add content parts (filtered) - for (const part of contentParts) { - if (part.type === "text" && part.text.length > 0) { - parts.push(part); - } else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) { - parts.push(part); - } - } - - return parts.length > 0 ? parts : [{ type: "text", text: "" }]; - }; // Add placeholder assistant message setMessages((prev) => [ @@ -701,50 +602,23 @@ export default function NewChatPage() { throw new Error(`Backend error: ${response.status}`); } - if (!response.body) { - throw new Error("No response body"); - } - - // Parse SSE stream - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const events = buffer.split(/\r?\n\r?\n/); - buffer = events.pop() || ""; - - for (const event of events) { - const lines = event.split(/\r?\n/); - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (!data || data === "[DONE]") continue; - - try { - const parsed = JSON.parse(data); - + for await (const parsed of readSSEStream(response)) { switch (parsed.type) { case "text-delta": - appendText(parsed.delta); + appendText(contentPartsState, parsed.delta); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-start": // Add tool call inline - this breaks the current text segment - addToolCall(parsed.toolCallId, parsed.toolName, {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {}); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -752,13 +626,13 @@ export default function NewChatPage() { case "tool-input-available": { // Update existing tool call's args, or add if not exists if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(parsed.toolCallId, { args: parsed.input || {} }); + updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {} }); } else { - addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {}); } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -766,7 +640,7 @@ export default function NewChatPage() { case "tool-output-available": { // Update the tool call with its result - updateToolCall(parsed.toolCallId, { result: parsed.output }); + updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output }); // Handle podcast-specific logic if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { // Check if this is a podcast tool by looking at the content part @@ -780,7 +654,7 @@ export default function NewChatPage() { } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -843,20 +717,20 @@ export default function NewChatPage() { } ); if (existingIdx) { - updateToolCall(existingIdx[0], { + updateToolCall(contentPartsState, existingIdx[0], { result: { __interrupt__: true, ...interruptData }, }); } else { const tcId = `interrupt-${action.name}`; - addToolCall(tcId, action.name, action.args); - updateToolCall(tcId, { + addToolCall(contentPartsState, TOOLS_WITH_UI, tcId, action.name, action.args); + updateToolCall(contentPartsState, tcId, { result: { __interrupt__: true, ...interruptData }, }); } } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); if (currentThreadId) { @@ -872,20 +746,11 @@ export default function NewChatPage() { case "error": throw new Error(parsed.errorText || "Server error"); } - } catch (e) { - if (e instanceof SyntaxError) continue; - throw e; - } - } - } - } - } finally { - reader.releaseLock(); } // Persist assistant message (with thinking steps for restoration on refresh) // Skip persistence for interrupted messages -- handleResume will persist the final version - const finalContent = buildContentForPersistence(); + const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); if (contentParts.length > 0 && !wasInterrupted) { try { const savedMessage = await appendMessage(currentThreadId, { @@ -933,7 +798,7 @@ export default function NewChatPage() { (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) ); if (hasContent && currentThreadId) { - const partialContent = buildContentForPersistence(); + const partialContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); try { const savedMessage = await appendMessage(currentThreadId, { role: "assistant", @@ -1019,18 +884,8 @@ export default function NewChatPage() { (messageThinkingSteps.get(assistantMsgId) ?? []).map((s) => [s.id, s]) ); - type ContentPart = - | { type: "text"; text: string } - | { - type: "tool-call"; - toolCallId: string; - toolName: string; - args: Record; - result?: unknown; - }; - const contentParts: ContentPart[] = []; - let currentTextPartIndex = -1; - const toolCallIndices = new Map(); + const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() }; + const { contentParts, toolCallIndices } = contentPartsState; const existingMsg = messages.find((m) => m.id === assistantMsgId); if (existingMsg && Array.isArray(existingMsg.content)) { @@ -1039,7 +894,7 @@ export default function NewChatPage() { const p = part as Record; if (p.type === "text") { contentParts.push({ type: "text", text: String(p.text ?? "") }); - currentTextPartIndex = contentParts.length - 1; + contentPartsState.currentTextPartIndex = contentParts.length - 1; } else if (p.type === "tool-call") { toolCallIndices.set(String(p.toolCallId), contentParts.length); contentParts.push({ @@ -1049,7 +904,7 @@ export default function NewChatPage() { args: (p.args as Record) ?? {}, result: p.result as unknown, }); - currentTextPartIndex = -1; + contentPartsState.currentTextPartIndex = -1; } } } @@ -1072,68 +927,7 @@ export default function NewChatPage() { } } - const appendText = (delta: string) => { - if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") { - (contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta; - } else { - contentParts.push({ type: "text", text: delta }); - currentTextPartIndex = contentParts.length - 1; - } - }; - const addToolCall = (toolCallId: string, toolName: string, args: Record) => { - if (TOOLS_WITH_UI.has(toolName)) { - contentParts.push({ - type: "tool-call", - toolCallId, - toolName, - args, - }); - toolCallIndices.set(toolCallId, contentParts.length - 1); - currentTextPartIndex = -1; - } - }; - - const updateToolCall = ( - toolCallId: string, - update: { args?: Record; result?: unknown } - ) => { - const index = toolCallIndices.get(toolCallId); - if (index !== undefined && contentParts[index]?.type === "tool-call") { - const tc = contentParts[index] as ContentPart & { type: "tool-call" }; - if (update.args) tc.args = update.args; - if (update.result !== undefined) tc.result = update.result; - } - }; - - const buildContentForUI = (): ThreadMessageLike["content"] => { - const filtered = contentParts.filter((part) => { - if (part.type === "text") return part.text.length > 0; - if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName); - return false; - }); - return filtered.length > 0 - ? (filtered as ThreadMessageLike["content"]) - : [{ type: "text", text: "" }]; - }; - - const buildContentForPersistence = (): unknown[] => { - const parts: unknown[] = []; - if (currentThinkingSteps.size > 0) { - parts.push({ - type: "thinking-steps", - steps: Array.from(currentThinkingSteps.values()), - }); - } - for (const part of contentParts) { - if (part.type === "text" && part.text.length > 0) { - parts.push(part); - } else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) { - parts.push(part); - } - } - return parts.length > 0 ? parts : [{ type: "text", text: "" }]; - }; try { const backendUrl = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000"; @@ -1154,74 +948,48 @@ export default function NewChatPage() { throw new Error(`Backend error: ${response.status}`); } - if (!response.body) { - throw new Error("No response body"); - } - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const events = buffer.split(/\r?\n\r?\n/); - buffer = events.pop() || ""; - - for (const event of events) { - const lines = event.split(/\r?\n/); - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (!data || data === "[DONE]") continue; - - try { - const parsed = JSON.parse(data); - + for await (const parsed of readSSEStream(response)) { switch (parsed.type) { case "text-delta": - appendText(parsed.delta); + appendText(contentPartsState, parsed.delta); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-start": - addToolCall(parsed.toolCallId, parsed.toolName, {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {}); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-available": if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(parsed.toolCallId, { + updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {}, }); } else { - addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {}); } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-output-available": - updateToolCall(parsed.toolCallId, { + updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output, }); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -1253,7 +1021,7 @@ export default function NewChatPage() { } ); if (existingIdx) { - updateToolCall(existingIdx[0], { + updateToolCall(contentPartsState, existingIdx[0], { result: { __interrupt__: true, ...interruptData, @@ -1261,8 +1029,8 @@ export default function NewChatPage() { }); } else { const tcId = `interrupt-${action.name}`; - addToolCall(tcId, action.name, action.args); - updateToolCall(tcId, { + addToolCall(contentPartsState, TOOLS_WITH_UI, tcId, action.name, action.args); + updateToolCall(contentPartsState, tcId, { result: { __interrupt__: true, ...interruptData, @@ -1272,7 +1040,7 @@ export default function NewChatPage() { } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); setPendingInterrupt({ @@ -1285,19 +1053,10 @@ export default function NewChatPage() { case "error": throw new Error(parsed.errorText || "Server error"); - } - } catch (e) { - if (e instanceof SyntaxError) continue; - throw e; } - } - } - } - } finally { - reader.releaseLock(); } - const finalContent = buildContentForPersistence(); + const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); if (contentParts.length > 0) { try { const savedMessage = await appendMessage(resumeThreadId, { @@ -1456,77 +1215,10 @@ export default function NewChatPage() { const assistantMsgId = `msg-assistant-${Date.now()}`; const currentThinkingSteps = new Map(); - // Content parts tracking (same as onNew) - type ContentPart = - | { type: "text"; text: string } - | { - type: "tool-call"; - toolCallId: string; - toolName: string; - args: Record; - result?: unknown; - }; - const contentParts: ContentPart[] = []; - let currentTextPartIndex = -1; - const toolCallIndices = new Map(); + const contentPartsState: ContentPartsState = { contentParts: [], currentTextPartIndex: -1, toolCallIndices: new Map() }; + const { contentParts, toolCallIndices } = contentPartsState; - const appendText = (delta: string) => { - if (currentTextPartIndex >= 0 && contentParts[currentTextPartIndex]?.type === "text") { - (contentParts[currentTextPartIndex] as { type: "text"; text: string }).text += delta; - } else { - contentParts.push({ type: "text", text: delta }); - currentTextPartIndex = contentParts.length - 1; - } - }; - const addToolCall = (toolCallId: string, toolName: string, args: Record) => { - if (TOOLS_WITH_UI.has(toolName)) { - contentParts.push({ type: "tool-call", toolCallId, toolName, args }); - toolCallIndices.set(toolCallId, contentParts.length - 1); - currentTextPartIndex = -1; - } - }; - - const updateToolCall = ( - toolCallId: string, - update: { args?: Record; result?: unknown } - ) => { - const index = toolCallIndices.get(toolCallId); - if (index !== undefined && contentParts[index]?.type === "tool-call") { - const tc = contentParts[index] as ContentPart & { type: "tool-call" }; - if (update.args) tc.args = update.args; - if (update.result !== undefined) tc.result = update.result; - } - }; - - const buildContentForUI = (): ThreadMessageLike["content"] => { - const filtered = contentParts.filter((part) => { - if (part.type === "text") return part.text.length > 0; - if (part.type === "tool-call") return TOOLS_WITH_UI.has(part.toolName); - return false; - }); - return filtered.length > 0 - ? (filtered as ThreadMessageLike["content"]) - : [{ type: "text", text: "" }]; - }; - - const buildContentForPersistence = (): unknown[] => { - const parts: unknown[] = []; - if (currentThinkingSteps.size > 0) { - parts.push({ - type: "thinking-steps", - steps: Array.from(currentThinkingSteps.values()), - }); - } - for (const part of contentParts) { - if (part.type === "text" && part.text.length > 0) { - parts.push(part); - } else if (part.type === "tool-call" && TOOLS_WITH_UI.has(part.toolName)) { - parts.push(part); - } - } - return parts.length > 0 ? parts : [{ type: "text", text: "" }]; - }; // Add placeholder messages to UI // Always add back the user message (with new query for edit, or original content for reload) @@ -1570,68 +1262,41 @@ export default function NewChatPage() { throw new Error(`Backend error: ${response.status}`); } - if (!response.body) { - throw new Error("No response body"); - } - - // Parse SSE stream (same logic as onNew) - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const events = buffer.split(/\r?\n\r?\n/); - buffer = events.pop() || ""; - - for (const event of events) { - const lines = event.split(/\r?\n/); - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - const data = line.slice(6).trim(); - if (!data || data === "[DONE]") continue; - - try { - const parsed = JSON.parse(data); - + for await (const parsed of readSSEStream(response)) { switch (parsed.type) { case "text-delta": - appendText(parsed.delta); + appendText(contentPartsState, parsed.delta); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-start": - addToolCall(parsed.toolCallId, parsed.toolName, {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, {}); setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-input-available": if (toolCallIndices.has(parsed.toolCallId)) { - updateToolCall(parsed.toolCallId, { args: parsed.input || {} }); + updateToolCall(contentPartsState, parsed.toolCallId, { args: parsed.input || {} }); } else { - addToolCall(parsed.toolCallId, parsed.toolName, parsed.input || {}); + addToolCall(contentPartsState, TOOLS_WITH_UI, parsed.toolCallId, parsed.toolName, parsed.input || {}); } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; case "tool-output-available": - updateToolCall(parsed.toolCallId, { result: parsed.output }); + updateToolCall(contentPartsState, parsed.toolCallId, { result: parsed.output }); if (parsed.output?.status === "pending" && parsed.output?.podcast_id) { const idx = toolCallIndices.get(parsed.toolCallId); if (idx !== undefined) { @@ -1643,7 +1308,7 @@ export default function NewChatPage() { } setMessages((prev) => prev.map((m) => - m.id === assistantMsgId ? { ...m, content: buildContentForUI() } : m + m.id === assistantMsgId ? { ...m, content: buildContentForUI(contentPartsState, TOOLS_WITH_UI) } : m ) ); break; @@ -1663,20 +1328,11 @@ export default function NewChatPage() { case "error": throw new Error(parsed.errorText || "Server error"); - } - } catch (e) { - if (e instanceof SyntaxError) continue; - throw e; } - } - } - } - } finally { - reader.releaseLock(); } // Persist messages after streaming completes - const finalContent = buildContentForPersistence(); + const finalContent = buildContentForPersistence(contentPartsState, TOOLS_WITH_UI, currentThinkingSteps); if (contentParts.length > 0) { try { // Persist user message (for both edit and reload modes, since backend deleted it) diff --git a/surfsense_web/lib/chat/streaming-state.ts b/surfsense_web/lib/chat/streaming-state.ts new file mode 100644 index 000000000..7165cd9f3 --- /dev/null +++ b/surfsense_web/lib/chat/streaming-state.ts @@ -0,0 +1,184 @@ +import type { ThreadMessageLike } from "@assistant-ui/react"; + +/** + * Extracted from page.tsx lines 131-136. + * Used across onNew, handleResume, and handleRegenerate. + */ +export interface ThinkingStepData { + id: string; + title: string; + status: "pending" | "in_progress" | "completed"; + items: string[]; +} + +/** + * Extracted from page.tsx lines 537-545. + * Duplicated in onNew, handleResume, and handleRegenerate. + */ +export type ContentPart = + | { type: "text"; text: string } + | { + type: "tool-call"; + toolCallId: string; + toolName: string; + args: Record; + result?: unknown; + }; + +/** + * Mutable state shared by the content-part helpers (appendText, addToolCall, etc.). + * All handlers create this same set of variables -- this groups them into one object + * so helpers can read/write them by reference. + */ +export interface ContentPartsState { + contentParts: ContentPart[]; + currentTextPartIndex: number; + toolCallIndices: Map; +} + +/** + * Extracted from page.tsx lines 556-573 (onNew). + * Identical in handleResume (lines 1057-1064) and handleRegenerate (lines 1445-1452). + */ +export function appendText(state: ContentPartsState, delta: string): void { + if (state.currentTextPartIndex >= 0 && state.contentParts[state.currentTextPartIndex]?.type === "text") { + (state.contentParts[state.currentTextPartIndex] as { type: "text"; text: string }).text += delta; + } else { + state.contentParts.push({ type: "text", text: delta }); + state.currentTextPartIndex = state.contentParts.length - 1; + } +} + +/** + * Extracted from page.tsx line 540 (onNew). + * Identical in handleResume (line 1029) and handleRegenerate (line 1407). + */ +export function addToolCall( + state: ContentPartsState, + toolsWithUI: Set, + toolCallId: string, + toolName: string, + args: Record +): void { + if (toolsWithUI.has(toolName)) { + state.contentParts.push({ + type: "tool-call", + toolCallId, + toolName, + args, + }); + state.toolCallIndices.set(toolCallId, state.contentParts.length - 1); + state.currentTextPartIndex = -1; + } +} + +/** + * Extracted from page.tsx line 540 (onNew). + * Identical in handleResume (line 1027) and handleRegenerate (line 1387). + */ +export function updateToolCall( + state: ContentPartsState, + toolCallId: string, + update: { args?: Record; result?: unknown } +): void { + const index = state.toolCallIndices.get(toolCallId); + if (index !== undefined && state.contentParts[index]?.type === "tool-call") { + const tc = state.contentParts[index] as ContentPart & { type: "tool-call" }; + if (update.args) tc.args = update.args; + if (update.result !== undefined) tc.result = update.result; + } +} + +/** + * Extracted from page.tsx line 539 (onNew). + * Identical in handleResume and handleRegenerate. + */ +export function buildContentForUI( + state: ContentPartsState, + toolsWithUI: Set +): ThreadMessageLike["content"] { + const filtered = state.contentParts.filter((part) => { + if (part.type === "text") return part.text.length > 0; + if (part.type === "tool-call") return toolsWithUI.has(part.toolName); + return false; + }); + return filtered.length > 0 + ? (filtered as ThreadMessageLike["content"]) + : [{ type: "text", text: "" }]; +} + +/** + * Extracted from page.tsx line 553 (onNew). + * Identical in handleResume and handleRegenerate. + */ +export function buildContentForPersistence( + state: ContentPartsState, + toolsWithUI: Set, + currentThinkingSteps: Map +): unknown[] { + const parts: unknown[] = []; + + if (currentThinkingSteps.size > 0) { + parts.push({ + type: "thinking-steps", + steps: Array.from(currentThinkingSteps.values()), + }); + } + + for (const part of state.contentParts) { + if (part.type === "text" && part.text.length > 0) { + parts.push(part); + } else if (part.type === "tool-call" && toolsWithUI.has(part.toolName)) { + parts.push(part); + } + } + + return parts.length > 0 ? parts : [{ type: "text", text: "" }]; +} + +/** + * Async generator that reads an SSE stream and yields parsed JSON objects. + * Handles buffering, event splitting, and skips malformed JSON / [DONE] lines. + * + * Extracted from the identical SSE reading boilerplate in onNew, handleResume, + * and handleRegenerate. + */ +// biome-ignore lint/suspicious/noExplicitAny: matches JSON.parse return type +export async function* readSSEStream(response: Response): AsyncGenerator { + if (!response.body) { + throw new Error("No response body"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const events = buffer.split(/\r?\n\r?\n/); + buffer = events.pop() || ""; + + for (const event of events) { + const lines = event.split(/\r?\n/); + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + const data = line.slice(6).trim(); + if (!data || data === "[DONE]") continue; + + try { + yield JSON.parse(data); + } catch (e) { + if (e instanceof SyntaxError) continue; + throw e; + } + } + } + } + } finally { + reader.releaseLock(); + } +} From 2ef24740587ccddfd6fe010d95d2e12e2bdf88d7 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 11 Feb 2026 15:43:07 +0200 Subject: [PATCH 08/42] add HITL tool argument editing with approval UI Enables users to edit tool call arguments before execution in human-in-the-loop workflows. Adds edit mode UI with form fields, grayscale styling, and subtle pulse animations for pending approvals. Backend stub enhanced to verify edited arguments are correctly passed through. --- .../new_chat/tools/create_notion_page.py | 13 +- .../new-chat/[[...chat_id]]/page.tsx | 28 ++- surfsense_web/app/globals.css | 16 ++ .../components/tool-ui/create-notion-page.tsx | 186 +++++++++++++++--- surfsense_web/lib/chat/streaming-state.ts | 44 +---- 5 files changed, 216 insertions(+), 71 deletions(-) diff --git a/surfsense_backend/app/agents/new_chat/tools/create_notion_page.py b/surfsense_backend/app/agents/new_chat/tools/create_notion_page.py index bc3d2b81f..bbbab81c3 100644 --- a/surfsense_backend/app/agents/new_chat/tools/create_notion_page.py +++ b/surfsense_backend/app/agents/new_chat/tools/create_notion_page.py @@ -1,3 +1,4 @@ +import hashlib from typing import Any from langchain_core.tools import tool @@ -19,11 +20,19 @@ def create_create_notion_page_tool(): title: The title of the Notion page. content: The markdown content for the page body. """ + # Generate a unique page ID based on title for testing + # This helps verify if edited args were used + page_hash = hashlib.md5(title.encode()).hexdigest()[:8] + + # Return detailed response showing what was actually received return { "status": "success", - "page_id": "stub-page-id-12345", + "page_id": f"stub-page-{page_hash}", "title": title, - "url": "https://www.notion.so/stub-page-12345", + "content_preview": content[:100] + "..." if len(content) > 100 else content, + "content_length": len(content), + "url": f"https://www.notion.so/stub-page-{page_hash}", + "message": f"✅ Created Notion page '{title}' with {len(content)} characters", } return create_notion_page diff --git a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx index 448148229..045626307 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/new-chat/[[...chat_id]]/page.tsx @@ -864,7 +864,7 @@ export default function NewChatPage() { ); const handleResume = useCallback( - async (decisions: Array<{ type: string; message?: string }>) => { + async (decisions: Array<{ type: string; message?: string; edited_action?: { name: string; args: Record } }>) => { if (!pendingInterrupt) return; const { threadId: resumeThreadId, assistantMsgId } = pendingInterrupt; setPendingInterrupt(null); @@ -1098,10 +1098,16 @@ export default function NewChatPage() { useEffect(() => { const handler = (e: Event) => { const detail = (e as CustomEvent).detail as { - decisions: Array<{ type: string; message?: string }>; + decisions: Array<{ + type: string; + message?: string; + edited_action?: { name: string; args: Record }; + }>; }; if (detail?.decisions && pendingInterrupt) { - const decisionType = detail.decisions[0]?.type as "approve" | "reject"; + const decision = detail.decisions[0]; + const decisionType = decision?.type as "approve" | "reject" | "edit"; + setMessages((prev) => prev.map((m) => { if (m.id !== pendingInterrupt.assistantMsgId) return m; @@ -1113,9 +1119,23 @@ export default function NewChatPage() { part.result !== null && "__interrupt__" in part.result ) { + // For edit decisions, also update the displayed args + if (decisionType === "edit" && decision.edited_action) { + return { + ...part, + args: decision.edited_action.args, // Update displayed args + result: { + ...(part.result as Record), + __decided__: decisionType + }, + }; + } return { ...part, - result: { ...(part.result as Record), __decided__: decisionType }, + result: { + ...(part.result as Record), + __decided__: decisionType + }, }; } return part; diff --git a/surfsense_web/app/globals.css b/surfsense_web/app/globals.css index cf6f48437..475bcb1b4 100644 --- a/surfsense_web/app/globals.css +++ b/surfsense_web/app/globals.css @@ -187,5 +187,21 @@ button { background-color: hsl(var(--muted-foreground) / 0.4); } +/* Human-in-the-loop approval card animations */ +@keyframes pulse-subtle { + 0%, 100% { + opacity: 1; + box-shadow: 0 0 0 0 rgb(0 0 0 / 0.15); + } + 50% { + opacity: 1; + box-shadow: 0 0 20px 4px rgb(0 0 0 / 0.12); + } +} + +.animate-pulse-subtle { + animation: pulse-subtle 2s cubic-bezier(0.4, 0, 0.6, 1) infinite; +} + @source '../node_modules/@llamaindex/chat-ui/**/*.{ts,tsx}'; @source '../node_modules/streamdown/dist/*.js'; diff --git a/surfsense_web/components/tool-ui/create-notion-page.tsx b/surfsense_web/components/tool-ui/create-notion-page.tsx index 9f69067b1..a1cf67109 100644 --- a/surfsense_web/components/tool-ui/create-notion-page.tsx +++ b/surfsense_web/components/tool-ui/create-notion-page.tsx @@ -1,13 +1,15 @@ "use client"; import { makeAssistantToolUI } from "@assistant-ui/react"; -import { CheckIcon, FileTextIcon, Loader2Icon, XIcon } from "lucide-react"; +import { AlertTriangleIcon, CheckIcon, FileTextIcon, Loader2Icon, PencilIcon, XIcon } from "lucide-react"; import { useState } from "react"; import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Textarea } from "@/components/ui/textarea"; interface InterruptResult { __interrupt__: true; - __decided__?: "approve" | "reject"; + __decided__?: "approve" | "reject" | "edit"; action_requests: Array<{ name: string; args: Record; @@ -24,6 +26,9 @@ interface SuccessResult { page_id: string; title: string; url: string; + content_preview?: string; + content_length?: number; + message?: string; } type CreateNotionPageResult = InterruptResult | SuccessResult; @@ -44,50 +49,114 @@ function ApprovalCard({ }: { args: Record; interruptData: InterruptResult; - onDecision: (decision: { type: "approve" | "reject"; message?: string }) => void; + onDecision: (decision: { type: "approve" | "reject" | "edit"; message?: string; edited_action?: { name: string; args: Record } }) => void; }) { - const [decided, setDecided] = useState<"approve" | "reject" | null>( + const [decided, setDecided] = useState<"approve" | "reject" | "edit" | null>( interruptData.__decided__ ?? null ); + const [isEditing, setIsEditing] = useState(false); + const [editedArgs, setEditedArgs] = useState>(args); + const reviewConfig = interruptData.review_configs[0]; const allowedDecisions = reviewConfig?.allowed_decisions ?? ["approve", "reject"]; + const canEdit = allowedDecisions.includes("edit"); return ( -
-
-
- +
+
+
+
-

Create Notion Page

-

- Requires your approval to proceed +

Create Notion Page

+

+ {isEditing ? "You can edit the arguments below" : "Requires your approval to proceed"}

-
- {args.title != null && ( -
-

Title

-

{String(args.title)}

+ {/* Display mode - show args as read-only */} + {!isEditing && ( +
+ {args.title != null && ( +
+

Title

+

{String(args.title)}

+
+ )} + {args.content != null && ( +
+

Content

+

{String(args.content)}

+
+ )}
)} - {args.content != null && ( -
-

Content

-

{String(args.content)}

-
- )} -
-
+ {/* Edit mode - show editable form fields */} + {isEditing && ( +
+
+ + setEditedArgs({ ...editedArgs, title: e.target.value })} + placeholder="Enter page title" + /> +
+
+ +