diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py index 7ecdaa213..491840589 100644 --- a/surfsense_backend/app/agents/researcher/nodes.py +++ b/surfsense_backend/app/agents/researcher/nodes.py @@ -30,51 +30,59 @@ def extract_sources_from_documents( all_documents: list[dict[str, Any]], ) -> list[dict[str, Any]]: """ - Extract sources from all_documents and group them by document type. + Extract sources from **document-grouped** results and group them by document type. Args: - all_documents: List of document chunks from user-selected documents and connector-fetched documents + all_documents: List of document-grouped results from user-selected documents and connector-fetched documents Returns: List of source objects grouped by type for streaming """ - # Group documents by their source type + # Group sources by their source type documents_by_type = {} for doc in all_documents: - # Get source type from the document + document_info = doc.get("document", {}) or {} source_type = doc.get("source", "UNKNOWN") - document_info = doc.get("document", {}) - document_type = document_info.get("document_type", source_type) - - # Use document_type if available, otherwise use source + document_type = document_info.get("document_type", source_type) or source_type group_type = document_type if document_type != "UNKNOWN" else source_type - if group_type not in documents_by_type: documents_by_type[group_type] = [] documents_by_type[group_type].append(doc) # Create source objects for each document type source_objects = [] - source_id_counter = 1 - for doc_type, docs in documents_by_type.items(): sources_list = [] for doc in docs: document_info = doc.get("document", {}) metadata = document_info.get("metadata", {}) + url = ( + metadata.get("url") + or metadata.get("source") + or metadata.get("page_url") + or metadata.get("VisitedWebPageURL") + or "" + ) - # Create source entry based on document structure - source = { - "id": doc.get("chunk_id", source_id_counter), - "title": document_info.get("title", "Untitled Document"), - "description": doc.get("content", "").strip(), - "url": metadata.get("url", metadata.get("page_url", "")), - } - - source_id_counter += 1 - sources_list.append(source) + # Each chunk becomes a source entry so citations like [citation:] resolve in UI. + for chunk in doc.get("chunks", []) or []: + chunk_id = chunk.get("chunk_id") + chunk_content = (chunk.get("content") or "").strip() + description = ( + chunk_content + if len(chunk_content) <= 240 + else chunk_content[:240] + "..." 
+ ) + sources_list.append( + { + "id": chunk_id, + "title": document_info.get("title", "Untitled Document"), + "description": description, + "url": url, + } + ) # Create group object group_name = ( @@ -127,50 +135,40 @@ async def fetch_documents_by_ids( documents = result.scalars().all() # Group documents by type for source object creation - documents_by_type = {} - formatted_documents = [] + documents_by_type: dict[str, list[Document]] = {} + formatted_documents: list[dict[str, Any]] = [] + + from app.db import Chunk for doc in documents: - # Fetch associated chunks for this document (similar to DocumentHybridSearchRetriever) - from app.db import Chunk - + # Fetch associated chunks for this document chunks_query = ( select(Chunk).where(Chunk.document_id == doc.id).order_by(Chunk.id) ) chunks_result = await db_session.execute(chunks_query) chunks = chunks_result.scalars().all() - # Return individual chunks instead of concatenated content - if chunks: - for chunk in chunks: - # Format each chunk to match connector service return format - formatted_chunk = { - "chunk_id": chunk.id, - "content": chunk.content, # Use individual chunk content - "score": 0.5, # High score since user explicitly selected these - "document": { - "id": chunk.id, - "title": doc.title, - "document_type": ( - doc.document_type.value - if doc.document_type - else "UNKNOWN" - ), - "metadata": doc.document_metadata or {}, - }, - "source": doc.document_type.value - if doc.document_type - else "UNKNOWN", - } - formatted_documents.append(formatted_chunk) + doc_type = doc.document_type.value if doc.document_type else "UNKNOWN" + documents_by_type.setdefault(doc_type, []).append(doc) - # Group by document type for source objects - doc_type = ( - doc.document_type.value if doc.document_type else "UNKNOWN" - ) - if doc_type not in documents_by_type: - documents_by_type[doc_type] = [] - documents_by_type[doc_type].append(doc) + doc_group = { + "document_id": doc.id, + "content": "\n\n".join(c.content for c in chunks) + if chunks + else (doc.content or ""), + "score": 0.5, # High score since user explicitly selected these + "chunks": [{"chunk_id": c.id, "content": c.content} for c in chunks] + if chunks + else [], + "document": { + "id": doc.id, + "title": doc.title, + "document_type": doc_type, + "metadata": doc.document_metadata or {}, + }, + "source": doc_type, + } + formatted_documents.append(doc_group) # Create source objects for each document type (similar to ConnectorService) source_objects = [] @@ -1265,25 +1263,22 @@ async def fetch_relevant_documents( } ) - # Deduplicate raw documents based on chunk_id or content - seen_chunk_ids = set() + # Deduplicate raw documents based on document_id (preferred) or content hash + seen_doc_ids = set() seen_content_hashes = set() - deduplicated_docs = [] + deduplicated_docs: list[dict[str, Any]] = [] for doc in all_raw_documents: - chunk_id = doc.get("chunk_id") - content = doc.get("content", "") + doc_id = (doc.get("document", {}) or {}).get("id") + content = doc.get("content", "") or "" content_hash = hash(content) - # Skip if we've seen this chunk_id or content before - if ( - chunk_id and chunk_id in seen_chunk_ids - ) or content_hash in seen_content_hashes: + # Skip if we've seen this document_id or content before + if (doc_id and doc_id in seen_doc_ids) or content_hash in seen_content_hashes: continue - # Add to our tracking sets and keep this document - if chunk_id: - seen_chunk_ids.add(chunk_id) + if doc_id: + seen_doc_ids.add(doc_id) seen_content_hashes.add(content_hash) 
deduplicated_docs.append(doc)
@@ -1292,7 +1287,7 @@ async def fetch_relevant_documents(
         writer(
             {
                 "yield_value": streaming_service.format_terminal_info_delta(
-                    f"🧹 Found {len(deduplicated_docs)} unique document chunks after removing duplicates"
+                    f"🧹 Found {len(deduplicated_docs)} unique documents after removing duplicates"
                 )
             }
         )
diff --git a/surfsense_backend/app/agents/researcher/prompts.py b/surfsense_backend/app/agents/researcher/prompts.py
index 96de760b6..794a594f2 100644
--- a/surfsense_backend/app/agents/researcher/prompts.py
+++ b/surfsense_backend/app/agents/researcher/prompts.py
@@ -16,7 +16,7 @@ You are an expert research assistant specializing in generating contextually rel
 - chat_history: Provided in XML format within <chat_history> tags, containing user and assistant message pairs that show the chronological conversation flow. This provides context about what has already been discussed.
-- available_documents: Provided in XML format within <documents> tags, containing individual <document> elements with <metadata> (source_id, source_type) and <content> sections. This helps understand what information is accessible for answering potential follow-up questions.
+- available_documents: Provided in XML format within <documents> tags, containing individual <document> elements with <document_metadata> and <chunks> sections. Each document contains multiple `<chunk id="...">...</chunk>` blocks inside <chunks>. This helps understand what information is accessible for answering potential follow-up questions.
diff --git a/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py b/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py
index 7b5d251fe..72ae636cb 100644
--- a/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py
+++ b/surfsense_backend/app/agents/researcher/qna_agent/default_prompts.py
@@ -78,32 +78,53 @@ DEFAULT_QNA_CITATION_INSTRUCTIONS = """
 CRITICAL CITATION REQUIREMENTS:
-1. For EVERY piece of information you include from the documents, add a citation in the format [citation:knowledge_source_id] where knowledge_source_id is the source_id from the document's metadata.
+1. For EVERY piece of information you include from the documents, add a citation in the format [citation:chunk_id] where chunk_id is the exact value from the `<chunk id="...">` tag inside `<chunks>`.
 2. Make sure ALL factual statements from the documents have proper citations.
-3. If multiple documents support the same point, include all relevant citations [citation:source_id1], [citation:source_id2].
-4. You MUST use the exact source_id value from each document's metadata for citations. Do not create your own citation numbers.
-5. Every citation MUST be in the format [citation:knowledge_source_id] where knowledge_source_id is the exact source_id value.
-6. Never modify or change the source_id - always use the original values exactly as provided in the metadata.
+3. If multiple chunks support the same point, include all relevant citations [citation:chunk_id1], [citation:chunk_id2].
+4. You MUST use the exact chunk_id values from the `<chunk id="...">` attributes. Do not create your own citation numbers.
+5. Every citation MUST be in the format [citation:chunk_id] where chunk_id is the exact chunk id value.
+6. Never modify or change the chunk_id - always use the original values exactly as provided in the chunk tags.
 7. Do not return citations as clickable links.
 8. Never format citations as markdown links like "([citation:5](https://example.com))". Always use plain square brackets only.
-9. Citations must ONLY appear as [citation:source_id] or [citation:source_id1], [citation:source_id2] format - never with parentheses, hyperlinks, or other formatting.
-10. Never make up source IDs. Only use source_id values that are explicitly provided in the document metadata.
-11. If you are unsure about a source_id, do not include a citation rather than guessing or making one up.
+9. Citations must ONLY appear as [citation:chunk_id] or [citation:chunk_id1], [citation:chunk_id2] format - never with parentheses, hyperlinks, or other formatting.
+10. Never make up chunk IDs. Only use chunk_id values that are explicitly provided in the `<chunk>` tags.
+11. If you are unsure about a chunk_id, do not include a citation rather than guessing or making one up.
+
+The documents you receive are structured like this:
+
+<document>
+  <document_id>42</document_id>
+  <document_type>GITHUB_CONNECTOR</document_type>
+  <title><![CDATA[Some repo / file / issue title]]></title>
+  <url><![CDATA[...]]></url>
+  <document_metadata><![CDATA[{...}]]></document_metadata>
+  <chunks>
+    <chunk id="123"><![CDATA[...]]></chunk>
+    <chunk id="124"><![CDATA[...]]></chunk>
+  </chunks>
+</document>
+
+IMPORTANT: You MUST cite using the chunk ids (e.g. 123, 124). Do NOT cite document_id.
+
-- Every fact from the documents must have a citation in the format [citation:knowledge_source_id] where knowledge_source_id is the EXACT source_id from the document's metadata
+- Every fact from the documents must have a citation in the format [citation:chunk_id] where chunk_id is the EXACT id value from a `<chunk id="...">` tag
 - Citations should appear at the end of the sentence containing the information they support
-- Multiple citations should be separated by commas: [citation:source_id1], [citation:source_id2], [citation:source_id3]
+- Multiple citations should be separated by commas: [citation:chunk_id1], [citation:chunk_id2], [citation:chunk_id3]
 - No need to return references section. Just citations in answer.
-- NEVER create your own citation format - use the exact source_id values from the documents in the [citation:source_id] format
+- NEVER create your own citation format - use the exact chunk_id values from the documents in the [citation:chunk_id] format
 - NEVER format citations as clickable links or as markdown links like "([citation:5](https://example.com))". Always use plain square brackets only
-- NEVER make up source IDs if you are unsure about the source_id. It is better to omit the citation than to guess
+- NEVER make up chunk IDs if you are unsure about the chunk_id. It is better to omit the citation than to guess

 CORRECT citation formats:
 - [citation:5]
-- [citation:source_id1], [citation:source_id2], [citation:source_id3]
+- [citation:chunk_id1], [citation:chunk_id2], [citation:chunk_id3]

 INCORRECT citation formats (DO NOT use):
 - Using parentheses and markdown links: ([citation:5](https://github.com/MODSetter/SurfSense))
diff --git a/surfsense_backend/app/agents/researcher/qna_agent/nodes.py b/surfsense_backend/app/agents/researcher/qna_agent/nodes.py
index 1debf0cdf..28c35a20b 100644
--- a/surfsense_backend/app/agents/researcher/qna_agent/nodes.py
+++ b/surfsense_backend/app/agents/researcher/qna_agent/nodes.py
@@ -71,6 +71,10 @@ async def rerank_documents(state: State, config: RunnableConfig) -> dict[str, An
     reranks them using the reranker service based on the user's query, and updates
     the state with the reranked documents.

+    Documents are now document-grouped with a `chunks` list. Reranking is done
+    using the concatenated `content` field, and the full structure (including
+    `chunks`) is preserved for proper citation formatting.
+
     If reranking is disabled, returns the original documents without processing.
    Returns:
@@ -99,25 +103,12 @@ async def rerank_documents(state: State, config: RunnableConfig) -> dict[str, An
     # Perform reranking
     try:
-        # Convert documents to format expected by reranker if needed
-        reranker_input_docs = [
-            {
-                "chunk_id": doc.get("chunk_id", f"chunk_{i}"),
-                "content": doc.get("content", ""),
-                "score": doc.get("score", 0.0),
-                "document": {
-                    "id": doc.get("document", {}).get("id", ""),
-                    "title": doc.get("document", {}).get("title", ""),
-                    "document_type": doc.get("document", {}).get("document_type", ""),
-                    "metadata": doc.get("document", {}).get("metadata", {}),
-                },
-            }
-            for i, doc in enumerate(documents)
-        ]
-
-        # Rerank documents using the user's query
+        # Pass documents directly to reranker - it will use:
+        # - "content" (concatenated chunk text) for scoring
+        # - "chunk_id" (primary chunk id) for matching
+        # The full document structure including "chunks" is preserved
         reranked_docs = reranker_service.rerank_documents(
-            user_query + "\n" + reformulated_query, reranker_input_docs
+            user_query + "\n" + reformulated_query, documents
         )

         # Sort by score in descending order
@@ -141,8 +132,8 @@ async def answer_question(
     This node takes the relevant documents provided in the configuration and uses
     an LLM to generate a comprehensive answer to the user's question with
-    proper citations. The citations follow [citation:source_id] format using source IDs from the
-    documents. If no documents are provided, it will use chat history to generate
+    proper citations. The citations follow [citation:chunk_id] format using chunk IDs from the
+    `<chunk>` tags in the provided documents. If no documents are provided, it will use chat history to generate
     an answer. The response is streamed token-by-token for real-time updates to the frontend.
diff --git a/surfsense_backend/app/agents/researcher/utils.py b/surfsense_backend/app/agents/researcher/utils.py
index 41d5a1f55..ba2a76e69 100644
--- a/surfsense_backend/app/agents/researcher/utils.py
+++ b/surfsense_backend/app/agents/researcher/utils.py
@@ -1,3 +1,4 @@
+import json
 from typing import Any, NamedTuple

 from langchain.schema import AIMessage, HumanMessage, SystemMessage
@@ -78,21 +79,59 @@ def convert_langchain_messages_to_dict(


 def format_document_for_citation(document: dict[str, Any]) -> str:
-    """Format a single document for citation in the standard XML format."""
-    content = document.get("content", "")
-    doc_info = document.get("document", {})
-    document_id = document.get("chunk_id", "")
+    """Format a single document for citation in the new document+chunks XML format.
+
+    IMPORTANT:
+    - Citations must reference real DB chunk IDs: `[citation:<chunk_id>]`
+    - Document metadata is included under <document_metadata>, but citations are NOT document_id-based.
+ """ + + def _to_cdata(value: Any) -> str: + text = "" if value is None else str(value) + # Safely nest CDATA even if the content includes "]]>" + return "", "]]]]>") + "]]>" + + doc_info = document.get("document", {}) or {} + metadata = doc_info.get("metadata", {}) or {} + + doc_id = doc_info.get("id", "") + title = doc_info.get("title", "") document_type = doc_info.get("document_type", "CRAWLED_URL") + url = ( + metadata.get("url") + or metadata.get("source") + or metadata.get("page_url") + or metadata.get("VisitedWebPageURL") + or "" + ) + + metadata_json = json.dumps(metadata, ensure_ascii=False) + + chunks = document.get("chunks") or [] + if not chunks: + # Fallback: treat `content` as a single chunk (no chunk_id available for citation) + chunks = [{"chunk_id": "", "content": document.get("content", "")}] + + chunks_xml = "\n".join( + [ + f"{_to_cdata(chunk.get('content', ''))}" + for chunk in chunks + ] + ) return f""" - - {document_id} - {document_type} - - - {content} - - """ + +{doc_id} +{document_type} +{_to_cdata(title)} +{_to_cdata(url)} +{_to_cdata(metadata_json)} + + + +{chunks_xml} + +""" def format_documents_section( diff --git a/surfsense_backend/app/retriever/chunks_hybrid_search.py b/surfsense_backend/app/retriever/chunks_hybrid_search.py index 017f36088..9aa301386 100644 --- a/surfsense_backend/app/retriever/chunks_hybrid_search.py +++ b/surfsense_backend/app/retriever/chunks_hybrid_search.py @@ -131,18 +131,25 @@ class ChucksHybridSearchRetriever: end_date: datetime | None = None, ) -> list: """ - Combine vector similarity and full-text search results using Reciprocal Rank Fusion. + Hybrid search that returns **documents** (not individual chunks). + + Each returned item is a document-grouped dict that preserves real DB chunk IDs so + downstream agents can cite with `[citation:]`. Args: query_text: The search query text - top_k: Number of results to return + top_k: Number of documents to return search_space_id: The search space ID to search within document_type: Optional document type to filter results (e.g., "FILE", "CRAWLED_URL") start_date: Optional start date for filtering documents by updated_at end_date: Optional end date for filtering documents by updated_at Returns: - List of dictionaries containing chunk data and relevance scores + List of dictionaries containing document data and relevance scores. 
Each dict contains: + - chunk_id: a "primary" chunk id for compatibility (best-ranked chunk for the doc) + - content: concatenated chunk content (useful for reranking) + - chunks: list[{chunk_id, content}] for citation-aware prompting + - document: {id, title, document_type, metadata} """ from sqlalchemy import func, select, text from sqlalchemy.orm import joinedload @@ -154,9 +161,9 @@ class ChucksHybridSearchRetriever: embedding_model = config.embedding_model_instance query_embedding = embedding_model.embed(query_text) - # Constants for RRF calculation - k = 60 # Constant for RRF calculation - n_results = top_k * 2 # Get more results for better fusion + # RRF constants + k = 60 + n_results = top_k * 5 # Fetch extra chunks for better document-level fusion # Create tsvector and tsquery for PostgreSQL full-text search tsvector = func.to_tsvector("english", Chunk.content) @@ -255,10 +262,10 @@ class ChucksHybridSearchRetriever: if not chunks_with_scores: return [] - # Convert to serializable dictionaries if no reranker is available or if reranking failed - serialized_results = [] + # Convert to serializable dictionaries + serialized_chunk_results: list[dict] = [] for chunk, score in chunks_with_scores: - serialized_results.append( + serialized_chunk_results.append( { "chunk_id": chunk.id, "content": chunk.content, @@ -274,4 +281,77 @@ class ChucksHybridSearchRetriever: } ) - return serialized_results + # Group by document, preserving ranking order by best chunk rank + doc_scores: dict[int, float] = {} + doc_order: list[int] = [] + for item in serialized_chunk_results: + doc_id = item.get("document", {}).get("id") + if doc_id is None: + continue + if doc_id not in doc_scores: + doc_scores[doc_id] = item.get("score", 0.0) + doc_order.append(doc_id) + else: + # Use the best score as doc score + doc_scores[doc_id] = max(doc_scores[doc_id], item.get("score", 0.0)) + + # Keep only top_k documents by initial rank order. + doc_ids = doc_order[:top_k] + if not doc_ids: + return [] + + # Fetch ALL chunks for selected documents in a single query so the final prompt can cite + # any chunk from those documents. 
+ chunk_query = ( + select(Chunk) + .options(joinedload(Chunk.document)) + .join(Document, Chunk.document_id == Document.id) + .where(Document.id.in_(doc_ids)) + .where(*base_conditions) + .order_by(Chunk.document_id, Chunk.id) + ) + chunks_result = await self.db_session.execute(chunk_query) + all_chunks = chunks_result.scalars().all() + + # Assemble final doc-grouped results in the same order as doc_ids + doc_map: dict[int, dict] = { + doc_id: { + "document_id": doc_id, + "content": "", + "score": float(doc_scores.get(doc_id, 0.0)), + "chunks": [], + "document": {}, + "source": None, + } + for doc_id in doc_ids + } + + for chunk in all_chunks: + doc = chunk.document + doc_id = doc.id + if doc_id not in doc_map: + continue + doc_entry = doc_map[doc_id] + doc_entry["document"] = { + "id": doc.id, + "title": doc.title, + "document_type": doc.document_type.value + if getattr(doc, "document_type", None) + else None, + "metadata": doc.document_metadata or {}, + } + doc_entry["source"] = ( + doc.document_type.value if getattr(doc, "document_type", None) else None + ) + doc_entry["chunks"].append({"chunk_id": chunk.id, "content": chunk.content}) + + # Fill concatenated content (useful for reranking) + final_docs: list[dict] = [] + for doc_id in doc_ids: + entry = doc_map[doc_id] + entry["content"] = "\n\n".join( + c["content"] for c in entry.get("chunks", []) if c.get("content") + ) + final_docs.append(entry) + + return final_docs diff --git a/surfsense_backend/app/retriever/documents_hybrid_search.py b/surfsense_backend/app/retriever/documents_hybrid_search.py index ba3243a96..9ff104ff0 100644 --- a/surfsense_backend/app/retriever/documents_hybrid_search.py +++ b/surfsense_backend/app/retriever/documents_hybrid_search.py @@ -131,11 +131,14 @@ class DocumentHybridSearchRetriever: end_date: datetime | None = None, ) -> list: """ - Combine vector similarity and full-text search results using Reciprocal Rank Fusion. + Hybrid search that returns **documents** (not individual chunks). + + Each returned item is a document-grouped dict that preserves real DB chunk IDs so + downstream agents can cite with `[citation:]`. 
Args: query_text: The search query text - top_k: Number of results to return + top_k: Number of documents to return search_space_id: The search space ID to search within document_type: Optional document type to filter results (e.g., "FILE", "CRAWLED_URL") start_date: Optional start date for filtering documents by updated_at @@ -146,15 +149,15 @@ class DocumentHybridSearchRetriever: from sqlalchemy.orm import joinedload from app.config import config - from app.db import Document, DocumentType + from app.db import Chunk, Document, DocumentType # Get embedding for the query embedding_model = config.embedding_model_instance query_embedding = embedding_model.embed(query_text) - # Constants for RRF calculation - k = 60 # Constant for RRF calculation - n_results = top_k * 2 # Get more results for better fusion + # RRF constants + k = 60 + n_results = top_k * 2 # Fetch extra documents for better fusion # Create tsvector and tsquery for PostgreSQL full-text search tsvector = func.to_tsvector("english", Document.content) @@ -248,50 +251,56 @@ class DocumentHybridSearchRetriever: if not documents_with_scores: return [] - # Convert to serializable dictionaries - return individual chunks - serialized_results = [] - for document, score in documents_with_scores: - # Fetch associated chunks for this document - from sqlalchemy import select + # Collect document IDs for chunk fetching + doc_ids: list[int] = [doc.id for doc, _score in documents_with_scores] - from app.db import Chunk + # Fetch ALL chunks for these documents in a single query + chunks_query = ( + select(Chunk) + .options(joinedload(Chunk.document)) + .where(Chunk.document_id.in_(doc_ids)) + .order_by(Chunk.document_id, Chunk.id) + ) + chunks_result = await self.db_session.execute(chunks_query) + chunks = chunks_result.scalars().all() - chunks_query = ( - select(Chunk).where(Chunk.document_id == document.id).order_by(Chunk.id) + # Assemble doc-grouped results + doc_map: dict[int, dict] = { + doc.id: { + "document_id": doc.id, + "content": "", + "score": float(score), + "chunks": [], + "document": { + "id": doc.id, + "title": doc.title, + "document_type": doc.document_type.value + if getattr(doc, "document_type", None) + else None, + "metadata": doc.document_metadata or {}, + }, + "source": doc.document_type.value + if getattr(doc, "document_type", None) + else None, + } + for doc, score in documents_with_scores + } + + for chunk in chunks: + doc_id = chunk.document_id + if doc_id not in doc_map: + continue + doc_map[doc_id]["chunks"].append( + {"chunk_id": chunk.id, "content": chunk.content} ) - chunks_result = await self.db_session.execute(chunks_query) - chunks = chunks_result.scalars().all() - # Return individual chunks instead of concatenated content - if chunks: - for chunk in chunks: - serialized_results.append( - { - "document_id": chunk.id, - "title": document.title, - "content": chunk.content, # Use chunk content instead of document content - "document_type": document.document_type.value - if hasattr(document, "document_type") - else None, - "metadata": document.document_metadata, - "score": float(score), # Ensure score is a Python float - "search_space_id": document.search_space_id, - } - ) - else: - # If no chunks exist, return the document content as a single result - serialized_results.append( - { - "document_id": document.id, - "title": document.title, - "content": document.content, - "document_type": document.document_type.value - if hasattr(document, "document_type") - else None, - "metadata": document.document_metadata, - 
"score": float(score), # Ensure score is a Python float - "search_space_id": document.search_space_id, - } - ) + # Fill concatenated content (useful for reranking) + final_docs: list[dict] = [] + for doc_id in doc_ids: + entry = doc_map[doc_id] + entry["content"] = "\n\n".join( + c["content"] for c in entry.get("chunks", []) if c.get("content") + ) + final_docs.append(entry) - return serialized_results + return final_docs diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 7c6f5acb4..cac1b7f47 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -80,7 +80,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - crawled_urls_chunks = await self._combined_rrf_search( + crawled_urls_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="CRAWLED_URL", @@ -90,7 +90,7 @@ class ConnectorService: ) # Early return if no results - if not crawled_urls_chunks: + if not crawled_urls_docs: return { "id": 1, "name": "Crawled URLs", @@ -98,55 +98,44 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(crawled_urls_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return doc_info.get("title") or metadata.get("title") or "Untitled Document" - # Extract webcrawler-specific metadata - url = metadata.get("source", metadata.get("url", "")) - title = document.get( - "title", metadata.get("title", "Untitled Document") - ) - description = metadata.get("description", "") - language = metadata.get("language", "") - last_crawled_at = metadata.get("last_crawled_at", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return metadata.get("source") or metadata.get("url") or "" - # Build description with crawler info - content_preview = chunk.get("content", "") - if not description and content_preview: - # Use content preview if no description - description = content_preview[:200] - if len(content_preview) > 200: - description += "..." 
+ def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = metadata.get("description") or self._chunk_preview( + chunk.get("content", "") + ) + info_parts = [] + language = metadata.get("language", "") + last_crawled_at = metadata.get("last_crawled_at", "") + if language: + info_parts.append(f"Language: {language}") + if last_crawled_at: + info_parts.append(f"Last crawled: {last_crawled_at}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description - # Add crawler metadata to description if available - info_parts = [] - if language: - info_parts.append(f"Language: {language}") - if last_crawled_at: - info_parts.append(f"Last crawled: {last_crawled_at}") + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "language": metadata.get("language", ""), + "last_crawled_at": metadata.get("last_crawled_at", ""), + } - if info_parts: - if description: - description += f" | {' | '.join(info_parts)}" - else: - description = " | ".join(info_parts) - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "language": language, - "last_crawled_at": last_crawled_at, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + crawled_urls_docs, + title_fn=_title_fn, + description_fn=_description_fn, + url_fn=_url_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -156,7 +145,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, crawled_urls_chunks + return result_object, crawled_urls_docs async def search_files( self, @@ -181,7 +170,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - files_chunks = await self._combined_rrf_search( + files_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="FILE", @@ -191,7 +180,7 @@ class ConnectorService: ) # Early return if no results - if not files_chunks: + if not files_docs: return { "id": 2, "name": "Files", @@ -199,27 +188,20 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(files_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + return ( + metadata.get("og:description") + or metadata.get("ogDescription") + or self._chunk_preview(chunk.get("content", "")) + ) - # Create a source entry - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": document.get("title", "Untitled Document"), - "description": metadata.get( - "og:description", - metadata.get("ogDescription", chunk.get("content", "")), - ), - "url": metadata.get("url", ""), - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + files_docs, + description_fn=_description_fn, + url_fn=lambda _doc_info, metadata: metadata.get("url", "") or "", + ) # Create result object result_object = { @@ -229,37 +211,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, 
files_chunks - - def _transform_document_results( - self, document_results: list[dict[str, Any]] - ) -> list[dict[str, Any]]: - """ - Transform results from document_retriever.hybrid_search() to match the format - expected by the processing code. - - Args: - document_results: Results from document_retriever.hybrid_search() - - Returns: - List of transformed results in the format expected by the processing code - """ - transformed_results = [] - for doc in document_results: - transformed_results.append( - { - "chunk_id": doc.get("document_id"), - "document": { - "id": doc.get("document_id"), - "title": doc.get("title", "Untitled Document"), - "document_type": doc.get("document_type"), - "metadata": doc.get("metadata", {}), - }, - "content": doc.get("chunks_content", doc.get("content", "")), - "score": doc.get("score", 0.0), - } - ) - return transformed_results + return result_object, files_docs async def _combined_rrf_search( self, @@ -271,8 +223,11 @@ class ConnectorService: end_date: datetime | None = None, ) -> list[dict[str, Any]]: """ - Perform combined search using both chunk-level and document-level hybrid search, - then merge results using Reciprocal Rank Fusion (RRF). + Perform combined search using both chunk-based and document-based hybrid search, + then merge results using Reciprocal Rank Fusion (RRF) **at the document level**. + + Returned results are **document-grouped** objects that contain a list of chunks + with real chunk IDs (used for downstream `[citation:]`). This method: 1. Runs chunk-level hybrid search (vector + keyword on chunks) @@ -289,7 +244,7 @@ class ConnectorService: end_date: Optional end date for filtering documents by updated_at Returns: - List of combined and deduplicated chunk results + List of combined and deduplicated document results """ # RRF constant k = 60 @@ -317,69 +272,130 @@ class ConnectorService: ), ) - # Transform document results to chunk format - doc_results_transformed = self._transform_document_results(doc_results) + # Helper to extract document_id from our doc-grouped result + def _doc_id(item: dict[str, Any]) -> int | None: + doc = item.get("document", {}) + did = doc.get("id") + return int(did) if did is not None else None - # Build rank maps for RRF calculation - # chunk_id -> rank in chunk results (1-indexed) + # Build rank maps for RRF calculation (document-level) chunk_ranks: dict[int, int] = {} for rank, result in enumerate(chunk_results, start=1): - chunk_id = result.get("chunk_id") - if chunk_id is not None: - chunk_ranks[chunk_id] = rank + did = _doc_id(result) + if did is not None and did not in chunk_ranks: + chunk_ranks[did] = rank - # chunk_id -> rank in document results (1-indexed) doc_ranks: dict[int, int] = {} - for rank, result in enumerate(doc_results_transformed, start=1): - chunk_id = result.get("chunk_id") - if chunk_id is not None: - doc_ranks[chunk_id] = rank + for rank, result in enumerate(doc_results, start=1): + did = _doc_id(result) + if did is not None and did not in doc_ranks: + doc_ranks[did] = rank - # Collect all unique chunk_ids - all_chunk_ids = set(chunk_ranks.keys()) | set(doc_ranks.keys()) + all_doc_ids = set(chunk_ranks.keys()) | set(doc_ranks.keys()) - # Calculate RRF scores for each chunk + # Calculate RRF scores for each document rrf_scores: dict[int, float] = {} - for chunk_id in all_chunk_ids: - chunk_rank = chunk_ranks.get(chunk_id) - doc_rank = doc_ranks.get(chunk_id) - - rrf_score = 0.0 + for did in all_doc_ids: + chunk_rank = chunk_ranks.get(did) + doc_rank = doc_ranks.get(did) + score = 
0.0 if chunk_rank is not None: - rrf_score += 1.0 / (k + chunk_rank) + score += 1.0 / (k + chunk_rank) if doc_rank is not None: - rrf_score += 1.0 / (k + doc_rank) + score += 1.0 / (k + doc_rank) + rrf_scores[did] = score - rrf_scores[chunk_id] = rrf_score - - # Create a map of chunk_id -> result data (prefer chunk results for data) - chunk_data: dict[int, dict[str, Any]] = {} + # Prefer chunk_results data, fallback to doc_results data + doc_data: dict[int, dict[str, Any]] = {} for result in chunk_results: - chunk_id = result.get("chunk_id") - if chunk_id is not None and chunk_id not in chunk_data: - chunk_data[chunk_id] = result + did = _doc_id(result) + if did is not None and did not in doc_data: + doc_data[did] = result + for result in doc_results: + did = _doc_id(result) + if did is not None and did not in doc_data: + doc_data[did] = result - # Fill in any missing chunks from document results - for result in doc_results_transformed: - chunk_id = result.get("chunk_id") - if chunk_id is not None and chunk_id not in chunk_data: - chunk_data[chunk_id] = result - - # Sort by RRF score and take top-k - sorted_chunk_ids = sorted( - all_chunk_ids, key=lambda cid: rrf_scores[cid], reverse=True + sorted_doc_ids = sorted( + all_doc_ids, key=lambda did: rrf_scores[did], reverse=True )[:top_k] - # Build final results with updated scores - combined_results = [] - for chunk_id in sorted_chunk_ids: - if chunk_id in chunk_data: - result = chunk_data[chunk_id].copy() - result["score"] = rrf_scores[chunk_id] + combined_results: list[dict[str, Any]] = [] + for did in sorted_doc_ids: + if did in doc_data: + result = doc_data[did].copy() + result["document_id"] = did + result["score"] = rrf_scores[did] + # Preserve chunks list if present + if "chunks" in doc_data[did]: + result["chunks"] = doc_data[did]["chunks"] combined_results.append(result) return combined_results + def _get_doc_url(self, metadata: dict[str, Any]) -> str: + return ( + metadata.get("url") + or metadata.get("source") + or metadata.get("page_url") + or metadata.get("VisitedWebPageURL") + or "" + ) + + def _chunk_preview(self, text: str, limit: int = 200) -> str: + if not text: + return "" + text = str(text) + if len(text) <= limit: + return text + return text[:limit] + "..." + + def _build_chunk_sources_from_documents( + self, + documents: list[dict[str, Any]], + *, + title_fn=None, + description_fn=None, + url_fn=None, + extra_fields_fn=None, + ) -> list[dict[str, Any]]: + """ + Build a chunk-level `sources` list from document-grouped results. + + Each chunk becomes a source with `id == chunk_id` so the frontend can resolve + citations like `[citation:]`. 
+ """ + sources: list[dict[str, Any]] = [] + + for doc in documents: + doc_info = doc.get("document", {}) or {} + metadata = doc_info.get("metadata", {}) or {} + url = url_fn(doc_info, metadata) if url_fn else self._get_doc_url(metadata) + chunks = doc.get("chunks", []) or [] + display_title = ( + title_fn(doc_info, metadata) + if title_fn + else doc_info.get("title", "Untitled Document") + ) + for chunk in chunks: + chunk_id = chunk.get("chunk_id") + chunk_content = chunk.get("content", "") + description = ( + description_fn(chunk, doc_info, metadata) + if description_fn + else self._chunk_preview(chunk_content) + ) + source = { + "id": chunk_id, + "title": display_title, + "description": description, + "url": url, + } + if extra_fields_fn: + source.update(extra_fields_fn(chunk, doc_info, metadata) or {}) + sources.append(source) + return sources + async def get_connector_by_type( self, connector_type: SearchSourceConnectorType, @@ -941,7 +957,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - slack_chunks = await self._combined_rrf_search( + slack_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="SLACK_CONNECTOR", @@ -951,7 +967,7 @@ class ConnectorService: ) # Early return if no results - if not slack_chunks: + if not slack_docs: return { "id": 4, "name": "Slack", @@ -959,41 +975,28 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(slack_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + channel_name = metadata.get("channel_name", "Unknown Channel") + message_date = metadata.get("start_date", "") + title = f"Slack: {channel_name}" + if message_date: + title += f" ({message_date})" + return title - # Create a mapped source entry with Slack-specific metadata - channel_name = metadata.get("channel_name", "Unknown Channel") - channel_id = metadata.get("channel_id", "") - message_date = metadata.get("start_date", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + channel_id = metadata.get("channel_id", "") + return ( + f"https://slack.com/app_redirect?channel={channel_id}" + if channel_id + else "" + ) - # Create a more descriptive title for Slack messages - title = f"Slack: {channel_name}" - if message_date: - title += f" ({message_date})" - - # Create a more descriptive description for Slack messages - description = chunk.get("content", "") - - # For URL, we can use a placeholder or construct a URL to the Slack channel if available - url = "" - if channel_id: - url = f"https://slack.com/app_redirect?channel={channel_id}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + slack_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=lambda chunk, _doc_info, _metadata: chunk.get("content", ""), + ) # Create result object result_object = { @@ -1003,7 +1006,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, slack_chunks + return result_object, slack_docs async def search_notion( self, @@ -1028,7 +1031,7 @@ class ConnectorService: 
Returns: tuple: (sources_info, langchain_documents) """ - notion_chunks = await self._combined_rrf_search( + notion_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="NOTION_CONNECTOR", @@ -1038,7 +1041,7 @@ class ConnectorService: ) # Early return if no results - if not notion_chunks: + if not notion_docs: return { "id": 5, "name": "Notion", @@ -1046,44 +1049,24 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(notion_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + page_title = metadata.get("page_title", "Untitled Page") + indexed_at = metadata.get("indexed_at", "") + title = f"Notion: {page_title}" + if indexed_at: + title += f" (indexed: {indexed_at})" + return title - # Create a mapped source entry with Notion-specific metadata - page_title = metadata.get("page_title", "Untitled Page") - page_id = metadata.get("page_id", "") - indexed_at = metadata.get("indexed_at", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + page_id = metadata.get("page_id", "") + return f"https://notion.so/{page_id.replace('-', '')}" if page_id else "" - # Create a more descriptive title for Notion pages - title = f"Notion: {page_title}" - if indexed_at: - title += f" (indexed: {indexed_at})" - - # Create a more descriptive description for Notion pages - description = chunk.get("content", "") - if len(description) == 100: - description += "..." - - # For URL, we can use a placeholder or construct a URL to the Notion page if available - url = "" - if page_id: - # Notion page URLs follow this format - url = f"https://notion.so/{page_id.replace('-', '')}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + notion_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=lambda chunk, _doc_info, _metadata: chunk.get("content", ""), + ) # Create result object result_object = { @@ -1093,7 +1076,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, notion_chunks + return result_object, notion_docs async def search_extension( self, @@ -1118,7 +1101,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - extension_chunks = await self._combined_rrf_search( + extension_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="EXTENSION", @@ -1128,7 +1111,7 @@ class ConnectorService: ) # Early return if no results - if not extension_chunks: + if not extension_docs: return { "id": 6, "name": "Extension", @@ -1136,68 +1119,51 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _, chunk in enumerate(extension_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + webpage_title = metadata.get("VisitedWebPageTitle", "Untitled Page") + visit_date = 
metadata.get("VisitedWebPageDateWithTimeInISOString", "") + title = webpage_title + if visit_date: + try: + formatted_date = ( + visit_date.split("T")[0] if "T" in visit_date else visit_date + ) + title += f" (visited: {formatted_date})" + except Exception: + title += f" (visited: {visit_date})" + return title - # Extract extension-specific metadata - webpage_title = metadata.get("VisitedWebPageTitle", "Untitled Page") - webpage_url = metadata.get("VisitedWebPageURL", "") - visit_date = metadata.get("VisitedWebPageDateWithTimeInISOString", "") - visit_duration = metadata.get( - "VisitedWebPageVisitDurationInMilliseconds", "" - ) - _browsing_session_id = metadata.get("BrowsingSessionId", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return metadata.get("VisitedWebPageURL", "") or "" - # Create a more descriptive title for extension data - title = webpage_title - if visit_date: - # Format the date for display (simplified) - try: - # Just extract the date part for display - formatted_date = ( - visit_date.split("T")[0] - if "T" in visit_date - else visit_date - ) - title += f" (visited: {formatted_date})" - except Exception: - # Fallback if date parsing fails - title += f" (visited: {visit_date})" + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = chunk.get("content", "") + visit_duration = metadata.get( + "VisitedWebPageVisitDurationInMilliseconds", "" + ) + if visit_duration: + try: + duration_seconds = int(visit_duration) / 1000 + duration_text = ( + f"{duration_seconds:.1f} seconds" + if duration_seconds < 60 + else f"{duration_seconds / 60:.1f} minutes" + ) + description = (description + f" | Duration: {duration_text}").strip( + " |" + ) + except Exception: + pass + return description - # Create a more descriptive description for extension data - description = chunk.get("content", "") - if len(description) == 100: - description += "..." 
- - # Add visit duration if available - if visit_duration: - try: - duration_seconds = int(visit_duration) / 1000 - if duration_seconds < 60: - duration_text = f"{duration_seconds:.1f} seconds" - else: - duration_text = f"{duration_seconds / 60:.1f} minutes" - - if description: - description += f" | Duration: {duration_text}" - except Exception: - # Fallback if duration parsing fails - pass - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": webpage_url, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + extension_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + ) # Create result object result_object = { @@ -1207,7 +1173,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, extension_chunks + return result_object, extension_docs async def search_youtube( self, @@ -1232,7 +1198,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - youtube_chunks = await self._combined_rrf_search( + youtube_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="YOUTUBE_VIDEO", @@ -1242,7 +1208,7 @@ class ConnectorService: ) # Early return if no results - if not youtube_chunks: + if not youtube_docs: return { "id": 7, "name": "YouTube Videos", @@ -1250,44 +1216,35 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(youtube_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + video_title = metadata.get("video_title", "Untitled Video") + channel_name = metadata.get("channel_name", "") + return f"{video_title} - {channel_name}" if channel_name else video_title - # Extract YouTube-specific metadata - video_title = metadata.get("video_title", "Untitled Video") - video_id = metadata.get("video_id", "") - channel_name = metadata.get("channel_name", "") - # published_date = metadata.get('published_date', '') + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + video_id = metadata.get("video_id", "") + return f"https://www.youtube.com/watch?v={video_id}" if video_id else "" - # Create a more descriptive title for YouTube videos - title = video_title - if channel_name: - title += f" - {channel_name}" + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + return metadata.get("description") or chunk.get("content", "") - # Create a more descriptive description for YouTube videos - description = metadata.get("description", chunk.get("content", "")) - if len(description) == 100: - description += "..." 
+ def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "video_id": metadata.get("video_id", ""), + "channel_name": metadata.get("channel_name", ""), + } - # For URL, construct a URL to the YouTube video - url = f"https://www.youtube.com/watch?v={video_id}" if video_id else "" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "video_id": video_id, # Additional field for YouTube videos - "channel_name": channel_name, # Additional field for YouTube videos - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + youtube_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -1297,7 +1254,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, youtube_chunks + return result_object, youtube_docs async def search_github( self, @@ -1322,7 +1279,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - github_chunks = await self._combined_rrf_search( + github_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="GITHUB_CONNECTOR", @@ -1332,7 +1289,7 @@ class ConnectorService: ) # Early return if no results - if not github_chunks: + if not github_docs: return { "id": 8, "name": "GitHub", @@ -1340,28 +1297,14 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(github_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) - - # Create a source entry - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": document.get( - "title", "GitHub Document" - ), # Use specific title if available - "description": metadata.get( - "description", chunk.get("content", "") - ), # Use description or content preview - "url": metadata.get("url", ""), # Use URL if available in metadata - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + github_docs, + description_fn=lambda chunk, _doc_info, metadata: metadata.get( + "description" + ) + or chunk.get("content", ""), + url_fn=lambda _doc_info, metadata: metadata.get("url", "") or "", + ) # Create result object result_object = { @@ -1371,7 +1314,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, github_chunks + return result_object, github_docs async def search_linear( self, @@ -1396,7 +1339,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - linear_chunks = await self._combined_rrf_search( + linear_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="LINEAR_CONNECTOR", @@ -1406,7 +1349,7 @@ class ConnectorService: ) # Early return if no results - if not linear_chunks: + if not linear_docs: return { "id": 9, "name": "Linear Issues", @@ -1414,56 +1357,54 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(linear_chunks): - # Extract document metadata - 
document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + issue_identifier = metadata.get("issue_identifier", "") + issue_title = metadata.get("issue_title", "Untitled Issue") + issue_state = metadata.get("state", "") + title = ( + f"Linear: {issue_identifier} - {issue_title}" + if issue_identifier + else f"Linear: {issue_title}" + ) + if issue_state: + title += f" ({issue_state})" + return title - # Extract Linear-specific metadata - issue_identifier = metadata.get("issue_identifier", "") - issue_title = metadata.get("issue_title", "Untitled Issue") - issue_state = metadata.get("state", "") - comment_count = metadata.get("comment_count", 0) + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + issue_identifier = metadata.get("issue_identifier", "") + return ( + f"https://linear.app/issue/{issue_identifier}" + if issue_identifier + else "" + ) - # Create a more descriptive title for Linear issues - title = f"Linear: {issue_identifier} - {issue_title}" - if issue_state: - title += f" ({issue_state})" + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = chunk.get("content", "") + comment_count = metadata.get("comment_count", 0) + if comment_count: + description = (description + f" | Comments: {comment_count}").strip( + " |" + ) + return description - # Create a more descriptive description for Linear issues - description = chunk.get("content", "") - if len(description) == 100: - description += "..." + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "issue_identifier": metadata.get("issue_identifier", ""), + "state": metadata.get("state", ""), + "comment_count": metadata.get("comment_count", 0), + } - # Add comment count info to description - if comment_count: - if description: - description += f" | Comments: {comment_count}" - else: - description = f"Comments: {comment_count}" - - # For URL, we could construct a URL to the Linear issue if we have the workspace info - # For now, use a generic placeholder - url = "" - if issue_identifier: - # This is a generic format, may need to be adjusted based on actual Linear workspace - url = f"https://linear.app/issue/{issue_identifier}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "issue_identifier": issue_identifier, - "state": issue_state, - "comment_count": comment_count, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + linear_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -1473,7 +1414,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, linear_chunks + return result_object, linear_docs async def search_jira( self, @@ -1498,7 +1439,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - jira_chunks = await self._combined_rrf_search( + jira_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="JIRA_CONNECTOR", @@ -1508,7 +1449,7 @@ class ConnectorService: ) # Early return if no results - if not jira_chunks: + if not jira_docs: return { "id": 30, "name": "Jira Issues", @@ 
-1516,67 +1457,60 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(jira_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + issue_key = metadata.get("issue_key", "") + issue_title = metadata.get("issue_title", "Untitled Issue") + status = metadata.get("status", "") + title = ( + f"Jira: {issue_key} - {issue_title}" + if issue_key + else f"Jira: {issue_title}" + ) + if status: + title += f" ({status})" + return title - # Extract Jira-specific metadata - issue_key = metadata.get("issue_key", "") - issue_title = metadata.get("issue_title", "Untitled Issue") - status = metadata.get("status", "") - priority = metadata.get("priority", "") - issue_type = metadata.get("issue_type", "") - comment_count = metadata.get("comment_count", 0) + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + issue_key = metadata.get("issue_key", "") + base_url = metadata.get("base_url") + return f"{base_url}/browse/{issue_key}" if issue_key and base_url else "" - # Create a more descriptive title for Jira issues - title = f"Jira: {issue_key} - {issue_title}" - if status: - title += f" ({status})" + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = chunk.get("content", "") + info_parts = [] + priority = metadata.get("priority", "") + issue_type = metadata.get("issue_type", "") + comment_count = metadata.get("comment_count", 0) + if priority: + info_parts.append(f"Priority: {priority}") + if issue_type: + info_parts.append(f"Type: {issue_type}") + if comment_count: + info_parts.append(f"Comments: {comment_count}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description - # Create a more descriptive description for Jira issues - description = chunk.get("content", "") - if len(description) == 100: - description += "..." 
+ def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "issue_key": metadata.get("issue_key", ""), + "status": metadata.get("status", ""), + "priority": metadata.get("priority", ""), + "issue_type": metadata.get("issue_type", ""), + "comment_count": metadata.get("comment_count", 0), + } - # Add priority and type info to description - info_parts = [] - if priority: - info_parts.append(f"Priority: {priority}") - if issue_type: - info_parts.append(f"Type: {issue_type}") - if comment_count: - info_parts.append(f"Comments: {comment_count}") - - if info_parts: - if description: - description += f" | {' | '.join(info_parts)}" - else: - description = " | ".join(info_parts) - - # For URL, we could construct a URL to the Jira issue if we have the base URL - # For now, use a generic placeholder - url = "" - if issue_key and metadata.get("base_url"): - url = f"{metadata.get('base_url')}/browse/{issue_key}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "issue_key": issue_key, - "status": status, - "priority": priority, - "issue_type": issue_type, - "comment_count": comment_count, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + jira_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -1586,7 +1520,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, jira_chunks + return result_object, jira_docs async def search_google_calendar( self, @@ -1611,7 +1545,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - calendar_chunks = await self._combined_rrf_search( + calendar_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="GOOGLE_CALENDAR_CONNECTOR", @@ -1621,7 +1555,7 @@ class ConnectorService: ) # Early return if no results - if not calendar_chunks: + if not calendar_docs: return { "id": 31, "name": "Google Calendar Events", @@ -1629,79 +1563,60 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(calendar_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + event_summary = metadata.get("event_summary", "Untitled Event") + start_time = metadata.get("start_time", "") + title = f"Calendar: {event_summary}" + if start_time: + title += f" ({start_time})" + return title - # Extract Google Calendar-specific metadata - event_id = metadata.get("event_id", "") - event_summary = metadata.get("event_summary", "Untitled Event") - calendar_id = metadata.get("calendar_id", "") - start_time = metadata.get("start_time", "") - end_time = metadata.get("end_time", "") - location = metadata.get("location", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + event_id = metadata.get("event_id", "") + calendar_id = metadata.get("calendar_id", "") + return ( + f"https://calendar.google.com/calendar/event?eid={event_id}" + if event_id and calendar_id + else "" + ) - # Create a more descriptive title for calendar events - title = f"Calendar: 
{event_summary}" - if start_time: - # Format the start time for display - try: - if "T" in start_time: - from datetime import datetime + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = chunk.get("content", "") + info_parts = [] + location = metadata.get("location", "") + calendar_id = metadata.get("calendar_id", "") + end_time = metadata.get("end_time", "") + if location: + info_parts.append(f"Location: {location}") + if calendar_id and calendar_id != "primary": + info_parts.append(f"Calendar: {calendar_id}") + if end_time: + info_parts.append(f"End: {end_time}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description - start_dt = datetime.fromisoformat( - start_time.replace("Z", "+00:00") - ) - formatted_time = start_dt.strftime("%Y-%m-%d %H:%M") - title += f" ({formatted_time})" - else: - title += f" ({start_time})" - except Exception: - title += f" ({start_time})" + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "event_id": metadata.get("event_id", ""), + "event_summary": metadata.get("event_summary", "Untitled Event"), + "calendar_id": metadata.get("calendar_id", ""), + "start_time": metadata.get("start_time", ""), + "end_time": metadata.get("end_time", ""), + "location": metadata.get("location", ""), + } - # Create a more descriptive description for calendar events - description = chunk.get("content", "") - - # Add event info to description - info_parts = [] - if location: - info_parts.append(f"Location: {location}") - if calendar_id and calendar_id != "primary": - info_parts.append(f"Calendar: {calendar_id}") - if end_time: - info_parts.append(f"End: {end_time}") - - if info_parts: - if description: - description += f" | {' | '.join(info_parts)}" - else: - description = " | ".join(info_parts) - - # For URL, we could construct a URL to the Google Calendar event - url = "" - if event_id and calendar_id: - # Google Calendar event URL format - url = f"https://calendar.google.com/calendar/event?eid={event_id}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "event_id": event_id, - "event_summary": event_summary, - "calendar_id": calendar_id, - "start_time": start_time, - "end_time": end_time, - "location": location, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + calendar_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -1711,7 +1626,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, calendar_chunks + return result_object, calendar_docs async def search_airtable( self, @@ -1736,7 +1651,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - airtable_chunks = await self._combined_rrf_search( + airtable_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="AIRTABLE_CONNECTOR", @@ -1746,7 +1661,7 @@ class ConnectorService: ) # Early return if no results - if not airtable_chunks: + if not airtable_docs: return { "id": 32, "name": "Airtable Records", @@ -1754,35 +1669,31 @@ class ConnectorService: "sources": [], }, [] - # Process chunks to create sources - sources_list = [] - async 
with self.counter_lock: - for _i, chunk in enumerate(airtable_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + record_id = metadata.get("record_id", "") + return f"Airtable Record: {record_id}" if record_id else "Airtable Record" - # Extract Airtable-specific metadata - record_id = metadata.get("record_id", "") - created_time = metadata.get("created_time", "") + def _description_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + created_time = metadata.get("created_time", "") + return f"Created: {created_time}" if created_time else "" - # Create a more descriptive title for Airtable records - title = f"Airtable Record: {record_id}" + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "record_id": metadata.get("record_id", ""), + "created_time": metadata.get("created_time", ""), + } - # Create a more descriptive description for Airtable records - description = f"Created: {created_time}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": "", # TODO: Add URL to Airtable record - "record_id": record_id, - "created_time": created_time, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + airtable_docs, + title_fn=_title_fn, + url_fn=lambda _doc_info, _metadata: "", + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) result_object = { "id": 32, @@ -1791,7 +1702,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, airtable_chunks + return result_object, airtable_docs async def search_google_gmail( self, @@ -1816,7 +1727,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - gmail_chunks = await self._combined_rrf_search( + gmail_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="GOOGLE_GMAIL_CONNECTOR", @@ -1826,7 +1737,7 @@ class ConnectorService: ) # Early return if no results - if not gmail_chunks: + if not gmail_docs: return { "id": 32, "name": "Gmail Messages", @@ -1834,70 +1745,54 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(gmail_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + subject = metadata.get("subject", "No Subject") + sender = metadata.get("sender", "Unknown Sender") + return ( + f"Email: {subject} (from {sender})" if sender else f"Email: {subject}" + ) - # Extract Gmail-specific metadata - message_id = metadata.get("message_id", "") - subject = metadata.get("subject", "No Subject") - sender = metadata.get("sender", "Unknown Sender") - date_str = metadata.get("date", "") - thread_id = metadata.get("thread_id", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + message_id = metadata.get("message_id", "") + return ( + f"https://mail.google.com/mail/u/0/#inbox/{message_id}" + if message_id + else "" + ) - # Create a more descriptive title for Gmail messages - title = f"Email: {subject}" - if 
sender: - # Extract just the email address or name from sender - import re + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = chunk.get("content", "") + info_parts = [] + date_str = metadata.get("date", "") + thread_id = metadata.get("thread_id", "") + if date_str: + info_parts.append(f"Date: {date_str}") + if thread_id: + info_parts.append(f"Thread: {thread_id}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description - sender_match = re.search(r"<([^>]+)>", sender) - if sender_match: - sender_email = sender_match.group(1) - title += f" (from {sender_email})" - else: - title += f" (from {sender})" + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "message_id": metadata.get("message_id", ""), + "subject": metadata.get("subject", "No Subject"), + "sender": metadata.get("sender", "Unknown Sender"), + "date": metadata.get("date", ""), + "thread_id": metadata.get("thread_id", ""), + } - # Create a more descriptive description for Gmail messages - description = chunk.get("content", "") - - # Add message info to description - info_parts = [] - if date_str: - info_parts.append(f"Date: {date_str}") - if thread_id: - info_parts.append(f"Thread: {thread_id}") - - if info_parts: - if description: - description += f" | {' | '.join(info_parts)}" - else: - description = " | ".join(info_parts) - - # For URL, we could construct a URL to the Gmail message - url = "" - if message_id: - # Gmail message URL format - url = f"https://mail.google.com/mail/u/0/#inbox/{message_id}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "message_id": message_id, - "subject": subject, - "sender": sender, - "date": date_str, - "thread_id": thread_id, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + gmail_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -1907,7 +1802,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, gmail_chunks + return result_object, gmail_docs async def search_confluence( self, @@ -1932,7 +1827,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - confluence_chunks = await self._combined_rrf_search( + confluence_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="CONFLUENCE_CONNECTOR", @@ -1942,7 +1837,7 @@ class ConnectorService: ) # Early return if no results - if not confluence_chunks: + if not confluence_docs: return { "id": 40, "name": "Confluence", @@ -1950,41 +1845,25 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(confluence_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + page_title = metadata.get("page_title", "Untitled Page") + space_key = metadata.get("space_key", "") + title = f"Confluence: {page_title}" + if space_key: + title += f" ({space_key})" + return title - # Extract 
Confluence-specific metadata - page_title = metadata.get("page_title", "Untitled Page") - page_id = metadata.get("page_id", "") - space_key = metadata.get("space_key", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + page_id = metadata.get("page_id", "") + base_url = metadata.get("base_url", "") + return f"{base_url}/pages/{page_id}" if base_url and page_id else "" - # Create a more descriptive title for Confluence pages - title = f"Confluence: {page_title}" - if space_key: - title += f" ({space_key})" - - # Create a more descriptive description for Confluence pages - description = chunk.get("content", "") - - # For URL, we can use a placeholder or construct a URL to the Confluence page if available - url = "" # TODO: Add base_url to metadata - if page_id: - url = f"{metadata.get('base_url')}/pages/{page_id}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + confluence_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=lambda chunk, _doc_info, _metadata: chunk.get("content", ""), + ) # Create result object result_object = { @@ -1994,7 +1873,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, confluence_chunks + return result_object, confluence_docs async def search_clickup( self, @@ -2019,7 +1898,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - clickup_chunks = await self._combined_rrf_search( + clickup_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="CLICKUP_CONNECTOR", @@ -2029,7 +1908,7 @@ class ConnectorService: ) # Early return if no results - if not clickup_chunks: + if not clickup_docs: return { "id": 31, "name": "ClickUp Tasks", @@ -2037,62 +1916,48 @@ class ConnectorService: "sources": [], }, [] - sources_list = [] + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return metadata.get("task_name", "ClickUp Task") - for chunk in clickup_chunks: - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return metadata.get("task_url", "") or "" - # Extract ClickUp task information from metadata - task_name = metadata.get("task_name", "Unknown Task") - task_id = metadata.get("task_id", "") - task_url = metadata.get("task_url", "") - task_status = metadata.get("task_status", "Unknown") - task_priority = metadata.get("task_priority", "Unknown") - task_assignees = metadata.get("task_assignees", []) - task_due_date = metadata.get("task_due_date", "") - task_list_name = metadata.get("task_list_name", "") - task_space_name = metadata.get("task_space_name", "") + def _description_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + parts = [] + if metadata.get("task_status"): + parts.append(f"Status: {metadata.get('task_status')}") + if metadata.get("task_priority"): + parts.append(f"Priority: {metadata.get('task_priority')}") + if metadata.get("task_due_date"): + parts.append(f"Due: {metadata.get('task_due_date')}") + if metadata.get("task_list_name"): + parts.append(f"List: {metadata.get('task_list_name')}") + if metadata.get("task_space_name"): + parts.append(f"Space: {metadata.get('task_space_name')}") + return " | 
".join(parts) if parts else "ClickUp Task" - # Create description from task details - description_parts = [] - if task_status: - description_parts.append(f"Status: {task_status}") - if task_priority: - description_parts.append(f"Priority: {task_priority}") - if task_assignees: - assignee_names = [ - assignee.get("username", "Unknown") for assignee in task_assignees - ] - description_parts.append(f"Assignees: {', '.join(assignee_names)}") - if task_due_date: - description_parts.append(f"Due: {task_due_date}") - if task_list_name: - description_parts.append(f"List: {task_list_name}") - if task_space_name: - description_parts.append(f"Space: {task_space_name}") - - description = ( - " | ".join(description_parts) if description_parts else "ClickUp Task" - ) - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": task_name, - "description": description, - "url": task_url, - "task_id": task_id, - "status": task_status, - "priority": task_priority, - "assignees": task_assignees, - "due_date": task_due_date, - "list_name": task_list_name, - "space_name": task_space_name, + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "task_id": metadata.get("task_id", ""), + "status": metadata.get("task_status", ""), + "priority": metadata.get("task_priority", ""), + "assignees": metadata.get("task_assignees", []), + "due_date": metadata.get("task_due_date", ""), + "list_name": metadata.get("task_list_name", ""), + "space_name": metadata.get("task_space_name", ""), } - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + clickup_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -2102,7 +1967,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, clickup_chunks + return result_object, clickup_docs async def search_linkup( self, @@ -2248,7 +2113,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - discord_chunks = await self._combined_rrf_search( + discord_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="DISCORD_CONNECTOR", @@ -2258,7 +2123,7 @@ class ConnectorService: ) # Early return if no results - if not discord_chunks: + if not discord_docs: return { "id": 11, "name": "Discord", @@ -2266,44 +2131,29 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _, chunk in enumerate(discord_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + channel_name = metadata.get("channel_name", "Unknown Channel") + message_date = metadata.get("start_date", "") + title = f"Discord: {channel_name}" + if message_date: + title += f" ({message_date})" + return title - # Create a mapped source entry with Discord-specific metadata - channel_name = metadata.get("channel_name", "Unknown Channel") - channel_id = metadata.get("channel_id", "") - message_date = metadata.get("start_date", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + channel_id = metadata.get("channel_id", "") + guild_id = metadata.get("guild_id", "") + if guild_id 
and channel_id: + return f"https://discord.com/channels/{guild_id}/{channel_id}" + if channel_id: + return f"https://discord.com/channels/@me/{channel_id}" + return "" - # Create a more descriptive title for Discord messages - title = f"Discord: {channel_name}" - if message_date: - title += f" ({message_date})" - - # Create a more descriptive description for Discord messages - description = chunk.get("content", "") - - url = "" - guild_id = metadata.get("guild_id", "") - if guild_id and channel_id: - url = f"https://discord.com/channels/{guild_id}/{channel_id}" - elif channel_id: - # Fallback for DM channels or when guild_id is not available - url = f"https://discord.com/channels/@me/{channel_id}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + discord_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=lambda chunk, _doc_info, _metadata: chunk.get("content", ""), + ) # Create result object result_object = { @@ -2313,7 +2163,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, discord_chunks + return result_object, discord_docs async def search_luma( self, @@ -2338,7 +2188,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - luma_chunks = await self._combined_rrf_search( + luma_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="LUMA_CONNECTOR", @@ -2348,7 +2198,7 @@ class ConnectorService: ) # Early return if no results - if not luma_chunks: + if not luma_docs: return { "id": 33, "name": "Luma Events", @@ -2356,104 +2206,63 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(luma_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + event_name = metadata.get("event_name", "Untitled Event") + start_time = metadata.get("start_time", "") + return ( + f"Luma: {event_name} ({start_time})" + if start_time + else f"Luma: {event_name}" + ) - # Extract Luma-specific metadata - event_id = metadata.get("event_id", "") - event_name = metadata.get("event_name", "Untitled Event") - event_url = metadata.get("event_url", "") - start_time = metadata.get("start_time", "") - end_time = metadata.get("end_time", "") - location_name = metadata.get("location_name", "") - location_address = metadata.get("location_address", "") - meeting_url = metadata.get("meeting_url", "") - timezone = metadata.get("timezone", "") - visibility = metadata.get("visibility", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + return metadata.get("event_url", "") or "" - # Create a more descriptive title for Luma events - title = f"Luma: {event_name}" - if start_time: - # Format the start time for display - try: - if "T" in start_time: - from datetime import datetime + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = chunk.get("content", "") + info_parts = [] + if metadata.get("location_name"): + info_parts.append(f"Venue: {metadata.get('location_name')}") + elif metadata.get("location_address"): + 
info_parts.append(f"Location: {metadata.get('location_address')}") + if metadata.get("meeting_url"): + info_parts.append("Online Event") + if metadata.get("end_time"): + info_parts.append(f"Ends: {metadata.get('end_time')}") + if metadata.get("timezone"): + info_parts.append(f"TZ: {metadata.get('timezone')}") + if metadata.get("visibility"): + info_parts.append( + f"Visibility: {str(metadata.get('visibility')).title()}" + ) + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description - start_dt = datetime.fromisoformat( - start_time.replace("Z", "+00:00") - ) - formatted_time = start_dt.strftime("%Y-%m-%d %H:%M") - title += f" ({formatted_time})" - else: - title += f" ({start_time})" - except Exception: - title += f" ({start_time})" + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "event_id": metadata.get("event_id", ""), + "event_name": metadata.get("event_name", "Untitled Event"), + "start_time": metadata.get("start_time", ""), + "end_time": metadata.get("end_time", ""), + "location_name": metadata.get("location_name", ""), + "location_address": metadata.get("location_address", ""), + "meeting_url": metadata.get("meeting_url", ""), + "timezone": metadata.get("timezone", ""), + "visibility": metadata.get("visibility", ""), + } - description = chunk.get("content", "") - - # Add event info to description - info_parts = [] - if location_name: - info_parts.append(f"Venue: {location_name}") - elif location_address: - info_parts.append(f"Location: {location_address}") - - if meeting_url: - info_parts.append("Online Event") - - if end_time: - try: - if "T" in end_time: - from datetime import datetime - - end_dt = datetime.fromisoformat( - end_time.replace("Z", "+00:00") - ) - formatted_end = end_dt.strftime("%Y-%m-%d %H:%M") - info_parts.append(f"Ends: {formatted_end}") - else: - info_parts.append(f"Ends: {end_time}") - except Exception: - info_parts.append(f"Ends: {end_time}") - - if timezone: - info_parts.append(f"TZ: {timezone}") - - if visibility: - info_parts.append(f"Visibility: {visibility.title()}") - - if info_parts: - if description: - description += f" | {' | '.join(info_parts)}" - else: - description = " | ".join(info_parts) - - # Use the Luma event URL if available - url = event_url if event_url else "" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "event_id": event_id, - "event_name": event_name, - "start_time": start_time, - "end_time": end_time, - "location_name": location_name, - "location_address": location_address, - "meeting_url": meeting_url, - "timezone": timezone, - "visibility": visibility, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + luma_docs, + title_fn=_title_fn, + url_fn=_url_fn, + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -2463,7 +2272,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, luma_chunks + return result_object, luma_docs async def search_elasticsearch( self, @@ -2488,7 +2297,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - elasticsearch_chunks = await self._combined_rrf_search( + elasticsearch_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, 
document_type="ELASTICSEARCH_CONNECTOR", @@ -2498,7 +2307,7 @@ class ConnectorService: ) # Early return if no results - if not elasticsearch_chunks: + if not elasticsearch_docs: return { "id": 34, "name": "Elasticsearch", @@ -2506,58 +2315,40 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(elasticsearch_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + title = doc_info.get("title", "Elasticsearch Document") + es_index = metadata.get("elasticsearch_index", "") + return f"{title} (Index: {es_index})" if es_index else title - # Extract Elasticsearch-specific metadata - es_id = metadata.get("elasticsearch_id", "") - es_index = metadata.get("elasticsearch_index", "") - es_score = metadata.get("elasticsearch_score", "") + def _description_fn( + chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> str: + description = self._chunk_preview(chunk.get("content", ""), limit=150) + info_parts = [] + if metadata.get("elasticsearch_id"): + info_parts.append(f"ID: {metadata.get('elasticsearch_id')}") + if metadata.get("elasticsearch_score"): + info_parts.append(f"Score: {metadata.get('elasticsearch_score')}") + if info_parts: + description = (description + " | " + " | ".join(info_parts)).strip(" |") + return description - # Create a more descriptive title for Elasticsearch documents - title = document.get("title", "Elasticsearch Document") - if es_index: - title = f"{title} (Index: {es_index})" + def _extra_fields_fn( + _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] + ) -> dict[str, Any]: + return { + "elasticsearch_id": metadata.get("elasticsearch_id", ""), + "elasticsearch_index": metadata.get("elasticsearch_index", ""), + "elasticsearch_score": metadata.get("elasticsearch_score", ""), + } - # Create a more descriptive description for Elasticsearch documents - description = chunk.get("content", "")[:150] - if len(description) == 150: - description += "..." 
- - # Add Elasticsearch info to description - info_parts = [] - if es_id: - info_parts.append(f"ID: {es_id}") - if es_score: - info_parts.append(f"Score: {es_score}") - - if info_parts: - if description: - description = f"{description} | {' | '.join(info_parts)}" - else: - description = " | ".join(info_parts) - - # For URL, we could construct a URL to view the document if we have the Elasticsearch UI URL - url = "" - # Could be extended to include Kibana or other UI URLs if configured - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - "elasticsearch_id": es_id, - "elasticsearch_index": es_index, - "elasticsearch_score": es_score, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + elasticsearch_docs, + title_fn=_title_fn, + url_fn=lambda _doc_info, _metadata: "", + description_fn=_description_fn, + extra_fields_fn=_extra_fields_fn, + ) # Create result object result_object = { @@ -2567,7 +2358,7 @@ class ConnectorService: "sources": sources_list, } - return result_object, elasticsearch_chunks + return result_object, elasticsearch_docs async def search_bookstack( self, @@ -2594,7 +2385,7 @@ class ConnectorService: Returns: tuple: (sources_info, langchain_documents) """ - bookstack_chunks = await self._combined_rrf_search( + bookstack_docs = await self._combined_rrf_search( query_text=user_query, search_space_id=search_space_id, document_type="BOOKSTACK_CONNECTOR", @@ -2604,7 +2395,7 @@ class ConnectorService: ) # Early return if no results - if not bookstack_chunks: + if not bookstack_docs: return { "id": 50, "name": "BookStack", @@ -2612,41 +2403,27 @@ class ConnectorService: "sources": [], }, [] - # Process each chunk and create sources directly without deduplication - sources_list = [] - async with self.counter_lock: - for _i, chunk in enumerate(bookstack_chunks): - # Extract document metadata - document = chunk.get("document", {}) - metadata = document.get("metadata", {}) + def _title_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + page_name = metadata.get("page_name", "Untitled Page") + return f"BookStack: {page_name}" - # Extract BookStack-specific metadata - page_name = metadata.get("page_name", "Untitled Page") - page_slug = metadata.get("page_slug", "") - book_slug = metadata.get("book_slug", "") - base_url = metadata.get("base_url", "") - page_url = metadata.get("page_url", "") + def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: + page_slug = metadata.get("page_slug", "") + book_slug = metadata.get("book_slug", "") + base_url = metadata.get("base_url", "") + page_url = metadata.get("page_url", "") + if page_url: + return page_url + if base_url and book_slug and page_slug: + return f"{base_url}/books/{book_slug}/page/{page_slug}" + return "" - # Create a more descriptive title for BookStack pages - title = f"BookStack: {page_name}" - - # Create description from content - description = chunk.get("content", "") - - # Build URL to the BookStack page - url = page_url - if not url and base_url and book_slug and page_slug: - url = f"{base_url}/books/{book_slug}/page/{page_slug}" - - source = { - "id": chunk.get("chunk_id", self.source_id_counter), - "title": title, - "description": description, - "url": url, - } - - self.source_id_counter += 1 - sources_list.append(source) + sources_list = self._build_chunk_sources_from_documents( + bookstack_docs, + title_fn=_title_fn, + url_fn=_url_fn, + 
description_fn=lambda chunk, _doc_info, _metadata: chunk.get("content", ""), + ) # Create result object result_object = { @@ -2656,4 +2433,4 @@ class ConnectorService: "sources": sources_list, } - return result_object, bookstack_chunks + return result_object, bookstack_docs diff --git a/surfsense_backend/app/services/reranker_service.py b/surfsense_backend/app/services/reranker_service.py index bea74e3cb..ea541ee38 100644 --- a/surfsense_backend/app/services/reranker_service.py +++ b/surfsense_backend/app/services/reranker_service.py @@ -22,14 +22,18 @@ class RerankerService: self, query_text: str, documents: list[dict[str, Any]] ) -> list[dict[str, Any]]: """ - Rerank documents using the configured reranker + Rerank documents using the configured reranker. + + Documents can be either: + - Document-grouped (new format): Has `document_id`, `chunks` list, and `content` (concatenated) + - Chunk-based (legacy format): Individual chunks with `chunk_id` and `content` Args: query_text: The query text to use for reranking documents: List of document dictionaries to rerank Returns: - List[Dict[str, Any]]: Reranked documents + List[Dict[str, Any]]: Reranked documents with preserved structure """ if not self.reranker_instance or not documents: return documents @@ -38,7 +42,9 @@ class RerankerService: # Create Document objects for the rerankers library reranker_docs = [] for i, doc in enumerate(documents): - chunk_id = doc.get("chunk_id", f"chunk_{i}") + # Use document_id for matching + doc_id = doc.get("document_id") or f"doc_{i}" + # Use concatenated content for reranking content = doc.get("content", "") score = doc.get("score", 0.0) document_info = doc.get("document", {}) @@ -46,12 +52,14 @@ class RerankerService: reranker_docs.append( RerankerDocument( text=content, - doc_id=chunk_id, + doc_id=doc_id, metadata={ "document_id": document_info.get("id", ""), "document_title": document_info.get("title", ""), "document_type": document_info.get("document_type", ""), "rrf_score": score, + # Track original index for fallback matching + "original_index": i, }, ) ) @@ -62,21 +70,33 @@ class RerankerService: ) # Process the results from the reranker - # Convert to serializable dictionaries + # Convert to serializable dictionaries while preserving full structure serialized_results = [] for result in reranking_results.results: - # Find the original document by id - original_doc = next( - ( - doc - for doc in documents - if doc.get("chunk_id") == result.document.doc_id - ), - None, - ) + result_doc_id = result.document.doc_id + original_index = result.document.metadata.get("original_index") + + # Find the original document by document_id + original_doc = None + for doc in documents: + if doc.get("document_id") == result_doc_id: + original_doc = doc + break + + # Fallback to original index if ID matching fails + if ( + original_doc is None + and original_index is not None + and 0 <= original_index < len(documents) + ): + original_doc = documents[original_index] + if original_doc: - # Create a new document with the reranked score + # Create a deep copy to preserve the full structure including chunks reranked_doc = original_doc.copy() + # Preserve chunks list if present (important for citation formatting) + if "chunks" in original_doc: + reranked_doc["chunks"] = original_doc["chunks"] reranked_doc["score"] = float(result.score) reranked_doc["rank"] = result.rank serialized_results.append(reranked_doc)
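
The hunks above route every connector's per-chunk source construction through `_build_chunk_sources_from_documents`, whose definition does not appear in this part of the diff. A minimal sketch of what such a helper could look like, inferred only from the call sites (document-grouped results plus optional `title_fn(doc_info, metadata)`, `url_fn(doc_info, metadata)`, `description_fn(chunk, doc_info, metadata)`, and `extra_fields_fn(chunk, doc_info, metadata)` callbacks), is shown below. It is written as a free function for readability; its defaults and fallbacks are assumptions, not the actual implementation.

```python
from collections.abc import Callable
from typing import Any


def build_chunk_sources_from_documents(
    docs: list[dict[str, Any]],
    title_fn: Callable[[dict[str, Any], dict[str, Any]], str] | None = None,
    url_fn: Callable[[dict[str, Any], dict[str, Any]], str] | None = None,
    description_fn: Callable[[dict[str, Any], dict[str, Any], dict[str, Any]], str]
    | None = None,
    extra_fields_fn: Callable[
        [dict[str, Any], dict[str, Any], dict[str, Any]], dict[str, Any]
    ]
    | None = None,
) -> list[dict[str, Any]]:
    """Build one source entry per chunk of each document-grouped result."""
    sources_list: list[dict[str, Any]] = []
    for doc in docs:
        doc_info = doc.get("document", {}) or {}
        metadata = doc_info.get("metadata", {}) or {}
        for chunk in doc.get("chunks", []) or []:
            source = {
                # Chunk ids come from the database, so no shared counter is needed.
                "id": chunk.get("chunk_id"),
                "title": title_fn(doc_info, metadata)
                if title_fn
                else doc_info.get("title", "Untitled Document"),
                "description": description_fn(chunk, doc_info, metadata)
                if description_fn
                else (chunk.get("content") or "").strip(),
                "url": url_fn(doc_info, metadata) if url_fn else "",
            }
            if extra_fields_fn:
                # Connector-specific fields (video_id, issue_key, event_id, ...).
                source.update(extra_fields_fn(chunk, doc_info, metadata))
            sources_list.append(source)
    return sources_list
```

Under this reading, each source id is the chunk's own id, which is consistent with the per-connector `source_id_counter` / `counter_lock` bookkeeping being dropped throughout the hunks above.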
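
The `reranker_service.py` change keeps the document-grouped structure intact while rewriting `score` and adding `rank`. A small, hypothetical usage example follows: the ids, title, query, and the pre-configured `reranker_service` instance are invented for illustration, and because `original_doc.copy()` is a shallow copy, the `chunks` lists in the returned dicts are the same list objects as in the input.

```python
from typing import Any

# Hypothetical document-grouped results in the "new format" described in the
# rerank_documents docstring: document_id, concatenated content, and chunks.
docs: list[dict[str, Any]] = [
    {
        "document_id": 42,
        "content": "Chunk one text.\n\nChunk two text.",
        "score": 0.5,
        "chunks": [
            {"chunk_id": 101, "content": "Chunk one text."},
            {"chunk_id": 102, "content": "Chunk two text."},
        ],
        "document": {
            "id": 42,
            "title": "Example Doc",
            "document_type": "FILE",
            "metadata": {},
        },
        "source": "FILE",
    },
]

# `reranker_service` is assumed to be an already-configured RerankerService
# instance; its construction is not shown in this diff.
reranked = reranker_service.rerank_documents("example query", docs)

# Each result keeps its original keys (including "chunks") and gains an updated
# "score" plus a "rank"; results are matched back to inputs by document_id,
# falling back to the original index stored in the reranker metadata.
for doc in reranked:
    print(doc["document_id"], doc["rank"], doc["score"], len(doc["chunks"]))
```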