diff --git a/surfsense_backend/app/connectors/composio_connector.py b/surfsense_backend/app/connectors/composio_connector.py index 8cb91355d..301296378 100644 --- a/surfsense_backend/app/connectors/composio_connector.py +++ b/surfsense_backend/app/connectors/composio_connector.py @@ -1,7 +1,7 @@ """ -Composio Connector Module. +Composio Connector Base Module. -Provides a unified interface for interacting with various services via Composio, +Provides a base class for interacting with various services via Composio, primarily used during indexing operations. """ @@ -19,10 +19,10 @@ logger = logging.getLogger(__name__) class ComposioConnector: """ - Generic Composio connector for data retrieval. + Base Composio connector for data retrieval. Wraps the ComposioService to provide toolkit-specific data access - for indexing operations. + for indexing operations. Subclasses implement toolkit-specific methods. """ def __init__( @@ -89,354 +89,12 @@ class ComposioConnector: toolkit_id = await self.get_toolkit_id() return toolkit_id in INDEXABLE_TOOLKITS - # ===== Google Drive Methods ===== + @property + def session(self) -> AsyncSession: + """Get the database session.""" + return self._session - async def list_drive_files( - self, - folder_id: str | None = None, - page_token: str | None = None, - page_size: int = 100, - ) -> tuple[list[dict[str, Any]], str | None, str | None]: - """ - List files from Google Drive via Composio. - - Args: - folder_id: Optional folder ID to list contents of. - page_token: Pagination token. - page_size: Number of files per page. - - Returns: - Tuple of (files list, next_page_token, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_drive_files( - connected_account_id=connected_account_id, - entity_id=entity_id, - folder_id=folder_id, - page_token=page_token, - page_size=page_size, - ) - - async def get_drive_file_content( - self, file_id: str - ) -> tuple[bytes | None, str | None]: - """ - Download file content from Google Drive via Composio. - - Args: - file_id: Google Drive file ID. - - Returns: - Tuple of (file content bytes, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_drive_file_content( - connected_account_id=connected_account_id, - entity_id=entity_id, - file_id=file_id, - ) - - async def get_drive_start_page_token(self) -> tuple[str | None, str | None]: - """ - Get the starting page token for Google Drive change tracking. - - Returns: - Tuple of (start_page_token, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_drive_start_page_token( - connected_account_id=connected_account_id, - entity_id=entity_id, - ) - - async def list_drive_changes( - self, - page_token: str | None = None, - page_size: int = 100, - include_removed: bool = True, - ) -> tuple[list[dict[str, Any]], str | None, str | None]: - """ - List changes in Google Drive since the given page token. 
- - Args: - page_token: Page token from previous sync (optional). - page_size: Number of changes per page. - include_removed: Whether to include removed items. - - Returns: - Tuple of (changes list, new_start_page_token, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.list_drive_changes( - connected_account_id=connected_account_id, - entity_id=entity_id, - page_token=page_token, - page_size=page_size, - include_removed=include_removed, - ) - - # ===== Gmail Methods ===== - - async def list_gmail_messages( - self, - query: str = "", - max_results: int = 50, - page_token: str | None = None, - ) -> tuple[list[dict[str, Any]], str | None, int | None, str | None]: - """ - List Gmail messages via Composio with pagination support. - - Args: - query: Gmail search query. - max_results: Maximum number of messages per page (default: 50). - page_token: Optional pagination token for next page. - - Returns: - Tuple of (messages list, next_page_token, result_size_estimate, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], None, None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_gmail_messages( - connected_account_id=connected_account_id, - entity_id=entity_id, - query=query, - max_results=max_results, - page_token=page_token, - ) - - async def get_gmail_message_detail( - self, message_id: str - ) -> tuple[dict[str, Any] | None, str | None]: - """ - Get full details of a Gmail message via Composio. - - Args: - message_id: Gmail message ID. - - Returns: - Tuple of (message details, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return None, "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_gmail_message_detail( - connected_account_id=connected_account_id, - entity_id=entity_id, - message_id=message_id, - ) - - # ===== Google Calendar Methods ===== - - async def list_calendar_events( - self, - time_min: str | None = None, - time_max: str | None = None, - max_results: int = 250, - ) -> tuple[list[dict[str, Any]], str | None]: - """ - List Google Calendar events via Composio. - - Args: - time_min: Start time (RFC3339 format). - time_max: End time (RFC3339 format). - max_results: Maximum number of events. - - Returns: - Tuple of (events list, error message). - """ - connected_account_id = await self.get_connected_account_id() - if not connected_account_id: - return [], "No connected account ID found" - - entity_id = await self.get_entity_id() - service = await self._get_service() - return await service.get_calendar_events( - connected_account_id=connected_account_id, - entity_id=entity_id, - time_min=time_min, - time_max=time_max, - max_results=max_results, - ) - - # ===== Utility Methods ===== - - def format_gmail_message_to_markdown(self, message: dict[str, Any]) -> str: - """ - Format a Gmail message to markdown. - - Args: - message: Message object from Composio's GMAIL_FETCH_EMAILS response. - Composio structure: messageId, messageText, messageTimestamp, - payload.headers, labelIds, attachmentList - - Returns: - Formatted markdown string. 
- """ - try: - # Composio uses 'messageId' (camelCase) - message_id = message.get("messageId", "") or message.get("id", "") - label_ids = message.get("labelIds", []) - - # Extract headers from payload - payload = message.get("payload", {}) - headers = payload.get("headers", []) - - # Parse headers into a dict - header_dict = {} - for header in headers: - name = header.get("name", "").lower() - value = header.get("value", "") - header_dict[name] = value - - # Extract key information - subject = header_dict.get("subject", "No Subject") - from_email = header_dict.get("from", "Unknown Sender") - to_email = header_dict.get("to", "Unknown Recipient") - # Composio provides messageTimestamp directly - date_str = message.get("messageTimestamp", "") or header_dict.get( - "date", "Unknown Date" - ) - - # Build markdown content - markdown_content = f"# {subject}\n\n" - markdown_content += f"**From:** {from_email}\n" - markdown_content += f"**To:** {to_email}\n" - markdown_content += f"**Date:** {date_str}\n" - - if label_ids: - markdown_content += f"**Labels:** {', '.join(label_ids)}\n" - - markdown_content += "\n---\n\n" - - # Composio provides full message text in 'messageText' - message_text = message.get("messageText", "") - if message_text: - markdown_content += f"## Content\n\n{message_text}\n\n" - else: - # Fallback to snippet if no messageText - snippet = message.get("snippet", "") - if snippet: - markdown_content += f"## Preview\n\n{snippet}\n\n" - - # Add attachment info if present - attachments = message.get("attachmentList", []) - if attachments: - markdown_content += "## Attachments\n\n" - for att in attachments: - att_name = att.get("filename", att.get("name", "Unknown")) - markdown_content += f"- {att_name}\n" - markdown_content += "\n" - - # Add message metadata - markdown_content += "## Message Details\n\n" - markdown_content += f"- **Message ID:** {message_id}\n" - - return markdown_content - - except Exception as e: - return f"Error formatting message to markdown: {e!s}" - - def format_calendar_event_to_markdown(self, event: dict[str, Any]) -> str: - """ - Format a Google Calendar event to markdown. - - Args: - event: Event object from Google Calendar API. - - Returns: - Formatted markdown string. 
- """ - from datetime import datetime - - try: - # Extract basic event information - summary = event.get("summary", "No Title") - description = event.get("description", "") - location = event.get("location", "") - - # Extract start and end times - start = event.get("start", {}) - end = event.get("end", {}) - - start_time = start.get("dateTime") or start.get("date", "") - end_time = end.get("dateTime") or end.get("date", "") - - # Format times for display - def format_time(time_str: str) -> str: - if not time_str: - return "Unknown" - try: - if "T" in time_str: - dt = datetime.fromisoformat(time_str.replace("Z", "+00:00")) - return dt.strftime("%Y-%m-%d %H:%M") - return time_str - except Exception: - return time_str - - start_formatted = format_time(start_time) - end_formatted = format_time(end_time) - - # Extract attendees - attendees = event.get("attendees", []) - attendee_list = [] - for attendee in attendees: - email = attendee.get("email", "") - display_name = attendee.get("displayName", email) - response_status = attendee.get("responseStatus", "") - attendee_list.append(f"- {display_name} ({response_status})") - - # Build markdown content - markdown_content = f"# {summary}\n\n" - markdown_content += f"**Start:** {start_formatted}\n" - markdown_content += f"**End:** {end_formatted}\n" - - if location: - markdown_content += f"**Location:** {location}\n" - - markdown_content += "\n" - - if description: - markdown_content += f"## Description\n\n{description}\n\n" - - if attendee_list: - markdown_content += "## Attendees\n\n" - markdown_content += "\n".join(attendee_list) - markdown_content += "\n\n" - - # Add event metadata - markdown_content += "## Event Details\n\n" - markdown_content += f"- **Event ID:** {event.get('id', 'Unknown')}\n" - markdown_content += f"- **Created:** {event.get('created', 'Unknown')}\n" - markdown_content += f"- **Updated:** {event.get('updated', 'Unknown')}\n" - - return markdown_content - - except Exception as e: - return f"Error formatting event to markdown: {e!s}" + @property + def connector_id(self) -> int: + """Get the connector ID.""" + return self._connector_id diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py new file mode 100644 index 000000000..5a9645a66 --- /dev/null +++ b/surfsense_backend/app/connectors/composio_gmail_connector.py @@ -0,0 +1,614 @@ +""" +Composio Gmail Connector Module. + +Provides Gmail specific methods for data retrieval and indexing via Composio. 
+""" + +import logging +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm import selectinload + +from app.config import config +from app.connectors.composio_connector import ComposioConnector +from app.db import Document, DocumentType +from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.tasks.connector_indexers.base import calculate_date_range +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, + generate_unique_identifier_hash, +) + +logger = logging.getLogger(__name__) + + +def get_current_timestamp() -> datetime: + """Get the current timestamp with timezone for updated_at field.""" + return datetime.now(UTC) + + +async def check_document_by_unique_identifier( + session: AsyncSession, unique_identifier_hash: str +) -> Document | None: + """Check if a document with the given unique identifier hash already exists.""" + existing_doc_result = await session.execute( + select(Document) + .options(selectinload(Document.chunks)) + .where(Document.unique_identifier_hash == unique_identifier_hash) + ) + return existing_doc_result.scalars().first() + + +async def update_connector_last_indexed( + session: AsyncSession, + connector, + update_last_indexed: bool = True, +) -> None: + """Update the last_indexed_at timestamp for a connector.""" + if update_last_indexed: + connector.last_indexed_at = datetime.now(UTC) + logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") + + +class ComposioGmailConnector(ComposioConnector): + """ + Gmail specific Composio connector. + + Provides methods for listing messages, getting message details, and formatting + Gmail messages from Gmail via Composio. + """ + + async def list_gmail_messages( + self, + query: str = "", + max_results: int = 50, + page_token: str | None = None, + ) -> tuple[list[dict[str, Any]], str | None, int | None, str | None]: + """ + List Gmail messages via Composio with pagination support. + + Args: + query: Gmail search query. + max_results: Maximum number of messages per page (default: 50). + page_token: Optional pagination token for next page. + + Returns: + Tuple of (messages list, next_page_token, result_size_estimate, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return [], None, None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_gmail_messages( + connected_account_id=connected_account_id, + entity_id=entity_id, + query=query, + max_results=max_results, + page_token=page_token, + ) + + async def get_gmail_message_detail( + self, message_id: str + ) -> tuple[dict[str, Any] | None, str | None]: + """ + Get full details of a Gmail message via Composio. + + Args: + message_id: Gmail message ID. + + Returns: + Tuple of (message details, error message). 
+ """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_gmail_message_detail( + connected_account_id=connected_account_id, + entity_id=entity_id, + message_id=message_id, + ) + + def format_gmail_message_to_markdown(self, message: dict[str, Any]) -> str: + """ + Format a Gmail message to markdown. + + Args: + message: Message object from Composio's GMAIL_FETCH_EMAILS response. + Composio structure: messageId, messageText, messageTimestamp, + payload.headers, labelIds, attachmentList + + Returns: + Formatted markdown string. + """ + try: + # Composio uses 'messageId' (camelCase) + message_id = message.get("messageId", "") or message.get("id", "") + label_ids = message.get("labelIds", []) + + # Extract headers from payload + payload = message.get("payload", {}) + headers = payload.get("headers", []) + + # Parse headers into a dict + header_dict = {} + for header in headers: + name = header.get("name", "").lower() + value = header.get("value", "") + header_dict[name] = value + + # Extract key information + subject = header_dict.get("subject", "No Subject") + from_email = header_dict.get("from", "Unknown Sender") + to_email = header_dict.get("to", "Unknown Recipient") + # Composio provides messageTimestamp directly + date_str = message.get("messageTimestamp", "") or header_dict.get( + "date", "Unknown Date" + ) + + # Build markdown content + markdown_content = f"# {subject}\n\n" + markdown_content += f"**From:** {from_email}\n" + markdown_content += f"**To:** {to_email}\n" + markdown_content += f"**Date:** {date_str}\n" + + if label_ids: + markdown_content += f"**Labels:** {', '.join(label_ids)}\n" + + markdown_content += "\n---\n\n" + + # Composio provides full message text in 'messageText' + message_text = message.get("messageText", "") + if message_text: + markdown_content += f"## Content\n\n{message_text}\n\n" + else: + # Fallback to snippet if no messageText + snippet = message.get("snippet", "") + if snippet: + markdown_content += f"## Preview\n\n{snippet}\n\n" + + # Add attachment info if present + attachments = message.get("attachmentList", []) + if attachments: + markdown_content += "## Attachments\n\n" + for att in attachments: + att_name = att.get("filename", att.get("name", "Unknown")) + markdown_content += f"- {att_name}\n" + markdown_content += "\n" + + # Add message metadata + markdown_content += "## Message Details\n\n" + markdown_content += f"- **Message ID:** {message_id}\n" + + return markdown_content + + except Exception as e: + return f"Error formatting message to markdown: {e!s}" + + +# ============ Indexer Functions ============ + + +async def _process_gmail_message_batch( + session: AsyncSession, + messages: list[dict[str, Any]], + composio_connector: ComposioGmailConnector, + connector_id: int, + search_space_id: int, + user_id: str, + total_documents_indexed: int = 0, +) -> tuple[int, int]: + """ + Process a batch of Gmail messages and index them. + + Args: + total_documents_indexed: Running total of documents indexed so far (for batch commits). 
+ + Returns: + Tuple of (documents_indexed, documents_skipped) + """ + documents_indexed = 0 + documents_skipped = 0 + + for message in messages: + try: + # Composio uses 'messageId' (camelCase), not 'id' + message_id = message.get("messageId", "") or message.get("id", "") + if not message_id: + documents_skipped += 1 + continue + + # Composio's GMAIL_FETCH_EMAILS already returns full message content + # No need for a separate detail API call + + # Extract message info from Composio response + # Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds + payload = message.get("payload", {}) + headers = payload.get("headers", []) + + subject = "No Subject" + sender = "Unknown Sender" + date_str = message.get("messageTimestamp", "Unknown Date") + + for header in headers: + name = header.get("name", "").lower() + value = header.get("value", "") + if name == "subject": + subject = value + elif name == "from": + sender = value + elif name == "date": + date_str = value + + # Format to markdown using the full message data + markdown_content = composio_connector.format_gmail_message_to_markdown( + message + ) + + # Check for empty content (defensive parsing per Composio best practices) + if not markdown_content.strip(): + logger.warning(f"Skipping Gmail message with no content: {subject}") + documents_skipped += 1 + continue + + # Generate unique identifier + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"gmail_{message_id}", search_space_id + ) + + content_hash = generate_content_hash(markdown_content, search_space_id) + + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + # Get label IDs from Composio response + label_ids = message.get("labelIds", []) + # Extract thread_id if available (for consistency with non-Composio implementation) + thread_id = message.get("threadId", "") or message.get("thread_id", "") + + if existing_document: + if existing_document.content_hash == content_hash: + documents_skipped += 1 + continue + + # Update existing + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "document_type": "Gmail Message (Composio)", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = ( + f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + existing_document.title = f"Gmail: {subject}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date": date_str, + "labels": label_ids, + "connector_id": connector_id, + "source": "composio", + } + existing_document.chunks = chunks + existing_document.updated_at = get_current_timestamp() + + documents_indexed += 1 + + # Batch commit every 10 documents + current_total = total_documents_indexed + documents_indexed + if current_total % 10 == 0: + logger.info( + f"Committing batch: {current_total} Gmail messages 
processed so far" + ) + await session.commit() + continue + + # Create new document + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "document_type": "Gmail Message (Composio)", + } + summary_content, summary_embedding = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = ( + f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + document = Document( + search_space_id=search_space_id, + title=f"Gmail: {subject}", + document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]), + document_metadata={ + "message_id": message_id, + "thread_id": thread_id, + "subject": subject, + "sender": sender, + "date": date_str, + "labels": label_ids, + "connector_id": connector_id, + "toolkit_id": "gmail", + "source": "composio", + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=summary_embedding, + chunks=chunks, + updated_at=get_current_timestamp(), + ) + session.add(document) + documents_indexed += 1 + + # Batch commit every 10 documents + current_total = total_documents_indexed + documents_indexed + if current_total % 10 == 0: + logger.info( + f"Committing batch: {current_total} Gmail messages processed so far" + ) + await session.commit() + + except Exception as e: + logger.error(f"Error processing Gmail message: {e!s}", exc_info=True) + documents_skipped += 1 + # Rollback on error to avoid partial state (per Composio best practices) + try: + await session.rollback() + except Exception as rollback_error: + logger.error( + f"Error during rollback: {rollback_error!s}", exc_info=True + ) + continue + + return documents_indexed, documents_skipped + + +async def index_composio_gmail( + session: AsyncSession, + connector, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None, + end_date: str | None, + task_logger: TaskLoggingService, + log_entry, + update_last_indexed: bool = True, + max_items: int = 1000, +) -> tuple[int, str]: + """Index Gmail messages via Composio with pagination and incremental processing.""" + try: + composio_connector = ComposioGmailConnector(session, connector_id) + + # Normalize date values - handle "undefined" strings from frontend + if start_date == "undefined" or start_date == "": + start_date = None + if end_date == "undefined" or end_date == "": + end_date = None + + # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at + # This ensures user-selected dates are respected (matching non-Composio Gmail connector behavior) + if start_date is not None and end_date is not None: + # User provided both dates - use them directly + start_date_str = start_date + end_date_str = end_date + else: + # Calculate date range with defaults (uses last_indexed_at or 365 days back) + # This ensures indexing works even when user doesn't specify dates + start_date_str, end_date_str = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + + # Build query with date range + query_parts = [] + if start_date_str: + query_parts.append(f"after:{start_date_str.replace('-', '/')}") + if end_date_str: + query_parts.append(f"before:{end_date_str.replace('-', 
'/')}") + query = " ".join(query_parts) if query_parts else "" + + logger.info( + f"Gmail query for connector {connector_id}: '{query}' " + f"(start_date={start_date_str}, end_date={end_date_str})" + ) + + # Use smaller batch size to avoid 413 payload too large errors + batch_size = 50 + page_token = None + total_documents_indexed = 0 + total_documents_skipped = 0 + total_messages_fetched = 0 + result_size_estimate = None # Will be set from first API response + + while total_messages_fetched < max_items: + # Calculate how many messages to fetch in this batch + remaining = max_items - total_messages_fetched + current_batch_size = min(batch_size, remaining) + + # Use result_size_estimate if available, otherwise fall back to max_items + estimated_total = ( + result_size_estimate if result_size_estimate is not None else max_items + ) + # Cap estimated_total at max_items to avoid showing misleading progress + estimated_total = min(estimated_total, max_items) + + await task_logger.log_task_progress( + log_entry, + f"Fetching Gmail messages batch via Composio for connector {connector_id} " + f"({total_messages_fetched}/{estimated_total} fetched, {total_documents_indexed} indexed)", + { + "stage": "fetching_messages", + "batch_size": current_batch_size, + "total_fetched": total_messages_fetched, + "total_indexed": total_documents_indexed, + "estimated_total": estimated_total, + }, + ) + + # Fetch batch of messages + ( + messages, + next_token, + result_size_estimate_batch, + error, + ) = await composio_connector.list_gmail_messages( + query=query, + max_results=current_batch_size, + page_token=page_token, + ) + + if error: + await task_logger.log_task_failure( + log_entry, f"Failed to fetch Gmail messages: {error}", {} + ) + return 0, f"Failed to fetch Gmail messages: {error}" + + if not messages: + # No more messages available + break + + # Update result_size_estimate from first response (Gmail provides this estimate) + if result_size_estimate is None and result_size_estimate_batch is not None: + result_size_estimate = result_size_estimate_batch + logger.info( + f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'" + ) + + total_messages_fetched += len(messages) + # Recalculate estimated_total after potentially updating result_size_estimate + estimated_total = ( + result_size_estimate if result_size_estimate is not None else max_items + ) + estimated_total = min(estimated_total, max_items) + + logger.info( + f"Fetched batch of {len(messages)} Gmail messages " + f"(total: {total_messages_fetched}/{estimated_total})" + ) + + # Process batch incrementally + batch_indexed, batch_skipped = await _process_gmail_message_batch( + session=session, + messages=messages, + composio_connector=composio_connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + total_documents_indexed=total_documents_indexed, + ) + + total_documents_indexed += batch_indexed + total_documents_skipped += batch_skipped + + logger.info( + f"Processed batch: {batch_indexed} indexed, {batch_skipped} skipped " + f"(total: {total_documents_indexed} indexed, {total_documents_skipped} skipped)" + ) + + # Batch commits happen in _process_gmail_message_batch every 10 documents + # This ensures progress is saved incrementally, preventing data loss on crashes + + # Check if we should continue + if not next_token: + # No more pages available + break + + if len(messages) < current_batch_size: + # Last page had fewer items than requested, we're done + break + + # Continue with 
next page + page_token = next_token + + if total_messages_fetched == 0: + success_msg = "No Gmail messages found in the specified date range" + await task_logger.log_task_success( + log_entry, success_msg, {"messages_count": 0} + ) + # CRITICAL: Update timestamp even when no messages found so Electric SQL syncs and UI shows indexed status + await update_connector_last_indexed(session, connector, update_last_indexed) + await session.commit() + return 0, None # Return None (not error) when no items found + + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + await update_connector_last_indexed(session, connector, update_last_indexed) + + # Final commit to ensure all documents are persisted (safety net) + # This matches the pattern used in non-Composio Gmail indexer + logger.info( + f"Final commit: Total {total_documents_indexed} Gmail messages processed" + ) + await session.commit() + logger.info( + "Successfully committed all Composio Gmail document changes to database" + ) + + await task_logger.log_task_success( + log_entry, + f"Successfully completed Gmail indexing via Composio for connector {connector_id}", + { + "documents_indexed": total_documents_indexed, + "documents_skipped": total_documents_skipped, + "messages_fetched": total_messages_fetched, + }, + ) + + return total_documents_indexed, None + + except Exception as e: + logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True) + return 0, f"Failed to index Gmail via Composio: {e!s}" + diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py new file mode 100644 index 000000000..ab8bde53c --- /dev/null +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -0,0 +1,453 @@ +""" +Composio Google Calendar Connector Module. + +Provides Google Calendar specific methods for data retrieval and indexing via Composio. 
+""" + +import logging +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm import selectinload + +from app.config import config +from app.connectors.composio_connector import ComposioConnector +from app.db import Document, DocumentType +from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.tasks.connector_indexers.base import calculate_date_range +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, + generate_unique_identifier_hash, +) + +logger = logging.getLogger(__name__) + + +def get_current_timestamp() -> datetime: + """Get the current timestamp with timezone for updated_at field.""" + return datetime.now(UTC) + + +async def check_document_by_unique_identifier( + session: AsyncSession, unique_identifier_hash: str +) -> Document | None: + """Check if a document with the given unique identifier hash already exists.""" + existing_doc_result = await session.execute( + select(Document) + .options(selectinload(Document.chunks)) + .where(Document.unique_identifier_hash == unique_identifier_hash) + ) + return existing_doc_result.scalars().first() + + +async def update_connector_last_indexed( + session: AsyncSession, + connector, + update_last_indexed: bool = True, +) -> None: + """Update the last_indexed_at timestamp for a connector.""" + if update_last_indexed: + connector.last_indexed_at = datetime.now(UTC) + logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") + + +class ComposioGoogleCalendarConnector(ComposioConnector): + """ + Google Calendar specific Composio connector. + + Provides methods for listing calendar events and formatting them from + Google Calendar via Composio. + """ + + async def list_calendar_events( + self, + time_min: str | None = None, + time_max: str | None = None, + max_results: int = 250, + ) -> tuple[list[dict[str, Any]], str | None]: + """ + List Google Calendar events via Composio. + + Args: + time_min: Start time (RFC3339 format). + time_max: End time (RFC3339 format). + max_results: Maximum number of events. + + Returns: + Tuple of (events list, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return [], "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_calendar_events( + connected_account_id=connected_account_id, + entity_id=entity_id, + time_min=time_min, + time_max=time_max, + max_results=max_results, + ) + + def format_calendar_event_to_markdown(self, event: dict[str, Any]) -> str: + """ + Format a Google Calendar event to markdown. + + Args: + event: Event object from Google Calendar API. + + Returns: + Formatted markdown string. 
+ """ + try: + # Extract basic event information + summary = event.get("summary", "No Title") + description = event.get("description", "") + location = event.get("location", "") + + # Extract start and end times + start = event.get("start", {}) + end = event.get("end", {}) + + start_time = start.get("dateTime") or start.get("date", "") + end_time = end.get("dateTime") or end.get("date", "") + + # Format times for display + def format_time(time_str: str) -> str: + if not time_str: + return "Unknown" + try: + if "T" in time_str: + dt = datetime.fromisoformat(time_str.replace("Z", "+00:00")) + return dt.strftime("%Y-%m-%d %H:%M") + return time_str + except Exception: + return time_str + + start_formatted = format_time(start_time) + end_formatted = format_time(end_time) + + # Extract attendees + attendees = event.get("attendees", []) + attendee_list = [] + for attendee in attendees: + email = attendee.get("email", "") + display_name = attendee.get("displayName", email) + response_status = attendee.get("responseStatus", "") + attendee_list.append(f"- {display_name} ({response_status})") + + # Build markdown content + markdown_content = f"# {summary}\n\n" + markdown_content += f"**Start:** {start_formatted}\n" + markdown_content += f"**End:** {end_formatted}\n" + + if location: + markdown_content += f"**Location:** {location}\n" + + markdown_content += "\n" + + if description: + markdown_content += f"## Description\n\n{description}\n\n" + + if attendee_list: + markdown_content += "## Attendees\n\n" + markdown_content += "\n".join(attendee_list) + markdown_content += "\n\n" + + # Add event metadata + markdown_content += "## Event Details\n\n" + markdown_content += f"- **Event ID:** {event.get('id', 'Unknown')}\n" + markdown_content += f"- **Created:** {event.get('created', 'Unknown')}\n" + markdown_content += f"- **Updated:** {event.get('updated', 'Unknown')}\n" + + return markdown_content + + except Exception as e: + return f"Error formatting event to markdown: {e!s}" + + +# ============ Indexer Functions ============ + + +async def index_composio_google_calendar( + session: AsyncSession, + connector, + connector_id: int, + search_space_id: int, + user_id: str, + start_date: str | None, + end_date: str | None, + task_logger: TaskLoggingService, + log_entry, + update_last_indexed: bool = True, + max_items: int = 2500, +) -> tuple[int, str]: + """Index Google Calendar events via Composio.""" + try: + composio_connector = ComposioGoogleCalendarConnector(session, connector_id) + + await task_logger.log_task_progress( + log_entry, + f"Fetching Google Calendar events via Composio for connector {connector_id}", + {"stage": "fetching_events"}, + ) + + # Normalize date values - handle "undefined" strings from frontend + if start_date == "undefined" or start_date == "": + start_date = None + if end_date == "undefined" or end_date == "": + end_date = None + + # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at + # This ensures user-selected dates are respected (matching non-Composio Calendar connector behavior) + if start_date is not None and end_date is not None: + # User provided both dates - use them directly + start_date_str = start_date + end_date_str = end_date + else: + # Calculate date range with defaults (uses last_indexed_at or 365 days back) + # This ensures indexing works even when user doesn't specify dates + start_date_str, end_date_str = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) + + # Build time range for API 
call + time_min = f"{start_date_str}T00:00:00Z" + time_max = f"{end_date_str}T23:59:59Z" + + logger.info( + f"Google Calendar query for connector {connector_id}: " + f"(start_date={start_date_str}, end_date={end_date_str})" + ) + + events, error = await composio_connector.list_calendar_events( + time_min=time_min, + time_max=time_max, + max_results=max_items, + ) + + if error: + await task_logger.log_task_failure( + log_entry, f"Failed to fetch Calendar events: {error}", {} + ) + return 0, f"Failed to fetch Calendar events: {error}" + + if not events: + success_msg = "No Google Calendar events found in the specified date range" + await task_logger.log_task_success( + log_entry, success_msg, {"events_count": 0} + ) + # CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status + await update_connector_last_indexed(session, connector, update_last_indexed) + await session.commit() + return ( + 0, + None, + ) # Return None (not error) when no items found - this is success with 0 items + + logger.info(f"Found {len(events)} Google Calendar events to index via Composio") + + documents_indexed = 0 + documents_skipped = 0 + + for event in events: + try: + # Handle both standard Google API and potential Composio variations + event_id = event.get("id", "") or event.get("eventId", "") + summary = ( + event.get("summary", "") or event.get("title", "") or "No Title" + ) + + if not event_id: + documents_skipped += 1 + continue + + # Format to markdown + markdown_content = composio_connector.format_calendar_event_to_markdown( + event + ) + + # Generate unique identifier + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"calendar_{event_id}", search_space_id + ) + + content_hash = generate_content_hash(markdown_content, search_space_id) + + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + # Extract event times + start = event.get("start", {}) + end = event.get("end", {}) + start_time = start.get("dateTime") or start.get("date", "") + end_time = end.get("dateTime") or end.get("date", "") + location = event.get("location", "") + + if existing_document: + if existing_document.content_hash == content_hash: + documents_skipped += 1 + continue + + # Update existing + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "event_id": event_id, + "summary": summary, + "start_time": start_time, + "document_type": "Google Calendar Event (Composio)", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}" + if location: + summary_content += f"\nLocation: {location}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + existing_document.title = f"Calendar: {summary}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "event_id": event_id, + "summary": summary, + "start_time": start_time, + "end_time": end_time, + "location": location, + "connector_id": connector_id, + "source": "composio", + } + existing_document.chunks = chunks + 
existing_document.updated_at = get_current_timestamp() + + documents_indexed += 1 + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info( + f"Committing batch: {documents_indexed} Google Calendar events processed so far" + ) + await session.commit() + continue + + # Create new document + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "event_id": event_id, + "summary": summary, + "start_time": start_time, + "document_type": "Google Calendar Event (Composio)", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = ( + f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}" + ) + if location: + summary_content += f"\nLocation: {location}" + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + document = Document( + search_space_id=search_space_id, + title=f"Calendar: {summary}", + document_type=DocumentType( + TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"] + ), + document_metadata={ + "event_id": event_id, + "summary": summary, + "start_time": start_time, + "end_time": end_time, + "location": location, + "connector_id": connector_id, + "toolkit_id": "googlecalendar", + "source": "composio", + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=summary_embedding, + chunks=chunks, + updated_at=get_current_timestamp(), + ) + session.add(document) + documents_indexed += 1 + + # Batch commit every 10 documents + if documents_indexed % 10 == 0: + logger.info( + f"Committing batch: {documents_indexed} Google Calendar events processed so far" + ) + await session.commit() + + except Exception as e: + logger.error(f"Error processing Calendar event: {e!s}", exc_info=True) + documents_skipped += 1 + continue + + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + # This ensures the UI shows "Last indexed" instead of "Never indexed" + await update_connector_last_indexed(session, connector, update_last_indexed) + + # Final commit to ensure all documents are persisted (safety net) + # This matches the pattern used in non-Composio Gmail indexer + logger.info( + f"Final commit: Total {documents_indexed} Google Calendar events processed" + ) + await session.commit() + logger.info( + "Successfully committed all Composio Google Calendar document changes to database" + ) + + await task_logger.log_task_success( + log_entry, + f"Successfully completed Google Calendar indexing via Composio for connector {connector_id}", + { + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + }, + ) + + return documents_indexed, None + + except Exception as e: + logger.error( + f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True + ) + return 0, f"Failed to index Google Calendar via Composio: {e!s}" + diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py new file mode 100644 index 000000000..e19436611 --- /dev/null +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -0,0 +1,1162 @@ +""" +Composio Google Drive Connector Module. + +Provides Google Drive specific methods for data retrieval and indexing via Composio. 
+""" + +import logging +import os +import tempfile +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified + +from app.config import config +from app.connectors.composio_connector import ComposioConnector +from app.db import Document, DocumentType, Log +from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE +from app.services.llm_service import get_user_long_context_llm +from app.services.task_logging_service import TaskLoggingService +from app.utils.document_converters import ( + create_document_chunks, + generate_content_hash, + generate_document_summary, + generate_unique_identifier_hash, +) + +logger = logging.getLogger(__name__) + + +# Binary file extensions that need file processor +BINARY_FILE_EXTENSIONS = { + ".pdf", + ".doc", + ".docx", + ".xls", + ".xlsx", + ".ppt", + ".pptx", + ".png", + ".jpg", + ".jpeg", + ".gif", + ".bmp", + ".tiff", + ".webp", + ".zip", + ".tar", + ".gz", + ".rar", + ".7z", + ".mp3", + ".mp4", + ".wav", + ".avi", + ".mov", + ".exe", + ".dll", + ".so", + ".bin", +} + +# Text file extensions that can be decoded as UTF-8 +TEXT_FILE_EXTENSIONS = { + ".txt", + ".md", + ".markdown", + ".json", + ".xml", + ".html", + ".htm", + ".css", + ".js", + ".ts", + ".py", + ".java", + ".c", + ".cpp", + ".h", + ".yaml", + ".yml", + ".toml", + ".ini", + ".cfg", + ".conf", + ".sh", + ".bash", + ".zsh", + ".fish", + ".sql", + ".csv", + ".tsv", + ".rst", + ".tex", + ".log", +} + + +def get_current_timestamp() -> datetime: + """Get the current timestamp with timezone for updated_at field.""" + return datetime.now(UTC) + + +def _is_binary_file(file_name: str, mime_type: str) -> bool: + """Check if a file is binary based on extension or mime type.""" + extension = Path(file_name).suffix.lower() + + # Check extension first + if extension in BINARY_FILE_EXTENSIONS: + return True + if extension in TEXT_FILE_EXTENSIONS: + return False + + # Check mime type + if mime_type: + if mime_type.startswith(("image/", "audio/", "video/", "application/pdf")): + return True + if mime_type.startswith(("text/", "application/json", "application/xml")): + return False + # Office documents + if ( + "spreadsheet" in mime_type + or "document" in mime_type + or "presentation" in mime_type + ): + return True + + # Default to text for unknown types + return False + + +class ComposioGoogleDriveConnector(ComposioConnector): + """ + Google Drive specific Composio connector. + + Provides methods for listing files, downloading content, and tracking changes + from Google Drive via Composio. + """ + + async def list_drive_files( + self, + folder_id: str | None = None, + page_token: str | None = None, + page_size: int = 100, + ) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + List files from Google Drive via Composio. + + Args: + folder_id: Optional folder ID to list contents of. + page_token: Pagination token. + page_size: Number of files per page. + + Returns: + Tuple of (files list, next_page_token, error message). 
+ """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return [], None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_drive_files( + connected_account_id=connected_account_id, + entity_id=entity_id, + folder_id=folder_id, + page_token=page_token, + page_size=page_size, + ) + + async def get_drive_file_content( + self, file_id: str + ) -> tuple[bytes | None, str | None]: + """ + Download file content from Google Drive via Composio. + + Args: + file_id: Google Drive file ID. + + Returns: + Tuple of (file content bytes, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_drive_file_content( + connected_account_id=connected_account_id, + entity_id=entity_id, + file_id=file_id, + ) + + async def get_drive_start_page_token(self) -> tuple[str | None, str | None]: + """ + Get the starting page token for Google Drive change tracking. + + Returns: + Tuple of (start_page_token, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_drive_start_page_token( + connected_account_id=connected_account_id, + entity_id=entity_id, + ) + + async def list_drive_changes( + self, + page_token: str | None = None, + page_size: int = 100, + include_removed: bool = True, + ) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + List changes in Google Drive since the given page token. + + Args: + page_token: Page token from previous sync (optional). + page_size: Number of changes per page. + include_removed: Whether to include removed items. + + Returns: + Tuple of (changes list, new_start_page_token, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return [], None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.list_drive_changes( + connected_account_id=connected_account_id, + entity_id=entity_id, + page_token=page_token, + page_size=page_size, + include_removed=include_removed, + ) + + +# ============ File Processing Utilities ============ + + +async def _process_file_content( + content: bytes | str, + file_name: str, + file_id: str, + mime_type: str, + search_space_id: int, + user_id: str, + session: AsyncSession, + task_logger: TaskLoggingService, + log_entry: Log, + processing_errors: list[str], +) -> str: + """ + Process file content and return markdown text. + + For binary files (PDFs, images, etc.), uses Surfsense's ETL service. + For text files, decodes as UTF-8. 
+ + Args: + content: File content as bytes or string + file_name: Name of the file + file_id: Google Drive file ID + mime_type: MIME type of the file + search_space_id: Search space ID + user_id: User ID + session: Database session + task_logger: Task logging service + log_entry: Log entry for tracking + processing_errors: List to append errors to + + Returns: + Markdown content string + """ + # Ensure content is bytes + if isinstance(content, str): + content = content.encode("utf-8") + + # Check if this is a binary file + if _is_binary_file(file_name, mime_type): + # Use ETL service for binary files (PDF, Office docs, etc.) + temp_file_path = None + try: + # Get file extension + extension = Path(file_name).suffix or ".bin" + + # Write to temp file + with tempfile.NamedTemporaryFile( + delete=False, suffix=extension + ) as tmp_file: + tmp_file.write(content) + temp_file_path = tmp_file.name + + # Use the configured ETL service to extract text + extracted_text = await _extract_text_with_etl( + temp_file_path, file_name, task_logger, log_entry + ) + + if extracted_text: + return extracted_text + else: + # Fallback if extraction fails + logger.warning(f"Could not extract text from binary file {file_name}") + return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" + + except Exception as e: + error_msg = f"Error processing binary file {file_name}: {e!s}" + logger.error(error_msg) + processing_errors.append(error_msg) + return f"# {file_name}\n\n[Binary file - processing error]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" + finally: + # Cleanup temp file + if temp_file_path and os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except Exception as e: + logger.debug(f"Could not delete temp file {temp_file_path}: {e}") + else: + # Text file - try to decode as UTF-8 + try: + return content.decode("utf-8") + except UnicodeDecodeError: + # Try other encodings + for encoding in ["latin-1", "cp1252", "iso-8859-1"]: + try: + return content.decode(encoding) + except UnicodeDecodeError: + continue + + # If all encodings fail, treat as binary + error_msg = f"Could not decode text file {file_name} with any encoding" + logger.warning(error_msg) + processing_errors.append(error_msg) + return f"# {file_name}\n\n[File content could not be decoded]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" + + +async def _extract_text_with_etl( + file_path: str, + file_name: str, + task_logger: TaskLoggingService, + log_entry: Log, +) -> str | None: + """ + Extract text from a file using the configured ETL service. 
+ + Args: + file_path: Path to the file + file_name: Name of the file + task_logger: Task logging service + log_entry: Log entry for tracking + + Returns: + Extracted text as markdown, or None if extraction fails + """ + import warnings + from logging import ERROR, getLogger + + etl_service = config.ETL_SERVICE + + try: + if etl_service == "UNSTRUCTURED": + from langchain_unstructured import UnstructuredLoader + + from app.utils.document_converters import convert_document_to_markdown + + loader = UnstructuredLoader( + file_path, + mode="elements", + post_processors=[], + languages=["eng"], + include_orig_elements=False, + include_metadata=False, + strategy="auto", + ) + + docs = await loader.aload() + if docs: + return await convert_document_to_markdown(docs) + return None + + elif etl_service == "LLAMACLOUD": + from app.tasks.document_processors.file_processors import ( + parse_with_llamacloud_retry, + ) + + # Estimate pages (rough estimate based on file size) + file_size = os.path.getsize(file_path) + estimated_pages = max(1, file_size // (80 * 1024)) + + result = await parse_with_llamacloud_retry( + file_path=file_path, + estimated_pages=estimated_pages, + task_logger=task_logger, + log_entry=log_entry, + ) + + markdown_documents = await result.aget_markdown_documents( + split_by_page=False + ) + if markdown_documents: + return markdown_documents[0].text + return None + + elif etl_service == "DOCLING": + from app.services.docling_service import create_docling_service + + docling_service = create_docling_service() + + # Suppress pdfminer warnings + pdfminer_logger = getLogger("pdfminer") + original_level = pdfminer_logger.level + + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", category=UserWarning, module="pdfminer" + ) + warnings.filterwarnings( + "ignore", message=".*Cannot set gray non-stroke color.*" + ) + warnings.filterwarnings("ignore", message=".*invalid float value.*") + + pdfminer_logger.setLevel(ERROR) + + try: + result = await docling_service.process_document( + file_path, file_name + ) + finally: + pdfminer_logger.setLevel(original_level) + + return result.get("content") + else: + logger.warning(f"Unknown ETL service: {etl_service}") + return None + + except Exception as e: + logger.error(f"ETL extraction failed for {file_name}: {e!s}") + return None + + +# ============ Indexer Functions ============ + + +async def check_document_by_unique_identifier( + session: AsyncSession, unique_identifier_hash: str +) -> Document | None: + """Check if a document with the given unique identifier hash already exists.""" + from sqlalchemy.orm import selectinload + from sqlalchemy.future import select + + existing_doc_result = await session.execute( + select(Document) + .options(selectinload(Document.chunks)) + .where(Document.unique_identifier_hash == unique_identifier_hash) + ) + return existing_doc_result.scalars().first() + + +async def update_connector_last_indexed( + session: AsyncSession, + connector, + update_last_indexed: bool = True, +) -> None: + """Update the last_indexed_at timestamp for a connector.""" + if update_last_indexed: + connector.last_indexed_at = datetime.now( + UTC + ) # Use UTC for timezone consistency + logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") + + +async def index_composio_google_drive( + session: AsyncSession, + connector, + connector_id: int, + search_space_id: int, + user_id: str, + task_logger: TaskLoggingService, + log_entry, + update_last_indexed: bool = True, + max_items: int = 1000, +) -> tuple[int, 
str]: + """Index Google Drive files via Composio with delta sync support. + + Delta Sync Flow: + 1. First sync: Full scan + get initial page token + 2. Subsequent syncs: Use LIST_CHANGES to process only changed files + + Supports folder/file selection via connector config: + - selected_folders: List of {id, name} for folders to index + - selected_files: List of {id, name} for individual files to index + - indexing_options: {max_files_per_folder, incremental_sync, include_subfolders} + """ + try: + composio_connector = ComposioGoogleDriveConnector(session, connector_id) + connector_config = await composio_connector.get_config() + + # Get folder/file selection configuration + selected_folders = connector_config.get("selected_folders", []) + selected_files = connector_config.get("selected_files", []) + indexing_options = connector_config.get("indexing_options", {}) + + # Check for stored page token for delta sync + stored_page_token = connector_config.get("drive_page_token") + use_delta_sync = stored_page_token and connector.last_indexed_at + + max_files_per_folder = indexing_options.get("max_files_per_folder", 100) + include_subfolders = indexing_options.get("include_subfolders", True) + + # Route to delta sync or full scan + if use_delta_sync: + logger.info(f"Using delta sync for Composio Google Drive connector {connector_id}") + await task_logger.log_task_progress( + log_entry, + f"Starting delta sync for Google Drive via Composio (connector {connector_id})", + {"stage": "delta_sync", "token": stored_page_token[:20] + "..."}, + ) + + documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_delta_sync( + session=session, + composio_connector=composio_connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + page_token=stored_page_token, + max_items=max_items, + task_logger=task_logger, + log_entry=log_entry, + ) + else: + logger.info(f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)") + await task_logger.log_task_progress( + log_entry, + f"Fetching Google Drive files via Composio for connector {connector_id}", + { + "stage": "full_scan", + "selected_folders": len(selected_folders), + "selected_files": len(selected_files), + }, + ) + + documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_full_scan( + session=session, + composio_connector=composio_connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + selected_folders=selected_folders, + selected_files=selected_files, + max_files_per_folder=max_files_per_folder, + include_subfolders=include_subfolders, + max_items=max_items, + task_logger=task_logger, + log_entry=log_entry, + ) + + # Get new page token for next sync (always update after successful sync) + new_token, token_error = await composio_connector.get_drive_start_page_token() + if new_token and not token_error: + # Refresh connector to avoid stale state + await session.refresh(connector) + + if not connector.config: + connector.config = {} + connector.config["drive_page_token"] = new_token + flag_modified(connector, "config") + logger.info(f"Updated drive_page_token for connector {connector_id}") + elif token_error: + logger.warning(f"Failed to get new page token: {token_error}") + + # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status + await update_connector_last_indexed(session, connector, update_last_indexed) + + # Final commit + logger.info(f"Final commit: Total 
{documents_indexed} Google Drive files processed") + await session.commit() + logger.info("Successfully committed all Composio Google Drive document changes to database") + + # Handle processing errors + error_message = None + if processing_errors: + if len(processing_errors) == 1: + error_message = processing_errors[0] + else: + error_message = f"Failed to process {len(processing_errors)} file(s). First error: {processing_errors[0]}" + await task_logger.log_task_failure( + log_entry, + f"Completed Google Drive indexing with {len(processing_errors)} error(s) for connector {connector_id}", + { + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "sync_type": "delta" if use_delta_sync else "full", + "errors": processing_errors, + }, + ) + else: + await task_logger.log_task_success( + log_entry, + f"Successfully completed Google Drive indexing via Composio for connector {connector_id}", + { + "documents_indexed": documents_indexed, + "documents_skipped": documents_skipped, + "sync_type": "delta" if use_delta_sync else "full", + }, + ) + + return documents_indexed, error_message + + except Exception as e: + logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) + return 0, f"Failed to index Google Drive via Composio: {e!s}" + + +async def _index_composio_drive_delta_sync( + session: AsyncSession, + composio_connector: ComposioGoogleDriveConnector, + connector_id: int, + search_space_id: int, + user_id: str, + page_token: str, + max_items: int, + task_logger: TaskLoggingService, + log_entry, +) -> tuple[int, int, list[str]]: + """Index Google Drive files using delta sync (only changed files). + + Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync. + Handles: new files, modified files, and deleted files. 
+ """ + documents_indexed = 0 + documents_skipped = 0 + processing_errors = [] + + # Fetch all changes with pagination + all_changes = [] + current_token = page_token + + while len(all_changes) < max_items: + changes, next_token, error = await composio_connector.list_drive_changes( + page_token=current_token, + page_size=100, + include_removed=True, + ) + + if error: + logger.error(f"Error fetching Drive changes: {error}") + processing_errors.append(f"Failed to fetch changes: {error}") + break + + all_changes.extend(changes) + + if not next_token or next_token == current_token: + break + current_token = next_token + + if not all_changes: + logger.info("No changes detected since last sync") + return 0, 0, [] + + logger.info(f"Processing {len(all_changes)} changes from delta sync") + + for change in all_changes[:max_items]: + try: + # Handle removed files + is_removed = change.get("removed", False) + file_info = change.get("file", {}) + file_id = change.get("fileId") or file_info.get("id", "") + + if not file_id: + documents_skipped += 1 + continue + + # Check if file was trashed or removed + if is_removed or file_info.get("trashed", False): + # Remove document from database + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"drive_{file_id}", search_space_id + ) + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + if existing_document: + await session.delete(existing_document) + documents_indexed += 1 + logger.info(f"Deleted document for removed/trashed file: {file_id}") + continue + + # Process changed file + file_name = file_info.get("name", "") or "Untitled" + mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") + + # Skip folders + if mime_type == "application/vnd.google-apps.folder": + continue + + # Process the file + indexed, skipped, errors = await _process_single_drive_file( + session=session, + composio_connector=composio_connector, + file_id=file_id, + file_name=file_name, + mime_type=mime_type, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + processing_errors.extend(errors) + + # Batch commit every 10 documents + if documents_indexed > 0 and documents_indexed % 10 == 0: + await session.commit() + logger.info(f"Committed batch: {documents_indexed} changes processed") + + except Exception as e: + error_msg = f"Error processing change for file {file_id}: {e!s}" + logger.error(error_msg, exc_info=True) + processing_errors.append(error_msg) + documents_skipped += 1 + + logger.info(f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped") + return documents_indexed, documents_skipped, processing_errors + + +async def _index_composio_drive_full_scan( + session: AsyncSession, + composio_connector: ComposioGoogleDriveConnector, + connector_id: int, + search_space_id: int, + user_id: str, + selected_folders: list[dict], + selected_files: list[dict], + max_files_per_folder: int, + include_subfolders: bool, + max_items: int, + task_logger: TaskLoggingService, + log_entry, +) -> tuple[int, int, list[str]]: + """Index Google Drive files using full scan (first sync or when no delta token).""" + documents_indexed = 0 + documents_skipped = 0 + processing_errors = [] + + all_files = [] + + # If specific folders/files are selected, fetch from those + 
if selected_folders or selected_files: + # Fetch files from selected folders + for folder in selected_folders: + folder_id = folder.get("id") + folder_name = folder.get("name", "Unknown") + + if not folder_id: + continue + + # Handle special case for "root" folder + actual_folder_id = None if folder_id == "root" else folder_id + + logger.info(f"Fetching files from folder: {folder_name} ({folder_id})") + + # Fetch files from this folder + folder_files = [] + page_token = None + + while len(folder_files) < max_files_per_folder: + ( + files, + next_token, + error, + ) = await composio_connector.list_drive_files( + folder_id=actual_folder_id, + page_token=page_token, + page_size=min(100, max_files_per_folder - len(folder_files)), + ) + + if error: + logger.warning( + f"Failed to fetch files from folder {folder_name}: {error}" + ) + break + + # Process files + for file_info in files: + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) + + # If it's a folder and include_subfolders is enabled, recursively fetch + if mime_type == "application/vnd.google-apps.folder": + if include_subfolders: + # Add subfolder files recursively + subfolder_files = await _fetch_folder_files_recursively( + composio_connector, + file_info.get("id"), + max_files=max_files_per_folder, + current_count=len(folder_files), + ) + folder_files.extend(subfolder_files) + else: + folder_files.append(file_info) + + if not next_token: + break + page_token = next_token + + all_files.extend(folder_files[:max_files_per_folder]) + logger.info(f"Found {len(folder_files)} files in folder {folder_name}") + + # Add specifically selected files + for selected_file in selected_files: + file_id = selected_file.get("id") + file_name = selected_file.get("name", "Unknown") + + if not file_id: + continue + + # Add file info (we'll fetch content later during indexing) + all_files.append( + { + "id": file_id, + "name": file_name, + "mimeType": "", # Will be determined later + } + ) + else: + # No selection specified - fetch all files (original behavior) + page_token = None + + while len(all_files) < max_items: + files, next_token, error = await composio_connector.list_drive_files( + page_token=page_token, + page_size=min(100, max_items - len(all_files)), + ) + + if error: + return 0, 0, [f"Failed to fetch Drive files: {error}"] + + all_files.extend(files) + + if not next_token: + break + page_token = next_token + + if not all_files: + logger.info("No Google Drive files found") + return 0, 0, [] + + logger.info(f"Found {len(all_files)} Google Drive files to index via Composio (full scan)") + + for file_info in all_files: + try: + # Handle both standard Google API and potential Composio variations + file_id = file_info.get("id", "") or file_info.get("fileId", "") + file_name = ( + file_info.get("name", "") + or file_info.get("fileName", "") + or "Untitled" + ) + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) + + if not file_id: + documents_skipped += 1 + continue + + # Skip folders + if mime_type == "application/vnd.google-apps.folder": + continue + + # Process the file + indexed, skipped, errors = await _process_single_drive_file( + session=session, + composio_connector=composio_connector, + file_id=file_id, + file_name=file_name, + mime_type=mime_type, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + 
processing_errors.extend(errors) + + # Batch commit every 10 documents + if documents_indexed > 0 and documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Google Drive files processed so far") + await session.commit() + + except Exception as e: + error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}" + logger.error(error_msg, exc_info=True) + processing_errors.append(error_msg) + documents_skipped += 1 + + logger.info(f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped") + return documents_indexed, documents_skipped, processing_errors + + +async def _process_single_drive_file( + session: AsyncSession, + composio_connector: ComposioGoogleDriveConnector, + file_id: str, + file_name: str, + mime_type: str, + connector_id: int, + search_space_id: int, + user_id: str, + task_logger: TaskLoggingService, + log_entry, +) -> tuple[int, int, list[str]]: + """Process a single Google Drive file for indexing. + + Returns: + Tuple of (documents_indexed, documents_skipped, processing_errors) + """ + processing_errors = [] + + # Generate unique identifier hash + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"drive_{file_id}", search_space_id + ) + + # Check if document exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + # Get file content + content, content_error = await composio_connector.get_drive_file_content(file_id) + + if content_error or not content: + logger.warning( + f"Could not get content for file {file_name}: {content_error}" + ) + # Use metadata as content fallback + markdown_content = f"# {file_name}\n\n" + markdown_content += f"**File ID:** {file_id}\n" + markdown_content += f"**Type:** {mime_type}\n" + elif isinstance(content, dict): + # Safety check: if content is still a dict, log error and use fallback + error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}" + logger.error(error_msg) + processing_errors.append(error_msg) + markdown_content = f"# {file_name}\n\n" + markdown_content += f"**File ID:** {file_id}\n" + markdown_content += f"**Type:** {mime_type}\n" + else: + # Process content based on file type + markdown_content = await _process_file_content( + content=content, + file_name=file_name, + file_id=file_id, + mime_type=mime_type, + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + processing_errors=processing_errors, + ) + + content_hash = generate_content_hash(markdown_content, search_space_id) + + if existing_document: + if existing_document.content_hash == content_hash: + return 0, 1, processing_errors # Skipped + + # Update existing document + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + "document_type": "Google Drive File (Composio)", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = ( + f"Google Drive File: {file_name}\n\nType: {mime_type}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + existing_document.title = f"Drive: {file_name}" + 
existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "file_id": file_id, + "file_name": file_name, + "FILE_NAME": file_name, # For compatibility + "mime_type": mime_type, + "connector_id": connector_id, + "source": "composio", + } + existing_document.chunks = chunks + existing_document.updated_at = get_current_timestamp() + + return 1, 0, processing_errors # Indexed + + # Create new document + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + "document_type": "Google Drive File (Composio)", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = ( + f"Google Drive File: {file_name}\n\nType: {mime_type}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + document = Document( + search_space_id=search_space_id, + title=f"Drive: {file_name}", + document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), + document_metadata={ + "file_id": file_id, + "file_name": file_name, + "FILE_NAME": file_name, # For compatibility + "mime_type": mime_type, + "connector_id": connector_id, + "toolkit_id": "googledrive", + "source": "composio", + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=summary_embedding, + chunks=chunks, + updated_at=get_current_timestamp(), + ) + session.add(document) + + return 1, 0, processing_errors # Indexed + + +async def _fetch_folder_files_recursively( + composio_connector: ComposioGoogleDriveConnector, + folder_id: str, + max_files: int = 100, + current_count: int = 0, + depth: int = 0, + max_depth: int = 10, +) -> list[dict[str, Any]]: + """ + Recursively fetch files from a Google Drive folder via Composio. 
+ + Args: + composio_connector: The Composio connector instance + folder_id: Google Drive folder ID + max_files: Maximum number of files to fetch + current_count: Current number of files already fetched + depth: Current recursion depth + max_depth: Maximum recursion depth to prevent infinite loops + + Returns: + List of file info dictionaries + """ + if depth >= max_depth: + logger.warning(f"Max recursion depth reached for folder {folder_id}") + return [] + + if current_count >= max_files: + return [] + + all_files = [] + page_token = None + + try: + while len(all_files) + current_count < max_files: + files, next_token, error = await composio_connector.list_drive_files( + folder_id=folder_id, + page_token=page_token, + page_size=min(100, max_files - len(all_files) - current_count), + ) + + if error: + logger.warning( + f"Error fetching files from subfolder {folder_id}: {error}" + ) + break + + for file_info in files: + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) + + if mime_type == "application/vnd.google-apps.folder": + # Recursively fetch from subfolders + subfolder_files = await _fetch_folder_files_recursively( + composio_connector, + file_info.get("id"), + max_files=max_files, + current_count=current_count + len(all_files), + depth=depth + 1, + max_depth=max_depth, + ) + all_files.extend(subfolder_files) + else: + all_files.append(file_info) + + if len(all_files) + current_count >= max_files: + break + + if not next_token: + break + page_token = next_token + + return all_files[: max_files - current_count] + + except Exception as e: + logger.error(f"Error in recursive folder fetch: {e!s}") + return all_files + diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py index 5af332760..5ad2266b7 100644 --- a/surfsense_backend/app/routes/composio_routes.py +++ b/surfsense_backend/app/routes/composio_routes.py @@ -46,6 +46,13 @@ logger = logging.getLogger(__name__) router = APIRouter() +# Map toolkit_id to frontend connector ID +TOOLKIT_TO_FRONTEND_CONNECTOR_ID = { + "googledrive": "composio-googledrive", + "gmail": "composio-gmail", + "googlecalendar": "composio-googlecalendar", +} + # Initialize security utilities _state_manager = None @@ -327,8 +334,12 @@ async def composio_callback( await session.commit() await session.refresh(existing_connector) + # Get the frontend connector ID based on toolkit_id + frontend_connector_id = TOOLKIT_TO_FRONTEND_CONNECTOR_ID.get( + toolkit_id, "composio-connector" + ) return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=composio-connector&connectorId={existing_connector.id}" + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={existing_connector.id}" ) try: @@ -358,8 +369,12 @@ async def composio_callback( f"Successfully created Composio connector {db_connector.id} for user {user_id}, toolkit {toolkit_id}" ) + # Get the frontend connector ID based on toolkit_id + frontend_connector_id = TOOLKIT_TO_FRONTEND_CONNECTOR_ID.get( + toolkit_id, "composio-connector" + ) return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=composio-connector&connectorId={db_connector.id}" + 
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={db_connector.id}" ) except IntegrityError as e: diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index 3810f03a4..3ea2d1bf2 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -53,6 +53,27 @@ TOOLKIT_TO_DOCUMENT_TYPE = { "googlecalendar": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR", } +# Mapping of toolkit IDs to their indexer functions +# Format: toolkit_id -> (module_path, function_name, supports_date_filter) +# supports_date_filter: True if the indexer accepts start_date/end_date params +TOOLKIT_TO_INDEXER = { + "googledrive": ( + "app.connectors.composio_google_drive_connector", + "index_composio_google_drive", + False, # Google Drive doesn't use date filtering + ), + "gmail": ( + "app.connectors.composio_gmail_connector", + "index_composio_gmail", + True, # Gmail uses date filtering + ), + "googlecalendar": ( + "app.connectors.composio_google_calendar_connector", + "index_composio_google_calendar", + True, # Calendar uses date filtering + ), +} + class ComposioService: """Service for interacting with Composio API.""" diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index 3eed8470e..f97652114 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -2,65 +2,39 @@ Composio connector indexer. Routes indexing requests to toolkit-specific handlers (Google Drive, Gmail, Calendar). +Uses a registry pattern for clean, extensible connector routing. Note: This module is intentionally placed in app/tasks/ (not in connector_indexers/) to avoid circular import issues with the connector_indexers package. 
""" import logging -import os -import tempfile -from datetime import UTC, datetime -from pathlib import Path -from typing import Any +from importlib import import_module from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select -from sqlalchemy.orm import selectinload -from app.config import config -from app.connectors.composio_connector import ComposioConnector from app.db import ( - Document, - DocumentType, - Log, SearchSourceConnector, SearchSourceConnectorType, ) -from app.services.composio_service import INDEXABLE_TOOLKITS, TOOLKIT_TO_DOCUMENT_TYPE -from app.services.llm_service import get_user_long_context_llm +from app.services.composio_service import INDEXABLE_TOOLKITS, TOOLKIT_TO_INDEXER from app.services.task_logging_service import TaskLoggingService -from app.tasks.connector_indexers.base import calculate_date_range -from app.utils.document_converters import ( - create_document_chunks, - generate_content_hash, - generate_document_summary, - generate_unique_identifier_hash, -) # Set up logging logger = logging.getLogger(__name__) -# ============ Utility functions (copied from connector_indexers.base to avoid circular imports) ============ +# Valid Composio connector types +COMPOSIO_CONNECTOR_TYPES = { + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, + SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, +} -def get_current_timestamp() -> datetime: - """Get the current timestamp with timezone for updated_at field.""" - return datetime.now(UTC) - - -async def check_document_by_unique_identifier( - session: AsyncSession, unique_identifier_hash: str -) -> Document | None: - """Check if a document with the given unique identifier hash already exists.""" - existing_doc_result = await session.execute( - select(Document) - .options(selectinload(Document.chunks)) - .where(Document.unique_identifier_hash == unique_identifier_hash) - ) - return existing_doc_result.scalars().first() +# ============ Utility functions ============ async def get_connector_by_id( @@ -78,312 +52,26 @@ async def get_connector_by_id( return result.scalars().first() -async def update_connector_last_indexed( - session: AsyncSession, - connector: SearchSourceConnector, - update_last_indexed: bool = True, -) -> None: - """Update the last_indexed_at timestamp for a connector.""" - if update_last_indexed: - connector.last_indexed_at = datetime.now( - UTC - ) # Use UTC for timezone consistency - logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") - - -# Binary file extensions that need file processor -BINARY_FILE_EXTENSIONS = { - ".pdf", - ".doc", - ".docx", - ".xls", - ".xlsx", - ".ppt", - ".pptx", - ".png", - ".jpg", - ".jpeg", - ".gif", - ".bmp", - ".tiff", - ".webp", - ".zip", - ".tar", - ".gz", - ".rar", - ".7z", - ".mp3", - ".mp4", - ".wav", - ".avi", - ".mov", - ".exe", - ".dll", - ".so", - ".bin", -} - -# Text file extensions that can be decoded as UTF-8 -TEXT_FILE_EXTENSIONS = { - ".txt", - ".md", - ".markdown", - ".json", - ".xml", - ".html", - ".htm", - ".css", - ".js", - ".ts", - ".py", - ".java", - ".c", - ".cpp", - ".h", - ".yaml", - ".yml", - ".toml", - ".ini", - ".cfg", - ".conf", - ".sh", - ".bash", - ".zsh", - ".fish", - ".sql", - ".csv", - ".tsv", - ".rst", - ".tex", - ".log", -} - - -def _is_binary_file(file_name: str, mime_type: str) -> bool: - """Check if a file is binary based on extension or mime type.""" - extension = 
Path(file_name).suffix.lower() - - # Check extension first - if extension in BINARY_FILE_EXTENSIONS: - return True - if extension in TEXT_FILE_EXTENSIONS: - return False - - # Check mime type - if mime_type: - if mime_type.startswith(("image/", "audio/", "video/", "application/pdf")): - return True - if mime_type.startswith(("text/", "application/json", "application/xml")): - return False - # Office documents - if ( - "spreadsheet" in mime_type - or "document" in mime_type - or "presentation" in mime_type - ): - return True - - # Default to text for unknown types - return False - - -async def _process_file_content( - content: bytes | str, - file_name: str, - file_id: str, - mime_type: str, - search_space_id: int, - user_id: str, - session: AsyncSession, - task_logger: TaskLoggingService, - log_entry: Log, - processing_errors: list[str], -) -> str: +def get_indexer_function(toolkit_id: str): """ - Process file content and return markdown text. - - For binary files (PDFs, images, etc.), uses Surfsense's ETL service. - For text files, decodes as UTF-8. + Dynamically import and return the indexer function for a toolkit. Args: - content: File content as bytes or string - file_name: Name of the file - file_id: Google Drive file ID - mime_type: MIME type of the file - search_space_id: Search space ID - user_id: User ID - session: Database session - task_logger: Task logging service - log_entry: Log entry for tracking - processing_errors: List to append errors to + toolkit_id: The toolkit ID (e.g., "googledrive", "gmail") Returns: - Markdown content string + Tuple of (indexer_function, supports_date_filter) + + Raises: + ValueError: If toolkit not found in registry """ - # Ensure content is bytes - if isinstance(content, str): - content = content.encode("utf-8") + if toolkit_id not in TOOLKIT_TO_INDEXER: + raise ValueError(f"No indexer registered for toolkit: {toolkit_id}") - # Check if this is a binary file - if _is_binary_file(file_name, mime_type): - # Use ETL service for binary files (PDF, Office docs, etc.) 
- temp_file_path = None - try: - # Get file extension - extension = Path(file_name).suffix or ".bin" - - # Write to temp file - with tempfile.NamedTemporaryFile( - delete=False, suffix=extension - ) as tmp_file: - tmp_file.write(content) - temp_file_path = tmp_file.name - - # Use the configured ETL service to extract text - extracted_text = await _extract_text_with_etl( - temp_file_path, file_name, task_logger, log_entry - ) - - if extracted_text: - return extracted_text - else: - # Fallback if extraction fails - logger.warning(f"Could not extract text from binary file {file_name}") - return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" - - except Exception as e: - error_msg = f"Error processing binary file {file_name}: {e!s}" - logger.error(error_msg) - processing_errors.append(error_msg) - return f"# {file_name}\n\n[Binary file - processing error]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" - finally: - # Cleanup temp file - if temp_file_path and os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - except Exception as e: - logger.debug(f"Could not delete temp file {temp_file_path}: {e}") - else: - # Text file - try to decode as UTF-8 - try: - return content.decode("utf-8") - except UnicodeDecodeError: - # Try other encodings - for encoding in ["latin-1", "cp1252", "iso-8859-1"]: - try: - return content.decode(encoding) - except UnicodeDecodeError: - continue - - # If all encodings fail, treat as binary - error_msg = f"Could not decode text file {file_name} with any encoding" - logger.warning(error_msg) - processing_errors.append(error_msg) - return f"# {file_name}\n\n[File content could not be decoded]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" - - -async def _extract_text_with_etl( - file_path: str, - file_name: str, - task_logger: TaskLoggingService, - log_entry: Log, -) -> str | None: - """ - Extract text from a file using the configured ETL service. 
- - Args: - file_path: Path to the file - file_name: Name of the file - task_logger: Task logging service - log_entry: Log entry for tracking - - Returns: - Extracted text as markdown, or None if extraction fails - """ - import warnings - from logging import ERROR, getLogger - - etl_service = config.ETL_SERVICE - - try: - if etl_service == "UNSTRUCTURED": - from langchain_unstructured import UnstructuredLoader - - from app.utils.document_converters import convert_document_to_markdown - - loader = UnstructuredLoader( - file_path, - mode="elements", - post_processors=[], - languages=["eng"], - include_orig_elements=False, - include_metadata=False, - strategy="auto", - ) - - docs = await loader.aload() - if docs: - return await convert_document_to_markdown(docs) - return None - - elif etl_service == "LLAMACLOUD": - from app.tasks.document_processors.file_processors import ( - parse_with_llamacloud_retry, - ) - - # Estimate pages (rough estimate based on file size) - file_size = os.path.getsize(file_path) - estimated_pages = max(1, file_size // (80 * 1024)) - - result = await parse_with_llamacloud_retry( - file_path=file_path, - estimated_pages=estimated_pages, - task_logger=task_logger, - log_entry=log_entry, - ) - - markdown_documents = await result.aget_markdown_documents( - split_by_page=False - ) - if markdown_documents: - return markdown_documents[0].text - return None - - elif etl_service == "DOCLING": - from app.services.docling_service import create_docling_service - - docling_service = create_docling_service() - - # Suppress pdfminer warnings - pdfminer_logger = getLogger("pdfminer") - original_level = pdfminer_logger.level - - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", category=UserWarning, module="pdfminer" - ) - warnings.filterwarnings( - "ignore", message=".*Cannot set gray non-stroke color.*" - ) - warnings.filterwarnings("ignore", message=".*invalid float value.*") - - pdfminer_logger.setLevel(ERROR) - - try: - result = await docling_service.process_document( - file_path, file_name - ) - finally: - pdfminer_logger.setLevel(original_level) - - return result.get("content") - else: - logger.warning(f"Unknown ETL service: {etl_service}") - return None - - except Exception as e: - logger.error(f"ETL extraction failed for {file_name}: {e!s}") - return None + module_path, function_name, supports_date_filter = TOOLKIT_TO_INDEXER[toolkit_id] + module = import_module(module_path) + indexer_func = getattr(module, function_name) + return indexer_func, supports_date_filter # ============ Main indexer function ============ @@ -403,6 +91,7 @@ async def index_composio_connector( Index content from a Composio connector. Routes to toolkit-specific indexing based on the connector's toolkit_id. + Uses a registry pattern for clean, extensible connector routing. 
Args: session: Database session @@ -435,19 +124,10 @@ async def index_composio_connector( try: # Get connector by id - accept any Composio connector type - # We'll check the actual type after loading - connector = await get_connector_by_id( - session, - connector_id, - None, # Don't filter by type, we'll validate after - ) + connector = await get_connector_by_id(session, connector_id, None) # Validate it's a Composio connector - if connector and connector.connector_type not in [ - SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, - SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, - ]: + if connector and connector.connector_type not in COMPOSIO_CONNECTOR_TYPES: error_msg = f"Connector {connector_id} is not a Composio connector" await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "InvalidConnectorType"} @@ -480,53 +160,35 @@ async def index_composio_connector( ) return 0, error_msg - # Route to toolkit-specific indexer - if toolkit_id == "googledrive": - return await _index_composio_google_drive( - session=session, - connector=connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, - update_last_indexed=update_last_indexed, - max_items=max_items, - ) - elif toolkit_id == "gmail": - return await _index_composio_gmail( - session=session, - connector=connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - task_logger=task_logger, - log_entry=log_entry, - update_last_indexed=update_last_indexed, - max_items=max_items, - ) - elif toolkit_id == "googlecalendar": - return await _index_composio_google_calendar( - session=session, - connector=connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - task_logger=task_logger, - log_entry=log_entry, - update_last_indexed=update_last_indexed, - max_items=max_items, - ) - else: - error_msg = f"No indexer implemented for toolkit: {toolkit_id}" + # Get indexer function from registry + try: + indexer_func, supports_date_filter = get_indexer_function(toolkit_id) + except ValueError as e: await task_logger.log_task_failure( - log_entry, error_msg, {"error_type": "NoIndexerImplemented"} + log_entry, str(e), {"error_type": "NoIndexerImplemented"} ) - return 0, error_msg + return 0, str(e) + + # Build kwargs for the indexer function + kwargs = { + "session": session, + "connector": connector, + "connector_id": connector_id, + "search_space_id": search_space_id, + "user_id": user_id, + "task_logger": task_logger, + "log_entry": log_entry, + "update_last_indexed": update_last_indexed, + "max_items": max_items, + } + + # Add date params for toolkits that support them + if supports_date_filter: + kwargs["start_date"] = start_date + kwargs["end_date"] = end_date + + # Call the toolkit-specific indexer + return await indexer_func(**kwargs) except SQLAlchemyError as db_error: await session.rollback() @@ -548,1378 +210,3 @@ async def index_composio_connector( ) logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True) return 0, f"Failed to index Composio connector: {e!s}" - - -async def _index_composio_google_drive( - session: AsyncSession, - connector, - connector_id: int, - search_space_id: int, - user_id: str, - task_logger: TaskLoggingService, - log_entry, - update_last_indexed: bool = True, - 
max_items: int = 1000, -) -> tuple[int, str]: - """Index Google Drive files via Composio with delta sync support. - - Delta Sync Flow: - 1. First sync: Full scan + get initial page token - 2. Subsequent syncs: Use LIST_CHANGES to process only changed files - - Supports folder/file selection via connector config: - - selected_folders: List of {id, name} for folders to index - - selected_files: List of {id, name} for individual files to index - - indexing_options: {max_files_per_folder, incremental_sync, include_subfolders} - """ - try: - composio_connector = ComposioConnector(session, connector_id) - connector_config = await composio_connector.get_config() - - # Get folder/file selection configuration - selected_folders = connector_config.get("selected_folders", []) - selected_files = connector_config.get("selected_files", []) - indexing_options = connector_config.get("indexing_options", {}) - - # Check for stored page token for delta sync - stored_page_token = connector_config.get("drive_page_token") - use_delta_sync = stored_page_token and connector.last_indexed_at - - max_files_per_folder = indexing_options.get("max_files_per_folder", 100) - include_subfolders = indexing_options.get("include_subfolders", True) - - # Route to delta sync or full scan - if use_delta_sync: - logger.info(f"Using delta sync for Composio Google Drive connector {connector_id}") - await task_logger.log_task_progress( - log_entry, - f"Starting delta sync for Google Drive via Composio (connector {connector_id})", - {"stage": "delta_sync", "token": stored_page_token[:20] + "..."}, - ) - - documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_delta_sync( - session=session, - composio_connector=composio_connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - page_token=stored_page_token, - max_items=max_items, - task_logger=task_logger, - log_entry=log_entry, - ) - else: - logger.info(f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)") - await task_logger.log_task_progress( - log_entry, - f"Fetching Google Drive files via Composio for connector {connector_id}", - { - "stage": "full_scan", - "selected_folders": len(selected_folders), - "selected_files": len(selected_files), - }, - ) - - documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_full_scan( - session=session, - composio_connector=composio_connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - selected_folders=selected_folders, - selected_files=selected_files, - max_files_per_folder=max_files_per_folder, - include_subfolders=include_subfolders, - max_items=max_items, - task_logger=task_logger, - log_entry=log_entry, - ) - - # Get new page token for next sync (always update after successful sync) - new_token, token_error = await composio_connector.get_drive_start_page_token() - if new_token and not token_error: - from sqlalchemy.orm.attributes import flag_modified - - # Refresh connector to avoid stale state - await session.refresh(connector) - - if not connector.config: - connector.config = {} - connector.config["drive_page_token"] = new_token - flag_modified(connector, "config") - logger.info(f"Updated drive_page_token for connector {connector_id}") - elif token_error: - logger.warning(f"Failed to get new page token: {token_error}") - - # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status - await update_connector_last_indexed(session, 
connector, update_last_indexed) - - # Final commit - logger.info(f"Final commit: Total {documents_indexed} Google Drive files processed") - await session.commit() - logger.info("Successfully committed all Composio Google Drive document changes to database") - - # Handle processing errors - error_message = None - if processing_errors: - if len(processing_errors) == 1: - error_message = processing_errors[0] - else: - error_message = f"Failed to process {len(processing_errors)} file(s). First error: {processing_errors[0]}" - await task_logger.log_task_failure( - log_entry, - f"Completed Google Drive indexing with {len(processing_errors)} error(s) for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "sync_type": "delta" if use_delta_sync else "full", - "errors": processing_errors, - }, - ) - else: - await task_logger.log_task_success( - log_entry, - f"Successfully completed Google Drive indexing via Composio for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - "sync_type": "delta" if use_delta_sync else "full", - }, - ) - - return documents_indexed, error_message - - except Exception as e: - logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) - return 0, f"Failed to index Google Drive via Composio: {e!s}" - - -async def _index_composio_drive_delta_sync( - session: AsyncSession, - composio_connector: ComposioConnector, - connector_id: int, - search_space_id: int, - user_id: str, - page_token: str, - max_items: int, - task_logger: TaskLoggingService, - log_entry, -) -> tuple[int, int, list[str]]: - """Index Google Drive files using delta sync (only changed files). - - Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync. - Handles: new files, modified files, and deleted files. 
- """ - documents_indexed = 0 - documents_skipped = 0 - processing_errors = [] - - # Fetch all changes with pagination - all_changes = [] - current_token = page_token - - while len(all_changes) < max_items: - changes, next_token, error = await composio_connector.list_drive_changes( - page_token=current_token, - page_size=100, - include_removed=True, - ) - - if error: - logger.error(f"Error fetching Drive changes: {error}") - processing_errors.append(f"Failed to fetch changes: {error}") - break - - all_changes.extend(changes) - - if not next_token or next_token == current_token: - break - current_token = next_token - - if not all_changes: - logger.info("No changes detected since last sync") - return 0, 0, [] - - logger.info(f"Processing {len(all_changes)} changes from delta sync") - - for change in all_changes[:max_items]: - try: - # Handle removed files - is_removed = change.get("removed", False) - file_info = change.get("file", {}) - file_id = change.get("fileId") or file_info.get("id", "") - - if not file_id: - documents_skipped += 1 - continue - - # Check if file was trashed or removed - if is_removed or file_info.get("trashed", False): - # Remove document from database - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"drive_{file_id}", search_space_id - ) - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - if existing_document: - await session.delete(existing_document) - documents_indexed += 1 - logger.info(f"Deleted document for removed/trashed file: {file_id}") - continue - - # Process changed file - file_name = file_info.get("name", "") or "Untitled" - mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") - - # Skip folders - if mime_type == "application/vnd.google-apps.folder": - continue - - # Process the file - indexed, skipped, errors = await _process_single_drive_file( - session=session, - composio_connector=composio_connector, - file_id=file_id, - file_name=file_name, - mime_type=mime_type, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, - ) - - documents_indexed += indexed - documents_skipped += skipped - processing_errors.extend(errors) - - # Batch commit every 10 documents - if documents_indexed > 0 and documents_indexed % 10 == 0: - await session.commit() - logger.info(f"Committed batch: {documents_indexed} changes processed") - - except Exception as e: - error_msg = f"Error processing change for file {file_id}: {e!s}" - logger.error(error_msg, exc_info=True) - processing_errors.append(error_msg) - documents_skipped += 1 - - logger.info(f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped") - return documents_indexed, documents_skipped, processing_errors - - -async def _index_composio_drive_full_scan( - session: AsyncSession, - composio_connector: ComposioConnector, - connector_id: int, - search_space_id: int, - user_id: str, - selected_folders: list[dict], - selected_files: list[dict], - max_files_per_folder: int, - include_subfolders: bool, - max_items: int, - task_logger: TaskLoggingService, - log_entry, -) -> tuple[int, int, list[str]]: - """Index Google Drive files using full scan (first sync or when no delta token).""" - documents_indexed = 0 - documents_skipped = 0 - processing_errors = [] - - all_files = [] - - # If specific folders/files are selected, fetch from those - if 
selected_folders or selected_files: - # Fetch files from selected folders - for folder in selected_folders: - folder_id = folder.get("id") - folder_name = folder.get("name", "Unknown") - - if not folder_id: - continue - - # Handle special case for "root" folder - actual_folder_id = None if folder_id == "root" else folder_id - - logger.info(f"Fetching files from folder: {folder_name} ({folder_id})") - - # Fetch files from this folder - folder_files = [] - page_token = None - - while len(folder_files) < max_files_per_folder: - ( - files, - next_token, - error, - ) = await composio_connector.list_drive_files( - folder_id=actual_folder_id, - page_token=page_token, - page_size=min(100, max_files_per_folder - len(folder_files)), - ) - - if error: - logger.warning( - f"Failed to fetch files from folder {folder_name}: {error}" - ) - break - - # Process files - for file_info in files: - mime_type = file_info.get("mimeType", "") or file_info.get( - "mime_type", "" - ) - - # If it's a folder and include_subfolders is enabled, recursively fetch - if mime_type == "application/vnd.google-apps.folder": - if include_subfolders: - # Add subfolder files recursively - subfolder_files = await _fetch_folder_files_recursively( - composio_connector, - file_info.get("id"), - max_files=max_files_per_folder, - current_count=len(folder_files), - ) - folder_files.extend(subfolder_files) - else: - folder_files.append(file_info) - - if not next_token: - break - page_token = next_token - - all_files.extend(folder_files[:max_files_per_folder]) - logger.info(f"Found {len(folder_files)} files in folder {folder_name}") - - # Add specifically selected files - for selected_file in selected_files: - file_id = selected_file.get("id") - file_name = selected_file.get("name", "Unknown") - - if not file_id: - continue - - # Add file info (we'll fetch content later during indexing) - all_files.append( - { - "id": file_id, - "name": file_name, - "mimeType": "", # Will be determined later - } - ) - else: - # No selection specified - fetch all files (original behavior) - page_token = None - - while len(all_files) < max_items: - files, next_token, error = await composio_connector.list_drive_files( - page_token=page_token, - page_size=min(100, max_items - len(all_files)), - ) - - if error: - return 0, 0, [f"Failed to fetch Drive files: {error}"] - - all_files.extend(files) - - if not next_token: - break - page_token = next_token - - if not all_files: - logger.info("No Google Drive files found") - return 0, 0, [] - - logger.info(f"Found {len(all_files)} Google Drive files to index via Composio (full scan)") - - for file_info in all_files: - try: - # Handle both standard Google API and potential Composio variations - file_id = file_info.get("id", "") or file_info.get("fileId", "") - file_name = ( - file_info.get("name", "") - or file_info.get("fileName", "") - or "Untitled" - ) - mime_type = file_info.get("mimeType", "") or file_info.get( - "mime_type", "" - ) - - if not file_id: - documents_skipped += 1 - continue - - # Skip folders - if mime_type == "application/vnd.google-apps.folder": - continue - - # Process the file - indexed, skipped, errors = await _process_single_drive_file( - session=session, - composio_connector=composio_connector, - file_id=file_id, - file_name=file_name, - mime_type=mime_type, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - task_logger=task_logger, - log_entry=log_entry, - ) - - documents_indexed += indexed - documents_skipped += skipped - processing_errors.extend(errors) - 
- # Batch commit every 10 documents - if documents_indexed > 0 and documents_indexed % 10 == 0: - logger.info(f"Committing batch: {documents_indexed} Google Drive files processed so far") - await session.commit() - - except Exception as e: - error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}" - logger.error(error_msg, exc_info=True) - processing_errors.append(error_msg) - documents_skipped += 1 - - logger.info(f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped") - return documents_indexed, documents_skipped, processing_errors - - -async def _process_single_drive_file( - session: AsyncSession, - composio_connector: ComposioConnector, - file_id: str, - file_name: str, - mime_type: str, - connector_id: int, - search_space_id: int, - user_id: str, - task_logger: TaskLoggingService, - log_entry, -) -> tuple[int, int, list[str]]: - """Process a single Google Drive file for indexing. - - Returns: - Tuple of (documents_indexed, documents_skipped, processing_errors) - """ - processing_errors = [] - - # Generate unique identifier hash - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"drive_{file_id}", search_space_id - ) - - # Check if document exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Get file content - content, content_error = await composio_connector.get_drive_file_content(file_id) - - if content_error or not content: - logger.warning( - f"Could not get content for file {file_name}: {content_error}" - ) - # Use metadata as content fallback - markdown_content = f"# {file_name}\n\n" - markdown_content += f"**File ID:** {file_id}\n" - markdown_content += f"**Type:** {mime_type}\n" - elif isinstance(content, dict): - # Safety check: if content is still a dict, log error and use fallback - error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}" - logger.error(error_msg) - processing_errors.append(error_msg) - markdown_content = f"# {file_name}\n\n" - markdown_content += f"**File ID:** {file_id}\n" - markdown_content += f"**Type:** {mime_type}\n" - else: - # Process content based on file type - markdown_content = await _process_file_content( - content=content, - file_name=file_name, - file_id=file_id, - mime_type=mime_type, - search_space_id=search_space_id, - user_id=user_id, - session=session, - task_logger=task_logger, - log_entry=log_entry, - processing_errors=processing_errors, - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - if existing_document: - if existing_document.content_hash == content_hash: - return 0, 1, processing_errors # Skipped - - # Update existing document - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "document_type": "Google Drive File (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Google Drive File: {file_name}\n\nType: {mime_type}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - existing_document.title = f"Drive: {file_name}" - existing_document.content = summary_content - 
existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "file_id": file_id, - "file_name": file_name, - "FILE_NAME": file_name, # For compatibility - "mime_type": mime_type, - "connector_id": connector_id, - "source": "composio", - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - return 1, 0, processing_errors # Indexed - - # Create new document - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "document_type": "Google Drive File (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Google Drive File: {file_name}\n\nType: {mime_type}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - document = Document( - search_space_id=search_space_id, - title=f"Drive: {file_name}", - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), - document_metadata={ - "file_id": file_id, - "file_name": file_name, - "FILE_NAME": file_name, # For compatibility - "mime_type": mime_type, - "connector_id": connector_id, - "toolkit_id": "googledrive", - "source": "composio", - }, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - ) - session.add(document) - - return 1, 0, processing_errors # Indexed - - -async def _fetch_folder_files_recursively( - composio_connector: ComposioConnector, - folder_id: str, - max_files: int = 100, - current_count: int = 0, - depth: int = 0, - max_depth: int = 10, -) -> list[dict[str, Any]]: - """ - Recursively fetch files from a Google Drive folder via Composio. 
- - Args: - composio_connector: The Composio connector instance - folder_id: Google Drive folder ID - max_files: Maximum number of files to fetch - current_count: Current number of files already fetched - depth: Current recursion depth - max_depth: Maximum recursion depth to prevent infinite loops - - Returns: - List of file info dictionaries - """ - if depth >= max_depth: - logger.warning(f"Max recursion depth reached for folder {folder_id}") - return [] - - if current_count >= max_files: - return [] - - all_files = [] - page_token = None - - try: - while len(all_files) + current_count < max_files: - files, next_token, error = await composio_connector.list_drive_files( - folder_id=folder_id, - page_token=page_token, - page_size=min(100, max_files - len(all_files) - current_count), - ) - - if error: - logger.warning( - f"Error fetching files from subfolder {folder_id}: {error}" - ) - break - - for file_info in files: - mime_type = file_info.get("mimeType", "") or file_info.get( - "mime_type", "" - ) - - if mime_type == "application/vnd.google-apps.folder": - # Recursively fetch from subfolders - subfolder_files = await _fetch_folder_files_recursively( - composio_connector, - file_info.get("id"), - max_files=max_files, - current_count=current_count + len(all_files), - depth=depth + 1, - max_depth=max_depth, - ) - all_files.extend(subfolder_files) - else: - all_files.append(file_info) - - if len(all_files) + current_count >= max_files: - break - - if not next_token: - break - page_token = next_token - - return all_files[: max_files - current_count] - - except Exception as e: - logger.error(f"Error in recursive folder fetch: {e!s}") - return all_files - - -async def _process_gmail_message_batch( - session: AsyncSession, - messages: list[dict[str, Any]], - composio_connector: ComposioConnector, - connector_id: int, - search_space_id: int, - user_id: str, - total_documents_indexed: int = 0, -) -> tuple[int, int]: - """ - Process a batch of Gmail messages and index them. - - Args: - total_documents_indexed: Running total of documents indexed so far (for batch commits). 
- - Returns: - Tuple of (documents_indexed, documents_skipped) - """ - documents_indexed = 0 - documents_skipped = 0 - - for message in messages: - try: - # Composio uses 'messageId' (camelCase), not 'id' - message_id = message.get("messageId", "") or message.get("id", "") - if not message_id: - documents_skipped += 1 - continue - - # Composio's GMAIL_FETCH_EMAILS already returns full message content - # No need for a separate detail API call - - # Extract message info from Composio response - # Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds - payload = message.get("payload", {}) - headers = payload.get("headers", []) - - subject = "No Subject" - sender = "Unknown Sender" - date_str = message.get("messageTimestamp", "Unknown Date") - - for header in headers: - name = header.get("name", "").lower() - value = header.get("value", "") - if name == "subject": - subject = value - elif name == "from": - sender = value - elif name == "date": - date_str = value - - # Format to markdown using the full message data - markdown_content = composio_connector.format_gmail_message_to_markdown( - message - ) - - # Check for empty content (defensive parsing per Composio best practices) - if not markdown_content.strip(): - logger.warning(f"Skipping Gmail message with no content: {subject}") - documents_skipped += 1 - continue - - # Generate unique identifier - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"gmail_{message_id}", search_space_id - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Get label IDs from Composio response - label_ids = message.get("labelIds", []) - # Extract thread_id if available (for consistency with non-Composio implementation) - thread_id = message.get("threadId", "") or message.get("thread_id", "") - - if existing_document: - if existing_document.content_hash == content_hash: - documents_skipped += 1 - continue - - # Update existing - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "document_type": "Gmail Message (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - existing_document.title = f"Gmail: {subject}" - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date": date_str, - "labels": label_ids, - "connector_id": connector_id, - "source": "composio", - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - - # Batch commit every 10 documents - current_total = total_documents_indexed + documents_indexed - if current_total % 10 == 0: - logger.info( - f"Committing batch: {current_total} Gmail messages 
processed so far" - ) - await session.commit() - continue - - # Create new document - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "document_type": "Gmail Message (Composio)", - } - summary_content, summary_embedding = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - document = Document( - search_space_id=search_space_id, - title=f"Gmail: {subject}", - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]), - document_metadata={ - "message_id": message_id, - "thread_id": thread_id, - "subject": subject, - "sender": sender, - "date": date_str, - "labels": label_ids, - "connector_id": connector_id, - "toolkit_id": "gmail", - "source": "composio", - }, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - ) - session.add(document) - documents_indexed += 1 - - # Batch commit every 10 documents - current_total = total_documents_indexed + documents_indexed - if current_total % 10 == 0: - logger.info( - f"Committing batch: {current_total} Gmail messages processed so far" - ) - await session.commit() - - except Exception as e: - logger.error(f"Error processing Gmail message: {e!s}", exc_info=True) - documents_skipped += 1 - # Rollback on error to avoid partial state (per Composio best practices) - try: - await session.rollback() - except Exception as rollback_error: - logger.error( - f"Error during rollback: {rollback_error!s}", exc_info=True - ) - continue - - return documents_indexed, documents_skipped - - -async def _index_composio_gmail( - session: AsyncSession, - connector, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str | None, - end_date: str | None, - task_logger: TaskLoggingService, - log_entry, - update_last_indexed: bool = True, - max_items: int = 1000, -) -> tuple[int, str]: - """Index Gmail messages via Composio with pagination and incremental processing.""" - try: - composio_connector = ComposioConnector(session, connector_id) - - # Normalize date values - handle "undefined" strings from frontend - if start_date == "undefined" or start_date == "": - start_date = None - if end_date == "undefined" or end_date == "": - end_date = None - - # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at - # This ensures user-selected dates are respected (matching non-Composio Gmail connector behavior) - if start_date is not None and end_date is not None: - # User provided both dates - use them directly - start_date_str = start_date - end_date_str = end_date - else: - # Calculate date range with defaults (uses last_indexed_at or 365 days back) - # This ensures indexing works even when user doesn't specify dates - start_date_str, end_date_str = calculate_date_range( - connector, start_date, end_date, default_days_back=365 - ) - - # Build query with date range - query_parts = [] - if start_date_str: - query_parts.append(f"after:{start_date_str.replace('-', '/')}") - if end_date_str: - query_parts.append(f"before:{end_date_str.replace('-', 
'/')}") - query = " ".join(query_parts) if query_parts else "" - - logger.info( - f"Gmail query for connector {connector_id}: '{query}' " - f"(start_date={start_date_str}, end_date={end_date_str})" - ) - - # Use smaller batch size to avoid 413 payload too large errors - batch_size = 50 - page_token = None - total_documents_indexed = 0 - total_documents_skipped = 0 - total_messages_fetched = 0 - result_size_estimate = None # Will be set from first API response - - while total_messages_fetched < max_items: - # Calculate how many messages to fetch in this batch - remaining = max_items - total_messages_fetched - current_batch_size = min(batch_size, remaining) - - # Use result_size_estimate if available, otherwise fall back to max_items - estimated_total = ( - result_size_estimate if result_size_estimate is not None else max_items - ) - # Cap estimated_total at max_items to avoid showing misleading progress - estimated_total = min(estimated_total, max_items) - - await task_logger.log_task_progress( - log_entry, - f"Fetching Gmail messages batch via Composio for connector {connector_id} " - f"({total_messages_fetched}/{estimated_total} fetched, {total_documents_indexed} indexed)", - { - "stage": "fetching_messages", - "batch_size": current_batch_size, - "total_fetched": total_messages_fetched, - "total_indexed": total_documents_indexed, - "estimated_total": estimated_total, - }, - ) - - # Fetch batch of messages - ( - messages, - next_token, - result_size_estimate_batch, - error, - ) = await composio_connector.list_gmail_messages( - query=query, - max_results=current_batch_size, - page_token=page_token, - ) - - if error: - await task_logger.log_task_failure( - log_entry, f"Failed to fetch Gmail messages: {error}", {} - ) - return 0, f"Failed to fetch Gmail messages: {error}" - - if not messages: - # No more messages available - break - - # Update result_size_estimate from first response (Gmail provides this estimate) - if result_size_estimate is None and result_size_estimate_batch is not None: - result_size_estimate = result_size_estimate_batch - logger.info( - f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'" - ) - - total_messages_fetched += len(messages) - # Recalculate estimated_total after potentially updating result_size_estimate - estimated_total = ( - result_size_estimate if result_size_estimate is not None else max_items - ) - estimated_total = min(estimated_total, max_items) - - logger.info( - f"Fetched batch of {len(messages)} Gmail messages " - f"(total: {total_messages_fetched}/{estimated_total})" - ) - - # Process batch incrementally - batch_indexed, batch_skipped = await _process_gmail_message_batch( - session=session, - messages=messages, - composio_connector=composio_connector, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - total_documents_indexed=total_documents_indexed, - ) - - total_documents_indexed += batch_indexed - total_documents_skipped += batch_skipped - - logger.info( - f"Processed batch: {batch_indexed} indexed, {batch_skipped} skipped " - f"(total: {total_documents_indexed} indexed, {total_documents_skipped} skipped)" - ) - - # Batch commits happen in _process_gmail_message_batch every 10 documents - # This ensures progress is saved incrementally, preventing data loss on crashes - - # Check if we should continue - if not next_token: - # No more pages available - break - - if len(messages) < current_batch_size: - # Last page had fewer items than requested, we're done - break - - # Continue with 
next page - page_token = next_token - - if total_messages_fetched == 0: - success_msg = "No Gmail messages found in the specified date range" - await task_logger.log_task_success( - log_entry, success_msg, {"messages_count": 0} - ) - # CRITICAL: Update timestamp even when no messages found so Electric SQL syncs and UI shows indexed status - await update_connector_last_indexed(session, connector, update_last_indexed) - await session.commit() - return 0, None # Return None (not error) when no items found - - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs - # This ensures the UI shows "Last indexed" instead of "Never indexed" - await update_connector_last_indexed(session, connector, update_last_indexed) - - # Final commit to ensure all documents are persisted (safety net) - # This matches the pattern used in non-Composio Gmail indexer - logger.info( - f"Final commit: Total {total_documents_indexed} Gmail messages processed" - ) - await session.commit() - logger.info( - "Successfully committed all Composio Gmail document changes to database" - ) - - await task_logger.log_task_success( - log_entry, - f"Successfully completed Gmail indexing via Composio for connector {connector_id}", - { - "documents_indexed": total_documents_indexed, - "documents_skipped": total_documents_skipped, - "messages_fetched": total_messages_fetched, - }, - ) - - return total_documents_indexed, None - - except Exception as e: - logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True) - return 0, f"Failed to index Gmail via Composio: {e!s}" - - -async def _index_composio_google_calendar( - session: AsyncSession, - connector, - connector_id: int, - search_space_id: int, - user_id: str, - start_date: str | None, - end_date: str | None, - task_logger: TaskLoggingService, - log_entry, - update_last_indexed: bool = True, - max_items: int = 2500, -) -> tuple[int, str]: - """Index Google Calendar events via Composio.""" - try: - composio_connector = ComposioConnector(session, connector_id) - - await task_logger.log_task_progress( - log_entry, - f"Fetching Google Calendar events via Composio for connector {connector_id}", - {"stage": "fetching_events"}, - ) - - # Normalize date values - handle "undefined" strings from frontend - if start_date == "undefined" or start_date == "": - start_date = None - if end_date == "undefined" or end_date == "": - end_date = None - - # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at - # This ensures user-selected dates are respected (matching non-Composio Calendar connector behavior) - if start_date is not None and end_date is not None: - # User provided both dates - use them directly - start_date_str = start_date - end_date_str = end_date - else: - # Calculate date range with defaults (uses last_indexed_at or 365 days back) - # This ensures indexing works even when user doesn't specify dates - start_date_str, end_date_str = calculate_date_range( - connector, start_date, end_date, default_days_back=365 - ) - - # Build time range for API call - time_min = f"{start_date_str}T00:00:00Z" - time_max = f"{end_date_str}T23:59:59Z" - - logger.info( - f"Google Calendar query for connector {connector_id}: " - f"(start_date={start_date_str}, end_date={end_date_str})" - ) - - events, error = await composio_connector.list_calendar_events( - time_min=time_min, - time_max=time_max, - max_results=max_items, - ) - - if error: - await task_logger.log_task_failure( - log_entry, f"Failed to fetch Calendar events: 
{error}", {} - ) - return 0, f"Failed to fetch Calendar events: {error}" - - if not events: - success_msg = "No Google Calendar events found in the specified date range" - await task_logger.log_task_success( - log_entry, success_msg, {"events_count": 0} - ) - # CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status - await update_connector_last_indexed(session, connector, update_last_indexed) - await session.commit() - return ( - 0, - None, - ) # Return None (not error) when no items found - this is success with 0 items - - logger.info(f"Found {len(events)} Google Calendar events to index via Composio") - - documents_indexed = 0 - documents_skipped = 0 - - for event in events: - try: - # Handle both standard Google API and potential Composio variations - event_id = event.get("id", "") or event.get("eventId", "") - summary = ( - event.get("summary", "") or event.get("title", "") or "No Title" - ) - - if not event_id: - documents_skipped += 1 - continue - - # Format to markdown - markdown_content = composio_connector.format_calendar_event_to_markdown( - event - ) - - # Generate unique identifier - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"calendar_{event_id}", search_space_id - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Extract event times - start = event.get("start", {}) - end = event.get("end", {}) - start_time = start.get("dateTime") or start.get("date", "") - end_time = end.get("dateTime") or end.get("date", "") - location = event.get("location", "") - - if existing_document: - if existing_document.content_hash == content_hash: - documents_skipped += 1 - continue - - # Update existing - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "event_id": event_id, - "summary": summary, - "start_time": start_time, - "document_type": "Google Calendar Event (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}" - if location: - summary_content += f"\nLocation: {location}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - existing_document.title = f"Calendar: {summary}" - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "event_id": event_id, - "summary": summary, - "start_time": start_time, - "end_time": end_time, - "location": location, - "connector_id": connector_id, - "source": "composio", - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Google Calendar events processed so far" - ) - await session.commit() - continue - - # Create new document - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "event_id": event_id, - "summary": 
summary, - "start_time": start_time, - "document_type": "Google Calendar Event (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}" - ) - if location: - summary_content += f"\nLocation: {location}" - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - document = Document( - search_space_id=search_space_id, - title=f"Calendar: {summary}", - document_type=DocumentType( - TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"] - ), - document_metadata={ - "event_id": event_id, - "summary": summary, - "start_time": start_time, - "end_time": end_time, - "location": location, - "connector_id": connector_id, - "toolkit_id": "googlecalendar", - "source": "composio", - }, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - ) - session.add(document) - documents_indexed += 1 - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Google Calendar events processed so far" - ) - await session.commit() - - except Exception as e: - logger.error(f"Error processing Calendar event: {e!s}", exc_info=True) - documents_skipped += 1 - continue - - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs - # This ensures the UI shows "Last indexed" instead of "Never indexed" - await update_connector_last_indexed(session, connector, update_last_indexed) - - # Final commit to ensure all documents are persisted (safety net) - # This matches the pattern used in non-Composio Gmail indexer - logger.info( - f"Final commit: Total {documents_indexed} Google Calendar events processed" - ) - await session.commit() - logger.info( - "Successfully committed all Composio Google Calendar document changes to database" - ) - - await task_logger.log_task_success( - log_entry, - f"Successfully completed Google Calendar indexing via Composio for connector {connector_id}", - { - "documents_indexed": documents_indexed, - "documents_skipped": documents_skipped, - }, - ) - - return documents_indexed, None - - except Exception as e: - logger.error( - f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True - ) - return 0, f"Failed to index Google Calendar via Composio: {e!s}" diff --git a/surfsense_web/components/assistant-ui/connector-popup/components/composio-connector-card.tsx b/surfsense_web/components/assistant-ui/connector-popup/components/composio-connector-card.tsx deleted file mode 100644 index 671fc3ce6..000000000 --- a/surfsense_web/components/assistant-ui/connector-popup/components/composio-connector-card.tsx +++ /dev/null @@ -1,78 +0,0 @@ -"use client"; - -import { Zap } from "lucide-react"; -import Image from "next/image"; -import type { FC } from "react"; -import { Button } from "@/components/ui/button"; -import { cn } from "@/lib/utils"; - -interface ComposioConnectorCardProps { - id: string; - title: string; - description: string; - connectorCount?: number; - onConnect: () => void; -} - -export const ComposioConnectorCard: FC = ({ - id, - title, - description, - connectorCount = 0, - onConnect, -}) => { - const hasConnections = connectorCount > 0; - - return ( -
-
- Composio -
-
-
- {title} - -
- {hasConnections ? ( -

- - {connectorCount} {connectorCount === 1 ? "connection" : "connections"} - -

- ) : ( -

{description}

- )} -
- -
- ); -}; diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-calendar-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-calendar-config.tsx new file mode 100644 index 000000000..6e7a06073 --- /dev/null +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-calendar-config.tsx @@ -0,0 +1,220 @@ +"use client"; + +import { Calendar, Clock } from "lucide-react"; +import type { FC } from "react"; +import { useEffect, useState } from "react"; +import { Label } from "@/components/ui/label"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Switch } from "@/components/ui/switch"; +import type { SearchSourceConnector } from "@/contracts/types/connector.types"; + +interface ComposioCalendarConfigProps { + connector: SearchSourceConnector; + onConfigChange?: (config: Record) => void; + onNameChange?: (name: string) => void; +} + +interface CalendarIndexingOptions { + max_events: number; + include_recurring: boolean; + include_past_events: boolean; + days_ahead: number; +} + +const DEFAULT_CALENDAR_OPTIONS: CalendarIndexingOptions = { + max_events: 500, + include_recurring: true, + include_past_events: true, + days_ahead: 365, +}; + +export const ComposioCalendarConfig: FC = ({ connector, onConfigChange }) => { + const isIndexable = connector.config?.is_indexable as boolean; + + // Initialize with existing options from connector config + const existingOptions = + (connector.config?.calendar_options as CalendarIndexingOptions | undefined) || DEFAULT_CALENDAR_OPTIONS; + + const [calendarOptions, setCalendarOptions] = useState(existingOptions); + + // Update options when connector config changes + useEffect(() => { + const options = + (connector.config?.calendar_options as CalendarIndexingOptions | undefined) || + DEFAULT_CALENDAR_OPTIONS; + setCalendarOptions(options); + }, [connector.config]); + + const updateConfig = (options: CalendarIndexingOptions) => { + if (onConfigChange) { + onConfigChange({ + ...connector.config, + calendar_options: options, + }); + } + }; + + const handleOptionChange = (key: keyof CalendarIndexingOptions, value: number | boolean) => { + const newOptions = { ...calendarOptions, [key]: value }; + setCalendarOptions(newOptions); + updateConfig(newOptions); + }; + + // Only show configuration if the connector is indexable + if (!isIndexable) { + return
; + } + + return ( +
+ {/* Calendar Indexing Options */} +
+
+
+ +

Calendar Indexing Options

+
+

+ Configure how events are indexed from your Google Calendar. +

+
+ + {/* Max events to index */} +
+
+
+ +

+ Maximum number of events to index per sync +

+
+ +
+
+ + {/* Days ahead */} +
+
+
+
+ + +
+

+ How far ahead to index future events +

+
+ +
+
+ + {/* Include recurring events toggle */} +
+
+ +

+ Index individual instances of recurring events +

+
+ + handleOptionChange("include_recurring", checked) + } + /> +
+ + {/* Include past events toggle */} +
+
+ +

+ Index events from before the selected date range +

+
+ + handleOptionChange("include_past_events", checked) + } + /> +
+
+
+ ); +}; + diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-config.tsx deleted file mode 100644 index fdff956e5..000000000 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-config.tsx +++ /dev/null @@ -1,353 +0,0 @@ -"use client"; - -import { File, FileSpreadsheet, FileText, FolderClosed, Image, Presentation } from "lucide-react"; -import type { FC } from "react"; -import { useEffect, useState } from "react"; -import { ComposioDriveFolderTree } from "@/components/connectors/composio-drive-folder-tree"; -import { Badge } from "@/components/ui/badge"; -import { Button } from "@/components/ui/button"; -import { Label } from "@/components/ui/label"; -import { - Select, - SelectContent, - SelectItem, - SelectTrigger, - SelectValue, -} from "@/components/ui/select"; -import { Switch } from "@/components/ui/switch"; -import type { SearchSourceConnector } from "@/contracts/types/connector.types"; -import { cn } from "@/lib/utils"; - -interface ComposioConfigProps { - connector: SearchSourceConnector; - onConfigChange?: (config: Record) => void; - onNameChange?: (name: string) => void; -} - -interface SelectedFolder { - id: string; - name: string; -} - -interface IndexingOptions { - max_files_per_folder: number; - incremental_sync: boolean; - include_subfolders: boolean; -} - -const DEFAULT_INDEXING_OPTIONS: IndexingOptions = { - max_files_per_folder: 100, - incremental_sync: true, - include_subfolders: true, -}; - -// Helper to get appropriate icon for file type based on file name -function getFileIconFromName(fileName: string, className: string = "size-3.5 shrink-0") { - const lowerName = fileName.toLowerCase(); - // Spreadsheets - if ( - lowerName.endsWith(".xlsx") || - lowerName.endsWith(".xls") || - lowerName.endsWith(".csv") || - lowerName.includes("spreadsheet") - ) { - return ; - } - // Presentations - if ( - lowerName.endsWith(".pptx") || - lowerName.endsWith(".ppt") || - lowerName.includes("presentation") - ) { - return ; - } - // Documents (word, text only - not PDF) - if ( - lowerName.endsWith(".docx") || - lowerName.endsWith(".doc") || - lowerName.endsWith(".txt") || - lowerName.includes("document") || - lowerName.includes("word") || - lowerName.includes("text") - ) { - return ; - } - // Images - if ( - lowerName.endsWith(".png") || - lowerName.endsWith(".jpg") || - lowerName.endsWith(".jpeg") || - lowerName.endsWith(".gif") || - lowerName.endsWith(".webp") || - lowerName.endsWith(".svg") - ) { - return ; - } - // Default (including PDF) - return ; -} - -export const ComposioConfig: FC = ({ connector, onConfigChange }) => { - const toolkitId = connector.config?.toolkit_id as string; - const isIndexable = connector.config?.is_indexable as boolean; - const composioAccountId = connector.config?.composio_connected_account_id as string; - - // Check if this is a Google Drive Composio connector - const isGoogleDrive = toolkitId === "googledrive"; - - // Initialize with existing selected folders and files from connector config - const existingFolders = - (connector.config?.selected_folders as SelectedFolder[] | undefined) || []; - const existingFiles = (connector.config?.selected_files as SelectedFolder[] | undefined) || []; - const existingIndexingOptions = - (connector.config?.indexing_options as IndexingOptions | undefined) || DEFAULT_INDEXING_OPTIONS; - - const [selectedFolders, 
setSelectedFolders] = useState(existingFolders); - const [selectedFiles, setSelectedFiles] = useState(existingFiles); - const [showFolderSelector, setShowFolderSelector] = useState(false); - const [indexingOptions, setIndexingOptions] = useState(existingIndexingOptions); - - // Update selected folders and files when connector config changes - useEffect(() => { - const folders = (connector.config?.selected_folders as SelectedFolder[] | undefined) || []; - const files = (connector.config?.selected_files as SelectedFolder[] | undefined) || []; - const options = - (connector.config?.indexing_options as IndexingOptions | undefined) || - DEFAULT_INDEXING_OPTIONS; - setSelectedFolders(folders); - setSelectedFiles(files); - setIndexingOptions(options); - }, [connector.config]); - - const updateConfig = ( - folders: SelectedFolder[], - files: SelectedFolder[], - options: IndexingOptions - ) => { - if (onConfigChange) { - onConfigChange({ - ...connector.config, - selected_folders: folders, - selected_files: files, - indexing_options: options, - }); - } - }; - - const handleSelectFolders = (folders: SelectedFolder[]) => { - setSelectedFolders(folders); - updateConfig(folders, selectedFiles, indexingOptions); - }; - - const handleSelectFiles = (files: SelectedFolder[]) => { - setSelectedFiles(files); - updateConfig(selectedFolders, files, indexingOptions); - }; - - const handleIndexingOptionChange = (key: keyof IndexingOptions, value: number | boolean) => { - const newOptions = { ...indexingOptions, [key]: value }; - setIndexingOptions(newOptions); - updateConfig(selectedFolders, selectedFiles, newOptions); - }; - - const totalSelected = selectedFolders.length + selectedFiles.length; - - return ( -
- {/* Connection Details */} -
-

- Connection Details -

-
-
- Toolkit - {toolkitId} -
-
- Indexing Supported - - {isIndexable ? "Yes" : "Coming Soon"} - -
- {composioAccountId && ( -
- Account ID - - {composioAccountId} - -
- )} -
-
- - {/* Google Drive specific: Folder & File Selection */} - {isGoogleDrive && isIndexable && ( - <> -
-
-

Folder & File Selection

-

- Select specific folders and/or individual files to index. -

-
- - {totalSelected > 0 && ( -
-

- Selected {totalSelected} item{totalSelected > 1 ? "s" : ""}: {(() => { - const parts: string[] = []; - if (selectedFolders.length > 0) { - parts.push( - `${selectedFolders.length} folder${selectedFolders.length > 1 ? "s" : ""}` - ); - } - if (selectedFiles.length > 0) { - parts.push( - `${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}` - ); - } - return parts.length > 0 ? `(${parts.join(" ")})` : ""; - })()} -

-
- {selectedFolders.map((folder) => ( -

- - {folder.name} -

- ))} - {selectedFiles.map((file) => ( -

- {getFileIconFromName(file.name)} - {file.name} -

- ))} -
-
- )} - - {showFolderSelector ? ( -
- - -
- ) : ( - - )} -
- - {/* Indexing Options */} -
-
-

Indexing Options

-

- Configure how files are indexed from your Google Drive. -

-
- - {/* Max files per folder */} -
-
-
- -

- Maximum number of files to index from each folder -

-
- -
-
- - {/* Include subfolders toggle */} -
-
- -

- Recursively index files in subfolders of selected folders -

-
- - handleIndexingOptionChange("include_subfolders", checked) - } - /> -
-
- - )} -
- ); -}; diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-drive-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-drive-config.tsx new file mode 100644 index 000000000..755b91a5a --- /dev/null +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-drive-config.tsx @@ -0,0 +1,313 @@ +"use client"; + +import { File, FileSpreadsheet, FileText, FolderClosed, Image, Presentation } from "lucide-react"; +import type { FC } from "react"; +import { useEffect, useState } from "react"; +import { ComposioDriveFolderTree } from "@/components/connectors/composio-drive-folder-tree"; +import { Button } from "@/components/ui/button"; +import { Label } from "@/components/ui/label"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Switch } from "@/components/ui/switch"; +import type { SearchSourceConnector } from "@/contracts/types/connector.types"; + +interface ComposioDriveConfigProps { + connector: SearchSourceConnector; + onConfigChange?: (config: Record) => void; + onNameChange?: (name: string) => void; +} + +interface SelectedFolder { + id: string; + name: string; +} + +interface IndexingOptions { + max_files_per_folder: number; + incremental_sync: boolean; + include_subfolders: boolean; +} + +const DEFAULT_INDEXING_OPTIONS: IndexingOptions = { + max_files_per_folder: 100, + incremental_sync: true, + include_subfolders: true, +}; + +// Helper to get appropriate icon for file type based on file name +function getFileIconFromName(fileName: string, className: string = "size-3.5 shrink-0") { + const lowerName = fileName.toLowerCase(); + // Spreadsheets + if ( + lowerName.endsWith(".xlsx") || + lowerName.endsWith(".xls") || + lowerName.endsWith(".csv") || + lowerName.includes("spreadsheet") + ) { + return ; + } + // Presentations + if ( + lowerName.endsWith(".pptx") || + lowerName.endsWith(".ppt") || + lowerName.includes("presentation") + ) { + return ; + } + // Documents (word, text only - not PDF) + if ( + lowerName.endsWith(".docx") || + lowerName.endsWith(".doc") || + lowerName.endsWith(".txt") || + lowerName.includes("document") || + lowerName.includes("word") || + lowerName.includes("text") + ) { + return ; + } + // Images + if ( + lowerName.endsWith(".png") || + lowerName.endsWith(".jpg") || + lowerName.endsWith(".jpeg") || + lowerName.endsWith(".gif") || + lowerName.endsWith(".webp") || + lowerName.endsWith(".svg") + ) { + return ; + } + // Default (including PDF) + return ; +} + +export const ComposioDriveConfig: FC = ({ connector, onConfigChange }) => { + const isIndexable = connector.config?.is_indexable as boolean; + + // Initialize with existing selected folders and files from connector config + const existingFolders = + (connector.config?.selected_folders as SelectedFolder[] | undefined) || []; + const existingFiles = (connector.config?.selected_files as SelectedFolder[] | undefined) || []; + const existingIndexingOptions = + (connector.config?.indexing_options as IndexingOptions | undefined) || DEFAULT_INDEXING_OPTIONS; + + const [selectedFolders, setSelectedFolders] = useState(existingFolders); + const [selectedFiles, setSelectedFiles] = useState(existingFiles); + const [showFolderSelector, setShowFolderSelector] = useState(false); + const [indexingOptions, setIndexingOptions] = useState(existingIndexingOptions); + + // Update selected folders and files when 
connector config changes + useEffect(() => { + const folders = (connector.config?.selected_folders as SelectedFolder[] | undefined) || []; + const files = (connector.config?.selected_files as SelectedFolder[] | undefined) || []; + const options = + (connector.config?.indexing_options as IndexingOptions | undefined) || + DEFAULT_INDEXING_OPTIONS; + setSelectedFolders(folders); + setSelectedFiles(files); + setIndexingOptions(options); + }, [connector.config]); + + const updateConfig = ( + folders: SelectedFolder[], + files: SelectedFolder[], + options: IndexingOptions + ) => { + if (onConfigChange) { + onConfigChange({ + ...connector.config, + selected_folders: folders, + selected_files: files, + indexing_options: options, + }); + } + }; + + const handleSelectFolders = (folders: SelectedFolder[]) => { + setSelectedFolders(folders); + updateConfig(folders, selectedFiles, indexingOptions); + }; + + const handleSelectFiles = (files: SelectedFolder[]) => { + setSelectedFiles(files); + updateConfig(selectedFolders, files, indexingOptions); + }; + + const handleIndexingOptionChange = (key: keyof IndexingOptions, value: number | boolean) => { + const newOptions = { ...indexingOptions, [key]: value }; + setIndexingOptions(newOptions); + updateConfig(selectedFolders, selectedFiles, newOptions); + }; + + const totalSelected = selectedFolders.length + selectedFiles.length; + + // Only show configuration if the connector is indexable + if (!isIndexable) { + return
; + } + + return ( +
+ {/* Folder & File Selection */} +
+
+

Folder & File Selection

+

+ Select specific folders and/or individual files to index from your Google Drive. +

+
+ + {totalSelected > 0 && ( +
+

+ Selected {totalSelected} item{totalSelected > 1 ? "s" : ""}: {(() => { + const parts: string[] = []; + if (selectedFolders.length > 0) { + parts.push( + `${selectedFolders.length} folder${selectedFolders.length > 1 ? "s" : ""}` + ); + } + if (selectedFiles.length > 0) { + parts.push( + `${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}` + ); + } + return parts.length > 0 ? `(${parts.join(" ")})` : ""; + })()} +

+
+ {selectedFolders.map((folder) => ( +

+ + {folder.name} +

+ ))} + {selectedFiles.map((file) => ( +

+ {getFileIconFromName(file.name)} + {file.name} +

+ ))} +
+
+ )} + + {showFolderSelector ? ( +
+ + +
+ ) : ( + + )} +
+ + {/* Indexing Options */} +
+
+

Indexing Options

+

+ Configure how files are indexed from your Google Drive. +

+
+ + {/* Max files per folder */} +
+
+
+ +

+ Maximum number of files to index from each folder +

+
+ +
+
+ + {/* Include subfolders toggle */} +
+
+ +

+ Recursively index files in subfolders of selected folders +

+
+ + handleIndexingOptionChange("include_subfolders", checked) + } + /> +
+
+
+ ); +}; + diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-gmail-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-gmail-config.tsx new file mode 100644 index 000000000..963753ab3 --- /dev/null +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-gmail-config.tsx @@ -0,0 +1,174 @@ +"use client"; + +import { Mail, Tag } from "lucide-react"; +import type { FC } from "react"; +import { useEffect, useState } from "react"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import type { SearchSourceConnector } from "@/contracts/types/connector.types"; + +interface ComposioGmailConfigProps { + connector: SearchSourceConnector; + onConfigChange?: (config: Record) => void; + onNameChange?: (name: string) => void; +} + +interface GmailIndexingOptions { + max_emails: number; + label_filter: string; + search_query: string; +} + +const DEFAULT_GMAIL_OPTIONS: GmailIndexingOptions = { + max_emails: 500, + label_filter: "", + search_query: "", +}; + +export const ComposioGmailConfig: FC = ({ connector, onConfigChange }) => { + const isIndexable = connector.config?.is_indexable as boolean; + + // Initialize with existing options from connector config + const existingOptions = + (connector.config?.gmail_options as GmailIndexingOptions | undefined) || DEFAULT_GMAIL_OPTIONS; + + const [gmailOptions, setGmailOptions] = useState(existingOptions); + + // Update options when connector config changes + useEffect(() => { + const options = + (connector.config?.gmail_options as GmailIndexingOptions | undefined) || + DEFAULT_GMAIL_OPTIONS; + setGmailOptions(options); + }, [connector.config]); + + const updateConfig = (options: GmailIndexingOptions) => { + if (onConfigChange) { + onConfigChange({ + ...connector.config, + gmail_options: options, + }); + } + }; + + const handleOptionChange = (key: keyof GmailIndexingOptions, value: number | string) => { + const newOptions = { ...gmailOptions, [key]: value }; + setGmailOptions(newOptions); + updateConfig(newOptions); + }; + + // Only show configuration if the connector is indexable + if (!isIndexable) { + return
; + } + + return ( +
+ {/* Gmail Indexing Options */} +
+
+
+ +

Gmail Indexing Options

+
+

+ Configure how emails are indexed from your Gmail account. +

+
+ + {/* Max emails to index */} +
+
+
+ +

+ Maximum number of emails to index per sync +

+
+ +
+
+ + {/* Label filter */} +
+
+
+ + +
+

+ Only index emails with this label (e.g., "INBOX", "IMPORTANT", "work") +

+
+ handleOptionChange("label_filter", e.target.value)} + placeholder="Enter label name..." + className="bg-slate-400/5 dark:bg-slate-400/5 border-slate-400/20 text-xs sm:text-sm" + /> +
+ + {/* Search query */} +
+
+ +

+ Gmail search query to filter emails (e.g., "from:boss@company.com", "has:attachment") +

+
+ handleOptionChange("search_query", e.target.value)} + placeholder="Enter Gmail search query..." + className="bg-slate-400/5 dark:bg-slate-400/5 border-slate-400/20 text-xs sm:text-sm" + /> +
+
+
+ ); +}; + diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx index 1a713a5a0..6b4d86b5a 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/index.tsx @@ -6,7 +6,9 @@ import { BaiduSearchApiConfig } from "./components/baidu-search-api-config"; import { BookStackConfig } from "./components/bookstack-config"; import { CirclebackConfig } from "./components/circleback-config"; import { ClickUpConfig } from "./components/clickup-config"; -import { ComposioConfig } from "./components/composio-config"; +import { ComposioCalendarConfig } from "./components/composio-calendar-config"; +import { ComposioDriveConfig } from "./components/composio-drive-config"; +import { ComposioGmailConfig } from "./components/composio-gmail-config"; import { ConfluenceConfig } from "./components/confluence-config"; import { DiscordConfig } from "./components/discord-config"; import { ElasticsearchConfig } from "./components/elasticsearch-config"; @@ -78,9 +80,11 @@ export function getConnectorConfigComponent( case "OBSIDIAN_CONNECTOR": return ObsidianConfig; case "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": + return ComposioDriveConfig; case "COMPOSIO_GMAIL_CONNECTOR": + return ComposioGmailConfig; case "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR": - return ComposioConfig; + return ComposioCalendarConfig; // OAuth connectors (Gmail, Calendar, Airtable, Notion) and others don't need special config UI default: return null; diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx index fbdffed7a..6b1a8c92b 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx @@ -206,8 +206,9 @@ export const ConnectorEditView: FC = ({ {/* Date range selector and periodic sync - only shown for indexable connectors */} {connector.is_indexable && ( <> - {/* Date range selector - not shown for Google Drive, Webcrawler, or GitHub (indexes full repo snapshots) */} + {/* Date range selector - not shown for Google Drive (regular and Composio), Webcrawler, or GitHub (indexes full repo snapshots) */} {connector.connector_type !== "GOOGLE_DRIVE_CONNECTOR" && + connector.connector_type !== "COMPOSIO_GOOGLE_DRIVE_CONNECTOR" && connector.connector_type !== "WEBCRAWLER_CONNECTOR" && connector.connector_type !== "GITHUB_CONNECTOR" && ( = ({ onEndDateChange={onEndDateChange} allowFutureDates={ connector.connector_type === "GOOGLE_CALENDAR_CONNECTOR" || + connector.connector_type === "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR" || connector.connector_type === "LUMA_CONNECTOR" } /> diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx index 68fc688c3..17995fdfa 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx @@ -9,11 +9,7 @@ import { 
getConnectorTypeDisplay } from "@/lib/connectors/utils"; import { cn } from "@/lib/utils"; import { DateRangeSelector } from "../../components/date-range-selector"; import { PeriodicSyncConfig } from "../../components/periodic-sync-config"; -import { - COMPOSIO_CONNECTORS, - type IndexingConfigState, - OAUTH_CONNECTORS, -} from "../../constants/connector-constants"; +import type { IndexingConfigState } from "../../constants/connector-constants"; import { getConnectorDisplayName } from "../../tabs/all-connectors-tab"; import { getConnectorConfigComponent } from "../index"; @@ -95,11 +91,6 @@ export const IndexingConfigurationView: FC = ({ }; }, [checkScrollState]); - // Check both OAUTH_CONNECTORS and COMPOSIO_CONNECTORS - const authConnector = - OAUTH_CONNECTORS.find((c) => c.connectorType === connector?.connector_type) || - COMPOSIO_CONNECTORS.find((c) => c.connectorType === connector?.connector_type); - return (
{/* Fixed Header */} @@ -158,8 +149,9 @@ export const IndexingConfigurationView: FC = ({ {/* Date range selector and periodic sync - only shown for indexable connectors */} {connector?.is_indexable && ( <> - {/* Date range selector - not shown for Google Drive, Webcrawler, or GitHub (indexes full repo snapshots) */} + {/* Date range selector - not shown for Google Drive (regular and Composio), Webcrawler, or GitHub (indexes full repo snapshots) */} {config.connectorType !== "GOOGLE_DRIVE_CONNECTOR" && + config.connectorType !== "COMPOSIO_GOOGLE_DRIVE_CONNECTOR" && config.connectorType !== "WEBCRAWLER_CONNECTOR" && config.connectorType !== "GITHUB_CONNECTOR" && ( = ({ onEndDateChange={onEndDateChange} allowFutureDates={ config.connectorType === "GOOGLE_CALENDAR_CONNECTOR" || + config.connectorType === "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR" || config.connectorType === "LUMA_CONNECTOR" } /> )} - {/* Periodic sync - not shown for Google Drive */} - {config.connectorType !== "GOOGLE_DRIVE_CONNECTOR" && ( + {/* Periodic sync - not shown for Google Drive (regular and Composio) */} + {config.connectorType !== "GOOGLE_DRIVE_CONNECTOR" && + config.connectorType !== "COMPOSIO_GOOGLE_DRIVE_CONNECTOR" && ( { if ( params.success === "true" && - params.connector && searchSpaceId && params.modal === "connectors" ) { - const oauthConnector = OAUTH_CONNECTORS.find((c) => c.id === params.connector); - if (oauthConnector) { - refetchAllConnectors().then((result) => { - if (!result.data) return; + refetchAllConnectors().then((result) => { + if (!result.data) return; - let newConnector: SearchSourceConnector | undefined; - if (params.connectorId) { - const connectorId = parseInt(params.connectorId, 10); - newConnector = result.data.find((c: SearchSourceConnector) => c.id === connectorId); - } else { + let newConnector: SearchSourceConnector | undefined; + let oauthConnector: + | (typeof OAUTH_CONNECTORS)[number] + | (typeof COMPOSIO_CONNECTORS)[number] + | undefined; + + // First, try to find connector by connectorId if provided + if (params.connectorId) { + const connectorId = parseInt(params.connectorId, 10); + newConnector = result.data.find((c: SearchSourceConnector) => c.id === connectorId); + + // If we found the connector, find the matching OAuth/Composio connector by type + if (newConnector) { + oauthConnector = + OAUTH_CONNECTORS.find( + (c) => c.connectorType === newConnector!.connector_type + ) || + COMPOSIO_CONNECTORS.find( + (c) => c.connectorType === newConnector!.connector_type + ); + } + } + + // If we don't have a connector yet, try to find by connector param + if (!newConnector && params.connector) { + oauthConnector = + OAUTH_CONNECTORS.find((c) => c.id === params.connector) || + COMPOSIO_CONNECTORS.find((c) => c.id === params.connector); + + if (oauthConnector) { newConnector = result.data.find( - (c: SearchSourceConnector) => c.connector_type === oauthConnector.connectorType + (c: SearchSourceConnector) => c.connector_type === oauthConnector!.connectorType ); } + } - if (newConnector) { - const connectorValidation = searchSourceConnector.safeParse(newConnector); - if (connectorValidation.success) { - // Track connector connected event for OAuth connectors - trackConnectorConnected( - Number(searchSpaceId), - oauthConnector.connectorType, - newConnector.id - ); + if (newConnector && oauthConnector) { + const connectorValidation = searchSourceConnector.safeParse(newConnector); + if (connectorValidation.success) { + // Track connector connected event for OAuth/Composio connectors + 
trackConnectorConnected( + Number(searchSpaceId), + oauthConnector.connectorType, + newConnector.id + ); - const config = validateIndexingConfigState({ - connectorType: oauthConnector.connectorType, - connectorId: newConnector.id, - connectorTitle: oauthConnector.title, - }); - setIndexingConfig(config); - setIndexingConnector(newConnector); - setIndexingConnectorConfig(newConnector.config); - setIsOpen(true); - const url = new URL(window.location.href); - url.searchParams.delete("success"); - url.searchParams.set("connectorId", newConnector.id.toString()); - url.searchParams.set("view", "configure"); - window.history.replaceState({}, "", url.toString()); - } else { - console.warn("Invalid connector data after OAuth:", connectorValidation.error); - toast.error("Failed to validate connector data"); - } + const config = validateIndexingConfigState({ + connectorType: oauthConnector.connectorType, + connectorId: newConnector.id, + connectorTitle: oauthConnector.title, + }); + setIndexingConfig(config); + setIndexingConnector(newConnector); + setIndexingConnectorConfig(newConnector.config); + setIsOpen(true); + const url = new URL(window.location.href); + url.searchParams.delete("success"); + url.searchParams.set("connectorId", newConnector.id.toString()); + url.searchParams.set("view", "configure"); + window.history.replaceState({}, "", url.toString()); + } else { + console.warn("Invalid connector data after OAuth:", connectorValidation.error); + toast.error("Failed to validate connector data"); } - }); - } + } + }); } } catch (error) { // Invalid query params - log but don't crash @@ -863,9 +885,10 @@ export const useConnectorDialog = () => { async (refreshConnectors: () => void) => { if (!indexingConfig || !searchSpaceId) return; - // Validate date range (skip for Google Drive and Webcrawler) + // Validate date range (skip for Google Drive, Composio Drive, and Webcrawler) if ( indexingConfig.connectorType !== "GOOGLE_DRIVE_CONNECTOR" && + indexingConfig.connectorType !== "COMPOSIO_GOOGLE_DRIVE_CONNECTOR" && indexingConfig.connectorType !== "WEBCRAWLER_CONNECTOR" ) { const dateRangeValidation = dateRangeSchema.safeParse({ startDate, endDate }); @@ -910,8 +933,12 @@ export const useConnectorDialog = () => { }); } - // Handle Google Drive folder selection - if (indexingConfig.connectorType === "GOOGLE_DRIVE_CONNECTOR" && indexingConnectorConfig) { + // Handle Google Drive folder selection (regular and Composio) + if ( + (indexingConfig.connectorType === "GOOGLE_DRIVE_CONNECTOR" || + indexingConfig.connectorType === "COMPOSIO_GOOGLE_DRIVE_CONNECTOR") && + indexingConnectorConfig + ) { const selectedFolders = indexingConnectorConfig.selected_folders as | Array<{ id: string; name: string }> | undefined; diff --git a/surfsense_web/lib/connectors/utils.ts b/surfsense_web/lib/connectors/utils.ts index 34721a6aa..0ca1c1ea9 100644 --- a/surfsense_web/lib/connectors/utils.ts +++ b/surfsense_web/lib/connectors/utils.ts @@ -16,6 +16,9 @@ export const getConnectorTypeDisplay = (type: string): string => { GOOGLE_CALENDAR_CONNECTOR: "Google Calendar", GOOGLE_GMAIL_CONNECTOR: "Google Gmail", GOOGLE_DRIVE_CONNECTOR: "Google Drive", + COMPOSIO_GOOGLE_DRIVE_CONNECTOR: "Google Drive", + COMPOSIO_GMAIL_CONNECTOR: "Gmail", + COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: "Google Calendar", AIRTABLE_CONNECTOR: "Airtable", LUMA_CONNECTOR: "Luma", ELASTICSEARCH_CONNECTOR: "Elasticsearch",
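
The three new Composio config components added in this patch (Drive, Gmail, Calendar) all follow the same controlled-state pattern: local option state seeded from `connector.config`, re-synced in a `useEffect` when the server config refreshes, and merged back up through `onConfigChange` under a toolkit-specific key (`indexing_options`, `gmail_options`, `calendar_options`). Below is a minimal sketch of that pattern for reference only; the option names and the `SearchSourceConnector` import are taken from the diff, while the component name `GmailOptionsSketch` and the bare number input are illustrative stand-ins for the real Select/Input/Switch controls.

"use client";

import { type FC, useEffect, useState } from "react";
import type { SearchSourceConnector } from "@/contracts/types/connector.types";

interface GmailIndexingOptions {
	max_emails: number;
	label_filter: string;
	search_query: string;
}

const DEFAULT_GMAIL_OPTIONS: GmailIndexingOptions = {
	max_emails: 500,
	label_filter: "",
	search_query: "",
};

interface Props {
	connector: SearchSourceConnector;
	onConfigChange?: (config: Record<string, unknown>) => void;
}

// Hypothetical simplified component illustrating the shared config pattern.
export const GmailOptionsSketch: FC<Props> = ({ connector, onConfigChange }) => {
	// Seed local state from the persisted connector config, falling back to defaults.
	const [options, setOptions] = useState<GmailIndexingOptions>(
		(connector.config?.gmail_options as GmailIndexingOptions | undefined) ?? DEFAULT_GMAIL_OPTIONS
	);

	// Re-sync whenever the connector config is refreshed from the server.
	useEffect(() => {
		setOptions(
			(connector.config?.gmail_options as GmailIndexingOptions | undefined) ?? DEFAULT_GMAIL_OPTIONS
		);
	}, [connector.config]);

	// Update one option locally and bubble the merged config up to the parent dialog.
	const handleOptionChange = (key: keyof GmailIndexingOptions, value: number | string) => {
		const next = { ...options, [key]: value };
		setOptions(next);
		onConfigChange?.({ ...connector.config, gmail_options: next });
	};

	// Placeholder control; the real components render labelled Select/Input/Switch fields here.
	return (
		<input
			type="number"
			value={options.max_emails}
			onChange={(e) => handleOptionChange("max_emails", Number(e.target.value))}
		/>
	);
};

Keeping each toolkit's options under its own config key is what lets `updateConfig` spread `...connector.config` before writing, so editing Gmail options cannot clobber the Drive folder selection or Calendar settings stored on the same connector record.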