Merge remote-tracking branch 'upstream/dev' into pr-611

Anish Sarkar 2025-12-23 15:45:28 +05:30
commit 6f330e7b8d
92 changed files with 5331 additions and 6029 deletions


@@ -3,10 +3,15 @@ Streaming task for the new SurfSense deep agent chat.
This module streams responses from the deep agent using the Vercel AI SDK
Data Stream Protocol (SSE format).
Supports loading LLM configurations from:
- YAML files (negative IDs for global configs)
- NewLLMConfig database table (positive IDs for user-created configs with prompt settings)
"""
import json
from collections.abc import AsyncGenerator
from langchain_core.messages import HumanMessage
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
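
The docstring above describes a sign convention for llm_config_id: negative IDs resolve to global YAML configs, while non-negative IDs resolve to user-created rows in the NewLLMConfig table. A minimal sketch of that dispatch follows, not part of the commit itself; the helper name resolve_llm is hypothetical, and the keyword arguments simply mirror the loader calls visible in stream_new_chat further down this diff.

# Sketch only, not part of the commit. resolve_llm is a hypothetical helper;
# the loader calls and their keyword arguments mirror those shown below.
async def resolve_llm(session, llm_config_id, search_space_id):
    if llm_config_id >= 0:
        # Non-negative ID: user-created config from the NewLLMConfig table
        agent_config = await load_agent_config(
            session=session,
            config_id=llm_config_id,
            search_space_id=search_space_id,
        )
        if not agent_config:
            return None
        return create_chat_litellm_from_agent_config(agent_config)
    # Negative ID: global config defined in YAML
    llm_config = load_llm_config_from_yaml(llm_config_id=llm_config_id)
    if not llm_config:
        return None
    return create_chat_litellm_from_config(llm_config)
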
@@ -14,11 +19,14 @@ from sqlalchemy.future import select
from app.agents.new_chat.chat_deepagent import create_surfsense_deep_agent
from app.agents.new_chat.checkpointer import get_checkpointer
from app.agents.new_chat.llm_config import (
AgentConfig,
create_chat_litellm_from_agent_config,
create_chat_litellm_from_config,
load_agent_config,
load_llm_config_from_yaml,
)
from app.db import Document
from app.schemas.new_chat import ChatAttachment, ChatMessage
from app.schemas.new_chat import ChatAttachment
from app.services.connector_service import ConnectorService
from app.services.new_streaming_service import VercelStreamingService
@@ -67,7 +75,6 @@ async def stream_new_chat(
chat_id: int,
session: AsyncSession,
llm_config_id: int = -1,
messages: list[ChatMessage] | None = None,
attachments: list[ChatAttachment] | None = None,
mentioned_document_ids: list[int] | None = None,
) -> AsyncGenerator[str, None]:
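
stream_new_chat is an async generator that yields pre-formatted SSE strings, so a route handler can hand it directly to a streaming HTTP response. Below is a hedged sketch of one way to consume it; FastAPI and the user_query and search_space_id parameters are assumptions (they appear in the function body later in this diff but not in the hunk above), and only chat_id, session, llm_config_id, attachments and mentioned_document_ids are confirmed by this hunk.

# Sketch only, not part of the commit. Assumes a FastAPI app and that
# stream_new_chat also accepts user_query and search_space_id.
from fastapi.responses import StreamingResponse

async def new_chat_route(user_query, search_space_id, chat_id, session):
    return StreamingResponse(
        stream_new_chat(
            user_query=user_query,
            search_space_id=search_space_id,
            chat_id=chat_id,
            session=session,
            llm_config_id=-1,  # default value from the signature above
        ),
        media_type="text/event-stream",
    )
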
@@ -97,17 +104,40 @@ async def stream_new_chat(
current_text_id: str | None = None
try:
# Load LLM config
llm_config = load_llm_config_from_yaml(llm_config_id=llm_config_id)
if not llm_config:
yield streaming_service.format_error(
f"Failed to load LLM config with id {llm_config_id}"
)
yield streaming_service.format_done()
return
# Load LLM config - supports both YAML (negative IDs) and database (positive IDs)
agent_config: AgentConfig | None = None
if llm_config_id >= 0:
# Positive ID: Load from NewLLMConfig database table
agent_config = await load_agent_config(
session=session,
config_id=llm_config_id,
search_space_id=search_space_id,
)
if not agent_config:
yield streaming_service.format_error(
f"Failed to load NewLLMConfig with id {llm_config_id}"
)
yield streaming_service.format_done()
return
# Create ChatLiteLLM from AgentConfig
llm = create_chat_litellm_from_agent_config(agent_config)
else:
# Negative ID: Load from YAML (global configs)
llm_config = load_llm_config_from_yaml(llm_config_id=llm_config_id)
if not llm_config:
yield streaming_service.format_error(
f"Failed to load LLM config with id {llm_config_id}"
)
yield streaming_service.format_done()
return
# Create ChatLiteLLM from YAML config dict
llm = create_chat_litellm_from_config(llm_config)
# Create AgentConfig from YAML for consistency (uses defaults for prompt settings)
agent_config = AgentConfig.from_yaml_config(llm_config)
# Create ChatLiteLLM instance
llm = create_chat_litellm_from_config(llm_config)
if not llm:
yield streaming_service.format_error("Failed to create LLM instance")
yield streaming_service.format_done()
@@ -119,13 +149,14 @@ async def stream_new_chat(
# Get the PostgreSQL checkpointer for persistent conversation memory
checkpointer = await get_checkpointer()
# Create the deep agent with checkpointer with podcast capability
# Create the deep agent with checkpointer and configurable prompts
agent = create_surfsense_deep_agent(
llm=llm,
search_space_id=search_space_id,
db_session=session,
connector_service=connector_service,
checkpointer=checkpointer,
agent_config=agent_config, # Pass prompt configuration
)
# Build input with message history from frontend
@@ -223,7 +254,9 @@ async def stream_new_chat(
analyze_step_id = next_thinking_step_id()
last_active_step_id = analyze_step_id
last_active_step_title = "Understanding your request"
last_active_step_items = [f"Processing: {user_query[:80]}{'...' if len(user_query) > 80 else ''}"]
last_active_step_items = [
f"Processing: {user_query[:80]}{'...' if len(user_query) > 80 else ''}"
]
yield streaming_service.format_thinking_step(
step_id=analyze_step_id,
title="Understanding your request",
@@ -298,7 +331,9 @@ async def stream_new_chat(
else str(tool_input)
)
last_active_step_title = "Searching knowledge base"
last_active_step_items = [f"Query: {query[:100]}{'...' if len(query) > 100 else ''}"]
last_active_step_items = [
f"Query: {query[:100]}{'...' if len(query) > 100 else ''}"
]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Searching knowledge base",
@@ -312,7 +347,9 @@ async def stream_new_chat(
else str(tool_input)
)
last_active_step_title = "Fetching link preview"
last_active_step_items = [f"URL: {url[:80]}{'...' if len(url) > 80 else ''}"]
last_active_step_items = [
f"URL: {url[:80]}{'...' if len(url) > 80 else ''}"
]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Fetching link preview",
@@ -347,7 +384,9 @@ async def stream_new_chat(
else str(tool_input)
)
last_active_step_title = "Scraping webpage"
last_active_step_items = [f"URL: {url[:80]}{'...' if len(url) > 80 else ''}"]
last_active_step_items = [
f"URL: {url[:80]}{'...' if len(url) > 80 else ''}"
]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Scraping webpage",
@@ -484,7 +523,9 @@ async def stream_new_chat(
tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown"
# Get the original tool step ID to update it (not create a new one)
original_step_id = tool_step_ids.get(run_id, f"thinking-unknown-{run_id[:8]}")
original_step_id = tool_step_ids.get(
run_id, f"thinking-unknown-{run_id[:8]}"
)
# Mark the tool thinking step as completed using the SAME step ID
# Also add to completed set so we don't try to complete it again
@@ -495,7 +536,9 @@ async def stream_new_chat(
if isinstance(tool_output, dict):
result_len = tool_output.get("result_length", 0)
if result_len > 0:
result_info = f"Found relevant information ({result_len} chars)"
result_info = (
f"Found relevant information ({result_len} chars)"
)
# Include original query in completed items
completed_items = [*last_active_step_items, result_info]
yield streaming_service.format_thinking_step(
@@ -584,7 +627,7 @@ async def stream_new_chat(
if isinstance(tool_output, dict)
else "Podcast"
)
if podcast_status == "processing":
completed_items = [
f"Title: {podcast_title}",
@@ -609,7 +652,7 @@ async def stream_new_chat(
]
else:
completed_items = last_active_step_items
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Generating podcast",
@@ -695,7 +738,9 @@ async def stream_new_chat(
)
# Send terminal message
if isinstance(tool_output, dict):
title = tool_output.get("title") or tool_output.get("alt", "Image")
title = tool_output.get("title") or tool_output.get(
"alt", "Image"
)
yield streaming_service.format_terminal_info(
f"Image displayed: {title[:40]}{'...' if len(title) > 40 else ''}",
"success",