diff --git a/surfsense_backend/alembic/versions/74_add_composio_connector_enums.py b/surfsense_backend/alembic/versions/74_add_composio_connector_enums.py index cadf70cb6..2996d9d07 100644 --- a/surfsense_backend/alembic/versions/74_add_composio_connector_enums.py +++ b/surfsense_backend/alembic/versions/74_add_composio_connector_enums.py @@ -82,14 +82,14 @@ def upgrade() -> None: def downgrade() -> None: """Downgrade schema - remove Composio connector types from connector and document enums. - + Note: PostgreSQL does not support removing enum values directly. To properly downgrade, you would need to: 1. Delete any rows using the Composio connector type values 2. Create new enums without the Composio connector types 3. Alter the columns to use the new enums 4. Drop the old enums - + This is left as a no-op since removing enum values is complex and typically not needed in practice. """ diff --git a/surfsense_backend/app/connectors/composio_connector.py b/surfsense_backend/app/connectors/composio_connector.py index 21e339d12..b49988887 100644 --- a/surfsense_backend/app/connectors/composio_connector.py +++ b/surfsense_backend/app/connectors/composio_connector.py @@ -12,7 +12,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from app.db import SearchSourceConnector -from app.services.composio_service import ComposioService, INDEXABLE_TOOLKITS +from app.services.composio_service import INDEXABLE_TOOLKITS, ComposioService logger = logging.getLogger(__name__) @@ -271,7 +271,9 @@ class ComposioConnector: from_email = header_dict.get("from", "Unknown Sender") to_email = header_dict.get("to", "Unknown Recipient") # Composio provides messageTimestamp directly - date_str = message.get("messageTimestamp", "") or header_dict.get("date", "Unknown Date") + date_str = message.get("messageTimestamp", "") or header_dict.get( + "date", "Unknown Date" + ) # Build markdown content markdown_content = f"# {subject}\n\n" diff --git a/surfsense_backend/app/connectors/github_connector.py b/surfsense_backend/app/connectors/github_connector.py index 6f04ccdba..9d4b98c4b 100644 --- a/surfsense_backend/app/connectors/github_connector.py +++ b/surfsense_backend/app/connectors/github_connector.py @@ -58,7 +58,9 @@ class GitHubConnector: if self.token: logger.info("GitHub connector initialized with authentication token.") else: - logger.info("GitHub connector initialized without token (public repos only).") + logger.info( + "GitHub connector initialized without token (public repos only)." + ) def ingest_repository( self, @@ -95,17 +97,27 @@ class GitHubConnector: cmd = [ "gitingest", repo_url, - "--output", output_path, - "--max-size", str(max_file_size), + "--output", + output_path, + "--max-size", + str(max_file_size), # Common exclude patterns - "-e", "node_modules/*", - "-e", "vendor/*", - "-e", ".git/*", - "-e", "__pycache__/*", - "-e", "dist/*", - "-e", "build/*", - "-e", "*.lock", - "-e", "package-lock.json", + "-e", + "node_modules/*", + "-e", + "vendor/*", + "-e", + ".git/*", + "-e", + "__pycache__/*", + "-e", + "dist/*", + "-e", + "build/*", + "-e", + "*.lock", + "-e", + "package-lock.json", ] # Add branch if specified @@ -147,7 +159,9 @@ class GitHubConnector: os.unlink(output_path) if not full_content or not full_content.strip(): - logger.warning(f"No content retrieved from repository: {repo_full_name}") + logger.warning( + f"No content retrieved from repository: {repo_full_name}" + ) return None # Parse the gitingest output @@ -171,11 +185,11 @@ class GitHubConnector: logger.error(f"gitingest timed out for repository: {repo_full_name}") return None except FileNotFoundError: - logger.error( - "gitingest CLI not found. Falling back to Python library." - ) + logger.error("gitingest CLI not found. Falling back to Python library.") # Fall back to Python library - return self._ingest_with_python_library(repo_full_name, branch, max_file_size) + return self._ingest_with_python_library( + repo_full_name, branch, max_file_size + ) except Exception as e: logger.error(f"Failed to ingest repository {repo_full_name}: {e}") return None diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py index 25e545dfb..dec9beb02 100644 --- a/surfsense_backend/app/routes/composio_routes.py +++ b/surfsense_backend/app/routes/composio_routes.py @@ -11,7 +11,6 @@ Endpoints: - GET /connectors/{connector_id}/composio-drive/folders - List folders/files for Composio Google Drive """ -import asyncio import logging from uuid import UUID @@ -89,7 +88,9 @@ async def list_composio_toolkits(user: User = Depends(current_active_user)): @router.get("/auth/composio/connector/add") async def initiate_composio_auth( space_id: int, - toolkit_id: str = Query(..., description="Composio toolkit ID (e.g., 'googledrive', 'gmail')"), + toolkit_id: str = Query( + ..., description="Composio toolkit ID (e.g., 'googledrive', 'gmail')" + ), user: User = Depends(current_active_user), ): """ @@ -239,13 +240,15 @@ async def composio_callback( # Initialize Composio service service = ComposioService() entity_id = f"surfsense_{user_id}" - + # Use camelCase param if provided (Composio's format), fallback to snake_case final_connected_account_id = connectedAccountId or connected_account_id - + # DEBUG: Log all query parameters received - logger.info(f"DEBUG: Callback received - connectedAccountId: {connectedAccountId}, connected_account_id: {connected_account_id}, using: {final_connected_account_id}") - + logger.info( + f"DEBUG: Callback received - connectedAccountId: {connectedAccountId}, connected_account_id: {connected_account_id}, using: {final_connected_account_id}" + ) + # If we still don't have a connected_account_id, warn but continue # (the connector will be created but indexing won't work until updated) if not final_connected_account_id: @@ -254,7 +257,9 @@ async def composio_callback( "The connector will be created but indexing may not work." ) else: - logger.info(f"Successfully got connected_account_id: {final_connected_account_id}") + logger.info( + f"Successfully got connected_account_id: {final_connected_account_id}" + ) # Build connector config connector_config = { @@ -287,10 +292,17 @@ async def composio_callback( if existing_connector: # Delete the old Composio connected account before updating - old_connected_account_id = existing_connector.config.get("composio_connected_account_id") - if old_connected_account_id and old_connected_account_id != final_connected_account_id: + old_connected_account_id = existing_connector.config.get( + "composio_connected_account_id" + ) + if ( + old_connected_account_id + and old_connected_account_id != final_connected_account_id + ): try: - deleted = await service.delete_connected_account(old_connected_account_id) + deleted = await service.delete_connected_account( + old_connected_account_id + ) if deleted: logger.info( f"Deleted old Composio connected account {old_connected_account_id} " @@ -422,7 +434,9 @@ async def list_composio_drive_folders( ) # Get Composio connected account ID from config - composio_connected_account_id = connector.config.get("composio_connected_account_id") + composio_connected_account_id = connector.config.get( + "composio_connected_account_id" + ) if not composio_connected_account_id: raise HTTPException( status_code=400, @@ -451,27 +465,37 @@ async def list_composio_drive_folders( items = [] for file_info in files: file_id = file_info.get("id", "") or file_info.get("fileId", "") - file_name = file_info.get("name", "") or file_info.get("fileName", "") or "Untitled" + file_name = ( + file_info.get("name", "") or file_info.get("fileName", "") or "Untitled" + ) mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") - + if not file_id: continue is_folder = mime_type == "application/vnd.google-apps.folder" - - items.append({ - "id": file_id, - "name": file_name, - "mimeType": mime_type, - "isFolder": is_folder, - "parents": file_info.get("parents", []), - "size": file_info.get("size"), - "iconLink": file_info.get("iconLink"), - }) + + items.append( + { + "id": file_id, + "name": file_name, + "mimeType": mime_type, + "isFolder": is_folder, + "parents": file_info.get("parents", []), + "size": file_info.get("size"), + "iconLink": file_info.get("iconLink"), + } + ) # Sort: folders first, then files, both alphabetically - folders = sorted([item for item in items if item["isFolder"]], key=lambda x: x["name"].lower()) - files_list = sorted([item for item in items if not item["isFolder"]], key=lambda x: x["name"].lower()) + folders = sorted( + [item for item in items if item["isFolder"]], + key=lambda x: x["name"].lower(), + ) + files_list = sorted( + [item for item in items if not item["isFolder"]], + key=lambda x: x["name"].lower(), + ) items = folders + files_list folder_count = len(folders) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index ed306c7bc..433acac1c 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -37,7 +37,6 @@ from app.db import ( async_session_maker, get_async_session, ) -from app.services.composio_service import ComposioService from app.schemas import ( GoogleDriveIndexRequest, MCPConnectorCreate, @@ -48,6 +47,7 @@ from app.schemas import ( SearchSourceConnectorRead, SearchSourceConnectorUpdate, ) +from app.services.composio_service import ComposioService from app.services.notification_service import NotificationService from app.tasks.connector_indexers import ( index_airtable_records, @@ -537,11 +537,15 @@ async def delete_search_source_connector( SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR, ] if db_connector.connector_type in composio_connector_types: - composio_connected_account_id = db_connector.config.get("composio_connected_account_id") + composio_connected_account_id = db_connector.config.get( + "composio_connected_account_id" + ) if composio_connected_account_id and ComposioService.is_enabled(): try: service = ComposioService() - deleted = await service.delete_connected_account(composio_connected_account_id) + deleted = await service.delete_connected_account( + composio_connected_account_id + ) if deleted: logger.info( f"Successfully deleted Composio connected account {composio_connected_account_id} " @@ -897,7 +901,10 @@ async def index_connector_content( ) response_message = "Web page indexing started in the background." - elif connector.connector_type == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR: + elif ( + connector.connector_type + == SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR + ): from app.tasks.celery_tasks.connector_tasks import ( index_composio_connector_task, ) @@ -907,8 +914,12 @@ async def index_connector_content( if drive_items and drive_items.has_items(): # Update connector config with the selected folders/files config = connector.config or {} - config["selected_folders"] = [{"id": f.id, "name": f.name} for f in drive_items.folders] - config["selected_files"] = [{"id": f.id, "name": f.name} for f in drive_items.files] + config["selected_folders"] = [ + {"id": f.id, "name": f.name} for f in drive_items.folders + ] + config["selected_files"] = [ + {"id": f.id, "name": f.name} for f in drive_items.files + ] if drive_items.indexing_options: config["indexing_options"] = { "max_files_per_folder": drive_items.indexing_options.max_files_per_folder, @@ -917,6 +928,7 @@ async def index_connector_content( } connector.config = config from sqlalchemy.orm.attributes import flag_modified + flag_modified(connector, "config") await session.commit() await session.refresh(connector) @@ -934,7 +946,9 @@ async def index_connector_content( index_composio_connector_task.delay( connector_id, search_space_id, str(user.id), indexing_from, indexing_to ) - response_message = "Composio Google Drive indexing started in the background." + response_message = ( + "Composio Google Drive indexing started in the background." + ) elif connector.connector_type in [ SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, @@ -995,7 +1009,9 @@ async def _update_connector_timestamp_by_id(session: AsyncSession, connector_id: connector = result.scalars().first() if connector: - connector.last_indexed_at = datetime.now(UTC) # Use UTC for timezone consistency + connector.last_indexed_at = datetime.now( + UTC + ) # Use UTC for timezone consistency await session.commit() logger.info(f"Updated last_indexed_at for connector {connector_id}") except Exception as e: @@ -1150,7 +1166,9 @@ async def _run_indexing_with_notifications( indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed ) - await session.commit() # Commit to ensure Electric SQL syncs the notification update + await ( + session.commit() + ) # Commit to ensure Electric SQL syncs the notification update elif documents_processed > 0: # Update notification to storing stage if notification: @@ -1174,7 +1192,9 @@ async def _run_indexing_with_notifications( indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed ) - await session.commit() # Commit to ensure Electric SQL syncs the notification update + await ( + session.commit() + ) # Commit to ensure Electric SQL syncs the notification update else: # No new documents processed - check if this is an error or just no changes if error_or_warning: @@ -1189,7 +1209,9 @@ async def _run_indexing_with_notifications( indexed_count=0, error_message=error_or_warning, ) - await session.commit() # Commit to ensure Electric SQL syncs the notification update + await ( + session.commit() + ) # Commit to ensure Electric SQL syncs the notification update else: # Success - just no new documents to index (all skipped/unchanged) logger.info( @@ -1208,7 +1230,9 @@ async def _run_indexing_with_notifications( indexed_count=0, error_message=None, # No error - sync succeeded ) - await session.commit() # Commit to ensure Electric SQL syncs the notification update + await ( + session.commit() + ) # Commit to ensure Electric SQL syncs the notification update except Exception as e: logger.error(f"Error in indexing task: {e!s}", exc_info=True) diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index 1173cfb6a..0d6189cd9 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -111,7 +111,7 @@ class ComposioService: config_toolkit = getattr(auth_config, "toolkit", None) if config_toolkit is None: continue - + # Extract toolkit name/slug from the object toolkit_name = None if isinstance(config_toolkit, str): @@ -122,18 +122,22 @@ class ComposioService: toolkit_name = config_toolkit.name elif hasattr(config_toolkit, "id"): toolkit_name = config_toolkit.id - + # Compare case-insensitively if toolkit_name and toolkit_name.lower() == toolkit_id.lower(): - logger.info(f"Found auth config {auth_config.id} for toolkit {toolkit_id}") + logger.info( + f"Found auth config {auth_config.id} for toolkit {toolkit_id}" + ) return auth_config.id - + # Log available auth configs for debugging - logger.warning(f"No auth config found for toolkit '{toolkit_id}'. Available auth configs:") + logger.warning( + f"No auth config found for toolkit '{toolkit_id}'. Available auth configs:" + ) for auth_config in auth_configs.items: config_toolkit = getattr(auth_config, "toolkit", None) logger.warning(f" - {auth_config.id}: toolkit={config_toolkit}") - + return None except Exception as e: logger.error(f"Failed to list auth configs: {e!s}") @@ -162,7 +166,7 @@ class ComposioService: try: # First, get the auth_config_id for this toolkit auth_config_id = self._get_auth_config_for_toolkit(toolkit_id) - + if not auth_config_id: raise ValueError( f"No auth config found for toolkit '{toolkit_id}'. " @@ -214,7 +218,9 @@ class ComposioService: "user_id": getattr(account, "user_id", None), } except Exception as e: - logger.error(f"Failed to get connected account {connected_account_id}: {e!s}") + logger.error( + f"Failed to get connected account {connected_account_id}: {e!s}" + ) return None async def list_all_connections(self) -> list[dict[str, Any]]: @@ -226,15 +232,17 @@ class ComposioService: """ try: accounts_response = self.client.connected_accounts.list() - + if hasattr(accounts_response, "items"): accounts = accounts_response.items elif hasattr(accounts_response, "__iter__"): accounts = accounts_response else: - logger.warning(f"Unexpected accounts response type: {type(accounts_response)}") + logger.warning( + f"Unexpected accounts response type: {type(accounts_response)}" + ) return [] - + result = [] for acc in accounts: toolkit_raw = getattr(acc, "toolkit", None) @@ -248,14 +256,16 @@ class ComposioService: toolkit_info = toolkit_raw.name else: toolkit_info = str(toolkit_raw) - - result.append({ - "id": acc.id, - "status": getattr(acc, "status", None), - "toolkit": toolkit_info, - "user_id": getattr(acc, "user_id", None), - }) - + + result.append( + { + "id": acc.id, + "status": getattr(acc, "status", None), + "toolkit": toolkit_info, + "user_id": getattr(acc, "user_id", None), + } + ) + return result except Exception as e: logger.error(f"Failed to list all connections: {e!s}") @@ -273,16 +283,18 @@ class ComposioService: """ try: accounts_response = self.client.connected_accounts.list(user_id=user_id) - + # Handle paginated response (may have .items attribute) or direct list if hasattr(accounts_response, "items"): accounts = accounts_response.items elif hasattr(accounts_response, "__iter__"): accounts = accounts_response else: - logger.warning(f"Unexpected accounts response type: {type(accounts_response)}") + logger.warning( + f"Unexpected accounts response type: {type(accounts_response)}" + ) return [] - + result = [] for acc in accounts: # Extract toolkit info - might be string or object @@ -297,13 +309,15 @@ class ComposioService: toolkit_info = toolkit_raw.name else: toolkit_info = toolkit_raw - - result.append({ - "id": acc.id, - "status": getattr(acc, "status", None), - "toolkit": toolkit_info, - }) - + + result.append( + { + "id": acc.id, + "status": getattr(acc, "status", None), + "toolkit": toolkit_info, + } + ) + logger.info(f"Found {len(result)} connections for user {user_id}: {result}") return result except Exception as e: @@ -324,10 +338,14 @@ class ComposioService: """ try: self.client.connected_accounts.delete(connected_account_id) - logger.info(f"Successfully deleted Composio connected account: {connected_account_id}") + logger.info( + f"Successfully deleted Composio connected account: {connected_account_id}" + ) return True except Exception as e: - logger.error(f"Failed to delete Composio connected account {connected_account_id}: {e!s}") + logger.error( + f"Failed to delete Composio connected account {connected_account_id}: {e!s}" + ) return False async def execute_tool( @@ -398,10 +416,14 @@ class ComposioService: } if folder_id: # List contents of a specific folder (exclude shortcuts - we don't have access to them) - params["q"] = f"'{folder_id}' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'" + params["q"] = ( + f"'{folder_id}' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'" + ) else: # List root-level items only (My Drive root), exclude shortcuts - params["q"] = "'root' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'" + params["q"] = ( + "'root' in parents and trashed = false and mimeType != 'application/vnd.google-apps.shortcut'" + ) if page_token: params["page_token"] = page_token @@ -416,17 +438,21 @@ class ComposioService: return [], None, result.get("error", "Unknown error") data = result.get("data", {}) - + # Handle nested response structure from Composio files = [] next_token = None if isinstance(data, dict): # Try direct access first, then nested files = data.get("files", []) or data.get("data", {}).get("files", []) - next_token = data.get("nextPageToken") or data.get("next_page_token") or data.get("data", {}).get("nextPageToken") + next_token = ( + data.get("nextPageToken") + or data.get("next_page_token") + or data.get("data", {}).get("nextPageToken") + ) elif isinstance(data, list): files = data - + return files, next_token, None except Exception as e: @@ -459,13 +485,13 @@ class ComposioService: return None, result.get("error", "Unknown error") data = result.get("data") - + # Composio GOOGLEDRIVE_DOWNLOAD_FILE returns a dict with file info # The actual content is in "downloaded_file_content" field if isinstance(data, dict): # Try known Composio response fields in order of preference content = None - + # Primary field from GOOGLEDRIVE_DOWNLOAD_FILE if "downloaded_file_content" in data: content = data["downloaded_file_content"] @@ -474,19 +500,24 @@ class ComposioService: # Try to extract actual content from nested dict # Note: Composio nests downloaded_file_content inside another downloaded_file_content actual_content = ( - content.get("downloaded_file_content") or - content.get("content") or - content.get("data") or - content.get("file_content") or - content.get("body") or - content.get("text") + content.get("downloaded_file_content") + or content.get("content") + or content.get("data") + or content.get("file_content") + or content.get("body") + or content.get("text") ) if actual_content is not None: content = actual_content else: # Log structure for debugging - logger.warning(f"downloaded_file_content is dict with keys: {list(content.keys())}") - return None, f"Cannot extract content from downloaded_file_content. Keys: {list(content.keys())}" + logger.warning( + f"downloaded_file_content is dict with keys: {list(content.keys())}" + ) + return ( + None, + f"Cannot extract content from downloaded_file_content. Keys: {list(content.keys())}", + ) # Fallback fields for compatibility elif "content" in data: content = data["content"] @@ -494,16 +525,20 @@ class ComposioService: content = data["file_content"] elif "data" in data: content = data["data"] - + if content is None: # Log available keys for debugging logger.warning(f"Composio response dict keys: {list(data.keys())}") - return None, f"No file content found in Composio response. Available keys: {list(data.keys())}" - + return ( + None, + f"No file content found in Composio response. Available keys: {list(data.keys())}", + ) + # Convert content to bytes if isinstance(content, str): # Check if it's base64 encoded import base64 + try: # Try to decode as base64 first content = base64.b64decode(content) @@ -514,11 +549,19 @@ class ComposioService: pass # Already bytes elif isinstance(content, dict): # Still a dict after all extraction attempts - log structure - logger.warning(f"Content still dict after extraction: {list(content.keys())}") - return None, f"Unexpected nested content structure: {list(content.keys())}" + logger.warning( + f"Content still dict after extraction: {list(content.keys())}" + ) + return ( + None, + f"Unexpected nested content structure: {list(content.keys())}", + ) else: - return None, f"Unexpected content type in Composio response: {type(content).__name__}" - + return ( + None, + f"Unexpected content type in Composio response: {type(content).__name__}", + ) + return content, None elif isinstance(data, str): return data.encode("utf-8"), None @@ -527,7 +570,10 @@ class ComposioService: elif data is None: return None, "No data returned from Composio" else: - return None, f"Unexpected data type from Composio: {type(data).__name__}" + return ( + None, + f"Unexpected data type from Composio: {type(data).__name__}", + ) except Exception as e: logger.error(f"Failed to get Drive file content: {e!s}") @@ -576,17 +622,21 @@ class ComposioService: return [], None, result.get("error", "Unknown error") data = result.get("data", {}) - + # Try different possible response structures messages = [] next_token = None result_size_estimate = None if isinstance(data, dict): - messages = data.get("messages", []) or data.get("data", {}).get("messages", []) or data.get("emails", []) + messages = ( + data.get("messages", []) + or data.get("data", {}).get("messages", []) + or data.get("emails", []) + ) # Check for pagination token in various possible locations next_token = ( - data.get("nextPageToken") - or data.get("next_page_token") + data.get("nextPageToken") + or data.get("next_page_token") or data.get("data", {}).get("nextPageToken") or data.get("data", {}).get("next_page_token") ) @@ -599,7 +649,7 @@ class ComposioService: ) elif isinstance(data, list): messages = data - + return messages, next_token, result_size_estimate, None except Exception as e: @@ -683,14 +733,18 @@ class ComposioService: return [], result.get("error", "Unknown error") data = result.get("data", {}) - + # Try different possible response structures events = [] if isinstance(data, dict): - events = data.get("items", []) or data.get("data", {}).get("items", []) or data.get("events", []) + events = ( + data.get("items", []) + or data.get("data", {}).get("items", []) + or data.get("events", []) + ) elif isinstance(data, list): events = data - + return events, None except Exception as e: diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index 6f40e6d66..e5c8b701e 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -64,10 +64,14 @@ async def check_document_by_unique_identifier( async def get_connector_by_id( - session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType | None + session: AsyncSession, + connector_id: int, + connector_type: SearchSourceConnectorType | None, ) -> SearchSourceConnector | None: """Get a connector by ID and optionally by type from the database.""" - query = select(SearchSourceConnector).filter(SearchSourceConnector.id == connector_id) + query = select(SearchSourceConnector).filter( + SearchSourceConnector.id == connector_id + ) if connector_type is not None: query = query.filter(SearchSourceConnector.connector_type == connector_type) result = await session.execute(query) @@ -81,40 +85,90 @@ async def update_connector_last_indexed( ) -> None: """Update the last_indexed_at timestamp for a connector.""" if update_last_indexed: - connector.last_indexed_at = datetime.now(UTC) # Use UTC for timezone consistency + connector.last_indexed_at = datetime.now( + UTC + ) # Use UTC for timezone consistency logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") # Binary file extensions that need file processor BINARY_FILE_EXTENSIONS = { - ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", - ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp", - ".zip", ".tar", ".gz", ".rar", ".7z", - ".mp3", ".mp4", ".wav", ".avi", ".mov", - ".exe", ".dll", ".so", ".bin", + ".pdf", + ".doc", + ".docx", + ".xls", + ".xlsx", + ".ppt", + ".pptx", + ".png", + ".jpg", + ".jpeg", + ".gif", + ".bmp", + ".tiff", + ".webp", + ".zip", + ".tar", + ".gz", + ".rar", + ".7z", + ".mp3", + ".mp4", + ".wav", + ".avi", + ".mov", + ".exe", + ".dll", + ".so", + ".bin", } # Text file extensions that can be decoded as UTF-8 TEXT_FILE_EXTENSIONS = { - ".txt", ".md", ".markdown", ".json", ".xml", ".html", ".htm", - ".css", ".js", ".ts", ".py", ".java", ".c", ".cpp", ".h", - ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", - ".sh", ".bash", ".zsh", ".fish", - ".sql", ".csv", ".tsv", - ".rst", ".tex", ".log", + ".txt", + ".md", + ".markdown", + ".json", + ".xml", + ".html", + ".htm", + ".css", + ".js", + ".ts", + ".py", + ".java", + ".c", + ".cpp", + ".h", + ".yaml", + ".yml", + ".toml", + ".ini", + ".cfg", + ".conf", + ".sh", + ".bash", + ".zsh", + ".fish", + ".sql", + ".csv", + ".tsv", + ".rst", + ".tex", + ".log", } def _is_binary_file(file_name: str, mime_type: str) -> bool: """Check if a file is binary based on extension or mime type.""" extension = Path(file_name).suffix.lower() - + # Check extension first if extension in BINARY_FILE_EXTENSIONS: return True if extension in TEXT_FILE_EXTENSIONS: return False - + # Check mime type if mime_type: if mime_type.startswith(("image/", "audio/", "video/", "application/pdf")): @@ -122,9 +176,13 @@ def _is_binary_file(file_name: str, mime_type: str) -> bool: if mime_type.startswith(("text/", "application/json", "application/xml")): return False # Office documents - if "spreadsheet" in mime_type or "document" in mime_type or "presentation" in mime_type: + if ( + "spreadsheet" in mime_type + or "document" in mime_type + or "presentation" in mime_type + ): return True - + # Default to text for unknown types return False @@ -143,10 +201,10 @@ async def _process_file_content( ) -> str: """ Process file content and return markdown text. - + For binary files (PDFs, images, etc.), uses Surfsense's ETL service. For text files, decodes as UTF-8. - + Args: content: File content as bytes or string file_name: Name of the file @@ -158,14 +216,14 @@ async def _process_file_content( task_logger: Task logging service log_entry: Log entry for tracking processing_errors: List to append errors to - + Returns: Markdown content string """ # Ensure content is bytes if isinstance(content, str): content = content.encode("utf-8") - + # Check if this is a binary file if _is_binary_file(file_name, mime_type): # Use ETL service for binary files (PDF, Office docs, etc.) @@ -173,24 +231,26 @@ async def _process_file_content( try: # Get file extension extension = Path(file_name).suffix or ".bin" - + # Write to temp file - with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp_file: + with tempfile.NamedTemporaryFile( + delete=False, suffix=extension + ) as tmp_file: tmp_file.write(content) temp_file_path = tmp_file.name - + # Use the configured ETL service to extract text extracted_text = await _extract_text_with_etl( temp_file_path, file_name, task_logger, log_entry ) - + if extracted_text: return extracted_text else: # Fallback if extraction fails logger.warning(f"Could not extract text from binary file {file_name}") return f"# {file_name}\n\n[Binary file - text extraction failed]\n\n**File ID:** {file_id}\n**Type:** {mime_type}\n" - + except Exception as e: error_msg = f"Error processing binary file {file_name}: {e!s}" logger.error(error_msg) @@ -214,7 +274,7 @@ async def _process_file_content( return content.decode(encoding) except UnicodeDecodeError: continue - + # If all encodings fail, treat as binary error_msg = f"Could not decode text file {file_name} with any encoding" logger.warning(error_msg) @@ -230,27 +290,27 @@ async def _extract_text_with_etl( ) -> str | None: """ Extract text from a file using the configured ETL service. - + Args: file_path: Path to the file file_name: Name of the file task_logger: Task logging service log_entry: Log entry for tracking - + Returns: Extracted text as markdown, or None if extraction fails """ import warnings from logging import ERROR, getLogger - + etl_service = config.ETL_SERVICE - + try: if etl_service == "UNSTRUCTURED": from langchain_unstructured import UnstructuredLoader from app.utils.document_converters import convert_document_to_markdown - + loader = UnstructuredLoader( file_path, mode="elements", @@ -260,57 +320,67 @@ async def _extract_text_with_etl( include_metadata=False, strategy="auto", ) - + docs = await loader.aload() if docs: return await convert_document_to_markdown(docs) return None - + elif etl_service == "LLAMACLOUD": - from app.tasks.document_processors.file_processors import parse_with_llamacloud_retry - + from app.tasks.document_processors.file_processors import ( + parse_with_llamacloud_retry, + ) + # Estimate pages (rough estimate based on file size) file_size = os.path.getsize(file_path) estimated_pages = max(1, file_size // (80 * 1024)) - + result = await parse_with_llamacloud_retry( file_path=file_path, estimated_pages=estimated_pages, task_logger=task_logger, log_entry=log_entry, ) - - markdown_documents = await result.aget_markdown_documents(split_by_page=False) + + markdown_documents = await result.aget_markdown_documents( + split_by_page=False + ) if markdown_documents: return markdown_documents[0].text return None - + elif etl_service == "DOCLING": from app.services.docling_service import create_docling_service - + docling_service = create_docling_service() - + # Suppress pdfminer warnings pdfminer_logger = getLogger("pdfminer") original_level = pdfminer_logger.level - + with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer") - warnings.filterwarnings("ignore", message=".*Cannot set gray non-stroke color.*") + warnings.filterwarnings( + "ignore", category=UserWarning, module="pdfminer" + ) + warnings.filterwarnings( + "ignore", message=".*Cannot set gray non-stroke color.*" + ) warnings.filterwarnings("ignore", message=".*invalid float value.*") - + pdfminer_logger.setLevel(ERROR) - + try: - result = await docling_service.process_document(file_path, file_name) + result = await docling_service.process_document( + file_path, file_name + ) finally: pdfminer_logger.setLevel(original_level) - + return result.get("content") else: logger.warning(f"Unknown ETL service: {etl_service}") return None - + except Exception as e: logger.error(f"ETL extraction failed for {file_name}: {e!s}") return None @@ -367,9 +437,11 @@ async def index_composio_connector( # Get connector by id - accept any Composio connector type # We'll check the actual type after loading connector = await get_connector_by_id( - session, connector_id, None # Don't filter by type, we'll validate after + session, + connector_id, + None, # Don't filter by type, we'll validate after ) - + # Validate it's a Composio connector if connector and connector.connector_type not in [ SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, @@ -392,7 +464,9 @@ async def index_composio_connector( # Get toolkit ID from config toolkit_id = connector.config.get("toolkit_id") if not toolkit_id: - error_msg = f"Composio connector {connector_id} has no toolkit_id configured" + error_msg = ( + f"Composio connector {connector_id} has no toolkit_id configured" + ) await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "MissingToolkitId"} ) @@ -488,7 +562,7 @@ async def _index_composio_google_drive( max_items: int = 1000, ) -> tuple[int, str]: """Index Google Drive files via Composio. - + Supports folder/file selection via connector config: - selected_folders: List of {id, name} for folders to index - selected_files: List of {id, name} for individual files to index @@ -502,14 +576,18 @@ async def _index_composio_google_drive( selected_folders = connector_config.get("selected_folders", []) selected_files = connector_config.get("selected_files", []) indexing_options = connector_config.get("indexing_options", {}) - + max_files_per_folder = indexing_options.get("max_files_per_folder", 100) include_subfolders = indexing_options.get("include_subfolders", True) await task_logger.log_task_progress( log_entry, f"Fetching Google Drive files via Composio for connector {connector_id}", - {"stage": "fetching_files", "selected_folders": len(selected_folders), "selected_files": len(selected_files)}, + { + "stage": "fetching_files", + "selected_folders": len(selected_folders), + "selected_files": len(selected_files), + }, ) all_files = [] @@ -520,34 +598,42 @@ async def _index_composio_google_drive( for folder in selected_folders: folder_id = folder.get("id") folder_name = folder.get("name", "Unknown") - + if not folder_id: continue - + # Handle special case for "root" folder actual_folder_id = None if folder_id == "root" else folder_id - + logger.info(f"Fetching files from folder: {folder_name} ({folder_id})") - + # Fetch files from this folder folder_files = [] page_token = None - + while len(folder_files) < max_files_per_folder: - files, next_token, error = await composio_connector.list_drive_files( + ( + files, + next_token, + error, + ) = await composio_connector.list_drive_files( folder_id=actual_folder_id, page_token=page_token, page_size=min(100, max_files_per_folder - len(folder_files)), ) if error: - logger.warning(f"Failed to fetch files from folder {folder_name}: {error}") + logger.warning( + f"Failed to fetch files from folder {folder_name}: {error}" + ) break # Process files for file_info in files: - mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") - + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) + # If it's a folder and include_subfolders is enabled, recursively fetch if mime_type == "application/vnd.google-apps.folder": if include_subfolders: @@ -565,7 +651,7 @@ async def _index_composio_google_drive( if not next_token: break page_token = next_token - + all_files.extend(folder_files[:max_files_per_folder]) logger.info(f"Found {len(folder_files)} files in folder {folder_name}") @@ -573,16 +659,18 @@ async def _index_composio_google_drive( for selected_file in selected_files: file_id = selected_file.get("id") file_name = selected_file.get("name", "Unknown") - + if not file_id: continue - + # Add file info (we'll fetch content later during indexing) - all_files.append({ - "id": file_id, - "name": file_name, - "mimeType": "", # Will be determined later - }) + all_files.append( + { + "id": file_id, + "name": file_name, + "mimeType": "", # Will be determined later + } + ) else: # No selection specified - fetch all files (original behavior) page_token = None @@ -613,7 +701,10 @@ async def _index_composio_google_drive( # CRITICAL: Update timestamp even when no files found so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) await session.commit() - return 0, None # Return None (not error) when no items found - this is success with 0 items + return ( + 0, + None, + ) # Return None (not error) when no items found - this is success with 0 items logger.info(f"Found {len(all_files)} Google Drive files to index via Composio") @@ -625,8 +716,14 @@ async def _index_composio_google_drive( try: # Handle both standard Google API and potential Composio variations file_id = file_info.get("id", "") or file_info.get("fileId", "") - file_name = file_info.get("name", "") or file_info.get("fileName", "") or "Untitled" - mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") + file_name = ( + file_info.get("name", "") + or file_info.get("fileName", "") + or "Untitled" + ) + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) if not file_id: documents_skipped += 1 @@ -648,12 +745,15 @@ async def _index_composio_google_drive( ) # Get file content - content, content_error = await composio_connector.get_drive_file_content( - file_id - ) + ( + content, + content_error, + ) = await composio_connector.get_drive_file_content(file_id) if content_error or not content: - logger.warning(f"Could not get content for file {file_name}: {content_error}") + logger.warning( + f"Could not get content for file {file_name}: {content_error}" + ) # Use metadata as content fallback markdown_content = f"# {file_name}\n\n" markdown_content += f"**File ID:** {file_id}\n" @@ -700,12 +800,19 @@ async def _index_composio_google_drive( "mime_type": mime_type, "document_type": "Google Drive File (Composio)", } - summary_content, summary_embedding = await generate_document_summary( + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( markdown_content, user_llm, document_metadata ) else: - summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_content = ( + f"Google Drive File: {file_name}\n\nType: {mime_type}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) @@ -724,8 +831,8 @@ async def _index_composio_google_drive( existing_document.updated_at = get_current_timestamp() documents_indexed += 1 - - # Batch commit every 10 documents + + # Batch commit every 10 documents if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Google Drive files processed so far" @@ -745,12 +852,19 @@ async def _index_composio_google_drive( "mime_type": mime_type, "document_type": "Google Drive File (Composio)", } - summary_content, summary_embedding = await generate_document_summary( + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( markdown_content, user_llm, document_metadata ) else: - summary_content = f"Google Drive File: {file_name}\n\nType: {mime_type}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_content = ( + f"Google Drive File: {file_name}\n\nType: {mime_type}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) @@ -776,7 +890,7 @@ async def _index_composio_google_drive( session.add(document) documents_indexed += 1 - # Batch commit every 10 documents + # Batch commit every 10 documents if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Google Drive files processed so far" @@ -784,7 +898,9 @@ async def _index_composio_google_drive( await session.commit() except Exception as e: - error_msg = f"Error processing Drive file {file_name or 'unknown'}: {e!s}" + error_msg = ( + f"Error processing Drive file {file_name or 'unknown'}: {e!s}" + ) logger.error(error_msg, exc_info=True) processing_errors.append(error_msg) documents_skipped += 1 @@ -848,7 +964,7 @@ async def _fetch_folder_files_recursively( ) -> list[dict[str, Any]]: """ Recursively fetch files from a Google Drive folder via Composio. - + Args: composio_connector: The Composio connector instance folder_id: Google Drive folder ID @@ -856,20 +972,20 @@ async def _fetch_folder_files_recursively( current_count: Current number of files already fetched depth: Current recursion depth max_depth: Maximum recursion depth to prevent infinite loops - + Returns: List of file info dictionaries """ if depth >= max_depth: logger.warning(f"Max recursion depth reached for folder {folder_id}") return [] - + if current_count >= max_files: return [] - + all_files = [] page_token = None - + try: while len(all_files) + current_count < max_files: files, next_token, error = await composio_connector.list_drive_files( @@ -877,14 +993,18 @@ async def _fetch_folder_files_recursively( page_token=page_token, page_size=min(100, max_files - len(all_files) - current_count), ) - + if error: - logger.warning(f"Error fetching files from subfolder {folder_id}: {error}") + logger.warning( + f"Error fetching files from subfolder {folder_id}: {error}" + ) break - + for file_info in files: - mime_type = file_info.get("mimeType", "") or file_info.get("mime_type", "") - + mime_type = file_info.get("mimeType", "") or file_info.get( + "mime_type", "" + ) + if mime_type == "application/vnd.google-apps.folder": # Recursively fetch from subfolders subfolder_files = await _fetch_folder_files_recursively( @@ -898,16 +1018,16 @@ async def _fetch_folder_files_recursively( all_files.extend(subfolder_files) else: all_files.append(file_info) - + if len(all_files) + current_count >= max_files: break - + if not next_token: break page_token = next_token - - return all_files[:max_files - current_count] - + + return all_files[: max_files - current_count] + except Exception as e: logger.error(f"Error in recursive folder fetch: {e!s}") return all_files @@ -924,10 +1044,10 @@ async def _process_gmail_message_batch( ) -> tuple[int, int]: """ Process a batch of Gmail messages and index them. - + Args: total_documents_indexed: Running total of documents indexed so far (for batch commits). - + Returns: Tuple of (documents_indexed, documents_skipped) """ @@ -965,7 +1085,9 @@ async def _process_gmail_message_batch( date_str = value # Format to markdown using the full message data - markdown_content = composio_connector.format_gmail_message_to_markdown(message) + markdown_content = composio_connector.format_gmail_message_to_markdown( + message + ) # Check for empty content (defensive parsing per Composio best practices) if not markdown_content.strip(): @@ -1008,12 +1130,19 @@ async def _process_gmail_message_batch( "sender": sender, "document_type": "Gmail Message (Composio)", } - summary_content, summary_embedding = await generate_document_summary( + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( markdown_content, user_llm, document_metadata ) else: - summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_content = ( + f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) @@ -1035,8 +1164,8 @@ async def _process_gmail_message_batch( existing_document.updated_at = get_current_timestamp() documents_indexed += 1 - - # Batch commit every 10 documents + + # Batch commit every 10 documents current_total = total_documents_indexed + documents_indexed if current_total % 10 == 0: logger.info( @@ -1062,8 +1191,12 @@ async def _process_gmail_message_batch( markdown_content, user_llm, document_metadata ) else: - summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_content = ( + f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}" + ) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) @@ -1092,7 +1225,7 @@ async def _process_gmail_message_batch( session.add(document) documents_indexed += 1 - # Batch commit every 10 documents + # Batch commit every 10 documents current_total = total_documents_indexed + documents_indexed if current_total % 10 == 0: logger.info( @@ -1107,7 +1240,9 @@ async def _process_gmail_message_batch( try: await session.rollback() except Exception as rollback_error: - logger.error(f"Error during rollback: {rollback_error!s}", exc_info=True) + logger.error( + f"Error during rollback: {rollback_error!s}", exc_info=True + ) continue return documents_indexed, documents_skipped @@ -1169,7 +1304,9 @@ async def _index_composio_gmail( current_batch_size = min(batch_size, remaining) # Use result_size_estimate if available, otherwise fall back to max_items - estimated_total = result_size_estimate if result_size_estimate is not None else max_items + estimated_total = ( + result_size_estimate if result_size_estimate is not None else max_items + ) # Cap estimated_total at max_items to avoid showing misleading progress estimated_total = min(estimated_total, max_items) @@ -1187,7 +1324,12 @@ async def _index_composio_gmail( ) # Fetch batch of messages - messages, next_token, result_size_estimate_batch, error = await composio_connector.list_gmail_messages( + ( + messages, + next_token, + result_size_estimate_batch, + error, + ) = await composio_connector.list_gmail_messages( query=query, max_results=current_batch_size, page_token=page_token, @@ -1206,13 +1348,17 @@ async def _index_composio_gmail( # Update result_size_estimate from first response (Gmail provides this estimate) if result_size_estimate is None and result_size_estimate_batch is not None: result_size_estimate = result_size_estimate_batch - logger.info(f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'") + logger.info( + f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'" + ) total_messages_fetched += len(messages) # Recalculate estimated_total after potentially updating result_size_estimate - estimated_total = result_size_estimate if result_size_estimate is not None else max_items + estimated_total = ( + result_size_estimate if result_size_estimate is not None else max_items + ) estimated_total = min(estimated_total, max_items) - + logger.info( f"Fetched batch of {len(messages)} Gmail messages " f"(total: {total_messages_fetched}/{estimated_total})" @@ -1357,7 +1503,10 @@ async def _index_composio_google_calendar( # CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) await session.commit() - return 0, None # Return None (not error) when no items found - this is success with 0 items + return ( + 0, + None, + ) # Return None (not error) when no items found - this is success with 0 items logger.info(f"Found {len(events)} Google Calendar events to index via Composio") @@ -1368,14 +1517,18 @@ async def _index_composio_google_calendar( try: # Handle both standard Google API and potential Composio variations event_id = event.get("id", "") or event.get("eventId", "") - summary = event.get("summary", "") or event.get("title", "") or "No Title" + summary = ( + event.get("summary", "") or event.get("title", "") or "No Title" + ) if not event_id: documents_skipped += 1 continue # Format to markdown - markdown_content = composio_connector.format_calendar_event_to_markdown(event) + markdown_content = composio_connector.format_calendar_event_to_markdown( + event + ) # Generate unique identifier document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]) @@ -1413,14 +1566,19 @@ async def _index_composio_google_calendar( "start_time": start_time, "document_type": "Google Calendar Event (Composio)", } - summary_content, summary_embedding = await generate_document_summary( + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( markdown_content, user_llm, document_metadata ) else: summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}" if location: summary_content += f"\nLocation: {location}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) @@ -1441,8 +1599,8 @@ async def _index_composio_google_calendar( existing_document.updated_at = get_current_timestamp() documents_indexed += 1 - - # Batch commit every 10 documents + + # Batch commit every 10 documents if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Google Calendar events processed so far" @@ -1462,21 +1620,30 @@ async def _index_composio_google_calendar( "start_time": start_time, "document_type": "Google Calendar Event (Composio)", } - summary_content, summary_embedding = await generate_document_summary( + ( + summary_content, + summary_embedding, + ) = await generate_document_summary( markdown_content, user_llm, document_metadata ) else: - summary_content = f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}" + summary_content = ( + f"Calendar: {summary}\n\nStart: {start_time}\nEnd: {end_time}" + ) if location: summary_content += f"\nLocation: {location}" - summary_embedding = config.embedding_model_instance.embed(summary_content) + summary_embedding = config.embedding_model_instance.embed( + summary_content + ) chunks = await create_document_chunks(markdown_content) document = Document( search_space_id=search_space_id, title=f"Calendar: {summary}", - document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]), + document_type=DocumentType( + TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"] + ), document_metadata={ "event_id": event_id, "summary": summary, @@ -1497,7 +1664,7 @@ async def _index_composio_google_calendar( session.add(document) documents_indexed += 1 - # Batch commit every 10 documents + # Batch commit every 10 documents if documents_indexed % 10 == 0: logger.info( f"Committing batch: {documents_indexed} Google Calendar events processed so far" @@ -1535,5 +1702,7 @@ async def _index_composio_google_calendar( return documents_indexed, None except Exception as e: - logger.error(f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True) + logger.error( + f"Failed to index Google Calendar via Composio: {e!s}", exc_info=True + ) return 0, f"Failed to index Google Calendar via Composio: {e!s}" diff --git a/surfsense_backend/app/tasks/connector_indexers/__init__.py b/surfsense_backend/app/tasks/connector_indexers/__init__.py index 35b5fde4c..8f25e6fdd 100644 --- a/surfsense_backend/app/tasks/connector_indexers/__init__.py +++ b/surfsense_backend/app/tasks/connector_indexers/__init__.py @@ -26,6 +26,7 @@ Available indexers: # Calendar and scheduling from .airtable_indexer import index_airtable_records from .bookstack_indexer import index_bookstack_pages + # Note: composio_indexer is imported directly in connector_tasks.py to avoid circular imports from .clickup_indexer import index_clickup_tasks from .confluence_indexer import index_confluence_pages diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index f16ee0156..4a8df4918 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -128,7 +128,9 @@ async def index_github_repos( if github_pat: logger.info("Using GitHub PAT for authentication (private repos supported)") else: - logger.info("No GitHub PAT provided - only public repositories can be indexed") + logger.info( + "No GitHub PAT provided - only public repositories can be indexed" + ) # 3. Initialize GitHub connector with gitingest backend await task_logger.log_task_progress( @@ -308,9 +310,7 @@ async def _process_repository_digest( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - logger.info( - f"Repository {repo_full_name} unchanged. Skipping." - ) + logger.info(f"Repository {repo_full_name} unchanged. Skipping.") return 0 else: logger.info( @@ -341,7 +341,7 @@ async def _process_repository_digest( summary_content = ( f"# Repository: {repo_full_name}\n\n" f"## File Structure\n\n{digest.tree}\n\n" - f"## File Contents (truncated)\n\n{digest.content[:MAX_DIGEST_CHARS - len(digest.tree) - 200]}..." + f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..." ) summary_text, summary_embedding = await generate_document_summary( @@ -362,9 +362,7 @@ async def _process_repository_digest( # This preserves file-level granularity in search chunks_data = await create_document_chunks(digest.content) except Exception as chunk_err: - logger.error( - f"Failed to chunk repository {repo_full_name}: {chunk_err}" - ) + logger.error(f"Failed to chunk repository {repo_full_name}: {chunk_err}") # Fall back to a simpler chunking approach chunks_data = await _simple_chunk_content(digest.content) diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-config.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-config.tsx index 255d0cef4..fdff956e5 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-config.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/components/composio-config.tsx @@ -211,7 +211,9 @@ export const ComposioConfig: FC = ({ connector, onConfigCha ); } if (selectedFiles.length > 0) { - parts.push(`${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}`); + parts.push( + `${selectedFiles.length} file${selectedFiles.length > 1 ? "s" : ""}` + ); } return parts.length > 0 ? `(${parts.join(" ")})` : ""; })()} @@ -338,7 +340,9 @@ export const ComposioConfig: FC = ({ connector, onConfigCha handleIndexingOptionChange("include_subfolders", checked)} + onCheckedChange={(checked) => + handleIndexingOptionChange("include_subfolders", checked) + } /> diff --git a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-popup.schemas.ts b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-popup.schemas.ts index c7e77f666..5a0a8e8c8 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/constants/connector-popup.schemas.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/constants/connector-popup.schemas.ts @@ -7,7 +7,9 @@ import { searchSourceConnectorTypeEnum } from "@/contracts/types/connector.types export const connectorPopupQueryParamsSchema = z.object({ modal: z.enum(["connectors"]).optional(), tab: z.enum(["all", "active"]).optional(), - view: z.enum(["configure", "edit", "connect", "youtube", "accounts", "mcp-list", "composio"]).optional(), + view: z + .enum(["configure", "edit", "connect", "youtube", "accounts", "mcp-list", "composio"]) + .optional(), connector: z.string().optional(), connectorId: z.string().optional(), connectorType: z.string().optional(), diff --git a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts index 1be8a7983..b30337de3 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts @@ -26,7 +26,11 @@ import { import { cacheKeys } from "@/lib/query-client/cache-keys"; import { queryClient } from "@/lib/query-client/client"; import type { IndexingConfigState } from "../constants/connector-constants"; -import { COMPOSIO_CONNECTORS, OAUTH_CONNECTORS, OTHER_CONNECTORS } from "../constants/connector-constants"; +import { + COMPOSIO_CONNECTORS, + OAUTH_CONNECTORS, + OTHER_CONNECTORS, +} from "../constants/connector-constants"; import { dateRangeSchema, frequencyMinutesSchema, @@ -83,7 +87,6 @@ export const useConnectorDialog = () => { // MCP list view state (for managing multiple MCP connectors) const [viewingMCPList, setViewingMCPList] = useState(false); - // Track if we came from accounts list when entering edit mode const [cameFromAccountsList, setCameFromAccountsList] = useState<{ connectorType: string; @@ -164,16 +167,14 @@ export const useConnectorDialog = () => { // Handle accounts view if (params.view === "accounts" && params.connectorType) { // Update state if not set, or if connectorType has changed - const needsUpdate = !viewingAccountsType || - viewingAccountsType.connectorType !== params.connectorType; - + const needsUpdate = + !viewingAccountsType || viewingAccountsType.connectorType !== params.connectorType; + if (needsUpdate) { // Check both OAUTH_CONNECTORS and COMPOSIO_CONNECTORS - const oauthConnector = OAUTH_CONNECTORS.find( - (c) => c.connectorType === params.connectorType - ) || COMPOSIO_CONNECTORS.find( - (c) => c.connectorType === params.connectorType - ); + const oauthConnector = + OAUTH_CONNECTORS.find((c) => c.connectorType === params.connectorType) || + COMPOSIO_CONNECTORS.find((c) => c.connectorType === params.connectorType); if (oauthConnector) { setViewingAccountsType({ connectorType: oauthConnector.connectorType, @@ -395,11 +396,8 @@ export const useConnectorDialog = () => { // Check if authEndpoint already has query parameters const separator = connector.authEndpoint.includes("?") ? "&" : "?"; const url = `${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}${connector.authEndpoint}${separator}space_id=${searchSpaceId}`; - - const response = await authenticatedFetch( - url, - { method: "GET" } - ); + + const response = await authenticatedFetch(url, { method: "GET" }); if (!response.ok) { throw new Error(`Failed to initiate ${connector.title} OAuth`); diff --git a/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx b/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx index ffe879d5d..6b38a37d2 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/tabs/all-connectors-tab.tsx @@ -4,7 +4,12 @@ import type { FC } from "react"; import { EnumConnectorName } from "@/contracts/enums/connector"; import type { SearchSourceConnector } from "@/contracts/types/connector.types"; import { ConnectorCard } from "../components/connector-card"; -import { CRAWLERS, OAUTH_CONNECTORS, OTHER_CONNECTORS, COMPOSIO_CONNECTORS } from "../constants/connector-constants"; +import { + CRAWLERS, + OAUTH_CONNECTORS, + OTHER_CONNECTORS, + COMPOSIO_CONNECTORS, +} from "../constants/connector-constants"; import { getDocumentCountForConnector } from "../utils/connector-document-mapping"; /** @@ -28,7 +33,9 @@ interface AllConnectorsTabProps { allConnectors: SearchSourceConnector[] | undefined; documentTypeCounts?: Record; indexingConnectorIds?: Set; - onConnectOAuth: (connector: (typeof OAUTH_CONNECTORS)[number] | (typeof COMPOSIO_CONNECTORS)[number]) => void; + onConnectOAuth: ( + connector: (typeof OAUTH_CONNECTORS)[number] | (typeof COMPOSIO_CONNECTORS)[number] + ) => void; onConnectNonOAuth?: (connectorType: string) => void; onCreateWebcrawler?: () => void; onCreateYouTubeCrawler?: () => void; @@ -82,7 +89,9 @@ export const AllConnectorsTab: FC = ({ {filteredComposio.length > 0 && (
-

Managed OAuth (Composio)

+

+ Managed OAuth (Composio) +

{filteredComposio.map((connector) => { @@ -99,7 +108,6 @@ export const AllConnectorsTab: FC = ({ const accountCount = typeConnectors.length; - const documentCount = getDocumentCountForConnector( connector.connectorType, documentTypeCounts @@ -154,7 +162,6 @@ export const AllConnectorsTab: FC = ({ const accountCount = typeConnectors.length; - const documentCount = getDocumentCountForConnector( connector.connectorType, documentTypeCounts diff --git a/surfsense_web/components/connectors/composio-drive-folder-tree.tsx b/surfsense_web/components/connectors/composio-drive-folder-tree.tsx index 72c36edd5..76ae218cb 100644 --- a/surfsense_web/components/connectors/composio-drive-folder-tree.tsx +++ b/surfsense_web/components/connectors/composio-drive-folder-tree.tsx @@ -362,4 +362,3 @@ export function ComposioDriveFolderTree({
); } - diff --git a/surfsense_web/hooks/use-composio-drive-folders.ts b/surfsense_web/hooks/use-composio-drive-folders.ts index af8da1a81..31e516286 100644 --- a/surfsense_web/hooks/use-composio-drive-folders.ts +++ b/surfsense_web/hooks/use-composio-drive-folders.ts @@ -26,4 +26,3 @@ export function useComposioDriveFolders({ retry: 2, }); } -