feat: add Composio connector types and enhance integration

- Introduced new enum values for Composio connectors: COMPOSIO_GOOGLE_DRIVE_CONNECTOR, COMPOSIO_GMAIL_CONNECTOR, and COMPOSIO_GOOGLE_CALENDAR_CONNECTOR.
- Updated database migration to add these new enum values to the relevant types.
- Refactored Composio integration logic to handle specific connector types, improving the management of connected accounts and indexing processes.
- Enhanced frontend components to support the new Composio connector types, including updated UI elements and connector configuration handling.
- Improved backend services to manage Composio connected accounts more effectively, including deletion and indexing tasks.
This commit is contained in:
Anish Sarkar 2026-01-22 22:33:28 +05:30
parent 3a1fa25a6f
commit be5715cfeb
19 changed files with 437 additions and 277 deletions

View file

@ -1,16 +1,21 @@
"""Add COMPOSIO_CONNECTOR to SearchSourceConnectorType and DocumentType enums
"""Add Composio connector types to SearchSourceConnectorType and DocumentType enums
Revision ID: 74
Revises: 73
Create Date: 2026-01-21
This migration adds the COMPOSIO_CONNECTOR enum value to both:
This migration adds the Composio connector enum values to both:
- searchsourceconnectortype (for connector type tracking)
- documenttype (for document type tracking)
Composio is a managed OAuth integration service that allows connecting
to various third-party services (Google Drive, Gmail, Calendar, etc.)
without requiring separate OAuth app verification.
This migration adds three specific connector types:
- COMPOSIO_GOOGLE_DRIVE_CONNECTOR
- COMPOSIO_GMAIL_CONNECTOR
- COMPOSIO_GOOGLE_CALENDAR_CONNECTOR
"""
from collections.abc import Sequence
@ -23,55 +28,65 @@ down_revision: str | None = "73"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
# Define the ENUM type names and the new value
# Define the ENUM type names and the new values
CONNECTOR_ENUM = "searchsourceconnectortype"
CONNECTOR_NEW_VALUE = "COMPOSIO_CONNECTOR"
CONNECTOR_NEW_VALUES = [
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"COMPOSIO_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
]
DOCUMENT_ENUM = "documenttype"
DOCUMENT_NEW_VALUE = "COMPOSIO_CONNECTOR"
DOCUMENT_NEW_VALUES = [
"COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"COMPOSIO_GMAIL_CONNECTOR",
"COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
]
def upgrade() -> None:
"""Upgrade schema - add COMPOSIO_CONNECTOR to connector and document enums safely."""
# Add COMPOSIO_CONNECTOR to searchsourceconnectortype only if not exists
op.execute(
f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = '{CONNECTOR_NEW_VALUE}'
AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{CONNECTOR_ENUM}')
) THEN
ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{CONNECTOR_NEW_VALUE}';
END IF;
END$$;
"""
)
"""Upgrade schema - add Composio connector types to connector and document enums safely."""
# Add each Composio connector type to searchsourceconnectortype only if not exists
for value in CONNECTOR_NEW_VALUES:
op.execute(
f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum e
JOIN pg_type t ON e.enumtypid = t.oid
WHERE t.typname = '{CONNECTOR_ENUM}' AND e.enumlabel = '{value}'
) THEN
ALTER TYPE {CONNECTOR_ENUM} ADD VALUE '{value}';
END IF;
END$$;
"""
)
# Add COMPOSIO_CONNECTOR to documenttype only if not exists
op.execute(
f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumlabel = '{DOCUMENT_NEW_VALUE}'
AND enumtypid = (SELECT oid FROM pg_type WHERE typname = '{DOCUMENT_ENUM}')
) THEN
ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{DOCUMENT_NEW_VALUE}';
END IF;
END$$;
"""
)
# Add each Composio connector type to documenttype only if not exists
for value in DOCUMENT_NEW_VALUES:
op.execute(
f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum e
JOIN pg_type t ON e.enumtypid = t.oid
WHERE t.typname = '{DOCUMENT_ENUM}' AND e.enumlabel = '{value}'
) THEN
ALTER TYPE {DOCUMENT_ENUM} ADD VALUE '{value}';
END IF;
END$$;
"""
)
def downgrade() -> None:
"""Downgrade schema - remove COMPOSIO_CONNECTOR from connector and document enums.
"""Downgrade schema - remove Composio connector types from connector and document enums.
Note: PostgreSQL does not support removing enum values directly.
To properly downgrade, you would need to:
1. Delete any rows using the COMPOSIO_CONNECTOR value
2. Create new enums without COMPOSIO_CONNECTOR
1. Delete any rows using the Composio connector type values
2. Create new enums without the Composio connector types
3. Alter the columns to use the new enums
4. Drop the old enums

View file

@ -54,7 +54,9 @@ class DocumentType(str, Enum):
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"
CIRCLEBACK = "CIRCLEBACK"
NOTE = "NOTE"
COMPOSIO_CONNECTOR = "COMPOSIO_CONNECTOR" # Generic Composio integration
COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"
COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR"
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
class SearchSourceConnectorType(str, Enum):
@ -82,7 +84,9 @@ class SearchSourceConnectorType(str, Enum):
BOOKSTACK_CONNECTOR = "BOOKSTACK_CONNECTOR"
CIRCLEBACK_CONNECTOR = "CIRCLEBACK_CONNECTOR"
MCP_CONNECTOR = "MCP_CONNECTOR" # Model Context Protocol - User-defined API tools
COMPOSIO_CONNECTOR = "COMPOSIO_CONNECTOR" # Generic Composio integration (Google, Slack, etc.)
COMPOSIO_GOOGLE_DRIVE_CONNECTOR = "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"
COMPOSIO_GMAIL_CONNECTOR = "COMPOSIO_GMAIL_CONNECTOR"
COMPOSIO_GOOGLE_CALENDAR_CONNECTOR = "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"
class LiteLLMProvider(str, Enum):

View file

@ -19,6 +19,7 @@ from fastapi.responses import RedirectResponse
from pydantic import ValidationError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.config import config
from app.db import (
@ -30,15 +31,17 @@ from app.db import (
from app.services.composio_service import (
COMPOSIO_TOOLKIT_NAMES,
INDEXABLE_TOOLKITS,
TOOLKIT_TO_CONNECTOR_TYPE,
ComposioService,
)
from app.users import current_active_user
from app.utils.connector_naming import (
check_duplicate_connector,
generate_unique_connector_name,
)
from app.utils.connector_naming import generate_unique_connector_name
from app.utils.oauth_security import OAuthStateManager
# Note: We no longer use check_duplicate_connector for Composio connectors because
# Composio generates a new connected_account_id each time, even for the same Google account.
# Instead, we check for existing connectors by type/space/user and update them.
logger = logging.getLogger(__name__)
router = APIRouter()
@ -260,30 +263,65 @@ async def composio_callback(
"is_indexable": toolkit_id in INDEXABLE_TOOLKITS,
}
# Check for duplicate connector
# For Composio, we use toolkit_id + connected_account_id as unique identifier
identifier = final_connected_account_id or f"{toolkit_id}_{user_id}"
is_duplicate = await check_duplicate_connector(
session,
SearchSourceConnectorType.COMPOSIO_CONNECTOR,
space_id,
user_id,
identifier,
)
if is_duplicate:
logger.warning(
f"Duplicate Composio connector detected for user {user_id} with toolkit {toolkit_id}"
# Get the specific connector type for this toolkit
connector_type_str = TOOLKIT_TO_CONNECTOR_TYPE.get(toolkit_id)
if not connector_type_str:
raise HTTPException(
status_code=400,
detail=f"Unknown toolkit: {toolkit_id}. Available: {list(TOOLKIT_TO_CONNECTOR_TYPE.keys())}",
)
connector_type = SearchSourceConnectorType(connector_type_str)
# Check for existing connector of the same type for this user/space
# When reconnecting, Composio gives a new connected_account_id, so we need to
# check by connector_type, user_id, and search_space_id instead of connected_account_id
existing_connector_result = await session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.connector_type == connector_type,
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.user_id == user_id,
)
)
existing_connector = existing_connector_result.scalars().first()
if existing_connector:
# Delete the old Composio connected account before updating
old_connected_account_id = existing_connector.config.get("composio_connected_account_id")
if old_connected_account_id and old_connected_account_id != final_connected_account_id:
try:
deleted = await service.delete_connected_account(old_connected_account_id)
if deleted:
logger.info(
f"Deleted old Composio connected account {old_connected_account_id} "
f"before updating connector {existing_connector.id}"
)
else:
logger.warning(
f"Failed to delete old Composio connected account {old_connected_account_id}"
)
except Exception as delete_error:
# Log but don't fail - the old account may already be deleted
logger.warning(
f"Error deleting old Composio connected account {old_connected_account_id}: {delete_error!s}"
)
# Update existing connector with new connected_account_id
logger.info(
f"Updating existing Composio connector {existing_connector.id} with new connected_account_id {final_connected_account_id}"
)
existing_connector.config = connector_config
await session.commit()
await session.refresh(existing_connector)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&error=duplicate_account&connector=composio-connector"
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/new-chat?modal=connectors&tab=all&success=true&connector=composio-connector&connectorId={existing_connector.id}"
)
try:
# Generate a unique, user-friendly connector name
connector_name = await generate_unique_connector_name(
session,
SearchSourceConnectorType.COMPOSIO_CONNECTOR,
connector_type,
space_id,
user_id,
f"{toolkit_name} (Composio)",
@ -291,7 +329,7 @@ async def composio_callback(
db_connector = SearchSourceConnector(
name=connector_name,
connector_type=SearchSourceConnectorType.COMPOSIO_CONNECTOR,
connector_type=connector_type,
config=connector_config,
search_space_id=space_id,
user_id=user_id,

View file

@ -37,6 +37,7 @@ from app.db import (
async_session_maker,
get_async_session,
)
from app.services.composio_service import ComposioService
from app.schemas import (
GoogleDriveIndexRequest,
MCPConnectorCreate,
@ -529,6 +530,34 @@ async def delete_search_source_connector(
f"Failed to delete periodic schedule for connector {connector_id}"
)
# For Composio connectors, also delete the connected account in Composio
composio_connector_types = [
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]
if db_connector.connector_type in composio_connector_types:
composio_connected_account_id = db_connector.config.get("composio_connected_account_id")
if composio_connected_account_id and ComposioService.is_enabled():
try:
service = ComposioService()
deleted = await service.delete_connected_account(composio_connected_account_id)
if deleted:
logger.info(
f"Successfully deleted Composio connected account {composio_connected_account_id} "
f"for connector {connector_id}"
)
else:
logger.warning(
f"Failed to delete Composio connected account {composio_connected_account_id} "
f"for connector {connector_id}"
)
except Exception as composio_error:
# Log but don't fail the deletion - Composio account may already be deleted
logger.warning(
f"Error deleting Composio connected account {composio_connected_account_id}: {composio_error!s}"
)
await session.delete(db_connector)
await session.commit()
return {"message": "Search source connector deleted successfully"}
@ -868,7 +897,11 @@ async def index_connector_content(
)
response_message = "Web page indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.COMPOSIO_CONNECTOR:
elif connector.connector_type in [
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]:
from app.tasks.celery_tasks.connector_tasks import (
index_composio_connector_task,
)
@ -2086,6 +2119,59 @@ async def run_bookstack_indexing(
)
async def run_composio_indexing_with_new_session(
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""
Create a new session and run the Composio indexing task.
This prevents session leaks by creating a dedicated session for the background task.
"""
async with async_session_maker() as session:
await run_composio_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)
async def run_composio_indexing(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
):
"""
Run Composio connector indexing with real-time notifications.
This wraps the Composio indexer with the notification system so that
Electric SQL can sync indexing progress to the frontend in real-time.
Args:
session: Database session
connector_id: ID of the Composio connector
search_space_id: ID of the search space
user_id: ID of the user
start_date: Start date for indexing
end_date: End date for indexing
"""
from app.tasks.composio_indexer import index_composio_connector
await _run_indexing_with_notifications(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
indexing_function=index_composio_connector,
update_timestamp_func=_update_connector_timestamp_by_id,
)
# =============================================================================
# MCP Connector Routes
# =============================================================================

View file

@ -39,6 +39,20 @@ COMPOSIO_TOOLKIT_NAMES = {
# Toolkits that support indexing (Phase 1: Google services only)
INDEXABLE_TOOLKITS = {"googledrive", "gmail", "googlecalendar"}
# Mapping of toolkit IDs to connector types
TOOLKIT_TO_CONNECTOR_TYPE = {
"googledrive": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"gmail": "COMPOSIO_GMAIL_CONNECTOR",
"googlecalendar": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
# Mapping of toolkit IDs to document types
TOOLKIT_TO_DOCUMENT_TYPE = {
"googledrive": "COMPOSIO_GOOGLE_DRIVE_CONNECTOR",
"gmail": "COMPOSIO_GMAIL_CONNECTOR",
"googlecalendar": "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR",
}
class ComposioService:
"""Service for interacting with Composio API."""
@ -298,6 +312,26 @@ class ComposioService:
logger.error(f"Failed to list connections for user {user_id}: {e!s}")
return []
async def delete_connected_account(self, connected_account_id: str) -> bool:
"""
Delete a connected account from Composio.
This permanently removes the connected account and revokes access tokens.
Args:
connected_account_id: The Composio connected account ID to delete.
Returns:
True if deletion was successful, False otherwise.
"""
try:
self.client.connected_accounts.delete(connected_account_id)
logger.info(f"Successfully deleted Composio connected account: {connected_account_id}")
return True
except Exception as e:
logger.error(f"Failed to delete Composio connected account {connected_account_id}: {e!s}")
return False
async def execute_tool(
self,
connected_account_id: str,

View file

@ -793,11 +793,13 @@ async def _index_composio_connector(
start_date: str,
end_date: str,
):
"""Index Composio connector content with new session."""
# Import from tasks folder (not connector_indexers) to avoid circular import
from app.tasks.composio_indexer import index_composio_connector
"""Index Composio connector content with new session and real-time notifications."""
# Import from routes to use the notification-wrapped version
from app.routes.search_source_connectors_routes import (
run_composio_indexing,
)
async with get_celery_session_maker()() as session:
await index_composio_connector(
await run_composio_indexing(
session, connector_id, search_space_id, user_id, start_date, end_date
)

View file

@ -23,7 +23,7 @@ from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
)
from app.services.composio_service import INDEXABLE_TOOLKITS
from app.services.composio_service import INDEXABLE_TOOLKITS, TOOLKIT_TO_DOCUMENT_TYPE
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
@ -58,15 +58,13 @@ async def check_document_by_unique_identifier(
async def get_connector_by_id(
session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType
session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType | None
) -> SearchSourceConnector | None:
"""Get a connector by ID and type from the database."""
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.connector_type == connector_type,
)
)
"""Get a connector by ID and optionally by type from the database."""
query = select(SearchSourceConnector).filter(SearchSourceConnector.id == connector_id)
if connector_type is not None:
query = query.filter(SearchSourceConnector.connector_type == connector_type)
result = await session.execute(query)
return result.scalars().first()
@ -129,10 +127,23 @@ async def index_composio_connector(
)
try:
# Get connector by id
# Get connector by id - accept any Composio connector type
# We'll check the actual type after loading
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.COMPOSIO_CONNECTOR
session, connector_id, None # Don't filter by type, we'll validate after
)
# Validate it's a Composio connector
if connector and connector.connector_type not in [
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
]:
error_msg = f"Connector {connector_id} is not a Composio connector"
await task_logger.log_task_failure(
log_entry, error_msg, {"error_type": "InvalidConnectorType"}
)
return 0, error_msg
if not connector:
error_msg = f"Composio connector with ID {connector_id} not found"
@ -276,7 +287,7 @@ async def _index_composio_google_drive(
await task_logger.log_task_success(
log_entry, success_msg, {"files_count": 0}
)
return 0, success_msg
return 0, None # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(all_files)} Google Drive files to index via Composio")
@ -299,8 +310,9 @@ async def _index_composio_google_drive(
continue
# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_CONNECTOR, f"drive_{file_id}", search_space_id
document_type, f"drive_{file_id}", search_space_id
)
# Check if document exists
@ -394,7 +406,7 @@ async def _index_composio_google_drive(
document = Document(
search_space_id=search_space_id,
title=f"Drive: {file_name}",
document_type=DocumentType.COMPOSIO_CONNECTOR,
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]),
document_metadata={
"file_id": file_id,
"file_name": file_name,
@ -489,7 +501,7 @@ async def _index_composio_gmail(
await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0}
)
return 0, success_msg
return 0, None # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(messages)} Gmail messages to index via Composio")
@ -530,8 +542,9 @@ async def _index_composio_gmail(
markdown_content = composio_connector.format_gmail_message_to_markdown(message)
# Generate unique identifier
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"])
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_CONNECTOR, f"gmail_{message_id}", search_space_id
document_type, f"gmail_{message_id}", search_space_id
)
content_hash = generate_content_hash(markdown_content, search_space_id)
@ -612,7 +625,7 @@ async def _index_composio_gmail(
document = Document(
search_space_id=search_space_id,
title=f"Gmail: {subject}",
document_type=DocumentType.COMPOSIO_CONNECTOR,
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]),
document_metadata={
"message_id": message_id,
"subject": subject,
@ -717,7 +730,7 @@ async def _index_composio_google_calendar(
await task_logger.log_task_success(
log_entry, success_msg, {"events_count": 0}
)
return 0, success_msg
return 0, None # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(events)} Google Calendar events to index via Composio")
@ -738,8 +751,9 @@ async def _index_composio_google_calendar(
markdown_content = composio_connector.format_calendar_event_to_markdown(event)
# Generate unique identifier
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"])
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.COMPOSIO_CONNECTOR, f"calendar_{event_id}", search_space_id
document_type, f"calendar_{event_id}", search_space_id
)
content_hash = generate_content_hash(markdown_content, search_space_id)
@ -828,7 +842,7 @@ async def _index_composio_google_calendar(
document = Document(
search_space_id=search_space_id,
title=f"Calendar: {summary}",
document_type=DocumentType.COMPOSIO_CONNECTOR,
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googlecalendar"]),
document_metadata={
"event_id": event_id,
"summary": summary,