diff --git a/surfsense_backend/app/tasks/chat/stream_new_chat.py b/surfsense_backend/app/tasks/chat/stream_new_chat.py index 852793230..2cd05e80c 100644 --- a/surfsense_backend/app/tasks/chat/stream_new_chat.py +++ b/surfsense_backend/app/tasks/chat/stream_new_chat.py @@ -11,6 +11,8 @@ Supports loading LLM configurations from: import json from collections.abc import AsyncGenerator +from dataclasses import dataclass +from typing import Any from uuid import UUID from langchain_core.messages import HumanMessage @@ -178,6 +180,550 @@ def extract_todos_from_deepagents(command_output) -> dict: return {"todos": todos_data} +@dataclass +class StreamResult: + accumulated_text: str = "" + is_interrupted: bool = False + interrupt_value: dict[str, Any] | None = None + + +async def _stream_agent_events( + agent: Any, + config: dict[str, Any], + input_data: Any, + streaming_service: VercelStreamingService, + result: StreamResult, + step_prefix: str = "thinking", + initial_step_id: str | None = None, + initial_step_title: str = "", + initial_step_items: list[str] | None = None, +) -> AsyncGenerator[str, None]: + """Shared async generator that streams and formats astream_events from the agent. + + Yields SSE-formatted strings. After exhausting, inspect the ``result`` + object for accumulated_text and interrupt state. + + Args: + agent: The compiled LangGraph agent. + config: LangGraph config dict (must include configurable.thread_id). + input_data: The input to pass to agent.astream_events (dict or Command). + streaming_service: VercelStreamingService instance for formatting events. + result: Mutable StreamResult populated with accumulated_text / interrupt info. + step_prefix: Prefix for thinking step IDs (e.g. "thinking" or "thinking-resume"). + initial_step_id: If set, the helper inherits an already-active thinking step. + initial_step_title: Title of the inherited thinking step. + initial_step_items: Items of the inherited thinking step. + + Yields: + SSE-formatted strings for each event. + """ + accumulated_text = "" + current_text_id: str | None = None + thinking_step_counter = 1 if initial_step_id else 0 + tool_step_ids: dict[str, str] = {} + completed_step_ids: set[str] = set() + last_active_step_id: str | None = initial_step_id + last_active_step_title: str = initial_step_title + last_active_step_items: list[str] = initial_step_items or [] + just_finished_tool: bool = False + + def next_thinking_step_id() -> str: + nonlocal thinking_step_counter + thinking_step_counter += 1 + return f"{step_prefix}-{thinking_step_counter}" + + def complete_current_step() -> str | None: + nonlocal last_active_step_id + if last_active_step_id and last_active_step_id not in completed_step_ids: + completed_step_ids.add(last_active_step_id) + event = streaming_service.format_thinking_step( + step_id=last_active_step_id, + title=last_active_step_title, + status="completed", + items=last_active_step_items if last_active_step_items else None, + ) + last_active_step_id = None + return event + return None + + async for event in agent.astream_events(input_data, config=config, version="v2"): + event_type = event.get("event", "") + + if event_type == "on_chat_model_stream": + chunk = event.get("data", {}).get("chunk") + if chunk and hasattr(chunk, "content"): + content = chunk.content + if content and isinstance(content, str): + if current_text_id is None: + completion_event = complete_current_step() + if completion_event: + yield completion_event + if just_finished_tool: + last_active_step_id = None + last_active_step_title = "" + last_active_step_items = [] + just_finished_tool = False + current_text_id = streaming_service.generate_text_id() + yield streaming_service.format_text_start(current_text_id) + yield streaming_service.format_text_delta(current_text_id, content) + accumulated_text += content + + elif event_type == "on_tool_start": + tool_name = event.get("name", "unknown_tool") + run_id = event.get("run_id", "") + tool_input = event.get("data", {}).get("input", {}) + + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + current_text_id = None + + if last_active_step_title != "Synthesizing response": + completion_event = complete_current_step() + if completion_event: + yield completion_event + + just_finished_tool = False + tool_step_id = next_thinking_step_id() + tool_step_ids[run_id] = tool_step_id + last_active_step_id = tool_step_id + + if tool_name == "search_knowledge_base": + query = ( + tool_input.get("query", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Searching knowledge base" + last_active_step_items = [ + f"Query: {query[:100]}{'...' if len(query) > 100 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Searching knowledge base", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "link_preview": + url = ( + tool_input.get("url", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Fetching link preview" + last_active_step_items = [ + f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Fetching link preview", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "display_image": + src = ( + tool_input.get("src", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + title = ( + tool_input.get("title", "") if isinstance(tool_input, dict) else "" + ) + last_active_step_title = "Analyzing the image" + last_active_step_items = [ + f"Analyzing: {title[:50] if title else src[:50]}{'...' if len(title or src) > 50 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Analyzing the image", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "scrape_webpage": + url = ( + tool_input.get("url", "") + if isinstance(tool_input, dict) + else str(tool_input) + ) + last_active_step_title = "Scraping webpage" + last_active_step_items = [ + f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Scraping webpage", + status="in_progress", + items=last_active_step_items, + ) + elif tool_name == "generate_podcast": + podcast_title = ( + tool_input.get("podcast_title", "SurfSense Podcast") + if isinstance(tool_input, dict) + else "SurfSense Podcast" + ) + content_len = len( + tool_input.get("source_content", "") + if isinstance(tool_input, dict) + else "" + ) + last_active_step_title = "Generating podcast" + last_active_step_items = [ + f"Title: {podcast_title}", + f"Content: {content_len:,} characters", + "Preparing audio generation...", + ] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title="Generating podcast", + status="in_progress", + items=last_active_step_items, + ) + else: + last_active_step_title = f"Using {tool_name.replace('_', ' ')}" + last_active_step_items = [] + yield streaming_service.format_thinking_step( + step_id=tool_step_id, + title=last_active_step_title, + status="in_progress", + ) + + tool_call_id = ( + f"call_{run_id[:32]}" + if run_id + else streaming_service.generate_tool_call_id() + ) + yield streaming_service.format_tool_input_start(tool_call_id, tool_name) + yield streaming_service.format_tool_input_available( + tool_call_id, + tool_name, + tool_input if isinstance(tool_input, dict) else {"input": tool_input}, + ) + + elif event_type == "on_tool_end": + run_id = event.get("run_id", "") + tool_name = event.get("name", "unknown_tool") + raw_output = event.get("data", {}).get("output", "") + + if hasattr(raw_output, "content"): + content = raw_output.content + if isinstance(content, str): + try: + tool_output = json.loads(content) + except (json.JSONDecodeError, TypeError): + tool_output = {"result": content} + elif isinstance(content, dict): + tool_output = content + else: + tool_output = {"result": str(content)} + elif isinstance(raw_output, dict): + tool_output = raw_output + else: + tool_output = {"result": str(raw_output) if raw_output else "completed"} + + tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" + original_step_id = tool_step_ids.get( + run_id, f"{step_prefix}-unknown-{run_id[:8]}" + ) + completed_step_ids.add(original_step_id) + + if tool_name == "search_knowledge_base": + result_info = "Search completed" + if isinstance(tool_output, dict): + result_len = tool_output.get("result_length", 0) + if result_len > 0: + result_info = f"Found relevant information ({result_len} chars)" + completed_items = [*last_active_step_items, result_info] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Searching knowledge base", + status="completed", + items=completed_items, + ) + elif tool_name == "link_preview": + if isinstance(tool_output, dict): + title = tool_output.get("title", "Link") + domain = tool_output.get("domain", "") + has_error = "error" in tool_output + if has_error: + completed_items = [ + *last_active_step_items, + f"Error: {tool_output.get('error', 'Failed to fetch')}", + ] + else: + completed_items = [ + *last_active_step_items, + f"Title: {title[:60]}{'...' if len(title) > 60 else ''}", + f"Domain: {domain}" if domain else "Preview loaded", + ] + else: + completed_items = [*last_active_step_items, "Preview loaded"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Fetching link preview", + status="completed", + items=completed_items, + ) + elif tool_name == "display_image": + if isinstance(tool_output, dict): + title = tool_output.get("title", "") + alt = tool_output.get("alt", "Image") + display_name = title or alt + completed_items = [ + *last_active_step_items, + f"Analyzed: {display_name[:50]}{'...' if len(display_name) > 50 else ''}", + ] + else: + completed_items = [*last_active_step_items, "Image analyzed"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Analyzing the image", + status="completed", + items=completed_items, + ) + elif tool_name == "scrape_webpage": + if isinstance(tool_output, dict): + title = tool_output.get("title", "Webpage") + word_count = tool_output.get("word_count", 0) + has_error = "error" in tool_output + if has_error: + completed_items = [ + *last_active_step_items, + f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", + ] + else: + completed_items = [ + *last_active_step_items, + f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", + f"Extracted: {word_count:,} words", + ] + else: + completed_items = [*last_active_step_items, "Content extracted"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Scraping webpage", + status="completed", + items=completed_items, + ) + elif tool_name == "generate_podcast": + podcast_status = ( + tool_output.get("status", "unknown") + if isinstance(tool_output, dict) + else "unknown" + ) + podcast_title = ( + tool_output.get("title", "Podcast") + if isinstance(tool_output, dict) + else "Podcast" + ) + if podcast_status == "processing": + completed_items = [ + f"Title: {podcast_title}", + "Audio generation started", + "Processing in background...", + ] + elif podcast_status == "already_generating": + completed_items = [ + f"Title: {podcast_title}", + "Podcast already in progress", + "Please wait for it to complete", + ] + elif podcast_status == "error": + error_msg = ( + tool_output.get("error", "Unknown error") + if isinstance(tool_output, dict) + else "Unknown error" + ) + completed_items = [ + f"Title: {podcast_title}", + f"Error: {error_msg[:50]}", + ] + else: + completed_items = last_active_step_items + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Generating podcast", + status="completed", + items=completed_items, + ) + elif tool_name == "ls": + if isinstance(tool_output, dict): + ls_output = tool_output.get("result", "") + elif isinstance(tool_output, str): + ls_output = tool_output + else: + ls_output = str(tool_output) if tool_output else "" + file_names: list[str] = [] + if ls_output: + for line in ls_output.strip().split("\n"): + line = line.strip() + if line: + name = line.rstrip("/").split("/")[-1] + if name and len(name) <= 40: + file_names.append(name) + elif name: + file_names.append(name[:37] + "...") + if file_names: + if len(file_names) <= 5: + completed_items = [f"[{name}]" for name in file_names] + else: + completed_items = [f"[{name}]" for name in file_names[:4]] + completed_items.append(f"(+{len(file_names) - 4} more)") + else: + completed_items = ["No files found"] + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title="Exploring files", + status="completed", + items=completed_items, + ) + else: + yield streaming_service.format_thinking_step( + step_id=original_step_id, + title=f"Using {tool_name.replace('_', ' ')}", + status="completed", + items=last_active_step_items, + ) + + just_finished_tool = True + last_active_step_id = None + last_active_step_title = "" + last_active_step_items = [] + + if tool_name == "generate_podcast": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if ( + isinstance(tool_output, dict) + and tool_output.get("status") == "success" + ): + yield streaming_service.format_terminal_info( + f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Unknown error") + if isinstance(tool_output, dict) + else "Unknown error" + ) + yield streaming_service.format_terminal_info( + f"Podcast generation failed: {error_msg}", + "error", + ) + elif tool_name == "link_preview": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if isinstance(tool_output, dict) and "error" not in tool_output: + title = tool_output.get("title", "Link") + yield streaming_service.format_terminal_info( + f"Link preview loaded: {title[:50]}{'...' if len(title) > 50 else ''}", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Failed to fetch") + if isinstance(tool_output, dict) + else "Failed to fetch" + ) + yield streaming_service.format_terminal_info( + f"Link preview failed: {error_msg}", + "error", + ) + elif tool_name == "display_image": + yield streaming_service.format_tool_output_available( + tool_call_id, + tool_output + if isinstance(tool_output, dict) + else {"result": tool_output}, + ) + if isinstance(tool_output, dict): + title = tool_output.get("title") or tool_output.get("alt", "Image") + yield streaming_service.format_terminal_info( + f"Image analyzed: {title[:40]}{'...' if len(title) > 40 else ''}", + "success", + ) + elif tool_name == "scrape_webpage": + if isinstance(tool_output, dict): + display_output = { + k: v for k, v in tool_output.items() if k != "content" + } + if "content" in tool_output: + content = tool_output.get("content", "") + display_output["content_preview"] = ( + content[:500] + "..." if len(content) > 500 else content + ) + yield streaming_service.format_tool_output_available( + tool_call_id, + display_output, + ) + else: + yield streaming_service.format_tool_output_available( + tool_call_id, + {"result": tool_output}, + ) + if isinstance(tool_output, dict) and "error" not in tool_output: + title = tool_output.get("title", "Webpage") + word_count = tool_output.get("word_count", 0) + yield streaming_service.format_terminal_info( + f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", + "success", + ) + else: + error_msg = ( + tool_output.get("error", "Failed to scrape") + if isinstance(tool_output, dict) + else "Failed to scrape" + ) + yield streaming_service.format_terminal_info( + f"Scrape failed: {error_msg}", + "error", + ) + elif tool_name == "search_knowledge_base": + yield streaming_service.format_tool_output_available( + tool_call_id, + {"status": "completed", "result_length": len(str(tool_output))}, + ) + yield streaming_service.format_terminal_info( + "Knowledge base search completed", "success" + ) + else: + yield streaming_service.format_tool_output_available( + tool_call_id, + {"status": "completed", "result_length": len(str(tool_output))}, + ) + yield streaming_service.format_terminal_info( + f"Tool {tool_name} completed", "success" + ) + + elif event_type in ("on_chain_end", "on_agent_end"): + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + current_text_id = None + + if current_text_id is not None: + yield streaming_service.format_text_end(current_text_id) + + completion_event = complete_current_step() + if completion_event: + yield completion_event + + result.accumulated_text = accumulated_text + + state = await agent.aget_state(config) + is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) + if is_interrupted: + result.is_interrupted = True + result.interrupt_value = state.tasks[0].interrupts[0].value + yield streaming_service.format_interrupt_request(result.interrupt_value) + + async def stream_new_chat( user_query: str, search_space_id: int, @@ -215,9 +761,6 @@ async def stream_new_chat( """ streaming_service = VercelStreamingService() - # Track the current text block for streaming (defined early for exception handling) - current_text_id: str | None = None - try: # Mark AI as responding to this user for live collaboration if user_id: @@ -394,63 +937,18 @@ async def stream_new_chat( yield streaming_service.format_message_start() yield streaming_service.format_start_step() - # Reset text tracking for this stream - accumulated_text = "" - - # Track thinking steps for chain-of-thought display - thinking_step_counter = 0 - # Map run_id -> step_id for tool calls so we can update them on completion - tool_step_ids: dict[str, str] = {} - # Track the last active step so we can mark it complete at the end - last_active_step_id: str | None = None - last_active_step_title: str = "" - last_active_step_items: list[str] = [] - # Track which steps have been completed to avoid duplicate completions - completed_step_ids: set[str] = set() - # Track if we just finished a tool (text flows silently after tools) - just_finished_tool: bool = False - # Track write_todos calls to show "Creating plan" vs "Updating plan" - # Disabled for now - # write_todos_call_count: int = 0 - - def next_thinking_step_id() -> str: - nonlocal thinking_step_counter - thinking_step_counter += 1 - return f"thinking-{thinking_step_counter}" - - def complete_current_step() -> str | None: - """Complete the current active step and return the completion event, if any.""" - nonlocal last_active_step_id, last_active_step_title, last_active_step_items - if last_active_step_id and last_active_step_id not in completed_step_ids: - completed_step_ids.add(last_active_step_id) - return streaming_service.format_thinking_step( - step_id=last_active_step_id, - title=last_active_step_title, - status="completed", - items=last_active_step_items if last_active_step_items else None, - ) - return None - # Initial thinking step - analyzing the request - analyze_step_id = next_thinking_step_id() - last_active_step_id = analyze_step_id - - # Determine step title and action verb based on context if mentioned_documents or mentioned_surfsense_docs: - last_active_step_title = "Analyzing referenced content" + initial_title = "Analyzing referenced content" action_verb = "Analyzing" else: - last_active_step_title = "Understanding your request" + initial_title = "Understanding your request" action_verb = "Processing" - # Build the message with inline context about referenced documents processing_parts = [] - - # Add the user query query_text = user_query[:80] + ("..." if len(user_query) > 80 else "") processing_parts.append(query_text) - # Add mentioned document names inline if mentioned_documents: doc_names = [] for doc in mentioned_documents: @@ -463,7 +961,6 @@ async def stream_new_chat( else: processing_parts.append(f"[{len(doc_names)} documents]") - # Add mentioned SurfSense docs inline if mentioned_surfsense_docs: doc_names = [] for doc in mentioned_surfsense_docs: @@ -476,716 +973,38 @@ async def stream_new_chat( else: processing_parts.append(f"[{len(doc_names)} docs]") - last_active_step_items = [f"{action_verb}: {' '.join(processing_parts)}"] + initial_items = [f"{action_verb}: {' '.join(processing_parts)}"] + initial_step_id = "thinking-1" yield streaming_service.format_thinking_step( - step_id=analyze_step_id, - title=last_active_step_title, + step_id=initial_step_id, + title=initial_title, status="in_progress", - items=last_active_step_items, + items=initial_items, ) - # Stream the agent response with thread config for memory - async for event in agent.astream_events( - input_state, config=config, version="v2" + stream_result = StreamResult() + async for sse in _stream_agent_events( + agent=agent, + config=config, + input_data=input_state, + streaming_service=streaming_service, + result=stream_result, + step_prefix="thinking", + initial_step_id=initial_step_id, + initial_step_title=initial_title, + initial_step_items=initial_items, ): - event_type = event.get("event", "") + yield sse - # Handle chat model stream events (text streaming) - if event_type == "on_chat_model_stream": - chunk = event.get("data", {}).get("chunk") - if chunk and hasattr(chunk, "content"): - content = chunk.content - if content and isinstance(content, str): - # Start a new text block if needed - if current_text_id is None: - # Complete any previous step - completion_event = complete_current_step() - if completion_event: - yield completion_event - - if just_finished_tool: - # Clear the active step tracking - text flows without a dedicated step - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - just_finished_tool = False - - current_text_id = streaming_service.generate_text_id() - yield streaming_service.format_text_start(current_text_id) - - # Stream the text delta - yield streaming_service.format_text_delta( - current_text_id, content - ) - accumulated_text += content - - # Handle tool calls - elif event_type == "on_tool_start": - tool_name = event.get("name", "unknown_tool") - run_id = event.get("run_id", "") - tool_input = event.get("data", {}).get("input", {}) - - # End current text block if any - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - # Complete any previous step EXCEPT "Synthesizing response" - # (we want to reuse the Synthesizing step after tools complete) - if last_active_step_title != "Synthesizing response": - completion_event = complete_current_step() - if completion_event: - yield completion_event - - # Reset the just_finished_tool flag since we're starting a new tool - just_finished_tool = False - - # Create thinking step for the tool call and store it for later update - tool_step_id = next_thinking_step_id() - tool_step_ids[run_id] = tool_step_id - last_active_step_id = tool_step_id - if tool_name == "search_knowledge_base": - query = ( - tool_input.get("query", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Searching knowledge base" - last_active_step_items = [ - f"Query: {query[:100]}{'...' if len(query) > 100 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Searching knowledge base", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "link_preview": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Fetching link preview" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Fetching link preview", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "display_image": - src = ( - tool_input.get("src", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - title = ( - tool_input.get("title", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Analyzing the image" - last_active_step_items = [ - f"Analyzing: {title[:50] if title else src[:50]}{'...' if len(title or src) > 50 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Analyzing the image", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "scrape_webpage": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Scraping webpage" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Scraping webpage", - status="in_progress", - items=last_active_step_items, - ) - # elif tool_name == "write_todos": # Disabled for now - # # Track write_todos calls for better messaging - # write_todos_call_count += 1 - # todos = ( - # tool_input.get("todos", []) - # if isinstance(tool_input, dict) - # else [] - # ) - # todo_count = len(todos) if isinstance(todos, list) else 0 - - # if write_todos_call_count == 1: - # # First call - creating the plan - # last_active_step_title = "Creating plan" - # last_active_step_items = [f"Defining {todo_count} tasks..."] - # else: - # # Subsequent calls - updating the plan - # # Try to provide context about what's being updated - # in_progress_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "in_progress" - # ) - # if isinstance(todos, list) - # else 0 - # ) - # completed_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "completed" - # ) - # if isinstance(todos, list) - # else 0 - # ) - - # last_active_step_title = "Updating progress" - # last_active_step_items = ( - # [ - # f"Progress: {completed_count}/{todo_count} completed", - # f"In progress: {in_progress_count} tasks", - # ] - # if completed_count > 0 - # else [f"Working on {todo_count} tasks"] - # ) - - # yield streaming_service.format_thinking_step( - # step_id=tool_step_id, - # title=last_active_step_title, - # status="in_progress", - # items=last_active_step_items, - # ) - elif tool_name == "generate_podcast": - podcast_title = ( - tool_input.get("podcast_title", "SurfSense Podcast") - if isinstance(tool_input, dict) - else "SurfSense Podcast" - ) - # Get content length for context - content_len = len( - tool_input.get("source_content", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Generating podcast" - last_active_step_items = [ - f"Title: {podcast_title}", - f"Content: {content_len:,} characters", - "Preparing audio generation...", - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Generating podcast", - status="in_progress", - items=last_active_step_items, - ) - # elif tool_name == "ls": - # last_active_step_title = "Exploring files" - # last_active_step_items = [] - # yield streaming_service.format_thinking_step( - # step_id=tool_step_id, - # title="Exploring files", - # status="in_progress", - # items=None, - # ) - else: - last_active_step_title = f"Using {tool_name.replace('_', ' ')}" - last_active_step_items = [] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title=last_active_step_title, - status="in_progress", - ) - - # Stream tool info - tool_call_id = ( - f"call_{run_id[:32]}" - if run_id - else streaming_service.generate_tool_call_id() - ) - yield streaming_service.format_tool_input_start(tool_call_id, tool_name) - yield streaming_service.format_tool_input_available( - tool_call_id, - tool_name, - tool_input - if isinstance(tool_input, dict) - else {"input": tool_input}, - ) - - elif event_type == "on_tool_end": - run_id = event.get("run_id", "") - tool_name = event.get("name", "unknown_tool") - raw_output = event.get("data", {}).get("output", "") - - # Handle deepagents' write_todos Command object specially - # Disabled for now - # if tool_name == "write_todos" and hasattr(raw_output, "update"): - # # deepagents returns a Command object - extract todos directly - # tool_output = extract_todos_from_deepagents(raw_output) - # elif hasattr(raw_output, "content"): - if hasattr(raw_output, "content"): - # It's a ToolMessage object - extract the content - content = raw_output.content - # If content is a string that looks like JSON, try to parse it - if isinstance(content, str): - try: - tool_output = json.loads(content) - except (json.JSONDecodeError, TypeError): - tool_output = {"result": content} - elif isinstance(content, dict): - tool_output = content - else: - tool_output = {"result": str(content)} - elif isinstance(raw_output, dict): - tool_output = raw_output - else: - tool_output = { - "result": str(raw_output) if raw_output else "completed" - } - - tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" - - # Get the original tool step ID to update it (not create a new one) - original_step_id = tool_step_ids.get( - run_id, f"thinking-unknown-{run_id[:8]}" - ) - - # Mark the tool thinking step as completed using the SAME step ID - # Also add to completed set so we don't try to complete it again - completed_step_ids.add(original_step_id) - if tool_name == "search_knowledge_base": - # Get result count if available - result_info = "Search completed" - if isinstance(tool_output, dict): - result_len = tool_output.get("result_length", 0) - if result_len > 0: - result_info = ( - f"Found relevant information ({result_len} chars)" - ) - # Include original query in completed items - completed_items = [*last_active_step_items, result_info] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Searching knowledge base", - status="completed", - items=completed_items, - ) - elif tool_name == "link_preview": - # Build completion items based on link preview result - if isinstance(tool_output, dict): - title = tool_output.get("title", "Link") - domain = tool_output.get("domain", "") - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to fetch')}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:60]}{'...' if len(title) > 60 else ''}", - f"Domain: {domain}" if domain else "Preview loaded", - ] - else: - completed_items = [*last_active_step_items, "Preview loaded"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Fetching link preview", - status="completed", - items=completed_items, - ) - elif tool_name == "display_image": - # Build completion items for image analysis - if isinstance(tool_output, dict): - title = tool_output.get("title", "") - alt = tool_output.get("alt", "Image") - display_name = title or alt - completed_items = [ - *last_active_step_items, - f"Analyzed: {display_name[:50]}{'...' if len(display_name) > 50 else ''}", - ] - else: - completed_items = [*last_active_step_items, "Image analyzed"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Analyzing the image", - status="completed", - items=completed_items, - ) - elif tool_name == "scrape_webpage": - # Build completion items for webpage scraping - if isinstance(tool_output, dict): - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", - f"Extracted: {word_count:,} words", - ] - else: - completed_items = [*last_active_step_items, "Content extracted"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Scraping webpage", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_podcast": - # Build detailed completion items based on podcast status - podcast_status = ( - tool_output.get("status", "unknown") - if isinstance(tool_output, dict) - else "unknown" - ) - podcast_title = ( - tool_output.get("title", "Podcast") - if isinstance(tool_output, dict) - else "Podcast" - ) - - if podcast_status == "processing": - completed_items = [ - f"Title: {podcast_title}", - "Audio generation started", - "Processing in background...", - ] - elif podcast_status == "already_generating": - completed_items = [ - f"Title: {podcast_title}", - "Podcast already in progress", - "Please wait for it to complete", - ] - elif podcast_status == "error": - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - completed_items = [ - f"Title: {podcast_title}", - f"Error: {error_msg[:50]}", - ] - else: - completed_items = last_active_step_items - - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Generating podcast", - status="completed", - items=completed_items, - ) - # elif tool_name == "write_todos": # Disabled for now - # # Build completion items for planning/updating - # if isinstance(tool_output, dict): - # todos = tool_output.get("todos", []) - # todo_count = len(todos) if isinstance(todos, list) else 0 - # completed_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "completed" - # ) - # if isinstance(todos, list) - # else 0 - # ) - # in_progress_count = ( - # sum( - # 1 - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "in_progress" - # ) - # if isinstance(todos, list) - # else 0 - # ) - - # # Use context-aware completion message - # if last_active_step_title == "Creating plan": - # completed_items = [f"Created {todo_count} tasks"] - # else: - # # Updating progress - show stats - # completed_items = [ - # f"Progress: {completed_count}/{todo_count} completed", - # ] - # if in_progress_count > 0: - # # Find the currently in-progress task name - # in_progress_task = next( - # ( - # t.get("content", "")[:40] - # for t in todos - # if isinstance(t, dict) - # and t.get("status") == "in_progress" - # ), - # None, - # ) - # if in_progress_task: - # completed_items.append( - # f"Current: {in_progress_task}..." - # ) - # else: - # completed_items = ["Plan updated"] - # yield streaming_service.format_thinking_step( - # step_id=original_step_id, - # title=last_active_step_title, - # status="completed", - # items=completed_items, - # ) - elif tool_name == "ls": - # Build completion items showing file names found - if isinstance(tool_output, dict): - result = tool_output.get("result", "") - elif isinstance(tool_output, str): - result = tool_output - else: - result = str(tool_output) if tool_output else "" - - # Parse file paths and extract just the file names - file_names = [] - if result: - # The ls tool returns paths, extract just the file/folder names - for line in result.strip().split("\n"): - line = line.strip() - if line: - # Get just the filename from the path - name = line.rstrip("/").split("/")[-1] - if name and len(name) <= 40: - file_names.append(name) - elif name: - file_names.append(name[:37] + "...") - - # Build display items - wrap file names in brackets for icon rendering - if file_names: - if len(file_names) <= 5: - # Wrap each file name in brackets for styled tile rendering - completed_items = [f"[{name}]" for name in file_names] - else: - # Show first few with brackets and count - completed_items = [f"[{name}]" for name in file_names[:4]] - completed_items.append(f"(+{len(file_names) - 4} more)") - else: - completed_items = ["No files found"] - - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Exploring files", - status="completed", - items=completed_items, - ) - else: - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title=f"Using {tool_name.replace('_', ' ')}", - status="completed", - items=last_active_step_items, - ) - - # Mark that we just finished a tool - "Synthesizing response" will be created - # when text actually starts flowing (not immediately) - just_finished_tool = True - # Clear the active step since the tool is done - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - - # Handle different tool outputs - if tool_name == "generate_podcast": - # Stream the full podcast result so frontend can render the audio player - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - # Send appropriate terminal message based on status - if ( - isinstance(tool_output, dict) - and tool_output.get("status") == "success" - ): - yield streaming_service.format_terminal_info( - f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - yield streaming_service.format_terminal_info( - f"Podcast generation failed: {error_msg}", - "error", - ) - elif tool_name == "link_preview": - # Stream the full link preview result so frontend can render the MediaCard - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - # Send appropriate terminal message - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Link") - yield streaming_service.format_terminal_info( - f"Link preview loaded: {title[:50]}{'...' if len(title) > 50 else ''}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to fetch") - if isinstance(tool_output, dict) - else "Failed to fetch" - ) - yield streaming_service.format_terminal_info( - f"Link preview failed: {error_msg}", - "error", - ) - elif tool_name == "display_image": - # Stream the full image result so frontend can render the Image component - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - # Send terminal message - if isinstance(tool_output, dict): - title = tool_output.get("title") or tool_output.get( - "alt", "Image" - ) - yield streaming_service.format_terminal_info( - f"Image analyzed: {title[:40]}{'...' if len(title) > 40 else ''}", - "success", - ) - elif tool_name == "scrape_webpage": - # Stream the scrape result so frontend can render the Article component - # Note: We send metadata for display, but content goes to LLM for processing - if isinstance(tool_output, dict): - # Create a display-friendly output (without full content for the card) - display_output = { - k: v for k, v in tool_output.items() if k != "content" - } - # But keep a truncated content preview - if "content" in tool_output: - content = tool_output.get("content", "") - display_output["content_preview"] = ( - content[:500] + "..." if len(content) > 500 else content - ) - yield streaming_service.format_tool_output_available( - tool_call_id, - display_output, - ) - else: - yield streaming_service.format_tool_output_available( - tool_call_id, - {"result": tool_output}, - ) - # Send terminal message - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - yield streaming_service.format_terminal_info( - f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to scrape") - if isinstance(tool_output, dict) - else "Failed to scrape" - ) - yield streaming_service.format_terminal_info( - f"Scrape failed: {error_msg}", - "error", - ) - elif tool_name == "search_knowledge_base": - # Don't stream the full output for search (can be very large), just acknowledge - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - "Knowledge base search completed", "success" - ) - # elif tool_name == "write_todos": # Disabled for now - # # Stream the full write_todos result so frontend can render the Plan component - # yield streaming_service.format_tool_output_available( - # tool_call_id, - # tool_output - # if isinstance(tool_output, dict) - # else {"result": tool_output}, - # ) - # # Send terminal message with plan info - # if isinstance(tool_output, dict): - # todos = tool_output.get("todos", []) - # todo_count = len(todos) if isinstance(todos, list) else 0 - # yield streaming_service.format_terminal_info( - # f"Plan created ({todo_count} tasks)", - # "success", - # ) - # else: - # yield streaming_service.format_terminal_info( - # "Plan created", - # "success", - # ) - else: - # Default handling for other tools - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - f"Tool {tool_name} completed", "success" - ) - - # Handle chain/agent end to close any open text blocks - elif event_type in ("on_chain_end", "on_agent_end"): - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - # Ensure text block is closed - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - - # Mark the last active thinking step as completed using the same title - completion_event = complete_current_step() - if completion_event: - yield completion_event - - # Check if the graph was interrupted (human-in-the-loop) - state = await agent.aget_state(config) - is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) - if is_interrupted: - interrupt_value = state.tasks[0].interrupts[0].value - yield streaming_service.format_interrupt_request(interrupt_value) + if stream_result.is_interrupted: yield streaming_service.format_finish_step() yield streaming_service.format_finish() yield streaming_service.format_done() return + accumulated_text = stream_result.accumulated_text + # Generate LLM title for new chats after first response # Check if this is the first assistant response by counting existing assistant messages from sqlalchemy import func @@ -1256,10 +1075,6 @@ async def stream_new_chat( print(f"[stream_new_chat] Exception type: {type(e).__name__}") print(f"[stream_new_chat] Traceback:\n{traceback.format_exc()}") - # Close any open text block - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -1280,7 +1095,6 @@ async def stream_resume_chat( thread_visibility: ChatVisibility | None = None, ) -> AsyncGenerator[str, None]: streaming_service = VercelStreamingService() - current_text_id: str | None = None try: if user_id: @@ -1353,521 +1167,21 @@ async def stream_resume_chat( yield streaming_service.format_message_start() yield streaming_service.format_start_step() - accumulated_text = "" - thinking_step_counter = 0 - tool_step_ids: dict[str, str] = {} - completed_step_ids: set[str] = set() - last_active_step_id: str | None = None - last_active_step_title = "" - last_active_step_items: list[str] = [] - just_finished_tool = False - - def next_thinking_step_id() -> str: - nonlocal thinking_step_counter - thinking_step_counter += 1 - return f"thinking-resume-{thinking_step_counter}" - - def complete_current_step() -> str | None: - nonlocal last_active_step_id - if last_active_step_id and last_active_step_id not in completed_step_ids: - completed_step_ids.add(last_active_step_id) - event = streaming_service.format_thinking_step( - step_id=last_active_step_id, - title=last_active_step_title, - status="completed", - items=last_active_step_items, - ) - last_active_step_id = None - return event - return None - - async for event in agent.astream_events( - Command(resume={"decisions": decisions}), config=config, version="v2" + stream_result = StreamResult() + async for sse in _stream_agent_events( + agent=agent, + config=config, + input_data=Command(resume={"decisions": decisions}), + streaming_service=streaming_service, + result=stream_result, + step_prefix="thinking-resume", ): - event_type = event.get("event", "") - - if event_type == "on_chat_model_stream": - chunk = event.get("data", {}).get("chunk") - if chunk and hasattr(chunk, "content"): - content = chunk.content - if content and isinstance(content, str): - if current_text_id is None: - completion_event = complete_current_step() - if completion_event: - yield completion_event - if just_finished_tool: - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - just_finished_tool = False - current_text_id = streaming_service.generate_text_id() - yield streaming_service.format_text_start(current_text_id) - yield streaming_service.format_text_delta( - current_text_id, content - ) - accumulated_text += content - - elif event_type == "on_tool_start": - tool_name = event.get("name", "unknown_tool") - run_id = event.get("run_id", "") - tool_input = event.get("data", {}).get("input", {}) - - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - if last_active_step_title != "Synthesizing response": - completion_event = complete_current_step() - if completion_event: - yield completion_event - - just_finished_tool = False - tool_step_id = next_thinking_step_id() - tool_step_ids[run_id] = tool_step_id - last_active_step_id = tool_step_id - - if tool_name == "search_knowledge_base": - query = ( - tool_input.get("query", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Searching knowledge base" - last_active_step_items = [ - f"Query: {query[:100]}{'...' if len(query) > 100 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Searching knowledge base", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "link_preview": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Fetching link preview" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Fetching link preview", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "display_image": - src = ( - tool_input.get("src", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - title = ( - tool_input.get("title", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Analyzing the image" - last_active_step_items = [ - f"Analyzing: {title[:50] if title else src[:50]}{'...' if len(title or src) > 50 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Analyzing the image", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "scrape_webpage": - url = ( - tool_input.get("url", "") - if isinstance(tool_input, dict) - else str(tool_input) - ) - last_active_step_title = "Scraping webpage" - last_active_step_items = [ - f"URL: {url[:80]}{'...' if len(url) > 80 else ''}" - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Scraping webpage", - status="in_progress", - items=last_active_step_items, - ) - elif tool_name == "generate_podcast": - podcast_title = ( - tool_input.get("podcast_title", "SurfSense Podcast") - if isinstance(tool_input, dict) - else "SurfSense Podcast" - ) - content_len = len( - tool_input.get("source_content", "") - if isinstance(tool_input, dict) - else "" - ) - last_active_step_title = "Generating podcast" - last_active_step_items = [ - f"Title: {podcast_title}", - f"Content: {content_len:,} characters", - "Preparing audio generation...", - ] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title="Generating podcast", - status="in_progress", - items=last_active_step_items, - ) - else: - last_active_step_title = f"Using {tool_name.replace('_', ' ')}" - last_active_step_items = [] - yield streaming_service.format_thinking_step( - step_id=tool_step_id, - title=last_active_step_title, - status="in_progress", - ) - - tool_call_id = ( - f"call_{run_id[:32]}" - if run_id - else streaming_service.generate_tool_call_id() - ) - yield streaming_service.format_tool_input_start(tool_call_id, tool_name) - yield streaming_service.format_tool_input_available( - tool_call_id, - tool_name, - tool_input - if isinstance(tool_input, dict) - else {"input": tool_input}, - ) - - elif event_type == "on_tool_end": - run_id = event.get("run_id", "") - tool_name = event.get("name", "unknown_tool") - raw_output = event.get("data", {}).get("output", "") - - if hasattr(raw_output, "content"): - content = raw_output.content - if isinstance(content, str): - try: - tool_output = json.loads(content) - except (json.JSONDecodeError, TypeError): - tool_output = {"result": content} - elif isinstance(content, dict): - tool_output = content - else: - tool_output = {"result": str(content)} - elif isinstance(raw_output, dict): - tool_output = raw_output - else: - tool_output = { - "result": str(raw_output) if raw_output else "completed" - } - - tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown" - original_step_id = tool_step_ids.get( - run_id, f"thinking-unknown-{run_id[:8]}" - ) - completed_step_ids.add(original_step_id) - - if tool_name == "search_knowledge_base": - result_info = "Search completed" - if isinstance(tool_output, dict): - result_len = tool_output.get("result_length", 0) - if result_len > 0: - result_info = ( - f"Found relevant information ({result_len} chars)" - ) - completed_items = [*last_active_step_items, result_info] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Searching knowledge base", - status="completed", - items=completed_items, - ) - elif tool_name == "link_preview": - if isinstance(tool_output, dict): - title = tool_output.get("title", "Link") - domain = tool_output.get("domain", "") - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to fetch')}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:60]}{'...' if len(title) > 60 else ''}", - f"Domain: {domain}" if domain else "Preview loaded", - ] - else: - completed_items = [*last_active_step_items, "Preview loaded"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Fetching link preview", - status="completed", - items=completed_items, - ) - elif tool_name == "display_image": - if isinstance(tool_output, dict): - title = tool_output.get("title", "") - alt = tool_output.get("alt", "Image") - display_name = title or alt - completed_items = [ - *last_active_step_items, - f"Analyzed: {display_name[:50]}{'...' if len(display_name) > 50 else ''}", - ] - else: - completed_items = [*last_active_step_items, "Image analyzed"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Analyzing the image", - status="completed", - items=completed_items, - ) - elif tool_name == "scrape_webpage": - if isinstance(tool_output, dict): - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - has_error = "error" in tool_output - if has_error: - completed_items = [ - *last_active_step_items, - f"Error: {tool_output.get('error', 'Failed to scrape')[:50]}", - ] - else: - completed_items = [ - *last_active_step_items, - f"Title: {title[:50]}{'...' if len(title) > 50 else ''}", - f"Extracted: {word_count:,} words", - ] - else: - completed_items = [*last_active_step_items, "Content extracted"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Scraping webpage", - status="completed", - items=completed_items, - ) - elif tool_name == "generate_podcast": - podcast_status = ( - tool_output.get("status", "unknown") - if isinstance(tool_output, dict) - else "unknown" - ) - podcast_title = ( - tool_output.get("title", "Podcast") - if isinstance(tool_output, dict) - else "Podcast" - ) - if podcast_status == "processing": - completed_items = [ - f"Title: {podcast_title}", - "Audio generation started", - "Processing in background...", - ] - elif podcast_status == "already_generating": - completed_items = [ - f"Title: {podcast_title}", - "Podcast already in progress", - "Please wait for it to complete", - ] - elif podcast_status == "error": - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - completed_items = [ - f"Title: {podcast_title}", - f"Error: {error_msg[:50]}", - ] - else: - completed_items = last_active_step_items - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Generating podcast", - status="completed", - items=completed_items, - ) - elif tool_name == "ls": - if isinstance(tool_output, dict): - result = tool_output.get("result", "") - elif isinstance(tool_output, str): - result = tool_output - else: - result = str(tool_output) if tool_output else "" - file_names = [] - if result: - for line in result.strip().split("\n"): - line = line.strip() - if line: - name = line.rstrip("/").split("/")[-1] - if name and len(name) <= 40: - file_names.append(name) - elif name: - file_names.append(name[:37] + "...") - if file_names: - if len(file_names) <= 5: - completed_items = [f"[{name}]" for name in file_names] - else: - completed_items = [f"[{name}]" for name in file_names[:4]] - completed_items.append(f"(+{len(file_names) - 4} more)") - else: - completed_items = ["No files found"] - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title="Exploring files", - status="completed", - items=completed_items, - ) - else: - yield streaming_service.format_thinking_step( - step_id=original_step_id, - title=f"Using {tool_name.replace('_', ' ')}", - status="completed", - items=last_active_step_items, - ) - - just_finished_tool = True - last_active_step_id = None - last_active_step_title = "" - last_active_step_items = [] - - if tool_name == "generate_podcast": - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if ( - isinstance(tool_output, dict) - and tool_output.get("status") == "success" - ): - yield streaming_service.format_terminal_info( - f"Podcast generated successfully: {tool_output.get('title', 'Podcast')}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Unknown error") - if isinstance(tool_output, dict) - else "Unknown error" - ) - yield streaming_service.format_terminal_info( - f"Podcast generation failed: {error_msg}", - "error", - ) - elif tool_name == "link_preview": - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Link") - yield streaming_service.format_terminal_info( - f"Link preview loaded: {title[:50]}{'...' if len(title) > 50 else ''}", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to fetch") - if isinstance(tool_output, dict) - else "Failed to fetch" - ) - yield streaming_service.format_terminal_info( - f"Link preview failed: {error_msg}", - "error", - ) - elif tool_name == "display_image": - yield streaming_service.format_tool_output_available( - tool_call_id, - tool_output - if isinstance(tool_output, dict) - else {"result": tool_output}, - ) - if isinstance(tool_output, dict): - title = tool_output.get("title") or tool_output.get( - "alt", "Image" - ) - yield streaming_service.format_terminal_info( - f"Image analyzed: {title[:40]}{'...' if len(title) > 40 else ''}", - "success", - ) - elif tool_name == "scrape_webpage": - if isinstance(tool_output, dict): - display_output = { - k: v for k, v in tool_output.items() if k != "content" - } - if "content" in tool_output: - content = tool_output.get("content", "") - display_output["content_preview"] = ( - content[:500] + "..." if len(content) > 500 else content - ) - yield streaming_service.format_tool_output_available( - tool_call_id, - display_output, - ) - else: - yield streaming_service.format_tool_output_available( - tool_call_id, - {"result": tool_output}, - ) - if isinstance(tool_output, dict) and "error" not in tool_output: - title = tool_output.get("title", "Webpage") - word_count = tool_output.get("word_count", 0) - yield streaming_service.format_terminal_info( - f"Scraped: {title[:40]}{'...' if len(title) > 40 else ''} ({word_count:,} words)", - "success", - ) - else: - error_msg = ( - tool_output.get("error", "Failed to scrape") - if isinstance(tool_output, dict) - else "Failed to scrape" - ) - yield streaming_service.format_terminal_info( - f"Scrape failed: {error_msg}", - "error", - ) - elif tool_name == "search_knowledge_base": - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - "Knowledge base search completed", "success" - ) - else: - yield streaming_service.format_tool_output_available( - tool_call_id, - {"status": "completed", "result_length": len(str(tool_output))}, - ) - yield streaming_service.format_terminal_info( - f"Tool {tool_name} completed", "success" - ) - - elif event_type in ("on_chain_end", "on_agent_end"): - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - current_text_id = None - - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) - - completion_event = complete_current_step() - if completion_event: - yield completion_event - - state = await agent.aget_state(config) - is_interrupted = state.tasks and any(task.interrupts for task in state.tasks) - if is_interrupted: - interrupt_value = state.tasks[0].interrupts[0].value - yield streaming_service.format_interrupt_request(interrupt_value) + yield sse + if stream_result.is_interrupted: + yield streaming_service.format_finish_step() + yield streaming_service.format_finish() + yield streaming_service.format_done() + return yield streaming_service.format_finish_step() yield streaming_service.format_finish() @@ -1879,8 +1193,6 @@ async def stream_resume_chat( error_message = f"Error during resume: {e!s}" print(f"[stream_resume_chat] {error_message}") print(f"[stream_resume_chat] Traceback:\n{traceback.format_exc()}") - if current_text_id is not None: - yield streaming_service.format_text_end(current_text_id) yield streaming_service.format_error(error_message) yield streaming_service.format_finish_step() yield streaming_service.format_finish()