feat: enhance Composio integration with pagination and improved error handling

- Updated the list_gmail_messages method to support pagination with page tokens, so large mailboxes can be fetched in small batches instead of one oversized request (a usage sketch follows the file stats below).
- Modified the return structure to include next_page_token and result_size_estimate for better client-side progress handling.
- Improved error handling and logging throughout the Gmail indexing process, including rollback on per-message failures, for better visibility into failures.
- Implemented batch processing for Gmail messages, committing changes incrementally to prevent data loss if indexing is interrupted.
- Ensured connector last_indexed_at timestamps are updated even when no documents are indexed, to keep the UI's indexed state accurate.
- Refactored the Gmail indexing logic into a reusable _process_gmail_message_batch helper to streamline message processing.
Anish Sarkar 2026-01-23 04:44:37 +05:30
parent 6139b07a66
commit 4cbf80d73a
4 changed files with 451 additions and 213 deletions
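As context for the pagination bullet above, here is a minimal sketch of how a caller might drain the new paginated API. It assumes only what the diff below shows: list_gmail_messages(query, max_results, page_token) returning a (messages, next_page_token, result_size_estimate, error) tuple. The shipped indexer processes each page incrementally rather than accumulating a list, so treat this as illustration, not the actual implementation.

```python
from typing import Any


async def fetch_all_gmail_messages(
    composio_connector: Any,  # a ComposioConnector instance; typed loosely for the sketch
    query: str,
    max_items: int = 1000,
    batch_size: int = 50,  # small pages sidestep 413 "payload too large" errors
) -> list[dict[str, Any]]:
    """Drain every page of Gmail results into one list (sketch, not the shipped code)."""
    collected: list[dict[str, Any]] = []
    page_token: str | None = None
    while len(collected) < max_items:
        batch = min(batch_size, max_items - len(collected))
        messages, next_token, _size_estimate, error = await composio_connector.list_gmail_messages(
            query=query,
            max_results=batch,
            page_token=page_token,
        )
        if error:
            raise RuntimeError(f"Failed to fetch Gmail messages: {error}")
        if not messages:
            break  # empty page: nothing left to fetch
        collected.extend(messages)
        if not next_token or len(messages) < batch:
            break  # no further pages available
        page_token = next_token
    return collected
```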

@@ -9,6 +9,7 @@ to avoid circular import issues with the connector_indexers package.
 import logging
 from datetime import UTC, datetime
 from typing import Any
+from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -26,6 +27,7 @@ from app.db import (
 from app.services.composio_service import INDEXABLE_TOOLKITS, TOOLKIT_TO_DOCUMENT_TYPE
 from app.services.llm_service import get_user_long_context_llm
 from app.services.task_logging_service import TaskLoggingService
+from app.tasks.connector_indexers.base import calculate_date_range
 from app.utils.document_converters import (
     create_document_chunks,
     generate_content_hash,
@@ -75,7 +77,7 @@ async def update_connector_last_indexed(
 ) -> None:
     """Update the last_indexed_at timestamp for a connector."""
     if update_last_indexed:
-        connector.last_indexed_at = datetime.now()
+        connector.last_indexed_at = datetime.now(UTC)  # Use UTC for timezone consistency
         logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")
@@ -287,6 +289,9 @@ async def _index_composio_google_drive(
             await task_logger.log_task_success(
                 log_entry, success_msg, {"files_count": 0}
             )
+            # CRITICAL: Update timestamp even when no files found so Electric SQL syncs and UI shows indexed status
+            await update_connector_last_indexed(session, connector, update_last_indexed)
+            await session.commit()
             return 0, None  # Return None (not error) when no items found - this is success with 0 items
 
         logger.info(f"Found {len(all_files)} Google Drive files to index via Composio")
@@ -380,6 +385,13 @@ async def _index_composio_google_drive(
                     existing_document.updated_at = get_current_timestamp()
                     documents_indexed += 1
 
+                    # Batch commit every 10 documents
+                    if documents_indexed % 10 == 0:
+                        logger.info(
+                            f"Committing batch: {documents_indexed} Google Drive files processed so far"
+                        )
+                        await session.commit()
+
                     continue
 
                 # Create new document
@@ -425,7 +437,11 @@ async def _index_composio_google_drive(
                 session.add(document)
                 documents_indexed += 1
 
+                # Batch commit every 10 documents
+                if documents_indexed % 10 == 0:
+                    logger.info(
+                        f"Committing batch: {documents_indexed} Google Drive files processed so far"
+                    )
+                    await session.commit()
+
             except Exception as e:
@@ -433,10 +449,19 @@ async def _index_composio_google_drive(
                 documents_skipped += 1
                 continue
 
-        if documents_indexed > 0:
-            await update_connector_last_indexed(session, connector, update_last_indexed)
+        # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+        # This ensures the UI shows "Last indexed" instead of "Never indexed"
+        await update_connector_last_indexed(session, connector, update_last_indexed)
+
+        # Final commit to ensure all documents are persisted (safety net)
+        # This matches the pattern used in non-Composio Gmail indexer
+        logger.info(
+            f"Final commit: Total {documents_indexed} Google Drive files processed"
+        )
         await session.commit()
+        logger.info(
+            "Successfully committed all Composio Google Drive document changes to database"
+        )
 
         await task_logger.log_task_success(
             log_entry,
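The hunks above all apply the same commit cadence: persist every tenth document, then make one final safety-net commit. A condensed, self-contained sketch of that pattern, where the session parameter stands in for SQLAlchemy's AsyncSession and build_document is a placeholder for the real document construction:

```python
from collections.abc import Callable, Iterable
from typing import Any


async def index_with_incremental_commits(
    session: Any,  # stands in for sqlalchemy.ext.asyncio.AsyncSession
    items: Iterable[Any],
    build_document: Callable[[Any], Any],
    batch_size: int = 10,
) -> int:
    """Commit every `batch_size` documents so a crash loses at most one batch."""
    indexed = 0
    for item in items:
        try:
            session.add(build_document(item))
            indexed += 1
            if indexed % batch_size == 0:
                await session.commit()  # persist progress incrementally
        except Exception:
            await session.rollback()  # discard the failed partial state, keep going
    await session.commit()  # final safety-net commit for the trailing partial batch
    return indexed
```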
@@ -454,154 +479,89 @@ async def _index_composio_google_drive(
return 0, f"Failed to index Google Drive via Composio: {e!s}"
async def _index_composio_gmail(
async def _process_gmail_message_batch(
session: AsyncSession,
connector,
messages: list[dict[str, Any]],
composio_connector: ComposioConnector,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None,
end_date: str | None,
task_logger: TaskLoggingService,
log_entry,
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
"""Index Gmail messages via Composio."""
try:
composio_connector = ComposioConnector(session, connector_id)
total_documents_indexed: int = 0,
) -> tuple[int, int]:
"""
Process a batch of Gmail messages and index them.
Args:
total_documents_indexed: Running total of documents indexed so far (for batch commits).
Returns:
Tuple of (documents_indexed, documents_skipped)
"""
documents_indexed = 0
documents_skipped = 0
await task_logger.log_task_progress(
log_entry,
f"Fetching Gmail messages via Composio for connector {connector_id}",
{"stage": "fetching_messages"},
)
for message in messages:
try:
# Composio uses 'messageId' (camelCase), not 'id'
message_id = message.get("messageId", "") or message.get("id", "")
if not message_id:
documents_skipped += 1
continue
# Build query with date range
query_parts = []
if start_date:
query_parts.append(f"after:{start_date.replace('-', '/')}")
if end_date:
query_parts.append(f"before:{end_date.replace('-', '/')}")
query = " ".join(query_parts)
# Composio's GMAIL_FETCH_EMAILS already returns full message content
# No need for a separate detail API call
messages, error = await composio_connector.list_gmail_messages(
query=query,
max_results=max_items,
)
# Extract message info from Composio response
# Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds
payload = message.get("payload", {})
headers = payload.get("headers", [])
if error:
await task_logger.log_task_failure(
log_entry, f"Failed to fetch Gmail messages: {error}", {}
subject = "No Subject"
sender = "Unknown Sender"
date_str = message.get("messageTimestamp", "Unknown Date")
for header in headers:
name = header.get("name", "").lower()
value = header.get("value", "")
if name == "subject":
subject = value
elif name == "from":
sender = value
elif name == "date":
date_str = value
# Format to markdown using the full message data
markdown_content = composio_connector.format_gmail_message_to_markdown(message)
# Check for empty content (defensive parsing per Composio best practices)
if not markdown_content.strip():
logger.warning(f"Skipping Gmail message with no content: {subject}")
documents_skipped += 1
continue
# Generate unique identifier
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"gmail_{message_id}", search_space_id
)
return 0, f"Failed to fetch Gmail messages: {error}"
if not messages:
success_msg = "No Gmail messages found in the specified date range"
await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0}
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
return 0, None # Return None (not error) when no items found - this is success with 0 items
logger.info(f"Found {len(messages)} Gmail messages to index via Composio")
# Get label IDs from Composio response
label_ids = message.get("labelIds", [])
# Extract thread_id if available (for consistency with non-Composio implementation)
thread_id = message.get("threadId", "") or message.get("thread_id", "")
documents_indexed = 0
documents_skipped = 0
for message in messages:
try:
# Composio uses 'messageId' (camelCase), not 'id'
message_id = message.get("messageId", "") or message.get("id", "")
if not message_id:
if existing_document:
if existing_document.content_hash == content_hash:
documents_skipped += 1
continue
# Composio's GMAIL_FETCH_EMAILS already returns full message content
# No need for a separate detail API call
# Extract message info from Composio response
# Composio structure: messageId, messageText, messageTimestamp, payload.headers, labelIds
payload = message.get("payload", {})
headers = payload.get("headers", [])
subject = "No Subject"
sender = "Unknown Sender"
date_str = message.get("messageTimestamp", "Unknown Date")
for header in headers:
name = header.get("name", "").lower()
value = header.get("value", "")
if name == "subject":
subject = value
elif name == "from":
sender = value
elif name == "date":
date_str = value
# Format to markdown using the full message data
markdown_content = composio_connector.format_gmail_message_to_markdown(message)
# Generate unique identifier
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"gmail_{message_id}", search_space_id
)
content_hash = generate_content_hash(markdown_content, search_space_id)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
# Get label IDs from Composio response
label_ids = message.get("labelIds", [])
if existing_document:
if existing_document.content_hash == content_hash:
documents_skipped += 1
continue
# Update existing
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"message_id": message_id,
"subject": subject,
"sender": sender,
"document_type": "Gmail Message (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
existing_document.title = f"Gmail: {subject}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"message_id": message_id,
"subject": subject,
"sender": sender,
"date": date_str,
"labels": label_ids,
"connector_id": connector_id,
"source": "composio",
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
continue
# Create new document
# Update existing
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
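The header-walking loop that appears twice in the hunk above (once in the removed function body, once in the new helper) reduces to a small pure function. A sketch using only the field names visible in the diff (messageTimestamp, payload, headers):

```python
from typing import Any


def extract_gmail_headers(message: dict[str, Any]) -> tuple[str, str, str]:
    """Pull (subject, sender, date) out of a Composio Gmail message dict."""
    subject = "No Subject"
    sender = "Unknown Sender"
    date_str = message.get("messageTimestamp", "Unknown Date")
    for header in message.get("payload", {}).get("headers", []):
        name = header.get("name", "").lower()
        value = header.get("value", "")
        if name == "subject":
            subject = value
        elif name == "from":
            sender = value
        elif name == "date":
            date_str = value
    return subject, sender, date_str
```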
@@ -609,6 +569,7 @@ async def _index_composio_gmail(
                 if user_llm:
                     document_metadata = {
                         "message_id": message_id,
+                        "thread_id": thread_id,
                         "subject": subject,
                         "sender": sender,
                         "document_type": "Gmail Message (Composio)",
@@ -622,53 +583,276 @@ async def _index_composio_gmail(
chunks = await create_document_chunks(markdown_content)
document = Document(
search_space_id=search_space_id,
title=f"Gmail: {subject}",
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]),
document_metadata={
"message_id": message_id,
"subject": subject,
"sender": sender,
"date": date_str,
"labels": label_ids,
"connector_id": connector_id,
"toolkit_id": "gmail",
"source": "composio",
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
)
session.add(document)
existing_document.title = f"Gmail: {subject}"
existing_document.content = summary_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"labels": label_ids,
"connector_id": connector_id,
"source": "composio",
}
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()
documents_indexed += 1
if documents_indexed % 10 == 0:
# Batch commit every 10 documents
current_total = total_documents_indexed + documents_indexed
if current_total % 10 == 0:
logger.info(
f"Committing batch: {current_total} Gmail messages processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
documents_skipped += 1
continue
if documents_indexed > 0:
await update_connector_last_indexed(session, connector, update_last_indexed)
# Create new document
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
if user_llm:
document_metadata = {
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"document_type": "Gmail Message (Composio)",
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, user_llm, document_metadata
)
else:
summary_content = f"Gmail: {subject}\n\nFrom: {sender}\nDate: {date_str}"
summary_embedding = config.embedding_model_instance.embed(summary_content)
chunks = await create_document_chunks(markdown_content)
document = Document(
search_space_id=search_space_id,
title=f"Gmail: {subject}",
document_type=DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["gmail"]),
document_metadata={
"message_id": message_id,
"thread_id": thread_id,
"subject": subject,
"sender": sender,
"date": date_str,
"labels": label_ids,
"connector_id": connector_id,
"toolkit_id": "gmail",
"source": "composio",
},
content=summary_content,
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
embedding=summary_embedding,
chunks=chunks,
updated_at=get_current_timestamp(),
)
session.add(document)
documents_indexed += 1
# Batch commit every 10 documents
current_total = total_documents_indexed + documents_indexed
if current_total % 10 == 0:
logger.info(
f"Committing batch: {current_total} Gmail messages processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Gmail message: {e!s}", exc_info=True)
documents_skipped += 1
# Rollback on error to avoid partial state (per Composio best practices)
try:
await session.rollback()
except Exception as rollback_error:
logger.error(f"Error during rollback: {rollback_error!s}", exc_info=True)
continue
return documents_indexed, documents_skipped
async def _index_composio_gmail(
session: AsyncSession,
connector,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str | None,
end_date: str | None,
task_logger: TaskLoggingService,
log_entry,
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
"""Index Gmail messages via Composio with pagination and incremental processing."""
try:
composio_connector = ComposioConnector(session, connector_id)
# Normalize date values - handle "undefined" strings from frontend
if start_date == "undefined" or start_date == "":
start_date = None
if end_date == "undefined" or end_date == "":
end_date = None
# Calculate date range with defaults (uses last_indexed_at or 365 days back)
# This ensures indexing works even when user doesn't specify dates
start_date_str, end_date_str = calculate_date_range(
connector, start_date, end_date, default_days_back=365
)
# Build query with date range
query_parts = []
if start_date_str:
query_parts.append(f"after:{start_date_str.replace('-', '/')}")
if end_date_str:
query_parts.append(f"before:{end_date_str.replace('-', '/')}")
query = " ".join(query_parts) if query_parts else ""
logger.info(
f"Gmail query for connector {connector_id}: '{query}' "
f"(start_date={start_date_str}, end_date={end_date_str})"
)
# Use smaller batch size to avoid 413 payload too large errors
batch_size = 50
page_token = None
total_documents_indexed = 0
total_documents_skipped = 0
total_messages_fetched = 0
result_size_estimate = None # Will be set from first API response
while total_messages_fetched < max_items:
# Calculate how many messages to fetch in this batch
remaining = max_items - total_messages_fetched
current_batch_size = min(batch_size, remaining)
# Use result_size_estimate if available, otherwise fall back to max_items
estimated_total = result_size_estimate if result_size_estimate is not None else max_items
# Cap estimated_total at max_items to avoid showing misleading progress
estimated_total = min(estimated_total, max_items)
await task_logger.log_task_progress(
log_entry,
f"Fetching Gmail messages batch via Composio for connector {connector_id} "
f"({total_messages_fetched}/{estimated_total} fetched, {total_documents_indexed} indexed)",
{
"stage": "fetching_messages",
"batch_size": current_batch_size,
"total_fetched": total_messages_fetched,
"total_indexed": total_documents_indexed,
"estimated_total": estimated_total,
},
)
# Fetch batch of messages
messages, next_token, result_size_estimate_batch, error = await composio_connector.list_gmail_messages(
query=query,
max_results=current_batch_size,
page_token=page_token,
)
if error:
await task_logger.log_task_failure(
log_entry, f"Failed to fetch Gmail messages: {error}", {}
)
return 0, f"Failed to fetch Gmail messages: {error}"
if not messages:
# No more messages available
break
# Update result_size_estimate from first response (Gmail provides this estimate)
if result_size_estimate is None and result_size_estimate_batch is not None:
result_size_estimate = result_size_estimate_batch
logger.info(f"Gmail API estimated {result_size_estimate} total messages for query: '{query}'")
total_messages_fetched += len(messages)
# Recalculate estimated_total after potentially updating result_size_estimate
estimated_total = result_size_estimate if result_size_estimate is not None else max_items
estimated_total = min(estimated_total, max_items)
logger.info(
f"Fetched batch of {len(messages)} Gmail messages "
f"(total: {total_messages_fetched}/{estimated_total})"
)
# Process batch incrementally
batch_indexed, batch_skipped = await _process_gmail_message_batch(
session=session,
messages=messages,
composio_connector=composio_connector,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
total_documents_indexed=total_documents_indexed,
)
total_documents_indexed += batch_indexed
total_documents_skipped += batch_skipped
logger.info(
f"Processed batch: {batch_indexed} indexed, {batch_skipped} skipped "
f"(total: {total_documents_indexed} indexed, {total_documents_skipped} skipped)"
)
# Batch commits happen in _process_gmail_message_batch every 10 documents
# This ensures progress is saved incrementally, preventing data loss on crashes
# Check if we should continue
if not next_token:
# No more pages available
break
if len(messages) < current_batch_size:
# Last page had fewer items than requested, we're done
break
# Continue with next page
page_token = next_token
if total_messages_fetched == 0:
success_msg = "No Gmail messages found in the specified date range"
await task_logger.log_task_success(
log_entry, success_msg, {"messages_count": 0}
)
# CRITICAL: Update timestamp even when no messages found so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)
await session.commit()
return 0, None # Return None (not error) when no items found
# CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
# This ensures the UI shows "Last indexed" instead of "Never indexed"
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit to ensure all documents are persisted (safety net)
# This matches the pattern used in non-Composio Gmail indexer
logger.info(
f"Final commit: Total {total_documents_indexed} Gmail messages processed"
)
await session.commit()
logger.info(
"Successfully committed all Composio Gmail document changes to database"
)
await task_logger.log_task_success(
log_entry,
f"Successfully completed Gmail indexing via Composio for connector {connector_id}",
{
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_indexed": total_documents_indexed,
"documents_skipped": total_documents_skipped,
"messages_fetched": total_messages_fetched,
},
)
return documents_indexed, None
return total_documents_indexed, None
except Exception as e:
logger.error(f"Failed to index Gmail via Composio: {e!s}", exc_info=True)
@@ -689,8 +873,6 @@ async def _index_composio_google_calendar(
     max_items: int = 2500,
 ) -> tuple[int, str]:
     """Index Google Calendar events via Composio."""
-    from datetime import datetime, timedelta
-
     try:
         composio_connector = ComposioConnector(session, connector_id)
@@ -700,18 +882,26 @@ async def _index_composio_google_calendar(
             {"stage": "fetching_events"},
         )
 
-        # Build time range
-        if start_date:
-            time_min = f"{start_date}T00:00:00Z"
-        else:
-            # Default to 365 days ago
-            default_start = datetime.now() - timedelta(days=365)
-            time_min = default_start.strftime("%Y-%m-%dT00:00:00Z")
+        # Normalize date values - handle "undefined" strings from frontend
+        if start_date == "undefined" or start_date == "":
+            start_date = None
+        if end_date == "undefined" or end_date == "":
+            end_date = None
 
-        if end_date:
-            time_max = f"{end_date}T23:59:59Z"
-        else:
-            time_max = datetime.now().strftime("%Y-%m-%dT23:59:59Z")
+        # Calculate date range with defaults (uses last_indexed_at or 365 days back)
+        # This ensures indexing works even when user doesn't specify dates
+        start_date_str, end_date_str = calculate_date_range(
+            connector, start_date, end_date, default_days_back=365
+        )
+
+        # Build time range for API call
+        time_min = f"{start_date_str}T00:00:00Z"
+        time_max = f"{end_date_str}T23:59:59Z"
+
+        logger.info(
+            f"Google Calendar query for connector {connector_id}: "
+            f"(start_date={start_date_str}, end_date={end_date_str})"
+        )
 
         events, error = await composio_connector.list_calendar_events(
             time_min=time_min,
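The same normalization now guards both the Gmail and Calendar paths: the frontend can send the literal string "undefined", and Gmail's search syntax wants slashes rather than dashes in dates. A distilled sketch of those two conversions (calculate_date_range itself lives in connector_indexers.base and is not reproduced here):

```python
def normalize_date(raw: str | None) -> str | None:
    """Treat the frontend's literal "undefined" (or an empty string) as no date."""
    return None if raw in ("undefined", "") else raw


def gmail_date_query(start: str | None, end: str | None) -> str:
    """Gmail search expects YYYY/MM/DD, while the connector passes YYYY-MM-DD."""
    parts = []
    if start:
        parts.append(f"after:{start.replace('-', '/')}")
    if end:
        parts.append(f"before:{end.replace('-', '/')}")
    return " ".join(parts)


assert normalize_date("undefined") is None
assert gmail_date_query("2025-01-01", None) == "after:2025/01/01"
```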
@@ -730,6 +920,9 @@ async def _index_composio_google_calendar(
             await task_logger.log_task_success(
                 log_entry, success_msg, {"events_count": 0}
             )
+            # CRITICAL: Update timestamp even when no events found so Electric SQL syncs and UI shows indexed status
+            await update_connector_last_indexed(session, connector, update_last_indexed)
+            await session.commit()
             return 0, None  # Return None (not error) when no items found - this is success with 0 items
 
         logger.info(f"Found {len(events)} Google Calendar events to index via Composio")
@@ -814,6 +1007,13 @@ async def _index_composio_google_calendar(
                     existing_document.updated_at = get_current_timestamp()
                     documents_indexed += 1
 
+                    # Batch commit every 10 documents
+                    if documents_indexed % 10 == 0:
+                        logger.info(
+                            f"Committing batch: {documents_indexed} Google Calendar events processed so far"
+                        )
+                        await session.commit()
+
                     continue
 
                 # Create new document
@@ -863,7 +1063,11 @@ async def _index_composio_google_calendar(
                 session.add(document)
                 documents_indexed += 1
 
+                # Batch commit every 10 documents
+                if documents_indexed % 10 == 0:
+                    logger.info(
+                        f"Committing batch: {documents_indexed} Google Calendar events processed so far"
+                    )
+                    await session.commit()
+
             except Exception as e:
@@ -871,10 +1075,19 @@ async def _index_composio_google_calendar(
                 documents_skipped += 1
                 continue
 
-        if documents_indexed > 0:
-            await update_connector_last_indexed(session, connector, update_last_indexed)
+        # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+        # This ensures the UI shows "Last indexed" instead of "Never indexed"
+        await update_connector_last_indexed(session, connector, update_last_indexed)
+
+        # Final commit to ensure all documents are persisted (safety net)
+        # This matches the pattern used in non-Composio Gmail indexer
+        logger.info(
+            f"Final commit: Total {documents_indexed} Google Calendar events processed"
+        )
         await session.commit()
+        logger.info(
+            "Successfully committed all Composio Google Calendar document changes to database"
+        )
 
         await task_logger.log_task_success(
             log_entry,