diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 93d7f360a..e71b4ef52 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -4,6 +4,7 @@ Composio Google Drive Connector Module. Provides Google Drive specific methods for data retrieval and indexing via Composio. """ +import contextlib import hashlib import json import logging @@ -321,14 +322,6 @@ async def _process_file_content( # Check if this is a binary file based on extension or MIME type is_binary = _is_binary_file(file_name, mime_type) - # Content-based binary detection as fallback - # This catches PDFs and other binary files even if MIME type is missing/incorrect - if not is_binary and content: - has_pdf_magic = content[:4] == b"%PDF" - has_null_bytes = b"\x00" in content[:1000] - if has_pdf_magic or has_null_bytes: - is_binary = True - if is_binary: # Use ETL service for binary files (PDF, Office docs, etc.) temp_file_path = None @@ -363,10 +356,8 @@ async def _process_file_content( finally: # Cleanup temp file if temp_file_path and os.path.exists(temp_file_path): - try: + with contextlib.suppress(Exception): os.unlink(temp_file_path) - except Exception: - pass else: # Text file - try to decode as UTF-8 try: diff --git a/surfsense_backend/app/routes/composio_routes.py b/surfsense_backend/app/routes/composio_routes.py index 602aa876c..e0c6c1f65 100644 --- a/surfsense_backend/app/routes/composio_routes.py +++ b/surfsense_backend/app/routes/composio_routes.py @@ -42,10 +42,6 @@ from app.utils.connector_naming import ( ) from app.utils.oauth_security import OAuthStateManager -# Note: We no longer use check_duplicate_connector for Composio connectors because -# Composio generates a new connected_account_id each time, even for the same Google account. -# Instead, we check for existing connectors by type/space/user and update them. - logger = logging.getLogger(__name__) router = APIRouter() @@ -256,11 +252,6 @@ async def composio_callback( "connectedAccountId" ) or query_params.get("connected_account_id") - # DEBUG: Log query parameter received - logger.info( - f"DEBUG: Callback received - connectedAccountId: {query_params.get('connectedAccountId')}, connected_account_id: {query_params.get('connected_account_id')}, using: {final_connected_account_id}" - ) - # If we still don't have a connected_account_id, warn but continue # (the connector will be created but indexing won't work until updated) if not final_connected_account_id: @@ -273,6 +264,9 @@ async def composio_callback( f"Successfully got connected_account_id: {final_connected_account_id}" ) + # Build entity_id for Composio API calls (same format as used in initiate) + entity_id = f"surfsense_{user_id}" + # Build connector config connector_config = { "composio_connected_account_id": final_connected_account_id, @@ -290,20 +284,51 @@ async def composio_callback( ) connector_type = SearchSourceConnectorType(connector_type_str) - # Check for existing connector of the same type for this user/space - # When reconnecting, Composio gives a new connected_account_id, so we need to - # check by connector_type, user_id, and search_space_id instead of connected_account_id + # Get the base name for this connector type (e.g., "Google Drive", "Gmail") + base_name = get_base_name_for_type(connector_type) + + # FIRST: Get the email for this connected account + # This is needed to determine if it's a reconnection (same email) or new account + email = None + try: + email = await service.get_connected_account_email( + connected_account_id=final_connected_account_id, + entity_id=entity_id, + toolkit_id=toolkit_id, + ) + if email: + logger.info(f"Retrieved email {email} for {toolkit_id} connector") + except Exception as email_error: + logger.warning(f"Could not get email for connector: {email_error!s}") + + # Generate the connector name (with email if available) + # Format: "Gmail (Composio) - john@gmail.com" or "Gmail (Composio) 1" if no email + if email: + connector_name = f"{base_name} (Composio) - {email}" + else: + # Fallback to generic naming if email not available + count = await count_connectors_of_type( + session, connector_type, space_id, user_id + ) + if count == 0: + connector_name = f"{base_name} (Composio) 1" + else: + connector_name = f"{base_name} (Composio) {count + 1}" + + # Check if a connector with this SAME name already exists (reconnection case) + # This allows multiple accounts (different emails) while supporting reconnection existing_connector_result = await session.execute( select(SearchSourceConnector).where( SearchSourceConnector.connector_type == connector_type, SearchSourceConnector.search_space_id == space_id, SearchSourceConnector.user_id == user_id, + SearchSourceConnector.name == connector_name, ) ) existing_connector = existing_connector_result.scalars().first() if existing_connector: - # Delete the old Composio connected account before updating + # This is a RECONNECTION of the same account - update existing connector old_connected_account_id = existing_connector.config.get( "composio_connected_account_id" ) @@ -320,22 +345,16 @@ async def composio_callback( f"Deleted old Composio connected account {old_connected_account_id} " f"before updating connector {existing_connector.id}" ) - else: - logger.warning( - f"Failed to delete old Composio connected account {old_connected_account_id}" - ) except Exception as delete_error: - # Log but don't fail - the old account may already be deleted logger.warning( f"Error deleting old Composio connected account {old_connected_account_id}: {delete_error!s}" ) # Update existing connector with new connected_account_id - # IMPORTANT: Merge new credentials with existing config to preserve - # user settings like selected_folders, selected_files, indexing_options, - # drive_page_token, etc. that would otherwise be wiped on reconnection. + # Merge new credentials with existing config to preserve user settings logger.info( - f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}" + f"Reconnecting existing Composio connector {existing_connector.id} ({connector_name}) " + f"with new connected_account_id {final_connected_account_id}" ) existing_config = ( existing_connector.config.copy() if existing_connector.config else {} @@ -347,28 +366,16 @@ async def composio_callback( await session.commit() await session.refresh(existing_connector) - # Get the frontend connector ID based on toolkit_id frontend_connector_id = TOOLKIT_TO_FRONTEND_CONNECTOR_ID.get( toolkit_id, "composio-connector" ) return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={existing_connector.id}" + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={existing_connector.id}&view=configure" ) + # This is a NEW account - create a new connector try: - # Count existing connectors of this type to determine the number - count = await count_connectors_of_type( - session, connector_type, space_id, user_id - ) - - # Generate base name (e.g., "Gmail", "Google Drive") - base_name = get_base_name_for_type(connector_type) - - # Format: "Gmail (Composio) 1", "Gmail (Composio) 2", etc. - if count == 0: - connector_name = f"{base_name} (Composio) 1" - else: - connector_name = f"{base_name} (Composio) {count + 1}" + logger.info(f"Creating new Composio connector: {connector_name}") db_connector = SearchSourceConnector( name=connector_name, @@ -392,7 +399,7 @@ async def composio_callback( toolkit_id, "composio-connector" ) return RedirectResponse( - url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={db_connector.id}" + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={db_connector.id}&view=configure" ) except IntegrityError as e: diff --git a/surfsense_backend/app/services/composio_service.py b/surfsense_backend/app/services/composio_service.py index 2aedfcb39..763347d5a 100644 --- a/surfsense_backend/app/services/composio_service.py +++ b/surfsense_backend/app/services/composio_service.py @@ -15,17 +15,6 @@ from app.config import config logger = logging.getLogger(__name__) -# Mapping of toolkit IDs to their Composio auth config IDs -# These use Composio's managed OAuth (no custom credentials needed) -COMPOSIO_TOOLKIT_AUTH_CONFIGS = { - "googledrive": "default", # Uses Composio's managed Google OAuth - "gmail": "default", - "googlecalendar": "default", - "slack": "default", - "notion": "default", - "github": "default", -} - # Mapping of toolkit IDs to their display names COMPOSIO_TOOLKIT_NAMES = { "googledrive": "Google Drive", @@ -234,134 +223,6 @@ class ComposioService: logger.error(f"Failed to initiate Composio connection: {e!s}") raise - async def get_connected_account( - self, connected_account_id: str - ) -> dict[str, Any] | None: - """ - Get details of a connected account. - - Args: - connected_account_id: The Composio connected account ID. - - Returns: - Connected account details or None if not found. - """ - try: - # Pass connected_account_id as positional argument (not keyword) - account = self.client.connected_accounts.get(connected_account_id) - return { - "id": account.id, - "status": getattr(account, "status", None), - "toolkit": getattr(account, "toolkit", None), - "user_id": getattr(account, "user_id", None), - } - except Exception as e: - logger.error( - f"Failed to get connected account {connected_account_id}: {e!s}" - ) - return None - - async def list_all_connections(self) -> list[dict[str, Any]]: - """ - List ALL connected accounts (for debugging). - - Returns: - List of all connected account details. - """ - try: - accounts_response = self.client.connected_accounts.list() - - if hasattr(accounts_response, "items"): - accounts = accounts_response.items - elif hasattr(accounts_response, "__iter__"): - accounts = accounts_response - else: - logger.warning( - f"Unexpected accounts response type: {type(accounts_response)}" - ) - return [] - - result = [] - for acc in accounts: - toolkit_raw = getattr(acc, "toolkit", None) - toolkit_info = None - if toolkit_raw: - if isinstance(toolkit_raw, str): - toolkit_info = toolkit_raw - elif hasattr(toolkit_raw, "slug"): - toolkit_info = toolkit_raw.slug - elif hasattr(toolkit_raw, "name"): - toolkit_info = toolkit_raw.name - else: - toolkit_info = str(toolkit_raw) - - result.append( - { - "id": acc.id, - "status": getattr(acc, "status", None), - "toolkit": toolkit_info, - "user_id": getattr(acc, "user_id", None), - } - ) - - return result - except Exception as e: - logger.error(f"Failed to list all connections: {e!s}") - return [] - - async def list_user_connections(self, user_id: str) -> list[dict[str, Any]]: - """ - List all connected accounts for a user. - - Args: - user_id: The user's unique identifier. - - Returns: - List of connected account details. - """ - try: - accounts_response = self.client.connected_accounts.list(user_id=user_id) - - # Handle paginated response (may have .items attribute) or direct list - if hasattr(accounts_response, "items"): - accounts = accounts_response.items - elif hasattr(accounts_response, "__iter__"): - accounts = accounts_response - else: - logger.warning( - f"Unexpected accounts response type: {type(accounts_response)}" - ) - return [] - - result = [] - for acc in accounts: - # Extract toolkit info - might be string or object - toolkit_raw = getattr(acc, "toolkit", None) - toolkit_info = None - if toolkit_raw: - if isinstance(toolkit_raw, str): - toolkit_info = toolkit_raw - elif hasattr(toolkit_raw, "slug"): - toolkit_info = toolkit_raw.slug - elif hasattr(toolkit_raw, "name"): - toolkit_info = toolkit_raw.name - else: - toolkit_info = toolkit_raw - - result.append( - { - "id": acc.id, - "status": getattr(acc, "status", None), - "toolkit": toolkit_info, - } - ) - - logger.info(f"Found {len(result)} connections for user {user_id}: {result}") - return result - except Exception as e: - logger.error(f"Failed to list connections for user {user_id}: {e!s}") - return [] - async def delete_connected_account(self, connected_account_id: str) -> bool: """ Delete a connected account from Composio. @@ -1018,6 +879,178 @@ class ComposioService: logger.error(f"Failed to list Calendar events: {e!s}") return [], str(e) + # ===== User Info Methods ===== + + async def get_connected_account_email( + self, + connected_account_id: str, + entity_id: str, + toolkit_id: str, + ) -> str | None: + """ + Get the email address associated with a connected account. + + Uses toolkit-specific API calls: + - Google Drive: List files and extract owner email + - Gmail: Get user profile + - Google Calendar: List events and extract organizer/creator email + + Args: + connected_account_id: Composio connected account ID. + entity_id: The entity/user ID that owns the connected account. + toolkit_id: The toolkit identifier (googledrive, gmail, googlecalendar). + + Returns: + Email address string or None if not available. + """ + try: + email = await self._extract_email_for_toolkit( + connected_account_id, entity_id, toolkit_id + ) + + if email: + logger.info(f"Retrieved email {email} for {toolkit_id} connector") + else: + logger.warning(f"Could not retrieve email for {toolkit_id} connector") + + return email + + except Exception as e: + logger.error(f"Failed to get email for {toolkit_id} connector: {e!s}") + return None + + async def _extract_email_for_toolkit( + self, + connected_account_id: str, + entity_id: str, + toolkit_id: str, + ) -> str | None: + """Extract email based on toolkit type.""" + if toolkit_id == "googledrive": + return await self._get_drive_owner_email(connected_account_id, entity_id) + elif toolkit_id == "gmail": + return await self._get_gmail_profile_email(connected_account_id, entity_id) + elif toolkit_id == "googlecalendar": + return await self._get_calendar_user_email(connected_account_id, entity_id) + return None + + async def _get_drive_owner_email( + self, connected_account_id: str, entity_id: str + ) -> str | None: + """Get email from Google Drive file owner where me=True.""" + # List files owned by the user and find one where owner.me=True + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLEDRIVE_LIST_FILES", + params={ + "page_size": 10, + "fields": "files(owners)", + "q": "'me' in owners", # Only files owned by current user + }, + entity_id=entity_id, + ) + + if not result.get("success"): + return None + + data = result.get("data", {}) + if not isinstance(data, dict): + return None + + files = data.get("files") or data.get("data", {}).get("files", []) + for file in files: + owners = file.get("owners", []) + for owner in owners: + # Only return email if this is the current user (me=True) + if owner.get("me") and owner.get("emailAddress"): + return owner.get("emailAddress") + + return None + + async def _get_gmail_profile_email( + self, connected_account_id: str, entity_id: str + ) -> str | None: + """Get email from Gmail profile.""" + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GMAIL_GET_PROFILE", + params={}, + entity_id=entity_id, + ) + + if not result.get("success"): + return None + + data = result.get("data", {}) + if not isinstance(data, dict): + return None + + return data.get("emailAddress") or data.get("data", {}).get("emailAddress") + + async def _get_calendar_user_email( + self, connected_account_id: str, entity_id: str + ) -> str | None: + """Get email from Google Calendar primary calendar or event organizer/creator.""" + # Method 1: Get primary calendar - the "summary" field is the user's email + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLECALENDAR_GET_CALENDAR", + params={"calendar_id": "primary"}, + entity_id=entity_id, + ) + + if result.get("success"): + data = result.get("data", {}) + if isinstance(data, dict): + # Handle nested structure: data['data']['calendar_data']['summary'] + calendar_data = ( + data.get("data", {}).get("calendar_data", {}) + if isinstance(data.get("data"), dict) + else {} + ) + summary = ( + calendar_data.get("summary") + or calendar_data.get("id") + or data.get("data", {}).get("summary") + or data.get("summary") + ) + if summary and "@" in summary: + return summary + + # Method 2: Fallback - list events to get calendar summary (owner's email) + result = await self.execute_tool( + connected_account_id=connected_account_id, + tool_name="GOOGLECALENDAR_EVENTS_LIST", + params={"max_results": 20}, + entity_id=entity_id, + ) + + if not result.get("success"): + return None + + data = result.get("data", {}) + if not isinstance(data, dict): + return None + + # The events list response contains 'summary' which is the calendar owner's email + nested_data = data.get("data", {}) if isinstance(data.get("data"), dict) else {} + summary = nested_data.get("summary") or data.get("summary") + if summary and "@" in summary: + return summary + + # Method 3: Check event organizers/creators + items = nested_data.get("items", []) or data.get("items", []) + for event in items: + organizer = event.get("organizer", {}) + if organizer.get("self"): + return organizer.get("email") + + creator = event.get("creator", {}) + if creator.get("self"): + return creator.get("email") + + return None + # Singleton instance _composio_service: ComposioService | None = None diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py index 0fd68637c..e024aca29 100644 --- a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py @@ -166,8 +166,8 @@ async def _delete_connector_async( user_id=UUID(user_id), search_space_id=search_space_id, type="connector_deletion", - title=f"{connector_name} Removed", - message=f"Connector and {total_deleted} {doc_text} have been removed from your knowledge base.", + title=f"{connector_name} removed", + message=f"Cleanup complete. {total_deleted} {doc_text} removed.", notification_metadata={ "connector_id": connector_id, "connector_name": connector_name,