feat: enhance chat functionality with chain-of-thought display and thinking steps management

This commit is contained in:
Anish Sarkar 2025-12-22 22:54:22 +05:30
parent 2f622891ae
commit 8a99752f2f
8 changed files with 857 additions and 48 deletions

View file

@ -154,6 +154,49 @@ async def stream_new_chat(
# Reset text tracking for this stream
accumulated_text = ""
# Track thinking steps for chain-of-thought display
thinking_step_counter = 0
# Map run_id -> step_id for tool calls so we can update them on completion
tool_step_ids: dict[str, str] = {}
# Track the last active step so we can mark it complete at the end
last_active_step_id: str | None = None
last_active_step_title: str = ""
last_active_step_items: list[str] = []
# Track which steps have been completed to avoid duplicate completions
completed_step_ids: set[str] = set()
# Track if we just finished a tool (text flows silently after tools)
just_finished_tool: bool = False
def next_thinking_step_id() -> str:
nonlocal thinking_step_counter
thinking_step_counter += 1
return f"thinking-{thinking_step_counter}"
def complete_current_step() -> str | None:
"""Complete the current active step and return the completion event, if any."""
nonlocal last_active_step_id, last_active_step_title, last_active_step_items
if last_active_step_id and last_active_step_id not in completed_step_ids:
completed_step_ids.add(last_active_step_id)
return streaming_service.format_thinking_step(
step_id=last_active_step_id,
title=last_active_step_title,
status="completed",
items=last_active_step_items if last_active_step_items else None,
)
return None
# Initial thinking step - analyzing the request
analyze_step_id = next_thinking_step_id()
last_active_step_id = analyze_step_id
last_active_step_title = "Understanding your request"
last_active_step_items = [f"Processing: {user_query[:80]}{'...' if len(user_query) > 80 else ''}"]
yield streaming_service.format_thinking_step(
step_id=analyze_step_id,
title="Understanding your request",
status="in_progress",
items=last_active_step_items,
)
# Stream the agent response with thread config for memory
async for event in agent.astream_events(
input_state, config=config, version="v2"
@ -168,6 +211,31 @@ async def stream_new_chat(
if content and isinstance(content, str):
# Start a new text block if needed
if current_text_id is None:
# Complete any previous step
completion_event = complete_current_step()
if completion_event:
yield completion_event
if just_finished_tool:
# We just finished a tool - don't create a step here,
# text will flow silently after tools.
# Clear the active step tracking.
last_active_step_id = None
last_active_step_title = ""
last_active_step_items = []
just_finished_tool = False
else:
# Normal text generation (not after a tool)
gen_step_id = next_thinking_step_id()
last_active_step_id = gen_step_id
last_active_step_title = "Generating response"
last_active_step_items = []
yield streaming_service.format_thinking_step(
step_id=gen_step_id,
title="Generating response",
status="in_progress",
)
current_text_id = streaming_service.generate_text_id()
yield streaming_service.format_text_start(current_text_id)
@ -188,6 +256,67 @@ async def stream_new_chat(
yield streaming_service.format_text_end(current_text_id)
current_text_id = None
# Complete any previous step EXCEPT "Synthesizing response"
# (we want to reuse the Synthesizing step after tools complete)
if last_active_step_title != "Synthesizing response":
completion_event = complete_current_step()
if completion_event:
yield completion_event
# Reset the just_finished_tool flag since we're starting a new tool
just_finished_tool = False
# Create thinking step for the tool call and store it for later update
tool_step_id = next_thinking_step_id()
tool_step_ids[run_id] = tool_step_id
last_active_step_id = tool_step_id
if tool_name == "search_knowledge_base":
query = (
tool_input.get("query", "")
if isinstance(tool_input, dict)
else str(tool_input)
)
last_active_step_title = "Searching knowledge base"
last_active_step_items = [f"Query: {query[:100]}{'...' if len(query) > 100 else ''}"]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Searching knowledge base",
status="in_progress",
items=last_active_step_items,
)
elif tool_name == "generate_podcast":
podcast_title = (
tool_input.get("podcast_title", "SurfSense Podcast")
if isinstance(tool_input, dict)
else "SurfSense Podcast"
)
# Get content length for context
content_len = len(
tool_input.get("source_content", "")
if isinstance(tool_input, dict)
else ""
)
last_active_step_title = "Generating podcast"
last_active_step_items = [
f"Title: {podcast_title}",
f"Content: {content_len:,} characters",
"Preparing audio generation...",
]
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title="Generating podcast",
status="in_progress",
items=last_active_step_items,
)
else:
last_active_step_title = f"Using {tool_name.replace('_', ' ')}"
last_active_step_items = []
yield streaming_service.format_thinking_step(
step_id=tool_step_id,
title=last_active_step_title,
status="in_progress",
)
# Stream tool info
tool_call_id = (
f"call_{run_id[:32]}"
@ -254,6 +383,87 @@ async def stream_new_chat(
tool_call_id = f"call_{run_id[:32]}" if run_id else "call_unknown"
# Get the original tool step ID to update it (not create a new one)
original_step_id = tool_step_ids.get(run_id, f"thinking-unknown-{run_id[:8]}")
# Mark the tool thinking step as completed using the SAME step ID
# Also add to completed set so we don't try to complete it again
completed_step_ids.add(original_step_id)
if tool_name == "search_knowledge_base":
# Get result count if available
result_info = "Search completed"
if isinstance(tool_output, dict):
result_len = tool_output.get("result_length", 0)
if result_len > 0:
result_info = f"Found relevant information ({result_len} chars)"
# Include original query in completed items
completed_items = [*last_active_step_items, result_info]
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Searching knowledge base",
status="completed",
items=completed_items,
)
elif tool_name == "generate_podcast":
# Build detailed completion items based on podcast status
podcast_status = (
tool_output.get("status", "unknown")
if isinstance(tool_output, dict)
else "unknown"
)
podcast_title = (
tool_output.get("title", "Podcast")
if isinstance(tool_output, dict)
else "Podcast"
)
if podcast_status == "processing":
completed_items = [
f"Title: {podcast_title}",
"Audio generation started",
"Processing in background...",
]
elif podcast_status == "already_generating":
completed_items = [
f"Title: {podcast_title}",
"Podcast already in progress",
"Please wait for it to complete",
]
elif podcast_status == "error":
error_msg = (
tool_output.get("error", "Unknown error")
if isinstance(tool_output, dict)
else "Unknown error"
)
completed_items = [
f"Title: {podcast_title}",
f"Error: {error_msg[:50]}",
]
else:
completed_items = last_active_step_items
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title="Generating podcast",
status="completed",
items=completed_items,
)
else:
yield streaming_service.format_thinking_step(
step_id=original_step_id,
title=f"Using {tool_name.replace('_', ' ')}",
status="completed",
items=last_active_step_items,
)
# Mark that we just finished a tool - "Synthesizing response" will be created
# when text actually starts flowing (not immediately)
just_finished_tool = True
# Clear the active step since the tool is done
last_active_step_id = None
last_active_step_title = ""
last_active_step_items = []
# Handle different tool outputs
if tool_name == "generate_podcast":
# Stream the full podcast result so frontend can render the audio player
@ -302,6 +512,11 @@ async def stream_new_chat(
if current_text_id is not None:
yield streaming_service.format_text_end(current_text_id)
# Mark the last active thinking step as completed using the same title
completion_event = complete_current_step()
if completion_event:
yield completion_event
# Finish the step and message
yield streaming_service.format_finish_step()
yield streaming_service.format_finish()