diff --git a/surfsense_backend/app/agents/new_chat/chat_deepagent.py b/surfsense_backend/app/agents/new_chat/chat_deepagent.py
index 713545b0b..53da9b607 100644
--- a/surfsense_backend/app/agents/new_chat/chat_deepagent.py
+++ b/surfsense_backend/app/agents/new_chat/chat_deepagent.py
@@ -65,10 +65,10 @@ _CONNECTOR_TYPE_TO_SEARCHABLE: dict[str, str] = {
     "BOOKSTACK_CONNECTOR": "BOOKSTACK_CONNECTOR",
     "CIRCLEBACK_CONNECTOR": "CIRCLEBACK",  # Connector type differs from document type
     "OBSIDIAN_CONNECTOR": "OBSIDIAN_CONNECTOR",
-    # Composio connectors
-    "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
-    "COMPOSIO_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
-    "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
+    # Composio connectors (unified to native document types)
+    "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "GOOGLE_DRIVE_FILE",
+    "COMPOSIO_GMAIL_CONNECTOR": "GOOGLE_GMAIL_CONNECTOR",
+    "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "GOOGLE_CALENDAR_CONNECTOR",
 }

 # Document types that don't come from SearchSourceConnector but should always be searchable
diff --git a/surfsense_backend/app/agents/new_chat/tools/google_drive/create_file.py b/surfsense_backend/app/agents/new_chat/tools/google_drive/create_file.py
index 9371b1827..ad7c1b9b1 100644
--- a/surfsense_backend/app/agents/new_chat/tools/google_drive/create_file.py
+++ b/surfsense_backend/app/agents/new_chat/tools/google_drive/create_file.py
@@ -157,14 +157,18 @@ def create_create_google_drive_file_tool(

         from app.db import SearchSourceConnector, SearchSourceConnectorType

+        _DRIVE_TYPES = [
+            SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
+            SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+        ]
+
         if final_connector_id is not None:
             result = await db_session.execute(
                 select(SearchSourceConnector).filter(
                     SearchSourceConnector.id == final_connector_id,
                     SearchSourceConnector.search_space_id == search_space_id,
                     SearchSourceConnector.user_id == user_id,
-                    SearchSourceConnector.connector_type
-                    == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
+                    SearchSourceConnector.connector_type.in_(_DRIVE_TYPES),
                 )
             )
             connector = result.scalars().first()
@@ -179,8 +183,7 @@ def create_create_google_drive_file_tool(
                 select(SearchSourceConnector).filter(
                     SearchSourceConnector.search_space_id == search_space_id,
                     SearchSourceConnector.user_id == user_id,
-                    SearchSourceConnector.connector_type
-                    == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
+                    SearchSourceConnector.connector_type.in_(_DRIVE_TYPES),
                 )
             )
             connector = result.scalars().first()
@@ -194,8 +197,19 @@ def create_create_google_drive_file_tool(
         logger.info(
             f"Creating Google Drive file: name='{final_name}', type='{final_file_type}', connector={actual_connector_id}"
         )
+
+        pre_built_creds = None
+        if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
+            from app.utils.google_credentials import build_composio_credentials
+
+            cca_id = connector.config.get("composio_connected_account_id")
+            if cca_id:
+                pre_built_creds = build_composio_credentials(cca_id)
+
         client = GoogleDriveClient(
-            session=db_session, connector_id=actual_connector_id
+            session=db_session,
+            connector_id=actual_connector_id,
+            credentials=pre_built_creds,
         )
         try:
             created = await client.create_file(
diff --git a/surfsense_backend/app/agents/new_chat/tools/google_drive/trash_file.py b/surfsense_backend/app/agents/new_chat/tools/google_drive/trash_file.py
index 917ba3376..4d1c3c8de 100644
--- a/surfsense_backend/app/agents/new_chat/tools/google_drive/trash_file.py
+++ b/surfsense_backend/app/agents/new_chat/tools/google_drive/trash_file.py
@@ -151,13 +151,17 @@ def create_delete_google_drive_file_tool(

         from app.db import SearchSourceConnector, SearchSourceConnectorType

+        _DRIVE_TYPES = [
+            SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
+            SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+        ]
+
         result = await db_session.execute(
             select(SearchSourceConnector).filter(
                 SearchSourceConnector.id == final_connector_id,
                 SearchSourceConnector.search_space_id == search_space_id,
                 SearchSourceConnector.user_id == user_id,
-                SearchSourceConnector.connector_type
-                == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR,
+                SearchSourceConnector.connector_type.in_(_DRIVE_TYPES),
             )
         )
         connector = result.scalars().first()
@@ -170,7 +174,20 @@ def create_delete_google_drive_file_tool(
         logger.info(
             f"Deleting Google Drive file: file_id='{final_file_id}', connector={final_connector_id}"
         )
-        client = GoogleDriveClient(session=db_session, connector_id=connector.id)
+
+        pre_built_creds = None
+        if connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR:
+            from app.utils.google_credentials import build_composio_credentials
+
+            cca_id = connector.config.get("composio_connected_account_id")
+            if cca_id:
+                pre_built_creds = build_composio_credentials(cca_id)
+
+        client = GoogleDriveClient(
+            session=db_session,
+            connector_id=connector.id,
+            credentials=pre_built_creds,
+        )
         try:
             await client.trash_file(file_id=final_file_id)
         except HttpError as http_err:
diff --git a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py
index a683b1c17..7e6bf9994 100644
--- a/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py
+++ b/surfsense_backend/app/agents/new_chat/tools/knowledge_base.py
@@ -60,7 +60,7 @@ def _is_degenerate_query(query: str) -> bool:

 async def _browse_recent_documents(
     search_space_id: int,
-    document_type: str | None,
+    document_type: str | list[str] | None,
     top_k: int,
     start_date: datetime | None,
     end_date: datetime | None,
@@ -83,14 +83,22 @@ async def _browse_recent_documents(
     base_conditions = [Document.search_space_id == search_space_id]

     if document_type is not None:
-        if isinstance(document_type, str):
-            try:
-                doc_type_enum = DocumentType[document_type]
-                base_conditions.append(Document.document_type == doc_type_enum)
-            except KeyError:
-                return []
+        type_list = document_type if isinstance(document_type, list) else [document_type]
+        doc_type_enums = []
+        for dt in type_list:
+            if isinstance(dt, str):
+                try:
+                    doc_type_enums.append(DocumentType[dt])
+                except KeyError:
+                    pass
+            else:
+                doc_type_enums.append(dt)
+        if not doc_type_enums:
+            return []
+        if len(doc_type_enums) == 1:
+            base_conditions.append(Document.document_type == doc_type_enums[0])
         else:
-            base_conditions.append(Document.document_type == document_type)
+            base_conditions.append(Document.document_type.in_(doc_type_enums))

     if start_date is not None:
         base_conditions.append(Document.updated_at >= start_date)
@@ -195,10 +203,6 @@ _ALL_CONNECTORS: list[str] = [
     "CRAWLED_URL",
     "CIRCLEBACK",
     "OBSIDIAN_CONNECTOR",
-    # Composio connectors
-    "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
-    "COMPOSIO_GMAIL_CONNECTOR",
-    "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
 ]

 # Human-readable descriptions for each connector type
@@ -228,10 +232,6 @@ CONNECTOR_DESCRIPTIONS: dict[str, str] = {
     "BOOKSTACK_CONNECTOR": "BookStack pages (personal documentation)",
     "CIRCLEBACK": "Circleback meeting notes, transcripts, and action
items", "OBSIDIAN_CONNECTOR": "Obsidian vault notes and markdown files (personal notes)", - # Composio connectors - "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": "Google Drive files via Composio (personal cloud storage)", - "COMPOSIO_GMAIL_CONNECTOR": "Gmail emails via Composio (personal emails)", - "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": "Google Calendar events via Composio (personal calendar)", } @@ -654,6 +654,20 @@ async def search_knowledge_base_async( ) browse_connectors = connectors if connectors else [None] # type: ignore[list-item] + # Expand native Google types to include legacy Composio equivalents + # so old documents remain searchable until re-indexed. + _LEGACY_ALIASES: dict[str, str] = { + "GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR", + "GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR", + "GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", + } + expanded_browse = [] + for c in browse_connectors: + if c is not None and c in _LEGACY_ALIASES: + expanded_browse.append([c, _LEGACY_ALIASES[c]]) + else: + expanded_browse.append(c) + browse_results = await asyncio.gather( *[ _browse_recent_documents( @@ -663,7 +677,7 @@ async def search_knowledge_base_async( start_date=resolved_start_date, end_date=resolved_end_date, ) - for c in browse_connectors + for c in expanded_browse ] ) for docs in browse_results: diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py deleted file mode 100644 index e675085db..000000000 --- a/surfsense_backend/app/connectors/composio_gmail_connector.py +++ /dev/null @@ -1,719 +0,0 @@ -""" -Composio Gmail Connector Module. - -Provides Gmail specific methods for data retrieval and indexing via Composio. -""" - -import logging -import time -from collections.abc import Awaitable, Callable -from datetime import UTC, datetime -from typing import Any - -from bs4 import BeautifulSoup -from markdownify import markdownify as md -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select -from sqlalchemy.orm import selectinload - -from app.connectors.composio_connector import ComposioConnector -from app.db import Document, DocumentStatus, DocumentType -from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE -from app.services.llm_service import get_user_long_context_llm -from app.services.task_logging_service import TaskLoggingService -from app.tasks.connector_indexers.base import ( - calculate_date_range, - check_duplicate_document_by_hash, - safe_set_chunks, -) -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) - -# Heartbeat configuration -HeartbeatCallbackType = Callable[[int], Awaitable[None]] -HEARTBEAT_INTERVAL_SECONDS = 30 - -logger = logging.getLogger(__name__) - - -def get_current_timestamp() -> datetime: - """Get the current timestamp with timezone for updated_at field.""" - return datetime.now(UTC) - - -async def check_document_by_unique_identifier( - session: AsyncSession, unique_identifier_hash: str -) -> Document | None: - """Check if a document with the given unique identifier hash already exists.""" - existing_doc_result = await session.execute( - select(Document) - .options(selectinload(Document.chunks)) - .where(Document.unique_identifier_hash == unique_identifier_hash) - ) - return existing_doc_result.scalars().first() - - -async def update_connector_last_indexed( - session: AsyncSession, - 
connector, - update_last_indexed: bool = True, -) -> None: - """Update the last_indexed_at timestamp for a connector.""" - if update_last_indexed: - connector.last_indexed_at = datetime.now(UTC) - logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") - - -class ComposioGmailConnector(ComposioConnector): - """ - Gmail specific Composio connector. - - Provides methods for listing messages, getting message details, and formatting - Gmail messages from Gmail via Composio. - """ - - async def list_gmail_messages( - self, - query: str = "", - max_results: int = 50, - page_token: str | None = None, - ) -> tuple[list[dict[str, Any]], str | None, int | None, str | None]: - """ - List Gmail messages via Composio with pagination support. - - Args: - query: Gmail search query. - max_results: Maximum number of messages per page (default: 50). - page_token: Optional pagination token for next page. - - Returns: - Tuple of (messages list, next_page_token, result_size_estimate, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], None, None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_gmail_messages( - connected_account_id=connected_account_id, - entity_id=entity_id, - query=query, - max_results=max_results, - page_token=page_token, - ) - - async def get_gmail_message_detail( - self, message_id: str - ) -> tuple[dict[str, Any] | None, str | None]: - """ - Get full details of a Gmail message via Composio. - - Args: - message_id: Gmail message ID. - - Returns: - Tuple of (message details, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_gmail_message_detail( - connected_account_id=connected_account_id, - entity_id=entity_id, - message_id=message_id, - ) - - @staticmethod - def _html_to_markdown(html: str) -> str: - """Convert HTML (especially email layouts with nested tables) to clean markdown.""" - soup = BeautifulSoup(html, "html.parser") - for tag in soup.find_all(["style", "script", "img"]): - tag.decompose() - for tag in soup.find_all( - ["table", "thead", "tbody", "tfoot", "tr", "td", "th"] - ): - tag.unwrap() - return md(str(soup)).strip() - - def format_gmail_message_to_markdown(self, message: dict[str, Any]) -> str: - """ - Format a Gmail message to markdown. - - Args: - message: Message object from Composio's GMAIL_FETCH_EMAILS response. - Composio structure: messageId, messageText, messageTimestamp, - payload.headers, labelIds, attachmentList - - Returns: - Formatted markdown string. 
- """ - try: - # Composio uses 'messageId' (camelCase) - message_id = message.get("messageId", "") or message.get("id", "") - label_ids = message.get("labelIds", []) - - # Extract headers from payload - payload = message.get("payload", {}) - headers = payload.get("headers", []) - - # Parse headers into a dict - header_dict = {} - for header in headers: - name = header.get("name", "").lower() - value = header.get("value", "") - header_dict[name] = value - - # Extract key information - subject = header_dict.get("subject", "No Subject") - from_email = header_dict.get("from", "Unknown Sender") - to_email = header_dict.get("to", "Unknown Recipient") - # Composio provides messageTimestamp directly - date_str = message.get("messageTimestamp", "") or header_dict.get( - "date", "Unknown Date" - ) - - # Build markdown content - markdown_content = f"# {subject}\n\n" - markdown_content += f"**From:** {from_email}\n" - markdown_content += f"**To:** {to_email}\n" - markdown_content += f"**Date:** {date_str}\n" - - if label_ids: - markdown_content += f"**Labels:** {', '.join(label_ids)}\n" - - markdown_content += "\n---\n\n" - - # Composio provides full message text in 'messageText' which is often raw HTML - message_text = message.get("messageText", "") - if message_text: - message_text = self._html_to_markdown(message_text) - markdown_content += f"## Content\n\n{message_text}\n\n" - else: - # Fallback to snippet if no messageText - snippet = message.get("snippet", "") - if snippet: - markdown_content += f"## Preview\n\n{snippet}\n\n" - - # Add attachment info if present - attachments = message.get("attachmentList", []) - if attachments: - markdown_content += "## Attachments\n\n" - for att in attachments: - att_name = att.get("filename", att.get("name", "Unknown")) - markdown_content += f"- {att_name}\n" - markdown_content += "\n" - - # Add message metadata - markdown_content += "## Message Details\n\n" - markdown_content += f"- **Message ID:** {message_id}\n" - - return markdown_content - - except Exception as e: - return f"Error formatting message to markdown: {e!s}" - - -# ============ Indexer Functions ============ - - -async def _analyze_gmail_messages_phase1( - session: AsyncSession, - messages: list[dict[str, Any]], - composio_connector: ComposioGmailConnector, - connector_id: int, - search_space_id: int, - user_id: str, -) -> tuple[list[dict[str, Any]], int, int]: - """ - Phase 1: Analyze all messages, create pending documents. - Makes ALL documents visible in the UI immediately with pending status. 
- - Returns: - Tuple of (messages_to_process, documents_skipped, duplicate_content_count) - """ - messages_to_process = [] - documents_skipped = 0 - duplicate_content_count = 0 - - for message in messages: - try: - # Composio uses 'messageId' (camelCase), not 'id' - message_id = message.get("messageId", "") or message.get("id", "") - if not message_id: - documents_skipped += 1 - continue - - # Extract message info from Composio response - payload = message.get("payload", {}) - headers = payload.get("headers", []) - - subject = "No Subject" - sender = "Unknown Sender" - date_str = message.get("messageTimestamp", "Unknown Date") - - for header in headers: - name = header.get("name", "").lower() - value = header.get("value", "") - if name == "subject": - subject = value - elif name == "from": - sender = value - elif name == "date": - date_str = value - - # Format to markdown using the full message data - markdown_content = composio_connector.format_gmail_message_to_markdown( - message - ) - - # Check for empty content - if not markdown_content.strip(): - logger.warning(f"Skipping Gmail message with no content: {subject}") - documents_skipped += 1 - continue - - # Generate unique identifier - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"gmail_{message_id}", search_space_id - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Get label IDs and thread_id from Composio response - label_ids = message.get("labelIds", []) - thread_id = message.get("threadId", "") or message.get("thread_id", "") - - if existing_document: - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append( - { - "document": existing_document, - "is_new": False, - "markdown_content": markdown_content, - "content_hash": content_hash, - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date_str": date_str, - "label_ids": label_ids, - } - ) - continue - - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from standard connector) - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - - if duplicate_by_content: - logger.info( - f"Message {subject} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." 
- ) - duplicate_content_count += 1 - documents_skipped += 1 - continue - - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=subject, - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]), - document_metadata={ - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date": date_str, - "labels": label_ids, - "connector_id": connector_id, - "toolkit_id": "gmail", - "source": "composio", - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - - messages_to_process.append( - { - "document": document, - "is_new": True, - "markdown_content": markdown_content, - "content_hash": content_hash, - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date_str": date_str, - "label_ids": label_ids, - } - ) - - except Exception as e: - logger.error(f"Error in Phase 1 for message: {e!s}", exc_info=True) - documents_skipped += 1 - continue - - return messages_to_process, documents_skipped, duplicate_content_count - - -async def _process_gmail_messages_phase2( - session: AsyncSession, - messages_to_process: list[dict[str, Any]], - connector_id: int, - search_space_id: int, - user_id: str, - enable_summary: bool = False, - on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, int]: - """ - Phase 2: Process each document one by one. 
- Each document transitions: pending → processing → ready/failed - - Returns: - Tuple of (documents_indexed, documents_failed) - """ - documents_indexed = 0 - documents_failed = 0 - last_heartbeat_time = time.time() - - for item in messages_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm and enable_summary: - document_metadata_for_summary = { - "message_id": item["message_id"], - "thread_id": item["thread_id"], - "subject": item["subject"], - "sender": item["sender"], - "document_type": "Gmail Message (Composio)", - } - summary_content, summary_embedding = await generate_document_summary( - item["markdown_content"], user_llm, document_metadata_for_summary - ) - else: - summary_content = f"Gmail: {item['subject']}\n\nFrom: {item['sender']}\nDate: {item['date_str']}\n\n{item['markdown_content']}" - summary_embedding = embed_text(summary_content) - - chunks = await create_document_chunks(item["markdown_content"]) - - # Update document to READY with actual content - document.title = item["subject"] - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "message_id": item["message_id"], - "thread_id": item["thread_id"], - "subject": item["subject"], - "sender": item["sender"], - "date": item["date_str"], - "labels": item["label_ids"], - "connector_id": connector_id, - "source": "composio", - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents (for ready status updates) - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Gmail messages processed so far" - ) - await session.commit() - - except Exception as e: - logger.error(f"Error processing Gmail message: {e!s}", exc_info=True) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - documents_failed += 1 - continue - - return documents_indexed, documents_failed - - -async def index_composio_gmail( - session: AsyncSession, - connector, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str | None, - end_date: str | None, - task_logger: TaskLoggingService, - log_entry, - update_last_indexed: bool = True, - max_items: int = 1000, - on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str]: - """Index Gmail messages via Composio with real-time document status updates.""" - try: - composio_connector = ComposioGmailConnector(session, connector_id) - - # Normalize date values - handle "undefined" strings from frontend - if start_date == "undefined" or start_date == "": - start_date = None - if end_date == "undefined" or end_date == "": - 
end_date = None - - # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at - if start_date is not None and end_date is not None: - start_date_str = start_date - end_date_str = end_date - else: - start_date_str, end_date_str = calculate_date_range( - connector, start_date, end_date, default_days_back=365 - ) - - # Build query with date range - query_parts = [] - if start_date_str: - query_parts.append(f"after:{start_date_str.replace('-', '/')}") - if end_date_str: - query_parts.append(f"before:{end_date_str.replace('-', '/')}") - query = " ".join(query_parts) if query_parts else "" - - logger.info( - f"Gmail query for connector {connector_id}: '{query}' " - f"(start_date={start_date_str}, end_date={end_date_str})" - ) - - await task_logger.log_task_progress( - log_entry, - f"Fetching Gmail messages via Composio for connector {connector_id}", - {"stage": "fetching_messages"}, - ) - - # ======================================================================= - # FETCH ALL MESSAGES FIRST - # ======================================================================= - batch_size = 50 - page_token = None - all_messages = [] - result_size_estimate = None - last_heartbeat_time = time.time() - - while len(all_messages) < max_items: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(len(all_messages)) - last_heartbeat_time = current_time - - remaining = max_items - len(all_messages) - current_batch_size = min(batch_size, remaining) - - ( - messages, - next_token, - result_size_estimate_batch, - error, - ) = await composio_connector.list_gmail_messages( - query=query, - max_results=current_batch_size, - page_token=page_token, - ) - - if error: - await task_logger.log_task_failure( - log_entry, f"Failed to fetch Gmail messages: {error}", {} - ) - return 0, f"Failed to fetch Gmail messages: {error}" - - if not messages: - break - - if result_size_estimate is None and result_size_estimate_batch is not None: - result_size_estimate = result_size_estimate_batch - logger.info( - f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'" - ) - - all_messages.extend(messages) - logger.info( - f"Fetched {len(messages)} messages (total: {len(all_messages)})" - ) - - if not next_token or len(messages) < current_batch_size: - break - - page_token = next_token - - if not all_messages: - success_msg = "No Gmail messages found in the specified date range" - await task_logger.log_task_success( - log_entry, success_msg, {"messages_count": 0} - ) - await update_connector_last_indexed(session, connector, update_last_indexed) - await session.commit() - return ( - 0, - None, - ) # Return None (not error) when no items found - this is success with 0 items - - logger.info(f"Found {len(all_messages)} Gmail messages to index via Composio") - - # ======================================================================= - # PHASE 1: Analyze all messages, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - await task_logger.log_task_progress( - log_entry, - f"Phase 1: Creating pending documents for {len(all_messages)} messages", - {"stage": "phase1_pending"}, - ) - - ( - messages_to_process, - documents_skipped, - duplicate_content_count, - ) = await _analyze_gmail_messages_phase1( - session=session, - 
messages=all_messages, - composio_connector=composio_connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - ) - - # Commit all pending documents - they all appear in UI now - new_documents_count = len([m for m in messages_to_process if m["is_new"]]) - if new_documents_count > 0: - logger.info(f"Phase 1: Committing {new_documents_count} pending documents") - await session.commit() - - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") - await task_logger.log_task_progress( - log_entry, - f"Phase 2: Processing {len(messages_to_process)} documents", - {"stage": "phase2_processing"}, - ) - - documents_indexed, documents_failed = await _process_gmail_messages_phase2( - session=session, - messages_to_process=messages_to_process, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - enable_summary=getattr(connector, "enable_summary", False), - on_heartbeat_callback=on_heartbeat_callback, - ) - - # CRITICAL: Always update timestamp so Electric SQL syncs - await update_connector_last_indexed(session, connector, update_last_indexed) - - # Final commit to ensure all documents are persisted - logger.info(f"Final commit: Total {documents_indexed} Gmail messages processed") - try: - await session.commit() - logger.info( - "Successfully committed all Composio Gmail document changes to database" - ) - except Exception as e: - # Handle any remaining integrity errors gracefully - if ( - "duplicate key value violates unique constraint" in str(e).lower() - or "uniqueviolationerror" in str(e).lower() - ): - logger.warning( - f"Duplicate content_hash detected during final commit. " - f"Rolling back and continuing. Error: {e!s}" - ) - await session.rollback() - else: - raise - - # Build warning message if there were issues - warning_parts = [] - if duplicate_content_count > 0: - warning_parts.append(f"{duplicate_content_count} duplicate") - if documents_failed > 0: - warning_parts.append(f"{documents_failed} failed") - warning_message = ", ".join(warning_parts) if warning_parts else None - - await task_logger.log_task_success( - log_entry, - f"Successfully completed Gmail indexing via Composio for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "documents_failed": documents_failed, - "duplicate_content_count": duplicate_content_count, - }, - ) - - logger.info( - f"Composio Gmail indexing completed: {documents_indexed} ready, " - f"{documents_skipped} skipped, {documents_failed} failed " - f"({duplicate_content_count} duplicate content)" - ) - return documents_indexed, warning_message - - except Exception as e: - logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True) - return 0, f"Failed to index Gmail via Composio: {e!s}" diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py deleted file mode 100644 index 6344f9f38..000000000 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ /dev/null @@ -1,566 +0,0 @@ -""" -Composio Google Calendar Connector Module. - -Provides Google Calendar specific methods for data retrieval and indexing via Composio. 
-""" - -import logging -import time -from collections.abc import Awaitable, Callable -from datetime import UTC, datetime -from typing import Any - -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select -from sqlalchemy.orm import selectinload - -from app.connectors.composio_connector import ComposioConnector -from app.db import Document, DocumentStatus, DocumentType -from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE -from app.services.llm_service import get_user_long_context_llm -from app.services.task_logging_service import TaskLoggingService -from app.tasks.connector_indexers.base import ( - calculate_date_range, - check_duplicate_document_by_hash, - safe_set_chunks, -) -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) - -# Heartbeat configuration -HeartbeatCallbackType = Callable[[int], Awaitable[None]] -HEARTBEAT_INTERVAL_SECONDS = 30 - -logger = logging.getLogger(__name__) - - -def get_current_timestamp() -> datetime: - """Get the current timestamp with timezone for updated_at field.""" - return datetime.now(UTC) - - -async def check_document_by_unique_identifier( - session: AsyncSession, unique_identifier_hash: str -) -> Document | None: - """Check if a document with the given unique identifier hash already exists.""" - existing_doc_result = await session.execute( - select(Document) - .options(selectinload(Document.chunks)) - .where(Document.unique_identifier_hash == unique_identifier_hash) - ) - return existing_doc_result.scalars().first() - - -async def update_connector_last_indexed( - session: AsyncSession, - connector, - update_last_indexed: bool = True, -) -> None: - """Update the last_indexed_at timestamp for a connector.""" - if update_last_indexed: - connector.last_indexed_at = datetime.now(UTC) - logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") - - -class ComposioGoogleCalendarConnector(ComposioConnector): - """ - Google Calendar specific Composio connector. - - Provides methods for listing calendar events and formatting them from - Google Calendar via Composio. - """ - - async def list_calendar_events( - self, - time_min: str | None = None, - time_max: str | None = None, - max_results: int = 250, - ) -> tuple[list[dict[str, Any]], str | None]: - """ - List Google Calendar events via Composio. - - Args: - time_min: Start time (RFC3339 format). - time_max: End time (RFC3339 format). - max_results: Maximum number of events. - - Returns: - Tuple of (events list, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_calendar_events( - connected_account_id=connected_account_id, - entity_id=entity_id, - time_min=time_min, - time_max=time_max, - max_results=max_results, - ) - - def format_calendar_event_to_markdown(self, event: dict[str, Any]) -> str: - """ - Format a Google Calendar event to markdown. - - Args: - event: Event object from Google Calendar API. - - Returns: - Formatted markdown string. 
- """ - try: - # Extract basic event information - summary = event.get("summary", "No Title") - description = event.get("description", "") - location = event.get("location", "") - - # Extract start and end times - start = event.get("start", {}) - end = event.get("end", {}) - - start_time = start.get("dateTime") or start.get("date", "") - end_time = end.get("dateTime") or end.get("date", "") - - # Format times for display - def format_time(time_str: str) -> str: - if not time_str: - return "Unknown" - try: - if "T" in time_str: - dt = datetime.fromisoformat(time_str.replace("Z", "+00:00")) - return dt.strftime("%Y-%m-%d %H:%M") - return time_str - except Exception: - return time_str - - start_formatted = format_time(start_time) - end_formatted = format_time(end_time) - - # Extract attendees - attendees = event.get("attendees", []) - attendee_list = [] - for attendee in attendees: - email = attendee.get("email", "") - display_name = attendee.get("displayName", email) - response_status = attendee.get("responseStatus", "") - attendee_list.append(f"- {display_name} ({response_status})") - - # Build markdown content - markdown_content = f"# {summary}\n\n" - markdown_content += f"**Start:** {start_formatted}\n" - markdown_content += f"**End:** {end_formatted}\n" - - if location: - markdown_content += f"**Location:** {location}\n" - - markdown_content += "\n" - - if description: - markdown_content += f"## Description\n\n{description}\n\n" - - if attendee_list: - markdown_content += "## Attendees\n\n" - markdown_content += "\n".join(attendee_list) - markdown_content += "\n\n" - - # Add event metadata - markdown_content += "## Event Details\n\n" - markdown_content += f"- **Event ID:** {event.get('id', 'Unknown')}\n" - markdown_content += f"- **Created:** {event.get('created', 'Unknown')}\n" - markdown_content += f"- **Updated:** {event.get('updated', 'Unknown')}\n" - - return markdown_content - - except Exception as e: - return f"Error formatting event to markdown: {e!s}" - - -# ============ Indexer Functions ============ - - -async def index_composio_google_calendar( - session: AsyncSession, - connector, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str | None, - end_date: str | None, - task_logger: TaskLoggingService, - log_entry, - update_last_indexed: bool = True, - max_items: int = 2500, - on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, str]: - """Index Google Calendar events via Composio.""" - try: - composio_connector = ComposioGoogleCalendarConnector(session, connector_id) - - await task_logger.log_task_progress( - log_entry, - f"Fetching Google Calendar events via Composio for connector {connector_id}", - {"stage": "fetching_events"}, - ) - - # Normalize date values - handle "undefined" strings from frontend - if start_date == "undefined" or start_date == "": - start_date = None - if end_date == "undefined" or end_date == "": - end_date = None - - # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at - # This ensures user-selected dates are respected (matching non-Composio Calendar connector behavior) - if start_date is not None and end_date is not None: - # User provided both dates - use them directly - start_date_str = start_date - end_date_str = end_date - else: - # Calculate date range with defaults (uses last_indexed_at or 365 days back) - # This ensures indexing works even when user doesn't specify dates - start_date_str, end_date_str = calculate_date_range( - connector, start_date, 
end_date, default_days_back=365 - ) - - # Build time range for API call - time_min = f"{start_date_str}T00:00:00Z" - time_max = f"{end_date_str}T23:59:59Z" - - logger.info( - f"Google Calendar query for connector {connector_id}: " - f"(start_date={start_date_str}, end_date={end_date_str})" - ) - - events, error = await composio_connector.list_calendar_events( - time_min=time_min, - time_max=time_max, - max_results=max_items, - ) - - if error: - await task_logger.log_task_failure( - log_entry, f"Failed to fetch Calendar events: {error}", {} - ) - return 0, f"Failed to fetch Calendar events: {error}" - - if not events: - success_msg = "No Google Calendar events found in the specified date range" - await task_logger.log_task_success( - log_entry, success_msg, {"events_count": 0} - ) - # CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status - await update_connector_last_indexed(session, connector, update_last_indexed) - await session.commit() - return ( - 0, - None, - ) # Return None (not error) when no items found - this is success with 0 items - - logger.info(f"Found {len(events)} Google Calendar events to index via Composio") - - documents_indexed = 0 - documents_skipped = 0 - documents_failed = 0 # Track events that failed processing - duplicate_content_count = ( - 0 # Track events skipped due to duplicate content_hash - ) - last_heartbeat_time = time.time() - - # ======================================================================= - # PHASE 1: Analyze all events, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - events_to_process = [] # List of dicts with document and event data - new_documents_created = False - - for event in events: - try: - # Handle both standard Google API and potential Composio variations - event_id = event.get("id", "") or event.get("eventId", "") - summary = ( - event.get("summary", "") or event.get("title", "") or "No Title" - ) - - if not event_id: - documents_skipped += 1 - continue - - # Format to markdown - markdown_content = composio_connector.format_calendar_event_to_markdown( - event - ) - - # Generate unique identifier - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"calendar_{event_id}", search_space_id - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Extract event times - start = event.get("start", {}) - end = event.get("end", {}) - start_time = start.get("dateTime") or start.get("date", "") - end_time = end.get("dateTime") or end.get("date", "") - location = event.get("location", "") - - if existing_document: - if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) - if not DocumentStatus.is_state( - existing_document.status, DocumentStatus.READY - ): - existing_document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Queue existing document for update (will be set to processing in Phase 2) - events_to_process.append( - { - "document": existing_document, - "is_new": False, - "markdown_content": markdown_content, - "content_hash": content_hash, - "event_id": event_id, - "summary": summary, - "start_time": start_time, - "end_time": 
end_time, - "location": location, - } - ) - continue - - # Document doesn't exist by unique_identifier_hash - # Check if a document with the same content_hash exists (from standard connector) - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - - if duplicate_by_content: - logger.info( - f"Event {summary} already indexed by another connector " - f"(existing document ID: {duplicate_by_content.id}, " - f"type: {duplicate_by_content.document_type}). Skipping." - ) - duplicate_content_count += 1 - documents_skipped += 1 - continue - - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=summary, - document_type=DocumentType( - TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"] - ), - document_metadata={ - "event_id": event_id, - "summary": summary, - "start_time": start_time, - "end_time": end_time, - "location": location, - "connector_id": connector_id, - "toolkit_id": "googlecalendar", - "source": "composio", - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - events_to_process.append( - { - "document": document, - "is_new": True, - "markdown_content": markdown_content, - "content_hash": content_hash, - "event_id": event_id, - "summary": summary, - "start_time": start_time, - "end_time": end_time, - "location": location, - } - ) - - except Exception as e: - logger.error(f"Error in Phase 1 for event: {e!s}", exc_info=True) - documents_failed += 1 - continue - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([e for e in events_to_process if e['is_new']])} pending documents" - ) - await session.commit() - - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(events_to_process)} documents") - - for item in events_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm and connector.enable_summary: - document_metadata_for_summary = { - "event_id": item["event_id"], - "summary": item["summary"], - "start_time": item["start_time"], - "document_type": "Google Calendar Event (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - item["markdown_content"], - user_llm, - document_metadata_for_summary, - ) - else: - 
summary_content = ( - f"Calendar: {item['summary']}\n\n{item['markdown_content']}" - ) - summary_embedding = embed_text(summary_content) - - chunks = await create_document_chunks(item["markdown_content"]) - - # Update document to READY with actual content - document.title = item["summary"] - document.content = summary_content - document.content_hash = item["content_hash"] - document.embedding = summary_embedding - document.document_metadata = { - "event_id": item["event_id"], - "summary": item["summary"], - "start_time": item["start_time"], - "end_time": item["end_time"], - "location": item["location"], - "connector_id": connector_id, - "source": "composio", - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents (for ready status updates) - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Google Calendar events processed so far" - ) - await session.commit() - - except Exception as e: - logger.error(f"Error processing Calendar event: {e!s}", exc_info=True) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - documents_failed += 1 - continue - - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs - # This ensures the UI shows "Last indexed" instead of "Never indexed" - await update_connector_last_indexed(session, connector, update_last_indexed) - - # Final commit to ensure all documents are persisted (safety net) - # This matches the pattern used in non-Composio Gmail indexer - logger.info( - f"Final commit: Total {documents_indexed} Google Calendar events processed" - ) - try: - await session.commit() - logger.info( - "Successfully committed all Composio Google Calendar document changes to database" - ) - except Exception as e: - # Handle any remaining integrity errors gracefully (race conditions, etc.) - if ( - "duplicate key value violates unique constraint" in str(e).lower() - or "uniqueviolationerror" in str(e).lower() - ): - logger.warning( - f"Duplicate content_hash detected during final commit. " - f"This may occur if the same event was indexed by multiple connectors. " - f"Rolling back and continuing. 
Error: {e!s}" - ) - await session.rollback() - # Don't fail the entire task - some documents may have been successfully indexed - else: - raise - - # Build warning message if there were issues - warning_parts = [] - if duplicate_content_count > 0: - warning_parts.append(f"{duplicate_content_count} duplicate") - if documents_failed > 0: - warning_parts.append(f"{documents_failed} failed") - warning_message = ", ".join(warning_parts) if warning_parts else None - - await task_logger.log_task_success( - log_entry, - f"Successfully completed Google Calendar indexing via Composio for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "documents_failed": documents_failed, - "duplicate_content_count": duplicate_content_count, - }, - ) - - logger.info( - f"Composio Google Calendar indexing completed: {documents_indexed} ready, " - f"{documents_skipped} skipped, {documents_failed} failed " - f"({duplicate_content_count} duplicate content)" - ) - return documents_indexed, warning_message - - except Exception as e: - logger.error( - f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True - ) - return 0, f"Failed to index Google Calendar via Composio: {e!s}" diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py deleted file mode 100644 index 30ce4a77b..000000000 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ /dev/null @@ -1,1634 +0,0 @@ -""" -Composio Google Drive Connector Module. - -Provides Google Drive specific methods for data retrieval and indexing via Composio. -""" - -import contextlib -import hashlib -import json -import logging -import os -import tempfile -import time -from collections.abc import Awaitable, Callable -from datetime import UTC, datetime -from pathlib import Path -from typing import Any - -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified - -from app.config import config -from app.connectors.composio_connector import ComposioConnector -from app.db import Document, DocumentStatus, DocumentType, Log -from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE -from app.services.llm_service import get_user_long_context_llm -from app.services.task_logging_service import TaskLoggingService -from app.tasks.connector_indexers.base import ( - check_duplicate_document_by_hash, - safe_set_chunks, -) -from app.utils.document_converters import ( - create_document_chunks, - embed_text, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) - -# Heartbeat configuration -HeartbeatCallbackType = Callable[[int], Awaitable[None]] -HEARTBEAT_INTERVAL_SECONDS = 30 - -logger = logging.getLogger(__name__) - - -# Binary file extensions that need file processor -BINARY_FILE_EXTENSIONS = { - ".pdf", - ".doc", - ".docx", - ".xls", - ".xlsx", - ".ppt", - ".pptx", - ".png", - ".jpg", - ".jpeg", - ".gif", - ".bmp", - ".tiff", - ".webp", - ".zip", - ".tar", - ".gz", - ".rar", - ".7z", - ".mp3", - ".mp4", - ".wav", - ".avi", - ".mov", - ".exe", - ".dll", - ".so", - ".bin", -} - -# Text file extensions that can be decoded as UTF-8 -TEXT_FILE_EXTENSIONS = { - ".txt", - ".md", - ".markdown", - ".json", - ".xml", - ".html", - ".htm", - ".css", - ".js", - ".ts", - ".py", - ".java", - ".c", - ".cpp", - ".h", - ".yaml", - ".yml", - ".toml", - ".ini", - ".cfg", - ".conf", - ".sh", - ".bash", - ".zsh", - ".fish", - 
".sql", - ".csv", - ".tsv", - ".rst", - ".tex", - ".log", -} - - -def get_current_timestamp() -> datetime: - """Get the current timestamp with timezone for updated_at field.""" - return datetime.now(UTC) - - -def _is_binary_file(file_name: str, mime_type: str) -> bool: - """Check if a file is binary based on extension or mime type.""" - extension = Path(file_name).suffix.lower() - - # Check extension first - if extension in BINARY_FILE_EXTENSIONS: - return True - if extension in TEXT_FILE_EXTENSIONS: - return False - - # Check mime type - if mime_type: - if mime_type.startswith(("image/", "audio/", "video/", "application/pdf")): - return True - if mime_type.startswith(("text/", "application/json", "application/xml")): - return False - # Office documents - if ( - "spreadsheet" in mime_type - or "document" in mime_type - or "presentation" in mime_type - ): - return True - - # Default to text for unknown types - return False - - -class ComposioGoogleDriveConnector(ComposioConnector): - """ - Google Drive specific Composio connector. - - Provides methods for listing files, downloading content, and tracking changes - from Google Drive via Composio. - """ - - async def list_drive_files( - self, - folder_id: str | None = None, - page_token: str | None = None, - page_size: int = 100, - ) -> tuple[list[dict[str, Any]], str | None, str | None]: - """ - List files from Google Drive via Composio. - - Args: - folder_id: Optional folder ID to list contents of. - page_token: Pagination token. - page_size: Number of files per page. - - Returns: - Tuple of (files list, next_page_token, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_drive_files( - connected_account_id=connected_account_id, - entity_id=entity_id, - folder_id=folder_id, - page_token=page_token, - page_size=page_size, - ) - - async def get_drive_file_content( - self, file_id: str, original_mime_type: str | None = None - ) -> tuple[bytes | None, str | None]: - """ - Download file content from Google Drive via Composio. - - Args: - file_id: Google Drive file ID. - original_mime_type: Original MIME type (used to detect Google Workspace files for export). - - Returns: - Tuple of (file content bytes, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_drive_file_content( - connected_account_id=connected_account_id, - entity_id=entity_id, - file_id=file_id, - original_mime_type=original_mime_type, - ) - - async def get_file_metadata( - self, file_id: str - ) -> tuple[dict[str, Any] | None, str | None]: - """ - Get metadata for a specific file from Google Drive. - - Args: - file_id: The ID of the file to get metadata for. - - Returns: - Tuple of (metadata dict, error message). 
- """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_file_metadata( - connected_account_id=connected_account_id, - entity_id=entity_id, - file_id=file_id, - ) - - async def get_drive_start_page_token(self) -> tuple[str | None, str | None]: - """ - Get the starting page token for Google Drive change tracking. - - Returns: - Tuple of (start_page_token, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_drive_start_page_token( - connected_account_id=connected_account_id, - entity_id=entity_id, - ) - - async def list_drive_changes( - self, - page_token: str | None = None, - page_size: int = 100, - include_removed: bool = True, - ) -> tuple[list[dict[str, Any]], str | None, str | None]: - """ - List changes in Google Drive since the given page token. - - Args: - page_token: Page token from previous sync (optional). - page_size: Number of changes per page. - include_removed: Whether to include removed items. - - Returns: - Tuple of (changes list, new_start_page_token, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.list_drive_changes( - connected_account_id=connected_account_id, - entity_id=entity_id, - page_token=page_token, - page_size=page_size, - include_removed=include_removed, - ) - - -# ============ File Processing Utilities ============ - - -async def _process_file_content( - content: bytes | str, - file_name: str, - file_id: str, - mime_type: str, - search_space_id: int, - user_id: str, - session: AsyncSession, - task_logger: TaskLoggingService, - log_entry: Log, - processing_errors: list[str], -) -> str: - """ - Process file content and return markdown text. - - For binary files (PDFs, images, etc.), uses Surfsense's ETL service. - For text files, decodes as UTF-8. - - Args: - content: File content as bytes or string - file_name: Name of the file - file_id: Google Drive file ID - mime_type: MIME type of the file - search_space_id: Search space ID - user_id: User ID - session: Database session - task_logger: Task logging service - log_entry: Log entry for tracking - processing_errors: List to append errors to - - Returns: - Markdown content string - """ - # Ensure content is bytes - if isinstance(content, str): - content = content.encode("utf-8") - - # Check if this is a binary file based on extension or MIME type - is_binary = _is_binary_file(file_name, mime_type) - - if is_binary: - # Use ETL service for binary files (PDF, Office docs, etc.) 
- temp_file_path = None - try: - # Get file extension - extension = Path(file_name).suffix or ".bin" - - # Write to temp file - with tempfile.NamedTemporaryFile( - delete=False, suffix=extension - ) as tmp_file: - tmp_file.write(content) - temp_file_path = tmp_file.name - - # Use the configured ETL service to extract text - extracted_text = await _extract_text_with_etl( - temp_file_path, file_name, task_logger, log_entry - ) - - if extracted_text: - return extracted_text - else: - # Fallback if extraction fails - logger.warning(f"ETL returned empty for binary file {file_name}") - return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" - - except Exception as e: - error_msg = f"Error processing binary file {file_name}: {e!s}" - logger.error(error_msg) - processing_errors.append(error_msg) - return f"# {file_name}\n\n[Binary file - processing error]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" - finally: - # Cleanup temp file - if temp_file_path and os.path.exists(temp_file_path): - with contextlib.suppress(Exception): - os.unlink(temp_file_path) - else: - # Text file - try to decode as UTF-8 - try: - return content.decode("utf-8") - except UnicodeDecodeError: - # Try other encodings - for encoding in ["latin-1", "cp1252", "iso-8859-1"]: - try: - return content.decode(encoding) - except UnicodeDecodeError: - continue - - # If all encodings fail, treat as binary - error_msg = f"Could not decode text file {file_name} with any encoding" - logger.warning(error_msg) - processing_errors.append(error_msg) - return f"# {file_name}\n\n[File content could not be decoded]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" - - -async def _extract_text_with_etl( - file_path: str, - file_name: str, - task_logger: TaskLoggingService, - log_entry: Log, -) -> str | None: - """ - Extract text from a file using the configured ETL service. 
- - Args: - file_path: Path to the file - file_name: Name of the file - task_logger: Task logging service - log_entry: Log entry for tracking - - Returns: - Extracted text as markdown, or None if extraction fails - """ - import warnings - from logging import ERROR, getLogger - - etl_service = config.ETL_SERVICE - logger.debug( - f"[_extract_text_with_etl] START - file_path={file_path}, file_name={file_name}, etl_service={etl_service}" - ) - - try: - if etl_service == "UNSTRUCTURED": - logger.debug("[_extract_text_with_etl] Using UNSTRUCTURED ETL") - from langchain_unstructured import UnstructuredLoader - - from app.utils.document_converters import convert_document_to_markdown - - loader = UnstructuredLoader( - file_path, - mode="elements", - post_processors=[], - languages=["eng"], - include_orig_elements=False, - include_metadata=False, - strategy="auto", - ) - - docs = await loader.aload() - logger.debug( - f"[_extract_text_with_etl] UNSTRUCTURED loaded {len(docs) if docs else 0} docs" - ) - if docs: - result = await convert_document_to_markdown(docs) - logger.debug( - f"[_extract_text_with_etl] UNSTRUCTURED result: {len(result) if result else 0} chars" - ) - return result - logger.debug("[_extract_text_with_etl] UNSTRUCTURED returned no docs") - return None - - elif etl_service == "LLAMACLOUD": - logger.debug("[_extract_text_with_etl] Using LLAMACLOUD ETL") - from app.tasks.document_processors.file_processors import ( - parse_with_llamacloud_retry, - ) - - # Estimate pages (rough estimate based on file size) - file_size = os.path.getsize(file_path) - estimated_pages = max(1, file_size // (80 * 1024)) - - result = await parse_with_llamacloud_retry( - file_path=file_path, - estimated_pages=estimated_pages, - task_logger=task_logger, - log_entry=log_entry, - ) - - markdown_documents = await result.aget_markdown_documents( - split_by_page=False - ) - logger.debug( - f"[_extract_text_with_etl] LLAMACLOUD got {len(markdown_documents) if markdown_documents else 0} markdown docs" - ) - if markdown_documents: - text = markdown_documents[0].text - logger.debug( - f"[_extract_text_with_etl] LLAMACLOUD result: {len(text) if text else 0} chars" - ) - return text - logger.debug( - "[_extract_text_with_etl] LLAMACLOUD returned no markdown docs" - ) - return None - - elif etl_service == "DOCLING": - logger.debug("[_extract_text_with_etl] Using DOCLING ETL") - from app.services.docling_service import create_docling_service - - docling_service = create_docling_service() - - # Suppress pdfminer warnings - pdfminer_logger = getLogger("pdfminer") - original_level = pdfminer_logger.level - - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", category=UserWarning, module="pdfminer" - ) - warnings.filterwarnings( - "ignore", message=".*Cannot set gray non-stroke color.*" - ) - warnings.filterwarnings("ignore", message=".*invalid float value.*") - - pdfminer_logger.setLevel(ERROR) - - try: - result = await docling_service.process_document( - file_path, file_name - ) - logger.debug( - f"[_extract_text_with_etl] DOCLING result keys: {list(result.keys()) if result else 'None'}" - ) - finally: - pdfminer_logger.setLevel(original_level) - - content = result.get("content") - logger.debug( - f"[_extract_text_with_etl] DOCLING content: {len(content) if content else 0} chars" - ) - return content - else: - logger.warning( - f"[_extract_text_with_etl] Unknown ETL service: {etl_service}" - ) - return None - - except Exception as e: - logger.error( - f"[_extract_text_with_etl] ETL extraction EXCEPTION 
for {file_name}: {e!s}" - ) - import traceback - - logger.error(f"[_extract_text_with_etl] Traceback: {traceback.format_exc()}") - return None - - -# ============ Indexer Functions ============ - - -async def check_document_by_unique_identifier( - session: AsyncSession, unique_identifier_hash: str -) -> Document | None: - """Check if a document with the given unique identifier hash already exists.""" - from sqlalchemy.future import select - from sqlalchemy.orm import selectinload - - existing_doc_result = await session.execute( - select(Document) - .options(selectinload(Document.chunks)) - .where(Document.unique_identifier_hash == unique_identifier_hash) - ) - return existing_doc_result.scalars().first() - - -async def check_document_by_google_drive_file_id( - session: AsyncSession, file_id: str, search_space_id: int -) -> Document | None: - """Check if a document with this Google Drive file ID exists (from any connector). - - This checks both metadata key formats: - - 'google_drive_file_id' (normal Google Drive connector) - - 'file_id' (Composio Google Drive connector) - - This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls. - """ - from sqlalchemy import String, cast, or_ - from sqlalchemy.future import select - - # When casting JSON to String, the result includes quotes: "value" instead of value - # So we need to compare with the quoted version - quoted_file_id = f'"{file_id}"' - - existing_doc_result = await session.execute( - select(Document).where( - Document.search_space_id == search_space_id, - or_( - # Normal Google Drive connector format - cast(Document.document_metadata["google_drive_file_id"], String) - == quoted_file_id, - # Composio Google Drive connector format - cast(Document.document_metadata["file_id"], String) == quoted_file_id, - ), - ) - ) - return existing_doc_result.scalars().first() - - -async def update_connector_last_indexed( - session: AsyncSession, - connector, - update_last_indexed: bool = True, -) -> None: - """Update the last_indexed_at timestamp for a connector.""" - if update_last_indexed: - connector.last_indexed_at = datetime.now( - UTC - ) # Use UTC for timezone consistency - logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") - - -def generate_indexing_settings_hash( - selected_folders: list[dict], - selected_files: list[dict], - indexing_options: dict, -) -> str: - """Generate a hash of indexing settings to detect configuration changes. - - This hash is used to determine if indexing settings have changed since - the last index, which would require a full re-scan instead of delta sync. - - Args: - selected_folders: List of {id, name} for folders to index - selected_files: List of {id, name} for individual files to index - indexing_options: Dict with max_files_per_folder, include_subfolders, etc. 
- - Returns: - MD5 hash string of the settings - """ - settings = { - "folders": sorted([f.get("id", "") for f in selected_folders]), - "files": sorted([f.get("id", "") for f in selected_files]), - "include_subfolders": indexing_options.get("include_subfolders", True), - "max_files_per_folder": indexing_options.get("max_files_per_folder", 100), - } - return hashlib.md5( - json.dumps(settings, sort_keys=True).encode(), usedforsecurity=False - ).hexdigest() - - -async def index_composio_google_drive( - session: AsyncSession, - connector, - connector_id: int, - search_space_id: int, - user_id: str, - task_logger: TaskLoggingService, - log_entry, - update_last_indexed: bool = True, - max_items: int = 1000, - on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, int, str | None]: - """Index Google Drive files via Composio with delta sync support. - - Returns: - Tuple of (documents_indexed, documents_skipped, error_message or None) - - Delta Sync Flow: - 1. First sync: Full scan + get initial page token - 2. Subsequent syncs: Use LIST_CHANGES to process only changed files - (unless settings changed or incremental_sync is disabled) - - Supports folder/file selection via connector config: - - selected_folders: List of {id, name} for folders to index - - selected_files: List of {id, name} for individual files to index - - indexing_options: {max_files_per_folder, incremental_sync, include_subfolders} - """ - try: - composio_connector = ComposioGoogleDriveConnector(session, connector_id) - connector_config = await composio_connector.get_config() - - # Get folder/file selection configuration - selected_folders = connector_config.get("selected_folders", []) - selected_files = connector_config.get("selected_files", []) - indexing_options = connector_config.get("indexing_options", {}) - - max_files_per_folder = indexing_options.get("max_files_per_folder", 100) - include_subfolders = indexing_options.get("include_subfolders", True) - incremental_sync = indexing_options.get("incremental_sync", True) - - # Generate current settings hash to detect configuration changes - current_settings_hash = generate_indexing_settings_hash( - selected_folders, selected_files, indexing_options - ) - last_settings_hash = connector_config.get("last_indexed_settings_hash") - - # Detect if settings changed since last index - settings_changed = ( - last_settings_hash is not None - and current_settings_hash != last_settings_hash - ) - - if settings_changed: - logger.info( - f"Indexing settings changed for connector {connector_id}. " - f"Will perform full re-scan to apply new configuration." 
- ) - - # Check for stored page token for delta sync - stored_page_token = connector_config.get("drive_page_token") - - # Determine whether to use delta sync: - # - Must have a stored page token - # - Must have been indexed before (last_indexed_at exists) - # - User must have incremental_sync enabled - # - Settings must not have changed (folder/subfolder config) - use_delta_sync = ( - incremental_sync - and stored_page_token - and connector.last_indexed_at - and not settings_changed - ) - - # Route to delta sync or full scan - if use_delta_sync: - logger.info( - f"Using delta sync for Composio Google Drive connector {connector_id}" - ) - await task_logger.log_task_progress( - log_entry, - f"Starting delta sync for Google Drive via Composio (connector {connector_id})", - {"stage": "delta_sync", "token": stored_page_token[:20] + "..."}, - ) - - ( - documents_indexed, - documents_skipped, - processing_errors, - ) = await _index_composio_drive_delta_sync( - session=session, - composio_connector=composio_connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - page_token=stored_page_token, - max_items=max_items, - task_logger=task_logger, - log_entry=log_entry, - enable_summary=getattr(connector, "enable_summary", False), - on_heartbeat_callback=on_heartbeat_callback, - ) - else: - logger.info( - f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)" - ) - await task_logger.log_task_progress( - log_entry, - f"Fetching Google Drive files via Composio for connector {connector_id}", - { - "stage": "full_scan", - "selected_folders": len(selected_folders), - "selected_files": len(selected_files), - }, - ) - - ( - documents_indexed, - documents_skipped, - processing_errors, - ) = await _index_composio_drive_full_scan( - session=session, - composio_connector=composio_connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - selected_folders=selected_folders, - selected_files=selected_files, - max_files_per_folder=max_files_per_folder, - include_subfolders=include_subfolders, - max_items=max_items, - task_logger=task_logger, - log_entry=log_entry, - enable_summary=getattr(connector, "enable_summary", False), - on_heartbeat_callback=on_heartbeat_callback, - ) - - # Get new page token for next sync (always update after successful sync) - new_token, token_error = await composio_connector.get_drive_start_page_token() - if new_token and not token_error: - # Refresh connector to avoid stale state - await session.refresh(connector) - - if not connector.config: - connector.config = {} - connector.config["drive_page_token"] = new_token - flag_modified(connector, "config") - logger.info(f"Updated drive_page_token for connector {connector_id}") - elif token_error: - logger.warning(f"Failed to get new page token: {token_error}") - - # Save current settings hash for future change detection - # This allows detecting when folder/subfolder settings change - if not connector.config: - connector.config = {} - connector.config["last_indexed_settings_hash"] = current_settings_hash - flag_modified(connector, "config") - logger.info(f"Saved indexing settings hash for connector {connector_id}") - - # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status - await update_connector_last_indexed(session, connector, update_last_indexed) - - # Final commit - logger.info( - f"Final commit: Total {documents_indexed} Google Drive files processed" - ) - await session.commit() - logger.info( 
- "Successfully committed all Composio Google Drive document changes to database" - ) - - # Handle processing errors - error_message = None - if processing_errors: - if len(processing_errors) == 1: - error_message = processing_errors[0] - else: - error_message = f"Failed to process {len(processing_errors)} file(s). First error: {processing_errors[0]}" - await task_logger.log_task_failure( - log_entry, - f"Completed Google Drive indexing with {len(processing_errors)} error(s) for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "sync_type": "delta" if use_delta_sync else "full", - "errors": processing_errors, - }, - ) - else: - await task_logger.log_task_success( - log_entry, - f"Successfully completed Google Drive indexing via Composio for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "sync_type": "delta" if use_delta_sync else "full", - }, - ) - - return documents_indexed, documents_skipped, error_message - - except Exception as e: - logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) - return 0, 0, f"Failed to index Google Drive via Composio: {e!s}" - - -async def _index_composio_drive_delta_sync( - session: AsyncSession, - composio_connector: ComposioGoogleDriveConnector, - connector_id: int, - search_space_id: int, - user_id: str, - page_token: str, - max_items: int, - task_logger: TaskLoggingService, - log_entry, - enable_summary: bool = False, - on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, int, list[str]]: - """Index Google Drive files using delta sync with real-time document status updates. - - Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync. - Handles: new files, modified files, and deleted files. 
- """ - documents_indexed = 0 - documents_skipped = 0 - documents_failed = 0 - processing_errors = [] - duplicate_content_count = 0 - last_heartbeat_time = time.time() - - # Fetch all changes with pagination - all_changes = [] - current_token = page_token - - while len(all_changes) < max_items: - changes, next_token, error = await composio_connector.list_drive_changes( - page_token=current_token, - page_size=100, - include_removed=True, - ) - - if error: - logger.error(f"Error fetching Drive changes: {error}") - processing_errors.append(f"Failed to fetch changes: {error}") - break - - all_changes.extend(changes) - - if not next_token or next_token == current_token: - break - current_token = next_token - - if not all_changes: - logger.info("No changes detected since last sync") - return 0, 0, [] - - logger.info(f"Processing {len(all_changes)} changes from delta sync") - - # ======================================================================= - # PHASE 1: Analyze all changes, handle deletions, create pending documents - # ======================================================================= - files_to_process = [] - new_documents_created = False - - for change in all_changes[:max_items]: - try: - # Handle removed files - is_removed = change.get("removed", False) - file_info = change.get("file", {}) - file_id = change.get("fileId") or file_info.get("id", "") - - if not file_id: - documents_skipped += 1 - continue - - # Check if file was trashed or removed - handle deletions immediately - if is_removed or file_info.get("trashed", False): - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"drive_{file_id}", search_space_id - ) - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - if existing_document: - await session.delete(existing_document) - documents_indexed += 1 - logger.info(f"Deleted document for removed/trashed file: {file_id}") - continue - - # Process changed file - file_name = file_info.get("name", "") or "Untitled" - mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") - - # Skip folders - if mime_type == "application/vnd.google-apps.folder": - continue - - # Check for existing document by file ID (from any connector) - existing_by_file_id = await check_document_by_google_drive_file_id( - session, file_id, search_space_id - ) - - # Generate unique identifier hash - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"drive_{file_id}", search_space_id - ) - - # Check if document exists by unique identifier - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - if existing_by_file_id and not existing_document: - # File already indexed by different connector - skip - logger.info( - f"Skipping file {file_name} (file_id={file_id}): already indexed " - f"by {existing_by_file_id.document_type.value}" - ) - documents_skipped += 1 - continue - - if existing_document: - # Queue existing document for update - files_to_process.append( - { - "document": existing_document, - "is_new": False, - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - } - ) - continue - - # Create new document with PENDING status - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), - 
document_metadata={ - "file_id": file_id, - "file_name": file_name, - "FILE_NAME": file_name, - "mime_type": mime_type, - "connector_id": connector_id, - "toolkit_id": "googledrive", - "source": "composio", - }, - content="Pending...", - content_hash=unique_identifier_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], - status=DocumentStatus.pending(), - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) - new_documents_created = True - - files_to_process.append( - { - "document": document, - "is_new": True, - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - } - ) - - except Exception as e: - logger.error(f"Error in Phase 1 for change: {e!s}", exc_info=True) - documents_skipped += 1 - continue - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents" - ) - await session.commit() - - # ======================================================================= - # PHASE 2: Process each document one by one - # ======================================================================= - logger.info(f"Phase 2: Processing {len(files_to_process)} documents") - - for item in files_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - document = item["document"] - try: - # Set to PROCESSING and commit - document.status = DocumentStatus.processing() - await session.commit() - - # Get file content - content, content_error = await composio_connector.get_drive_file_content( - item["file_id"], original_mime_type=item["mime_type"] - ) - - if content_error or not content: - logger.warning( - f"Could not get content for file {item['file_name']}: {content_error}" - ) - markdown_content = f"# {item['file_name']}\n\n" - markdown_content += f"**File ID:** {item['file_id']}\n" - markdown_content += f"**Type:** {item['mime_type']}\n" - elif isinstance(content, dict): - error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}" - logger.error(error_msg) - processing_errors.append(error_msg) - markdown_content = f"# {item['file_name']}\n\n" - markdown_content += f"**File ID:** {item['file_id']}\n" - markdown_content += f"**Type:** {item['mime_type']}\n" - else: - markdown_content = await _process_file_content( - content=content, - file_name=item["file_name"], - file_id=item["file_id"], - mime_type=item["mime_type"], - search_space_id=search_space_id, - user_id=user_id, - session=session, - task_logger=task_logger, - log_entry=log_entry, - processing_errors=processing_errors, - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - # For existing documents, check if content changed - if not item["is_new"] and document.content_hash == content_hash: - if not DocumentStatus.is_state(document.status, DocumentStatus.READY): - document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Check for duplicate content hash (for new documents) - if item["is_new"]: - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - if duplicate_by_content: - logger.info( - f"File {item['file_name']} already indexed by 
another connector. Skipping." - ) - await session.delete(document) - duplicate_content_count += 1 - documents_skipped += 1 - continue - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm and enable_summary: - document_metadata_for_summary = { - "file_id": item["file_id"], - "file_name": item["file_name"], - "mime_type": item["mime_type"], - "document_type": "Google Drive File (Composio)", - } - summary_content, summary_embedding = await generate_document_summary( - markdown_content, user_llm, document_metadata_for_summary - ) - else: - summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}\n\n{markdown_content}" - summary_embedding = embed_text(summary_content) - - chunks = await create_document_chunks(markdown_content) - - # Update document to READY - document.title = item["file_name"] - document.content = summary_content - document.content_hash = content_hash - document.embedding = summary_embedding - document.document_metadata = { - "file_id": item["file_id"], - "file_name": item["file_name"], - "FILE_NAME": item["file_name"], - "mime_type": item["mime_type"], - "connector_id": connector_id, - "source": "composio", - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - await session.commit() - logger.info(f"Committed batch: {documents_indexed} changes processed") - - except Exception as e: - error_msg = f"Error processing change for file {item['file_id']}: {e!s}" - logger.error(error_msg, exc_info=True) - processing_errors.append(error_msg) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - documents_failed += 1 - continue - - logger.info( - f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped, " - f"{documents_failed} failed ({duplicate_content_count} duplicate content)" - ) - return documents_indexed, documents_skipped, processing_errors - - -async def _index_composio_drive_full_scan( - session: AsyncSession, - composio_connector: ComposioGoogleDriveConnector, - connector_id: int, - search_space_id: int, - user_id: str, - selected_folders: list[dict], - selected_files: list[dict], - max_files_per_folder: int, - include_subfolders: bool, - max_items: int, - task_logger: TaskLoggingService, - log_entry, - enable_summary: bool = False, - on_heartbeat_callback: HeartbeatCallbackType | None = None, -) -> tuple[int, int, list[str]]: - """Index Google Drive files using full scan with real-time document status updates.""" - documents_indexed = 0 - documents_skipped = 0 - documents_failed = 0 - processing_errors = [] - duplicate_content_count = 0 - last_heartbeat_time = time.time() - - all_files = [] - - # If specific folders/files are selected, fetch from those - if selected_folders or selected_files: - # Fetch files from selected folders - for folder in selected_folders: - folder_id = folder.get("id") - folder_name = folder.get("name", "Unknown") - - if not folder_id: - continue - - # Handle special case for "root" folder - actual_folder_id = None if folder_id == "root" else folder_id - - logger.info(f"Fetching files from folder: {folder_name} ({folder_id})") - - # Fetch 
files from this folder - folder_files = [] - page_token = None - - while len(folder_files) < max_files_per_folder: - ( - files, - next_token, - error, - ) = await composio_connector.list_drive_files( - folder_id=actual_folder_id, - page_token=page_token, - page_size=min(100, max_files_per_folder - len(folder_files)), - ) - - if error: - logger.warning( - f"Failed to fetch files from folder {folder_name}: {error}" - ) - break - - # Process files - for file_info in files: - mime_type = file_info.get("mimeType", "") or file_info.get( - "mime_type", "" - ) - - # If it's a folder and include_subfolders is enabled, recursively fetch - if mime_type == "application/vnd.google-apps.folder": - if include_subfolders: - # Add subfolder files recursively - subfolder_files = await _fetch_folder_files_recursively( - composio_connector, - file_info.get("id"), - max_files=max_files_per_folder, - current_count=len(folder_files), - ) - folder_files.extend(subfolder_files) - else: - folder_files.append(file_info) - - if not next_token: - break - page_token = next_token - - all_files.extend(folder_files[:max_files_per_folder]) - logger.info(f"Found {len(folder_files)} files in folder {folder_name}") - - # Add specifically selected files - fetch metadata to get mimeType - for selected_file in selected_files: - file_id = selected_file.get("id") - file_name = selected_file.get("name", "Unknown") - - if not file_id: - continue - - # Fetch file metadata to get proper mimeType - metadata, meta_error = await composio_connector.get_file_metadata(file_id) - if metadata and not meta_error: - all_files.append( - { - "id": file_id, - "name": metadata.get("name") or file_name, - "mimeType": metadata.get("mimeType", ""), - "modifiedTime": metadata.get("modifiedTime", ""), - "createdTime": metadata.get("createdTime", ""), - } - ) - logger.info( - f"Fetched metadata for UI-selected file: {file_name} " - f"(mimeType={metadata.get('mimeType', 'unknown')})" - ) - else: - # Fallback if metadata fetch fails - content-based detection will handle it - logger.warning( - f"Could not fetch metadata for file {file_name}: {meta_error}. " - f"Falling back to content-based detection." 
- ) - all_files.append( - { - "id": file_id, - "name": file_name, - "mimeType": "", # Content-based detection will handle this - } - ) - else: - # No selection specified - fetch all files (original behavior) - page_token = None - - while len(all_files) < max_items: - files, next_token, error = await composio_connector.list_drive_files( - page_token=page_token, - page_size=min(100, max_items - len(all_files)), - ) - - if error: - return 0, 0, [f"Failed to fetch Drive files: {error}"] - - all_files.extend(files) - - if not next_token: - break - page_token = next_token - - if not all_files: - logger.info("No Google Drive files found") - return 0, 0, [] - - logger.info( - f"Found {len(all_files)} Google Drive files to index via Composio (full scan)" - ) - - # ======================================================================= - # PHASE 1: Analyze all files, create pending documents - # This makes ALL documents visible in the UI immediately with pending status - # ======================================================================= - files_to_process = [] # List of dicts with document and file data - new_documents_created = False - - for file_info in all_files: - try: - # Handle both standard Google API and potential Composio variations - file_id = file_info.get("id", "") or file_info.get("fileId", "") - file_name = ( - file_info.get("name", "") or file_info.get("fileName", "") or "Untitled" - ) - mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") - - if not file_id: - documents_skipped += 1 - continue - - # Skip folders - if mime_type == "application/vnd.google-apps.folder": - continue - - # ========== EARLY DUPLICATE CHECK BY FILE ID ========== - existing_by_file_id = await check_document_by_google_drive_file_id( - session, file_id, search_space_id - ) - if existing_by_file_id: - logger.info( - f"Skipping file {file_name} (file_id={file_id}): already indexed " - f"by {existing_by_file_id.document_type.value}" - ) - documents_skipped += 1 - continue - - # Generate unique identifier hash - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"drive_{file_id}", search_space_id - ) - - # Check if document exists by unique identifier - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - if existing_document: - # Queue existing document for update (will be set to processing in Phase 2) - files_to_process.append( - { - "document": existing_document, - "is_new": False, - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - } - ) - continue - - # Create new document with PENDING status (visible in UI immediately) - document = Document( - search_space_id=search_space_id, - title=file_name, - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), - document_metadata={ - "file_id": file_id, - "file_name": file_name, - "FILE_NAME": file_name, - "mime_type": mime_type, - "connector_id": connector_id, - "toolkit_id": "googledrive", - "source": "composio", - }, - content="Pending...", # Placeholder until processed - content_hash=unique_identifier_hash, # Temporary unique value - updated when ready - unique_identifier_hash=unique_identifier_hash, - embedding=None, - chunks=[], # Empty at creation - safe for async - status=DocumentStatus.pending(), # Pending until processing starts - updated_at=get_current_timestamp(), - created_by_id=user_id, - connector_id=connector_id, - ) - session.add(document) 
- new_documents_created = True - - files_to_process.append( - { - "document": document, - "is_new": True, - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - } - ) - - except Exception as e: - logger.error(f"Error in Phase 1 for file: {e!s}", exc_info=True) - documents_skipped += 1 - continue - - # Commit all pending documents - they all appear in UI now - if new_documents_created: - logger.info( - f"Phase 1: Committing {len([f for f in files_to_process if f['is_new']])} pending documents" - ) - await session.commit() - - # ======================================================================= - # PHASE 2: Process each document one by one - # Each document transitions: pending → processing → ready/failed - # ======================================================================= - logger.info(f"Phase 2: Processing {len(files_to_process)} documents") - - for item in files_to_process: - # Send heartbeat periodically - if on_heartbeat_callback: - current_time = time.time() - if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = current_time - - document = item["document"] - try: - # Set to PROCESSING and commit - shows "processing" in UI for THIS document only - document.status = DocumentStatus.processing() - await session.commit() - - # Get file content (pass mime_type for Google Workspace export handling) - content, content_error = await composio_connector.get_drive_file_content( - item["file_id"], original_mime_type=item["mime_type"] - ) - - if content_error or not content: - logger.warning( - f"Could not get content for file {item['file_name']}: {content_error}" - ) - markdown_content = f"# {item['file_name']}\n\n" - markdown_content += f"**File ID:** {item['file_id']}\n" - markdown_content += f"**Type:** {item['mime_type']}\n" - elif isinstance(content, dict): - error_msg = f"Unexpected dict content format for file {item['file_name']}: {list(content.keys())}" - logger.error(error_msg) - processing_errors.append(error_msg) - markdown_content = f"# {item['file_name']}\n\n" - markdown_content += f"**File ID:** {item['file_id']}\n" - markdown_content += f"**Type:** {item['mime_type']}\n" - else: - # Process content based on file type - markdown_content = await _process_file_content( - content=content, - file_name=item["file_name"], - file_id=item["file_id"], - mime_type=item["mime_type"], - search_space_id=search_space_id, - user_id=user_id, - session=session, - task_logger=task_logger, - log_entry=log_entry, - processing_errors=processing_errors, - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - # For existing documents, check if content changed - if not item["is_new"] and document.content_hash == content_hash: - # Ensure status is ready - if not DocumentStatus.is_state(document.status, DocumentStatus.READY): - document.status = DocumentStatus.ready() - documents_skipped += 1 - continue - - # Check for duplicate content hash (for new documents) - if item["is_new"]: - with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash - ) - if duplicate_by_content: - logger.info( - f"File {item['file_name']} already indexed by another connector. Skipping." 
- ) - # Remove the pending document we created - await session.delete(document) - duplicate_content_count += 1 - documents_skipped += 1 - continue - - # Heavy processing (LLM, embeddings, chunks) - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm and enable_summary: - document_metadata_for_summary = { - "file_id": item["file_id"], - "file_name": item["file_name"], - "mime_type": item["mime_type"], - "document_type": "Google Drive File (Composio)", - } - summary_content, summary_embedding = await generate_document_summary( - markdown_content, user_llm, document_metadata_for_summary - ) - else: - summary_content = f"Google Drive File: {item['file_name']}\n\nType: {item['mime_type']}\n\n{markdown_content}" - summary_embedding = embed_text(summary_content) - - chunks = await create_document_chunks(markdown_content) - - # Update document to READY with actual content - document.title = item["file_name"] - document.content = summary_content - document.content_hash = content_hash - document.embedding = summary_embedding - document.document_metadata = { - "file_id": item["file_id"], - "file_name": item["file_name"], - "FILE_NAME": item["file_name"], - "mime_type": item["mime_type"], - "connector_id": connector_id, - "source": "composio", - } - await safe_set_chunks(session, document, chunks) - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() - - documents_indexed += 1 - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Google Drive files processed so far" - ) - await session.commit() - - except Exception as e: - error_msg = f"Error processing Drive file {item['file_name']}: {e!s}" - logger.error(error_msg, exc_info=True) - processing_errors.append(error_msg) - # Mark document as failed with reason (visible in UI) - try: - document.status = DocumentStatus.failed(str(e)) - document.updated_at = get_current_timestamp() - except Exception as status_error: - logger.error( - f"Failed to update document status to failed: {status_error}" - ) - documents_failed += 1 - continue - - logger.info( - f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped, " - f"{documents_failed} failed ({duplicate_content_count} duplicate content)" - ) - return documents_indexed, documents_skipped, processing_errors - - -async def _fetch_folder_files_recursively( - composio_connector: ComposioGoogleDriveConnector, - folder_id: str, - max_files: int = 100, - current_count: int = 0, - depth: int = 0, - max_depth: int = 10, -) -> list[dict[str, Any]]: - """ - Recursively fetch files from a Google Drive folder via Composio. 
- - Args: - composio_connector: The Composio connector instance - folder_id: Google Drive folder ID - max_files: Maximum number of files to fetch - current_count: Current number of files already fetched - depth: Current recursion depth - max_depth: Maximum recursion depth to prevent infinite loops - - Returns: - List of file info dictionaries - """ - if depth >= max_depth: - logger.warning(f"Max recursion depth reached for folder {folder_id}") - return [] - - if current_count >= max_files: - return [] - - all_files = [] - page_token = None - - try: - while len(all_files) + current_count < max_files: - files, next_token, error = await composio_connector.list_drive_files( - folder_id=folder_id, - page_token=page_token, - page_size=min(100, max_files - len(all_files) - current_count), - ) - - if error: - logger.warning( - f"Error fetching files from subfolder {folder_id}: {error}" - ) - break - - for file_info in files: - mime_type = file_info.get("mimeType", "") or file_info.get( - "mime_type", "" - ) - - if mime_type == "application/vnd.google-apps.folder": - # Recursively fetch from subfolders - subfolder_files = await _fetch_folder_files_recursively( - composio_connector, - file_info.get("id"), - max_files=max_files, - current_count=current_count + len(all_files), - depth=depth + 1, - max_depth=max_depth, - ) - all_files.extend(subfolder_files) - else: - all_files.append(file_info) - - if len(all_files) + current_count >= max_files: - break - - if not next_token: - break - page_token = next_token - - return all_files[: max_files - current_count] - - except Exception as e: - logger.error(f"Error in recursive folder fetch: {e!s}") - return all_files diff --git a/surfsense_backend/app/connectors/google_calendar_connector.py b/surfsense_backend/app/connectors/google_calendar_connector.py index 4681251ad..0350413e2 100644 --- a/surfsense_backend/app/connectors/google_calendar_connector.py +++ b/surfsense_backend/app/connectors/google_calendar_connector.py @@ -52,44 +52,40 @@ class GoogleCalendarConnector: ) -> Credentials: """ Get valid Google OAuth credentials. - Returns: - Google OAuth credentials - Raises: - ValueError: If credentials have not been set - Exception: If credential refresh fails + + Supports both native OAuth (with refresh_token) and Composio-sourced + credentials (with refresh_handler). For Composio credentials, validation + and DB persistence are skipped since Composio manages its own tokens. 
""" - if not all( - [ - self._credentials.client_id, - self._credentials.client_secret, - self._credentials.refresh_token, - ] - ): - raise ValueError( - "Google OAuth credentials (client_id, client_secret, refresh_token) must be set" - ) + has_standard_refresh = bool(self._credentials.refresh_token) + + if has_standard_refresh: + if not all( + [self._credentials.client_id, self._credentials.client_secret] + ): + raise ValueError( + "Google OAuth credentials (client_id, client_secret) must be set" + ) if self._credentials and not self._credentials.expired: return self._credentials - # Create credentials from refresh token - self._credentials = Credentials( - token=self._credentials.token, - refresh_token=self._credentials.refresh_token, - token_uri=self._credentials.token_uri, - client_id=self._credentials.client_id, - client_secret=self._credentials.client_secret, - scopes=self._credentials.scopes, - expiry=self._credentials.expiry, - ) + if has_standard_refresh: + self._credentials = Credentials( + token=self._credentials.token, + refresh_token=self._credentials.refresh_token, + token_uri=self._credentials.token_uri, + client_id=self._credentials.client_id, + client_secret=self._credentials.client_secret, + scopes=self._credentials.scopes, + expiry=self._credentials.expiry, + ) - # Refresh the token if needed if self._credentials.expired or not self._credentials.valid: try: self._credentials.refresh(Request()) - # Update the connector config in DB - if self._session: - # Use connector_id if available, otherwise fall back to user_id query + # Only persist refreshed token for native OAuth (Composio manages its own) + if has_standard_refresh and self._session: if self._connector_id: result = await self._session.execute( select(SearchSourceConnector).filter( @@ -110,7 +106,6 @@ class GoogleCalendarConnector: "GOOGLE_CALENDAR_CONNECTOR connector not found; cannot persist refreshed token." ) - # Encrypt sensitive credentials before storing from app.config import config from app.utils.oauth_security import TokenEncryption @@ -119,7 +114,6 @@ class GoogleCalendarConnector: if token_encrypted and config.SECRET_KEY: token_encryption = TokenEncryption(config.SECRET_KEY) - # Encrypt sensitive fields if creds_dict.get("token"): creds_dict["token"] = token_encryption.encrypt_token( creds_dict["token"] @@ -143,7 +137,6 @@ class GoogleCalendarConnector: await self._session.commit() except Exception as e: error_str = str(e) - # Check if this is an invalid_grant error (token expired/revoked) if ( "invalid_grant" in error_str.lower() or "token has been expired or revoked" in error_str.lower() diff --git a/surfsense_backend/app/connectors/google_drive/client.py b/surfsense_backend/app/connectors/google_drive/client.py index 2910320b2..4bb01b84c 100644 --- a/surfsense_backend/app/connectors/google_drive/client.py +++ b/surfsense_backend/app/connectors/google_drive/client.py @@ -15,16 +15,24 @@ from .file_types import GOOGLE_DOC, GOOGLE_SHEET class GoogleDriveClient: """Client for Google Drive API operations.""" - def __init__(self, session: AsyncSession, connector_id: int): + def __init__( + self, + session: AsyncSession, + connector_id: int, + credentials: "Credentials | None" = None, + ): """ Initialize Google Drive client. Args: session: Database session connector_id: ID of the Drive connector + credentials: Pre-built credentials (e.g. from Composio). If None, + credentials are loaded from the DB connector config. 
""" self.session = session self.connector_id = connector_id + self._credentials = credentials self.service = None async def get_service(self): @@ -41,7 +49,12 @@ class GoogleDriveClient: return self.service try: - credentials = await get_valid_credentials(self.session, self.connector_id) + if self._credentials: + credentials = self._credentials + else: + credentials = await get_valid_credentials( + self.session, self.connector_id + ) self.service = build("drive", "v3", credentials=credentials) return self.service except Exception as e: diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 39a92f95f..1d08d38f7 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -26,6 +26,7 @@ async def download_and_process_file( task_logger: TaskLoggingService, log_entry: Log, connector_id: int | None = None, + enable_summary: bool = True, ) -> tuple[Any, str | None, dict[str, Any] | None]: """ Download Google Drive file and process using Surfsense file processors. @@ -95,6 +96,7 @@ async def download_and_process_file( }, } # Include connector_id for de-indexing support + connector_info["enable_summary"] = enable_summary if connector_id is not None: connector_info["connector_id"] = connector_id diff --git a/surfsense_backend/app/connectors/google_gmail_connector.py b/surfsense_backend/app/connectors/google_gmail_connector.py index e8347ea73..5568dceb0 100644 --- a/surfsense_backend/app/connectors/google_gmail_connector.py +++ b/surfsense_backend/app/connectors/google_gmail_connector.py @@ -81,44 +81,40 @@ class GoogleGmailConnector: ) -> Credentials: """ Get valid Google OAuth credentials. - Returns: - Google OAuth credentials - Raises: - ValueError: If credentials have not been set - Exception: If credential refresh fails + + Supports both native OAuth (with refresh_token) and Composio-sourced + credentials (with refresh_handler). For Composio credentials, validation + and DB persistence are skipped since Composio manages its own tokens. 
""" - if not all( - [ - self._credentials.client_id, - self._credentials.client_secret, - self._credentials.refresh_token, - ] - ): - raise ValueError( - "Google OAuth credentials (client_id, client_secret, refresh_token) must be set" - ) + has_standard_refresh = bool(self._credentials.refresh_token) + + if has_standard_refresh: + if not all( + [self._credentials.client_id, self._credentials.client_secret] + ): + raise ValueError( + "Google OAuth credentials (client_id, client_secret) must be set" + ) if self._credentials and not self._credentials.expired: return self._credentials - # Create credentials from refresh token - self._credentials = Credentials( - token=self._credentials.token, - refresh_token=self._credentials.refresh_token, - token_uri=self._credentials.token_uri, - client_id=self._credentials.client_id, - client_secret=self._credentials.client_secret, - scopes=self._credentials.scopes, - expiry=self._credentials.expiry, - ) + if has_standard_refresh: + self._credentials = Credentials( + token=self._credentials.token, + refresh_token=self._credentials.refresh_token, + token_uri=self._credentials.token_uri, + client_id=self._credentials.client_id, + client_secret=self._credentials.client_secret, + scopes=self._credentials.scopes, + expiry=self._credentials.expiry, + ) - # Refresh the token if needed if self._credentials.expired or not self._credentials.valid: try: self._credentials.refresh(Request()) - # Update the connector config in DB - if self._session: - # Use connector_id if available, otherwise fall back to user_id query + # Only persist refreshed token for native OAuth (Composio manages its own) + if has_standard_refresh and self._session: if self._connector_id: result = await self._session.execute( select(SearchSourceConnector).filter( @@ -143,7 +139,6 @@ class GoogleGmailConnector: await self._session.commit() except Exception as e: error_str = str(e) - # Check if this is an invalid_grant error (token expired/revoked) if ( "invalid_grant" in error_str.lower() or "token has been expired or revoked" in error_str.lower() diff --git a/surfsense_backend/app/retriever/chunks_hybrid_search.py b/surfsense_backend/app/retriever/chunks_hybrid_search.py index d8b009655..e37d140d8 100644 --- a/surfsense_backend/app/retriever/chunks_hybrid_search.py +++ b/surfsense_backend/app/retriever/chunks_hybrid_search.py @@ -157,7 +157,7 @@ class ChucksHybridSearchRetriever: query_text: str, top_k: int, search_space_id: int, - document_type: str | None = None, + document_type: str | list[str] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, query_embedding: list | None = None, @@ -217,18 +217,24 @@ class ChucksHybridSearchRetriever: func.coalesce(Document.status["state"].astext, "ready") != "deleting", ] - # Add document type filter if provided + # Add document type filter if provided (single string or list of strings) if document_type is not None: - # Convert string to enum value if needed - if isinstance(document_type, str): - try: - doc_type_enum = DocumentType[document_type] - base_conditions.append(Document.document_type == doc_type_enum) - except KeyError: - # If the document type doesn't exist in the enum, return empty results - return [] + type_list = document_type if isinstance(document_type, list) else [document_type] + doc_type_enums = [] + for dt in type_list: + if isinstance(dt, str): + try: + doc_type_enums.append(DocumentType[dt]) + except KeyError: + pass + else: + doc_type_enums.append(dt) + if not doc_type_enums: + return [] + if 
len(doc_type_enums) == 1: + base_conditions.append(Document.document_type == doc_type_enums[0]) else: - base_conditions.append(Document.document_type == document_type) + base_conditions.append(Document.document_type.in_(doc_type_enums)) # Add time-based filtering if provided if start_date is not None: diff --git a/surfsense_backend/app/retriever/documents_hybrid_search.py b/surfsense_backend/app/retriever/documents_hybrid_search.py index a95e41038..ff2a50db7 100644 --- a/surfsense_backend/app/retriever/documents_hybrid_search.py +++ b/surfsense_backend/app/retriever/documents_hybrid_search.py @@ -149,7 +149,7 @@ class DocumentHybridSearchRetriever: query_text: str, top_k: int, search_space_id: int, - document_type: str | None = None, + document_type: str | list[str] | None = None, start_date: datetime | None = None, end_date: datetime | None = None, query_embedding: list | None = None, @@ -197,18 +197,24 @@ class DocumentHybridSearchRetriever: func.coalesce(Document.status["state"].astext, "ready") != "deleting", ] - # Add document type filter if provided + # Add document type filter if provided (single string or list of strings) if document_type is not None: - # Convert string to enum value if needed - if isinstance(document_type, str): - try: - doc_type_enum = DocumentType[document_type] - base_conditions.append(Document.document_type == doc_type_enum) - except KeyError: - # If the document type doesn't exist in the enum, return empty results - return [] + type_list = document_type if isinstance(document_type, list) else [document_type] + doc_type_enums = [] + for dt in type_list: + if isinstance(dt, str): + try: + doc_type_enums.append(DocumentType[dt]) + except KeyError: + pass + else: + doc_type_enums.append(dt) + if not doc_type_enums: + return [] + if len(doc_type_enums) == 1: + base_conditions.append(Document.document_type == doc_type_enums[0]) else: - base_conditions.append(Document.document_type == document_type) + base_conditions.append(Document.document_type.in_(doc_type_enums)) # Add time-based filtering if provided if start_date is not None: diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index bd7d02b73..28bb95e5e 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1068,7 +1068,7 @@ async def index_connector_content( == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR ): from app.tasks.celery_tasks.connector_tasks import ( - index_composio_connector_task, + index_google_drive_files_task, ) # For Composio Google Drive, if drive_items is provided, update connector config @@ -1102,34 +1102,72 @@ async def index_connector_content( else: logger.info( f"Triggering Composio Google Drive indexing for connector {connector_id} into search space {search_space_id} " - f"using existing config (from {indexing_from} to {indexing_to})" + f"using existing config" ) - index_composio_connector_task.delay( - connector_id, search_space_id, str(user.id), indexing_from, indexing_to + # Extract config and build items_dict for index_google_drive_files_task + config = connector.config or {} + selected_folders = config.get("selected_folders", []) + selected_files = config.get("selected_files", []) + if not selected_folders and not selected_files: + raise HTTPException( + status_code=400, + detail="Composio Google Drive indexing requires folders or files to be configured. 
" + "Please select folders/files to index.", + ) + indexing_options = config.get( + "indexing_options", + { + "max_files_per_folder": 100, + "incremental_sync": True, + "include_subfolders": True, + }, + ) + items_dict = { + "folders": selected_folders, + "files": selected_files, + "indexing_options": indexing_options, + } + index_google_drive_files_task.delay( + connector_id, search_space_id, str(user.id), items_dict ) response_message = ( "Composio Google Drive indexing started in the background." ) - elif connector.connector_type in [ - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, - ]: + elif ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR + ): from app.tasks.celery_tasks.connector_tasks import ( - index_composio_connector_task, + index_google_gmail_messages_task, ) - # For Composio Gmail and Calendar, use the same date calculation logic as normal connectors - # This ensures consistent behavior and uses last_indexed_at to reduce API calls - # (includes special case: if indexed today, go back 1 day to avoid missing data) logger.info( - f"Triggering Composio connector indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + f"Triggering Composio Gmail indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" ) - index_composio_connector_task.delay( + index_google_gmail_messages_task.delay( connector_id, search_space_id, str(user.id), indexing_from, indexing_to ) - response_message = "Composio connector indexing started in the background." + response_message = "Composio Gmail indexing started in the background." + + elif ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR + ): + from app.tasks.celery_tasks.connector_tasks import ( + index_google_calendar_events_task, + ) + + logger.info( + f"Triggering Composio Google Calendar indexing for connector {connector_id} into search space {search_space_id} from {indexing_from} to {indexing_to}" + ) + index_google_calendar_events_task.delay( + connector_id, search_space_id, str(user.id), indexing_from, indexing_to + ) + response_message = ( + "Composio Google Calendar indexing started in the background." 
+ ) else: raise HTTPException( diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index 3930d38ad..7c20be9a4 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -36,32 +36,14 @@ TOOLKIT_TO_CONNECTOR_TYPE = { } # Mapping of toolkit IDs to document types -TOOLKIT_TO_DOCUMENT_TYPE = { - "googledrive": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR", - "gmail": "COMPOSIO_GMAIL_CONNECTOR", - "googlecalendar": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", -} +# Google Drive, Gmail, Calendar use unified native indexers - not in this registry +TOOLKIT_TO_DOCUMENT_TYPE: dict[str, str] = {} # Mapping of toolkit IDs to their indexer functions # Format: toolkit_id -> (module_path, function_name, supports_date_filter) # supports_date_filter: True if the indexer accepts start_date/end_date params -TOOLKIT_TO_INDEXER = { - "googledrive": ( - "app.connectors.composio_google_drive_connector", - "index_composio_google_drive", - False, # Google Drive doesn't use date filtering - ), - "gmail": ( - "app.connectors.composio_gmail_connector", - "index_composio_gmail", - True, # Gmail uses date filtering - ), - "googlecalendar": ( - "app.connectors.composio_google_calendar_connector", - "index_composio_google_calendar", - True, # Calendar uses date filtering - ), -} +# Google Drive, Gmail, Calendar use unified native indexers - not in this registry +TOOLKIT_TO_INDEXER: dict[str, tuple[str, str, bool]] = {} class ComposioService: diff --git a/surfsense_backend/app/services/connector_service.py b/surfsense_backend/app/services/connector_service.py index 1dd2ac0c2..730b946a3 100644 --- a/surfsense_backend/app/services/connector_service.py +++ b/surfsense_backend/app/services/connector_service.py @@ -215,11 +215,20 @@ class ConnectorService: return result_object, files_docs + # Composio connectors that were unified into native Google pipelines. + # Old documents may still carry the legacy type until re-indexed; searching + # for a native type should transparently include its legacy equivalent. + _LEGACY_TYPE_ALIASES: dict[str, str] = { + "GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR", + "GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR", + "GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", + } + async def _combined_rrf_search( self, query_text: str, search_space_id: int, - document_type: str, + document_type: str | list[str], top_k: int = 20, start_date: datetime | None = None, end_date: datetime | None = None, @@ -241,7 +250,8 @@ class ConnectorService: Args: query_text: The search query text search_space_id: The search space ID to search within - document_type: Document type to filter (e.g., "FILE", "CRAWLED_URL") + document_type: Document type(s) to filter (e.g., "FILE", "CRAWLED_URL", + or a list for multi-type queries) top_k: Number of results to return start_date: Optional start date for filtering documents by updated_at end_date: Optional end date for filtering documents by updated_at @@ -254,6 +264,16 @@ class ConnectorService: perf = get_perf_logger() t0 = time.perf_counter() + # Expand native Google types to include legacy Composio equivalents + # so old documents remain searchable until re-indexed. 
+ if isinstance(document_type, str) and document_type in self._LEGACY_TYPE_ALIASES: + resolved_type: str | list[str] = [ + document_type, + self._LEGACY_TYPE_ALIASES[document_type], + ] + else: + resolved_type = document_type + # RRF constant k = 60 @@ -276,7 +296,7 @@ class ConnectorService: "query_text": query_text, "top_k": retriever_top_k, "search_space_id": search_space_id, - "document_type": document_type, + "document_type": resolved_type, "start_date": start_date, "end_date": end_date, "query_embedding": query_embedding, @@ -2746,299 +2766,6 @@ class ConnectorService: return result_object, obsidian_docs - # ========================================================================= - # Composio Connector Search Methods - # ========================================================================= - - async def search_composio_google_drive( - self, - user_query: str, - search_space_id: int, - top_k: int = 20, - start_date: datetime | None = None, - end_date: datetime | None = None, - ) -> tuple: - """ - Search for Composio Google Drive files and return both the source information - and langchain documents. - - Uses combined chunk-level and document-level hybrid search with RRF fusion. - - Args: - user_query: The user's query - search_space_id: The search space ID to search in - top_k: Maximum number of results to return - start_date: Optional start date for filtering documents by updated_at - end_date: Optional end date for filtering documents by updated_at - - Returns: - tuple: (sources_info, langchain_documents) - """ - composio_drive_docs = await self._combined_rrf_search( - query_text=user_query, - search_space_id=search_space_id, - document_type="COMPOSIO_GOOGLE_DRIVE_CONNECTOR", - top_k=top_k, - start_date=start_date, - end_date=end_date, - ) - - # Early return if no results - if not composio_drive_docs: - return { - "id": 54, - "name": "Google Drive (Composio)", - "type": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR", - "sources": [], - }, [] - - def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - return ( - doc_info.get("title") - or metadata.get("title") - or metadata.get("file_name") - or "Untitled Document" - ) - - def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - return metadata.get("url") or metadata.get("web_view_link") or "" - - def _description_fn( - chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] - ) -> str: - description = self._chunk_preview(chunk.get("content", ""), limit=200) - info_parts = [] - mime_type = metadata.get("mime_type") - modified_time = metadata.get("modified_time") - if mime_type: - info_parts.append(f"Type: {mime_type}") - if modified_time: - info_parts.append(f"Modified: {modified_time}") - if info_parts: - description = (description + " | " + " | ".join(info_parts)).strip(" |") - return description - - def _extra_fields_fn( - _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] - ) -> dict[str, Any]: - return { - "mime_type": metadata.get("mime_type", ""), - "file_id": metadata.get("file_id", ""), - "modified_time": metadata.get("modified_time", ""), - } - - sources_list = self._build_chunk_sources_from_documents( - composio_drive_docs, - title_fn=_title_fn, - url_fn=_url_fn, - description_fn=_description_fn, - extra_fields_fn=_extra_fields_fn, - ) - - # Create result object - result_object = { - "id": 54, - "name": "Google Drive (Composio)", - "type": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR", - "sources": sources_list, - } - - return result_object, composio_drive_docs 
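The alias expansion at the top of this hunk is the compatibility shim for the removals that follow: a search for a native Google document type should transparently also match documents that still carry the legacy Composio type until they are re-indexed. A minimal sketch of the intended behaviour, assuming the _LEGACY_TYPE_ALIASES mapping shown in this diff (the standalone helper resolve_document_type is illustrative only, not part of the change):

# Sketch of the expansion performed inside _combined_rrf_search.
# resolve_document_type is a hypothetical free function used here for illustration.
_LEGACY_TYPE_ALIASES: dict[str, str] = {
    "GOOGLE_DRIVE_FILE": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
    "GOOGLE_GMAIL_CONNECTOR": "COMPOSIO_GMAIL_CONNECTOR",
    "GOOGLE_CALENDAR_CONNECTOR": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}


def resolve_document_type(document_type: str | list[str]) -> str | list[str]:
    # Only a bare native type string is expanded; explicit lists pass through unchanged.
    if isinstance(document_type, str) and document_type in _LEGACY_TYPE_ALIASES:
        return [document_type, _LEGACY_TYPE_ALIASES[document_type]]
    return document_type


# Expected behaviour:
assert resolve_document_type("GOOGLE_DRIVE_FILE") == [
    "GOOGLE_DRIVE_FILE",
    "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
]
assert resolve_document_type("FILE") == "FILE"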
- - async def search_composio_gmail( - self, - user_query: str, - search_space_id: int, - top_k: int = 20, - start_date: datetime | None = None, - end_date: datetime | None = None, - ) -> tuple: - """ - Search for Composio Gmail messages and return both the source information - and langchain documents. - - Uses combined chunk-level and document-level hybrid search with RRF fusion. - - Args: - user_query: The user's query - search_space_id: The search space ID to search in - top_k: Maximum number of results to return - start_date: Optional start date for filtering documents by updated_at - end_date: Optional end date for filtering documents by updated_at - - Returns: - tuple: (sources_info, langchain_documents) - """ - composio_gmail_docs = await self._combined_rrf_search( - query_text=user_query, - search_space_id=search_space_id, - document_type="COMPOSIO_GMAIL_CONNECTOR", - top_k=top_k, - start_date=start_date, - end_date=end_date, - ) - - # Early return if no results - if not composio_gmail_docs: - return { - "id": 55, - "name": "Gmail (Composio)", - "type": "COMPOSIO_GMAIL_CONNECTOR", - "sources": [], - }, [] - - def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - return ( - doc_info.get("title") - or metadata.get("subject") - or metadata.get("title") - or "Untitled Email" - ) - - def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - return metadata.get("url") or "" - - def _description_fn( - chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] - ) -> str: - description = self._chunk_preview(chunk.get("content", ""), limit=200) - info_parts = [] - sender = metadata.get("from") or metadata.get("sender") - date = metadata.get("date") or metadata.get("received_at") - if sender: - info_parts.append(f"From: {sender}") - if date: - info_parts.append(f"Date: {date}") - if info_parts: - description = (description + " | " + " | ".join(info_parts)).strip(" |") - return description - - def _extra_fields_fn( - _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] - ) -> dict[str, Any]: - return { - "message_id": metadata.get("message_id", ""), - "thread_id": metadata.get("thread_id", ""), - "from": metadata.get("from", ""), - "to": metadata.get("to", ""), - "date": metadata.get("date", ""), - } - - sources_list = self._build_chunk_sources_from_documents( - composio_gmail_docs, - title_fn=_title_fn, - url_fn=_url_fn, - description_fn=_description_fn, - extra_fields_fn=_extra_fields_fn, - ) - - # Create result object - result_object = { - "id": 55, - "name": "Gmail (Composio)", - "type": "COMPOSIO_GMAIL_CONNECTOR", - "sources": sources_list, - } - - return result_object, composio_gmail_docs - - async def search_composio_google_calendar( - self, - user_query: str, - search_space_id: int, - top_k: int = 20, - start_date: datetime | None = None, - end_date: datetime | None = None, - ) -> tuple: - """ - Search for Composio Google Calendar events and return both the source information - and langchain documents. - - Uses combined chunk-level and document-level hybrid search with RRF fusion. 
- - Args: - user_query: The user's query - search_space_id: The search space ID to search in - top_k: Maximum number of results to return - start_date: Optional start date for filtering documents by updated_at - end_date: Optional end date for filtering documents by updated_at - - Returns: - tuple: (sources_info, langchain_documents) - """ - composio_calendar_docs = await self._combined_rrf_search( - query_text=user_query, - search_space_id=search_space_id, - document_type="COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", - top_k=top_k, - start_date=start_date, - end_date=end_date, - ) - - # Early return if no results - if not composio_calendar_docs: - return { - "id": 56, - "name": "Google Calendar (Composio)", - "type": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", - "sources": [], - }, [] - - def _title_fn(doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - return ( - doc_info.get("title") - or metadata.get("summary") - or metadata.get("title") - or "Untitled Event" - ) - - def _url_fn(_doc_info: dict[str, Any], metadata: dict[str, Any]) -> str: - return metadata.get("url") or metadata.get("html_link") or "" - - def _description_fn( - chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] - ) -> str: - description = self._chunk_preview(chunk.get("content", ""), limit=200) - info_parts = [] - start_time = metadata.get("start_time") or metadata.get("start") - end_time = metadata.get("end_time") or metadata.get("end") - if start_time: - info_parts.append(f"Start: {start_time}") - if end_time: - info_parts.append(f"End: {end_time}") - if info_parts: - description = (description + " | " + " | ".join(info_parts)).strip(" |") - return description - - def _extra_fields_fn( - _chunk: dict[str, Any], _doc_info: dict[str, Any], metadata: dict[str, Any] - ) -> dict[str, Any]: - return { - "event_id": metadata.get("event_id", ""), - "calendar_id": metadata.get("calendar_id", ""), - "start_time": metadata.get("start_time", ""), - "end_time": metadata.get("end_time", ""), - "location": metadata.get("location", ""), - } - - sources_list = self._build_chunk_sources_from_documents( - composio_calendar_docs, - title_fn=_title_fn, - url_fn=_url_fn, - description_fn=_description_fn, - extra_fields_fn=_extra_fields_fn, - ) - - # Create result object - result_object = { - "id": 56, - "name": "Google Calendar (Composio)", - "type": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", - "sources": sources_list, - } - - return result_object, composio_calendar_docs - # ========================================================================= # Utility Methods for Connector Discovery # ========================================================================= diff --git a/surfsense_backend/app/services/google_drive/tool_metadata_service.py b/surfsense_backend/app/services/google_drive/tool_metadata_service.py index 8bc60ecbc..e5da663ec 100644 --- a/surfsense_backend/app/services/google_drive/tool_metadata_service.py +++ b/surfsense_backend/app/services/google_drive/tool_metadata_service.py @@ -112,8 +112,10 @@ class GoogleDriveToolMetadataService: and_( SearchSourceConnector.id == document.connector_id, SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnector.connector_type.in_([ + SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, + ]), ) ) ) @@ -139,8 +141,10 @@ class GoogleDriveToolMetadataService: and_( SearchSourceConnector.search_space_id == 
search_space_id, SearchSourceConnector.user_id == user_id, - SearchSourceConnector.connector_type - == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnector.connector_type.in_([ + SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, + ]), ) ) .order_by(SearchSourceConnector.last_indexed_at.desc()) diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index 0ba8bc80a..e6890b0a8 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -55,7 +55,6 @@ async def _check_and_trigger_schedules(): from app.tasks.celery_tasks.connector_tasks import ( index_airtable_records_task, index_clickup_tasks_task, - index_composio_connector_task, index_confluence_pages_task, index_crawled_urls_task, index_discord_messages_task, @@ -88,10 +87,10 @@ async def _check_and_trigger_schedules(): SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task, - # Composio connector types - SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_composio_connector_task, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_composio_connector_task, - SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_composio_connector_task, + # Composio connector types (unified with native Google tasks) + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_google_gmail_messages_task, + SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task, } # Trigger indexing for each due connector @@ -129,11 +128,11 @@ async def _check_and_trigger_schedules(): f"({connector.connector_type.value})" ) - # Special handling for Google Drive - uses config for folder/file selection - if ( - connector.connector_type - == SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR - ): + # Special handling for Google Drive (native and Composio) - uses config for folder/file selection + if connector.connector_type in [ + SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, + ]: connector_config = connector.config or {} selected_folders = connector_config.get("selected_folders", []) selected_files = connector_config.get("selected_files", []) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 24e822060..9ea76c851 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -16,6 +16,15 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.connectors.google_calendar_connector import GoogleCalendarConnector from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType +from app.utils.google_credentials import ( + COMPOSIO_GOOGLE_CONNECTOR_TYPES, + build_composio_credentials, +) + +ACCEPTED_CALENDAR_CONNECTOR_TYPES = { + SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, +} from 
app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -87,10 +96,12 @@ async def index_google_calendar_events( ) try: - # Get the connector from the database - connector = await get_connector_by_id( - session, connector_id, SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR - ) + # Accept both native and Composio Calendar connectors + connector = None + for ct in ACCEPTED_CALENDAR_CONNECTOR_TYPES: + connector = await get_connector_by_id(session, connector_id, ct) + if connector: + break if not connector: await task_logger.log_task_failure( @@ -101,69 +112,80 @@ async def index_google_calendar_events( ) return 0, f"Connector with ID {connector_id} not found" - # Get the Google Calendar credentials from the connector config - config_data = connector.config - - # Decrypt sensitive credentials if encrypted (for backward compatibility) - from app.config import config - from app.utils.oauth_security import TokenEncryption - - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - try: - token_encryption = TokenEncryption(config.SECRET_KEY) - - # Decrypt sensitive fields - if config_data.get("token"): - config_data["token"] = token_encryption.decrypt_token( - config_data["token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - if config_data.get("client_secret"): - config_data["client_secret"] = token_encryption.decrypt_token( - config_data["client_secret"] - ) - - logger.info( - f"Decrypted Google Calendar credentials for connector {connector_id}" - ) - except Exception as e: + # Build credentials based on connector type + if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: + connected_account_id = connector.config.get( + "composio_connected_account_id" + ) + if not connected_account_id: await task_logger.log_task_failure( log_entry, - f"Failed to decrypt Google Calendar credentials for connector {connector_id}: {e!s}", - "Credential decryption failed", - {"error_type": "CredentialDecryptionError"}, + f"Composio connected_account_id not found for connector {connector_id}", + "Missing Composio account", + {"error_type": "MissingComposioAccount"}, ) - return 0, f"Failed to decrypt Google Calendar credentials: {e!s}" + return 0, "Composio connected_account_id not found" + credentials = build_composio_credentials(connected_account_id) + else: + config_data = connector.config - exp = config_data.get("expiry", "").replace("Z", "") - credentials = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes"), - expiry=datetime.fromisoformat(exp) if exp else None, - ) + from app.config import config + from app.utils.oauth_security import TokenEncryption - if ( - not credentials.client_id - or not credentials.client_secret - or not credentials.refresh_token - ): - await task_logger.log_task_failure( - log_entry, - f"Google Calendar credentials not found in connector config for connector {connector_id}", - "Missing Google Calendar credentials", - {"error_type": "MissingCredentials"}, + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + try: + token_encryption = 
TokenEncryption(config.SECRET_KEY) + if config_data.get("token"): + config_data["token"] = token_encryption.decrypt_token( + config_data["token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + if config_data.get("client_secret"): + config_data["client_secret"] = token_encryption.decrypt_token( + config_data["client_secret"] + ) + logger.info( + f"Decrypted Google Calendar credentials for connector {connector_id}" + ) + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to decrypt Google Calendar credentials for connector {connector_id}: {e!s}", + "Credential decryption failed", + {"error_type": "CredentialDecryptionError"}, + ) + return 0, f"Failed to decrypt Google Calendar credentials: {e!s}" + + exp = config_data.get("expiry", "") + if exp: + exp = exp.replace("Z", "") + credentials = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, ) - return 0, "Google Calendar credentials not found in connector config" - # Initialize Google Calendar client + if ( + not credentials.client_id + or not credentials.client_secret + or not credentials.refresh_token + ): + await task_logger.log_task_failure( + log_entry, + f"Google Calendar credentials not found in connector config for connector {connector_id}", + "Missing Google Calendar credentials", + {"error_type": "MissingCredentials"}, + ) + return 0, "Google Calendar credentials not found in connector config" + await task_logger.log_task_progress( log_entry, f"Initializing Google Calendar client for connector {connector_id}", diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 20c54d3fc..276be8cc3 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -31,6 +31,15 @@ from app.tasks.connector_indexers.base import ( update_connector_last_indexed, ) from app.utils.document_converters import generate_unique_identifier_hash +from app.utils.google_credentials import ( + COMPOSIO_GOOGLE_CONNECTOR_TYPES, + build_composio_credentials, +) + +ACCEPTED_DRIVE_CONNECTOR_TYPES = { + SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, +} # Type hint for heartbeat callback HeartbeatCallbackType = Callable[[int], Awaitable[None]] @@ -89,14 +98,17 @@ async def index_google_drive_files( ) try: - connector = await get_connector_by_id( - session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR - ) + # Accept both native and Composio Drive connectors + connector = None + for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES: + connector = await get_connector_by_id(session, connector_id, ct) + if connector: + break if not connector: error_msg = f"Google Drive connector with ID {connector_id} not found" await task_logger.log_task_failure( - log_entry, error_msg, {"error_type": "ConnectorNotFound"} + log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} ) return 0, error_msg @@ -106,27 +118,43 @@ async def index_google_drive_files( {"stage": "client_initialization"}, ) - # Check if 
credentials are encrypted (only when explicitly marked) - token_encrypted = connector.config.get("_token_encrypted", False) - if token_encrypted: - # Credentials are explicitly marked as encrypted, will be decrypted during client initialization - if not config.SECRET_KEY: - await task_logger.log_task_failure( - log_entry, - f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}", - "Missing SECRET_KEY for token decryption", - {"error_type": "MissingSecretKey"}, - ) - return ( - 0, - "SECRET_KEY not configured but credentials are marked as encrypted", - ) - logger.info( - f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization" + # Build credentials based on connector type + pre_built_credentials = None + if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: + connected_account_id = connector.config.get( + "composio_connected_account_id" ) - # If _token_encrypted is False or not set, treat credentials as plaintext + if not connected_account_id: + error_msg = f"Composio connected_account_id not found for connector {connector_id}" + await task_logger.log_task_failure( + log_entry, error_msg, "Missing Composio account", + {"error_type": "MissingComposioAccount"}, + ) + return 0, error_msg + pre_built_credentials = build_composio_credentials(connected_account_id) + else: + token_encrypted = connector.config.get("_token_encrypted", False) + if token_encrypted: + if not config.SECRET_KEY: + await task_logger.log_task_failure( + log_entry, + f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}", + "Missing SECRET_KEY for token decryption", + {"error_type": "MissingSecretKey"}, + ) + return ( + 0, + "SECRET_KEY not configured but credentials are marked as encrypted", + ) + logger.info( + f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization" + ) - drive_client = GoogleDriveClient(session, connector_id) + connector_enable_summary = getattr(connector, "enable_summary", True) + + drive_client = GoogleDriveClient( + session, connector_id, credentials=pre_built_credentials + ) if not folder_id: error_msg = "folder_id is required for Google Drive indexing" @@ -164,6 +192,7 @@ async def index_google_drive_files( max_files=max_files, include_subfolders=include_subfolders, on_heartbeat_callback=on_heartbeat_callback, + enable_summary=connector_enable_summary, ) else: logger.info(f"Using full scan for connector {connector_id}") @@ -181,6 +210,7 @@ async def index_google_drive_files( max_files=max_files, include_subfolders=include_subfolders, on_heartbeat_callback=on_heartbeat_callback, + enable_summary=connector_enable_summary, ) documents_indexed, documents_skipped = result @@ -278,14 +308,17 @@ async def index_google_drive_single_file( ) try: - connector = await get_connector_by_id( - session, connector_id, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR - ) + # Accept both native and Composio Drive connectors + connector = None + for ct in ACCEPTED_DRIVE_CONNECTOR_TYPES: + connector = await get_connector_by_id(session, connector_id, ct) + if connector: + break if not connector: error_msg = f"Google Drive connector with ID {connector_id} not found" await task_logger.log_task_failure( - log_entry, error_msg, {"error_type": "ConnectorNotFound"} + log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} ) return 0, error_msg @@ -295,27 +328,42 @@ async def index_google_drive_single_file( 
{"stage": "client_initialization"}, ) - # Check if credentials are encrypted (only when explicitly marked) - token_encrypted = connector.config.get("_token_encrypted", False) - if token_encrypted: - # Credentials are explicitly marked as encrypted, will be decrypted during client initialization - if not config.SECRET_KEY: - await task_logger.log_task_failure( - log_entry, - f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}", - "Missing SECRET_KEY for token decryption", - {"error_type": "MissingSecretKey"}, - ) - return ( - 0, - "SECRET_KEY not configured but credentials are marked as encrypted", - ) - logger.info( - f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization" + pre_built_credentials = None + if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: + connected_account_id = connector.config.get( + "composio_connected_account_id" ) - # If _token_encrypted is False or not set, treat credentials as plaintext + if not connected_account_id: + error_msg = f"Composio connected_account_id not found for connector {connector_id}" + await task_logger.log_task_failure( + log_entry, error_msg, "Missing Composio account", + {"error_type": "MissingComposioAccount"}, + ) + return 0, error_msg + pre_built_credentials = build_composio_credentials(connected_account_id) + else: + token_encrypted = connector.config.get("_token_encrypted", False) + if token_encrypted: + if not config.SECRET_KEY: + await task_logger.log_task_failure( + log_entry, + f"SECRET_KEY not configured but credentials are marked as encrypted for connector {connector_id}", + "Missing SECRET_KEY for token decryption", + {"error_type": "MissingSecretKey"}, + ) + return ( + 0, + "SECRET_KEY not configured but credentials are marked as encrypted", + ) + logger.info( + f"Google Drive credentials are encrypted for connector {connector_id}, will decrypt during client initialization" + ) - drive_client = GoogleDriveClient(session, connector_id) + connector_enable_summary = getattr(connector, "enable_summary", True) + + drive_client = GoogleDriveClient( + session, connector_id, credentials=pre_built_credentials + ) # Fetch the file metadata file, error = await get_file_by_id(drive_client, file_id) @@ -362,6 +410,7 @@ async def index_google_drive_single_file( task_logger=task_logger, log_entry=log_entry, pending_document=pending_doc, + enable_summary=connector_enable_summary, ) await session.commit() @@ -433,6 +482,7 @@ async def _index_full_scan( max_files: int, include_subfolders: bool = False, on_heartbeat_callback: HeartbeatCallbackType | None = None, + enable_summary: bool = True, ) -> tuple[int, int]: """Perform full scan indexing of a folder. @@ -562,6 +612,7 @@ async def _index_full_scan( task_logger=task_logger, log_entry=log_entry, pending_document=pending_doc, + enable_summary=enable_summary, ) documents_indexed += indexed @@ -592,6 +643,7 @@ async def _index_with_delta_sync( max_files: int, include_subfolders: bool = False, on_heartbeat_callback: HeartbeatCallbackType | None = None, + enable_summary: bool = True, ) -> tuple[int, int]: """Perform delta sync indexing using change tracking. 
@@ -703,6 +755,7 @@ async def _index_with_delta_sync( task_logger=task_logger, log_entry=log_entry, pending_document=pending_doc, + enable_summary=enable_summary, ) documents_indexed += indexed @@ -957,6 +1010,7 @@ async def _process_single_file( task_logger: TaskLoggingService, log_entry: any, pending_document: Document | None = None, + enable_summary: bool = True, ) -> tuple[int, int, int]: """ Process a single file by downloading and using Surfsense's file processor. @@ -1020,6 +1074,7 @@ async def _process_single_file( task_logger=task_logger, log_entry=log_entry, connector_id=connector_id, + enable_summary=enable_summary, ) if error: diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 6e2408cbd..a1eee91d9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -21,6 +21,15 @@ from app.db import ( DocumentType, SearchSourceConnectorType, ) +from app.utils.google_credentials import ( + COMPOSIO_GOOGLE_CONNECTOR_TYPES, + build_composio_credentials, +) + +ACCEPTED_GMAIL_CONNECTOR_TYPES = { + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, +} from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( @@ -94,90 +103,100 @@ async def index_google_gmail_messages( ) try: - # Get connector by id - connector = await get_connector_by_id( - session, connector_id, SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR - ) + # Accept both native and Composio Gmail connectors + connector = None + for ct in ACCEPTED_GMAIL_CONNECTOR_TYPES: + connector = await get_connector_by_id(session, connector_id, ct) + if connector: + break if not connector: error_msg = f"Gmail connector with ID {connector_id} not found" await task_logger.log_task_failure( - log_entry, error_msg, {"error_type": "ConnectorNotFound"} + log_entry, error_msg, None, {"error_type": "ConnectorNotFound"} ) return 0, error_msg - # Get the Google Gmail credentials from the connector config - config_data = connector.config - - # Decrypt sensitive credentials if encrypted (for backward compatibility) - from app.config import config - from app.utils.oauth_security import TokenEncryption - - token_encrypted = config_data.get("_token_encrypted", False) - if token_encrypted and config.SECRET_KEY: - try: - token_encryption = TokenEncryption(config.SECRET_KEY) - - # Decrypt sensitive fields - if config_data.get("token"): - config_data["token"] = token_encryption.decrypt_token( - config_data["token"] - ) - if config_data.get("refresh_token"): - config_data["refresh_token"] = token_encryption.decrypt_token( - config_data["refresh_token"] - ) - if config_data.get("client_secret"): - config_data["client_secret"] = token_encryption.decrypt_token( - config_data["client_secret"] - ) - - logger.info( - f"Decrypted Google Gmail credentials for connector {connector_id}" - ) - except Exception as e: + # Build credentials based on connector type + if connector.connector_type in COMPOSIO_GOOGLE_CONNECTOR_TYPES: + connected_account_id = connector.config.get( + "composio_connected_account_id" + ) + if not connected_account_id: await task_logger.log_task_failure( log_entry, - f"Failed to decrypt Google Gmail credentials for connector {connector_id}: {e!s}", - "Credential decryption 
failed", - {"error_type": "CredentialDecryptionError"}, + f"Composio connected_account_id not found for connector {connector_id}", + "Missing Composio account", + {"error_type": "MissingComposioAccount"}, ) - return 0, f"Failed to decrypt Google Gmail credentials: {e!s}" + return 0, "Composio connected_account_id not found" + credentials = build_composio_credentials(connected_account_id) + else: + config_data = connector.config - exp = config_data.get("expiry", "") - if exp: - exp = exp.replace("Z", "") - credentials = Credentials( - token=config_data.get("token"), - refresh_token=config_data.get("refresh_token"), - token_uri=config_data.get("token_uri"), - client_id=config_data.get("client_id"), - client_secret=config_data.get("client_secret"), - scopes=config_data.get("scopes", []), - expiry=datetime.fromisoformat(exp) if exp else None, - ) + from app.config import config + from app.utils.oauth_security import TokenEncryption - if ( - not credentials.client_id - or not credentials.client_secret - or not credentials.refresh_token - ): - await task_logger.log_task_failure( - log_entry, - f"Google gmail credentials not found in connector config for connector {connector_id}", - "Missing Google gmail credentials", - {"error_type": "MissingCredentials"}, + token_encrypted = config_data.get("_token_encrypted", False) + if token_encrypted and config.SECRET_KEY: + try: + token_encryption = TokenEncryption(config.SECRET_KEY) + if config_data.get("token"): + config_data["token"] = token_encryption.decrypt_token( + config_data["token"] + ) + if config_data.get("refresh_token"): + config_data["refresh_token"] = token_encryption.decrypt_token( + config_data["refresh_token"] + ) + if config_data.get("client_secret"): + config_data["client_secret"] = token_encryption.decrypt_token( + config_data["client_secret"] + ) + logger.info( + f"Decrypted Google Gmail credentials for connector {connector_id}" + ) + except Exception as e: + await task_logger.log_task_failure( + log_entry, + f"Failed to decrypt Google Gmail credentials for connector {connector_id}: {e!s}", + "Credential decryption failed", + {"error_type": "CredentialDecryptionError"}, + ) + return 0, f"Failed to decrypt Google Gmail credentials: {e!s}" + + exp = config_data.get("expiry", "") + if exp: + exp = exp.replace("Z", "") + credentials = Credentials( + token=config_data.get("token"), + refresh_token=config_data.get("refresh_token"), + token_uri=config_data.get("token_uri"), + client_id=config_data.get("client_id"), + client_secret=config_data.get("client_secret"), + scopes=config_data.get("scopes", []), + expiry=datetime.fromisoformat(exp) if exp else None, ) - return 0, "Google gmail credentials not found in connector config" - # Initialize Google gmail client + if ( + not credentials.client_id + or not credentials.client_secret + or not credentials.refresh_token + ): + await task_logger.log_task_failure( + log_entry, + f"Google gmail credentials not found in connector config for connector {connector_id}", + "Missing Google gmail credentials", + {"error_type": "MissingCredentials"}, + ) + return 0, "Google gmail credentials not found in connector config" + await task_logger.log_task_progress( log_entry, f"Initializing Google gmail client for connector {connector_id}", {"stage": "client_initialization"}, ) - # Initialize Google gmail connector gmail_connector = GoogleGmailConnector( credentials, session, user_id, connector_id ) diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py 
b/surfsense_backend/app/tasks/document_processors/file_processors.py index 647435213..35494cc92 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -411,6 +411,7 @@ async def add_received_file_document_using_unstructured( search_space_id: int, user_id: str, connector: dict | None = None, + enable_summary: bool = True, ) -> Document | None: """ Process and store a file document using Unstructured service. @@ -471,9 +472,13 @@ async def add_received_file_document_using_unstructured( "etl_service": "UNSTRUCTURED", "document_type": "File Document", } - summary_content, summary_embedding = await generate_document_summary( - file_in_markdown, user_llm, document_metadata - ) + if enable_summary: + summary_content, summary_embedding = await generate_document_summary( + file_in_markdown, user_llm, document_metadata + ) + else: + summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}" + summary_embedding = embed_text(summary_content) # Process chunks chunks = await create_document_chunks(file_in_markdown) @@ -493,14 +498,13 @@ async def add_received_file_document_using_unstructured( existing_document.source_markdown = file_in_markdown existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() - existing_document.status = DocumentStatus.ready() # Mark as ready + existing_document.status = DocumentStatus.ready() await session.commit() await session.refresh(existing_document) document = existing_document else: # Create new document - # Determine document type based on connector doc_type = DocumentType.FILE if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: doc_type = DocumentType.GOOGLE_DRIVE_FILE @@ -523,7 +527,7 @@ async def add_received_file_document_using_unstructured( updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector.get("connector_id") if connector else None, - status=DocumentStatus.ready(), # Mark as ready + status=DocumentStatus.ready(), ) session.add(document) @@ -546,6 +550,7 @@ async def add_received_file_document_using_llamacloud( search_space_id: int, user_id: str, connector: dict | None = None, + enable_summary: bool = True, ) -> Document | None: """ Process and store document content parsed by LlamaCloud. 
@@ -605,16 +610,19 @@ async def add_received_file_document_using_llamacloud( "etl_service": "LLAMACLOUD", "document_type": "File Document", } - summary_content, summary_embedding = await generate_document_summary( - file_in_markdown, user_llm, document_metadata - ) + if enable_summary: + summary_content, summary_embedding = await generate_document_summary( + file_in_markdown, user_llm, document_metadata + ) + else: + summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}" + summary_embedding = embed_text(summary_content) # Process chunks chunks = await create_document_chunks(file_in_markdown) # Update or create document if existing_document: - # Update existing document existing_document.title = file_name existing_document.content = summary_content existing_document.content_hash = content_hash @@ -627,14 +635,12 @@ async def add_received_file_document_using_llamacloud( existing_document.source_markdown = file_in_markdown existing_document.content_needs_reindexing = False existing_document.updated_at = get_current_timestamp() - existing_document.status = DocumentStatus.ready() # Mark as ready + existing_document.status = DocumentStatus.ready() await session.commit() await session.refresh(existing_document) document = existing_document else: - # Create new document - # Determine document type based on connector doc_type = DocumentType.FILE if connector and connector.get("type") == DocumentType.GOOGLE_DRIVE_FILE: doc_type = DocumentType.GOOGLE_DRIVE_FILE @@ -657,7 +663,7 @@ async def add_received_file_document_using_llamacloud( updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector.get("connector_id") if connector else None, - status=DocumentStatus.ready(), # Mark as ready + status=DocumentStatus.ready(), ) session.add(document) @@ -682,6 +688,7 @@ async def add_received_file_document_using_docling( search_space_id: int, user_id: str, connector: dict | None = None, + enable_summary: bool = True, ) -> Document | None: """ Process and store document content parsed by Docling. 
@@ -734,33 +741,32 @@ async def add_received_file_document_using_docling( f"No long context LLM configured for user {user_id} in search_space {search_space_id}" ) - # Generate summary using chunked processing for large documents - from app.services.docling_service import create_docling_service + if enable_summary: + from app.services.docling_service import create_docling_service - docling_service = create_docling_service() + docling_service = create_docling_service() - summary_content = await docling_service.process_large_document_summary( - content=file_in_markdown, llm=user_llm, document_title=file_name - ) + summary_content = await docling_service.process_large_document_summary( + content=file_in_markdown, llm=user_llm, document_title=file_name + ) - # Enhance summary with metadata - document_metadata = { - "file_name": file_name, - "etl_service": "DOCLING", - "document_type": "File Document", - } - metadata_parts = [] - metadata_parts.append("# DOCUMENT METADATA") + document_metadata = { + "file_name": file_name, + "etl_service": "DOCLING", + "document_type": "File Document", + } + metadata_parts = ["# DOCUMENT METADATA"] + for key, value in document_metadata.items(): + if value: + formatted_key = key.replace("_", " ").title() + metadata_parts.append(f"**{formatted_key}:** {value}") - for key, value in document_metadata.items(): - if value: # Only include non-empty values - formatted_key = key.replace("_", " ").title() - metadata_parts.append(f"**{formatted_key}:** {value}") - - metadata_section = "\n".join(metadata_parts) - enhanced_summary_content = ( - f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}" - ) + metadata_section = "\n".join(metadata_parts) + enhanced_summary_content = ( + f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}" + ) + else: + enhanced_summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}" summary_embedding = embed_text(enhanced_summary_content) @@ -1219,9 +1225,10 @@ async def process_file_in_background( print("Error deleting temp file", e) pass - # Pass the documents to the existing background task + enable_summary = connector.get("enable_summary", True) if connector else True result = await add_received_file_document_using_unstructured( - session, filename, docs, search_space_id, user_id, connector + session, filename, docs, search_space_id, user_id, connector, + enable_summary=enable_summary, ) if connector: @@ -1362,7 +1369,7 @@ async def process_file_in_background( # Extract text content from the markdown documents markdown_content = doc.text - # Process the documents using our LlamaCloud background task + enable_summary = connector.get("enable_summary", True) if connector else True doc_result = await add_received_file_document_using_llamacloud( session, filename, @@ -1370,6 +1377,7 @@ async def process_file_in_background( search_space_id=search_space_id, user_id=user_id, connector=connector, + enable_summary=enable_summary, ) # Track if this document was successfully created @@ -1516,7 +1524,7 @@ async def process_file_in_background( session, notification, stage="chunking" ) - # Process the document using our Docling background task + enable_summary = connector.get("enable_summary", True) if connector else True doc_result = await add_received_file_document_using_docling( session, filename, @@ -1524,6 +1532,7 @@ async def process_file_in_background( search_space_id=search_space_id, user_id=user_id, connector=connector, + enable_summary=enable_summary, ) if doc_result: diff --git 
a/surfsense_backend/app/utils/google_credentials.py b/surfsense_backend/app/utils/google_credentials.py
new file mode 100644
index 000000000..b490768bc
--- /dev/null
+++ b/surfsense_backend/app/utils/google_credentials.py
@@ -0,0 +1,41 @@
+"""Shared Google OAuth credential utilities for native and Composio connectors."""
+
+import logging
+from datetime import datetime, timedelta, timezone
+
+from google.oauth2.credentials import Credentials
+
+from app.db import SearchSourceConnectorType
+
+logger = logging.getLogger(__name__)
+
+COMPOSIO_GOOGLE_CONNECTOR_TYPES = {
+    SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+    SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
+    SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+}
+
+
+def build_composio_credentials(connected_account_id: str) -> Credentials:
+    """
+    Build Google OAuth Credentials backed by Composio's token management.
+
+    The returned Credentials object uses a refresh_handler that fetches
+    fresh access tokens from Composio on demand, so it works seamlessly
+    with googleapiclient.discovery.build().
+    """
+    from app.services.composio_service import ComposioService
+
+    service = ComposioService()
+    access_token = service.get_access_token(connected_account_id)
+
+    def composio_refresh_handler(request, scopes):
+        fresh_token = service.get_access_token(connected_account_id)
+        expiry = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=55)
+        return fresh_token, expiry
+
+    return Credentials(
+        token=access_token,
+        expiry=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=55),
+        refresh_handler=composio_refresh_handler,
+    )
diff --git a/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts b/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts
index 090207bbb..9bf3b61e4 100644
--- a/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts
+++ b/surfsense_web/components/assistant-ui/connector-popup/utils/connector-document-mapping.ts
@@ -31,10 +31,10 @@ export const CONNECTOR_TO_DOCUMENT_TYPE: Record = {
 	// Special mappings (connector type differs from document type)
 	GOOGLE_DRIVE_CONNECTOR: "GOOGLE_DRIVE_FILE",
 	WEBCRAWLER_CONNECTOR: "CRAWLED_URL",
-	// Composio connectors map to their own document types
-	COMPOSIO_GOOGLE_DRIVE_CONNECTOR: "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
-	COMPOSIO_GMAIL_CONNECTOR: "COMPOSIO_GMAIL_CONNECTOR",
-	COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
+	// Composio connectors map to unified Google document types
+	COMPOSIO_GOOGLE_DRIVE_CONNECTOR: "GOOGLE_DRIVE_FILE",
+	COMPOSIO_GMAIL_CONNECTOR: "GOOGLE_GMAIL_CONNECTOR",
+	COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: "GOOGLE_CALENDAR_CONNECTOR",
 };
 
 /**