feat: migrated old chat to new chat

This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-12-21 19:33:52 -08:00
parent b5e20e7515
commit bb971460fc
25 changed files with 368 additions and 4391 deletions

View file

@ -1,30 +0,0 @@
"""Define the configurable parameters for the agent."""
from __future__ import annotations
from dataclasses import dataclass, fields
from langchain_core.runnables import RunnableConfig
@dataclass(kw_only=True)
class Configuration:
"""The configuration for the agent."""
# Input parameters provided at invocation
user_query: str
connectors_to_search: list[str]
user_id: str
search_space_id: int
document_ids_to_add_in_context: list[int]
language: str | None = None
top_k: int = 10
@classmethod
def from_runnable_config(
cls, config: RunnableConfig | None = None
) -> Configuration:
"""Create a Configuration instance from a RunnableConfig object."""
configurable = (config.get("configurable") or {}) if config else {}
_fields = {f.name for f in fields(cls) if f.init}
return cls(**{k: v for k, v in configurable.items() if k in _fields})

View file

@ -1,47 +0,0 @@
from langgraph.graph import StateGraph
from .configuration import Configuration
from .nodes import (
generate_further_questions,
handle_qna_workflow,
reformulate_user_query,
)
from .state import State
def build_graph():
    """
    Assemble and compile the LangGraph workflow for the Q&A flow.

    The graph is a straight line of three nodes:
    1. ``reformulate_user_query``
    2. ``handle_qna_workflow``
    3. ``generate_further_questions``

    Returns:
        The compiled graph, with its ``name`` set to "Surfsense Researcher".
    """
    # Nodes in execution order; the edges below are derived from this sequence.
    stages = (
        ("reformulate_user_query", reformulate_user_query),
        ("handle_qna_workflow", handle_qna_workflow),
        ("generate_further_questions", generate_further_questions),
    )

    builder = StateGraph(State, config_schema=Configuration)
    for stage_name, stage_fn in stages:
        builder.add_node(stage_name, stage_fn)

    # Chain __start__ -> each stage in order -> __end__.
    route = ["__start__", *(stage_name for stage_name, _ in stages), "__end__"]
    for source, target in zip(route, route[1:]):
        builder.add_edge(source, target)

    compiled = builder.compile()
    compiled.name = "Surfsense Researcher"  # Custom name shown in LangSmith
    return compiled
# Build the graph eagerly at import time; importers of this module all get
# this single compiled instance rather than rebuilding it.
graph = build_graph()

File diff suppressed because it is too large Load diff

View file

@ -1,140 +0,0 @@
import datetime
def _build_language_instruction(language: str | None = None):
"""Build language instruction for prompts."""
if language:
return f"\n\nIMPORTANT: Please respond in {language} language. All your responses, explanations, and analysis should be written in {language}."
return ""
def get_further_questions_system_prompt() -> str:
    """Return the system prompt used to generate follow-up questions.

    The prompt is rebuilt on every call so that its first line carries the
    current date (``datetime.datetime.now()``, formatted ``YYYY-MM-DD``).
    Doubled braces in the template are f-string escapes and render as single
    braces in the JSON examples.

    NOTE(review): the ``<input>`` section describes documents as using
    ``<document_metadata>`` / ``<document_content>`` with ``<chunk id='...'>``
    blocks, while the worked example uses ``<metadata>`` / ``<source_id>`` /
    ``<content>`` tags. Confirm which layout callers actually send.
    """
    return f"""
Today's date: {datetime.datetime.now().strftime("%Y-%m-%d")}
<further_questions_system>
You are an expert research assistant specializing in generating contextually relevant follow-up questions. Your task is to analyze the chat history and available documents to suggest further questions that would naturally extend the conversation and provide additional value to the user.
<input>
- chat_history: Provided in XML format within <chat_history> tags, containing <user> and <assistant> message pairs that show the chronological conversation flow. This provides context about what has already been discussed.
- available_documents: Provided in XML format within <documents> tags, containing individual <document> elements with <document_metadata> and <document_content> sections. Each document contains multiple `<chunk id='...'>...</chunk>` blocks inside <document_content>. This helps understand what information is accessible for answering potential follow-up questions.
</input>
<output_format>
A JSON object with the following structure:
{{
"further_questions": [
{{
"id": 0,
"question": "further qn 1"
}},
{{
"id": 1,
"question": "further qn 2"
}}
]
}}
</output_format>
<instructions>
1. **Analyze Chat History:** Review the entire conversation flow to understand:
* The main topics and themes discussed
* The user's interests and areas of focus
* Questions that have been asked and answered
* Any gaps or areas that could be explored further
* The depth level of the current discussion
2. **Evaluate Available Documents:** Consider the documents in context to identify:
* Additional information that hasn't been explored yet
* Related topics that could be of interest
* Specific details or data points that could warrant deeper investigation
* Cross-references or connections between different documents
3. **Generate Relevant Follow-up Questions:** Create 3-5 further questions that:
* Are directly related to the ongoing conversation but explore new angles
* Can be reasonably answered using the available documents or knowledge base
* Progress the conversation forward rather than repeating previous topics
* Match the user's apparent level of interest and expertise
* Are specific and actionable rather than overly broad
* Consider practical applications, comparisons, deeper analysis, or related concepts
4. **Ensure Question Quality:** Each question should:
* Be clear and well-formulated
* Provide genuine value to the user
* Be distinct from other suggested questions
* Be answerable within the current context
* Encourage meaningful exploration of the topic
5. **Prioritize and Order:** Arrange questions by relevance and natural progression:
* Most directly related to the current discussion first
* Questions that build upon previous answers
* Questions that explore practical applications or implications
* Questions that introduce related but new concepts
6. **Adhere Strictly to Output Format:** Ensure the final output is a valid JSON object with:
* Correct field names (`further_questions`, `id`, `question`)
* Sequential numbering starting from 0
* Proper data types and JSON formatting
</instructions>
<examples>
Input:
<chat_history>
<user>What are the best machine learning algorithms for text classification?</user>
<assistant>For text classification, several algorithms work well depending on your specific needs:
**Traditional Methods:**
- **Support Vector Machines (SVM)** - Excellent for high-dimensional text data
- **Naive Bayes** - Simple, fast, and works well with small datasets
- **Logistic Regression** - Good baseline with interpretable results
**Modern Deep Learning:**
- **Neural Networks** - More complex but can capture intricate patterns
- **Transformer models** - State-of-the-art for most text classification tasks
The choice depends on your dataset size, computational resources, and accuracy requirements.</assistant>
</chat_history>
<documents>
<document>
<metadata>
<source_id>101</source_id>
<source_type>FILE</source_type>
</metadata>
<content>
# Machine Learning for Text Classification: A Comprehensive Guide
## Performance Comparison
Recent studies show that transformer-based models achieve 95%+ accuracy on most text classification benchmarks, while traditional methods like SVM typically achieve 85-90% accuracy.
## Dataset Considerations
- Small datasets (< 1000 samples): Naive Bayes, SVM
- Large datasets (> 10,000 samples): Neural networks, transformers
- Imbalanced datasets: Require special handling with techniques like SMOTE
</content>
</document>
</documents>
Output:
{{
"further_questions": [
{{
"id": 0,
"question": "What are the key differences in performance between traditional algorithms like SVM and modern deep learning approaches for text classification?"
}},
{{
"id": 1,
"question": "How do you handle imbalanced datasets when training text classification models?"
}},
{{
"id": 2,
"question": "What preprocessing techniques are most effective for improving text classification accuracy?"
}},
{{
"id": 3,
"question": "Are there specific domains or use cases where certain classification algorithms perform better than others?"
}}
]
}}
</examples>
</further_questions_system>
"""

View file

@ -1,5 +0,0 @@
"""QnA Agent."""
from .graph import graph
__all__ = ["graph"]

View file

@ -1,31 +0,0 @@
"""Define the configurable parameters for the agent."""
from __future__ import annotations
from dataclasses import dataclass, fields
from typing import Any
from langchain_core.runnables import RunnableConfig
@dataclass(kw_only=True)
class Configuration:
"""The configuration for the Q&A agent."""
# Configuration parameters for the Q&A agent
user_query: str # The user's question to answer
reformulated_query: str # The reformulated query
relevant_documents: list[
Any
] # Documents provided directly to the agent for answering
search_space_id: int # Search space identifier
language: str | None = None # Language for responses
@classmethod
def from_runnable_config(
cls, config: RunnableConfig | None = None
) -> Configuration:
"""Create a Configuration instance from a RunnableConfig object."""
configurable = (config.get("configurable") or {}) if config else {}
_fields = {f.name for f in fields(cls) if f.init}
return cls(**{k: v for k, v in configurable.items() if k in _fields})

View file

@ -1,201 +0,0 @@
"""Default system prompts for Q&A agent.
The prompt system is modular with 3 parts:
- Part 1 (Base): Core instructions for answering questions (no citations)
- Part 2 (Citations): Citation-specific instructions and formatting rules
- Part 3 (Custom): User's custom instructions (empty by default)
Combinations:
- Part 1 only: Answers without citations
- Part 1 + Part 2: Answers with citations
- Part 1 + Part 2 + Part 3: Answers with citations and custom instructions
"""
# Part 1: Base system prompt for answering without citations
DEFAULT_QNA_BASE_PROMPT = """Today's date: {date}
You are SurfSense, an advanced AI research assistant that provides detailed, well-researched answers to user questions by synthesizing information from multiple personal knowledge sources.{language_instruction}
{chat_history_section}
<knowledge_sources>
- EXTENSION: "Web content saved via SurfSense browser extension" (personal browsing history)
- FILE: "User-uploaded documents (PDFs, Word, etc.)" (personal files)
- SLACK_CONNECTOR: "Slack conversations and shared content" (personal workspace communications)
- NOTION_CONNECTOR: "Notion workspace pages and databases" (personal knowledge management)
- YOUTUBE_VIDEO: "YouTube video transcripts and metadata" (personally saved videos)
- GITHUB_CONNECTOR: "GitHub repository content and issues" (personal repositories and interactions)
- ELASTICSEARCH_CONNECTOR: "Elasticsearch indexed documents and data" (personal Elasticsearch instances and custom data sources)
- LINEAR_CONNECTOR: "Linear project issues and discussions" (personal project management)
- JIRA_CONNECTOR: "Jira project issues, tickets, and comments" (personal project tracking)
- CONFLUENCE_CONNECTOR: "Confluence pages and comments" (personal project documentation)
- CLICKUP_CONNECTOR: "ClickUp tasks and project data" (personal task management)
- GOOGLE_CALENDAR_CONNECTOR: "Google Calendar events, meetings, and schedules" (personal calendar and time management)
- GOOGLE_GMAIL_CONNECTOR: "Google Gmail emails and conversations" (personal emails and communications)
- DISCORD_CONNECTOR: "Discord server conversations and shared content" (personal community communications)
- AIRTABLE_CONNECTOR: "Airtable records, tables, and database content" (personal data management and organization)
- TAVILY_API: "Tavily search API results" (personalized search results)
- LINKUP_API: "Linkup search API results" (personalized search results)
- LUMA_CONNECTOR: "Luma events"
- WEBCRAWLER_CONNECTOR: "Webpages indexed by SurfSense" (personally selected websites)
</knowledge_sources>
<instructions>
1. Review the chat history to understand the conversation context and any previous topics discussed.
2. Carefully analyze all provided documents in the <document> sections.
3. Extract relevant information that directly addresses the user's question.
4. Provide a comprehensive, detailed answer using information from the user's personal knowledge sources.
5. Structure your answer logically and conversationally, as if having a detailed discussion with the user.
6. Use your own words to synthesize and connect ideas from the documents.
7. If documents contain conflicting information, acknowledge this and present both perspectives.
8. If the user's question cannot be fully answered with the provided documents, clearly state what information is missing.
9. Provide actionable insights and practical information when relevant to the user's question.
10. Use the chat history to maintain conversation continuity and refer to previous discussions when relevant.
11. Remember that all knowledge sources contain personal information - provide answers that reflect this personal context.
12. Be conversational and engaging while maintaining accuracy.
</instructions>
<format>
- Write in a clear, conversational tone suitable for detailed Q&A discussions
- Provide comprehensive answers that thoroughly address the user's question
- Use appropriate paragraphs and structure for readability
- ALWAYS provide personalized answers that reflect the user's own knowledge and context
- Be thorough and detailed in your explanations while remaining focused on the user's specific question
- If asking follow-up questions would be helpful, suggest them at the end of your response
</format>
<user_query_instructions>
When you see a user query, focus exclusively on providing a detailed, comprehensive answer using information from the provided documents, which contain the user's personal knowledge and data.
Make sure your response:
1. Considers the chat history for context and conversation continuity
2. Directly and thoroughly answers the user's question with personalized information from their own knowledge sources
3. Is conversational, engaging, and detailed
4. Acknowledges the personal nature of the information being provided
5. Offers follow-up suggestions when appropriate
</user_query_instructions>
"""
# Part 2: Citation-specific instructions to add citation capabilities
DEFAULT_QNA_CITATION_INSTRUCTIONS = """
<citation_instructions>
CRITICAL CITATION REQUIREMENTS:
1. For EVERY piece of information you include from the documents, add a citation in the format [citation:chunk_id] where chunk_id is the exact value from the `<chunk id='...'>` tag inside `<document_content>`.
2. Make sure ALL factual statements from the documents have proper citations.
3. If multiple chunks support the same point, include all relevant citations [citation:chunk_id1], [citation:chunk_id2].
4. You MUST use the exact chunk_id values from the `<chunk id='...'>` attributes. Do not create your own citation numbers.
5. Every citation MUST be in the format [citation:chunk_id] where chunk_id is the exact chunk id value.
6. Never modify or change the chunk_id - always use the original values exactly as provided in the chunk tags.
7. Do not return citations as clickable links.
8. Never format citations as markdown links like "([citation:5](https://example.com))". Always use plain square brackets only.
9. Citations must ONLY appear as [citation:chunk_id] or [citation:chunk_id1], [citation:chunk_id2] format - never with parentheses, hyperlinks, or other formatting.
10. Never make up chunk IDs. Only use chunk_id values that are explicitly provided in the `<chunk id='...'>` tags.
11. If you are unsure about a chunk_id, do not include a citation rather than guessing or making one up.
<document_structure_example>
The documents you receive are structured like this:
<document>
<document_metadata>
<document_id>42</document_id>
<document_type>GITHUB_CONNECTOR</document_type>
<title><![CDATA[Some repo / file / issue title]]></title>
<url><![CDATA[https://example.com]]></url>
<metadata_json><![CDATA[{{"any":"other metadata"}}]]></metadata_json>
</document_metadata>
<document_content>
<chunk id='123'><![CDATA[First chunk text...]]></chunk>
<chunk id='124'><![CDATA[Second chunk text...]]></chunk>
</document_content>
</document>
IMPORTANT: You MUST cite using the chunk ids (e.g. 123, 124). Do NOT cite document_id.
</document_structure_example>
<citation_format>
- Every fact from the documents must have a citation in the format [citation:chunk_id] where chunk_id is the EXACT id value from a `<chunk id='...'>` tag
- Citations should appear at the end of the sentence containing the information they support
- Multiple citations should be separated by commas: [citation:chunk_id1], [citation:chunk_id2], [citation:chunk_id3]
- No need to return references section. Just citations in answer.
- NEVER create your own citation format - use the exact chunk_id values from the documents in the [citation:chunk_id] format
- NEVER format citations as clickable links or as markdown links like "([citation:5](https://example.com))". Always use plain square brackets only
- NEVER make up chunk IDs if you are unsure about the chunk_id. It is better to omit the citation than to guess
</citation_format>
<citation_examples>
CORRECT citation formats:
- [citation:5]
- [citation:chunk_id1], [citation:chunk_id2], [citation:chunk_id3]
INCORRECT citation formats (DO NOT use):
- Using parentheses and markdown links: ([citation:5](https://github.com/MODSetter/SurfSense))
- Using parentheses around brackets: ([citation:5])
- Using hyperlinked text: [link to source 5](https://example.com)
- Using footnote style: ... library¹
- Making up source IDs when source_id is unknown
- Using old IEEE format: [1], [2], [3]
- Using source types instead of IDs: [citation:GITHUB_CONNECTOR] instead of [citation:5]
</citation_examples>
<citation_output_example>
Based on your GitHub repositories and video content, Python's asyncio library provides tools for writing concurrent code using the async/await syntax [citation:5]. It's particularly useful for I/O-bound and high-level structured network code [citation:5].
The key advantage of asyncio is that it can improve performance by allowing other code to run while waiting for I/O operations to complete [citation:12]. This makes it excellent for scenarios like web scraping, API calls, database operations, or any situation where your program spends time waiting for external resources.
However, from your video learning, it's important to note that asyncio is not suitable for CPU-bound tasks as it runs on a single thread [citation:12]. For computationally intensive work, you'd want to use multiprocessing instead.
</citation_output_example>
</citation_instructions>
"""
# Part 3: User's custom instructions (empty by default, can be set by user from UI)
DEFAULT_QNA_CUSTOM_INSTRUCTIONS = ""
# Full prompt with all parts combined (for backward compatibility and migration).
# NOTE: the concatenation happens once, when this module is imported, so this
# constant always ends with the (empty) default custom instructions above.
DEFAULT_QNA_CITATION_PROMPT = (
    DEFAULT_QNA_BASE_PROMPT
    + DEFAULT_QNA_CITATION_INSTRUCTIONS
    + DEFAULT_QNA_CUSTOM_INSTRUCTIONS
)
DEFAULT_QNA_NO_DOCUMENTS_PROMPT = """Today's date: {date}
You are SurfSense, an advanced AI research assistant that provides helpful, detailed answers to user questions in a conversational manner.{language_instruction}
{chat_history_section}
<context>
The user has asked a question but there are no specific documents from their personal knowledge base available to answer it. You should provide a helpful response based on:
1. The conversation history and context
2. Your general knowledge and expertise
3. Understanding of the user's needs and interests based on our conversation
</context>
<instructions>
1. Provide a comprehensive, helpful answer to the user's question
2. Draw upon the conversation history to understand context and the user's specific needs
3. Use your general knowledge to provide accurate, detailed information
4. Be conversational and engaging, as if having a detailed discussion with the user
5. Acknowledge when you're drawing from general knowledge rather than their personal sources
6. Provide actionable insights and practical information when relevant
7. Structure your answer logically and clearly
8. If the question would benefit from personalized information from their knowledge base, gently suggest they might want to add relevant content to SurfSense
9. Be honest about limitations while still being maximally helpful
10. Maintain the helpful, knowledgeable tone that users expect from SurfSense
</instructions>
<format>
- Write in a clear, conversational tone suitable for detailed Q&A discussions
- Provide comprehensive answers that thoroughly address the user's question
- Use appropriate paragraphs and structure for readability
- No citations are needed since you're using general knowledge
- Be thorough and detailed in your explanations while remaining focused on the user's specific question
- If asking follow-up questions would be helpful, suggest them at the end of your response
- When appropriate, mention that adding relevant content to their SurfSense knowledge base could provide more personalized answers
</format>
<user_query_instructions>
When answering the user's question without access to their personal documents:
1. Review the chat history to understand conversation context and maintain continuity
2. Provide the most helpful and comprehensive answer possible using general knowledge
3. Be conversational and engaging
4. Draw upon conversation history for context
5. Be clear that you're providing general information
6. Suggest ways the user could get more personalized answers by expanding their knowledge base when relevant
</user_query_instructions>
"""

View file

@ -1,21 +0,0 @@
from langgraph.graph import StateGraph
from .configuration import Configuration
from .nodes import answer_question, rerank_documents
from .state import State
# Define a new graph over the Q&A State, configurable via Configuration
workflow = StateGraph(State, config_schema=Configuration)
# Add the nodes to the graph
workflow.add_node("rerank_documents", rerank_documents)
workflow.add_node("answer_question", answer_question)
# Connect the nodes as a linear flow:
# __start__ -> rerank_documents -> answer_question -> __end__
workflow.add_edge("__start__", "rerank_documents")
workflow.add_edge("rerank_documents", "answer_question")
workflow.add_edge("answer_question", "__end__")
# Compile the workflow into an executable graph (runs at import time)
graph = workflow.compile()
graph.name = "SurfSense QnA Agent"  # This defines the custom name in LangSmith

View file

@ -1,297 +0,0 @@
import datetime
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.types import StreamWriter
from sqlalchemy import select
from app.db import SearchSpace
from app.services.reranker_service import RerankerService
from ..utils import (
calculate_token_count,
format_documents_section,
langchain_chat_history_to_str,
optimize_documents_for_token_limit,
)
from .configuration import Configuration
from .default_prompts import (
DEFAULT_QNA_BASE_PROMPT,
DEFAULT_QNA_CITATION_INSTRUCTIONS,
DEFAULT_QNA_NO_DOCUMENTS_PROMPT,
)
from .state import State
def _build_language_instruction(language: str | None = None):
"""Build language instruction for prompts."""
if language:
return f"\n\nIMPORTANT: Please respond in {language} language. All your responses, explanations, and analysis should be written in {language}."
return ""
def _build_chat_history_section(chat_history: str | None = None):
"""Build chat history section for prompts."""
if chat_history:
return f"""
<chat_history>
{chat_history if chat_history else "NO CHAT HISTORY PROVIDED"}
</chat_history>
"""
return """
<chat_history>
NO CHAT HISTORY PROVIDED
</chat_history>
"""
def _format_system_prompt(
    prompt_template: str,
    chat_history: str | None = None,
    language: str | None = None,
):
    """Fill a system prompt template's placeholders and return the result.

    The template is rendered with ``str.format`` using three values:
    ``date`` (today, ``YYYY-MM-DD``), ``language_instruction`` and
    ``chat_history_section``. Any literal braces in *prompt_template* must
    therefore be doubled.
    """
    substitutions = {
        "date": datetime.datetime.now().strftime("%Y-%m-%d"),
        "language_instruction": _build_language_instruction(language),
        "chat_history_section": _build_chat_history_section(chat_history),
    }
    return prompt_template.format(**substitutions)
async def rerank_documents(state: State, config: RunnableConfig) -> dict[str, Any]:
    """
    Order the configured documents by relevance to the user's question.

    The documents come from ``Configuration.relevant_documents`` and are passed
    unchanged to the reranker service together with the user query and the
    reformulated query joined by a newline. The result is sorted by ``score``
    in descending order (a missing score counts as 0).

    Fallbacks:
    - No documents: an empty list is returned.
    - No reranker instance available: the documents are sorted by their
      existing ``score`` instead.
    - Reranking raises: the error is printed and the original documents are
      returned in their original order.

    Returns:
        Dict with the resulting list under the "reranked_documents" key.
    """

    def _score_of(doc):
        # Documents without a score sort as if they scored 0.
        return doc.get("score", 0)

    settings = Configuration.from_runnable_config(config)
    candidates = settings.relevant_documents

    # Nothing to rank.
    if not candidates:
        return {"reranked_documents": []}

    user_query = settings.user_query
    reformulated_query = settings.reformulated_query

    reranker = RerankerService.get_reranker_instance()
    if not reranker:
        print("Reranking is disabled. Sorting documents by existing score.")
        by_existing_score = sorted(candidates, key=_score_of, reverse=True)
        return {"reranked_documents": by_existing_score}

    try:
        # Per the original notes, the reranker scores on each document's
        # "content" and matches on "chunk_id"; the rest of the document
        # structure (including "chunks") is expected to pass through intact.
        ranked = reranker.rerank_documents(
            user_query + "\n" + reformulated_query, candidates
        )
        ranked.sort(key=_score_of, reverse=True)
        print(f"Reranked {len(ranked)} documents for Q&A query: {user_query}")
    except Exception as e:
        print(f"Error during reranking: {e!s}")
        # Best effort: hand back the input untouched rather than failing the run.
        return {"reranked_documents": candidates}
    return {"reranked_documents": ranked}
def _escape_format_braces(text: str) -> str:
    """Double every brace so *text* passes through ``str.format`` verbatim."""
    return text.replace("{", "{{").replace("}", "}}")


async def answer_question(
    state: State, config: RunnableConfig, writer: StreamWriter
) -> dict[str, Any]:
    """
    Answer the user's question with an LLM, streaming the reply as it arrives.

    The documents in ``state.reranked_documents`` are trimmed to fit the
    model's token limit and placed in the human message. The system prompt is
    the base Q&A prompt, plus the citation instructions when the search space
    has citations enabled, plus the search space's custom instructions. If no
    documents are available (none supplied, or none left after trimming) the
    no-documents prompt is used instead.

    Each non-empty chunk of the model's reply is appended to the final answer
    and, when a streaming service is present on the state, forwarded through
    ``writer`` as ``{"yield_value": ...}``.

    Args:
        state: Runtime state; supplies the DB session, chat history, reranked
            documents and optional streaming service.
        config: Runnable config whose "configurable" entry populates
            ``Configuration``.
        writer: LangGraph stream writer used for custom stream events.

    Returns:
        Dict containing the final answer in the "final_answer" key.

    Raises:
        RuntimeError: If the search space does not exist or has no fast LLM
            configured.
    """
    from app.services.llm_service import get_fast_llm

    configuration = Configuration.from_runnable_config(config)
    documents = state.reranked_documents
    user_query = configuration.user_query
    search_space_id = configuration.search_space_id
    language = configuration.language

    streaming_service = state.streaming_service

    # Fetch the search space to read its Q&A settings.
    result = await state.db_session.execute(
        select(SearchSpace).where(SearchSpace.id == search_space_id)
    )
    search_space = result.scalar_one_or_none()
    if not search_space:
        error_message = f"Search space {search_space_id} not found"
        print(error_message)
        raise RuntimeError(error_message)

    citations_enabled = search_space.citations_enabled
    custom_instructions_text = search_space.qna_custom_instructions or ""

    qna_citation_instructions = (
        DEFAULT_QNA_CITATION_INSTRUCTIONS if citations_enabled else ""
    )
    # The custom instructions are user-authored text that becomes part of a
    # str.format template below. Escape braces so text such as JSON snippets
    # neither raises KeyError/ValueError nor gets treated as a placeholder.
    qna_custom_instructions = (
        "\n<special_important_custom_instructions>\n"
        + _escape_format_braces(custom_instructions_text)
        + "\n</special_important_custom_instructions>"
        if custom_instructions_text
        else ""
    )

    llm = await get_fast_llm(state.db_session, search_space_id)
    if not llm:
        error_message = f"No fast LLM configured for search space {search_space_id}"
        print(error_message)
        raise RuntimeError(error_message)

    chat_history_str = langchain_chat_history_to_str(state.chat_history)

    has_documents = False
    system_prompt = ""
    if documents:
        # With documents: base + citation instructions + custom instructions.
        system_prompt = _format_system_prompt(
            DEFAULT_QNA_BASE_PROMPT
            + qna_citation_instructions
            + qna_custom_instructions,
            chat_history_str,
            language,
        )
        # Document-free version of the final message, used only to measure how
        # many tokens are left for the documents themselves.
        base_human_message_template = f"""
User's question:
<user_query>
{user_query}
</user_query>
Please provide a detailed, comprehensive answer to the user's question using the information from their personal knowledge sources. Make sure to cite all information appropriately and engage in a conversational manner.
"""
        base_messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=base_human_message_template),
        ]
        # Trimming may leave no documents at all; the returned flag says so.
        documents, has_documents = optimize_documents_for_token_limit(
            documents, base_messages, llm.model
        )

    if not has_documents:
        # Without documents: the no-documents prompt + custom instructions.
        system_prompt = _format_system_prompt(
            DEFAULT_QNA_NO_DOCUMENTS_PROMPT + qna_custom_instructions,
            chat_history_str,
            language,
        )

    documents_text = (
        format_documents_section(
            documents, "Source material from your personal knowledge base"
        )
        if has_documents
        else ""
    )
    instruction_text = (
        "Please provide a detailed, comprehensive answer to the user's question using the information from their personal knowledge sources. Make sure to cite all information appropriately and engage in a conversational manner."
        if has_documents
        else "Please provide a helpful answer to the user's question based on our conversation history and your general knowledge. Engage in a conversational manner."
    )
    human_message_content = f"""
{documents_text}
User's question:
<user_query>
{user_query}
</user_query>
{instruction_text}
"""

    messages_with_chat_history = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=human_message_content),
    ]

    total_tokens = calculate_token_count(messages_with_chat_history, llm.model)
    print(f"Final token count: {total_tokens}")

    # Stream the reply, accumulating it while forwarding each piece.
    final_answer = ""
    async for chunk in llm.astream(messages_with_chat_history):
        token = getattr(chunk, "content", None)
        if not token:
            continue
        final_answer += token
        if streaming_service:
            writer({"yield_value": streaming_service.format_text_chunk(token)})

    return {"final_answer": final_answer}

View file

@ -1,32 +0,0 @@
"""Define the state structures for the agent."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.streaming_service import StreamingService
@dataclass
class State:
    """Mutable state carried through a single run of the Q&A agent.

    Holds the database session, the optional streaming service, the chat
    history, and the values that agent nodes fill in while answering.

    See: https://langchain-ai.github.io/langgraph/concepts/low_level/#state
    for more information.
    """

    # Runtime context: the only field without a default, so it must be supplied.
    db_session: AsyncSession

    # Streaming service for real-time token streaming; may be absent.
    streaming_service: StreamingService | None = field(default=None)

    # Prior conversation messages; every instance gets its own fresh list.
    chat_history: list[Any] | None = field(default_factory=list)

    # OUTPUT: populated by agent nodes.
    reranked_documents: list[Any] | None = field(default=None)
    final_answer: str | None = field(default=None)

View file

@ -1,38 +0,0 @@
"""Define the state structures for the agent."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.streaming_service import StreamingService
@dataclass
class State:
    """Mutable state carried through a single run of the agent.

    Tracks the database session together with the outputs produced by the
    agent's nodes.

    See: https://langchain-ai.github.io/langgraph/concepts/low_level/#state
    for more information.
    """

    # Runtime context (not part of actual graph state); both are required.
    db_session: AsyncSession
    streaming_service: StreamingService

    # Prior conversation messages; every instance gets its own fresh list.
    chat_history: list[Any] | None = field(default_factory=list)

    reformulated_query: str | None = None
    further_questions: Any | None = None

    # Temporary holder for reranked documents coming from sub-agents, kept so
    # that follow-up question generation can use them.
    reranked_documents: list[Any] | None = None

    # OUTPUT: populated by agent nodes.
    final_written_report: str | None = None

View file

@ -1,292 +0,0 @@
import json
from typing import Any, NamedTuple
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from litellm import get_model_info, token_counter
class DocumentTokenInfo(NamedTuple):
    """A document paired with its rendered text and the token cost of that text."""

    # Position of the document in the list it was taken from.
    index: int
    # The original, unmodified document mapping.
    document: dict[str, Any]
    # The document rendered in the citation format.
    formatted_content: str
    # Number of tokens counted for ``formatted_content``.
    token_count: int
# Emoji used when a connector is unknown or has no usable emoji of its own.
_DEFAULT_CONNECTOR_EMOJI = "🔎"

# Emoji shown for each known connector / document type. Built once at import
# time instead of on every call.
_CONNECTOR_EMOJIS: dict[str, str] = {
    "YOUTUBE_VIDEO": "📹",
    "EXTENSION": "🧩",
    "FILE": "📄",
    "SLACK_CONNECTOR": "💬",
    "NOTION_CONNECTOR": "📘",
    "GITHUB_CONNECTOR": "🐙",
    "LINEAR_CONNECTOR": "📊",
    "JIRA_CONNECTOR": "🎫",
    "DISCORD_CONNECTOR": "🗨️",
    "TAVILY_API": "🔍",
    "LINKUP_API": "🔗",
    "BAIDU_SEARCH_API": "🇨🇳",
    "GOOGLE_CALENDAR_CONNECTOR": "📅",
    "AIRTABLE_CONNECTOR": "🗃️",
    # These two entries were empty strings, so callers got no emoji at all.
    "LUMA_CONNECTOR": "✨",
    "ELASTICSEARCH_CONNECTOR": "⚡",
    "WEBCRAWLER_CONNECTOR": "🌐",
    "BOOKSTACK_CONNECTOR": "📚",
    "NOTE": "📝",
}


def get_connector_emoji(connector_name: str) -> str:
    """Return the emoji associated with a connector type.

    Args:
        connector_name: Technical connector identifier, e.g. ``"SLACK_CONNECTOR"``.

    Returns:
        The emoji registered for the connector, or the generic search emoji
        when the connector is unknown. The result is never an empty string.
    """
    # ``or`` (rather than a ``.get`` default) also covers a blank table entry,
    # so a bad entry can never leak an empty string to the caller.
    return _CONNECTOR_EMOJIS.get(connector_name) or _DEFAULT_CONNECTOR_EMOJI
# Display names for the technical connector identifiers.
_CONNECTOR_FRIENDLY_NAMES = {
    "YOUTUBE_VIDEO": "YouTube",
    "EXTENSION": "Browser Extension",
    "FILE": "Files",
    "SLACK_CONNECTOR": "Slack",
    "NOTION_CONNECTOR": "Notion",
    "GITHUB_CONNECTOR": "GitHub",
    "LINEAR_CONNECTOR": "Linear",
    "JIRA_CONNECTOR": "Jira",
    "CONFLUENCE_CONNECTOR": "Confluence",
    "GOOGLE_CALENDAR_CONNECTOR": "Google Calendar",
    "DISCORD_CONNECTOR": "Discord",
    "TAVILY_API": "Tavily Search",
    "LINKUP_API": "Linkup Search",
    "BAIDU_SEARCH_API": "Baidu Search",
    "AIRTABLE_CONNECTOR": "Airtable",
    "LUMA_CONNECTOR": "Luma",
    "ELASTICSEARCH_CONNECTOR": "Elasticsearch",
    "WEBCRAWLER_CONNECTOR": "Web Pages",
    "BOOKSTACK_CONNECTOR": "BookStack",
    "NOTE": "Notes",
}


def get_connector_friendly_name(connector_name: str) -> str:
    """Translate a technical connector ID into a user-facing name.

    Identifiers that are not in the table are returned unchanged.
    """
    if connector_name in _CONNECTOR_FRIENDLY_NAMES:
        return _CONNECTOR_FRIENDLY_NAMES[connector_name]
    return connector_name
def convert_langchain_messages_to_dict(
    messages: list[BaseMessage],
) -> list[dict[str, str]]:
    """Turn LangChain messages into the role/content dicts used by token_counter.

    The message ``type`` is mapped to a chat role ("system" -> "system",
    "human" -> "user", "ai" -> "assistant"). A missing or unrecognised type
    is treated as "user". Content is always stringified.
    """
    roles_by_type = {"system": "system", "human": "user", "ai": "assistant"}
    return [
        {
            "role": roles_by_type.get(getattr(message, "type", None), "user"),
            "content": str(message.content),
        }
        for message in messages
    ]
def format_document_for_citation(document: dict[str, Any]) -> str:
    """Render one document in the document+chunks XML format used for citations.

    IMPORTANT:
    - Citations must reference real DB chunk IDs: `[citation:<chunk_id>]`
    - Document metadata is emitted under <document_metadata>, but citations
      are NOT document_id-based.

    Args:
        document: Mapping that may carry a "document" sub-mapping (id, title,
            document_type, metadata), a "chunks" list of mappings with
            "chunk_id"/"content", and a top-level "content" string that is
            used only when no chunks are present.

    Returns:
        The XML-like text for the document.
    """

    def _cdata(raw: Any) -> str:
        body = str(raw) if raw is not None else ""
        # Split any "]]>" in the payload so it cannot close the CDATA section early.
        escaped = body.replace("]]>", "]]]]><![CDATA[>")
        return f"<![CDATA[{escaped}]]>"

    info = document.get("document", {}) or {}
    meta = info.get("metadata", {}) or {}

    doc_id = info.get("id", "")
    title = info.get("title", "")
    document_type = info.get("document_type", "CRAWLED_URL")

    # First non-empty URL-like metadata value wins, in this order of preference.
    url_keys = ("url", "source", "page_url", "VisitedWebPageURL")
    url = next((meta.get(key) for key in url_keys if meta.get(key)), "")

    metadata_json = json.dumps(meta, ensure_ascii=False)

    # With no chunks, fall back to the raw content as a single chunk
    # (it has no chunk_id, so it cannot be cited).
    chunk_list = document.get("chunks") or [
        {"chunk_id": "", "content": document.get("content", "")}
    ]
    chunks_xml = "\n".join(
        f"<chunk id='{chunk.get('chunk_id', '')}'>{_cdata(chunk.get('content', ''))}</chunk>"
        for chunk in chunk_list
    )

    lines = [
        "<document>",
        "<document_metadata>",
        f"<document_id>{doc_id}</document_id>",
        f"<document_type>{document_type}</document_type>",
        f"<title>{_cdata(title)}</title>",
        f"<url>{_cdata(url)}</url>",
        f"<metadata_json>{_cdata(metadata_json)}</metadata_json>",
        "</document_metadata>",
        "<document_content>",
        chunks_xml,
        "</document_content>",
        "</document>",
    ]
    return "\n".join(lines)
def format_documents_section(
    documents: list[dict[str, Any]], section_title: str = "Source material"
) -> str:
    """Build the complete documents section for a prompt.

    Args:
        documents: Documents to render, each in the citation format.
        section_title: Heading placed before the ``<documents>`` element.

    Returns:
        The titled ``<documents>`` section, or an empty string when there are
        no documents.
    """
    if not documents:
        return ""

    rendered = "\n".join(format_document_for_citation(doc) for doc in documents)
    return f"{section_title}:\n<documents>\n{rendered}\n</documents>"
def calculate_document_token_costs(
    documents: list[dict[str, Any]], model: str
) -> list[DocumentTokenInfo]:
    """Compute the token cost of every document up front.

    Each document is rendered in the citation format and that text is counted
    as a single user message for ``model``.

    Returns:
        One ``DocumentTokenInfo`` per input document, in input order.
    """

    def _measure(position: int, doc: dict[str, Any]) -> DocumentTokenInfo:
        rendered = format_document_for_citation(doc)
        cost = token_counter(
            messages=[{"role": "user", "content": rendered}], model=model
        )
        return DocumentTokenInfo(
            index=position,
            document=doc,
            formatted_content=rendered,
            token_count=cost,
        )

    return [_measure(position, doc) for position, doc in enumerate(documents)]
def find_optimal_documents_with_binary_search(
    document_tokens: list[DocumentTokenInfo], available_tokens: int
) -> list[DocumentTokenInfo]:
    """Return the longest leading run of documents that fits the token budget.

    Documents are considered in the given order; the result is always a prefix
    of ``document_tokens``. Running totals are computed once and then
    binary-searched, rather than re-summing the prefix on every probe.

    Args:
        document_tokens: Per-document token costs, in priority order. Token
            counts are expected to be non-negative, so the running totals are
            sorted.
        available_tokens: Token budget left for documents.

    Returns:
        The largest prefix whose total token count is <= ``available_tokens``;
        an empty list when there are no documents or no budget.
    """
    # Function-scope imports keep this block self-contained.
    from bisect import bisect_right
    from itertools import accumulate

    if not document_tokens or available_tokens <= 0:
        return []

    # running_totals[i] is the cost of the first i + 1 documents.
    running_totals = list(
        accumulate(doc_info.token_count for doc_info in document_tokens)
    )
    # Number of prefixes whose cost fits == number of documents we can keep.
    fit_count = bisect_right(running_totals, available_tokens)
    return document_tokens[:fit_count]
def get_model_context_window(model_name: str) -> int:
    """Return the maximum number of input tokens the model accepts.

    The value is read from the ``max_input_tokens`` entry of the model info.
    A conservative default of 4096 is used when the lookup fails, or when the
    entry is missing or empty (e.g. present but ``None``), so callers always
    receive a usable integer.

    Args:
        model_name: Model identifier to look up.

    Returns:
        The model's input-token limit, or 4096 as a fallback.
    """
    default_window = 4096  # Conservative fallback
    try:
        model_info = get_model_info(model_name)
        # ``.get(key, default)`` only covers a missing key; ``or`` also covers
        # a key that is present with a None value, which would otherwise
        # propagate and break the arithmetic done by callers.
        return model_info.get("max_input_tokens") or default_window
    except Exception as e:
        # Best-effort lookup: any failure degrades to the default window.
        print(
            f"Warning: Could not get model info for {model_name}, using default 4096 tokens. Error: {e}"
        )
        return default_window
def optimize_documents_for_token_limit(
    documents: list[dict[str, Any]],
    base_messages: list[BaseMessage],
    model_name: str,
    output_token_buffer: int = 0,
) -> tuple[list[dict[str, Any]], bool]:
    """
    Trim the document list so the prompt fits within the model's token limit.

    The budget for documents is the model's context window minus the tokens
    used by ``base_messages`` and minus ``output_token_buffer``. Documents are
    kept in their given order and the longest leading run that fits is used.

    Args:
        documents: List of documents with content and metadata
        base_messages: Base messages without documents (chat history + system + human message template)
        model_name: Model name for token counting (required)
        output_token_buffer: Number of tokens to reserve for model output.
            Defaults to 0, i.e. nothing is reserved.

    Returns:
        Tuple of (optimized_documents, has_documents_remaining)
    """
    if not documents:
        return [], False

    context_window = get_model_context_window(model_name)

    # Token cost of everything in the prompt that is not a document.
    base_messages_dict = convert_langchain_messages_to_dict(base_messages)
    base_tokens = token_counter(messages=base_messages_dict, model=model_name)

    available_tokens_for_docs = context_window - base_tokens - output_token_buffer
    print(
        f"Token optimization: Context window={context_window}, Base={base_tokens}, Available for docs={available_tokens_for_docs}"
    )
    if available_tokens_for_docs <= 0:
        print("No tokens available for documents after base content and output buffer")
        return [], False

    # Cost every document, then keep the longest prefix that fits the budget.
    document_token_info = calculate_document_token_costs(documents, model_name)
    optimal_doc_info = find_optimal_documents_with_binary_search(
        document_token_info, available_tokens_for_docs
    )

    # Hand back the original document objects, not the token bookkeeping.
    optimized_documents = [doc_info.document for doc_info in optimal_doc_info]
    has_documents_remaining = len(optimized_documents) > 0
    print(
        f"Token optimization result: Using {len(optimized_documents)}/{len(documents)} documents"
    )
    return optimized_documents, has_documents_remaining
def calculate_token_count(messages: list[BaseMessage], model_name: str) -> int:
    """Count the tokens of a list of LangChain messages for the given model.

    The messages are first converted to role/content dicts, then handed to
    ``token_counter``.
    """
    return token_counter(
        messages=convert_langchain_messages_to_dict(messages), model=model_name
    )
def langchain_chat_history_to_str(chat_history: list[BaseMessage]) -> str:
    """
    Serialise chat history into a tagged string, one message per line.

    Human, AI and system messages are wrapped in <user>, <assistant> and
    <system> tags respectively; messages of any other kind are left out.
    """

    def _render(message: BaseMessage) -> str:
        if isinstance(message, HumanMessage):
            return f"<user>{message.content}</user>\n"
        if isinstance(message, AIMessage):
            return f"<assistant>{message.content}</assistant>\n"
        if isinstance(message, SystemMessage):
            return f"<system>{message.content}</system>\n"
        # Other message kinds contribute nothing to the transcript.
        return ""

    return "".join(_render(message) for message in chat_history)