diff --git a/surfsense_backend/app/connectors/composio_connector.py b/surfsense_backend/app/connectors/composio_connector.py index b49988887..8cb91355d 100644 --- a/surfsense_backend/app/connectors/composio_connector.py +++ b/surfsense_backend/app/connectors/composio_connector.py @@ -146,6 +146,55 @@ class ComposioConnector: file_id=file_id, ) + async def get_drive_start_page_token(self) -> tuple[str | None, str | None]: + """ + Get the starting page token for Google Drive change tracking. + + Returns: + Tuple of (start_page_token, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.get_drive_start_page_token( + connected_account_id=connected_account_id, + entity_id=entity_id, + ) + + async def list_drive_changes( + self, + page_token: str | None = None, + page_size: int = 100, + include_removed: bool = True, + ) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + List changes in Google Drive since the given page token. + + Args: + page_token: Page token from previous sync (optional). + page_size: Number of changes per page. + include_removed: Whether to include removed items. + + Returns: + Tuple of (changes list, new_start_page_token, error message). + """ + connected_account_id = await self.get_connected_account_id() + if not connected_account_id: + return [], None, "No connected account ID found" + + entity_id = await self.get_entity_id() + service = await self._get_service() + return await service.list_drive_changes( + connected_account_id=connected_account_id, + entity_id=entity_id, + page_token=page_token, + page_size=page_size, + include_removed=include_removed, + ) + # ===== Gmail Methods ===== async def list_gmail_messages( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index db1b884e0..82f452c61 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -2288,8 +2288,8 @@ async def run_composio_indexing( connector_id: int, search_space_id: int, user_id: str, - start_date: str, - end_date: str, + start_date: str | None, + end_date: str | None, ): """ Run Composio connector indexing with real-time notifications. diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index 0d6189cd9..3810f03a4 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -57,17 +57,30 @@ TOOLKIT_TO_DOCUMENT_TYPE = { class ComposioService: """Service for interacting with Composio API.""" - def __init__(self, api_key: str | None = None): + # Default download directory for files from Composio + DEFAULT_DOWNLOAD_DIR = "/tmp/composio_downloads" + + def __init__(self, api_key: str | None = None, file_download_dir: str | None = None): """ Initialize the Composio service. Args: api_key: Composio API key. If not provided, uses config.COMPOSIO_API_KEY. + file_download_dir: Directory for downloaded files. Defaults to /tmp/composio_downloads. """ + import os + self.api_key = api_key or config.COMPOSIO_API_KEY if not self.api_key: raise ValueError("COMPOSIO_API_KEY is required but not configured") - self.client = Composio(api_key=self.api_key) + + # Set up download directory + self.file_download_dir = file_download_dir or self.DEFAULT_DOWNLOAD_DIR + os.makedirs(self.file_download_dir, exist_ok=True) + + # Initialize Composio client with download directory + # Per docs: file_download_dir configures where files are downloaded + self.client = Composio(api_key=self.api_key, file_download_dir=self.file_download_dir) @staticmethod def is_enabled() -> bool: @@ -465,6 +478,10 @@ class ComposioService: """ Download file content from Google Drive via Composio. + Per Composio docs: When tools return files, they are automatically downloaded + to a local directory, and the local file path is provided in the response. + Response includes: file_path, file_name, size fields. + Args: connected_account_id: Composio connected account ID. entity_id: The entity/user ID that owns the connected account. @@ -473,11 +490,13 @@ class ComposioService: Returns: Tuple of (file content bytes, error message). """ + from pathlib import Path + try: result = await self.execute_tool( connected_account_id=connected_account_id, tool_name="GOOGLEDRIVE_DOWNLOAD_FILE", - params={"file_id": file_id}, # snake_case + params={"file_id": file_id}, entity_id=entity_id, ) @@ -485,100 +504,234 @@ class ComposioService: return None, result.get("error", "Unknown error") data = result.get("data") - - # Composio GOOGLEDRIVE_DOWNLOAD_FILE returns a dict with file info - # The actual content is in "downloaded_file_content" field - if isinstance(data, dict): - # Try known Composio response fields in order of preference - content = None - - # Primary field from GOOGLEDRIVE_DOWNLOAD_FILE - if "downloaded_file_content" in data: - content = data["downloaded_file_content"] - # downloaded_file_content might itself be a dict with the actual content inside - if isinstance(content, dict): - # Try to extract actual content from nested dict - # Note: Composio nests downloaded_file_content inside another downloaded_file_content - actual_content = ( - content.get("downloaded_file_content") - or content.get("content") - or content.get("data") - or content.get("file_content") - or content.get("body") - or content.get("text") - ) - if actual_content is not None: - content = actual_content - else: - # Log structure for debugging - logger.warning( - f"downloaded_file_content is dict with keys: {list(content.keys())}" - ) - return ( - None, - f"Cannot extract content from downloaded_file_content. Keys: {list(content.keys())}", - ) - # Fallback fields for compatibility - elif "content" in data: - content = data["content"] - elif "file_content" in data: - content = data["file_content"] - elif "data" in data: - content = data["data"] - - if content is None: - # Log available keys for debugging - logger.warning(f"Composio response dict keys: {list(data.keys())}") - return ( - None, - f"No file content found in Composio response. Available keys: {list(data.keys())}", - ) - - # Convert content to bytes - if isinstance(content, str): - # Check if it's base64 encoded - import base64 - - try: - # Try to decode as base64 first - content = base64.b64decode(content) - except Exception: - # If not base64, encode as UTF-8 - content = content.encode("utf-8") - elif isinstance(content, bytes): - pass # Already bytes - elif isinstance(content, dict): - # Still a dict after all extraction attempts - log structure - logger.warning( - f"Content still dict after extraction: {list(content.keys())}" - ) - return ( - None, - f"Unexpected nested content structure: {list(content.keys())}", - ) - else: - return ( - None, - f"Unexpected content type in Composio response: {type(content).__name__}", - ) - - return content, None - elif isinstance(data, str): - return data.encode("utf-8"), None - elif isinstance(data, bytes): - return data, None - elif data is None: + if not data: return None, "No data returned from Composio" - else: - return ( - None, - f"Unexpected data type from Composio: {type(data).__name__}", + + # Per Composio docs, response includes file_path where file was downloaded + # Response structure: {data: {...}, error: ..., successful: ...} + # The actual file info is nested inside data["data"] + file_path = None + + if isinstance(data, dict): + # Handle nested response structure: data contains {data, error, successful} + # The actual file info is in data["data"] + inner_data = data + if "data" in data and isinstance(data["data"], dict): + inner_data = data["data"] + logger.debug(f"Found nested data structure. Inner keys: {list(inner_data.keys())}") + elif "successful" in data and "data" in data: + # Standard Composio response wrapper + inner_data = data["data"] if data["data"] else data + + # Try documented fields: file_path, downloaded_file_content, path, uri + file_path = ( + inner_data.get("file_path") or + inner_data.get("downloaded_file_content") or + inner_data.get("path") or + inner_data.get("uri") ) + + # Handle nested dict case where downloaded_file_content contains the path + if isinstance(file_path, dict): + file_path = ( + file_path.get("file_path") or + file_path.get("downloaded_file_content") or + file_path.get("path") or + file_path.get("uri") + ) + + # If still no path, check if inner_data itself has the nested structure + if not file_path and isinstance(inner_data, dict): + for key in ["downloaded_file_content", "file_path", "path", "uri"]: + if key in inner_data: + val = inner_data[key] + if isinstance(val, str): + file_path = val + break + elif isinstance(val, dict): + # One more level of nesting + file_path = ( + val.get("file_path") or + val.get("downloaded_file_content") or + val.get("path") or + val.get("uri") + ) + if file_path: + break + + logger.debug(f"Composio response keys: {list(data.keys())}, inner keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}, extracted path: {file_path}") + elif isinstance(data, str): + # Direct string response (could be path or content) + file_path = data + elif isinstance(data, bytes): + # Direct bytes response + return data, None + + # Read file from the path + if file_path and isinstance(file_path, str): + path_obj = Path(file_path) + + # Check if it's a valid file path (absolute or in .composio directory) + if path_obj.is_absolute() or '.composio' in str(path_obj): + try: + if path_obj.exists(): + content = path_obj.read_bytes() + logger.info(f"Successfully read {len(content)} bytes from Composio file: {file_path}") + return content, None + else: + logger.warning(f"File path from Composio does not exist: {file_path}") + return None, f"File not found at path: {file_path}" + except Exception as e: + logger.error(f"Failed to read file from Composio path {file_path}: {e!s}") + return None, f"Failed to read file: {e!s}" + else: + # Not a file path - might be base64 encoded content + try: + import base64 + content = base64.b64decode(file_path) + return content, None + except Exception: + # Not base64, return as UTF-8 bytes + return file_path.encode("utf-8"), None + + # If we got here, couldn't extract file path + if isinstance(data, dict): + # Log full structure for debugging + inner_data = data.get("data", {}) + logger.warning( + f"Could not extract file path from Composio response. " + f"Top keys: {list(data.keys())}, " + f"Inner data keys: {list(inner_data.keys()) if isinstance(inner_data, dict) else type(inner_data).__name__}, " + f"Full inner data: {inner_data}" + ) + return None, f"No file path in Composio response. Keys: {list(data.keys())}, inner: {list(inner_data.keys()) if isinstance(inner_data, dict) else 'N/A'}" + + return None, f"Unexpected data type from Composio: {type(data).__name__}" except Exception as e: logger.error(f"Failed to get Drive file content: {e!s}") return None, str(e) + async def get_drive_start_page_token( + self, connected_account_id: str, entity_id: str + ) -> tuple[str | None, str | None]: + """ + Get the starting page token for Google Drive change tracking. + + This token represents the current state and is used for future delta syncs. + Per Composio docs: Use GOOGLEDRIVE_GET_CHANGES_START_PAGE_TOKEN to get initial token. + + Args: + connected_account_id: Composio connected account ID. + entity_id: The entity/user ID that owns the connected account. + + Returns: + Tuple of (start_page_token, error message). + """ + try: + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLEDRIVE_GET_CHANGES_START_PAGE_TOKEN", + params={}, + entity_id=entity_id, + ) + + if not result.get("success"): + return None, result.get("error", "Unknown error") + + data = result.get("data", {}) + # Handle nested response: {data: {startPageToken: ...}, successful: ...} + if isinstance(data, dict): + inner_data = data.get("data", data) + token = ( + inner_data.get("startPageToken") or + inner_data.get("start_page_token") or + data.get("startPageToken") or + data.get("start_page_token") + ) + if token: + logger.info(f"Got Drive start page token: {token}") + return token, None + + logger.warning(f"Could not extract start page token from response: {data}") + return None, "No start page token in response" + + except Exception as e: + logger.error(f"Failed to get Drive start page token: {e!s}") + return None, str(e) + + async def list_drive_changes( + self, + connected_account_id: str, + entity_id: str, + page_token: str | None = None, + page_size: int = 100, + include_removed: bool = True, + ) -> tuple[list[dict[str, Any]], str | None, str | None]: + """ + List changes in Google Drive since the given page token. + + Per Composio docs: GOOGLEDRIVE_LIST_CHANGES tracks modifications to files/folders. + If pageToken is not provided, it auto-fetches the current start page token. + Response includes nextPageToken for pagination and newStartPageToken for future syncs. + + Args: + connected_account_id: Composio connected account ID. + entity_id: The entity/user ID that owns the connected account. + page_token: Page token from previous sync (optional - will auto-fetch if not provided). + page_size: Number of changes per page. + include_removed: Whether to include removed items in the response. + + Returns: + Tuple of (changes list, new_start_page_token, error message). + """ + try: + params = { + "pageSize": min(page_size, 100), + "includeRemoved": include_removed, + } + if page_token: + params["pageToken"] = page_token + + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLEDRIVE_LIST_CHANGES", + params=params, + entity_id=entity_id, + ) + + if not result.get("success"): + return [], None, result.get("error", "Unknown error") + + data = result.get("data", {}) + + # Handle nested response structure + changes = [] + new_start_token = None + + if isinstance(data, dict): + inner_data = data.get("data", data) + changes = inner_data.get("changes", []) or data.get("changes", []) + + # Get the token for next sync + # newStartPageToken is returned when all changes have been fetched + # nextPageToken is for pagination within the current fetch + new_start_token = ( + inner_data.get("newStartPageToken") or + inner_data.get("new_start_page_token") or + inner_data.get("nextPageToken") or + inner_data.get("next_page_token") or + data.get("newStartPageToken") or + data.get("nextPageToken") + ) + + logger.info(f"Got {len(changes)} Drive changes, new token: {new_start_token[:20] if new_start_token else 'None'}...") + return changes, new_start_token, None + + except Exception as e: + logger.error(f"Failed to list Drive changes: {e!s}") + return [], None, str(e) + # ===== Gmail specific methods ===== async def get_gmail_messages( diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py index 81cafaa2c..d0710d246 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_tasks.py @@ -810,8 +810,8 @@ def index_composio_connector_task( connector_id: int, search_space_id: int, user_id: str, - start_date: str, - end_date: str, + start_date: str | None, + end_date: str | None, ): """Celery task to index Composio connector content (Google Drive, Gmail, Calendar via Composio).""" import asyncio @@ -833,8 +833,8 @@ async def _index_composio_connector( connector_id: int, search_space_id: int, user_id: str, - start_date: str, - end_date: str, + start_date: str | None, + end_date: str | None, ): """Index Composio connector content with new session and real-time notifications.""" # Import from routes to use the notification-wrapped version diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index 21855f73f..bf80cbe78 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -66,6 +66,7 @@ async def _check_and_trigger_schedules(): from app.tasks.celery_tasks.connector_tasks import ( index_airtable_records_task, index_clickup_tasks_task, + index_composio_connector_task, index_confluence_pages_task, index_crawled_urls_task, index_discord_messages_task, @@ -98,6 +99,10 @@ async def _check_and_trigger_schedules(): SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, SearchSourceConnectorType.WEBCRAWLER_CONNECTOR: index_crawled_urls_task, SearchSourceConnectorType.GOOGLE_DRIVE_CONNECTOR: index_google_drive_files_task, + # Composio connector types + SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: index_composio_connector_task, + SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR: index_composio_connector_task, + SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: index_composio_connector_task, } # Trigger indexing for each due connector diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index e5c8b701e..3eed8470e 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -561,8 +561,12 @@ async def _index_composio_google_drive( update_last_indexed: bool = True, max_items: int = 1000, ) -> tuple[int, str]: - """Index Google Drive files via Composio. + """Index Google Drive files via Composio with delta sync support. + Delta Sync Flow: + 1. First sync: Full scan + get initial page token + 2. Subsequent syncs: Use LIST_CHANGES to process only changed files + Supports folder/file selection via connector config: - selected_folders: List of {id, name} for folders to index - selected_files: List of {id, name} for individual files to index @@ -576,354 +580,88 @@ async def _index_composio_google_drive( selected_folders = connector_config.get("selected_folders", []) selected_files = connector_config.get("selected_files", []) indexing_options = connector_config.get("indexing_options", {}) + + # Check for stored page token for delta sync + stored_page_token = connector_config.get("drive_page_token") + use_delta_sync = stored_page_token and connector.last_indexed_at max_files_per_folder = indexing_options.get("max_files_per_folder", 100) include_subfolders = indexing_options.get("include_subfolders", True) - await task_logger.log_task_progress( - log_entry, - f"Fetching Google Drive files via Composio for connector {connector_id}", - { - "stage": "fetching_files", - "selected_folders": len(selected_folders), - "selected_files": len(selected_files), - }, - ) - - all_files = [] - - # If specific folders/files are selected, fetch from those - if selected_folders or selected_files: - # Fetch files from selected folders - for folder in selected_folders: - folder_id = folder.get("id") - folder_name = folder.get("name", "Unknown") - - if not folder_id: - continue - - # Handle special case for "root" folder - actual_folder_id = None if folder_id == "root" else folder_id - - logger.info(f"Fetching files from folder: {folder_name} ({folder_id})") - - # Fetch files from this folder - folder_files = [] - page_token = None - - while len(folder_files) < max_files_per_folder: - ( - files, - next_token, - error, - ) = await composio_connector.list_drive_files( - folder_id=actual_folder_id, - page_token=page_token, - page_size=min(100, max_files_per_folder - len(folder_files)), - ) - - if error: - logger.warning( - f"Failed to fetch files from folder {folder_name}: {error}" - ) - break - - # Process files - for file_info in files: - mime_type = file_info.get("mimeType", "") or file_info.get( - "mime_type", "" - ) - - # If it's a folder and include_subfolders is enabled, recursively fetch - if mime_type == "application/vnd.google-apps.folder": - if include_subfolders: - # Add subfolder files recursively - subfolder_files = await _fetch_folder_files_recursively( - composio_connector, - file_info.get("id"), - max_files=max_files_per_folder, - current_count=len(folder_files), - ) - folder_files.extend(subfolder_files) - else: - folder_files.append(file_info) - - if not next_token: - break - page_token = next_token - - all_files.extend(folder_files[:max_files_per_folder]) - logger.info(f"Found {len(folder_files)} files in folder {folder_name}") - - # Add specifically selected files - for selected_file in selected_files: - file_id = selected_file.get("id") - file_name = selected_file.get("name", "Unknown") - - if not file_id: - continue - - # Add file info (we'll fetch content later during indexing) - all_files.append( - { - "id": file_id, - "name": file_name, - "mimeType": "", # Will be determined later - } - ) - else: - # No selection specified - fetch all files (original behavior) - page_token = None - - while len(all_files) < max_items: - files, next_token, error = await composio_connector.list_drive_files( - page_token=page_token, - page_size=min(100, max_items - len(all_files)), - ) - - if error: - await task_logger.log_task_failure( - log_entry, f"Failed to fetch Drive files: {error}", {} - ) - return 0, f"Failed to fetch Drive files: {error}" - - all_files.extend(files) - - if not next_token: - break - page_token = next_token - - if not all_files: - success_msg = "No Google Drive files found" - await task_logger.log_task_success( - log_entry, success_msg, {"files_count": 0} + # Route to delta sync or full scan + if use_delta_sync: + logger.info(f"Using delta sync for Composio Google Drive connector {connector_id}") + await task_logger.log_task_progress( + log_entry, + f"Starting delta sync for Google Drive via Composio (connector {connector_id})", + {"stage": "delta_sync", "token": stored_page_token[:20] + "..."}, + ) + + documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_delta_sync( + session=session, + composio_connector=composio_connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + page_token=stored_page_token, + max_items=max_items, + task_logger=task_logger, + log_entry=log_entry, + ) + else: + logger.info(f"Using full scan for Composio Google Drive connector {connector_id} (first sync or no token)") + await task_logger.log_task_progress( + log_entry, + f"Fetching Google Drive files via Composio for connector {connector_id}", + { + "stage": "full_scan", + "selected_folders": len(selected_folders), + "selected_files": len(selected_files), + }, + ) + + documents_indexed, documents_skipped, processing_errors = await _index_composio_drive_full_scan( + session=session, + composio_connector=composio_connector, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + selected_folders=selected_folders, + selected_files=selected_files, + max_files_per_folder=max_files_per_folder, + include_subfolders=include_subfolders, + max_items=max_items, + task_logger=task_logger, + log_entry=log_entry, ) - # CRITICAL: Update timestamp even when no files found so Electric SQL syncs and UI shows indexed status - await update_connector_last_indexed(session, connector, update_last_indexed) - await session.commit() - return ( - 0, - None, - ) # Return None (not error) when no items found - this is success with 0 items - logger.info(f"Found {len(all_files)} Google Drive files to index via Composio") + # Get new page token for next sync (always update after successful sync) + new_token, token_error = await composio_connector.get_drive_start_page_token() + if new_token and not token_error: + from sqlalchemy.orm.attributes import flag_modified + + # Refresh connector to avoid stale state + await session.refresh(connector) + + if not connector.config: + connector.config = {} + connector.config["drive_page_token"] = new_token + flag_modified(connector, "config") + logger.info(f"Updated drive_page_token for connector {connector_id}") + elif token_error: + logger.warning(f"Failed to get new page token: {token_error}") - documents_indexed = 0 - documents_skipped = 0 - processing_errors = [] - - for file_info in all_files: - try: - # Handle both standard Google API and potential Composio variations - file_id = file_info.get("id", "") or file_info.get("fileId", "") - file_name = ( - file_info.get("name", "") - or file_info.get("fileName", "") - or "Untitled" - ) - mime_type = file_info.get("mimeType", "") or file_info.get( - "mime_type", "" - ) - - if not file_id: - documents_skipped += 1 - continue - - # Skip folders - if mime_type == "application/vnd.google-apps.folder": - continue - - # Generate unique identifier hash - document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) - unique_identifier_hash = generate_unique_identifier_hash( - document_type, f"drive_{file_id}", search_space_id - ) - - # Check if document exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash - ) - - # Get file content - ( - content, - content_error, - ) = await composio_connector.get_drive_file_content(file_id) - - if content_error or not content: - logger.warning( - f"Could not get content for file {file_name}: {content_error}" - ) - # Use metadata as content fallback - markdown_content = f"# {file_name}\n\n" - markdown_content += f"**File ID:** {file_id}\n" - markdown_content += f"**Type:** {mime_type}\n" - elif isinstance(content, dict): - # Safety check: if content is still a dict, log error and use fallback - error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}" - logger.error(error_msg) - processing_errors.append(error_msg) - markdown_content = f"# {file_name}\n\n" - markdown_content += f"**File ID:** {file_id}\n" - markdown_content += f"**Type:** {mime_type}\n" - else: - # Process content based on file type - markdown_content = await _process_file_content( - content=content, - file_name=file_name, - file_id=file_id, - mime_type=mime_type, - search_space_id=search_space_id, - user_id=user_id, - session=session, - task_logger=task_logger, - log_entry=log_entry, - processing_errors=processing_errors, - ) - - content_hash = generate_content_hash(markdown_content, search_space_id) - - if existing_document: - if existing_document.content_hash == content_hash: - documents_skipped += 1 - continue - - # Update existing document - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "document_type": "Google Drive File (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Google Drive File: {file_name}\n\nType: {mime_type}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - existing_document.title = f"Drive: {file_name}" - existing_document.content = summary_content - existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding - existing_document.document_metadata = { - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "connector_id": connector_id, - "source": "composio", - } - existing_document.chunks = chunks - existing_document.updated_at = get_current_timestamp() - - documents_indexed += 1 - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Google Drive files processed so far" - ) - await session.commit() - continue - - # Create new document - user_llm = await get_user_long_context_llm( - session, user_id, search_space_id - ) - - if user_llm: - document_metadata = { - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "document_type": "Google Drive File (Composio)", - } - ( - summary_content, - summary_embedding, - ) = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - summary_content = ( - f"Google Drive File: {file_name}\n\nType: {mime_type}" - ) - summary_embedding = config.embedding_model_instance.embed( - summary_content - ) - - chunks = await create_document_chunks(markdown_content) - - document = Document( - search_space_id=search_space_id, - title=f"Drive: {file_name}", - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), - document_metadata={ - "file_id": file_id, - "file_name": file_name, - "mime_type": mime_type, - "connector_id": connector_id, - "toolkit_id": "googledrive", - "source": "composio", - }, - content=summary_content, - content_hash=content_hash, - unique_identifier_hash=unique_identifier_hash, - embedding=summary_embedding, - chunks=chunks, - updated_at=get_current_timestamp(), - ) - session.add(document) - documents_indexed += 1 - - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Google Drive files processed so far" - ) - await session.commit() - - except Exception as e: - error_msg = ( - f"Error processing Drive file {file_name or 'unknown'}: {e!s}" - ) - logger.error(error_msg, exc_info=True) - processing_errors.append(error_msg) - documents_skipped += 1 - continue - - # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs - # This ensures the UI shows "Last indexed" instead of "Never indexed" + # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) - # Final commit to ensure all documents are persisted (safety net) - # This matches the pattern used in non-Composio Gmail indexer - logger.info( - f"Final commit: Total {documents_indexed} Google Drive files processed" - ) + # Final commit + logger.info(f"Final commit: Total {documents_indexed} Google Drive files processed") await session.commit() - logger.info( - "Successfully committed all Composio Google Drive document changes to database" - ) + logger.info("Successfully committed all Composio Google Drive document changes to database") - # If there were processing errors, return them so notification can show them + # Handle processing errors error_message = None if processing_errors: - # Combine all errors into a single message if len(processing_errors) == 1: error_message = processing_errors[0] else: @@ -934,6 +672,7 @@ async def _index_composio_google_drive( { "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "sync_type": "delta" if use_delta_sync else "full", "errors": processing_errors, }, ) @@ -944,6 +683,7 @@ async def _index_composio_google_drive( { "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "sync_type": "delta" if use_delta_sync else "full", }, ) @@ -954,6 +694,469 @@ async def _index_composio_google_drive( return 0, f"Failed to index Google Drive via Composio: {e!s}" +async def _index_composio_drive_delta_sync( + session: AsyncSession, + composio_connector: ComposioConnector, + connector_id: int, + search_space_id: int, + user_id: str, + page_token: str, + max_items: int, + task_logger: TaskLoggingService, + log_entry, +) -> tuple[int, int, list[str]]: + """Index Google Drive files using delta sync (only changed files). + + Uses GOOGLEDRIVE_LIST_CHANGES to fetch only files that changed since last sync. + Handles: new files, modified files, and deleted files. + """ + documents_indexed = 0 + documents_skipped = 0 + processing_errors = [] + + # Fetch all changes with pagination + all_changes = [] + current_token = page_token + + while len(all_changes) < max_items: + changes, next_token, error = await composio_connector.list_drive_changes( + page_token=current_token, + page_size=100, + include_removed=True, + ) + + if error: + logger.error(f"Error fetching Drive changes: {error}") + processing_errors.append(f"Failed to fetch changes: {error}") + break + + all_changes.extend(changes) + + if not next_token or next_token == current_token: + break + current_token = next_token + + if not all_changes: + logger.info("No changes detected since last sync") + return 0, 0, [] + + logger.info(f"Processing {len(all_changes)} changes from delta sync") + + for change in all_changes[:max_items]: + try: + # Handle removed files + is_removed = change.get("removed", False) + file_info = change.get("file", {}) + file_id = change.get("fileId") or file_info.get("id", "") + + if not file_id: + documents_skipped += 1 + continue + + # Check if file was trashed or removed + if is_removed or file_info.get("trashed", False): + # Remove document from database + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"drive_{file_id}", search_space_id + ) + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + if existing_document: + await session.delete(existing_document) + documents_indexed += 1 + logger.info(f"Deleted document for removed/trashed file: {file_id}") + continue + + # Process changed file + file_name = file_info.get("name", "") or "Untitled" + mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") + + # Skip folders + if mime_type == "application/vnd.google-apps.folder": + continue + + # Process the file + indexed, skipped, errors = await _process_single_drive_file( + session=session, + composio_connector=composio_connector, + file_id=file_id, + file_name=file_name, + mime_type=mime_type, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + processing_errors.extend(errors) + + # Batch commit every 10 documents + if documents_indexed > 0 and documents_indexed % 10 == 0: + await session.commit() + logger.info(f"Committed batch: {documents_indexed} changes processed") + + except Exception as e: + error_msg = f"Error processing change for file {file_id}: {e!s}" + logger.error(error_msg, exc_info=True) + processing_errors.append(error_msg) + documents_skipped += 1 + + logger.info(f"Delta sync complete: {documents_indexed} indexed, {documents_skipped} skipped") + return documents_indexed, documents_skipped, processing_errors + + +async def _index_composio_drive_full_scan( + session: AsyncSession, + composio_connector: ComposioConnector, + connector_id: int, + search_space_id: int, + user_id: str, + selected_folders: list[dict], + selected_files: list[dict], + max_files_per_folder: int, + include_subfolders: bool, + max_items: int, + task_logger: TaskLoggingService, + log_entry, +) -> tuple[int, int, list[str]]: + """Index Google Drive files using full scan (first sync or when no delta token).""" + documents_indexed = 0 + documents_skipped = 0 + processing_errors = [] + + all_files = [] + + # If specific folders/files are selected, fetch from those + if selected_folders or selected_files: + # Fetch files from selected folders + for folder in selected_folders: + folder_id = folder.get("id") + folder_name = folder.get("name", "Unknown") + + if not folder_id: + continue + + # Handle special case for "root" folder + actual_folder_id = None if folder_id == "root" else folder_id + + logger.info(f"Fetching files from folder: {folder_name} ({folder_id})") + + # Fetch files from this folder + folder_files = [] + page_token = None + + while len(folder_files) < max_files_per_folder: + ( + files, + next_token, + error, + ) = await composio_connector.list_drive_files( + folder_id=actual_folder_id, + page_token=page_token, + page_size=min(100, max_files_per_folder - len(folder_files)), + ) + + if error: + logger.warning( + f"Failed to fetch files from folder {folder_name}: {error}" + ) + break + + # Process files + for file_info in files: + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) + + # If it's a folder and include_subfolders is enabled, recursively fetch + if mime_type == "application/vnd.google-apps.folder": + if include_subfolders: + # Add subfolder files recursively + subfolder_files = await _fetch_folder_files_recursively( + composio_connector, + file_info.get("id"), + max_files=max_files_per_folder, + current_count=len(folder_files), + ) + folder_files.extend(subfolder_files) + else: + folder_files.append(file_info) + + if not next_token: + break + page_token = next_token + + all_files.extend(folder_files[:max_files_per_folder]) + logger.info(f"Found {len(folder_files)} files in folder {folder_name}") + + # Add specifically selected files + for selected_file in selected_files: + file_id = selected_file.get("id") + file_name = selected_file.get("name", "Unknown") + + if not file_id: + continue + + # Add file info (we'll fetch content later during indexing) + all_files.append( + { + "id": file_id, + "name": file_name, + "mimeType": "", # Will be determined later + } + ) + else: + # No selection specified - fetch all files (original behavior) + page_token = None + + while len(all_files) < max_items: + files, next_token, error = await composio_connector.list_drive_files( + page_token=page_token, + page_size=min(100, max_items - len(all_files)), + ) + + if error: + return 0, 0, [f"Failed to fetch Drive files: {error}"] + + all_files.extend(files) + + if not next_token: + break + page_token = next_token + + if not all_files: + logger.info("No Google Drive files found") + return 0, 0, [] + + logger.info(f"Found {len(all_files)} Google Drive files to index via Composio (full scan)") + + for file_info in all_files: + try: + # Handle both standard Google API and potential Composio variations + file_id = file_info.get("id", "") or file_info.get("fileId", "") + file_name = ( + file_info.get("name", "") + or file_info.get("fileName", "") + or "Untitled" + ) + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) + + if not file_id: + documents_skipped += 1 + continue + + # Skip folders + if mime_type == "application/vnd.google-apps.folder": + continue + + # Process the file + indexed, skipped, errors = await _process_single_drive_file( + session=session, + composio_connector=composio_connector, + file_id=file_id, + file_name=file_name, + mime_type=mime_type, + connector_id=connector_id, + search_space_id=search_space_id, + user_id=user_id, + task_logger=task_logger, + log_entry=log_entry, + ) + + documents_indexed += indexed + documents_skipped += skipped + processing_errors.extend(errors) + + # Batch commit every 10 documents + if documents_indexed > 0 and documents_indexed % 10 == 0: + logger.info(f"Committing batch: {documents_indexed} Google Drive files processed so far") + await session.commit() + + except Exception as e: + error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}" + logger.error(error_msg, exc_info=True) + processing_errors.append(error_msg) + documents_skipped += 1 + + logger.info(f"Full scan complete: {documents_indexed} indexed, {documents_skipped} skipped") + return documents_indexed, documents_skipped, processing_errors + + +async def _process_single_drive_file( + session: AsyncSession, + composio_connector: ComposioConnector, + file_id: str, + file_name: str, + mime_type: str, + connector_id: int, + search_space_id: int, + user_id: str, + task_logger: TaskLoggingService, + log_entry, +) -> tuple[int, int, list[str]]: + """Process a single Google Drive file for indexing. + + Returns: + Tuple of (documents_indexed, documents_skipped, processing_errors) + """ + processing_errors = [] + + # Generate unique identifier hash + document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) + unique_identifier_hash = generate_unique_identifier_hash( + document_type, f"drive_{file_id}", search_space_id + ) + + # Check if document exists + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash + ) + + # Get file content + content, content_error = await composio_connector.get_drive_file_content(file_id) + + if content_error or not content: + logger.warning( + f"Could not get content for file {file_name}: {content_error}" + ) + # Use metadata as content fallback + markdown_content = f"# {file_name}\n\n" + markdown_content += f"**File ID:** {file_id}\n" + markdown_content += f"**Type:** {mime_type}\n" + elif isinstance(content, dict): + # Safety check: if content is still a dict, log error and use fallback + error_msg = f"Unexpected dict content format for file {file_name}: {list(content.keys())}" + logger.error(error_msg) + processing_errors.append(error_msg) + markdown_content = f"# {file_name}\n\n" + markdown_content += f"**File ID:** {file_id}\n" + markdown_content += f"**Type:** {mime_type}\n" + else: + # Process content based on file type + markdown_content = await _process_file_content( + content=content, + file_name=file_name, + file_id=file_id, + mime_type=mime_type, + search_space_id=search_space_id, + user_id=user_id, + session=session, + task_logger=task_logger, + log_entry=log_entry, + processing_errors=processing_errors, + ) + + content_hash = generate_content_hash(markdown_content, search_space_id) + + if existing_document: + if existing_document.content_hash == content_hash: + return 0, 1, processing_errors # Skipped + + # Update existing document + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + "document_type": "Google Drive File (Composio)", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = ( + f"Google Drive File: {file_name}\n\nType: {mime_type}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + existing_document.title = f"Drive: {file_name}" + existing_document.content = summary_content + existing_document.content_hash = content_hash + existing_document.embedding = summary_embedding + existing_document.document_metadata = { + "file_id": file_id, + "file_name": file_name, + "FILE_NAME": file_name, # For compatibility + "mime_type": mime_type, + "connector_id": connector_id, + "source": "composio", + } + existing_document.chunks = chunks + existing_document.updated_at = get_current_timestamp() + + return 1, 0, processing_errors # Indexed + + # Create new document + user_llm = await get_user_long_context_llm( + session, user_id, search_space_id + ) + + if user_llm: + document_metadata = { + "file_id": file_id, + "file_name": file_name, + "mime_type": mime_type, + "document_type": "Google Drive File (Composio)", + } + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( + markdown_content, user_llm, document_metadata + ) + else: + summary_content = ( + f"Google Drive File: {file_name}\n\nType: {mime_type}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) + + chunks = await create_document_chunks(markdown_content) + + document = Document( + search_space_id=search_space_id, + title=f"Drive: {file_name}", + document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]), + document_metadata={ + "file_id": file_id, + "file_name": file_name, + "FILE_NAME": file_name, # For compatibility + "mime_type": mime_type, + "connector_id": connector_id, + "toolkit_id": "googledrive", + "source": "composio", + }, + content=summary_content, + content_hash=content_hash, + unique_identifier_hash=unique_identifier_hash, + embedding=summary_embedding, + chunks=chunks, + updated_at=get_current_timestamp(), + ) + session.add(document) + + return 1, 0, processing_errors # Indexed + + async def _fetch_folder_files_recursively( composio_connector: ComposioConnector, folder_id: str, @@ -1271,11 +1474,18 @@ async def _index_composio_gmail( if end_date == "undefined" or end_date == "": end_date = None - # Calculate date range with defaults (uses last_indexed_at or 365 days back) - # This ensures indexing works even when user doesn't specify dates - start_date_str, end_date_str = calculate_date_range( - connector, start_date, end_date, default_days_back=365 - ) + # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at + # This ensures user-selected dates are respected (matching non-Composio Gmail connector behavior) + if start_date is not None and end_date is not None: + # User provided both dates - use them directly + start_date_str = start_date + end_date_str = end_date + else: + # Calculate date range with defaults (uses last_indexed_at or 365 days back) + # This ensures indexing works even when user doesn't specify dates + start_date_str, end_date_str = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) # Build query with date range query_parts = [] @@ -1468,11 +1678,18 @@ async def _index_composio_google_calendar( if end_date == "undefined" or end_date == "": end_date = None - # Calculate date range with defaults (uses last_indexed_at or 365 days back) - # This ensures indexing works even when user doesn't specify dates - start_date_str, end_date_str = calculate_date_range( - connector, start_date, end_date, default_days_back=365 - ) + # Use provided dates directly if both are provided, otherwise calculate from last_indexed_at + # This ensures user-selected dates are respected (matching non-Composio Calendar connector behavior) + if start_date is not None and end_date is not None: + # User provided both dates - use them directly + start_date_str = start_date + end_date_str = end_date + else: + # Calculate date range with defaults (uses last_indexed_at or 365 days back) + # This ensures indexing works even when user doesn't specify dates + start_date_str, end_date_str = calculate_date_range( + connector, start_date, end_date, default_days_back=365 + ) # Build time range for API call time_min = f"{start_date_str}T00:00:00Z" diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx index d277a84ee..d9a894e5a 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx @@ -209,7 +209,7 @@ export function RowActions({ disabled={isDeleting} className="bg-destructive text-destructive-foreground hover:bg-destructive/90" > - {isDeleting ? "Deleting..." : "Delete"} + {isDeleting ? "Deleting" : "Delete"} diff --git a/surfsense_web/components/assistant-ui/connector-popup.tsx b/surfsense_web/components/assistant-ui/connector-popup.tsx index a1108f7c8..045c3c586 100644 --- a/surfsense_web/components/assistant-ui/connector-popup.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup.tsx @@ -16,7 +16,7 @@ import { ConnectorDialogHeader } from "./connector-popup/components/connector-di import { ConnectorConnectView } from "./connector-popup/connector-configs/views/connector-connect-view"; import { ConnectorEditView } from "./connector-popup/connector-configs/views/connector-edit-view"; import { IndexingConfigurationView } from "./connector-popup/connector-configs/views/indexing-configuration-view"; -import { OAUTH_CONNECTORS } from "./connector-popup/constants/connector-constants"; +import { COMPOSIO_CONNECTORS, OAUTH_CONNECTORS } from "./connector-popup/constants/connector-constants"; import { useConnectorDialog } from "./connector-popup/hooks/use-connector-dialog"; import { useIndexingConnectors } from "./connector-popup/hooks/use-indexing-connectors"; import { ActiveConnectorsTab } from "./connector-popup/tabs/active-connectors-tab"; @@ -196,9 +196,14 @@ export const ConnectorIndicator: FC = () => { onBack={handleBackFromAccountsList} onManage={handleStartEdit} onAddAccount={() => { - const oauthConnector = OAUTH_CONNECTORS.find( - (c) => c.connectorType === viewingAccountsType.connectorType - ); + // Check both OAUTH_CONNECTORS and COMPOSIO_CONNECTORS + const oauthConnector = + OAUTH_CONNECTORS.find( + (c) => c.connectorType === viewingAccountsType.connectorType + ) || + COMPOSIO_CONNECTORS.find( + (c) => c.connectorType === viewingAccountsType.connectorType + ); if (oauthConnector) { handleConnectOAuth(oauthConnector); } diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx index 71258a519..234898922 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx @@ -341,7 +341,7 @@ export const ConnectorEditView: FC = ({ {isSaving ? ( <> - Saving... + Saving ) : ( "Save Changes" diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx index ea489aec8..68fc688c3 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx @@ -9,7 +9,11 @@ import { getConnectorTypeDisplay } from "@/lib/connectors/utils"; import { cn } from "@/lib/utils"; import { DateRangeSelector } from "../../components/date-range-selector"; import { PeriodicSyncConfig } from "../../components/periodic-sync-config"; -import { type IndexingConfigState, OAUTH_CONNECTORS } from "../../constants/connector-constants"; +import { + COMPOSIO_CONNECTORS, + type IndexingConfigState, + OAUTH_CONNECTORS, +} from "../../constants/connector-constants"; import { getConnectorDisplayName } from "../../tabs/all-connectors-tab"; import { getConnectorConfigComponent } from "../index"; @@ -91,7 +95,10 @@ export const IndexingConfigurationView: FC = ({ }; }, [checkScrollState]); - const authConnector = OAUTH_CONNECTORS.find((c) => c.connectorType === connector?.connector_type); + // Check both OAUTH_CONNECTORS and COMPOSIO_CONNECTORS + const authConnector = + OAUTH_CONNECTORS.find((c) => c.connectorType === connector?.connector_type) || + COMPOSIO_CONNECTORS.find((c) => c.connectorType === connector?.connector_type); return (
diff --git a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts index 2923ab823..a2b1168bd 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts @@ -191,7 +191,10 @@ export const useConnectorDialog = () => { // Handle configure view (for page refresh support) if (params.view === "configure" && params.connector && !indexingConfig && allConnectors) { - const oauthConnector = OAUTH_CONNECTORS.find((c) => c.id === params.connector); + // Check both OAUTH_CONNECTORS and COMPOSIO_CONNECTORS + const oauthConnector = + OAUTH_CONNECTORS.find((c) => c.id === params.connector) || + COMPOSIO_CONNECTORS.find((c) => c.id === params.connector); if (oauthConnector) { let existingConnector: SearchSourceConnector | undefined; if (params.connectorId) { diff --git a/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx b/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx index a518d63a6..e45888bb1 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/tabs/active-connectors-tab.tsx @@ -13,7 +13,7 @@ import type { SearchSourceConnector } from "@/contracts/types/connector.types"; import type { LogActiveTask, LogSummary } from "@/contracts/types/log.types"; import { connectorsApiService } from "@/lib/apis/connectors-api.service"; import { cn } from "@/lib/utils"; -import { OAUTH_CONNECTORS } from "../constants/connector-constants"; +import { COMPOSIO_CONNECTORS, OAUTH_CONNECTORS } from "../constants/connector-constants"; import { getDocumentCountForConnector } from "../utils/connector-document-mapping"; interface ActiveConnectorsTabProps { @@ -113,7 +113,10 @@ export const ActiveConnectorsTab: FC = ({ // Get display info for OAuth connector type const getOAuthConnectorTypeInfo = (connectorType: string) => { - const oauthConnector = OAUTH_CONNECTORS.find((c) => c.connectorType === connectorType); + // Check both OAUTH_CONNECTORS and COMPOSIO_CONNECTORS + const oauthConnector = + OAUTH_CONNECTORS.find((c) => c.connectorType === connectorType) || + COMPOSIO_CONNECTORS.find((c) => c.connectorType === connectorType); return { title: oauthConnector?.title || diff --git a/surfsense_web/components/settings/llm-role-manager.tsx b/surfsense_web/components/settings/llm-role-manager.tsx index ba4c4970c..c41a2d3bf 100644 --- a/surfsense_web/components/settings/llm-role-manager.tsx +++ b/surfsense_web/components/settings/llm-role-manager.tsx @@ -398,7 +398,7 @@ export function LLMRoleManager({ searchSpaceId }: LLMRoleManagerProps) { className="flex items-center gap-2 text-xs md:text-sm h-9 md:h-10" > - {isSaving ? "Saving..." : "Save Changes"} + {isSaving ? "Saving" : "Save Changes"}
diff --git a/surfsense_web/messages/en.json b/surfsense_web/messages/en.json index 94e44c8ec..8ca382669 100644 --- a/surfsense_web/messages/en.json +++ b/surfsense_web/messages/en.json @@ -157,7 +157,7 @@ "delete_note": "Delete Note", "delete_note_confirm": "Are you sure you want to delete", "action_cannot_undone": "This action cannot be undone.", - "deleting": "Deleting...", + "deleting": "Deleting", "surfsense_dashboard": "SurfSense Dashboard", "welcome_message": "Welcome to your SurfSense dashboard.", "your_search_spaces": "Your Search Spaces", @@ -498,7 +498,7 @@ "base": "Base", "all_roles_assigned": "All roles are assigned and ready to use! Your LLM configuration is complete.", "save_changes": "Save Changes", - "saving": "Saving...", + "saving": "Saving", "reset": "Reset", "status": "Status", "status_ready": "Ready", @@ -548,7 +548,7 @@ "log_deleted_error": "Failed to delete log", "confirm_delete_log_title": "Are you sure?", "confirm_delete_log_desc": "This action cannot be undone. This will permanently delete the log entry.", - "deleting": "Deleting..." + "deleting": "Deleting" }, "onboard": { "welcome_title": "Welcome to SurfSense",