feat: streamline Composio connector logic by removing redundant checks and enhancing email retrieval for user accounts

This commit is contained in:
Anish Sarkar 2026-02-04 03:03:40 +05:30
parent 65b79f3705
commit 30c6f42102
4 changed files with 221 additions and 190 deletions

View file

@ -4,6 +4,7 @@ Composio Google Drive Connector Module.
Provides Google Drive specific methods for data retrieval and indexing via Composio.
"""
import contextlib
import hashlib
import json
import logging
@ -321,14 +322,6 @@ async def _process_file_content(
# Check if this is a binary file based on extension or MIME type
is_binary = _is_binary_file(file_name, mime_type)
# Content-based binary detection as fallback
# This catches PDFs and other binary files even if MIME type is missing/incorrect
if not is_binary and content:
has_pdf_magic = content[:4] == b"%PDF"
has_null_bytes = b"\x00" in content[:1000]
if has_pdf_magic or has_null_bytes:
is_binary = True
if is_binary:
# Use ETL service for binary files (PDF, Office docs, etc.)
temp_file_path = None
@ -363,10 +356,8 @@ async def _process_file_content(
finally:
# Cleanup temp file
if temp_file_path and os.path.exists(temp_file_path):
try:
with contextlib.suppress(Exception):
os.unlink(temp_file_path)
except Exception:
pass
else:
# Text file - try to decode as UTF-8
try:

View file

@ -42,10 +42,6 @@ from app.utils.connector_naming import (
)
from app.utils.oauth_security import OAuthStateManager
# Note: We no longer use check_duplicate_connector for Composio connectors because
# Composio generates a new connected_account_id each time, even for the same Google account.
# Instead, we check for existing connectors by type/space/user and update them.
logger = logging.getLogger(__name__)
router = APIRouter()
@ -256,11 +252,6 @@ async def composio_callback(
"connectedAccountId"
) or query_params.get("connected_account_id")
# DEBUG: Log query parameter received
logger.info(
f"DEBUG: Callback received - connectedAccountId: {query_params.get('connectedAccountId')}, connected_account_id: {query_params.get('connected_account_id')}, using: {final_connected_account_id}"
)
# If we still don't have a connected_account_id, warn but continue
# (the connector will be created but indexing won't work until updated)
if not final_connected_account_id:
@ -273,6 +264,9 @@ async def composio_callback(
f"Successfully got connected_account_id: {final_connected_account_id}"
)
# Build entity_id for Composio API calls (same format as used in initiate)
entity_id = f"surfsense_{user_id}"
# Build connector config
connector_config = {
"composio_connected_account_id": final_connected_account_id,
@ -290,20 +284,51 @@ async def composio_callback(
)
connector_type = SearchSourceConnectorType(connector_type_str)
# Check for existing connector of the same type for this user/space
# When reconnecting, Composio gives a new connected_account_id, so we need to
# check by connector_type, user_id, and search_space_id instead of connected_account_id
# Get the base name for this connector type (e.g., "Google Drive", "Gmail")
base_name = get_base_name_for_type(connector_type)
# FIRST: Get the email for this connected account
# This is needed to determine if it's a reconnection (same email) or new account
email = None
try:
email = await service.get_connected_account_email(
connected_account_id=final_connected_account_id,
entity_id=entity_id,
toolkit_id=toolkit_id,
)
if email:
logger.info(f"Retrieved email {email} for {toolkit_id} connector")
except Exception as email_error:
logger.warning(f"Could not get email for connector: {email_error!s}")
# Generate the connector name (with email if available)
# Format: "Gmail (Composio) - john@gmail.com" or "Gmail (Composio) 1" if no email
if email:
connector_name = f"{base_name} (Composio) - {email}"
else:
# Fallback to generic naming if email not available
count = await count_connectors_of_type(
session, connector_type, space_id, user_id
)
if count == 0:
connector_name = f"{base_name} (Composio) 1"
else:
connector_name = f"{base_name} (Composio) {count + 1}"
# Check if a connector with this SAME name already exists (reconnection case)
# This allows multiple accounts (different emails) while supporting reconnection
existing_connector_result = await session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.connector_type == connector_type,
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.name == connector_name,
)
)
existing_connector = existing_connector_result.scalars().first()
if existing_connector:
# Delete the old Composio connected account before updating
# This is a RECONNECTION of the same account - update existing connector
old_connected_account_id = existing_connector.config.get(
"composio_connected_account_id"
)
@ -320,22 +345,16 @@ async def composio_callback(
f"Deleted old Composio connected account {old_connected_account_id} "
f"before updating connector {existing_connector.id}"
)
else:
logger.warning(
f"Failed to delete old Composio connected account {old_connected_account_id}"
)
except Exception as delete_error:
# Log but don't fail - the old account may already be deleted
logger.warning(
f"Error deleting old Composio connected account {old_connected_account_id}: {delete_error!s}"
)
# Update existing connector with new connected_account_id
# IMPORTANT: Merge new credentials with existing config to preserve
# user settings like selected_folders, selected_files, indexing_options,
# drive_page_token, etc. that would otherwise be wiped on reconnection.
# Merge new credentials with existing config to preserve user settings
logger.info(
f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}"
f"Reconnecting existing Composio connector {existing_connector.id} ({connector_name}) "
f"with new connected_account_id {final_connected_account_id}"
)
existing_config = (
existing_connector.config.copy() if existing_connector.config else {}
@ -347,28 +366,16 @@ async def composio_callback(
await session.commit()
await session.refresh(existing_connector)
# Get the frontend connector ID based on toolkit_id
frontend_connector_id = TOOLKIT_TO_FRONTEND_CONNECTOR_ID.get(
toolkit_id, "composio-connector"
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={existing_connector.id}"
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={existing_connector.id}&view=configure"
)
# This is a NEW account - create a new connector
try:
# Count existing connectors of this type to determine the number
count = await count_connectors_of_type(
session, connector_type, space_id, user_id
)
# Generate base name (e.g., "Gmail", "Google Drive")
base_name = get_base_name_for_type(connector_type)
# Format: "Gmail (Composio) 1", "Gmail (Composio) 2", etc.
if count == 0:
connector_name = f"{base_name} (Composio) 1"
else:
connector_name = f"{base_name} (Composio) {count + 1}"
logger.info(f"Creating new Composio connector: {connector_name}")
db_connector = SearchSourceConnector(
name=connector_name,
@ -392,7 +399,7 @@ async def composio_callback(
toolkit_id, "composio-connector"
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={db_connector.id}"
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector={frontend_connector_id}&connectorId={db_connector.id}&view=configure"
)
except IntegrityError as e:

View file

@ -15,17 +15,6 @@ from app.config import config
logger = logging.getLogger(__name__)
# Toolkit ID -> Composio auth config ID.
# Every toolkit listed here maps to the "default" auth config, i.e. Composio's
# managed OAuth, so no custom credentials are needed.
COMPOSIO_TOOLKIT_AUTH_CONFIGS = dict.fromkeys(
    (
        "googledrive",  # Uses Composio's managed Google OAuth
        "gmail",
        "googlecalendar",
        "slack",
        "notion",
        "github",
    ),
    "default",
)
# Mapping of toolkit IDs to their display names
COMPOSIO_TOOLKIT_NAMES = {
"googledrive": "Google Drive",
@ -234,134 +223,6 @@ class ComposioService:
logger.error(f"Failed to initiate Composio connection: {e!s}")
raise
async def get_connected_account(
self, connected_account_id: str
) -> dict[str, Any] | None:
"""
Get details of a connected account.
Args:
connected_account_id: The Composio connected account ID.
Returns:
Connected account details or None if not found.
"""
try:
# Pass connected_account_id as positional argument (not keyword)
account = self.client.connected_accounts.get(connected_account_id)
return {
"id": account.id,
"status": getattr(account, "status", None),
"toolkit": getattr(account, "toolkit", None),
"user_id": getattr(account, "user_id", None),
}
except Exception as e:
logger.error(
f"Failed to get connected account {connected_account_id}: {e!s}"
)
return None
async def list_all_connections(self) -> list[dict[str, Any]]:
    """
    List every connected account known to the client (debugging aid).

    Returns:
        One dict per account with "id", "status", "toolkit" and "user_id".
        An empty list is returned when the response has an unexpected shape
        or when anything raises.
    """

    def _toolkit_label(raw):
        # The toolkit may be a plain string or an object; prefer its slug,
        # then its name, and fall back to its string form.
        if not raw:
            return None
        if isinstance(raw, str):
            return raw
        for attr in ("slug", "name"):
            if hasattr(raw, attr):
                return getattr(raw, attr)
        return str(raw)

    try:
        response = self.client.connected_accounts.list()

        # Accept either a paginated response (.items) or a bare iterable.
        if hasattr(response, "items"):
            accounts = response.items
        elif hasattr(response, "__iter__"):
            accounts = response
        else:
            logger.warning(
                f"Unexpected accounts response type: {type(response)}"
            )
            return []

        summaries = []
        for account in accounts:
            label = _toolkit_label(getattr(account, "toolkit", None))
            summaries.append(
                {
                    "id": account.id,
                    "status": getattr(account, "status", None),
                    "toolkit": label,
                    "user_id": getattr(account, "user_id", None),
                }
            )
        return summaries
    except Exception as e:
        logger.error(f"Failed to list all connections: {e!s}")
        return []
async def list_user_connections(self, user_id: str) -> list[dict[str, Any]]:
    """
    List all connected accounts for a user.

    Args:
        user_id: The user's unique identifier.

    Returns:
        One dict per account with "id", "status" and "toolkit" keys, where
        "toolkit" is a string (or None when the account has no toolkit).
        An empty list is returned when the response has an unexpected shape
        or when anything raises.
    """
    try:
        accounts_response = self.client.connected_accounts.list(user_id=user_id)

        # Handle paginated response (may have .items attribute) or direct list
        if hasattr(accounts_response, "items"):
            accounts = accounts_response.items
        elif hasattr(accounts_response, "__iter__"):
            accounts = accounts_response
        else:
            logger.warning(
                f"Unexpected accounts response type: {type(accounts_response)}"
            )
            return []

        result = []
        for acc in accounts:
            # Extract toolkit info - might be string or object
            toolkit_raw = getattr(acc, "toolkit", None)
            toolkit_info = None
            if toolkit_raw:
                if isinstance(toolkit_raw, str):
                    toolkit_info = toolkit_raw
                elif hasattr(toolkit_raw, "slug"):
                    toolkit_info = toolkit_raw.slug
                elif hasattr(toolkit_raw, "name"):
                    toolkit_info = toolkit_raw.name
                else:
                    # Stringify unknown toolkit objects (as list_all_connections
                    # does) so the result only ever holds plain values.
                    toolkit_info = str(toolkit_raw)

            result.append(
                {
                    "id": acc.id,
                    "status": getattr(acc, "status", None),
                    "toolkit": toolkit_info,
                }
            )

        logger.info(f"Found {len(result)} connections for user {user_id}: {result}")
        return result
    except Exception as e:
        logger.error(f"Failed to list connections for user {user_id}: {e!s}")
        return []
async def delete_connected_account(self, connected_account_id: str) -> bool:
"""
Delete a connected account from Composio.
@ -1018,6 +879,178 @@ class ComposioService:
logger.error(f"Failed to list Calendar events: {e!s}")
return [], str(e)
# ===== User Info Methods =====
async def get_connected_account_email(
self,
connected_account_id: str,
entity_id: str,
toolkit_id: str,
) -> str | None:
"""
Get the email address associated with a connected account.
Uses toolkit-specific API calls:
- Google Drive: List files and extract owner email
- Gmail: Get user profile
- Google Calendar: List events and extract organizer/creator email
Args:
connected_account_id: Composio connected account ID.
entity_id: The entity/user ID that owns the connected account.
toolkit_id: The toolkit identifier (googledrive, gmail, googlecalendar).
Returns:
Email address string or None if not available.
"""
try:
email = await self._extract_email_for_toolkit(
connected_account_id, entity_id, toolkit_id
)
if email:
logger.info(f"Retrieved email {email} for {toolkit_id} connector")
else:
logger.warning(f"Could not retrieve email for {toolkit_id} connector")
return email
except Exception as e:
logger.error(f"Failed to get email for {toolkit_id} connector: {e!s}")
return None
async def _extract_email_for_toolkit(
self,
connected_account_id: str,
entity_id: str,
toolkit_id: str,
) -> str | None:
"""Extract email based on toolkit type."""
if toolkit_id == "googledrive":
return await self._get_drive_owner_email(connected_account_id, entity_id)
elif toolkit_id == "gmail":
return await self._get_gmail_profile_email(connected_account_id, entity_id)
elif toolkit_id == "googlecalendar":
return await self._get_calendar_user_email(connected_account_id, entity_id)
return None
async def _get_drive_owner_email(
self, connected_account_id: str, entity_id: str
) -> str | None:
"""Get email from Google Drive file owner where me=True."""
# List files owned by the user and find one where owner.me=True
result = await self.execute_tool(
connected_account_id=connected_account_id,
tool_name="GOOGLEDRIVE_LIST_FILES",
params={
"page_size": 10,
"fields": "files(owners)",
"q": "'me' in owners", # Only files owned by current user
},
entity_id=entity_id,
)
if not result.get("success"):
return None
data = result.get("data", {})
if not isinstance(data, dict):
return None
files = data.get("files") or data.get("data", {}).get("files", [])
for file in files:
owners = file.get("owners", [])
for owner in owners:
# Only return email if this is the current user (me=True)
if owner.get("me") and owner.get("emailAddress"):
return owner.get("emailAddress")
return None
async def _get_gmail_profile_email(
self, connected_account_id: str, entity_id: str
) -> str | None:
"""Get email from Gmail profile."""
result = await self.execute_tool(
connected_account_id=connected_account_id,
tool_name="GMAIL_GET_PROFILE",
params={},
entity_id=entity_id,
)
if not result.get("success"):
return None
data = result.get("data", {})
if not isinstance(data, dict):
return None
return data.get("emailAddress") or data.get("data", {}).get("emailAddress")
async def _get_calendar_user_email(
self, connected_account_id: str, entity_id: str
) -> str | None:
"""Get email from Google Calendar primary calendar or event organizer/creator."""
# Method 1: Get primary calendar - the "summary" field is the user's email
result = await self.execute_tool(
connected_account_id=connected_account_id,
tool_name="GOOGLECALENDAR_GET_CALENDAR",
params={"calendar_id": "primary"},
entity_id=entity_id,
)
if result.get("success"):
data = result.get("data", {})
if isinstance(data, dict):
# Handle nested structure: data['data']['calendar_data']['summary']
calendar_data = (
data.get("data", {}).get("calendar_data", {})
if isinstance(data.get("data"), dict)
else {}
)
summary = (
calendar_data.get("summary")
or calendar_data.get("id")
or data.get("data", {}).get("summary")
or data.get("summary")
)
if summary and "@" in summary:
return summary
# Method 2: Fallback - list events to get calendar summary (owner's email)
result = await self.execute_tool(
connected_account_id=connected_account_id,
tool_name="GOOGLECALENDAR_EVENTS_LIST",
params={"max_results": 20},
entity_id=entity_id,
)
if not result.get("success"):
return None
data = result.get("data", {})
if not isinstance(data, dict):
return None
# The events list response contains 'summary' which is the calendar owner's email
nested_data = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
summary = nested_data.get("summary") or data.get("summary")
if summary and "@" in summary:
return summary
# Method 3: Check event organizers/creators
items = nested_data.get("items", []) or data.get("items", [])
for event in items:
organizer = event.get("organizer", {})
if organizer.get("self"):
return organizer.get("email")
creator = event.get("creator", {})
if creator.get("self"):
return creator.get("email")
return None
# Singleton instance: module-level holder for a shared ComposioService.
# It starts out as None.
# NOTE(review): the code that assigns it is outside this view — confirm.
_composio_service: ComposioService | None = None

View file

@ -166,8 +166,8 @@ async def _delete_connector_async(
user_id=UUID(user_id),
search_space_id=search_space_id,
type="connector_deletion",
title=f"{connector_name} Removed",
message=f"Connector and {total_deleted} {doc_text} have been removed from your knowledge base.",
title=f"{connector_name} removed",
message=f"Cleanup complete. {total_deleted} {doc_text} removed.",
notification_metadata={
"connector_id": connector_id,
"connector_name": connector_name,