2025-11-19 15:04:46 -08:00
import datetime
2025-07-24 14:43:48 -07:00
from typing import Any
2025-06-03 00:10:35 -07:00
from langchain_core . messages import HumanMessage , SystemMessage
2025-07-24 14:43:48 -07:00
from langchain_core . runnables import RunnableConfig
2025-12-05 00:14:36 -08:00
from langgraph . types import StreamWriter
2025-11-19 15:04:46 -08:00
from sqlalchemy import select
2025-07-24 14:43:48 -07:00
2025-11-19 15:04:46 -08:00
from app . db import SearchSpace
2025-07-24 14:43:48 -07:00
from app . services . reranker_service import RerankerService
2025-06-05 20:33:09 -07:00
from . . utils import (
calculate_token_count ,
2025-07-25 15:11:19 -07:00
format_documents_section ,
2025-08-27 18:43:33 -07:00
langchain_chat_history_to_str ,
2025-07-24 14:43:48 -07:00
optimize_documents_for_token_limit ,
2025-07-25 15:11:19 -07:00
)
2025-07-24 14:43:48 -07:00
from . configuration import Configuration
2025-11-19 15:04:46 -08:00
from . default_prompts import (
DEFAULT_QNA_BASE_PROMPT ,
DEFAULT_QNA_CITATION_INSTRUCTIONS ,
DEFAULT_QNA_NO_DOCUMENTS_PROMPT ,
)
2025-07-24 14:43:48 -07:00
from . state import State
2025-07-25 15:11:19 -07:00
2025-06-03 00:10:35 -07:00
2025-11-19 15:04:46 -08:00
def _build_language_instruction ( language : str | None = None ) :
""" Build language instruction for prompts. """
if language :
return f " \n \n IMPORTANT: Please respond in { language } language. All your responses, explanations, and analysis should be written in { language } . "
return " "
def _build_chat_history_section ( chat_history : str | None = None ) :
""" Build chat history section for prompts. """
if chat_history :
return f """
< chat_history >
{ chat_history if chat_history else " NO CHAT HISTORY PROVIDED " }
< / chat_history >
"""
return """
< chat_history >
NO CHAT HISTORY PROVIDED
< / chat_history >
"""
def _format_system_prompt(
    prompt_template: str,
    chat_history: str | None = None,
    language: str | None = None,
) -> str:
    """Fill a system prompt template's dynamic placeholders.

    Substitutes ``{date}`` (today, YYYY-MM-DD), ``{language_instruction}``,
    and ``{chat_history_section}`` into *prompt_template*.
    """
    # Build each dynamic value inline; str.format ignores none of the
    # placeholders the template actually declares.
    return prompt_template.format(
        date=datetime.datetime.now().strftime("%Y-%m-%d"),
        language_instruction=_build_language_instruction(language),
        chat_history_section=_build_chat_history_section(chat_history),
    )
2025-07-24 14:43:48 -07:00
async def rerank_documents(state: State, config: RunnableConfig) -> dict[str, Any]:
    """
    Rerank the documents based on relevance to the user's question.

    This node takes the relevant documents provided in the configuration,
    reranks them using the reranker service based on the user's query,
    and updates the state with the reranked documents.

    Documents are document-grouped with a `chunks` list. Reranking is done
    using the concatenated `content` field, and the full structure (including
    `chunks`) is preserved for proper citation formatting.

    If reranking is disabled, returns the original documents sorted by their
    existing score.

    Returns:
        Dict containing the reranked documents under "reranked_documents".
    """
    # Get configuration and relevant documents
    configuration = Configuration.from_runnable_config(config)
    documents = configuration.relevant_documents
    user_query = configuration.user_query
    reformulated_query = configuration.reformulated_query

    # If no documents were provided, return empty list
    # (fix: `not documents` already covers the empty case; the extra
    # `len(documents) == 0` test was redundant)
    if not documents:
        return {"reranked_documents": []}

    # Get reranker service from app config
    reranker_service = RerankerService.get_reranker_instance()

    # If reranking is not enabled, sort by existing score and return
    if not reranker_service:
        print("Reranking is disabled. Sorting documents by existing score.")
        sorted_documents = sorted(
            documents, key=lambda x: x.get("score", 0), reverse=True
        )
        return {"reranked_documents": sorted_documents}

    # Perform reranking
    try:
        # Fix: previously `user_query + "\n" + reformulated_query` raised a
        # TypeError when reformulated_query was None, which the broad except
        # below silently turned into "no reranking". Fall back to the plain
        # user query instead so reranking still runs.
        rerank_query = (
            user_query + "\n" + reformulated_query
            if reformulated_query
            else user_query
        )
        # Pass documents directly to reranker - it will use:
        # - "content" (concatenated chunk text) for scoring
        # - "chunk_id" (primary chunk id) for matching
        # The full document structure including "chunks" is preserved
        reranked_docs = reranker_service.rerank_documents(rerank_query, documents)
        # Sort by score in descending order
        reranked_docs.sort(key=lambda x: x.get("score", 0), reverse=True)
        print(f"Reranked {len(reranked_docs)} documents for Q&A query: {user_query}")
        return {"reranked_documents": reranked_docs}
    except Exception as e:
        print(f"Error during reranking: {e!s}")
        # Fall back to original documents if reranking fails
        return {"reranked_documents": documents}
2025-07-25 15:11:19 -07:00
2025-06-03 00:10:35 -07:00
2025-12-05 00:14:36 -08:00
async def answer_question(
    state: State, config: RunnableConfig, writer: StreamWriter
) -> dict[str, Any]:
    """
    Answer the user's question using the provided documents with real-time streaming.

    This node takes the relevant documents provided in the configuration and uses
    an LLM to generate a comprehensive answer to the user's question with
    proper citations. The citations follow [citation:chunk_id] format using chunk IDs from the
    `<chunk id='...'>` tags in the provided documents. If no documents are provided, it will use chat history to generate
    an answer.

    The response is streamed token-by-token for real-time updates to the frontend.

    Returns:
        Dict containing the final answer in the "final_answer" key.
    """
    # Local import — presumably deferred to avoid a circular import; TODO confirm.
    from app.services.llm_service import get_fast_llm

    # Get configuration and relevant documents from configuration
    configuration = Configuration.from_runnable_config(config)
    documents = state.reranked_documents  # produced upstream by rerank_documents
    user_query = configuration.user_query
    search_space_id = configuration.search_space_id
    language = configuration.language

    # Get streaming service from state (may be falsy; streaming is then skipped)
    streaming_service = state.streaming_service

    # Fetch search space to get QnA configuration
    result = await state.db_session.execute(
        select(SearchSpace).where(SearchSpace.id == search_space_id)
    )
    search_space = result.scalar_one_or_none()
    if not search_space:
        # A missing search space is unrecoverable for this node — fail loudly.
        error_message = f"Search space {search_space_id} not found"
        print(error_message)
        raise RuntimeError(error_message)

    # Get QnA configuration from search space
    citations_enabled = search_space.citations_enabled
    custom_instructions_text = search_space.qna_custom_instructions or ""

    # Use constants for base prompt and citation instructions; citation
    # instructions are dropped entirely when citations are disabled.
    qna_base_prompt = DEFAULT_QNA_BASE_PROMPT
    qna_citation_instructions = (
        DEFAULT_QNA_CITATION_INSTRUCTIONS if citations_enabled else ""
    )
    # User-supplied custom instructions are wrapped in a dedicated tag so the
    # model treats them as high-priority directives.
    qna_custom_instructions = (
        f"\n<special_important_custom_instructions>\n{custom_instructions_text}\n</special_important_custom_instructions>"
        if custom_instructions_text
        else ""
    )

    # Get search space's fast LLM
    llm = await get_fast_llm(state.db_session, search_space_id)
    if not llm:
        error_message = f"No fast LLM configured for search space {search_space_id}"
        print(error_message)
        raise RuntimeError(error_message)

    # Determine if we have documents and optimize for token limits
    has_documents_initially = documents and len(documents) > 0
    chat_history_str = langchain_chat_history_to_str(state.chat_history)

    if has_documents_initially:
        # Compose the full citation prompt: base + citation instructions + custom instructions
        full_citation_prompt_template = (
            qna_base_prompt + qna_citation_instructions + qna_custom_instructions
        )

        # Create base message template for token calculation (without documents)
        base_human_message_template = f"""
User's question:
<user_query>
{user_query}
</user_query>

Please provide a detailed, comprehensive answer to the user's question using the information from their personal knowledge sources. Make sure to cite all information appropriately and engage in a conversational manner.
"""

        # Use initial system prompt for token calculation
        initial_system_prompt = _format_system_prompt(
            full_citation_prompt_template, chat_history_str, language
        )
        base_messages = [
            SystemMessage(content=initial_system_prompt),
            HumanMessage(content=base_human_message_template),
        ]

        # Optimize documents to fit within token limits; the second return
        # value tells us whether any documents survived the trimming.
        optimized_documents, has_optimized_documents = (
            optimize_documents_for_token_limit(documents, base_messages, llm.model)
        )

        # Update state based on optimization result
        documents = optimized_documents
        has_documents = has_optimized_documents
    else:
        has_documents = False

    # Choose system prompt based on final document availability
    # With documents: use base + citation instructions + custom instructions
    # Without documents: use the default no-documents prompt from constants
    if has_documents:
        full_citation_prompt_template = (
            qna_base_prompt + qna_citation_instructions + qna_custom_instructions
        )
        system_prompt = _format_system_prompt(
            full_citation_prompt_template, chat_history_str, language
        )
    else:
        system_prompt = _format_system_prompt(
            DEFAULT_QNA_NO_DOCUMENTS_PROMPT + qna_custom_instructions,
            chat_history_str,
            language,
        )

    # Generate documents section (empty string when answering from history only)
    documents_text = (
        format_documents_section(
            documents, "Source material from your personal knowledge base"
        )
        if has_documents
        else ""
    )

    # Create final human message content — instruction varies with document availability
    instruction_text = (
        "Please provide a detailed, comprehensive answer to the user's question using the information from their personal knowledge sources. Make sure to cite all information appropriately and engage in a conversational manner."
        if has_documents
        else "Please provide a helpful answer to the user's question based on our conversation history and your general knowledge. Engage in a conversational manner."
    )

    human_message_content = f"""
{documents_text}

User's question:
<user_query>
{user_query}
</user_query>

{instruction_text}
"""

    # Create final messages for the LLM
    messages_with_chat_history = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=human_message_content),
    ]

    # Log final token count (diagnostic only; no further trimming happens here)
    total_tokens = calculate_token_count(messages_with_chat_history, llm.model)
    print(f"Final token count: {total_tokens}")

    # Stream the LLM response token by token
    final_answer = ""
    async for chunk in llm.astream(messages_with_chat_history):
        # Extract the content from the chunk; skip empty chunks
        if hasattr(chunk, "content") and chunk.content:
            token = chunk.content
            final_answer += token
            # Stream the token to the frontend via custom stream
            if streaming_service:
                writer({"yield_value": streaming_service.format_text_chunk(token)})

    return {"final_answer": final_answer}