diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py
index 7e6bf9994..2ab1dc704 100644
--- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py
+++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py
@@ -614,10 +614,23 @@ async def search_knowledge_base_async(
     connectors = _normalize_connectors(connectors_to_search, available_connectors)
 
     # --- Optimization 1: skip connectors that have zero indexed documents ---
+    # Native Google types must also match their legacy Composio equivalents
+    # (old documents may still carry the Composio type until re-indexed).
+    _NATIVE_TO_LEGACY: dict[str, str] = {
+        "GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
+        "GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
+        "GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
+    }
+
     if available_document_types:
         doc_types_set = set(available_document_types)
         before_count = len(connectors)
-        connectors = [c for c in connectors if c in doc_types_set]
+        connectors = [
+            c
+            for c in connectors
+            if c in doc_types_set
+            or _NATIVE_TO_LEGACY.get(c, "") in doc_types_set
+        ]
         skipped = before_count - len(connectors)
         if skipped:
             perf.info(
@@ -793,6 +806,10 @@ async def search_knowledge_base_async(
 
         deduplicated.append(doc)
 
+    # Sort by RRF score so the most relevant documents from ANY connector
+    # appear first, preventing budget truncation from hiding top results.
+    deduplicated.sort(key=lambda d: d.get("score", 0), reverse=True)
+
     output_budget = _compute_tool_output_budget(max_input_tokens)
     result = format_documents_for_context(deduplicated, max_chars=output_budget)
 
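A minimal, self-contained sketch of the filter rule added above: the `filter_connectors` wrapper and the sample inputs are hypothetical, added only for illustration, and it assumes connector names reach this point as plain strings (as the inline mapping suggests).

```python
# Illustrative sketch only: filter_connectors and the sample data below are
# hypothetical; in the real code the mapping and comprehension live inline
# in search_knowledge_base_async.
_NATIVE_TO_LEGACY: dict[str, str] = {
    "GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
    "GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
    "GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}


def filter_connectors(
    connectors: list[str], available_document_types: list[str]
) -> list[str]:
    """Keep a connector when its native type OR its legacy Composio type is indexed."""
    doc_types_set = set(available_document_types)
    return [
        c
        for c in connectors
        if c in doc_types_set or _NATIVE_TO_LEGACY.get(c, "") in doc_types_set
    ]


# A space whose Drive documents were indexed under the old Composio type
# is no longer skipped by the zero-documents optimization:
assert filter_connectors(
    ["GOOGLE_DRIVE_FILE", "SLACK_CONNECTOR"],
    ["COMPOSIO_GOOGLE_DRIVE_CONNECTOR"],
) == ["GOOGLE_DRIVE_FILE"]
```

The second hunk is independent: sorting `deduplicated` by RRF score before `format_documents_for_context` applies its character budget means truncation drops the lowest-scoring documents rather than whichever connector's results happened to be merged last.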
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
index 2a866e411..2dc6fd18b 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py
@@ -385,6 +385,22 @@ async def index_google_calendar_events(
             session, unique_identifier_hash
         )
 
+        # Fallback: legacy Composio hash
+        if not existing_document:
+            legacy_hash = generate_unique_identifier_hash(
+                DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+                event_id,
+                search_space_id,
+            )
+            existing_document = await check_document_by_unique_identifier(
+                session, legacy_hash
+            )
+            if existing_document:
+                existing_document.unique_identifier_hash = unique_identifier_hash
+                if existing_document.document_type == DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR:
+                    existing_document.document_type = DocumentType.GOOGLE_CALENDAR_CONNECTOR
+                logger.info(f"Migrated legacy Composio Calendar document: {event_id}")
+
         if existing_document:
             # Document exists - check if content has changed
             if existing_document.content_hash == content_hash:
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
index 07e2614e3..a865bee28 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py
@@ -859,10 +859,22 @@ async def _create_pending_document_for_file(
         DocumentType.GOOGLE_DRIVE_FILE, file_id, search_space_id
     )
 
-    # Check if document exists
+    # Check if document exists (primary hash first, then legacy Composio hash)
     existing_document = await check_document_by_unique_identifier(
         session, unique_identifier_hash
     )
+    if not existing_document:
+        legacy_hash = generate_unique_identifier_hash(
+            DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
+        )
+        existing_document = await check_document_by_unique_identifier(
+            session, legacy_hash
+        )
+        if existing_document:
+            existing_document.unique_identifier_hash = unique_identifier_hash
+            if existing_document.document_type == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
+                existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE
+            logger.info(f"Migrated legacy Composio document to native type: {file_id}")
 
     if existing_document:
         # Check if this is a rename-only update (content unchanged)
@@ -958,12 +970,24 @@ async def _check_rename_only_update(
     )
     existing_document = await check_document_by_unique_identifier(session, primary_hash)
 
-    # If not found by primary hash, try searching by metadata (for legacy documents)
+    # Fallback: legacy Composio hash
+    if not existing_document:
+        legacy_hash = generate_unique_identifier_hash(
+            DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
+        )
+        existing_document = await check_document_by_unique_identifier(
+            session, legacy_hash
+        )
+
+    # Fallback: metadata search (covers old filename-based hashes)
     if not existing_document:
         result = await session.execute(
             select(Document).where(
                 Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
+                Document.document_type.in_([
+                    DocumentType.GOOGLE_DRIVE_FILE,
+                    DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+                ]),
                 cast(Document.document_metadata["google_drive_file_id"], String)
                 == file_id,
             )
@@ -972,6 +996,14 @@
         if existing_document:
             logger.debug(f"Found legacy document by metadata for file_id: {file_id}")
 
+    # Migrate legacy Composio document to native type
+    if existing_document:
+        if existing_document.unique_identifier_hash != primary_hash:
+            existing_document.unique_identifier_hash = primary_hash
+        if existing_document.document_type == DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
+            existing_document.document_type = DocumentType.GOOGLE_DRIVE_FILE
+            logger.info(f"Migrated legacy Composio Drive document: {file_id}")
+
     if not existing_document:
         # New file, needs full processing
         return False, None
@@ -1186,12 +1218,24 @@ async def _remove_document(session: AsyncSession, file_id: str, search_space_id:
         session, unique_identifier_hash
     )
 
-    # If not found, search by metadata (for legacy documents with filename-based hash)
+    # Fallback: legacy Composio hash
+    if not existing_document:
+        legacy_hash = generate_unique_identifier_hash(
+            DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, file_id, search_space_id
+        )
+        existing_document = await check_document_by_unique_identifier(
+            session, legacy_hash
+        )
+
+    # Fallback: metadata search (covers old filename-based hashes, both native and Composio)
    if not existing_document:
         result = await session.execute(
             select(Document).where(
                 Document.search_space_id == search_space_id,
-                Document.document_type == DocumentType.GOOGLE_DRIVE_FILE,
+                Document.document_type.in_([
+                    DocumentType.GOOGLE_DRIVE_FILE,
+                    DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+                ]),
                 cast(Document.document_metadata["google_drive_file_id"], String)
                 == file_id,
             )
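The dual lookup above is needed because the unique identifier hash is derived from the document type (the diff passes a `DocumentType` as the first argument to `generate_unique_identifier_hash`), so the same Drive file hashes differently under the legacy Composio type. A sketch under that assumption; the real hash recipe may differ, and `identifier_hash` below is illustrative, but the conclusion holds for any recipe that includes the type.

```python
# Illustrative only: assumes the hash is derived from
# (document_type, external_id, search_space_id).
import hashlib


def identifier_hash(document_type: str, external_id: str, search_space_id: int) -> str:
    payload = f"{document_type}:{external_id}:{search_space_id}"
    return hashlib.sha256(payload.encode()).hexdigest()


native = identifier_hash("GOOGLE_DRIVE_FILE", "file-abc123", 1)
legacy = identifier_hash("COMPOSIO_GOOGLE_DRIVE_CONNECTOR", "file-abc123", 1)

# Same file, same search space, different hash: a native-only lookup would
# treat the already-indexed legacy row as a brand-new document and re-index it.
assert native != legacy
```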
diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
index 88b25adaa..edd03ae02 100644
--- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py
@@ -312,6 +312,22 @@ async def index_google_gmail_messages(
             session, unique_identifier_hash
         )
 
+        # Fallback: legacy Composio hash
+        if not existing_document:
+            legacy_hash = generate_unique_identifier_hash(
+                DocumentType.COMPOSIO_GMAIL_CONNECTOR,
+                message_id,
+                search_space_id,
+            )
+            existing_document = await check_document_by_unique_identifier(
+                session, legacy_hash
+            )
+            if existing_document:
+                existing_document.unique_identifier_hash = unique_identifier_hash
+                if existing_document.document_type == DocumentType.COMPOSIO_GMAIL_CONNECTOR:
+                    existing_document.document_type = DocumentType.GOOGLE_GMAIL_CONNECTOR
+                logger.info(f"Migrated legacy Composio Gmail document: {message_id}")
+
         if existing_document:
             # Document exists - check if content has changed
             if existing_document.content_hash == content_hash:
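The same three-step pattern (native hash, then legacy Composio hash, then in-place migration) now repeats across the Calendar, Drive, and Gmail indexers. A hypothetical consolidation, not part of this diff, sketched with the helpers and types these modules already import:

```python
# Hypothetical helper (not in this diff); assumes the existing signatures of
# generate_unique_identifier_hash and check_document_by_unique_identifier,
# plus the AsyncSession / DocumentType / Document / logger names the indexer
# modules already use. Logging kept minimal.
async def find_document_with_legacy_fallback(
    session: AsyncSession,
    native_type: DocumentType,
    legacy_type: DocumentType,
    external_id: str,
    search_space_id: int,
) -> Document | None:
    """Look up a document by its native hash, then by its legacy Composio hash.

    On a legacy hit, rewrite the hash and type in place so the next sync
    takes the fast path.
    """
    native_hash = generate_unique_identifier_hash(
        native_type, external_id, search_space_id
    )
    document = await check_document_by_unique_identifier(session, native_hash)
    if document:
        return document

    legacy_hash = generate_unique_identifier_hash(
        legacy_type, external_id, search_space_id
    )
    document = await check_document_by_unique_identifier(session, legacy_hash)
    if document:
        document.unique_identifier_hash = native_hash
        if document.document_type == legacy_type:
            document.document_type = native_type
        logger.info("Migrated legacy Composio document: %s", external_id)
    return document
```

As in the diff itself, the rewritten hash and type are only persisted when the surrounding indexer flow commits the session, which it already must do for the other in-place updates these indexers perform on `existing_document`.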