fix: implement real-time streaming for responses

- Added streaming-service support to the Q&A agent for real-time token streaming.
- Updated the `answer_question` node to stream responses token by token to the frontend (the pattern is sketched below).
- Modified `handle_qna_workflow` to handle both "custom" and "values" streaming modes.
- Enhanced the graph state to carry the streaming service for a more responsive user experience.
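
For context, the mechanism used here is LangGraph's `StreamWriter` injection: a node that declares a `writer` parameter receives a callable whose payloads surface on the graph's "custom" stream channel. A minimal, self-contained sketch of that pattern, with a toy graph and a hard-coded token list standing in for the real LLM call:

```python
# Illustrative sketch only; DemoState and demo_node are not this repo's code.
from typing_extensions import TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import StreamWriter


class DemoState(TypedDict):
    final_answer: str


async def demo_node(state: DemoState, writer: StreamWriter) -> dict:
    answer = ""
    for token in ["Hello", ", ", "world"]:  # stand-in for llm.astream(...)
        answer += token
        # Everything passed to writer(...) is emitted on the "custom" stream mode
        writer({"yield_value": token})
    return {"final_answer": answer}


builder = StateGraph(DemoState)
builder.add_node("demo", demo_node)
builder.add_edge(START, "demo")
builder.add_edge("demo", END)
graph = builder.compile()
```

Anything handed to `writer(...)` is exactly what the workflow handler later sees on the "custom" channel; the "values" channel carries full state snapshots.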
DESKTOP-RTLN3BA\$punk 2025-12-05 00:14:36 -08:00
parent 264532b3cf
commit c97887a63d
5 changed files with 64 additions and 41 deletions

@@ -3,6 +3,7 @@ from typing import Any
 
 from langchain_core.messages import HumanMessage, SystemMessage
 from langchain_core.runnables import RunnableConfig
+from langgraph.types import StreamWriter
 from sqlalchemy import select
 
 from app.db import SearchSpace
@@ -129,9 +130,11 @@ async def rerank_documents(state: State, config: RunnableConfig) -> dict[str, Any]
     return {"reranked_documents": documents}
 
 
-async def answer_question(state: State, config: RunnableConfig) -> dict[str, Any]:
+async def answer_question(
+    state: State, config: RunnableConfig, writer: StreamWriter
+) -> dict[str, Any]:
     """
-    Answer the user's question using the provided documents.
+    Answer the user's question using the provided documents with real-time streaming.
 
     This node takes the relevant documents provided in the configuration and uses
     an LLM to generate a comprehensive answer to the user's question with
@@ -139,6 +142,8 @@ async def answer_question(state: State, config: RunnableConfig) -> dict[str, Any]
     documents. If no documents are provided, it will use chat history to generate
     an answer.
 
+    The response is streamed token-by-token for real-time updates to the frontend.
+
     Returns:
         Dict containing the final answer in the "final_answer" key.
     """
@@ -151,6 +156,9 @@ async def answer_question(state: State, config: RunnableConfig) -> dict[str, Any]
     search_space_id = configuration.search_space_id
     language = configuration.language
 
+    # Get streaming service from state
+    streaming_service = state.streaming_service
+
     # Fetch search space to get QnA configuration
     result = await state.db_session.execute(
         select(SearchSpace).where(SearchSpace.id == search_space_id)
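
For reviewers unfamiliar with the agent's state: the node now reads the service off graph state rather than out of `config`. A hedged sketch of the shapes this diff implies; the field and method names come from the usage in the diff (`state.db_session`, `state.streaming_service`, `format_text_chunk`), but the actual definitions live elsewhere in the repo and the payload envelope is an assumption:

```python
# Hypothetical shapes inferred from usage in this diff, not the real classes.
from dataclasses import dataclass
from typing import Any


class StreamingService:
    def format_text_chunk(self, token: str) -> Any:
        # Assumed envelope; the real frontend contract may differ
        return {"type": "text_chunk", "content": token}


@dataclass
class State:
    db_session: Any  # async SQLAlchemy session in the real code
    streaming_service: StreamingService | None = None
```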
@@ -279,8 +287,17 @@ async def answer_question(state: State, config: RunnableConfig) -> dict[str, Any]
     total_tokens = calculate_token_count(messages_with_chat_history, llm.model)
     print(f"Final token count: {total_tokens}")
 
-    # Call the LLM and get the response
-    response = await llm.ainvoke(messages_with_chat_history)
-    final_answer = response.content
+    # Stream the LLM response token by token
+    final_answer = ""
+    async for chunk in llm.astream(messages_with_chat_history):
+        # Extract the content from the chunk
+        if hasattr(chunk, "content") and chunk.content:
+            token = chunk.content
+            final_answer += token
+            # Stream the token to the frontend via custom stream
+            if streaming_service:
+                writer({"yield_value": streaming_service.format_text_chunk(token)})
 
     return {"final_answer": final_answer}