2025-04-20 19:19:35 -07:00
import asyncio
import json
2025-04-19 23:25:06 -07:00
from typing import Any , Dict , List
2025-04-20 19:19:35 -07:00
2025-04-19 23:25:06 -07:00
from app . config import config as app_config
2025-04-20 19:19:35 -07:00
from app . db import async_session_maker
from app . utils . connector_service import ConnectorService
2025-04-19 23:25:06 -07:00
from langchain_core . messages import HumanMessage , SystemMessage
2025-04-20 19:19:35 -07:00
from langchain_core . runnables import RunnableConfig
2025-04-19 23:25:06 -07:00
from pydantic import BaseModel , Field
from sqlalchemy . ext . asyncio import AsyncSession
2025-04-20 19:19:35 -07:00
from . configuration import Configuration
from . prompts import get_answer_outline_system_prompt
from . state import State
from . sub_section_writer . graph import graph as sub_section_writer_graph
from langgraph . types import StreamWriter
2025-04-19 23:25:06 -07:00
class Section(BaseModel):
    """A section in the answer outline.

    One planned section of the final report: a stable index, a
    human-readable title, and the research questions that must be
    answered before the section can be drafted.
    """
    # Zero-based position of this section within the outline.
    section_id: int = Field(..., description="The zero-based index of the section")
    # Heading used for this section in the final report.
    section_title: str = Field(..., description="The title of the section")
    # Questions researched across the configured connectors for this section.
    questions: List[str] = Field(..., description="Questions to research for this section")
class AnswerOutline(BaseModel):
    """The complete answer outline with all sections.

    Top-level structure the outline LLM is instructed to return as JSON;
    parsed and validated in write_answer_outline.
    """
    # Ordered list of sections that make up the planned answer.
    answer_outline: List[Section] = Field(..., description="List of sections in the answer outline")
2025-04-20 19:19:35 -07:00
async def write_answer_outline(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]:
    """
    Create a structured answer outline based on the user query.

    This node takes the user query and number of sections from the configuration and uses
    an LLM to generate a comprehensive outline with logical sections and research questions
    for each section.

    Returns:
        Dict containing the answer outline in the "answer_outline" key for state update.
    """
    streaming_service = state.streaming_service

    def emit_terminal(message: str, message_type: str = None) -> None:
        # Push a terminal status line to the UI and flush the annotation stream.
        # NOTE(review): "yeild_value" (sic) is the key the downstream stream
        # consumer expects — do not "fix" the spelling here alone.
        if message_type is None:
            streaming_service.only_update_terminal(message)
        else:
            streaming_service.only_update_terminal(message, message_type)
        writer({"yeild_value": streaming_service._format_annotations()})

    emit_terminal("Generating answer outline...")

    # Pull the query and requested section count from the runnable config.
    configuration = Configuration.from_runnable_config(config)
    user_query = configuration.user_query
    num_sections = configuration.num_sections

    emit_terminal(f"Planning research approach for query: {user_query[:100]}...")

    # Strategic (planning) LLM configured at the application level.
    llm = app_config.strategic_llm_instance

    # Prompt body: restate the query and pin the exact JSON shape we parse below.
    human_message_content = f"""
Now Please create an answer outline for the following query:

User Query: {user_query}
Number of Sections: {num_sections}

Remember to format your response as valid JSON exactly matching this structure:
{{
  "answer_outline": [
    {{
      "section_id": 0,
      "section_title": "Section Title",
      "questions": [
        "Question 1 to research for this section",
        "Question 2 to research for this section"
      ]
    }}
  ]
}}

Your output MUST be valid JSON in exactly this format. Do not include any other text or explanation.
"""

    emit_terminal("Designing structured outline with AI...")

    messages = [
        SystemMessage(content=get_answer_outline_system_prompt()),
        HumanMessage(content=human_message_content),
    ]

    emit_terminal("Processing answer structure...")

    # Plain completion call; the JSON is extracted and validated manually below.
    response = await llm.ainvoke(messages)

    try:
        content = response.content
        # The model may wrap the JSON in extra prose, so slice from the first
        # '{' to the last '}' to isolate the object.
        json_start = content.find('{')
        json_end = content.rfind('}') + 1

        if json_start < 0 or json_end <= json_start:
            # No JSON object could be located — surface a clear error.
            error_message = f"Could not find valid JSON in LLM response. Raw response: {content}"
            emit_terminal(error_message, "error")
            raise ValueError(error_message)

        parsed_data = json.loads(content[json_start:json_end])
        # Validate against the schema (pydantic ValidationError is a ValueError).
        answer_outline = AnswerOutline(**parsed_data)

        total_questions = sum(len(section.questions) for section in answer_outline.answer_outline)
        emit_terminal(f"Successfully generated outline with {len(answer_outline.answer_outline)} sections and {total_questions} research questions")

        print(f"Successfully generated answer outline with {len(answer_outline.answer_outline)} sections")
        # Return state update
        return {"answer_outline": answer_outline}
    except (json.JSONDecodeError, ValueError) as e:
        # Log the failure, stream it to the UI, then re-raise for the graph runner.
        error_message = f"Error parsing LLM response: {str(e)}"
        emit_terminal(error_message, "error")
        print(f"Error parsing LLM response: {str(e)}")
        print(f"Raw response: {response.content}")
        raise
# Maps each supported connector name to:
#   (ConnectorService method name, whether the method takes search_space_id,
#    label used in the "Found N ... relevant to the query" progress message).
# TAVILY_API is a web search and is not scoped to a search space.
_CONNECTOR_SEARCH_SPECS = {
    "YOUTUBE_VIDEO": ("search_youtube", True, "YouTube chunks"),
    "EXTENSION": ("search_extension", True, "extension chunks"),
    "CRAWLED_URL": ("search_crawled_urls", True, "crawled URL chunks"),
    "FILE": ("search_files", True, "file chunks"),
    "TAVILY_API": ("search_tavily", False, "web search results"),
    "SLACK_CONNECTOR": ("search_slack", True, "Slack messages"),
    "NOTION_CONNECTOR": ("search_notion", True, "Notion pages/blocks"),
    "GITHUB_CONNECTOR": ("search_github", True, "GitHub files/issues"),
    "LINEAR_CONNECTOR": ("search_linear", True, "Linear issues"),
}


async def fetch_relevant_documents(
    research_questions: List[str],
    user_id: str,
    search_space_id: int,
    db_session: AsyncSession,
    connectors_to_search: List[str],
    writer: StreamWriter = None,
    state: State = None,
    top_k: int = 20
) -> List[Dict[str, Any]]:
    """
    Fetch relevant documents for research questions using the provided connectors.

    Args:
        research_questions: List of research questions to find documents for
        user_id: The user ID
        search_space_id: The search space ID
        db_session: The database session
        connectors_to_search: List of connectors to search
        writer: StreamWriter for sending progress updates
        state: The current state containing the streaming service
        top_k: Number of top results to retrieve per connector per question

    Returns:
        List of relevant documents, deduplicated by chunk_id/content
    """
    # Initialize services
    connector_service = ConnectorService(db_session)

    # Only use streaming if both writer and state are provided
    streaming_service = state.streaming_service if state is not None else None

    def emit_terminal(message: str, message_type: str = None) -> None:
        # Stream a terminal status line when streaming is available (no-op otherwise).
        # NOTE(review): "yeild_value" (sic) is the key the stream consumer expects.
        if streaming_service and writer:
            if message_type is None:
                streaming_service.only_update_terminal(message)
            else:
                streaming_service.only_update_terminal(message, message_type)
            writer({"yeild_value": streaming_service._format_annotations()})

    emit_terminal(f"Starting research on {len(research_questions)} questions using {len(connectors_to_search)} connectors...")

    all_raw_documents = []  # Store all raw documents
    all_sources = []        # Store all source objects

    for i, user_query in enumerate(research_questions):
        emit_terminal(f"Researching question {i+1}/{len(research_questions)}: {user_query[:100]}...")

        # Use original research question as the query
        reformulated_query = user_query

        # Process each selected connector
        for connector in connectors_to_search:
            emit_terminal(f"Searching {connector} for relevant information...")

            spec = _CONNECTOR_SEARCH_SPECS.get(connector)
            if spec is None:
                # Unknown connector names are silently skipped (same as the
                # original if/elif chain falling through every branch).
                continue
            method_name, needs_search_space, result_label = spec

            try:
                search_kwargs = {
                    "user_query": reformulated_query,
                    "user_id": user_id,
                    "top_k": top_k,
                }
                if needs_search_space:
                    search_kwargs["search_space_id"] = search_space_id

                # Every search_* method returns (source_object, chunks).
                source_object, chunks = await getattr(connector_service, method_name)(**search_kwargs)

                # Add to sources and raw documents
                if source_object:
                    all_sources.append(source_object)
                all_raw_documents.extend(chunks)

                # Stream found document count
                emit_terminal(f"Found {len(chunks)} {result_label} relevant to the query")
            except Exception as e:
                error_message = f"Error searching connector {connector}: {str(e)}"
                print(error_message)
                emit_terminal(error_message, "error")
                # Continue with other connectors on error
                continue

    # Deduplicate source objects by ID before streaming.
    # Use combination of source ID and type as a unique identifier so sources
    # from different connectors are never accidentally merged.
    deduplicated_sources = []
    seen_source_keys = set()
    for source_obj in all_sources:
        source_id = source_obj.get('id')
        source_type = source_obj.get('type')
        if source_id and source_type:
            source_key = f"{source_type}_{source_id}"
            if source_key not in seen_source_keys:
                seen_source_keys.add(source_key)
                deduplicated_sources.append(source_obj)
        else:
            # If there's no ID or type, just add it to be safe
            deduplicated_sources.append(source_obj)

    emit_terminal(f"Collected {len(deduplicated_sources)} unique sources across all connectors")

    # After all sources are collected and deduplicated, stream them
    if streaming_service and writer:
        streaming_service.only_update_sources(deduplicated_sources)
        writer({"yeild_value": streaming_service._format_annotations()})

    # Deduplicate raw documents based on chunk_id or content hash
    seen_chunk_ids = set()
    seen_content_hashes = set()
    deduplicated_docs = []
    for doc in all_raw_documents:
        chunk_id = doc.get("chunk_id")
        content_hash = hash(doc.get("content", ""))
        # Skip if we've seen this chunk_id or content before
        if (chunk_id and chunk_id in seen_chunk_ids) or content_hash in seen_content_hashes:
            continue
        # Track and keep this document
        if chunk_id:
            seen_chunk_ids.add(chunk_id)
        seen_content_hashes.add(content_hash)
        deduplicated_docs.append(doc)

    emit_terminal(f"Found {len(deduplicated_docs)} unique document chunks after deduplication")

    return deduplicated_docs
2025-04-20 19:19:35 -07:00
async def process_sections(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]:
    """
    Process all sections in parallel and combine the results.

    This node takes the answer outline from the previous step, fetches relevant documents
    for all questions across all sections once, and then processes each section in parallel
    using the sub_section_writer graph with the shared document pool.

    Returns:
        Dict containing the final written report in the "final_written_report" key.
    """
    # Get configuration and answer outline from state
    configuration = Configuration.from_runnable_config(config)
    answer_outline = state.answer_outline
    streaming_service = state.streaming_service

    # NOTE(review): "yeild_value" (sic) is the key the stream consumer expects.
    streaming_service.only_update_terminal("Starting to process research sections...")
    writer({"yeild_value": streaming_service._format_annotations()})

    print(f"Processing sections from outline: {answer_outline is not None}")
    if not answer_outline:
        streaming_service.only_update_terminal("Error: No answer outline was provided. Cannot generate report.", "error")
        writer({"yeild_value": streaming_service._format_annotations()})
        return {
            "final_written_report": "No answer outline was provided. Cannot generate final report."
        }

    # Collect all questions from all sections so documents are fetched once.
    all_questions = []
    for section in answer_outline.answer_outline:
        all_questions.extend(section.questions)
    print(f"Collected {len(all_questions)} questions from all sections")

    streaming_service.only_update_terminal(f"Found {len(all_questions)} research questions across {len(answer_outline.answer_outline)} sections")
    writer({"yeild_value": streaming_service._format_annotations()})

    # Fetch relevant documents once for all questions
    streaming_service.only_update_terminal("Searching for relevant information across all connectors...")
    writer({"yeild_value": streaming_service._format_annotations()})

    relevant_documents = []
    async with async_session_maker() as db_session:
        try:
            relevant_documents = await fetch_relevant_documents(
                research_questions=all_questions,
                user_id=configuration.user_id,
                search_space_id=configuration.search_space_id,
                db_session=db_session,
                connectors_to_search=configuration.connectors_to_search,
                writer=writer,
                state=state
            )
        except Exception as e:
            error_message = f"Error fetching relevant documents: {str(e)}"
            print(error_message)
            streaming_service.only_update_terminal(error_message, "error")
            writer({"yeild_value": streaming_service._format_annotations()})
            # Continue with an empty document list so the run still completes,
            # though the report may lack information.
            relevant_documents = []

    print(f"Fetched {len(relevant_documents)} relevant documents for all sections")

    streaming_service.only_update_terminal(f"Starting to draft {len(answer_outline.answer_outline)} sections using {len(relevant_documents)} relevant document chunks")
    writer({"yeild_value": streaming_service._format_annotations()})

    # Create tasks to process each section in parallel with the same document set
    section_tasks = []
    streaming_service.only_update_terminal("Creating processing tasks for each section...")
    writer({"yeild_value": streaming_service._format_annotations()})

    for section in answer_outline.answer_outline:
        section_tasks.append(
            process_section_with_documents(
                section_title=section.section_title,
                section_questions=section.questions,
                user_query=configuration.user_query,
                user_id=configuration.user_id,
                search_space_id=configuration.search_space_id,
                relevant_documents=relevant_documents,
                state=state,
                writer=writer
            )
        )

    # Run all section processing tasks in parallel
    print(f"Running {len(section_tasks)} section processing tasks in parallel")
    streaming_service.only_update_terminal(f"Processing {len(section_tasks)} sections simultaneously...")
    writer({"yeild_value": streaming_service._format_annotations()})

    # return_exceptions=True keeps one failed section from cancelling the rest.
    section_results = await asyncio.gather(*section_tasks, return_exceptions=True)

    # Handle any exceptions in the results
    streaming_service.only_update_terminal("Combining section results into final report...")
    writer({"yeild_value": streaming_service._format_annotations()})

    processed_results = []
    for i, result in enumerate(section_results):
        if isinstance(result, Exception):
            section_title = answer_outline.answer_outline[i].section_title
            error_message = f"Error processing section '{section_title}': {str(result)}"
            print(error_message)
            streaming_service.only_update_terminal(error_message, "error")
            writer({"yeild_value": streaming_service._format_annotations()})
            processed_results.append(error_message)
        else:
            processed_results.append(result)

    # Combine the results into a final report. Each section's content already
    # contains its title, so no extra headers are added — just a separator.
    final_report = []
    for content in processed_results:
        final_report.append(content)
        final_report.append("\n")

    # Join all sections with newlines
    final_written_report = "\n".join(final_report)
    print(f"Generated final report with {len(final_report)} parts")

    streaming_service.only_update_terminal("Final research report generated successfully!")
    writer({"yeild_value": streaming_service._format_annotations()})

    if hasattr(state, 'streaming_service') and state.streaming_service:
        # Convert the final report to the expected format for UI:
        # a list of strings where empty strings represent line breaks.
        formatted_report = []
        for section in final_report:
            if section == "\n":
                # Add an empty string for line breaks
                formatted_report.append("")
            else:
                # Split any multiline content by newlines and add each line
                formatted_report.extend(section.split("\n"))
        state.streaming_service.only_update_answer(formatted_report)
        writer({"yeild_value": state.streaming_service._format_annotations()})

    return {
        "final_written_report": final_written_report
    }
async def process_section_with_documents(
    section_title: str,
    section_questions: List[str],
    user_id: str,
    search_space_id: int,
    relevant_documents: List[Dict[str, Any]],
    user_query: str,
    state: State = None,
    writer: StreamWriter = None
) -> str:
    """
    Process a single section using pre-fetched documents.

    Args:
        section_title: The title of the section
        section_questions: List of research questions for this section
        user_id: The user ID
        search_space_id: The search space ID
        relevant_documents: Pre-fetched documents to use for this section
        user_query: The original user query
        state: The current state
        writer: StreamWriter for sending progress updates

    Returns:
        The written section content
    """
    def emit(message: str, message_type: str = None) -> None:
        # Stream a terminal update when streaming is available (no-op otherwise).
        # NOTE(review): "yeild_value" (sic) is the key the stream consumer expects.
        if state and state.streaming_service and writer:
            if message_type is None:
                state.streaming_service.only_update_terminal(message)
            else:
                state.streaming_service.only_update_terminal(message, message_type)
            writer({"yeild_value": state.streaming_service._format_annotations()})

    try:
        documents_to_use = relevant_documents

        emit(f"Writing section: {section_title} with {len(section_questions)} research questions")

        # Fallback: synthesize placeholder documents when nothing was found,
        # so the sub-section writer still has something to work with.
        if not documents_to_use:
            print(f"No relevant documents found for section: {section_title}")
            emit(f"Warning: No relevant documents found for section: {section_title}", "warning")
            documents_to_use = [
                {"content": f"No specific information was found for: {question}"}
                for question in section_questions
            ]

        # Each section gets its own database session for the sub-graph run.
        async with async_session_maker() as db_session:
            # Configuration handed to the sub_section_writer graph.
            writer_config = {
                "configurable": {
                    "sub_section_title": section_title,
                    "sub_section_questions": section_questions,
                    "user_query": user_query,
                    "relevant_documents": documents_to_use,
                    "user_id": user_id,
                    "search_space_id": search_space_id
                }
            }
            writer_state = {"db_session": db_session}

            print(f"Invoking sub_section_writer for: {section_title}")
            emit(f"Analyzing information and drafting content for section: {section_title}")

            result = await sub_section_writer_graph.ainvoke(writer_state, writer_config)

            # Extract the drafted content produced by the sub-graph.
            final_answer = result.get("final_answer", "No content was generated for this section.")
            emit(f"Completed writing section: {section_title}")
            return final_answer
    except Exception as e:
        print(f"Error processing section '{section_title}': {str(e)}")
        emit(f"Error processing section '{section_title}': {str(e)}", "error")
        return f"Error processing section: {section_title}. Details: {str(e)}"